1use super::{
16 Error, JsonSnafu, ModelListParams, Schema, SchemaAllowCreate, SchemaAllowEdit, SchemaType,
17 SchemaView, SqlxSnafu, format_datetime,
18};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use sqlx::FromRow;
22use sqlx::{Pool, Postgres, QueryBuilder};
23use std::collections::HashMap;
24use tibba_model::Model;
25use time::PrimitiveDateTime;
26
27type Result<T> = std::result::Result<T, Error>;
28
29#[derive(FromRow)]
30struct TokenUsageSchema {
31 id: i64,
32 user_id: i64,
33 service: String,
34 amount: i64,
35 model: String,
36 input_tokens: i32,
37 output_tokens: i32,
38 api_path: String,
39 duration_ms: i32,
40 biz_id: String,
41 remark: String,
42 created: PrimitiveDateTime,
43 modified: PrimitiveDateTime,
44}
45
46#[derive(Debug, Clone, Deserialize, Serialize)]
47pub struct TokenUsage {
48 pub id: i64,
49 pub user_id: i64,
50 pub service: String,
51 pub amount: i64,
52 pub model: String,
53 pub input_tokens: i32,
54 pub output_tokens: i32,
55 pub api_path: String,
56 pub duration_ms: i32,
57 pub biz_id: String,
58 pub remark: String,
59 pub created: String,
60 pub modified: String,
61}
62
63impl From<TokenUsageSchema> for TokenUsage {
64 fn from(s: TokenUsageSchema) -> Self {
65 Self {
66 id: s.id,
67 user_id: s.user_id,
68 service: s.service,
69 amount: s.amount,
70 model: s.model,
71 input_tokens: s.input_tokens,
72 output_tokens: s.output_tokens,
73 api_path: s.api_path,
74 duration_ms: s.duration_ms,
75 biz_id: s.biz_id,
76 remark: s.remark,
77 created: format_datetime(s.created),
78 modified: format_datetime(s.modified),
79 }
80 }
81}
82
83#[derive(Debug, Clone, Deserialize, Default)]
85pub struct TokenUsageInsertParams {
86 pub user_id: i64,
87 pub service: String,
88 pub amount: i64,
89 pub model: Option<String>,
91 pub input_tokens: Option<i32>,
93 pub output_tokens: Option<i32>,
95 pub api_path: Option<String>,
97 pub duration_ms: Option<i32>,
99 pub biz_id: Option<String>,
101 pub remark: Option<String>,
102}
103
104#[derive(Debug, Clone, Serialize)]
106pub struct TokenUsageSummary {
107 pub user_id: i64,
108 pub service: String,
109 pub model: String,
110 pub total_amount: i64,
111 pub total_input_tokens: i64,
112 pub total_output_tokens: i64,
113 pub call_count: i64,
114}
115
116#[derive(Default)]
117pub struct TokenUsageModel {}
118
119impl TokenUsageModel {
120 pub async fn list_by_user(
122 &self,
123 pool: &Pool<Postgres>,
124 user_id: i64,
125 page: u64,
126 limit: u64,
127 ) -> Result<Vec<TokenUsage>> {
128 let limit = limit.min(200);
129 let offset = (page.max(1) - 1) * limit;
130 let rows = sqlx::query_as::<_, TokenUsageSchema>(
131 r#"SELECT * FROM token_usages WHERE user_id = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT $2 OFFSET $3"#,
132 )
133 .bind(user_id)
134 .bind(limit as i64)
135 .bind(offset as i64)
136 .fetch_all(pool)
137 .await
138 .context(SqlxSnafu)?;
139 Ok(rows.into_iter().map(Into::into).collect())
140 }
141
142 pub async fn summary_by_service(
144 &self,
145 pool: &Pool<Postgres>,
146 user_id: Option<i64>,
147 ) -> Result<Vec<TokenUsageSummary>> {
148 let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
149 r#"SELECT user_id, service, model,
150 SUM(amount) AS total_amount,
151 SUM(input_tokens) AS total_input_tokens,
152 SUM(output_tokens) AS total_output_tokens,
153 COUNT(*) AS call_count
154 FROM token_usages
155 WHERE deleted_at IS NULL"#,
156 );
157 if let Some(uid) = user_id {
158 qb.push(" AND user_id = ").push_bind(uid);
159 }
160 qb.push(" GROUP BY user_id, service, model ORDER BY total_amount DESC");
161
162 #[derive(FromRow)]
163 struct SummaryRow {
164 user_id: i64,
165 service: String,
166 model: String,
167 total_amount: i64,
168 total_input_tokens: i64,
169 total_output_tokens: i64,
170 call_count: i64,
171 }
172
173 let rows = qb
174 .build_query_as::<SummaryRow>()
175 .fetch_all(pool)
176 .await
177 .context(SqlxSnafu)?;
178
179 Ok(rows
180 .into_iter()
181 .map(|r| TokenUsageSummary {
182 user_id: r.user_id,
183 service: r.service,
184 model: r.model,
185 total_amount: r.total_amount,
186 total_input_tokens: r.total_input_tokens,
187 total_output_tokens: r.total_output_tokens,
188 call_count: r.call_count,
189 })
190 .collect())
191 }
192}
193
194impl Model for TokenUsageModel {
195 type Output = TokenUsage;
196 fn new() -> Self {
197 Self::default()
198 }
199
200 async fn schema_view(&self, _pool: &Pool<Postgres>) -> SchemaView {
201 SchemaView {
202 schemas: vec![
203 Schema::new_id(),
204 Schema {
205 name: "user_id".to_string(),
206 category: SchemaType::Number,
207 read_only: true,
208 filterable: true,
209 ..Default::default()
210 },
211 Schema {
212 name: "service".to_string(),
213 category: SchemaType::String,
214 read_only: true,
215 filterable: true,
216 ..Default::default()
217 },
218 Schema {
219 name: "amount".to_string(),
220 category: SchemaType::Number,
221 read_only: true,
222 ..Default::default()
223 },
224 Schema {
225 name: "model".to_string(),
226 category: SchemaType::String,
227 read_only: true,
228 filterable: true,
229 ..Default::default()
230 },
231 Schema {
232 name: "input_tokens".to_string(),
233 category: SchemaType::Number,
234 read_only: true,
235 ..Default::default()
236 },
237 Schema {
238 name: "output_tokens".to_string(),
239 category: SchemaType::Number,
240 read_only: true,
241 ..Default::default()
242 },
243 Schema {
244 name: "api_path".to_string(),
245 category: SchemaType::String,
246 read_only: true,
247 ..Default::default()
248 },
249 Schema {
250 name: "duration_ms".to_string(),
251 category: SchemaType::Number,
252 read_only: true,
253 ..Default::default()
254 },
255 Schema {
256 name: "biz_id".to_string(),
257 category: SchemaType::String,
258 read_only: true,
259 ..Default::default()
260 },
261 Schema::new_readonly_remark(),
262 Schema::new_created(),
263 Schema::new_filterable_modified(),
264 ],
265 allow_edit: SchemaAllowEdit {
266 disabled: true,
267 ..Default::default()
268 },
269 allow_create: SchemaAllowCreate {
270 disabled: true,
271 ..Default::default()
272 },
273 }
274 }
275
276 async fn insert(&self, pool: &Pool<Postgres>, data: serde_json::Value) -> Result<u64> {
277 let p: TokenUsageInsertParams = serde_json::from_value(data).context(JsonSnafu)?;
278 let row: (i64,) = sqlx::query_as(
279 r#"INSERT INTO token_usages
280 (user_id, service, amount, model, input_tokens, output_tokens, api_path, duration_ms, biz_id, remark)
281 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
282 RETURNING id"#,
283 )
284 .bind(p.user_id)
285 .bind(&p.service)
286 .bind(p.amount)
287 .bind(p.model.unwrap_or_default())
288 .bind(p.input_tokens.unwrap_or(0))
289 .bind(p.output_tokens.unwrap_or(0))
290 .bind(p.api_path.unwrap_or_default())
291 .bind(p.duration_ms.unwrap_or(0))
292 .bind(p.biz_id.unwrap_or_default())
293 .bind(p.remark.unwrap_or_default())
294 .fetch_one(pool)
295 .await
296 .context(SqlxSnafu)?;
297 Ok(row.0 as u64)
298 }
299
300 async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
301 let result = sqlx::query_as::<_, TokenUsageSchema>(
302 r#"SELECT * FROM token_usages WHERE id = $1 AND deleted_at IS NULL"#,
303 )
304 .bind(id as i64)
305 .fetch_optional(pool)
306 .await
307 .context(SqlxSnafu)?;
308 Ok(result.map(Into::into))
309 }
310
311 async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
312 sqlx::query(
313 r#"UPDATE token_usages SET deleted_at = NOW(), modified = NOW() WHERE id = $1 AND deleted_at IS NULL"#,
314 )
315 .bind(id as i64)
316 .execute(pool)
317 .await
318 .context(SqlxSnafu)?;
319 Ok(())
320 }
321
322 async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
323 let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT COUNT(*) FROM token_usages");
324 self.push_conditions(&mut qb, params)?;
325 let row: (i64,) = qb
326 .build_query_as()
327 .fetch_one(pool)
328 .await
329 .context(SqlxSnafu)?;
330 Ok(row.0)
331 }
332
333 async fn list(
334 &self,
335 pool: &Pool<Postgres>,
336 params: &ModelListParams,
337 ) -> Result<Vec<Self::Output>> {
338 let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT * FROM token_usages");
339 self.push_conditions(&mut qb, params)?;
340 params.push_pagination(&mut qb);
341 let rows = qb
342 .build_query_as::<TokenUsageSchema>()
343 .fetch_all(pool)
344 .await
345 .context(SqlxSnafu)?;
346 Ok(rows.into_iter().map(Into::into).collect())
347 }
348
349 fn push_filter_conditions<'args>(
350 &self,
351 qb: &mut QueryBuilder<'args, Postgres>,
352 filters: &HashMap<String, String>,
353 ) -> Result<()> {
354 if let Some(user_id) = filters.get("user_id") {
355 if let Ok(v) = user_id.parse::<i64>() {
356 qb.push(" AND user_id = ").push_bind(v);
357 }
358 }
359 if let Some(service) = filters.get("service") {
360 qb.push(" AND service = ").push_bind(service.clone());
361 }
362 if let Some(model) = filters.get("model") {
363 qb.push(" AND model = ").push_bind(model.clone());
364 }
365 Ok(())
366 }
367}