1use super::{
16 Error, JsonSnafu, ModelListParams, Schema, SchemaAllowCreate, SchemaAllowEdit, SchemaType,
17 SchemaView, SqlxSnafu, format_datetime,
18};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use sqlx::FromRow;
22use sqlx::{Pool, Postgres, QueryBuilder};
23use std::collections::HashMap;
24use tibba_model::Model;
25use time::PrimitiveDateTime;
26
27type Result<T> = std::result::Result<T, Error>;
28
29#[derive(FromRow)]
30struct TokenAccountSchema {
31 id: i64,
32 user_id: i64,
33 balance: i64,
34 total_recharged: i64,
35 total_consumed: i64,
36 status: i16,
37 remark: String,
38 created: PrimitiveDateTime,
39 modified: PrimitiveDateTime,
40}
41
42#[derive(Debug, Clone, Deserialize, Serialize)]
43pub struct TokenAccount {
44 pub id: i64,
45 pub user_id: i64,
46 pub balance: i64,
47 pub total_recharged: i64,
48 pub total_consumed: i64,
49 pub status: i16,
50 pub remark: String,
51 pub created: String,
52 pub modified: String,
53}
54
55impl From<TokenAccountSchema> for TokenAccount {
56 fn from(s: TokenAccountSchema) -> Self {
57 Self {
58 id: s.id,
59 user_id: s.user_id,
60 balance: s.balance,
61 total_recharged: s.total_recharged,
62 total_consumed: s.total_consumed,
63 status: s.status,
64 remark: s.remark,
65 created: format_datetime(s.created),
66 modified: format_datetime(s.modified),
67 }
68 }
69}
70
71#[derive(Debug, Clone, Deserialize, Default)]
72pub struct TokenAccountInsertParams {
73 pub user_id: i64,
74 pub remark: Option<String>,
75}
76
77#[derive(Debug, Clone, Deserialize, Default)]
78pub struct TokenAccountUpdateParams {
79 pub status: Option<i16>,
80 pub remark: Option<String>,
81}
82
83#[derive(Default)]
84pub struct TokenAccountModel {}
85
86impl TokenAccountModel {
87 pub async fn get_by_user_id(
89 &self,
90 pool: &Pool<Postgres>,
91 user_id: i64,
92 ) -> Result<Option<TokenAccount>> {
93 let result = sqlx::query_as::<_, TokenAccountSchema>(
94 r#"SELECT * FROM token_accounts WHERE user_id = $1 AND deleted_at IS NULL"#,
95 )
96 .bind(user_id)
97 .fetch_optional(pool)
98 .await
99 .context(SqlxSnafu)?;
100 Ok(result.map(Into::into))
101 }
102
103 pub async fn get_or_create(&self, pool: &Pool<Postgres>, user_id: i64) -> Result<TokenAccount> {
106 sqlx::query(
107 r#"INSERT INTO token_accounts (user_id) VALUES ($1) ON CONFLICT (user_id) WHERE deleted_at IS NULL DO NOTHING"#,
108 )
109 .bind(user_id)
110 .execute(pool)
111 .await
112 .context(SqlxSnafu)?;
113
114 self.get_by_user_id(pool, user_id)
115 .await?
116 .ok_or(Error::NotFound)
117 }
118
119 pub async fn add_balance(
123 &self,
124 pool: &Pool<Postgres>,
125 user_id: i64,
126 amount: i64,
127 ) -> Result<i64> {
128 let row: (i64,) = sqlx::query_as(
129 r#"
130 UPDATE token_accounts
131 SET balance = balance + $1,
132 total_recharged = total_recharged + $1
133 WHERE user_id = $2 AND deleted_at IS NULL
134 RETURNING balance"#,
135 )
136 .bind(amount)
137 .bind(user_id)
138 .fetch_one(pool)
139 .await
140 .context(SqlxSnafu)?;
141 Ok(row.0)
142 }
143
144 pub async fn deduct_balance(
148 &self,
149 pool: &Pool<Postgres>,
150 user_id: i64,
151 amount: i64,
152 ) -> Result<i64> {
153 let result = sqlx::query_as::<_, (i64,)>(
154 r#"
155 UPDATE token_accounts
156 SET balance = balance - $1,
157 total_consumed = total_consumed + $1
158 WHERE user_id = $2
159 AND balance >= $1
160 AND status = 1
161 AND deleted_at IS NULL
162 RETURNING balance"#,
163 )
164 .bind(amount)
165 .bind(user_id)
166 .fetch_optional(pool)
167 .await
168 .context(SqlxSnafu)?;
169
170 match result {
171 Some(row) => Ok(row.0),
172 None => Err(Error::InsufficientBalance),
173 }
174 }
175}
176
177impl Model for TokenAccountModel {
178 type Output = TokenAccount;
179 fn new() -> Self {
180 Self::default()
181 }
182
183 async fn schema_view(&self, _pool: &Pool<Postgres>) -> SchemaView {
184 SchemaView {
185 schemas: vec![
186 Schema::new_id(),
187 Schema::new_user_search("user_id"),
188 Schema {
189 name: "balance".to_string(),
190 category: SchemaType::Number,
191 read_only: true,
192 ..Default::default()
193 },
194 Schema {
195 name: "total_recharged".to_string(),
196 category: SchemaType::Number,
197 read_only: true,
198 ..Default::default()
199 },
200 Schema {
201 name: "total_consumed".to_string(),
202 category: SchemaType::Number,
203 read_only: true,
204 ..Default::default()
205 },
206 Schema::new_status(),
207 Schema::new_remark(),
208 Schema::new_created(),
209 Schema::new_modified(),
210 ],
211 allow_edit: SchemaAllowEdit {
212 roles: vec!["su".to_string(), "admin".to_string()],
213 ..Default::default()
214 },
215 allow_create: SchemaAllowCreate {
216 roles: vec!["su".to_string(), "admin".to_string()],
217 ..Default::default()
218 },
219 }
220 }
221
222 async fn insert(&self, pool: &Pool<Postgres>, mut data: serde_json::Value) -> Result<u64> {
223 if let Some(obj) = data.as_object_mut() {
225 if let Some(id_str) = obj.get("user_id").and_then(|v| v.as_str()) {
226 if let Ok(id) = id_str.parse::<i64>() {
227 obj.insert("user_id".to_string(), id.into());
228 }
229 }
230 }
231 let params: TokenAccountInsertParams = serde_json::from_value(data).context(JsonSnafu)?;
232 let row: (i64,) = sqlx::query_as(
233 r#"INSERT INTO token_accounts (user_id, remark) VALUES ($1, $2) RETURNING id"#,
234 )
235 .bind(params.user_id)
236 .bind(params.remark.unwrap_or_default())
237 .fetch_one(pool)
238 .await
239 .context(SqlxSnafu)?;
240 Ok(row.0 as u64)
241 }
242
243 async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
244 let result = sqlx::query_as::<_, TokenAccountSchema>(
245 r#"SELECT * FROM token_accounts WHERE id = $1 AND deleted_at IS NULL"#,
246 )
247 .bind(id as i64)
248 .fetch_optional(pool)
249 .await
250 .context(SqlxSnafu)?;
251 Ok(result.map(Into::into))
252 }
253
254 async fn update_by_id(
255 &self,
256 pool: &Pool<Postgres>,
257 id: u64,
258 data: serde_json::Value,
259 ) -> Result<()> {
260 let params: TokenAccountUpdateParams = serde_json::from_value(data).context(JsonSnafu)?;
261 let mut qb: QueryBuilder<Postgres> =
262 QueryBuilder::new("UPDATE token_accounts SET modified = NOW()");
263 if let Some(status) = params.status {
264 qb.push(", status = ").push_bind(status);
265 }
266 if let Some(remark) = params.remark {
267 qb.push(", remark = ").push_bind(remark);
268 }
269 qb.push(" WHERE id = ").push_bind(id as i64);
270 qb.push(" AND deleted_at IS NULL");
271 qb.build().execute(pool).await.context(SqlxSnafu)?;
272 Ok(())
273 }
274
275 async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
276 sqlx::query(
277 r#"UPDATE token_accounts SET deleted_at = NOW(), modified = NOW() WHERE id = $1 AND deleted_at IS NULL"#,
278 )
279 .bind(id as i64)
280 .execute(pool)
281 .await
282 .context(SqlxSnafu)?;
283 Ok(())
284 }
285
286 async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
287 let mut qb: QueryBuilder<Postgres> =
288 QueryBuilder::new("SELECT COUNT(*) FROM token_accounts");
289 self.push_conditions(&mut qb, params)?;
290 let row: (i64,) = qb
291 .build_query_as()
292 .fetch_one(pool)
293 .await
294 .context(SqlxSnafu)?;
295 Ok(row.0)
296 }
297
298 async fn list(
299 &self,
300 pool: &Pool<Postgres>,
301 params: &ModelListParams,
302 ) -> Result<Vec<Self::Output>> {
303 let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT * FROM token_accounts");
304 self.push_conditions(&mut qb, params)?;
305 params.push_pagination(&mut qb);
306 let rows = qb
307 .build_query_as::<TokenAccountSchema>()
308 .fetch_all(pool)
309 .await
310 .context(SqlxSnafu)?;
311 Ok(rows.into_iter().map(Into::into).collect())
312 }
313
314 fn push_filter_conditions<'args>(
315 &self,
316 qb: &mut QueryBuilder<'args, Postgres>,
317 filters: &HashMap<String, String>,
318 ) -> Result<()> {
319 if let Some(status) = filters.get("status") {
320 if let Ok(s) = status.parse::<i16>() {
321 qb.push(" AND status = ").push_bind(s);
322 }
323 }
324 Ok(())
325 }
326}