1#![allow(dead_code)]
2#![allow(clippy::extra_unused_lifetimes)]
3#![allow(clippy::needless_lifetimes)]
4
5use parking_lot::ArcMutexGuard;
6use sqlx::prelude::FromRow;
7use sqlx::{QueryBuilder, Transaction};
8use sqlx_utils::prelude::*;
9use std::sync::{Arc, LazyLock};
10use std::time::Duration;
11
12pub static DATABASE_URL: LazyLock<String> =
13 LazyLock::new(|| std::env::var("DATABASE_URL").expect("failed to get DATABASE_URL"));
14
15sql_filter! {
16 pub struct UserFilter<UserRepo> {
17 SELECT id, name, email as username FROM users WHERE (?id = i64 AND name LIKE String)
18 }
19}
20
21#[derive(Clone, FromRow)]
22pub struct User {
23 id: i64,
24 name: String,
25}
26
27impl Model for User {
28 type Id = i64;
29
30 fn get_id(&self) -> Option<i64> {
31 Some(self.id)
32 }
33}
34
35repository! {
36 pub UserRepo<User>;
37}
38
39repository! {
40 !zst
41 pub UserRepo2<User>;
42}
43
44repository_insert! {
45 UserRepo<User>;
46
47 insert_query(model) {
48 sqlx::query("INSERT INTO users (name) VALUES (?)").bind(&model.name)
49 }
50}
51
52repository_update! {
53 UserRepo<User>;
54
55 update_query(model) {
56 sqlx::query("UPDATE users SET name = ? where id = ?").bind(model.id).bind(&model.name)
57 }
58}
59
60repository_delete! {
61 UserRepo<User>;
62
63 delete_by_id_query(id) {
64 sqlx::query("DELETE FROM users WHERE id = ?").bind(id)
65 }
66
67 delete_by_filter_query(filter) {
68 let mut builder = QueryBuilder::new("DELETE FROM users WHERE ");
69
70 filter.apply_filter(&mut builder);
71
72 builder
73 }
74}
75
76pub enum UserContext {
77 System,
78 UnAuthenticated,
79}
80
81#[derive(Debug)]
82pub enum DbError {
83 SqlxError(sqlx::Error),
84 SqlxUtils(sqlx_utils::Error),
85 NotAllowed,
86}
87
88impl From<sqlx::Error> for DbError {
89 fn from(e: sqlx::Error) -> Self {
90 DbError::SqlxError(e)
91 }
92}
93
94impl From<sqlx_utils::Error> for DbError {
95 fn from(e: sqlx_utils::Error) -> Self {
96 DbError::SqlxUtils(e)
97 }
98}
99
100impl UserRepo {
101 pub async fn save_with_context(
102 &self,
103 model: User,
104 context: UserContext,
105 ) -> Result<User, DbError> {
106 self.with_transaction(move |mut tx| async move {
107 let res = match context {
108 UserContext::System => self
109 .save_with_executor(&mut *tx, model)
110 .await
111 .map_err(Into::into),
112 UserContext::UnAuthenticated => Err(DbError::NotAllowed),
113 };
114
115 (res, tx)
116 })
117 .await
118 }
119
120 pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
121 self.transaction_sequential::<'a, 'b>([
122 move |mut tx: Transaction<'b, Database>| async move {
123 let res = self.save_with_executor(&mut *tx, model).await;
124
125 (res, tx)
126 },
127 ])
128 .await
129 .map_err(Into::into)
130 }
131
132 pub async fn save_with_rx_concurrent<'a, 'b>(
133 &'a self,
134 model: User,
135 ) -> Result<Vec<User>, DbError>
136 where
137 'b: 'a,
138 {
139 self.transaction_concurrent::<'a, 'b>([
140 |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141 let mut tx = match tx.try_lock_arc() {
142 Some(tx) => tx,
143 None => return Err(Error::MutexLockError),
144 };
145
146 let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148 ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150 res
151 },
152 ])
153 .await
154 .map_err(Into::into)
155 }
156
157 pub async fn try_save_with_tx<'a, 'b>(
158 &'a self,
159 model: User,
160 ) -> Result<Vec<User>, Vec<DbError>> {
161 self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162 let res = self.save_with_executor(&mut *tx, model).await;
163
164 (res, tx)
165 }])
166 .await
167 .map_err(|errors| errors.into_iter().map(Into::into).collect())
168 }
169}
170
171async fn action<'b>(
172 _: Transaction<'b, Database>,
173) -> (Result<User, DbError>, Transaction<'b, Database>) {
174 unimplemented!()
175}
176
177#[tokio::main]
178async fn main() {
179 install_default_drivers();
180
181 initialize_db_pool(
182 PoolOptions::new()
183 .max_connections(21)
184 .min_connections(5)
185 .idle_timeout(Duration::from_secs(60 * 10))
186 .max_lifetime(Duration::from_secs(60 * 60 * 24))
187 .acquire_timeout(Duration::from_secs(20))
188 .connect(&DATABASE_URL)
189 .await
190 .expect("Failed to connect to database"),
191 );
192
193 let user = User {
194 id: 1,
195 name: String::new(),
196 };
197
198 USER_REPO.save_ref(&user).await.unwrap();
199
200 USER_REPO.save_in_transaction(user.clone()).await.unwrap();
201
202 USER_REPO
203 .save_with_context(user.clone(), UserContext::System)
204 .await
205 .unwrap();
206
207 USER_REPO.with_transaction(action).await.unwrap();
208
209 USER_REPO
210 .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
211 .await
212 .unwrap();
213
214 USER_REPO
215 .get_one_by_filter(UserFilter::new("name").id(1))
216 .await
217 .unwrap();
218}