1use crate::decode::decode;
2use crate::intercept::ResultType;
3use crate::rbatis::RBatis;
4use crate::{Action, Error};
5use futures::Future;
6use futures_core::future::BoxFuture;
7use rbdc::db::{Connection, ExecResult};
8use rbdc::rt::tokio::sync::Mutex;
9use rbs::Value;
10use serde::de::DeserializeOwned;
11use std::any::Any;
12use std::fmt::{Debug, Formatter};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16pub trait Executor: RBatisRef + Send + Sync {
18 fn id(&self) -> i64;
19
20 fn name(&self) -> &str {
21 std::any::type_name::<Self>()
22 }
23
24 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>>;
25 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>>;
26}
27
28pub trait RBatisRef: Any + Send + Sync {
29 fn rb_ref(&self) -> &RBatis;
30
31 fn driver_type(&self) -> crate::Result<&str> {
32 self.rb_ref().driver_type()
33 }
34}
35
36impl RBatisRef for RBatis {
37 fn rb_ref(&self) -> &RBatis {
38 self
39 }
40}
41
42#[derive(Clone)]
43pub struct RBatisConnExecutor {
44 pub id: i64,
45 pub rb: RBatis,
46 pub conn: Arc<Mutex<Box<dyn Connection>>>,
47 pub intercepts: Arc<dark_std::sync::SyncVec<Arc<dyn crate::intercept::Intercept>>>,
48}
49
50impl RBatisConnExecutor {
51 pub fn new(id: i64, conn: Box<dyn Connection>, rb: RBatis) -> Self {
52 Self {
53 id: id,
54 conn: Arc::new(Mutex::new(conn)),
55 rb: rb.clone(),
56 intercepts: rb.intercepts.clone(),
57 }
58 }
59
60 pub fn take_connection(self) -> Option<Box<dyn Connection>> {
61 let conn = Arc::into_inner(self.conn);
62 match conn {
63 Option::Some(conn) => {
64 let v = Mutex::into_inner(conn);
65 Some(v)
66 }
67 Option::None => None,
68 }
69 }
70}
71
72impl Debug for RBatisConnExecutor {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("RBatisTxExecutor")
75 .field("id", &self.id)
76 .field("rb", &self.rb)
77 .finish()
78 }
79}
80
81impl RBatisConnExecutor {
82 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
83 let v = Executor::exec(self, sql, args).await?;
84 Ok(v)
85 }
86
87 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
88 let v = Executor::query(self, sql, args).await?;
89 Ok(v)
90 }
91
92 pub async fn query_decode<T>(&self, sql: &str, mut args: Vec<Value>) -> Result<T, Error>
94 where
95 T: DeserializeOwned,
96 {
97 if self.intercepts.is_empty() {
99 let result = self.conn.lock().await.get_values(sql, args).await;
100 return result.and_then(|v| decode(v));
101 }
102
103 let mut sql = if sql.is_empty() {
105 String::new()
106 } else {
107 sql.to_string()
108 };
109
110 let rb_task_id = self.rb.task_id_generator.generate();
111 let mut before_result: Result<Value, Error> = Err(Error::from(""));
112
113 for item in self.intercepts.iter() {
115 let next = item
116 .before(
117 rb_task_id,
118 self,
119 &mut sql,
120 &mut args,
121 ResultType::Query(&mut before_result),
122 )
123 .await?;
124 match next {
125 Action::Next => {}
126 Action::Return => {
127 return before_result.and_then(|v| decode(v));
129 }
130 }
131 }
132
133 let mut conn = self.conn.lock().await;
135 let mut args_after = if args.is_empty() {
136 Vec::new()
137 } else {
138 args.clone()
139 };
140 let mut result = conn.get_values(&sql, args).await;
141 drop(conn); for item in self.intercepts.iter() {
145 let next = item
146 .after(
147 rb_task_id,
148 self,
149 &mut sql,
150 &mut args_after,
151 ResultType::Query(&mut result),
152 )
153 .await?;
154 match next {
155 Action::Next => {}
156 Action::Return => {
157 return before_result.and_then(|v| decode(v));
158 }
159 }
160 }
161
162 result.and_then(|v| decode(v))
164 }
165}
166
167impl Executor for RBatisConnExecutor {
168 #[inline]
169 fn id(&self) -> i64 {
170 self.id
171 }
172
173 fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
174 let mut sql = if sql.is_empty() {
176 String::new()
177 } else {
178 sql.to_string()
179 };
180
181 Box::pin(async move {
182 let rb_task_id = self.rb.task_id_generator.generate();
183 let mut before_result = Err(Error::from(""));
184 for item in self.intercepts.iter() {
185 let next = item
186 .before(
187 rb_task_id,
188 self,
189 &mut sql,
190 &mut args,
191 ResultType::Exec(&mut before_result),
192 )
193 .await?;
194 match next {
195 Action::Next => {}
196 Action::Return => {
197 return before_result;
198 }
199 }
200 }
201 let mut args_after = if args.is_empty() {
203 Vec::new()
204 } else {
205 args.clone()
206 };
207 let mut result = self.conn.lock().await.exec(&sql, args).await;
208 for item in self.intercepts.iter() {
209 let next = item
210 .after(
211 rb_task_id,
212 self,
213 &mut sql,
214 &mut args_after,
215 ResultType::Exec(&mut result),
216 )
217 .await?;
218 match next {
219 Action::Next => {}
220 Action::Return => {
221 return before_result;
222 }
223 }
224 }
225 result
226 })
227 }
228
229 fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
230 let mut sql = if sql.is_empty() {
232 String::new()
233 } else {
234 sql.to_string()
235 };
236
237 Box::pin(async move {
238 let rb_task_id = self.rb.task_id_generator.generate();
239 let mut before_result = Err(Error::from(""));
240 for item in self.intercepts.iter() {
241 let next = item
242 .before(
243 rb_task_id,
244 self,
245 &mut sql,
246 &mut args,
247 ResultType::Query(&mut before_result),
248 )
249 .await?;
250 match next {
251 Action::Next => {}
252 Action::Return => {
253 return before_result;
254 }
255 }
256 }
257 let mut conn = self.conn.lock().await;
258 let mut args_after = if args.is_empty() {
260 Vec::new()
261 } else {
262 args.clone()
263 };
264 let mut result = conn.get_values(&sql, args).await;
265 for item in self.intercepts.iter() {
266 let next = item
267 .after(
268 rb_task_id,
269 self,
270 &mut sql,
271 &mut args_after,
272 ResultType::Query(&mut result),
273 )
274 .await?;
275 match next {
276 Action::Next => {}
277 Action::Return => {
278 return before_result;
279 }
280 }
281 }
282 result
283 })
284 }
285}
286
287impl RBatisRef for RBatisConnExecutor {
288 fn rb_ref(&self) -> &RBatis {
289 &self.rb
290 }
291}
292
293impl RBatisConnExecutor {
294 pub fn begin(self) -> BoxFuture<'static, Result<RBatisTxExecutor, Error>> {
295 Box::pin(async move {
296 let task_id = self.rb.task_id_generator.generate();
297 let id = self.id;
298 let rb = self.rb.clone();
299 let conn = self.take_connection();
300 let mut conn = conn.ok_or_else(|| Error::from("Failed to unwrap Arc"))?;
301 conn.begin().await?;
302 let conn_executor = RBatisConnExecutor::new(id, conn, rb);
303 Ok(RBatisTxExecutor::new(task_id, conn_executor))
304 })
305 }
306
307 pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
308 Box::pin(async { Ok(self.conn.lock().await.rollback().await?) })
309 }
310
311 pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
312 Box::pin(async { Ok(self.conn.lock().await.commit().await?) })
313 }
314}
315
316#[derive(Clone)]
324pub struct RBatisTxExecutor {
325 pub tx_id: i64,
326 pub conn_executor: RBatisConnExecutor,
327 done: Arc<AtomicBool>,
331}
332
333impl Debug for RBatisTxExecutor {
334 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
335 f.debug_struct("RBatisTxExecutor")
336 .field("tx_id", &self.tx_id)
337 .field("conn_executor", &self.conn_executor)
338 .field("done", &self.done)
339 .finish()
340 }
341}
342
343impl<'a> RBatisTxExecutor {
344 pub fn new(tx_id: i64, conn_executor: RBatisConnExecutor) -> Self {
345 RBatisTxExecutor {
346 tx_id: tx_id,
347 conn_executor: conn_executor,
348 done: Arc::new(AtomicBool::new(false)),
349 }
350 }
351
352 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
354 let v = Executor::exec(self, sql, args).await?;
355 Ok(v)
356 }
357 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
359 let v = Executor::query(self, sql, args).await?;
360 Ok(v)
361 }
362 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
364 where
365 T: DeserializeOwned,
366 {
367 let v = Executor::query(self, sql, args).await?;
368 Ok(decode(v)?)
369 }
370
371 pub fn begin(self) -> BoxFuture<'static, Result<Self, Error>> {
372 Box::pin(async move {
373 self.conn_executor.conn.lock().await.begin().await?;
374 Ok(self)
375 })
376 }
377
378 pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
379 Box::pin(async {
380 let r = self.conn_executor.conn.lock().await.rollback().await?;
381 self.done.store(true, Ordering::Relaxed);
382 Ok(r)
383 })
384 }
385
386 pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
387 Box::pin(async {
388 let r = self.conn_executor.conn.lock().await.commit().await?;
389 self.done.store(true, Ordering::Relaxed);
390 Ok(r)
391 })
392 }
393
394 pub fn done(&self) -> bool {
396 self.done.load(Ordering::Relaxed)
397 }
398
399 pub fn set_done(&self, done: bool) {
401 self.done.store(done, Ordering::Relaxed)
402 }
403
404 pub fn defer_async<F>(&self, callback: fn(s: RBatisTxExecutor) -> F) -> RBatisTxExecutorGuard
418 where
419 F: Future<Output = ()> + Send + 'static,
420 {
421 RBatisTxExecutorGuard {
422 tx: self.clone(),
423 callback: Arc::new(move |arg| {
424 let future = callback(arg);
425 rbdc::rt::spawn(future);
426 }),
427 }
428 }
429}
430
431impl Executor for RBatisTxExecutor {
432 fn id(&self) -> i64 {
433 self.tx_id
434 }
435
436 fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
437 let mut sql = sql.to_string();
438 Box::pin(async move {
439 let mut before_result = Err(Error::from(""));
440 for item in self.conn_executor.intercepts.iter() {
441 let next = item
442 .before(
443 self.tx_id,
444 self,
445 &mut sql,
446 &mut args,
447 ResultType::Exec(&mut before_result),
448 )
449 .await?;
450 match next{
451 Action::Next=>{
452 }
454 Action::Return=>{
455 return before_result;
456 }
457 }
458 }
459 let mut args_after = args.clone();
460 let mut result = self.conn_executor.conn.lock().await.exec(&sql, args).await;
461 for item in self.conn_executor.intercepts.iter() {
462 let next = item
463 .after(
464 self.tx_id,
465 self,
466 &mut sql,
467 &mut args_after,
468 ResultType::Exec(&mut result),
469 )
470 .await?;
471 match next{
472 Action::Next=>{
473 }
475 Action::Return=>{
476 return before_result;
477 }
478 }
479 }
480 result
481 })
482 }
483
484 fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
485 let mut sql = sql.to_string();
486 Box::pin(async move {
487 let mut before_result = Err(Error::from(""));
488 for item in self.conn_executor.intercepts.iter() {
489 let next = item
490 .before(
491 self.tx_id,
492 self,
493 &mut sql,
494 &mut args,
495 ResultType::Query(&mut before_result),
496 )
497 .await?;
498 match next{
499 Action::Next=>{
500 }
502 Action::Return=>{
503 return before_result;
504 }
505 }
506 }
507 let mut conn = self.conn_executor.conn.lock().await;
508 let mut args_after = args.clone();
509 let mut result = conn.get_values(&sql, args).await;
510 for item in self.conn_executor.intercepts.iter() {
511 let next = item
512 .after(
513 self.tx_id,
514 self,
515 &mut sql,
516 &mut args_after,
517 ResultType::Query(&mut result),
518 )
519 .await?;
520 match next{
521 Action::Next=>{
522 }
524 Action::Return=>{
525 return before_result;
526 }
527 }
528 }
529 Ok(result?)
530 })
531 }
532}
533
534impl RBatisRef for RBatisTxExecutor {
535 fn rb_ref(&self) -> &RBatis {
536 &self.conn_executor.rb
537 }
538}
539
540impl RBatisTxExecutor {
541 pub fn take_connection(self) -> Option<Box<dyn Connection>> {
542 self.conn_executor.take_connection()
543 }
544}
545
546#[derive(Clone)]
554pub struct RBatisTxExecutorGuard {
555 pub tx: RBatisTxExecutor,
556 pub callback: Arc<dyn FnMut(RBatisTxExecutor) + Send + Sync>,
557}
558
559impl Debug for RBatisTxExecutorGuard {
560 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
561 f.debug_struct("RBatisTxExecutorGuard")
562 .field("tx", &self.tx)
563 .finish()
564 }
565}
566
567impl RBatisTxExecutorGuard {
568 pub fn tx_id(&self) -> i64 {
569 self.tx.tx_id
570 }
571
572 pub async fn commit(&self) -> crate::Result<()> {
573 self.tx.commit().await?;
574 Ok(())
575 }
576
577 pub async fn rollback(&self) -> crate::Result<()> {
578 self.tx.rollback().await?;
579 Ok(())
580 }
581
582 pub fn take_connection(self) -> Option<Box<dyn Connection>> {
583 self.tx.clone().take_connection()
584 }
585
586 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
587 where
588 T: DeserializeOwned,
589 {
590 self.tx.query_decode(sql, args).await
591 }
592}
593
594impl Drop for RBatisTxExecutorGuard {
595 fn drop(&mut self) {
596 match Arc::get_mut(&mut self.callback) {
597 None => {}
598 Some(callback) => {
599 callback(self.tx.clone());
600 }
601 }
602 }
603}
604
605impl RBatisRef for RBatisTxExecutorGuard {
606 fn rb_ref(&self) -> &RBatis {
607 self.tx.rb_ref()
608 }
609}
610
611impl Executor for RBatisTxExecutorGuard {
612 fn id(&self) -> i64 {
613 self.tx.id()
614 }
615
616 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
617 let sql = sql.to_string();
618 Box::pin(async move { self.tx.exec(&sql, args).await })
619 }
620
621 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
622 let sql = sql.to_string();
623 Box::pin(async move { self.tx.query(&sql, args).await })
624 }
625}
626
627impl RBatis {
628 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
630 let conn = self.acquire().await?;
631 conn.exec(sql, args).await
632 }
633
634 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
636 let conn = self.acquire().await?;
637 let v = conn.query(sql, args).await?;
638 Ok(v)
639 }
640
641 pub async fn query_decode<T>(&self, sql: &str, mut args: Vec<Value>) -> Result<T, Error>
643 where
644 T: DeserializeOwned,
645 {
646 if self.intercepts.is_empty() {
648 let pool = self.pool.get().ok_or_else(|| Error::from("[rb] rbatis pool not inited!"))?;
649 let mut conn = pool.get().await?;
650 let result = conn.get_values(sql, args).await;
651 return result.and_then(|v| decode(v));
652 }
653
654 let mut sql = if sql.is_empty() {
656 String::new()
657 } else {
658 sql.to_string()
659 };
660
661 let rb_task_id = self.task_id_generator.generate();
662 let mut before_result: Result<Value, Error> = Err(Error::from(""));
663
664 for item in self.intercepts.iter() {
666 let next = item
667 .before(
668 rb_task_id,
669 self,
670 &mut sql,
671 &mut args,
672 ResultType::Query(&mut before_result),
673 )
674 .await?;
675 match next {
676 Action::Next => {}
677 Action::Return => {
678 return before_result.and_then(|v| decode(v));
679 }
680 }
681 }
682
683 let pool = self.pool.get().ok_or_else(|| Error::from("[rb] rbatis pool not inited!"))?;
685 let mut conn = pool.get().await?;
686 let mut args_after = if args.is_empty() {
687 Vec::new()
688 } else {
689 args.clone()
690 };
691 let mut result = conn.get_values(&sql, args).await;
692
693 for item in self.intercepts.iter() {
695 let next = item
696 .after(
697 rb_task_id,
698 self,
699 &mut sql,
700 &mut args_after,
701 ResultType::Query(&mut result),
702 )
703 .await?;
704 match next {
705 Action::Next => {}
706 Action::Return => {
707 return before_result.and_then(|v| decode(v));
708 }
709 }
710 }
711
712 result.and_then(|v| decode(v))
714 }
715}
716
717impl Executor for RBatis {
718 fn id(&self) -> i64 {
719 0
720 }
721
722 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
723 let sql = sql.to_string();
724 Box::pin(async move {
725 let conn = self.acquire().await?;
726 conn.exec(&sql, args).await
727 })
728 }
729
730 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
731 let sql = sql.to_string();
732 Box::pin(async move {
733 let conn = self.acquire().await?;
734 conn.query(&sql, args).await
735 })
736 }
737}
738