Skip to main content

crabllm_proxy/ext/
usage.rs

1use crate::PREFIX_USAGE;
2use axum::{Json, Router, extract::Query, routing::get};
3use crabllm_core::{BoxFuture, RequestContext, Storage, storage_key};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7pub struct UsageTracker {
8    storage: Arc<dyn Storage>,
9}
10
11impl UsageTracker {
12    pub fn new(_config: &serde_json::Value, storage: Arc<dyn Storage>) -> Result<Self, String> {
13        Ok(Self { storage })
14    }
15
16    /// Admin usage routes: `GET /v1/admin/usage?name=&model=` — global view.
17    pub fn admin_routes(&self) -> Router {
18        let storage = self.storage.clone();
19        Router::new().route(
20            "/v1/admin/usage",
21            get(move |query: Query<AdminUsageQuery>| {
22                let storage = storage.clone();
23                async move { admin_usage_handler(storage, query.0).await }
24            }),
25        )
26    }
27
28    /// Record token usage for a given key and model.
29    async fn record(
30        &self,
31        principal: &str,
32        model: &str,
33        prompt_tokens: u32,
34        completion_tokens: u32,
35        cache_hit_tokens: u32,
36    ) {
37        let prompt_suffix = format!("{principal}:{model}:p");
38        let completion_suffix = format!("{principal}:{model}:c");
39
40        let _ = self
41            .storage
42            .increment(
43                &storage_key(&PREFIX_USAGE, prompt_suffix.as_bytes()),
44                prompt_tokens as i64,
45            )
46            .await;
47        let _ = self
48            .storage
49            .increment(
50                &storage_key(&PREFIX_USAGE, completion_suffix.as_bytes()),
51                completion_tokens as i64,
52            )
53            .await;
54        if cache_hit_tokens > 0 {
55            let ch_suffix = format!("{principal}:{model}:ch");
56            let _ = self
57                .storage
58                .increment(
59                    &storage_key(&PREFIX_USAGE, ch_suffix.as_bytes()),
60                    cache_hit_tokens as i64,
61                )
62                .await;
63        }
64    }
65}
66
67impl crabllm_core::Extension for UsageTracker {
68    fn name(&self) -> &str {
69        "usage"
70    }
71
72    fn prefix(&self) -> crabllm_core::Prefix {
73        PREFIX_USAGE
74    }
75
76    fn on_response(
77        &self,
78        ctx: &RequestContext,
79        _raw_request: &[u8],
80        raw_response: &[u8],
81    ) -> BoxFuture<'_, ()> {
82        let usage = crabllm_core::Usage::from(raw_response);
83        if usage.total_tokens() == 0 {
84            return Box::pin(async {});
85        }
86
87        let principal = ctx
88            .principal
89            .clone()
90            .unwrap_or_else(|| "__global".to_string());
91        let model = ctx.model.clone();
92
93        Box::pin(async move {
94            self.record(
95                &principal,
96                &model,
97                usage.prompt_tokens(),
98                usage.completion_tokens(),
99                usage.cache_read_tokens,
100            )
101            .await;
102        })
103    }
104
105    fn on_chunk(&self, ctx: &RequestContext, raw_chunk: &[u8]) -> BoxFuture<'_, ()> {
106        let usage = crabllm_core::Usage::from(raw_chunk);
107        if usage.total_tokens() == 0 {
108            return Box::pin(async {});
109        }
110
111        let principal = ctx
112            .principal
113            .clone()
114            .unwrap_or_else(|| "__global".to_string());
115        let model = ctx.model.clone();
116
117        Box::pin(async move {
118            self.record(
119                &principal,
120                &model,
121                usage.prompt_tokens(),
122                usage.completion_tokens(),
123                usage.cache_read_tokens,
124            )
125            .await;
126        })
127    }
128}
129
130#[derive(Deserialize)]
131struct AdminUsageQuery {
132    name: Option<String>,
133    model: Option<String>,
134}
135
136#[derive(Deserialize)]
137pub struct UserUsageQuery {
138    pub model: Option<String>,
139}
140
141#[derive(Serialize)]
142#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
143pub struct UsageEntry {
144    name: String,
145    model: String,
146    prompt_tokens: i64,
147    completion_tokens: i64,
148    cache_hit_tokens: i64,
149}
150
151/// Query usage data from storage, filtered by optional principal and model.
152pub async fn query_usage(
153    storage: &dyn Storage,
154    name_filter: Option<&str>,
155    model_filter: Option<&str>,
156) -> Json<Vec<UsageEntry>> {
157    let pairs = storage.list(&PREFIX_USAGE).await.unwrap_or_default();
158
159    let mut entries: std::collections::HashMap<(String, String), (i64, i64, i64)> =
160        std::collections::HashMap::new();
161
162    for (raw_key, raw_value) in &pairs {
163        let suffix = match std::str::from_utf8(&raw_key[crabllm_core::PREFIX_LEN..]) {
164            Ok(s) => s,
165            Err(_) => continue,
166        };
167
168        // suffix format: "{principal}:{model}:{p|c|ch}"
169        let Some((rest, kind)) = suffix.rsplit_once(':') else {
170            continue;
171        };
172        let Some((principal, model)) = rest.split_once(':') else {
173            continue;
174        };
175
176        if let Some(filter) = name_filter
177            && principal != filter
178        {
179            continue;
180        }
181        if let Some(filter) = model_filter
182            && model != filter
183        {
184            continue;
185        }
186
187        let val = raw_value
188            .get(..8)
189            .and_then(|b| b.try_into().ok())
190            .map(i64::from_le_bytes)
191            .unwrap_or(0);
192
193        let entry = entries
194            .entry((principal.to_string(), model.to_string()))
195            .or_insert((0, 0, 0));
196
197        match kind {
198            "p" => entry.0 = val,
199            "c" => entry.1 = val,
200            "ch" => entry.2 = val,
201            _ => {}
202        }
203    }
204
205    let result: Vec<UsageEntry> = entries
206        .into_iter()
207        .map(
208            |((name, model), (prompt, completion, cache_hit))| UsageEntry {
209                name,
210                model,
211                prompt_tokens: prompt,
212                completion_tokens: completion,
213                cache_hit_tokens: cache_hit,
214            },
215        )
216        .collect();
217
218    Json(result)
219}
220
221async fn admin_usage_handler(
222    storage: Arc<dyn Storage>,
223    query: AdminUsageQuery,
224) -> Json<Vec<UsageEntry>> {
225    query_usage(&*storage, query.name.as_deref(), query.model.as_deref()).await
226}