// basic_repo/basic_repo.rs

1#![allow(dead_code)]
2#![allow(clippy::extra_unused_lifetimes)]
3#![allow(clippy::needless_lifetimes)]
4
5use parking_lot::ArcMutexGuard;
6use sqlx::prelude::FromRow;
7use sqlx::{QueryBuilder, Transaction};
8use sqlx_utils::prelude::*;
9use std::sync::{Arc, LazyLock};
10use std::time::Duration;
11
/// Database connection string, read from the `DATABASE_URL` environment
/// variable the first time this static is dereferenced.
///
/// # Panics
///
/// Panics with `"failed to get DATABASE_URL"` on first access if the variable
/// cannot be read (`std::env::var` returns an error).
pub static DATABASE_URL: LazyLock<String> = LazyLock::new(|| {
    let url = std::env::var("DATABASE_URL");
    url.expect("failed to get DATABASE_URL")
});
14
15sql_filter! {
16    pub struct UserFilter<UserRepo> {
17        SELECT id, name, email as username FROM users WHERE (?id = i64 AND name LIKE String)
18    }
19}
20
21#[derive(Clone, FromRow)]
22pub struct User {
23    id: i64,
24    name: String,
25}
26
27impl Model for User {
28    type Id = i64;
29
30    fn get_id(&self) -> Option<i64> {
31        Some(self.id)
32    }
33}
34
35repository! {
36    pub UserRepo<User>;
37}
38
39repository! {
40    !zst
41    pub UserRepo2<User>;
42}
43
44repository_insert! {
45    UserRepo<User>;
46
47    insert_query(model) {
48        sqlx::query("INSERT INTO users (name) VALUES (?)").bind(&model.name)
49    }
50}
51
52repository_update! {
53    UserRepo<User>;
54
55    update_query(model) {
56        sqlx::query("UPDATE users SET name = ? where id = ?").bind(model.id).bind(&model.name)
57    }
58}
59
60repository_delete! {
61    UserRepo<User>;
62
63    delete_by_id_query(id) {
64        sqlx::query("DELETE FROM users WHERE id = ?").bind(id)
65    }
66
67    delete_by_filter_query(filter) {
68        let mut builder = QueryBuilder::new("DELETE FROM users WHERE ");
69
70        filter.apply_filter(&mut builder);
71
72        builder
73    }
74}
75
/// Who is asking for an operation; consulted by `UserRepo::save_with_context`.
pub enum UserContext {
    /// Internal caller; `save_with_context` performs the save.
    System,
    /// Caller without credentials; `save_with_context` answers with
    /// `DbError::NotAllowed`.
    UnAuthenticated,
}
80
81#[derive(Debug)]
82pub enum DbError {
83    SqlxError(sqlx::Error),
84    SqlxUtils(sqlx_utils::Error),
85    NotAllowed,
86}
87
88impl From<sqlx::Error> for DbError {
89    fn from(e: sqlx::Error) -> Self {
90        DbError::SqlxError(e)
91    }
92}
93
94impl From<sqlx_utils::Error> for DbError {
95    fn from(e: sqlx_utils::Error) -> Self {
96        DbError::SqlxUtils(e)
97    }
98}
99
100impl UserRepo {
101    pub async fn save_with_context(
102        &self,
103        model: User,
104        context: UserContext,
105    ) -> Result<User, DbError> {
106        self.with_transaction(move |mut tx| async move {
107            let res = match context {
108                UserContext::System => self
109                    .save_with_executor(&mut *tx, model)
110                    .await
111                    .map_err(Into::into),
112                UserContext::UnAuthenticated => Err(DbError::NotAllowed),
113            };
114
115            (res, tx)
116        })
117        .await
118    }
119
120    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
121        self.transaction_sequential::<'a, 'b>([
122            move |mut tx: Transaction<'b, Database>| async move {
123                let res = self.save_with_executor(&mut *tx, model).await;
124
125                (res, tx)
126            },
127        ])
128        .await
129        .map_err(Into::into)
130    }
131
132    pub async fn save_with_rx_concurrent<'a, 'b>(
133        &'a self,
134        model: User,
135    ) -> Result<Vec<User>, DbError>
136    where
137        'b: 'a,
138    {
139        self.transaction_concurrent::<'a, 'b>([
140            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141                let mut tx = match tx.try_lock_arc() {
142                    Some(tx) => tx,
143                    None => return Err(Error::MutexLockError),
144                };
145
146                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150                res
151            },
152        ])
153        .await
154        .map_err(Into::into)
155    }
156
157    pub async fn try_save_with_tx<'a, 'b>(
158        &'a self,
159        model: User,
160    ) -> Result<Vec<User>, Vec<DbError>> {
161        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162            let res = self.save_with_executor(&mut *tx, model).await;
163
164            (res, tx)
165        }])
166        .await
167        .map_err(|errors| errors.into_iter().map(Into::into).collect())
168    }
169}
170
171async fn action<'b>(
172    _: Transaction<'b, Database>,
173) -> (Result<User, DbError>, Transaction<'b, Database>) {
174    unimplemented!()
175}
176
177#[tokio::main]
178async fn main() {
179    install_default_drivers();
180
181    initialize_db_pool(
182        PoolOptions::new()
183            .max_connections(21)
184            .min_connections(5)
185            .idle_timeout(Duration::from_secs(60 * 10))
186            .max_lifetime(Duration::from_secs(60 * 60 * 24))
187            .acquire_timeout(Duration::from_secs(20))
188            .connect(&DATABASE_URL)
189            .await
190            .expect("Failed to connect to database"),
191    );
192
193    let user = User {
194        id: 1,
195        name: String::new(),
196    };
197
198    USER_REPO.save_ref(&user).await.unwrap();
199
200    USER_REPO.save_in_transaction(user.clone()).await.unwrap();
201
202    USER_REPO
203        .save_with_context(user.clone(), UserContext::System)
204        .await
205        .unwrap();
206
207    USER_REPO.with_transaction(action).await.unwrap();
208
209    USER_REPO
210        .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
211        .await
212        .unwrap();
213
214    USER_REPO
215        .get_one_by_filter(UserFilter::new("name").id(1))
216        .await
217        .unwrap();
218}