basic_repo/
basic_repo.rs

1#![allow(dead_code)]
2
3use parking_lot::ArcMutexGuard;
4use sqlx::{QueryBuilder, Transaction};
5use sqlx_utils::prelude::*;
6use std::sync::{Arc, LazyLock};
7use std::time::Duration;
8
/// Reads the `DATABASE_URL` environment variable.
///
/// # Panics
///
/// Panics with "failed to get DATABASE_URL" when the variable is unset or is
/// not valid Unicode.
fn read_database_url() -> String {
    std::env::var("DATABASE_URL").expect("failed to get DATABASE_URL")
}

/// Connection string for the database, resolved from the environment on first
/// access and cached for the rest of the process lifetime.
pub static DATABASE_URL: LazyLock<String> = LazyLock::new(read_database_url);
11
// Declares the public `UserFilter` type through the `sql_filter!` macro,
// described by the statement `SELECT * FROM users WHERE ?id = i64`.
// `UserFilter` is the kind of value `apply_filter` is called on in the
// `delete_by_filter_query` section further down.
// NOTE(review): the leading `?` on `id` presumably marks that condition as
// optional (only applied when an id is supplied) — confirm against the macro docs.
sql_filter! {
    pub struct UserFilter {
        SELECT * FROM users WHERE ?id = i64
    }
}
17
/// A record of the `users` table as handled by the repositories in this file.
#[derive(Clone)]
pub struct User {
    /// Identifier of the user; reported through `Model::get_id` and bound to
    /// the `id = ?` placeholder of the update query.
    id: i64,
    /// Value stored in the `name` column by the insert and update queries.
    name: String,
}
23
24impl Model for User {
25    type Id = i64;
26
27    fn get_id(&self) -> Option<i64> {
28        Some(self.id)
29    }
30}
31
// Declares the public `UserRepo` repository over the `User` model via the
// `repository!` macro.
// NOTE(review): `USER_REPO`, used later in this file, has no visible
// definition; presumably this macro generates it — confirm.
repository! {
    pub UserRepo<User>;
}
35
// Declares a second public repository, `UserRepo2`, over the same `User`
// model, this time passing the `!zst` flag to `repository!`.
// NOTE(review): `!zst` presumably opts out of a zero-sized repository
// representation — confirm against the macro docs.
repository! {
    !zst
    pub UserRepo2<User>;
}
40
// Supplies the insert query for `UserRepo` through `repository_insert!`.
// Only `name` is inserted: the statement has a single `?` placeholder, bound
// to `model.name`. The `id` column is not part of the statement.
repository_insert! {
    UserRepo<User>;

    insert_query(model) {
        sqlx::query("INSERT INTO users (name) VALUES (?)").bind(&model.name)
    }
}
48
49repository_update! {
50    UserRepo<User>;
51
52    update_query(model) {
53        sqlx::query("UPDATE users SET name = ? where id = ?").bind(model.id).bind(&model.name)
54    }
55}
56
// Supplies the delete queries for `UserRepo` through `repository_delete!`.
repository_delete! {
    UserRepo<User>;

    // Deletes the single row whose `id` matches the bound value.
    delete_by_id_query(id) {
        sqlx::query("DELETE FROM users WHERE id = ?").bind(id)
    }

    // Builds a DELETE whose WHERE conditions are appended by the filter itself.
    // NOTE(review): the prefix already ends in `WHERE `; if `apply_filter` can
    // append nothing (e.g. a filter with no conditions set) the statement would
    // be left incomplete — confirm how the macro/caller guards against that.
    delete_by_filter_query(filter) {
        let mut builder = QueryBuilder::new("DELETE FROM users WHERE ");

        filter.apply_filter(&mut builder);

        builder
    }
}
72
/// Identity of the caller, passed to `UserRepo::save_with_context` to decide
/// whether a save may proceed.
pub enum UserContext {
    /// Saves requested with this context are executed.
    System,
    /// Saves requested with this context are rejected with `DbError::NotAllowed`.
    UnAuthenticated,
}
77
/// Error type returned by the custom `UserRepo` methods in this file.
#[derive(Debug)]
pub enum DbError {
    /// An error originating from `sqlx` (converted via `From<sqlx::Error>`).
    SqlxError(sqlx::Error),
    /// An error originating from `sqlx_utils` (converted via `From<sqlx_utils::Error>`).
    SqlxUtils(sqlx_utils::Error),
    /// The operation was refused because of the caller's `UserContext`.
    NotAllowed,
}
84
85impl From<sqlx::Error> for DbError {
86    fn from(e: sqlx::Error) -> Self {
87        DbError::SqlxError(e)
88    }
89}
90
91impl From<sqlx_utils::Error> for DbError {
92    fn from(e: sqlx_utils::Error) -> Self {
93        DbError::SqlxUtils(e)
94    }
95}
96
97impl UserRepo {
98    pub async fn save_with_context(
99        &self,
100        model: User,
101        context: UserContext,
102    ) -> Result<User, DbError> {
103        self.with_transaction(move |mut tx| async move {
104            let res = match context {
105                UserContext::System => self
106                    .save_with_executor(&mut *tx, model)
107                    .await
108                    .map_err(Into::into),
109                UserContext::UnAuthenticated => Err(DbError::NotAllowed),
110            };
111
112            (res, tx)
113        })
114        .await
115    }
116
117    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
118        self.transaction_sequential::<'a, 'b>([
119            move |mut tx: Transaction<'b, Database>| async move {
120                let res = self.save_with_executor(&mut *tx, model).await;
121
122                (res, tx)
123            },
124        ])
125        .await
126        .map_err(Into::into)
127    }
128
129    pub async fn save_with_rx_concurrent<'a, 'b>(
130        &'a self,
131        model: User,
132    ) -> Result<Vec<User>, DbError>
133    where
134        'b: 'a,
135    {
136        self.transaction_concurrent::<'a, 'b>([
137            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
138                let mut tx = match tx.try_lock_arc() {
139                    Some(tx) => tx,
140                    None => return Err(Error::MutexLockError),
141                };
142
143                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
144
145                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
146
147                res
148            },
149        ])
150        .await
151        .map_err(Into::into)
152    }
153
154    pub async fn try_save_with_tx<'a, 'b>(
155        &'a self,
156        model: User,
157    ) -> Result<Vec<User>, Vec<DbError>> {
158        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
159            let res = self.save_with_executor(&mut *tx, model).await;
160
161            (res, tx)
162        }])
163        .await
164        .map_err(|errors| errors.into_iter().map(Into::into).collect())
165    }
166}
167
168async fn action<'b>(
169    _: Transaction<'b, Database>,
170) -> (Result<User, DbError>, Transaction<'b, Database>) {
171    unimplemented!()
172}
173
174#[tokio::main]
175async fn main() {
176    install_default_drivers();
177
178    initialize_db_pool(
179        PoolOptions::new()
180            .max_connections(21)
181            .min_connections(5)
182            .idle_timeout(Duration::from_secs(60 * 10))
183            .max_lifetime(Duration::from_secs(60 * 60 * 24))
184            .acquire_timeout(Duration::from_secs(20))
185            .connect(&DATABASE_URL)
186            .await
187            .expect("Failed to connect to database"),
188    );
189
190    let user = User {
191        id: 1,
192        name: String::new(),
193    };
194
195    USER_REPO.save_ref(&user).await.unwrap();
196
197    USER_REPO.save_in_transaction(user.clone()).await.unwrap();
198
199    USER_REPO
200        .save_with_context(user.clone(), UserContext::System)
201        .await
202        .unwrap();
203
204    USER_REPO.with_transaction(action).await.unwrap();
205
206    USER_REPO
207        .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
208        .await
209        .unwrap();
210}