tibba_model_token/
service.rs1use super::{
16 Error, RECHARGE_SOURCE_ADMIN, SERVICE_ADMIN_ADJUST, SqlxSnafu, TokenRechargeInsertParams,
17 TokenUsageInsertParams,
18};
19use snafu::ResultExt;
20use sqlx::{Pool, Postgres};
21
22type Result<T> = std::result::Result<T, Error>;
23
24pub struct RechargeResult {
25 pub recharge_id: i64,
26 pub new_balance: i64,
27}
28
29pub struct ConsumeResult {
30 pub usage_id: i64,
31 pub new_balance: i64,
32}
33
34pub struct AdjustResult {
35 pub new_balance: i64,
36}
37
38pub struct TokenService;
39
40impl TokenService {
41 pub async fn recharge(
44 pool: &Pool<Postgres>,
45 params: TokenRechargeInsertParams,
46 ) -> Result<RechargeResult> {
47 let mut tx = pool.begin().await.context(SqlxSnafu)?;
48
49 sqlx::query(
51 r#"INSERT INTO token_accounts (user_id)
52 VALUES ($1)
53 ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
54 )
55 .bind(params.user_id)
56 .execute(&mut *tx)
57 .await
58 .context(SqlxSnafu)?;
59
60 let (recharge_id,): (i64,) = sqlx::query_as(
62 r#"INSERT INTO token_recharges
63 (user_id, amount, source, order_id, remark, created_by)
64 VALUES ($1, $2, $3, $4, $5, $6)
65 RETURNING id"#,
66 )
67 .bind(params.user_id)
68 .bind(params.amount)
69 .bind(params.source)
70 .bind(params.order_id.unwrap_or_default())
71 .bind(params.remark.unwrap_or_default())
72 .bind(params.created_by.unwrap_or(0))
73 .fetch_one(&mut *tx)
74 .await
75 .context(SqlxSnafu)?;
76
77 let (new_balance,): (i64,) = sqlx::query_as(
79 r#"UPDATE token_accounts
80 SET balance = balance + $1,
81 total_recharged = total_recharged + $1
82 WHERE user_id = $2 AND deleted_at IS NULL
83 RETURNING balance"#,
84 )
85 .bind(params.amount)
86 .bind(params.user_id)
87 .fetch_one(&mut *tx)
88 .await
89 .context(SqlxSnafu)?;
90
91 tx.commit().await.context(SqlxSnafu)?;
92
93 Ok(RechargeResult {
94 recharge_id,
95 new_balance,
96 })
97 }
98
99 pub async fn consume(
102 pool: &Pool<Postgres>,
103 params: TokenUsageInsertParams,
104 ) -> Result<ConsumeResult> {
105 let mut tx = pool.begin().await.context(SqlxSnafu)?;
106
107 let result: Option<(i64,)> = sqlx::query_as(
109 r#"UPDATE token_accounts
110 SET balance = balance - $1,
111 total_consumed = total_consumed + $1
112 WHERE user_id = $2
113 AND balance >= $1
114 AND status = 1
115 AND deleted_at IS NULL
116 RETURNING balance"#,
117 )
118 .bind(params.amount)
119 .bind(params.user_id)
120 .fetch_optional(&mut *tx)
121 .await
122 .context(SqlxSnafu)?;
123
124 let new_balance = match result {
125 Some(row) => row.0,
126 None => return Err(Error::InsufficientBalance),
127 };
128
129 let (usage_id,): (i64,) = sqlx::query_as(
131 r#"INSERT INTO token_usages
132 (user_id, service, amount, model, input_tokens, output_tokens,
133 api_path, duration_ms, biz_id, remark)
134 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
135 RETURNING id"#,
136 )
137 .bind(params.user_id)
138 .bind(¶ms.service)
139 .bind(params.amount)
140 .bind(params.model.unwrap_or_default())
141 .bind(params.input_tokens.unwrap_or(0))
142 .bind(params.output_tokens.unwrap_or(0))
143 .bind(params.api_path.unwrap_or_default())
144 .bind(params.duration_ms.unwrap_or(0))
145 .bind(params.biz_id.unwrap_or_default())
146 .bind(params.remark.unwrap_or_default())
147 .fetch_one(&mut *tx)
148 .await
149 .context(SqlxSnafu)?;
150
151 tx.commit().await.context(SqlxSnafu)?;
152
153 Ok(ConsumeResult {
154 usage_id,
155 new_balance,
156 })
157 }
158
159 pub async fn adjust(
166 pool: &Pool<Postgres>,
167 user_id: i64,
168 amount: i64,
169 admin_user_id: i64,
170 remark: Option<String>,
171 ) -> Result<AdjustResult> {
172 if amount == 0 {
173 return Err(Error::InvalidAmount {
174 message: "amount must be non-zero".to_string(),
175 });
176 }
177
178 let remark = remark
179 .filter(|s| !s.trim().is_empty())
180 .unwrap_or_else(|| "admin adjust".to_string());
181
182 let new_balance = if amount > 0 {
183 Self::recharge(
184 pool,
185 TokenRechargeInsertParams {
186 user_id,
187 amount,
188 source: RECHARGE_SOURCE_ADMIN,
189 remark: Some(remark),
190 created_by: Some(admin_user_id),
191 ..Default::default()
192 },
193 )
194 .await?
195 .new_balance
196 } else {
197 Self::consume(
198 pool,
199 TokenUsageInsertParams {
200 user_id,
201 service: SERVICE_ADMIN_ADJUST.to_string(),
202 amount: -amount,
203 biz_id: Some(format!("admin:{admin_user_id}")),
204 remark: Some(remark),
205 ..Default::default()
206 },
207 )
208 .await?
209 .new_balance
210 };
211
212 Ok(AdjustResult { new_balance })
213 }
214}