rbatis/
executor.rs

1use crate::decode::decode;
2use crate::rbatis::RBatis;
3use crate::Error;
4use dark_std::sync::SyncVec;
5use futures::Future;
6use futures_core::future::BoxFuture;
7use rbdc::db::{Connection, ExecResult};
8use rbdc::rt::tokio::sync::Mutex;
9use rbs::Value;
10use serde::de::DeserializeOwned;
11use std::any::Any;
12use std::fmt::{Debug, Formatter};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use crate::intercept::ResultType;
16
17/// the RBatis Executor. this trait impl with structs = RBatis,RBatisConnExecutor,RBatisTxExecutor,RBatisTxExecutorGuard
18pub trait Executor: RBatisRef + Send + Sync {
19    fn id(&self) -> i64;
20    fn name(&self) -> &str {
21        std::any::type_name::<Self>()
22    }
23    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>>;
24    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>>;
25    fn as_any(&self) -> &dyn Any
26    where
27        Self: Sized,
28    {
29        self
30    }
31}
32
33pub trait RBatisRef: Any + Send + Sync {
34    fn rb_ref(&self) -> &RBatis;
35
36    fn driver_type(&self) -> crate::Result<&str> {
37        self.rb_ref().driver_type()
38    }
39}
40
41impl RBatisRef for RBatis {
42    fn rb_ref(&self) -> &RBatis {
43        self
44    }
45}
46
47pub struct RBatisConnExecutor {
48    pub id: i64,
49    pub rb: RBatis,
50    pub conn: Mutex<Box<dyn Connection>>,
51}
52
53impl RBatisConnExecutor {
54    pub fn new(id: i64, conn: Box<dyn Connection>, rb: RBatis) -> Self {
55        Self {
56            id: id,
57            conn: Mutex::new(conn),
58            rb: rb,
59        }
60    }
61}
62
63impl Debug for RBatisConnExecutor {
64    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("RBatisTxExecutor")
66            .field("id", &self.id)
67            .field("rb", &self.rb)
68            .finish()
69    }
70}
71
72impl RBatisConnExecutor {
73    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
74        let v = Executor::exec(self, sql, args).await?;
75        Ok(v)
76    }
77
78    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
79        let v = Executor::query(self, sql, args).await?;
80        Ok(v)
81    }
82
83    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
84    where
85        T: DeserializeOwned,
86    {
87        let v = Executor::query(self, sql, args).await?;
88        Ok(decode(v)?)
89    }
90}
91
92impl Executor for RBatisConnExecutor {
93    fn id(&self) -> i64 {
94        self.id
95    }
96
97    fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
98        let mut sql = sql.to_string();
99        Box::pin(async move {
100            let rb_task_id = self.rb.task_id_generator.generate();
101            let mut before_result = Err(Error::from(""));
102            for item in self.rb_ref().intercepts.iter() {
103                let next = item
104                    .before(
105                        rb_task_id,
106                        self as &dyn Executor,
107                        &mut sql,
108                        &mut args,
109                        ResultType::Exec(&mut before_result),
110                    )
111                    .await?;
112                if let Some(next) = next {
113                    if !next {
114                        break;
115                    }
116                } else {
117                    return before_result;
118                }
119            }
120            let mut args_after = args.clone();
121            let mut result = self.conn.lock().await.exec(&sql, args).await;
122            for item in self.rb_ref().intercepts.iter() {
123                let next = item
124                    .after(
125                        rb_task_id,
126                        self as &dyn Executor,
127                        &mut sql,
128                        &mut args_after,
129                        ResultType::Exec(&mut result),
130                    )
131                    .await?;
132                if let Some(next) = next {
133                    if !next {
134                        break;
135                    }
136                } else {
137                    return result;
138                }
139            }
140            result
141        })
142    }
143
144    fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
145        let mut sql = sql.to_string();
146        Box::pin(async move {
147            let rb_task_id = self.rb.task_id_generator.generate();
148            let mut before_result = Err(Error::from(""));
149            for item in self.rb_ref().intercepts.iter() {
150                let next = item
151                    .before(
152                        rb_task_id,
153                        self,
154                        &mut sql,
155                        &mut args,
156                        ResultType::Query(&mut before_result),
157                    )
158                    .await?;
159                if let Some(next) = next {
160                    if !next {
161                        break;
162                    }
163                } else {
164                    return before_result.map(|v| Value::from(v));
165                }
166            }
167            let mut conn = self.conn.lock().await;
168            let mut args_after = args.clone();
169            let mut result = conn.get_values(&sql, args).await;
170            for item in self.rb_ref().intercepts.iter() {
171                let next = item
172                    .after(
173                        rb_task_id,
174                        self,
175                        &mut sql,
176                        &mut args_after,
177                        ResultType::Query(&mut result),
178                    )
179                    .await?;
180                if let Some(next) = next {
181                    if !next {
182                        break;
183                    }
184                } else {
185                    return result.map(|v| Value::from(v));
186                }
187            }
188            Ok(Value::Array(result?))
189        })
190    }
191}
192
193impl RBatisRef for RBatisConnExecutor {
194    fn rb_ref(&self) -> &RBatis {
195        &self.rb
196    }
197}
198
199impl RBatisConnExecutor {
200    pub fn begin(self) -> BoxFuture<'static, Result<RBatisTxExecutor, Error>> {
201        Box::pin(async move {
202            let mut conn = self.conn.into_inner();
203            conn.begin().await?;
204            Ok(RBatisTxExecutor::new(
205                self.rb.task_id_generator.generate(),
206                self.rb,
207                conn,
208            ))
209        })
210    }
211
212    pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
213        Box::pin(async { Ok(self.conn.lock().await.rollback().await?) })
214    }
215
216    pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
217        Box::pin(async { Ok(self.conn.lock().await.commit().await?) })
218    }
219}
220
221/// `RBatisTxExecutor` is a type that represents an executor for transactional operations in RBatis.
222///
223/// # Type Description
224///
225/// The `RBatisTxExecutor` is responsible for executing SQL statements within the context of a transaction.
226/// It provides methods to execute queries, updates, and other SQL operations, ensuring that all operations
227/// are part of the same transactional context.
228#[derive(Clone)]
229pub struct RBatisTxExecutor {
230    pub tx_id: i64,
231    pub conn: Arc<Mutex<Box<dyn Connection>>>,
232    pub rb: RBatis,
233    /// please use tx.done()
234    /// if tx call .commit() or .rollback() done = true.
235    /// if tx not call .commit() or .rollback() done = false
236    done: Arc<AtomicBool>,
237}
238
239impl Debug for RBatisTxExecutor {
240    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("RBatisTxExecutor")
242            .field("tx_id", &self.tx_id)
243            .field("rb", &self.rb)
244            .field("done", &self.done)
245            .finish()
246    }
247}
248
249impl<'a> RBatisTxExecutor {
250    pub fn new(tx_id: i64, rb: RBatis, conn: Box<dyn Connection>) -> Self {
251        RBatisTxExecutor {
252            tx_id: tx_id,
253            conn: Arc::new(Mutex::new(conn)),
254            rb: rb,
255            done: Arc::new(AtomicBool::new(false)),
256        }
257    }
258
259    /// exec
260    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
261        let v = Executor::exec(self, sql, args).await?;
262        Ok(v)
263    }
264    /// query value
265    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
266        let v = Executor::query(self, sql, args).await?;
267        Ok(v)
268    }
269    /// query and decode
270    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
271    where
272        T: DeserializeOwned,
273    {
274        let v = Executor::query(self, sql, args).await?;
275        Ok(decode(v)?)
276    }
277
278    pub fn begin(self) -> BoxFuture<'static, Result<Self, Error>> {
279        Box::pin(async move {
280            self.conn.lock().await.begin().await?;
281            Ok(self)
282        })
283    }
284
285    pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
286        Box::pin(async {
287            let r = self.conn.lock().await.rollback().await?;
288            self.done.store(true, Ordering::Relaxed);
289            Ok(r)
290        })
291    }
292
293    pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
294        Box::pin(async {
295            let r = self.conn.lock().await.commit().await?;
296            self.done.store(true, Ordering::Relaxed);
297            Ok(r)
298        })
299    }
300
301    /// tx is done?
302    pub fn done(&self) -> bool {
303        self.done.load(Ordering::Relaxed)
304    }
305
306    ///change is done
307    pub fn set_done(&self, done: bool) {
308        self.done.store(done, Ordering::Relaxed)
309    }
310}
311
312impl Executor for RBatisTxExecutor {
313    fn id(&self) -> i64 {
314        self.tx_id
315    }
316
317    fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
318        let mut sql = sql.to_string();
319        Box::pin(async move {
320            let mut before_result = Err(Error::from(""));
321            for item in self.rb_ref().intercepts.iter() {
322                let next = item
323                    .before(
324                        self.tx_id,
325                        self,
326                        &mut sql,
327                        &mut args,
328                        ResultType::Exec(&mut before_result),
329                    )
330                    .await?;
331                if let Some(next) = next {
332                    if !next {
333                        break;
334                    }
335                } else {
336                    return before_result;
337                }
338            }
339            let mut args_after = args.clone();
340            let mut result = self.conn.lock().await.exec(&sql, args).await;
341            for item in self.rb_ref().intercepts.iter() {
342                let next = item
343                    .after(
344                        self.tx_id,
345                        self,
346                        &mut sql,
347                        &mut args_after,
348                        ResultType::Exec(&mut result),
349                    )
350                    .await?;
351                if let Some(next) = next {
352                    if !next {
353                        break;
354                    }
355                } else {
356                    return result;
357                }
358            }
359            result
360        })
361    }
362
363    fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
364        let mut sql = sql.to_string();
365        Box::pin(async move {
366            let mut before_result = Err(Error::from(""));
367            for item in self.rb_ref().intercepts.iter() {
368                let next = item
369                    .before(
370                        self.tx_id,
371                        self,
372                        &mut sql,
373                        &mut args,
374                        ResultType::Query(&mut before_result),
375                    )
376                    .await?;
377                if let Some(next) = next {
378                    if !next {
379                        break;
380                    }
381                } else {
382                    return before_result.map(|v| Value::from(v));
383                }
384            }
385            let mut conn = self.conn.lock().await;
386            let mut args_after = args.clone();
387            let conn = conn.get_values(&sql, args);
388            let mut result = conn.await;
389            for item in self.rb_ref().intercepts.iter() {
390                let next = item
391                    .after(
392                        self.tx_id,
393                        self,
394                        &mut sql,
395                        &mut args_after,
396                        ResultType::Query(&mut result),
397                    )
398                    .await?;
399                if let Some(next) = next {
400                    if !next {
401                        break;
402                    }
403                } else {
404                    return result.map(|v| Value::from(v));
405                }
406            }
407            Ok(Value::Array(result?))
408        })
409    }
410}
411
412impl RBatisRef for RBatisTxExecutor {
413    fn rb_ref(&self) -> &RBatis {
414        &self.rb
415    }
416}
417
418impl RBatisTxExecutor {
419    pub fn take_connection(self) -> Option<Box<dyn Connection>> {
420        match Arc::into_inner(self.conn) {
421            None => None,
422            Some(v) => {
423                let inner = v.into_inner();
424                Some(inner)
425            }
426        }
427    }
428}
429
430/// `RBatisTxExecutorGuard` is a guard object that manages transactions for RBatis.
431///
432/// # Type Description
433///
434/// The `RBatisTxExecutorGuard` implements the `Drop` trait to ensure that transactions are
435/// automatically committed or rolled back when the guard goes out of scope. It encapsulates
436/// the transaction executor and provides a set of methods to manipulate database transactions.
437#[derive(Clone)]
438pub struct RBatisTxExecutorGuard {
439    pub tx: RBatisTxExecutor,
440    pub callback: Arc<dyn FnMut(RBatisTxExecutor) + Send + Sync>,
441}
442
443impl Debug for RBatisTxExecutorGuard {
444    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445        f.debug_struct("RBatisTxExecutorGuard")
446            .field("tx", &self.tx)
447            .finish()
448    }
449}
450
451impl RBatisTxExecutorGuard {
452    pub fn tx_id(&self) -> i64 {
453        self.tx.tx_id
454    }
455
456    pub async fn commit(&self) -> crate::Result<()> {
457        self.tx.commit().await?;
458        Ok(())
459    }
460
461    pub async fn rollback(&self) -> crate::Result<()> {
462        self.tx.rollback().await?;
463        Ok(())
464    }
465
466    pub fn take_connection(self) -> Option<Box<dyn Connection>> {
467        self.tx.clone().take_connection()
468    }
469
470    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
471    where
472        T: DeserializeOwned,
473    {
474        self.tx.query_decode(sql, args).await
475    }
476}
477
478impl RBatisTxExecutor {
479    /// defer and use future method
480    /// for example:
481    /// ```rust
482    ///  use rbatis::executor::RBatisTxExecutor;
483    ///  use rbatis::{Error, RBatis};
484    ///
485    ///  async fn test_tx(tx:RBatisTxExecutor) -> Result<(),Error>{
486    ///         tx.defer_async(|tx| async move {
487    ///              if !tx.done(){ let _ = tx.rollback().await; }
488    ///         });
489    ///     Ok(())
490    /// }
491    /// ```
492    pub fn defer_async<F>(&self, callback: fn(s: RBatisTxExecutor) -> F) -> RBatisTxExecutorGuard
493    where
494        F: Future<Output = ()> + Send + 'static,
495    {
496        RBatisTxExecutorGuard {
497            tx: self.clone(),
498            callback: Arc::new(move |arg| {
499                let future = callback(arg);
500                rbdc::rt::spawn(future);
501            }),
502        }
503    }
504}
505
506impl Drop for RBatisTxExecutorGuard {
507    fn drop(&mut self) {
508        match Arc::get_mut(&mut self.callback) {
509            None => {}
510            Some(callback) => {
511                callback(self.tx.clone());
512            }
513        }
514    }
515}
516
517impl RBatisRef for RBatisTxExecutorGuard {
518    fn rb_ref(&self) -> &RBatis {
519        self.tx.rb_ref()
520    }
521}
522
523impl Executor for RBatisTxExecutorGuard {
524    fn id(&self) -> i64 {
525        self.tx.id()
526    }
527
528    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
529        let sql = sql.to_string();
530        Box::pin(async move { self.tx.exec(&sql, args).await })
531    }
532
533    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
534        let sql = sql.to_string();
535        Box::pin(async move { self.tx.query(&sql, args).await })
536    }
537}
538
539impl RBatis {
540    /// exec sql
541    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
542        let conn = self.acquire().await?;
543        conn.exec(sql, args).await
544    }
545
546    /// query raw Value
547    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
548        let conn = self.acquire().await?;
549        let v = conn.query(sql, args).await?;
550        Ok(v)
551    }
552
553    /// query and decode
554    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
555    where
556        T: DeserializeOwned,
557    {
558        let conn = self.acquire().await?;
559        let v = conn.query(sql, args).await?;
560        Ok(decode(v)?)
561    }
562}
563
564impl Executor for RBatis {
565    fn id(&self) -> i64 {
566        0
567    }
568
569    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
570        let sql = sql.to_string();
571        Box::pin(async move {
572            let conn = self.acquire().await?;
573            conn.exec(&sql, args).await
574        })
575    }
576
577    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
578        let sql = sql.to_string();
579        Box::pin(async move {
580            let conn = self.acquire().await?;
581            conn.query(&sql, args).await
582        })
583    }
584}
585
586#[derive(Debug)]
587pub struct TempExecutor<'a> {
588    pub rb: &'a RBatis,
589    pub sql: SyncVec<String>,
590    pub args: SyncVec<Vec<Value>>,
591}
592
593impl<'a> TempExecutor<'a> {
594    pub fn new(rb: &'a RBatis) -> Self {
595        Self {
596            rb: rb,
597            sql: SyncVec::new(),
598            args: SyncVec::new(),
599        }
600    }
601
602    pub fn clear_sql(&self) -> Vec<String> {
603        let mut arr = vec![];
604        loop {
605            if let Some(v) = self.sql.remove(0) {
606                arr.push(v);
607            } else {
608                break;
609            }
610        }
611        arr
612    }
613
614    pub fn clear_args(&self) -> Vec<Vec<Value>> {
615        let mut arr = vec![];
616        loop {
617            if let Some(v) = self.args.remove(0) {
618                arr.push(v);
619            } else {
620                break;
621            }
622        }
623        arr
624    }
625}
626
627impl RBatisRef for TempExecutor<'static> {
628    fn rb_ref(&self) -> &RBatis {
629        self.rb
630    }
631}
632
633impl Executor for TempExecutor<'static> {
634    fn id(&self) -> i64 {
635        0
636    }
637
638    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
639        self.sql.push(sql.to_string());
640        self.args.push(args);
641        Box::pin(async { Ok(ExecResult::default()) })
642    }
643
644    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
645        self.sql.push(sql.to_string());
646        self.args.push(args);
647        Box::pin(async { Ok(Value::default()) })
648    }
649}