Skip to main content

tibba_model_token/
usage.rs

1// Copyright 2026 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    Error, JsonSnafu, ModelListParams, Schema, SchemaAllowCreate, SchemaAllowEdit, SchemaType,
17    SchemaView, SqlxSnafu, format_datetime,
18};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use sqlx::FromRow;
22use sqlx::{Pool, Postgres, QueryBuilder};
23use std::collections::HashMap;
24use tibba_model::Model;
25use time::PrimitiveDateTime;
26
27type Result<T> = std::result::Result<T, Error>;
28
29#[derive(FromRow)]
30struct TokenUsageSchema {
31    id: i64,
32    user_id: i64,
33    service: String,
34    amount: i64,
35    model: String,
36    input_tokens: i32,
37    output_tokens: i32,
38    api_path: String,
39    duration_ms: i32,
40    biz_id: String,
41    remark: String,
42    created: PrimitiveDateTime,
43    modified: PrimitiveDateTime,
44}
45
46#[derive(Debug, Clone, Deserialize, Serialize)]
47pub struct TokenUsage {
48    pub id: i64,
49    pub user_id: i64,
50    pub service: String,
51    pub amount: i64,
52    pub model: String,
53    pub input_tokens: i32,
54    pub output_tokens: i32,
55    pub api_path: String,
56    pub duration_ms: i32,
57    pub biz_id: String,
58    pub remark: String,
59    pub created: String,
60    pub modified: String,
61}
62
63impl From<TokenUsageSchema> for TokenUsage {
64    fn from(s: TokenUsageSchema) -> Self {
65        Self {
66            id: s.id,
67            user_id: s.user_id,
68            service: s.service,
69            amount: s.amount,
70            model: s.model,
71            input_tokens: s.input_tokens,
72            output_tokens: s.output_tokens,
73            api_path: s.api_path,
74            duration_ms: s.duration_ms,
75            biz_id: s.biz_id,
76            remark: s.remark,
77            created: format_datetime(s.created),
78            modified: format_datetime(s.modified),
79        }
80    }
81}
82
83/// 记录一次消耗的参数,由调用方(如 tibba-llm)填写后传入。
84#[derive(Debug, Clone, Deserialize, Default)]
85pub struct TokenUsageInsertParams {
86    pub user_id: i64,
87    pub service: String,
88    pub amount: i64,
89    /// LLM 场景填模型名,其他场景留空
90    pub model: Option<String>,
91    /// LLM 输入 token 数
92    pub input_tokens: Option<i32>,
93    /// LLM 输出 token 数
94    pub output_tokens: Option<i32>,
95    /// 通用 API 场景的路径
96    pub api_path: Option<String>,
97    /// 调用耗时(毫秒)
98    pub duration_ms: Option<i32>,
99    /// 关联业务 ID(请求 ID、任务 ID 等)
100    pub biz_id: Option<String>,
101    pub remark: Option<String>,
102}
103
104/// 按用户/服务维度的消耗汇总。
105#[derive(Debug, Clone, Serialize)]
106pub struct TokenUsageSummary {
107    pub user_id: i64,
108    pub service: String,
109    pub model: String,
110    pub total_amount: i64,
111    pub total_input_tokens: i64,
112    pub total_output_tokens: i64,
113    pub call_count: i64,
114}
115
116#[derive(Default)]
117pub struct TokenUsageModel {}
118
119impl TokenUsageModel {
120    /// 按用户 ID 查询消耗记录(分页,倒序)。
121    pub async fn list_by_user(
122        &self,
123        pool: &Pool<Postgres>,
124        user_id: i64,
125        page: u64,
126        limit: u64,
127    ) -> Result<Vec<TokenUsage>> {
128        let limit = limit.min(200);
129        let offset = (page.max(1) - 1) * limit;
130        let rows = sqlx::query_as::<_, TokenUsageSchema>(
131            r#"SELECT * FROM token_usages WHERE user_id = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT $2 OFFSET $3"#,
132        )
133        .bind(user_id)
134        .bind(limit as i64)
135        .bind(offset as i64)
136        .fetch_all(pool)
137        .await
138        .context(SqlxSnafu)?;
139        Ok(rows.into_iter().map(Into::into).collect())
140    }
141
142    /// 按服务+模型维度聚合消耗汇总(用于统计分析)。
143    pub async fn summary_by_service(
144        &self,
145        pool: &Pool<Postgres>,
146        user_id: Option<i64>,
147    ) -> Result<Vec<TokenUsageSummary>> {
148        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
149            r#"SELECT user_id, service, model,
150                      SUM(amount) AS total_amount,
151                      SUM(input_tokens) AS total_input_tokens,
152                      SUM(output_tokens) AS total_output_tokens,
153                      COUNT(*) AS call_count
154               FROM token_usages
155               WHERE deleted_at IS NULL"#,
156        );
157        if let Some(uid) = user_id {
158            qb.push(" AND user_id = ").push_bind(uid);
159        }
160        qb.push(" GROUP BY user_id, service, model ORDER BY total_amount DESC");
161
162        #[derive(FromRow)]
163        struct SummaryRow {
164            user_id: i64,
165            service: String,
166            model: String,
167            total_amount: i64,
168            total_input_tokens: i64,
169            total_output_tokens: i64,
170            call_count: i64,
171        }
172
173        let rows = qb
174            .build_query_as::<SummaryRow>()
175            .fetch_all(pool)
176            .await
177            .context(SqlxSnafu)?;
178
179        Ok(rows
180            .into_iter()
181            .map(|r| TokenUsageSummary {
182                user_id: r.user_id,
183                service: r.service,
184                model: r.model,
185                total_amount: r.total_amount,
186                total_input_tokens: r.total_input_tokens,
187                total_output_tokens: r.total_output_tokens,
188                call_count: r.call_count,
189            })
190            .collect())
191    }
192}
193
194impl Model for TokenUsageModel {
195    type Output = TokenUsage;
196    fn new() -> Self {
197        Self::default()
198    }
199
200    async fn schema_view(&self, _pool: &Pool<Postgres>) -> SchemaView {
201        SchemaView {
202            schemas: vec![
203                Schema::new_id(),
204                Schema {
205                    name: "user_id".to_string(),
206                    category: SchemaType::Number,
207                    read_only: true,
208                    filterable: true,
209                    ..Default::default()
210                },
211                Schema {
212                    name: "service".to_string(),
213                    category: SchemaType::String,
214                    read_only: true,
215                    filterable: true,
216                    ..Default::default()
217                },
218                Schema {
219                    name: "amount".to_string(),
220                    category: SchemaType::Number,
221                    read_only: true,
222                    ..Default::default()
223                },
224                Schema {
225                    name: "model".to_string(),
226                    category: SchemaType::String,
227                    read_only: true,
228                    filterable: true,
229                    ..Default::default()
230                },
231                Schema {
232                    name: "input_tokens".to_string(),
233                    category: SchemaType::Number,
234                    read_only: true,
235                    ..Default::default()
236                },
237                Schema {
238                    name: "output_tokens".to_string(),
239                    category: SchemaType::Number,
240                    read_only: true,
241                    ..Default::default()
242                },
243                Schema {
244                    name: "api_path".to_string(),
245                    category: SchemaType::String,
246                    read_only: true,
247                    ..Default::default()
248                },
249                Schema {
250                    name: "duration_ms".to_string(),
251                    category: SchemaType::Number,
252                    read_only: true,
253                    ..Default::default()
254                },
255                Schema {
256                    name: "biz_id".to_string(),
257                    category: SchemaType::String,
258                    read_only: true,
259                    ..Default::default()
260                },
261                Schema::new_readonly_remark(),
262                Schema::new_created(),
263                Schema::new_filterable_modified(),
264            ],
265            allow_edit: SchemaAllowEdit {
266                disabled: true,
267                ..Default::default()
268            },
269            allow_create: SchemaAllowCreate {
270                disabled: true,
271                ..Default::default()
272            },
273        }
274    }
275
276    async fn insert(&self, pool: &Pool<Postgres>, data: serde_json::Value) -> Result<u64> {
277        let p: TokenUsageInsertParams = serde_json::from_value(data).context(JsonSnafu)?;
278        let row: (i64,) = sqlx::query_as(
279            r#"INSERT INTO token_usages
280               (user_id, service, amount, model, input_tokens, output_tokens, api_path, duration_ms, biz_id, remark)
281               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
282               RETURNING id"#,
283        )
284        .bind(p.user_id)
285        .bind(&p.service)
286        .bind(p.amount)
287        .bind(p.model.unwrap_or_default())
288        .bind(p.input_tokens.unwrap_or(0))
289        .bind(p.output_tokens.unwrap_or(0))
290        .bind(p.api_path.unwrap_or_default())
291        .bind(p.duration_ms.unwrap_or(0))
292        .bind(p.biz_id.unwrap_or_default())
293        .bind(p.remark.unwrap_or_default())
294        .fetch_one(pool)
295        .await
296        .context(SqlxSnafu)?;
297        Ok(row.0 as u64)
298    }
299
300    async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
301        let result = sqlx::query_as::<_, TokenUsageSchema>(
302            r#"SELECT * FROM token_usages WHERE id = $1 AND deleted_at IS NULL"#,
303        )
304        .bind(id as i64)
305        .fetch_optional(pool)
306        .await
307        .context(SqlxSnafu)?;
308        Ok(result.map(Into::into))
309    }
310
311    async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
312        sqlx::query(
313            r#"UPDATE token_usages SET deleted_at = NOW(), modified = NOW() WHERE id = $1 AND deleted_at IS NULL"#,
314        )
315        .bind(id as i64)
316        .execute(pool)
317        .await
318        .context(SqlxSnafu)?;
319        Ok(())
320    }
321
322    async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
323        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT COUNT(*) FROM token_usages");
324        self.push_conditions(&mut qb, params)?;
325        let row: (i64,) = qb
326            .build_query_as()
327            .fetch_one(pool)
328            .await
329            .context(SqlxSnafu)?;
330        Ok(row.0)
331    }
332
333    async fn list(
334        &self,
335        pool: &Pool<Postgres>,
336        params: &ModelListParams,
337    ) -> Result<Vec<Self::Output>> {
338        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT * FROM token_usages");
339        self.push_conditions(&mut qb, params)?;
340        params.push_pagination(&mut qb);
341        let rows = qb
342            .build_query_as::<TokenUsageSchema>()
343            .fetch_all(pool)
344            .await
345            .context(SqlxSnafu)?;
346        Ok(rows.into_iter().map(Into::into).collect())
347    }
348
349    fn push_filter_conditions<'args>(
350        &self,
351        qb: &mut QueryBuilder<'args, Postgres>,
352        filters: &HashMap<String, String>,
353    ) -> Result<()> {
354        if let Some(user_id) = filters.get("user_id") {
355            if let Ok(v) = user_id.parse::<i64>() {
356                qb.push(" AND user_id = ").push_bind(v);
357            }
358        }
359        if let Some(service) = filters.get("service") {
360            qb.push(" AND service = ").push_bind(service.clone());
361        }
362        if let Some(model) = filters.get("model") {
363            qb.push(" AND model = ").push_bind(model.clone());
364        }
365        Ok(())
366    }
367}