1use crate::PREFIX_USAGE;
2use axum::{Json, Router, extract::Query, routing::get};
3use crabllm_core::{BoxFuture, RequestContext, Storage, storage_key};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7pub struct UsageTracker {
8 storage: Arc<dyn Storage>,
9}
10
11impl UsageTracker {
12 pub fn new(_config: &serde_json::Value, storage: Arc<dyn Storage>) -> Result<Self, String> {
13 Ok(Self { storage })
14 }
15
16 pub fn admin_routes(&self) -> Router {
18 let storage = self.storage.clone();
19 Router::new().route(
20 "/v1/admin/usage",
21 get(move |query: Query<AdminUsageQuery>| {
22 let storage = storage.clone();
23 async move { admin_usage_handler(storage, query.0).await }
24 }),
25 )
26 }
27
28 async fn record(
30 &self,
31 principal: &str,
32 model: &str,
33 prompt_tokens: u32,
34 completion_tokens: u32,
35 cache_hit_tokens: u32,
36 ) {
37 let prompt_suffix = format!("{principal}:{model}:p");
38 let completion_suffix = format!("{principal}:{model}:c");
39
40 let _ = self
41 .storage
42 .increment(
43 &storage_key(&PREFIX_USAGE, prompt_suffix.as_bytes()),
44 prompt_tokens as i64,
45 )
46 .await;
47 let _ = self
48 .storage
49 .increment(
50 &storage_key(&PREFIX_USAGE, completion_suffix.as_bytes()),
51 completion_tokens as i64,
52 )
53 .await;
54 if cache_hit_tokens > 0 {
55 let ch_suffix = format!("{principal}:{model}:ch");
56 let _ = self
57 .storage
58 .increment(
59 &storage_key(&PREFIX_USAGE, ch_suffix.as_bytes()),
60 cache_hit_tokens as i64,
61 )
62 .await;
63 }
64 }
65}
66
67impl crabllm_core::Extension for UsageTracker {
68 fn name(&self) -> &str {
69 "usage"
70 }
71
72 fn prefix(&self) -> crabllm_core::Prefix {
73 PREFIX_USAGE
74 }
75
76 fn on_response(
77 &self,
78 ctx: &RequestContext,
79 _raw_request: &[u8],
80 raw_response: &[u8],
81 ) -> BoxFuture<'_, ()> {
82 let usage = crabllm_core::Usage::from(raw_response);
83 if usage.total_tokens() == 0 {
84 return Box::pin(async {});
85 }
86
87 let principal = ctx
88 .principal
89 .clone()
90 .unwrap_or_else(|| "__global".to_string());
91 let model = ctx.model.clone();
92
93 Box::pin(async move {
94 self.record(
95 &principal,
96 &model,
97 usage.prompt_tokens(),
98 usage.completion_tokens(),
99 usage.cache_read_tokens,
100 )
101 .await;
102 })
103 }
104
105 fn on_chunk(&self, ctx: &RequestContext, raw_chunk: &[u8]) -> BoxFuture<'_, ()> {
106 let usage = crabllm_core::Usage::from(raw_chunk);
107 if usage.total_tokens() == 0 {
108 return Box::pin(async {});
109 }
110
111 let principal = ctx
112 .principal
113 .clone()
114 .unwrap_or_else(|| "__global".to_string());
115 let model = ctx.model.clone();
116
117 Box::pin(async move {
118 self.record(
119 &principal,
120 &model,
121 usage.prompt_tokens(),
122 usage.completion_tokens(),
123 usage.cache_read_tokens,
124 )
125 .await;
126 })
127 }
128}
129
130#[derive(Deserialize)]
131struct AdminUsageQuery {
132 name: Option<String>,
133 model: Option<String>,
134}
135
136#[derive(Deserialize)]
137pub struct UserUsageQuery {
138 pub model: Option<String>,
139}
140
141#[derive(Serialize)]
142#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
143pub struct UsageEntry {
144 name: String,
145 model: String,
146 prompt_tokens: i64,
147 completion_tokens: i64,
148 cache_hit_tokens: i64,
149}
150
151pub async fn query_usage(
153 storage: &dyn Storage,
154 name_filter: Option<&str>,
155 model_filter: Option<&str>,
156) -> Json<Vec<UsageEntry>> {
157 let pairs = storage.list(&PREFIX_USAGE).await.unwrap_or_default();
158
159 let mut entries: std::collections::HashMap<(String, String), (i64, i64, i64)> =
160 std::collections::HashMap::new();
161
162 for (raw_key, raw_value) in &pairs {
163 let suffix = match std::str::from_utf8(&raw_key[crabllm_core::PREFIX_LEN..]) {
164 Ok(s) => s,
165 Err(_) => continue,
166 };
167
168 let Some((rest, kind)) = suffix.rsplit_once(':') else {
170 continue;
171 };
172 let Some((principal, model)) = rest.split_once(':') else {
173 continue;
174 };
175
176 if let Some(filter) = name_filter
177 && principal != filter
178 {
179 continue;
180 }
181 if let Some(filter) = model_filter
182 && model != filter
183 {
184 continue;
185 }
186
187 let val = raw_value
188 .get(..8)
189 .and_then(|b| b.try_into().ok())
190 .map(i64::from_le_bytes)
191 .unwrap_or(0);
192
193 let entry = entries
194 .entry((principal.to_string(), model.to_string()))
195 .or_insert((0, 0, 0));
196
197 match kind {
198 "p" => entry.0 = val,
199 "c" => entry.1 = val,
200 "ch" => entry.2 = val,
201 _ => {}
202 }
203 }
204
205 let result: Vec<UsageEntry> = entries
206 .into_iter()
207 .map(
208 |((name, model), (prompt, completion, cache_hit))| UsageEntry {
209 name,
210 model,
211 prompt_tokens: prompt,
212 completion_tokens: completion,
213 cache_hit_tokens: cache_hit,
214 },
215 )
216 .collect();
217
218 Json(result)
219}
220
221async fn admin_usage_handler(
222 storage: Arc<dyn Storage>,
223 query: AdminUsageQuery,
224) -> Json<Vec<UsageEntry>> {
225 query_usage(&*storage, query.name.as_deref(), query.model.as_deref()).await
226}