Skip to main content

tibba_model_token/
service.rs

1// Copyright 2026 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, SqlxSnafu, TokenRechargeInsertParams, TokenUsageInsertParams};
16use snafu::ResultExt;
17use sqlx::{Pool, Postgres};
18
19type Result<T> = std::result::Result<T, Error>;
20
21pub struct RechargeResult {
22    pub recharge_id: i64,
23    pub new_balance: i64,
24}
25
26pub struct ConsumeResult {
27    pub usage_id: i64,
28    pub new_balance: i64,
29}
30
31pub struct TokenService;
32
33impl TokenService {
34    /// 充值:在同一事务中插入充值记录并更新账户余额。
35    /// 若账户不存在则自动创建。
36    pub async fn recharge(
37        pool: &Pool<Postgres>,
38        params: TokenRechargeInsertParams,
39    ) -> Result<RechargeResult> {
40        let mut tx = pool.begin().await.context(SqlxSnafu)?;
41
42        // 确保账户存在
43        sqlx::query(
44            r#"INSERT INTO token_accounts (user_id)
45               VALUES ($1)
46               ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
47        )
48        .bind(params.user_id)
49        .execute(&mut *tx)
50        .await
51        .context(SqlxSnafu)?;
52
53        // 插入充值记录
54        let (recharge_id,): (i64,) = sqlx::query_as(
55            r#"INSERT INTO token_recharges
56               (user_id, amount, source, order_id, remark, created_by)
57               VALUES ($1, $2, $3, $4, $5, $6)
58               RETURNING id"#,
59        )
60        .bind(params.user_id)
61        .bind(params.amount)
62        .bind(params.source)
63        .bind(params.order_id.unwrap_or_default())
64        .bind(params.remark.unwrap_or_default())
65        .bind(params.created_by.unwrap_or(0))
66        .fetch_one(&mut *tx)
67        .await
68        .context(SqlxSnafu)?;
69
70        // 更新账户余额与充值汇总
71        let (new_balance,): (i64,) = sqlx::query_as(
72            r#"UPDATE token_accounts
73               SET balance         = balance + $1,
74                   total_recharged = total_recharged + $1
75             WHERE user_id = $2 AND deleted_at IS NULL
76             RETURNING balance"#,
77        )
78        .bind(params.amount)
79        .bind(params.user_id)
80        .fetch_one(&mut *tx)
81        .await
82        .context(SqlxSnafu)?;
83
84        tx.commit().await.context(SqlxSnafu)?;
85
86        Ok(RechargeResult {
87            recharge_id,
88            new_balance,
89        })
90    }
91
92    /// 消费:在同一事务中扣减余额并写入消费记录。
93    /// 余额不足时返回 `Error::InsufficientBalance`,不写消费记录,事务回滚。
94    pub async fn consume(
95        pool: &Pool<Postgres>,
96        params: TokenUsageInsertParams,
97    ) -> Result<ConsumeResult> {
98        let mut tx = pool.begin().await.context(SqlxSnafu)?;
99
100        // 原子扣减余额(余额不足则返回 None)
101        let result: Option<(i64,)> = sqlx::query_as(
102            r#"UPDATE token_accounts
103               SET balance        = balance - $1,
104                   total_consumed = total_consumed + $1
105             WHERE user_id = $2
106               AND balance >= $1
107               AND status = 1
108               AND deleted_at IS NULL
109             RETURNING balance"#,
110        )
111        .bind(params.amount)
112        .bind(params.user_id)
113        .fetch_optional(&mut *tx)
114        .await
115        .context(SqlxSnafu)?;
116
117        let new_balance = match result {
118            Some(row) => row.0,
119            None => return Err(Error::InsufficientBalance),
120        };
121
122        // 写入消费记录
123        let (usage_id,): (i64,) = sqlx::query_as(
124            r#"INSERT INTO token_usages
125               (user_id, service, amount, model, input_tokens, output_tokens,
126                api_path, duration_ms, biz_id, remark)
127               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
128               RETURNING id"#,
129        )
130        .bind(params.user_id)
131        .bind(&params.service)
132        .bind(params.amount)
133        .bind(params.model.unwrap_or_default())
134        .bind(params.input_tokens.unwrap_or(0))
135        .bind(params.output_tokens.unwrap_or(0))
136        .bind(params.api_path.unwrap_or_default())
137        .bind(params.duration_ms.unwrap_or(0))
138        .bind(params.biz_id.unwrap_or_default())
139        .bind(params.remark.unwrap_or_default())
140        .fetch_one(&mut *tx)
141        .await
142        .context(SqlxSnafu)?;
143
144        tx.commit().await.context(SqlxSnafu)?;
145
146        Ok(ConsumeResult {
147            usage_id,
148            new_balance,
149        })
150    }
151}