Skip to main content

tibba_model_token/
account.rs

1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    Error, JsonSnafu, ModelListParams, Schema, SchemaAllowCreate, SchemaAllowEdit, SchemaType,
17    SchemaView, SqlxSnafu, format_datetime,
18};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use sqlx::FromRow;
22use sqlx::{Pool, Postgres, QueryBuilder};
23use std::collections::HashMap;
24use tibba_error::Error as AppError;
25use tibba_model::Model;
26use time::PrimitiveDateTime;
27
28type Result<T> = std::result::Result<T, Error>;
29
30#[derive(FromRow)]
31struct TokenAccountSchema {
32    id: i64,
33    user_id: i64,
34    balance: i64,
35    total_recharged: i64,
36    total_consumed: i64,
37    status: i16,
38    remark: String,
39    created: PrimitiveDateTime,
40    modified: PrimitiveDateTime,
41}
42
43#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct TokenAccount {
45    pub id: i64,
46    pub user_id: i64,
47    pub balance: i64,
48    pub total_recharged: i64,
49    pub total_consumed: i64,
50    pub status: i16,
51    pub remark: String,
52    pub created: String,
53    pub modified: String,
54}
55
56impl From<TokenAccountSchema> for TokenAccount {
57    fn from(s: TokenAccountSchema) -> Self {
58        Self {
59            id: s.id,
60            user_id: s.user_id,
61            balance: s.balance,
62            total_recharged: s.total_recharged,
63            total_consumed: s.total_consumed,
64            status: s.status,
65            remark: s.remark,
66            created: format_datetime(s.created),
67            modified: format_datetime(s.modified),
68        }
69    }
70}
71
72#[derive(Debug, Clone, Deserialize, Default)]
73pub struct TokenAccountInsertParams {
74    pub user_id: i64,
75    pub remark: Option<String>,
76}
77
78#[derive(Debug, Clone, Deserialize, Default)]
79pub struct TokenAccountUpdateParams {
80    pub status: Option<i16>,
81    pub remark: Option<String>,
82}
83
84#[derive(Default)]
85pub struct TokenAccountModel {}
86
87impl TokenAccountModel {
88    /// 按用户 ID 查询账户。
89    pub async fn get_by_user_id(
90        &self,
91        pool: &Pool<Postgres>,
92        user_id: i64,
93    ) -> Result<Option<TokenAccount>> {
94        let result = sqlx::query_as::<_, TokenAccountSchema>(
95            r#"SELECT * FROM token_accounts WHERE user_id = $1 AND deleted_at IS NULL"#,
96        )
97        .bind(user_id)
98        .fetch_optional(pool)
99        .await
100        .context(SqlxSnafu)?;
101        Ok(result.map(Into::into))
102    }
103
104    /// 若账户不存在则自动创建,返回当前账户信息。
105    /// 通常在用户注册后调用。
106    pub async fn get_or_create(&self, pool: &Pool<Postgres>, user_id: i64) -> Result<TokenAccount> {
107        sqlx::query(
108            r#"INSERT INTO token_accounts (user_id) VALUES ($1) ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
109        )
110        .bind(user_id)
111        .execute(pool)
112        .await
113        .context(SqlxSnafu)?;
114
115        self.get_by_user_id(pool, user_id)
116            .await?
117            .ok_or(Error::NotFound)
118    }
119
120    /// 原子性增加余额(充值)。
121    /// 同步更新 total_recharged 汇总字段。
122    /// 返回更新后的余额。
123    pub async fn add_balance(
124        &self,
125        pool: &Pool<Postgres>,
126        user_id: i64,
127        amount: i64,
128    ) -> Result<i64> {
129        let row: (i64,) = sqlx::query_as(
130            r#"
131            UPDATE token_accounts
132               SET balance          = balance + $1,
133                   total_recharged  = total_recharged + $1
134             WHERE user_id = $2 AND deleted_at IS NULL
135             RETURNING balance"#,
136        )
137        .bind(amount)
138        .bind(user_id)
139        .fetch_one(pool)
140        .await
141        .context(SqlxSnafu)?;
142        Ok(row.0)
143    }
144
145    /// 原子性扣减余额(消费)。
146    /// 余额不足时返回 HTTP 402,不会产生负余额。
147    /// 返回扣减后的余额。
148    pub async fn deduct_balance(
149        &self,
150        pool: &Pool<Postgres>,
151        user_id: i64,
152        amount: i64,
153    ) -> std::result::Result<i64, AppError> {
154        let result = sqlx::query_as::<_, (i64,)>(
155            r#"
156            UPDATE token_accounts
157               SET balance         = balance - $1,
158                   total_consumed  = total_consumed + $1
159             WHERE user_id = $2
160               AND balance >= $1
161               AND status = 1
162               AND deleted_at IS NULL
163             RETURNING balance"#,
164        )
165        .bind(amount)
166        .bind(user_id)
167        .fetch_optional(pool)
168        .await
169        .map_err(|e| {
170            AppError::new(e)
171                .with_category("token")
172                .with_sub_category("sqlx")
173                .with_exception(true)
174        })?;
175
176        match result {
177            Some(row) => Ok(row.0),
178            None => Err(AppError::new("insufficient balance")
179                .with_category("token")
180                .with_sub_category("insufficient_balance")
181                .with_status(402)),
182        }
183    }
184}
185
186impl Model for TokenAccountModel {
187    type Output = TokenAccount;
188    fn new() -> Self {
189        Self::default()
190    }
191
192    async fn schema_view(&self, _pool: &Pool<Postgres>) -> SchemaView {
193        SchemaView {
194            schemas: vec![
195                Schema::new_id(),
196                Schema::new_user_search("user_id"),
197                Schema {
198                    name: "balance".to_string(),
199                    category: SchemaType::Number,
200                    read_only: true,
201                    ..Default::default()
202                },
203                Schema {
204                    name: "total_recharged".to_string(),
205                    category: SchemaType::Number,
206                    read_only: true,
207                    ..Default::default()
208                },
209                Schema {
210                    name: "total_consumed".to_string(),
211                    category: SchemaType::Number,
212                    read_only: true,
213                    ..Default::default()
214                },
215                Schema::new_status(),
216                Schema::new_remark(),
217                Schema::new_created(),
218                Schema::new_modified(),
219            ],
220            allow_edit: SchemaAllowEdit {
221                roles: vec!["su".to_string(), "admin".to_string()],
222                ..Default::default()
223            },
224            allow_create: SchemaAllowCreate {
225                roles: vec!["su".to_string(), "admin".to_string()],
226                ..Default::default()
227            },
228        }
229    }
230
231    async fn insert(&self, pool: &Pool<Postgres>, data: serde_json::Value) -> Result<u64> {
232        let params: TokenAccountInsertParams = serde_json::from_value(data).context(JsonSnafu)?;
233        let row: (i64,) = sqlx::query_as(
234            r#"INSERT INTO token_accounts (user_id, remark) VALUES ($1, $2) RETURNING id"#,
235        )
236        .bind(params.user_id)
237        .bind(params.remark.unwrap_or_default())
238        .fetch_one(pool)
239        .await
240        .context(SqlxSnafu)?;
241        Ok(row.0 as u64)
242    }
243
244    async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
245        let result = sqlx::query_as::<_, TokenAccountSchema>(
246            r#"SELECT * FROM token_accounts WHERE id = $1 AND deleted_at IS NULL"#,
247        )
248        .bind(id as i64)
249        .fetch_optional(pool)
250        .await
251        .context(SqlxSnafu)?;
252        Ok(result.map(Into::into))
253    }
254
255    async fn update_by_id(
256        &self,
257        pool: &Pool<Postgres>,
258        id: u64,
259        data: serde_json::Value,
260    ) -> Result<()> {
261        let params: TokenAccountUpdateParams = serde_json::from_value(data).context(JsonSnafu)?;
262        let mut qb: QueryBuilder<Postgres> =
263            QueryBuilder::new("UPDATE token_accounts SET modified = NOW()");
264        if let Some(status) = params.status {
265            qb.push(", status = ").push_bind(status);
266        }
267        if let Some(remark) = params.remark {
268            qb.push(", remark = ").push_bind(remark);
269        }
270        qb.push(" WHERE id = ").push_bind(id as i64);
271        qb.push(" AND deleted_at IS NULL");
272        qb.build().execute(pool).await.context(SqlxSnafu)?;
273        Ok(())
274    }
275
276    async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
277        sqlx::query(
278            r#"UPDATE token_accounts SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL"#,
279        )
280        .bind(id as i64)
281        .execute(pool)
282        .await
283        .context(SqlxSnafu)?;
284        Ok(())
285    }
286
287    async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
288        let mut qb: QueryBuilder<Postgres> =
289            QueryBuilder::new("SELECT COUNT(*) FROM token_accounts");
290        self.push_conditions(&mut qb, params)?;
291        let row: (i64,) = qb
292            .build_query_as()
293            .fetch_one(pool)
294            .await
295            .context(SqlxSnafu)?;
296        Ok(row.0)
297    }
298
299    async fn list(
300        &self,
301        pool: &Pool<Postgres>,
302        params: &ModelListParams,
303    ) -> Result<Vec<Self::Output>> {
304        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT * FROM token_accounts");
305        self.push_conditions(&mut qb, params)?;
306        params.push_pagination(&mut qb);
307        let rows = qb
308            .build_query_as::<TokenAccountSchema>()
309            .fetch_all(pool)
310            .await
311            .context(SqlxSnafu)?;
312        Ok(rows.into_iter().map(Into::into).collect())
313    }
314
315    fn push_filter_conditions<'args>(
316        &self,
317        qb: &mut QueryBuilder<'args, Postgres>,
318        filters: &HashMap<String, String>,
319    ) -> Result<()> {
320        if let Some(status) = filters.get("status") {
321            if let Ok(s) = status.parse::<i16>() {
322                qb.push(" AND status = ").push_bind(s);
323            }
324        }
325        Ok(())
326    }
327}