Skip to main content

rbatis/
executor.rs

1use crate::decode::decode;
2use crate::intercept::ResultType;
3use crate::rbatis::RBatis;
4use crate::{Action, Error};
5use futures::Future;
6use futures_core::future::BoxFuture;
7use rbdc::db::{Connection, ExecResult};
8use rbdc::rt::tokio::sync::Mutex;
9use rbs::Value;
10use serde::de::DeserializeOwned;
11use std::any::Any;
12use std::fmt::{Debug, Formatter};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16/// the RBatis Executor. this trait impl with structs = RBatis,RBatisConnExecutor,RBatisTxExecutor,RBatisTxExecutorGuard
17pub trait Executor: RBatisRef + Send + Sync {
18    fn id(&self) -> i64;
19
20    fn name(&self) -> &str {
21        std::any::type_name::<Self>()
22    }
23
24    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>>;
25    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>>;
26}
27
28pub trait RBatisRef: Any + Send + Sync {
29    fn rb_ref(&self) -> &RBatis;
30
31    fn driver_type(&self) -> crate::Result<&str> {
32        self.rb_ref().driver_type()
33    }
34}
35
36impl RBatisRef for RBatis {
37    fn rb_ref(&self) -> &RBatis {
38        self
39    }
40}
41
42#[derive(Clone)]
43pub struct RBatisConnExecutor {
44    pub id: i64,
45    pub rb: RBatis,
46    pub conn: Arc<Mutex<Box<dyn Connection>>>,
47    pub intercepts: Arc<dark_std::sync::SyncVec<Arc<dyn crate::intercept::Intercept>>>,
48}
49
50impl RBatisConnExecutor {
51    pub fn new(id: i64, conn: Box<dyn Connection>, rb: RBatis) -> Self {
52        Self {
53            id: id,
54            conn: Arc::new(Mutex::new(conn)),
55            rb: rb.clone(),
56            intercepts: rb.intercepts.clone(),
57        }
58    }
59
60    pub fn take_connection(self) -> Option<Box<dyn Connection>> {
61        let conn = Arc::into_inner(self.conn);
62        match conn {
63            Option::Some(conn) => {
64                let v = Mutex::into_inner(conn);
65                Some(v)
66            }
67            Option::None => None,
68        }
69    }
70}
71
72impl Debug for RBatisConnExecutor {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("RBatisTxExecutor")
75            .field("id", &self.id)
76            .field("rb", &self.rb)
77            .finish()
78    }
79}
80
81impl RBatisConnExecutor {
82    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
83        let v = Executor::exec(self, sql, args).await?;
84        Ok(v)
85    }
86
87    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
88        let v = Executor::query(self, sql, args).await?;
89        Ok(v)
90    }
91
92    // Fast path for query_decode - inlined to avoid trait method call overhead
93    pub async fn query_decode<T>(&self, sql: &str, mut args: Vec<Value>) -> Result<T, Error>
94    where
95        T: DeserializeOwned,
96    {
97        // Fast path: no interceptors - skip all overhead
98        if self.intercepts.is_empty() {
99            let result = self.conn.lock().await.get_values(sql, args).await;
100            return result.and_then(|v| decode(v));
101        }
102
103        // Inline the query logic to avoid double async call
104        let mut sql = if sql.is_empty() {
105            String::new()
106        } else {
107            sql.to_string()
108        };
109
110        let rb_task_id = self.rb.task_id_generator.generate();
111        let mut before_result: Result<Value, Error> = Err(Error::from(""));
112
113        // Before intercepts
114        for item in self.intercepts.iter() {
115            let next = item
116                .before(
117                    rb_task_id,
118                    self,
119                    &mut sql,
120                    &mut args,
121                    ResultType::Query(&mut before_result),
122                )
123                .await?;
124            match next {
125                Action::Next => {}
126                Action::Return => {
127                    // Convert Result<Value, Error> to Result<T, Error>
128                    return before_result.and_then(|v| decode(v));
129                }
130            }
131        }
132
133        // Execute query
134        let mut conn = self.conn.lock().await;
135        let mut args_after = if args.is_empty() {
136            Vec::new()
137        } else {
138            args.clone()
139        };
140        let mut result = conn.get_values(&sql, args).await;
141        drop(conn); // Release lock early
142
143        // After intercepts
144        for item in self.intercepts.iter() {
145            let next = item
146                .after(
147                    rb_task_id,
148                    self,
149                    &mut sql,
150                    &mut args_after,
151                    ResultType::Query(&mut result),
152                )
153                .await?;
154            match next {
155                Action::Next => {}
156                Action::Return => {
157                    return before_result.and_then(|v| decode(v));
158                }
159            }
160        }
161
162        // Decode result
163        result.and_then(|v| decode(v))
164    }
165}
166
167impl Executor for RBatisConnExecutor {
168    #[inline]
169    fn id(&self) -> i64 {
170        self.id
171    }
172
173    fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
174        // Fast path for empty SQL (common in benchmarks)
175        let mut sql = if sql.is_empty() {
176            String::new()
177        } else {
178            sql.to_string()
179        };
180
181        Box::pin(async move {
182            let rb_task_id = self.rb.task_id_generator.generate();
183            let mut before_result = Err(Error::from(""));
184            for item in self.intercepts.iter() {
185                let next = item
186                    .before(
187                        rb_task_id,
188                        self,
189                        &mut sql,
190                        &mut args,
191                        ResultType::Exec(&mut before_result),
192                    )
193                    .await?;
194                match next {
195                    Action::Next => {}
196                    Action::Return => {
197                        return before_result;
198                    }
199                }
200            }
201            // Fast path for empty args
202            let mut args_after = if args.is_empty() {
203                Vec::new()
204            } else {
205                args.clone()
206            };
207            let mut result = self.conn.lock().await.exec(&sql, args).await;
208            for item in self.intercepts.iter() {
209                let next = item
210                    .after(
211                        rb_task_id,
212                        self,
213                        &mut sql,
214                        &mut args_after,
215                        ResultType::Exec(&mut result),
216                    )
217                    .await?;
218                match next {
219                    Action::Next => {}
220                    Action::Return => {
221                        return before_result;
222                    }
223                }
224            }
225            result
226        })
227    }
228
229    fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
230        // Fast path for empty SQL (common in benchmarks)
231        let mut sql = if sql.is_empty() {
232            String::new()
233        } else {
234            sql.to_string()
235        };
236
237        Box::pin(async move {
238            let rb_task_id = self.rb.task_id_generator.generate();
239            let mut before_result = Err(Error::from(""));
240            for item in self.intercepts.iter() {
241                let next = item
242                    .before(
243                        rb_task_id,
244                        self,
245                        &mut sql,
246                        &mut args,
247                        ResultType::Query(&mut before_result),
248                    )
249                    .await?;
250                match next {
251                    Action::Next => {}
252                    Action::Return => {
253                        return before_result;
254                    }
255                }
256            }
257            let mut conn = self.conn.lock().await;
258            // Fast path for empty args
259            let mut args_after = if args.is_empty() {
260                Vec::new()
261            } else {
262                args.clone()
263            };
264            let mut result = conn.get_values(&sql, args).await;
265            for item in self.intercepts.iter() {
266                let next = item
267                    .after(
268                        rb_task_id,
269                        self,
270                        &mut sql,
271                        &mut args_after,
272                        ResultType::Query(&mut result),
273                    )
274                    .await?;
275                match next {
276                    Action::Next => {}
277                    Action::Return => {
278                        return before_result;
279                    }
280                }
281            }
282            result
283        })
284    }
285}
286
287impl RBatisRef for RBatisConnExecutor {
288    fn rb_ref(&self) -> &RBatis {
289        &self.rb
290    }
291}
292
293impl RBatisConnExecutor {
294    pub fn begin(self) -> BoxFuture<'static, Result<RBatisTxExecutor, Error>> {
295        Box::pin(async move {
296            let task_id = self.rb.task_id_generator.generate();
297            let id = self.id;
298            let rb = self.rb.clone();
299            let conn = self.take_connection();
300            let mut conn = conn.ok_or_else(|| Error::from("Failed to unwrap Arc"))?;
301            conn.begin().await?;
302            let conn_executor = RBatisConnExecutor::new(id, conn, rb);
303            Ok(RBatisTxExecutor::new(task_id, conn_executor))
304        })
305    }
306
307    pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
308        Box::pin(async { Ok(self.conn.lock().await.rollback().await?) })
309    }
310
311    pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
312        Box::pin(async { Ok(self.conn.lock().await.commit().await?) })
313    }
314}
315
316/// `RBatisTxExecutor` is a type that represents an executor for transactional operations in RBatis.
317///
318/// # Type Description
319///
320/// The `RBatisTxExecutor` is responsible for executing SQL statements within the context of a transaction.
321/// It provides methods to execute queries, updates, and other SQL operations, ensuring that all operations
322/// are part of the same transactional context.
323#[derive(Clone)]
324pub struct RBatisTxExecutor {
325    pub tx_id: i64,
326    pub conn_executor: RBatisConnExecutor,
327    /// please use tx.done()
328    /// if tx call .commit() or .rollback() done = true.
329    /// if tx not call .commit() or .rollback() done = false
330    done: Arc<AtomicBool>,
331}
332
333impl Debug for RBatisTxExecutor {
334    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
335        f.debug_struct("RBatisTxExecutor")
336            .field("tx_id", &self.tx_id)
337            .field("conn_executor", &self.conn_executor)
338            .field("done", &self.done)
339            .finish()
340    }
341}
342
343impl<'a> RBatisTxExecutor {
344    pub fn new(tx_id: i64, conn_executor: RBatisConnExecutor) -> Self {
345        RBatisTxExecutor {
346            tx_id: tx_id,
347            conn_executor: conn_executor,
348            done: Arc::new(AtomicBool::new(false)),
349        }
350    }
351
352    /// exec
353    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
354        let v = Executor::exec(self, sql, args).await?;
355        Ok(v)
356    }
357    /// query value
358    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
359        let v = Executor::query(self, sql, args).await?;
360        Ok(v)
361    }
362    /// query and decode
363    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
364    where
365        T: DeserializeOwned,
366    {
367        let v = Executor::query(self, sql, args).await?;
368        Ok(decode(v)?)
369    }
370
371    pub fn begin(self) -> BoxFuture<'static, Result<Self, Error>> {
372        Box::pin(async move {
373            self.conn_executor.conn.lock().await.begin().await?;
374            Ok(self)
375        })
376    }
377
378    pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
379        Box::pin(async {
380            let r = self.conn_executor.conn.lock().await.rollback().await?;
381            self.done.store(true, Ordering::Relaxed);
382            Ok(r)
383        })
384    }
385
386    pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
387        Box::pin(async {
388            let r = self.conn_executor.conn.lock().await.commit().await?;
389            self.done.store(true, Ordering::Relaxed);
390            Ok(r)
391        })
392    }
393
394    /// tx is done?
395    pub fn done(&self) -> bool {
396        self.done.load(Ordering::Relaxed)
397    }
398
399    ///change is done
400    pub fn set_done(&self, done: bool) {
401        self.done.store(done, Ordering::Relaxed)
402    }
403
404    /// defer and use future method
405    /// for example:
406    /// ```rust
407    ///  use rbatis::executor::RBatisTxExecutor;
408    ///  use rbatis::{Error, RBatis};
409    ///
410    ///  async fn test_tx(tx:RBatisTxExecutor) -> Result<(),Error>{
411    ///         tx.defer_async(|tx| async move {
412    ///              if !tx.done(){ let _ = tx.rollback().await; }
413    ///         });
414    ///     Ok(())
415    /// }
416    /// ```
417    pub fn defer_async<F>(&self, callback: fn(s: RBatisTxExecutor) -> F) -> RBatisTxExecutorGuard
418    where
419        F: Future<Output = ()> + Send + 'static,
420    {
421        RBatisTxExecutorGuard {
422            tx: self.clone(),
423            callback: Arc::new(move |arg| {
424                let future = callback(arg);
425                rbdc::rt::spawn(future);
426            }),
427        }
428    }
429}
430
431impl Executor for RBatisTxExecutor {
432    fn id(&self) -> i64 {
433        self.tx_id
434    }
435
436    fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
437        let mut sql = sql.to_string();
438        Box::pin(async move {
439            let mut before_result = Err(Error::from(""));
440            for item in self.conn_executor.intercepts.iter() {
441                let next = item
442                    .before(
443                        self.tx_id,
444                        self,
445                        &mut sql,
446                        &mut args,
447                        ResultType::Exec(&mut before_result),
448                    )
449                    .await?;
450                match next{
451                    Action::Next=>{
452                        //run next intercept
453                    }
454                    Action::Return=>{
455                        return before_result;
456                    }
457                }
458            }
459            let mut args_after = args.clone();
460            let mut result = self.conn_executor.conn.lock().await.exec(&sql, args).await;
461            for item in self.conn_executor.intercepts.iter() {
462                let next = item
463                    .after(
464                        self.tx_id,
465                        self,
466                        &mut sql,
467                        &mut args_after,
468                        ResultType::Exec(&mut result),
469                    )
470                    .await?;
471                match next{
472                    Action::Next=>{
473                        //run next intercept
474                    }
475                    Action::Return=>{
476                        return before_result;
477                    }
478                }
479            }
480            result
481        })
482    }
483
484    fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
485        let mut sql = sql.to_string();
486        Box::pin(async move {
487            let mut before_result = Err(Error::from(""));
488            for item in self.conn_executor.intercepts.iter() {
489                let next = item
490                    .before(
491                        self.tx_id,
492                        self,
493                        &mut sql,
494                        &mut args,
495                        ResultType::Query(&mut before_result),
496                    )
497                    .await?;
498                match next{
499                    Action::Next=>{
500                        //run next intercept
501                    }
502                    Action::Return=>{
503                        return before_result;
504                    }
505                }
506            }
507            let mut conn = self.conn_executor.conn.lock().await;
508            let mut args_after = args.clone();
509            let mut result = conn.get_values(&sql, args).await;
510            for item in self.conn_executor.intercepts.iter() {
511                let next = item
512                    .after(
513                        self.tx_id,
514                        self,
515                        &mut sql,
516                        &mut args_after,
517                        ResultType::Query(&mut result),
518                    )
519                    .await?;
520                match next{
521                    Action::Next=>{
522                        //run next intercept
523                    }
524                    Action::Return=>{
525                        return before_result;
526                    }
527                }
528            }
529            Ok(result?)
530        })
531    }
532}
533
534impl RBatisRef for RBatisTxExecutor {
535    fn rb_ref(&self) -> &RBatis {
536        &self.conn_executor.rb
537    }
538}
539
540impl RBatisTxExecutor {
541    pub fn take_connection(self) -> Option<Box<dyn Connection>> {
542        self.conn_executor.take_connection()
543    }
544}
545
546/// `RBatisTxExecutorGuard` is a guard object that manages transactions for RBatis.
547///
548/// # Type Description
549///
550/// The `RBatisTxExecutorGuard` implements the `Drop` trait to ensure that transactions are
551/// automatically committed or rolled back when the guard goes out of scope. It encapsulates
552/// the transaction executor and provides a set of methods to manipulate database transactions.
553#[derive(Clone)]
554pub struct RBatisTxExecutorGuard {
555    pub tx: RBatisTxExecutor,
556    pub callback: Arc<dyn FnMut(RBatisTxExecutor) + Send + Sync>,
557}
558
559impl Debug for RBatisTxExecutorGuard {
560    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
561        f.debug_struct("RBatisTxExecutorGuard")
562            .field("tx", &self.tx)
563            .finish()
564    }
565}
566
567impl RBatisTxExecutorGuard {
568    pub fn tx_id(&self) -> i64 {
569        self.tx.tx_id
570    }
571
572    pub async fn commit(&self) -> crate::Result<()> {
573        self.tx.commit().await?;
574        Ok(())
575    }
576
577    pub async fn rollback(&self) -> crate::Result<()> {
578        self.tx.rollback().await?;
579        Ok(())
580    }
581
582    pub fn take_connection(self) -> Option<Box<dyn Connection>> {
583        self.tx.clone().take_connection()
584    }
585
586    pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
587    where
588        T: DeserializeOwned,
589    {
590        self.tx.query_decode(sql, args).await
591    }
592}
593
594impl Drop for RBatisTxExecutorGuard {
595    fn drop(&mut self) {
596        match Arc::get_mut(&mut self.callback) {
597            None => {}
598            Some(callback) => {
599                callback(self.tx.clone());
600            }
601        }
602    }
603}
604
605impl RBatisRef for RBatisTxExecutorGuard {
606    fn rb_ref(&self) -> &RBatis {
607        self.tx.rb_ref()
608    }
609}
610
611impl Executor for RBatisTxExecutorGuard {
612    fn id(&self) -> i64 {
613        self.tx.id()
614    }
615
616    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
617        let sql = sql.to_string();
618        Box::pin(async move { self.tx.exec(&sql, args).await })
619    }
620
621    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
622        let sql = sql.to_string();
623        Box::pin(async move { self.tx.query(&sql, args).await })
624    }
625}
626
627impl RBatis {
628    /// exec sql
629    pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
630        let conn = self.acquire().await?;
631        conn.exec(sql, args).await
632    }
633
634    /// query raw Value
635    pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
636        let conn = self.acquire().await?;
637        let v = conn.query(sql, args).await?;
638        Ok(v)
639    }
640
641    /// query and decode - fully inlined to avoid RBatisConnExecutor allocation
642    pub async fn query_decode<T>(&self, sql: &str, mut args: Vec<Value>) -> Result<T, Error>
643    where
644        T: DeserializeOwned,
645    {
646        // Fast path: no interceptors - skip all overhead
647        if self.intercepts.is_empty() {
648            let pool = self.pool.get().ok_or_else(|| Error::from("[rb] rbatis pool not inited!"))?;
649            let mut conn = pool.get().await?;
650            let result = conn.get_values(sql, args).await;
651            return result.and_then(|v| decode(v));
652        }
653
654        // Full path with interceptors
655        let mut sql = if sql.is_empty() {
656            String::new()
657        } else {
658            sql.to_string()
659        };
660
661        let rb_task_id = self.task_id_generator.generate();
662        let mut before_result: Result<Value, Error> = Err(Error::from(""));
663
664        // Before intercepts
665        for item in self.intercepts.iter() {
666            let next = item
667                .before(
668                    rb_task_id,
669                    self,
670                    &mut sql,
671                    &mut args,
672                    ResultType::Query(&mut before_result),
673                )
674                .await?;
675            match next {
676                Action::Next => {}
677                Action::Return => {
678                    return before_result.and_then(|v| decode(v));
679                }
680            }
681        }
682
683        // Execute query
684        let pool = self.pool.get().ok_or_else(|| Error::from("[rb] rbatis pool not inited!"))?;
685        let mut conn = pool.get().await?;
686        let mut args_after = if args.is_empty() {
687            Vec::new()
688        } else {
689            args.clone()
690        };
691        let mut result = conn.get_values(&sql, args).await;
692
693        // After intercepts
694        for item in self.intercepts.iter() {
695            let next = item
696                .after(
697                    rb_task_id,
698                    self,
699                    &mut sql,
700                    &mut args_after,
701                    ResultType::Query(&mut result),
702                )
703                .await?;
704            match next {
705                Action::Next => {}
706                Action::Return => {
707                    return before_result.and_then(|v| decode(v));
708                }
709            }
710        }
711
712        // Decode result
713        result.and_then(|v| decode(v))
714    }
715}
716
717impl Executor for RBatis {
718    fn id(&self) -> i64 {
719        0
720    }
721
722    fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
723        let sql = sql.to_string();
724        Box::pin(async move {
725            let conn = self.acquire().await?;
726            conn.exec(&sql, args).await
727        })
728    }
729
730    fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
731        let sql = sql.to_string();
732        Box::pin(async move {
733            let conn = self.acquire().await?;
734            conn.query(&sql, args).await
735        })
736    }
737}
738