tibba_model_token/
service.rs1use super::{Error, SqlxSnafu, TokenRechargeInsertParams, TokenUsageInsertParams};
16use snafu::ResultExt;
17use sqlx::{Pool, Postgres};
18
19type Result<T> = std::result::Result<T, Error>;
20
21pub struct RechargeResult {
22 pub recharge_id: i64,
23 pub new_balance: i64,
24}
25
26pub struct ConsumeResult {
27 pub usage_id: i64,
28 pub new_balance: i64,
29}
30
31pub struct TokenService;
32
33impl TokenService {
34 pub async fn recharge(
37 pool: &Pool<Postgres>,
38 params: TokenRechargeInsertParams,
39 ) -> Result<RechargeResult> {
40 let mut tx = pool.begin().await.context(SqlxSnafu)?;
41
42 sqlx::query(
44 r#"INSERT INTO token_accounts (user_id)
45 VALUES ($1)
46 ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
47 )
48 .bind(params.user_id)
49 .execute(&mut *tx)
50 .await
51 .context(SqlxSnafu)?;
52
53 let (recharge_id,): (i64,) = sqlx::query_as(
55 r#"INSERT INTO token_recharges
56 (user_id, amount, source, order_id, remark, created_by)
57 VALUES ($1, $2, $3, $4, $5, $6)
58 RETURNING id"#,
59 )
60 .bind(params.user_id)
61 .bind(params.amount)
62 .bind(params.source)
63 .bind(params.order_id.unwrap_or_default())
64 .bind(params.remark.unwrap_or_default())
65 .bind(params.created_by.unwrap_or(0))
66 .fetch_one(&mut *tx)
67 .await
68 .context(SqlxSnafu)?;
69
70 let (new_balance,): (i64,) = sqlx::query_as(
72 r#"UPDATE token_accounts
73 SET balance = balance + $1,
74 total_recharged = total_recharged + $1
75 WHERE user_id = $2 AND deleted_at IS NULL
76 RETURNING balance"#,
77 )
78 .bind(params.amount)
79 .bind(params.user_id)
80 .fetch_one(&mut *tx)
81 .await
82 .context(SqlxSnafu)?;
83
84 tx.commit().await.context(SqlxSnafu)?;
85
86 Ok(RechargeResult {
87 recharge_id,
88 new_balance,
89 })
90 }
91
92 pub async fn consume(
95 pool: &Pool<Postgres>,
96 params: TokenUsageInsertParams,
97 ) -> Result<ConsumeResult> {
98 let mut tx = pool.begin().await.context(SqlxSnafu)?;
99
100 let result: Option<(i64,)> = sqlx::query_as(
102 r#"UPDATE token_accounts
103 SET balance = balance - $1,
104 total_consumed = total_consumed + $1
105 WHERE user_id = $2
106 AND balance >= $1
107 AND status = 1
108 AND deleted_at IS NULL
109 RETURNING balance"#,
110 )
111 .bind(params.amount)
112 .bind(params.user_id)
113 .fetch_optional(&mut *tx)
114 .await
115 .context(SqlxSnafu)?;
116
117 let new_balance = match result {
118 Some(row) => row.0,
119 None => return Err(Error::InsufficientBalance),
120 };
121
122 let (usage_id,): (i64,) = sqlx::query_as(
124 r#"INSERT INTO token_usages
125 (user_id, service, amount, model, input_tokens, output_tokens,
126 api_path, duration_ms, biz_id, remark)
127 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
128 RETURNING id"#,
129 )
130 .bind(params.user_id)
131 .bind(¶ms.service)
132 .bind(params.amount)
133 .bind(params.model.unwrap_or_default())
134 .bind(params.input_tokens.unwrap_or(0))
135 .bind(params.output_tokens.unwrap_or(0))
136 .bind(params.api_path.unwrap_or_default())
137 .bind(params.duration_ms.unwrap_or(0))
138 .bind(params.biz_id.unwrap_or_default())
139 .bind(params.remark.unwrap_or_default())
140 .fetch_one(&mut *tx)
141 .await
142 .context(SqlxSnafu)?;
143
144 tx.commit().await.context(SqlxSnafu)?;
145
146 Ok(ConsumeResult {
147 usage_id,
148 new_balance,
149 })
150 }
151}