1use crate::decode::decode;
2use crate::rbatis::RBatis;
3use crate::Error;
4use dark_std::sync::SyncVec;
5use futures::Future;
6use futures_core::future::BoxFuture;
7use rbdc::db::{Connection, ExecResult};
8use rbdc::rt::tokio::sync::Mutex;
9use rbs::Value;
10use serde::de::DeserializeOwned;
11use std::any::Any;
12use std::fmt::{Debug, Formatter};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use crate::intercept::ResultType;
16
17pub trait Executor: RBatisRef + Send + Sync {
19 fn id(&self) -> i64;
20 fn name(&self) -> &str {
21 std::any::type_name::<Self>()
22 }
23 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>>;
24 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>>;
25 fn as_any(&self) -> &dyn Any
26 where
27 Self: Sized,
28 {
29 self
30 }
31}
32
33pub trait RBatisRef: Any + Send + Sync {
34 fn rb_ref(&self) -> &RBatis;
35
36 fn driver_type(&self) -> crate::Result<&str> {
37 self.rb_ref().driver_type()
38 }
39}
40
41impl RBatisRef for RBatis {
42 fn rb_ref(&self) -> &RBatis {
43 self
44 }
45}
46
47pub struct RBatisConnExecutor {
48 pub id: i64,
49 pub rb: RBatis,
50 pub conn: Mutex<Box<dyn Connection>>,
51}
52
53impl RBatisConnExecutor {
54 pub fn new(id: i64, conn: Box<dyn Connection>, rb: RBatis) -> Self {
55 Self {
56 id: id,
57 conn: Mutex::new(conn),
58 rb: rb,
59 }
60 }
61}
62
63impl Debug for RBatisConnExecutor {
64 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("RBatisTxExecutor")
66 .field("id", &self.id)
67 .field("rb", &self.rb)
68 .finish()
69 }
70}
71
72impl RBatisConnExecutor {
73 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
74 let v = Executor::exec(self, sql, args).await?;
75 Ok(v)
76 }
77
78 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
79 let v = Executor::query(self, sql, args).await?;
80 Ok(v)
81 }
82
83 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
84 where
85 T: DeserializeOwned,
86 {
87 let v = Executor::query(self, sql, args).await?;
88 Ok(decode(v)?)
89 }
90}
91
92impl Executor for RBatisConnExecutor {
93 fn id(&self) -> i64 {
94 self.id
95 }
96
97 fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
98 let mut sql = sql.to_string();
99 Box::pin(async move {
100 let rb_task_id = self.rb.task_id_generator.generate();
101 let mut before_result = Err(Error::from(""));
102 for item in self.rb_ref().intercepts.iter() {
103 let next = item
104 .before(
105 rb_task_id,
106 self as &dyn Executor,
107 &mut sql,
108 &mut args,
109 ResultType::Exec(&mut before_result),
110 )
111 .await?;
112 if let Some(next) = next {
113 if !next {
114 break;
115 }
116 } else {
117 return before_result;
118 }
119 }
120 let mut args_after = args.clone();
121 let mut result = self.conn.lock().await.exec(&sql, args).await;
122 for item in self.rb_ref().intercepts.iter() {
123 let next = item
124 .after(
125 rb_task_id,
126 self as &dyn Executor,
127 &mut sql,
128 &mut args_after,
129 ResultType::Exec(&mut result),
130 )
131 .await?;
132 if let Some(next) = next {
133 if !next {
134 break;
135 }
136 } else {
137 return result;
138 }
139 }
140 result
141 })
142 }
143
144 fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
145 let mut sql = sql.to_string();
146 Box::pin(async move {
147 let rb_task_id = self.rb.task_id_generator.generate();
148 let mut before_result = Err(Error::from(""));
149 for item in self.rb_ref().intercepts.iter() {
150 let next = item
151 .before(
152 rb_task_id,
153 self,
154 &mut sql,
155 &mut args,
156 ResultType::Query(&mut before_result),
157 )
158 .await?;
159 if let Some(next) = next {
160 if !next {
161 break;
162 }
163 } else {
164 return before_result.map(|v| Value::from(v));
165 }
166 }
167 let mut conn = self.conn.lock().await;
168 let mut args_after = args.clone();
169 let mut result = conn.get_values(&sql, args).await;
170 for item in self.rb_ref().intercepts.iter() {
171 let next = item
172 .after(
173 rb_task_id,
174 self,
175 &mut sql,
176 &mut args_after,
177 ResultType::Query(&mut result),
178 )
179 .await?;
180 if let Some(next) = next {
181 if !next {
182 break;
183 }
184 } else {
185 return result.map(|v| Value::from(v));
186 }
187 }
188 Ok(Value::Array(result?))
189 })
190 }
191}
192
193impl RBatisRef for RBatisConnExecutor {
194 fn rb_ref(&self) -> &RBatis {
195 &self.rb
196 }
197}
198
199impl RBatisConnExecutor {
200 pub fn begin(self) -> BoxFuture<'static, Result<RBatisTxExecutor, Error>> {
201 Box::pin(async move {
202 let mut conn = self.conn.into_inner();
203 conn.begin().await?;
204 Ok(RBatisTxExecutor::new(
205 self.rb.task_id_generator.generate(),
206 self.rb,
207 conn,
208 ))
209 })
210 }
211
212 pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
213 Box::pin(async { Ok(self.conn.lock().await.rollback().await?) })
214 }
215
216 pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
217 Box::pin(async { Ok(self.conn.lock().await.commit().await?) })
218 }
219}
220
221#[derive(Clone)]
229pub struct RBatisTxExecutor {
230 pub tx_id: i64,
231 pub conn: Arc<Mutex<Box<dyn Connection>>>,
232 pub rb: RBatis,
233 done: Arc<AtomicBool>,
237}
238
239impl Debug for RBatisTxExecutor {
240 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("RBatisTxExecutor")
242 .field("tx_id", &self.tx_id)
243 .field("rb", &self.rb)
244 .field("done", &self.done)
245 .finish()
246 }
247}
248
249impl<'a> RBatisTxExecutor {
250 pub fn new(tx_id: i64, rb: RBatis, conn: Box<dyn Connection>) -> Self {
251 RBatisTxExecutor {
252 tx_id: tx_id,
253 conn: Arc::new(Mutex::new(conn)),
254 rb: rb,
255 done: Arc::new(AtomicBool::new(false)),
256 }
257 }
258
259 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
261 let v = Executor::exec(self, sql, args).await?;
262 Ok(v)
263 }
264 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
266 let v = Executor::query(self, sql, args).await?;
267 Ok(v)
268 }
269 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
271 where
272 T: DeserializeOwned,
273 {
274 let v = Executor::query(self, sql, args).await?;
275 Ok(decode(v)?)
276 }
277
278 pub fn begin(self) -> BoxFuture<'static, Result<Self, Error>> {
279 Box::pin(async move {
280 self.conn.lock().await.begin().await?;
281 Ok(self)
282 })
283 }
284
285 pub fn rollback(&self) -> BoxFuture<'_, Result<(), Error>> {
286 Box::pin(async {
287 let r = self.conn.lock().await.rollback().await?;
288 self.done.store(true, Ordering::Relaxed);
289 Ok(r)
290 })
291 }
292
293 pub fn commit(&self) -> BoxFuture<'_, Result<(), Error>> {
294 Box::pin(async {
295 let r = self.conn.lock().await.commit().await?;
296 self.done.store(true, Ordering::Relaxed);
297 Ok(r)
298 })
299 }
300
301 pub fn done(&self) -> bool {
303 self.done.load(Ordering::Relaxed)
304 }
305
306 pub fn set_done(&self, done: bool) {
308 self.done.store(done, Ordering::Relaxed)
309 }
310}
311
312impl Executor for RBatisTxExecutor {
313 fn id(&self) -> i64 {
314 self.tx_id
315 }
316
317 fn exec(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
318 let mut sql = sql.to_string();
319 Box::pin(async move {
320 let mut before_result = Err(Error::from(""));
321 for item in self.rb_ref().intercepts.iter() {
322 let next = item
323 .before(
324 self.tx_id,
325 self,
326 &mut sql,
327 &mut args,
328 ResultType::Exec(&mut before_result),
329 )
330 .await?;
331 if let Some(next) = next {
332 if !next {
333 break;
334 }
335 } else {
336 return before_result;
337 }
338 }
339 let mut args_after = args.clone();
340 let mut result = self.conn.lock().await.exec(&sql, args).await;
341 for item in self.rb_ref().intercepts.iter() {
342 let next = item
343 .after(
344 self.tx_id,
345 self,
346 &mut sql,
347 &mut args_after,
348 ResultType::Exec(&mut result),
349 )
350 .await?;
351 if let Some(next) = next {
352 if !next {
353 break;
354 }
355 } else {
356 return result;
357 }
358 }
359 result
360 })
361 }
362
363 fn query(&self, sql: &str, mut args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
364 let mut sql = sql.to_string();
365 Box::pin(async move {
366 let mut before_result = Err(Error::from(""));
367 for item in self.rb_ref().intercepts.iter() {
368 let next = item
369 .before(
370 self.tx_id,
371 self,
372 &mut sql,
373 &mut args,
374 ResultType::Query(&mut before_result),
375 )
376 .await?;
377 if let Some(next) = next {
378 if !next {
379 break;
380 }
381 } else {
382 return before_result.map(|v| Value::from(v));
383 }
384 }
385 let mut conn = self.conn.lock().await;
386 let mut args_after = args.clone();
387 let conn = conn.get_values(&sql, args);
388 let mut result = conn.await;
389 for item in self.rb_ref().intercepts.iter() {
390 let next = item
391 .after(
392 self.tx_id,
393 self,
394 &mut sql,
395 &mut args_after,
396 ResultType::Query(&mut result),
397 )
398 .await?;
399 if let Some(next) = next {
400 if !next {
401 break;
402 }
403 } else {
404 return result.map(|v| Value::from(v));
405 }
406 }
407 Ok(Value::Array(result?))
408 })
409 }
410}
411
412impl RBatisRef for RBatisTxExecutor {
413 fn rb_ref(&self) -> &RBatis {
414 &self.rb
415 }
416}
417
418impl RBatisTxExecutor {
419 pub fn take_connection(self) -> Option<Box<dyn Connection>> {
420 match Arc::into_inner(self.conn) {
421 None => None,
422 Some(v) => {
423 let inner = v.into_inner();
424 Some(inner)
425 }
426 }
427 }
428}
429
430#[derive(Clone)]
438pub struct RBatisTxExecutorGuard {
439 pub tx: RBatisTxExecutor,
440 pub callback: Arc<dyn FnMut(RBatisTxExecutor) + Send + Sync>,
441}
442
443impl Debug for RBatisTxExecutorGuard {
444 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445 f.debug_struct("RBatisTxExecutorGuard")
446 .field("tx", &self.tx)
447 .finish()
448 }
449}
450
451impl RBatisTxExecutorGuard {
452 pub fn tx_id(&self) -> i64 {
453 self.tx.tx_id
454 }
455
456 pub async fn commit(&self) -> crate::Result<()> {
457 self.tx.commit().await?;
458 Ok(())
459 }
460
461 pub async fn rollback(&self) -> crate::Result<()> {
462 self.tx.rollback().await?;
463 Ok(())
464 }
465
466 pub fn take_connection(self) -> Option<Box<dyn Connection>> {
467 self.tx.clone().take_connection()
468 }
469
470 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
471 where
472 T: DeserializeOwned,
473 {
474 self.tx.query_decode(sql, args).await
475 }
476}
477
478impl RBatisTxExecutor {
479 pub fn defer_async<F>(&self, callback: fn(s: RBatisTxExecutor) -> F) -> RBatisTxExecutorGuard
493 where
494 F: Future<Output = ()> + Send + 'static,
495 {
496 RBatisTxExecutorGuard {
497 tx: self.clone(),
498 callback: Arc::new(move |arg| {
499 let future = callback(arg);
500 rbdc::rt::spawn(future);
501 }),
502 }
503 }
504}
505
506impl Drop for RBatisTxExecutorGuard {
507 fn drop(&mut self) {
508 match Arc::get_mut(&mut self.callback) {
509 None => {}
510 Some(callback) => {
511 callback(self.tx.clone());
512 }
513 }
514 }
515}
516
517impl RBatisRef for RBatisTxExecutorGuard {
518 fn rb_ref(&self) -> &RBatis {
519 self.tx.rb_ref()
520 }
521}
522
523impl Executor for RBatisTxExecutorGuard {
524 fn id(&self) -> i64 {
525 self.tx.id()
526 }
527
528 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
529 let sql = sql.to_string();
530 Box::pin(async move { self.tx.exec(&sql, args).await })
531 }
532
533 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
534 let sql = sql.to_string();
535 Box::pin(async move { self.tx.query(&sql, args).await })
536 }
537}
538
539impl RBatis {
540 pub async fn exec(&self, sql: &str, args: Vec<Value>) -> Result<ExecResult, Error> {
542 let conn = self.acquire().await?;
543 conn.exec(sql, args).await
544 }
545
546 pub async fn query(&self, sql: &str, args: Vec<Value>) -> Result<Value, Error> {
548 let conn = self.acquire().await?;
549 let v = conn.query(sql, args).await?;
550 Ok(v)
551 }
552
553 pub async fn query_decode<T>(&self, sql: &str, args: Vec<Value>) -> Result<T, Error>
555 where
556 T: DeserializeOwned,
557 {
558 let conn = self.acquire().await?;
559 let v = conn.query(sql, args).await?;
560 Ok(decode(v)?)
561 }
562}
563
564impl Executor for RBatis {
565 fn id(&self) -> i64 {
566 0
567 }
568
569 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
570 let sql = sql.to_string();
571 Box::pin(async move {
572 let conn = self.acquire().await?;
573 conn.exec(&sql, args).await
574 })
575 }
576
577 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
578 let sql = sql.to_string();
579 Box::pin(async move {
580 let conn = self.acquire().await?;
581 conn.query(&sql, args).await
582 })
583 }
584}
585
586#[derive(Debug)]
587pub struct TempExecutor<'a> {
588 pub rb: &'a RBatis,
589 pub sql: SyncVec<String>,
590 pub args: SyncVec<Vec<Value>>,
591}
592
593impl<'a> TempExecutor<'a> {
594 pub fn new(rb: &'a RBatis) -> Self {
595 Self {
596 rb: rb,
597 sql: SyncVec::new(),
598 args: SyncVec::new(),
599 }
600 }
601
602 pub fn clear_sql(&self) -> Vec<String> {
603 let mut arr = vec![];
604 loop {
605 if let Some(v) = self.sql.remove(0) {
606 arr.push(v);
607 } else {
608 break;
609 }
610 }
611 arr
612 }
613
614 pub fn clear_args(&self) -> Vec<Vec<Value>> {
615 let mut arr = vec![];
616 loop {
617 if let Some(v) = self.args.remove(0) {
618 arr.push(v);
619 } else {
620 break;
621 }
622 }
623 arr
624 }
625}
626
627impl RBatisRef for TempExecutor<'static> {
628 fn rb_ref(&self) -> &RBatis {
629 self.rb
630 }
631}
632
633impl Executor for TempExecutor<'static> {
634 fn id(&self) -> i64 {
635 0
636 }
637
638 fn exec(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
639 self.sql.push(sql.to_string());
640 self.args.push(args);
641 Box::pin(async { Ok(ExecResult::default()) })
642 }
643
644 fn query(&self, sql: &str, args: Vec<Value>) -> BoxFuture<'_, Result<Value, Error>> {
645 self.sql.push(sql.to_string());
646 self.args.push(args);
647 Box::pin(async { Ok(Value::default()) })
648 }
649}