Skip to main content

tibba_model_token/
service.rs

1// Copyright 2026 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    Error, RECHARGE_SOURCE_ADMIN, SERVICE_ADMIN_ADJUST, SqlxSnafu, TokenRechargeInsertParams,
17    TokenUsageInsertParams,
18};
19use snafu::ResultExt;
20use sqlx::{Pool, Postgres};
21
22type Result<T> = std::result::Result<T, Error>;
23
/// Outcome of a successful [`TokenService::recharge`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RechargeResult {
    /// Id of the `token_recharges` row inserted for this recharge.
    pub recharge_id: i64,
    /// Account balance after the recharge has been applied.
    pub new_balance: i64,
}
28
/// Outcome of a successful [`TokenService::consume`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConsumeResult {
    /// Id of the `token_usages` row inserted for this consumption.
    pub usage_id: i64,
    /// Account balance after the amount has been deducted.
    pub new_balance: i64,
}
33
/// Outcome of a successful [`TokenService::adjust`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AdjustResult {
    /// Account balance after the adjustment has been applied.
    pub new_balance: i64,
}
37
/// Stateless namespace for the token ledger operations (`recharge`, `consume`,
/// `adjust`); every operation takes the database pool as an argument.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokenService;
39
40impl TokenService {
41    /// 充值:在同一事务中插入充值记录并更新账户余额。
42    /// 若账户不存在则自动创建。
43    pub async fn recharge(
44        pool: &Pool<Postgres>,
45        params: TokenRechargeInsertParams,
46    ) -> Result<RechargeResult> {
47        let mut tx = pool.begin().await.context(SqlxSnafu)?;
48
49        // 确保账户存在
50        sqlx::query(
51            r#"INSERT INTO token_accounts (user_id)
52               VALUES ($1)
53               ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
54        )
55        .bind(params.user_id)
56        .execute(&mut *tx)
57        .await
58        .context(SqlxSnafu)?;
59
60        // 插入充值记录
61        let (recharge_id,): (i64,) = sqlx::query_as(
62            r#"INSERT INTO token_recharges
63               (user_id, amount, source, order_id, remark, created_by)
64               VALUES ($1, $2, $3, $4, $5, $6)
65               RETURNING id"#,
66        )
67        .bind(params.user_id)
68        .bind(params.amount)
69        .bind(params.source)
70        .bind(params.order_id.unwrap_or_default())
71        .bind(params.remark.unwrap_or_default())
72        .bind(params.created_by.unwrap_or(0))
73        .fetch_one(&mut *tx)
74        .await
75        .context(SqlxSnafu)?;
76
77        // 更新账户余额与充值汇总
78        let (new_balance,): (i64,) = sqlx::query_as(
79            r#"UPDATE token_accounts
80               SET balance         = balance + $1,
81                   total_recharged = total_recharged + $1
82             WHERE user_id = $2 AND deleted_at IS NULL
83             RETURNING balance"#,
84        )
85        .bind(params.amount)
86        .bind(params.user_id)
87        .fetch_one(&mut *tx)
88        .await
89        .context(SqlxSnafu)?;
90
91        tx.commit().await.context(SqlxSnafu)?;
92
93        Ok(RechargeResult {
94            recharge_id,
95            new_balance,
96        })
97    }
98
99    /// 消费:在同一事务中扣减余额并写入消费记录。
100    /// 余额不足时返回 `Error::InsufficientBalance`,不写消费记录,事务回滚。
101    pub async fn consume(
102        pool: &Pool<Postgres>,
103        params: TokenUsageInsertParams,
104    ) -> Result<ConsumeResult> {
105        let mut tx = pool.begin().await.context(SqlxSnafu)?;
106
107        // 原子扣减余额(余额不足则返回 None)
108        let result: Option<(i64,)> = sqlx::query_as(
109            r#"UPDATE token_accounts
110               SET balance        = balance - $1,
111                   total_consumed = total_consumed + $1
112             WHERE user_id = $2
113               AND balance >= $1
114               AND status = 1
115               AND deleted_at IS NULL
116             RETURNING balance"#,
117        )
118        .bind(params.amount)
119        .bind(params.user_id)
120        .fetch_optional(&mut *tx)
121        .await
122        .context(SqlxSnafu)?;
123
124        let new_balance = match result {
125            Some(row) => row.0,
126            None => return Err(Error::InsufficientBalance),
127        };
128
129        // 写入消费记录
130        let (usage_id,): (i64,) = sqlx::query_as(
131            r#"INSERT INTO token_usages
132               (user_id, service, amount, model, input_tokens, output_tokens,
133                api_path, duration_ms, biz_id, remark)
134               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
135               RETURNING id"#,
136        )
137        .bind(params.user_id)
138        .bind(&params.service)
139        .bind(params.amount)
140        .bind(params.model.unwrap_or_default())
141        .bind(params.input_tokens.unwrap_or(0))
142        .bind(params.output_tokens.unwrap_or(0))
143        .bind(params.api_path.unwrap_or_default())
144        .bind(params.duration_ms.unwrap_or(0))
145        .bind(params.biz_id.unwrap_or_default())
146        .bind(params.remark.unwrap_or_default())
147        .fetch_one(&mut *tx)
148        .await
149        .context(SqlxSnafu)?;
150
151        tx.commit().await.context(SqlxSnafu)?;
152
153        Ok(ConsumeResult {
154            usage_id,
155            new_balance,
156        })
157    }
158
159    /// 管理员调整某用户的 token 余额(保留审计流水,禁止绕过流水直接 UPDATE):
160    /// - `amount > 0` → 调用 [`Self::recharge`],source=`RECHARGE_SOURCE_ADMIN`,`created_by=admin_user_id`
161    /// - `amount < 0` → 调用 [`Self::consume`],service=`SERVICE_ADMIN_ADJUST`,`biz_id=admin:<id>` 保留操作者
162    /// - `amount == 0` → 返回 [`Error::InvalidAmount`]
163    ///
164    /// `remark` 为空或空白会被替换为 "admin adjust"。
165    pub async fn adjust(
166        pool: &Pool<Postgres>,
167        user_id: i64,
168        amount: i64,
169        admin_user_id: i64,
170        remark: Option<String>,
171    ) -> Result<AdjustResult> {
172        if amount == 0 {
173            return Err(Error::InvalidAmount {
174                message: "amount must be non-zero".to_string(),
175            });
176        }
177
178        let remark = remark
179            .filter(|s| !s.trim().is_empty())
180            .unwrap_or_else(|| "admin adjust".to_string());
181
182        let new_balance = if amount > 0 {
183            Self::recharge(
184                pool,
185                TokenRechargeInsertParams {
186                    user_id,
187                    amount,
188                    source: RECHARGE_SOURCE_ADMIN,
189                    remark: Some(remark),
190                    created_by: Some(admin_user_id),
191                    ..Default::default()
192                },
193            )
194            .await?
195            .new_balance
196        } else {
197            Self::consume(
198                pool,
199                TokenUsageInsertParams {
200                    user_id,
201                    service: SERVICE_ADMIN_ADJUST.to_string(),
202                    amount: -amount,
203                    biz_id: Some(format!("admin:{admin_user_id}")),
204                    remark: Some(remark),
205                    ..Default::default()
206                },
207            )
208            .await?
209            .new_balance
210        };
211
212        Ok(AdjustResult { new_balance })
213    }
214}