1use super::{AsyncGroup, DataConn, DataConnContainer, DataConnManager};
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::{any, mem, ptr};
12
/// Error reasons produced by the data-connection management code in this file.
///
/// Every variant name starts with `FailTo` and ends with `DataConn`, which is what the
/// `clippy::enum_variant_names` lint would complain about; the lint is silenced below.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum DataConnError {
    /// Returned by `DataConnManager::commit_async` when the pre-commit pass collected at
    /// least one error.
    FailToPreCommitDataConn {
        /// Pairs of (connection name, error) gathered during the pre-commit pass.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Returned by `DataConnManager::commit_async` when the commit pass collected at least
    /// one error.
    FailToCommitDataConn {
        /// Pairs of (connection name, error) gathered during the commit pass.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Returned by `DataConnManager::to_typed_ptr` when the stored container does not hold
    /// the requested connection type.
    FailToCastDataConn {
        /// Name under which the container was registered.
        name: Arc<str>,
        /// `std::any::type_name` of the type the caller asked for.
        target_type: &'static str,
    },
}
37
38impl<C> DataConnContainer<C>
39where
40 C: DataConn + 'static,
41{
42 pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
43 Self {
44 drop_fn: drop_data_conn::<C>,
45 is_fn: is_data_conn::<C>,
46 commit_fn: commit_data_conn_async::<C>,
47 pre_commit_fn: pre_commit_data_conn_async::<C>,
48 post_commit_fn: post_commit_data_conn_async::<C>,
49 should_force_back_fn: should_force_back_data_conn::<C>,
50 rollback_fn: rollback_data_conn_async::<C>,
51 force_back_fn: force_back_data_conn_async::<C>,
52 close_fn: close_data_conn::<C>,
53
54 name: name.into(),
55 data_conn,
56 }
57 }
58}
59
60fn drop_data_conn<C>(ptr: *const DataConnContainer)
61where
62 C: DataConn + 'static,
63{
64 unsafe {
65 drop(Box::from_raw(ptr as *mut DataConnContainer<C>));
66 }
67}
68
69fn is_data_conn<C>(type_id: any::TypeId) -> bool
70where
71 C: DataConn + 'static,
72{
73 any::TypeId::of::<C>() == type_id
74}
75
76fn commit_data_conn_async<C>(
77 ptr: *const DataConnContainer,
78 ag: &mut AsyncGroup,
79) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
80where
81 C: DataConn + 'static,
82{
83 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
84 Box::pin(container.data_conn.commit_async(ag))
85}
86
87fn pre_commit_data_conn_async<C>(
88 ptr: *const DataConnContainer,
89 ag: &mut AsyncGroup,
90) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
91where
92 C: DataConn + 'static,
93{
94 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
95 Box::pin(container.data_conn.pre_commit_async(ag))
96}
97
98fn post_commit_data_conn_async<C>(
99 ptr: *const DataConnContainer,
100 ag: &mut AsyncGroup,
101) -> Pin<Box<dyn Future<Output = ()> + '_>>
102where
103 C: DataConn + 'static,
104{
105 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
106 Box::pin(container.data_conn.post_commit_async(ag))
107}
108
109fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
110where
111 C: DataConn + 'static,
112{
113 let container = unsafe { &*(ptr as *const DataConnContainer<C>) };
114 container.data_conn.should_force_back()
115}
116
117fn rollback_data_conn_async<C>(
118 ptr: *const DataConnContainer,
119 ag: &mut AsyncGroup,
120) -> Pin<Box<dyn Future<Output = ()> + '_>>
121where
122 C: DataConn + 'static,
123{
124 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
125 Box::pin(container.data_conn.rollback_async(ag))
126}
127
128fn force_back_data_conn_async<C>(
129 ptr: *const DataConnContainer,
130 ag: &mut AsyncGroup,
131) -> Pin<Box<dyn Future<Output = ()> + '_>>
132where
133 C: DataConn + 'static,
134{
135 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
136 Box::pin(container.data_conn.force_back_async(ag))
137}
138
139fn close_data_conn<C>(ptr: *const DataConnContainer)
140where
141 C: DataConn + 'static,
142{
143 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
144 container.data_conn.close();
145}
146
147impl DataConnManager {
148 pub(crate) fn new() -> Self {
149 Self {
150 vec: Vec::new(),
151 index_map: HashMap::new(),
152 }
153 }
154
155 pub(crate) fn with_commit_order(names: &[&str]) -> Self {
156 let mut index_map = HashMap::with_capacity(names.len());
157 for (i, nm) in names.iter().rev().enumerate() {
159 index_map.insert((*nm).into(), names.len() - 1 - i);
160 }
161
162 Self {
163 vec: vec![None; names.len()],
164 index_map,
165 }
166 }
167
168 pub(crate) fn add(&mut self, nnptr: ptr::NonNull<DataConnContainer>) {
169 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
170 if let Some(index) = self.index_map.get(&name) {
171 self.vec[*index] = Some(nnptr);
172 } else {
173 let index = self.vec.len();
174 self.vec.push(Some(nnptr));
175 self.index_map.insert(name.clone(), index);
176 }
177 }
178
179 pub(crate) fn find_by_name(
180 &self,
181 name: impl AsRef<str>,
182 ) -> Option<ptr::NonNull<DataConnContainer>> {
183 if let Some(index) = self.index_map.get(name.as_ref()) {
184 if *index < self.vec.len() {
185 if let Some(nnptr) = self.vec[*index] {
186 let ptr = nnptr.as_ptr();
187 let cont_name = unsafe { &(*ptr).name };
188 if cont_name.as_ref() == name.as_ref() {
189 return Some(nnptr);
190 }
191 }
192 }
193 }
194
195 None
196 }
197
198 pub(crate) fn to_typed_ptr<C>(
199 nnptr: &ptr::NonNull<DataConnContainer>,
200 ) -> errs::Result<*mut DataConnContainer<C>>
201 where
202 C: DataConn + 'static,
203 {
204 let ptr = nnptr.as_ptr();
205 let name = unsafe { &(*ptr).name };
206 let type_id = any::TypeId::of::<C>();
207 let is_fn = unsafe { (*ptr).is_fn };
208
209 if !is_fn(type_id) {
210 return Err(errs::Err::new(DataConnError::FailToCastDataConn {
211 name: name.clone(),
212 target_type: any::type_name::<C>(),
213 }));
214 }
215
216 let typed_ptr = ptr as *mut DataConnContainer<C>;
217 Ok(typed_ptr)
218 }
219
220 pub(crate) async fn commit_async(&self) -> errs::Result<()> {
221 let mut errors = Vec::new();
222
223 let mut ag = AsyncGroup::new();
224 for nnptr in self.vec.iter().flatten() {
225 let ptr = nnptr.as_ptr();
226 let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
227 ag._name = unsafe { (*ptr).name.clone() };
228 if let Err(err) = pre_commit_fn(ptr, &mut ag).await {
229 errors.push((ag._name.clone(), err));
230 break;
231 }
232 }
233 ag.join_and_collect_errors_async(&mut errors).await;
234
235 if !errors.is_empty() {
236 return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
237 errors,
238 }));
239 }
240
241 let mut ag = AsyncGroup::new();
242 for nnptr in self.vec.iter().flatten() {
243 let ptr = nnptr.as_ptr();
244 let commit_fn = unsafe { (*ptr).commit_fn };
245 ag._name = unsafe { (*ptr).name.clone() };
246 if let Err(err) = commit_fn(ptr, &mut ag).await {
247 errors.push((ag._name.clone(), err));
248 break;
249 }
250 }
251 ag.join_and_collect_errors_async(&mut errors).await;
252
253 if !errors.is_empty() {
254 return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
255 errors,
256 }));
257 }
258
259 let mut ag = AsyncGroup::new();
260 for nnptr in self.vec.iter().flatten() {
261 let ptr = nnptr.as_ptr();
262 let post_commit_fn = unsafe { (*ptr).post_commit_fn };
263 ag._name = unsafe { (*ptr).name.clone() };
264 post_commit_fn(ptr, &mut ag).await;
265 }
266 ag.join_and_ignore_errors_async().await;
267
268 Ok(())
269 }
270
271 pub(crate) async fn rollback_async(&mut self) {
272 let mut ag = AsyncGroup::new();
273 for nnptr in self.vec.iter().flatten() {
274 let ptr = nnptr.as_ptr();
275 let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
276 let force_back_fn = unsafe { (*ptr).force_back_fn };
277 let rollback_fn = unsafe { (*ptr).rollback_fn };
278 ag._name = unsafe { (*ptr).name.clone() };
279
280 if should_force_back_fn(ptr) {
281 force_back_fn(ptr, &mut ag).await;
282 } else {
283 rollback_fn(ptr, &mut ag).await;
284 }
285 }
286 ag.join_and_ignore_errors_async().await;
287 }
288
289 pub(crate) fn close(&mut self) {
290 self.index_map.clear();
291
292 let vec: Vec<Option<ptr::NonNull<DataConnContainer>>> = mem::take(&mut self.vec);
293
294 for nnptr in vec.iter().flatten() {
295 let ptr = nnptr.as_ptr();
296 let close_fn = unsafe { (*ptr).close_fn };
297 let drop_fn = unsafe { (*ptr).drop_fn };
298 close_fn(ptr);
299 drop_fn(ptr);
300 }
301 }
302}
303
304impl Drop for DataConnManager {
305 fn drop(&mut self) {
306 self.close();
307 }
308}
309
310#[cfg(test)]
311mod tests_of_data_conn {
312 use super::*;
313 use std::sync::{
314 atomic::{AtomicBool, Ordering},
315 Arc, Mutex,
316 };
317 use tokio::time;
318
    /// Tells a test connection which step, if any, should report an error.
    #[derive(PartialEq, Copy, Clone)]
    enum Fail {
        /// No step fails.
        Not,
        /// The commit step fails.
        Commit,
        /// The pre-commit step fails.
        PreCommit,
    }
325
    /// Test connection whose `DataConn` methods do their work inline and never touch the
    /// `AsyncGroup` they are given.
    struct SyncDataConn {
        // Identifier printed in every log entry.
        id: i8,
        // Set once commit succeeds; read by `should_force_back`.
        committed: AtomicBool,
        // Which step should report an error.
        fail: Fail,
        // Shared event log the tests assert against.
        logger: Arc<Mutex<Vec<String>>>,
    }
332 impl SyncDataConn {
333 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
334 logger
335 .lock()
336 .unwrap()
337 .push(format!("SyncDataConn::new {}", id));
338 Self {
339 id,
340 committed: AtomicBool::new(false),
341 fail,
342 logger,
343 }
344 }
345 }
346 impl Drop for SyncDataConn {
347 fn drop(&mut self) {
348 self.logger
349 .lock()
350 .unwrap()
351 .push(format!("SyncDataConn::drop {}", self.id));
352 }
353 }
354 impl DataConn for SyncDataConn {
355 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
356 let fail = self.fail;
357 let id = self.id;
358 let logger = self.logger.clone();
359 let committed = &self.committed;
360
361 if fail == Fail::Commit {
362 logger
363 .lock()
364 .unwrap()
365 .push(format!("SyncDataConn::commit {} failed", id));
366 return Err(errs::Err::new("ZZZ".to_string()));
367 }
368 committed.store(true, Ordering::Release);
369 logger
370 .lock()
371 .unwrap()
372 .push(format!("SyncDataConn::commit {}", id));
373 Ok(())
374 }
375 async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
376 let fail = self.fail;
377 let id = self.id;
378 let logger = self.logger.clone();
379
380 if fail == Fail::PreCommit {
381 logger
382 .lock()
383 .unwrap()
384 .push(format!("SyncDataConn::pre_commit {} failed", id));
385 return Err(errs::Err::new("zzz".to_string()));
386 }
387 logger
388 .lock()
389 .unwrap()
390 .push(format!("SyncDataConn::pre_commit {}", id));
391 Ok(())
392 }
393 async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
394 let id = self.id;
395 let logger = self.logger.clone();
396
397 logger
398 .lock()
399 .unwrap()
400 .push(format!("SyncDataConn::post_commit {}", id));
401 }
402 fn should_force_back(&self) -> bool {
403 self.committed.load(Ordering::Acquire)
404 }
405 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
406 let id = self.id;
407 let logger = self.logger.clone();
408
409 logger
410 .lock()
411 .unwrap()
412 .push(format!("SyncDataConn::rollback {}", id));
413 }
414 async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
415 let id = self.id;
416 let logger = self.logger.clone();
417
418 logger
419 .lock()
420 .unwrap()
421 .push(format!("SyncDataConn::force_back {}", id));
422 }
423 fn close(&mut self) {
424 self.logger
425 .lock()
426 .unwrap()
427 .push(format!("SyncDataConn::close {}", self.id));
428 }
429 }
430
    /// Test connection whose `DataConn` methods hand their work to the `AsyncGroup` as a
    /// task that sleeps 100 ms before logging.
    struct AsyncDataConn {
        // Identifier printed in every log entry.
        id: i8,
        // Set by the commit task on success; shared so the task can own a handle to it.
        committed: Arc<AtomicBool>,
        // Which step should report an error.
        fail: Fail,
        // Shared event log the tests assert against.
        logger: Arc<Mutex<Vec<String>>>,
    }
437 impl AsyncDataConn {
438 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
439 logger
440 .lock()
441 .unwrap()
442 .push(format!("AsyncDataConn::new {}", id));
443 Self {
444 id,
445 committed: Arc::new(AtomicBool::new(false)),
446 fail,
447 logger,
448 }
449 }
450 }
451 impl Drop for AsyncDataConn {
452 fn drop(&mut self) {
453 self.logger
454 .lock()
455 .unwrap()
456 .push(format!("AsyncDataConn::drop {}", self.id));
457 }
458 }
459 impl DataConn for AsyncDataConn {
460 async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
461 let fail = self.fail;
462 let id = self.id;
463 let logger = self.logger.clone();
464 let committed = self.committed.clone();
465
466 ag.add(async move {
467 time::sleep(time::Duration::from_millis(100)).await;
468 if fail == Fail::Commit {
469 logger
470 .lock()
471 .unwrap()
472 .push(format!("AsyncDataConn::commit {} failed", id));
473 return Err(errs::Err::new("YYY".to_string()));
474 }
475 committed.store(true, Ordering::Release);
476 logger
477 .lock()
478 .unwrap()
479 .push(format!("AsyncDataConn::commit {}", id));
480 Ok(())
481 });
482 Ok(())
483 }
484 async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
485 let fail = self.fail;
486 let id = self.id;
487 let logger = self.logger.clone();
488
489 ag.add(async move {
490 time::sleep(time::Duration::from_millis(100)).await;
491 if fail == Fail::PreCommit {
492 logger
493 .lock()
494 .unwrap()
495 .push(format!("AsyncDataConn::pre_commit {} failed", id));
496 return Err(errs::Err::new("yyy".to_string()));
497 }
498 logger
499 .lock()
500 .unwrap()
501 .push(format!("AsyncDataConn::pre_commit {}", id));
502 Ok(())
503 });
504 Ok(())
505 }
506 async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {
507 let logger = self.logger.clone();
508 let id = self.id;
509
510 ag.add(async move {
511 time::sleep(time::Duration::from_millis(100)).await;
512 logger
513 .lock()
514 .unwrap()
515 .push(format!("AsyncDataConn::post_commit {}", id));
516 Ok(())
517 });
518 }
519 fn should_force_back(&self) -> bool {
520 self.committed.load(Ordering::Acquire)
521 }
522 async fn rollback_async(&mut self, ag: &mut AsyncGroup) {
523 let logger = self.logger.clone();
524 let id = self.id;
525
526 ag.add(async move {
527 time::sleep(time::Duration::from_millis(100)).await;
528 logger
529 .lock()
530 .unwrap()
531 .push(format!("AsyncDataConn::rollback {}", id));
532 Ok(())
533 });
534 }
535 async fn force_back_async(&mut self, ag: &mut AsyncGroup) {
536 let logger = self.logger.clone();
537 let id = self.id;
538
539 ag.add(async move {
540 time::sleep(time::Duration::from_millis(100)).await;
541 logger
542 .lock()
543 .unwrap()
544 .push(format!("AsyncDataConn::force_back {}", id));
545 Ok(())
546 });
547 }
548 fn close(&mut self) {
549 self.logger
550 .lock()
551 .unwrap()
552 .push(format!("AsyncDataConn::close {}", self.id));
553 }
554 }
555
556 mod tests_of_data_conn_manager {
557 use super::*;
558
559 #[tokio::test]
560 async fn test_new() {
561 let manager = DataConnManager::new();
562 assert!(manager.vec.is_empty());
563 }
564
565 #[tokio::test]
566 async fn test_with_commit_order() {
567 let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
568 assert_eq!(manager.vec, vec![None, None, None]);
569 assert_eq!(manager.index_map.len(), 3);
570 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
571 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
572 assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
573 }
574
575 #[test]
576 fn test_new_and_add() {
577 let logger = Arc::new(Mutex::new(Vec::new()));
578
579 let mut manager = DataConnManager::new();
580 assert!(manager.vec.is_empty());
581 assert!(manager.index_map.is_empty());
582
583 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
584 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
585 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
586 manager.add(nnptr);
587 assert_eq!(manager.vec.len(), 1);
588 assert_eq!(manager.index_map.len(), 1);
589 assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
590
591 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
592 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
593 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
594 manager.add(nnptr);
595 assert_eq!(manager.vec.len(), 2);
596 assert_eq!(manager.index_map.len(), 2);
597 assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
598 assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
599 }
600
601 #[test]
602 fn test_with_commit_order_and_add() {
603 let logger = Arc::new(Mutex::new(Vec::new()));
604
605 let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
606 assert_eq!(manager.vec, vec![None, None, None]);
607 assert_eq!(manager.index_map.len(), 3);
608 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
609 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
610 assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
611
612 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
613 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
614 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
615 manager.add(nnptr);
616 assert_eq!(manager.vec.len(), 3);
617 assert_eq!(manager.index_map.len(), 3);
618 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
619
620 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
621 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
622 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
623 manager.add(nnptr);
624 assert_eq!(manager.vec.len(), 3);
625 assert_eq!(manager.index_map.len(), 3);
626 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
627 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
628
629 let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
630 let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
631 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
632 manager.add(nnptr);
633 assert_eq!(manager.vec.len(), 4);
634 assert_eq!(manager.index_map.len(), 4);
635 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
636 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
637 assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
638 }
639
640 #[test]
641 fn test_find_by_name_but_none() {
642 let manager = DataConnManager::new();
643 assert!(manager.find_by_name("foo").is_none());
644 assert!(manager.find_by_name("bar").is_none());
645 }
646
647 #[test]
648 fn test_find_by_name_and_found() {
649 let logger = Arc::new(Mutex::new(Vec::new()));
650
651 let mut manager = DataConnManager::new();
652
653 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
654 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
655 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
656 manager.add(nnptr);
657
658 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
659 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
660 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
661 manager.add(nnptr);
662
663 if let Some(nnptr) = manager.find_by_name("foo") {
664 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
665 assert_eq!(name.as_ref(), "foo");
666 } else {
667 panic!();
668 }
669
670 if let Some(nnptr) = manager.find_by_name("bar") {
671 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
672 assert_eq!(name.as_ref(), "bar");
673 } else {
674 panic!();
675 }
676 }
677
678 #[test]
679 fn test_to_typed_ptr() {
680 let logger = Arc::new(Mutex::new(Vec::new()));
681
682 let mut manager = DataConnManager::new();
683
684 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
685 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
686 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
687 manager.add(nnptr);
688
689 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
690 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
691 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
692 manager.add(nnptr);
693
694 let nnptr = manager.find_by_name("foo").unwrap();
695 if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
696 assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn>");
697 assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
698 } else {
699 panic!();
700 }
701
702 let nnptr = manager.find_by_name("bar").unwrap();
703 if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
704 assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn>");
705 assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
706 } else {
707 panic!();
708 }
709 }
710
711 #[test]
712 fn test_to_typed_ptr_but_fail() {
713 let logger = Arc::new(Mutex::new(Vec::new()));
714
715 let mut manager = DataConnManager::new();
716
717 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
718 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
719 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
720 manager.add(nnptr);
721
722 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
723 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
724 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
725 manager.add(nnptr);
726
727 let nnptr = manager.find_by_name("foo").unwrap();
728 if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
729 match err.reason::<DataConnError>() {
730 Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
731 assert_eq!(name.as_ref(), "foo");
732 assert_eq!(
733 *target_type,
734 "sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn"
735 );
736 }
737 _ => panic!(),
738 }
739 } else {
740 panic!();
741 }
742
743 let nnptr = manager.find_by_name("bar").unwrap();
744 if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
745 match err.reason::<DataConnError>() {
746 Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
747 assert_eq!(name.as_ref(), "bar");
748 assert_eq!(
749 *target_type,
750 "sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn"
751 );
752 }
753 _ => panic!(),
754 }
755 } else {
756 panic!();
757 }
758 }
759
760 #[tokio::test]
761 async fn test_commit_ok() {
762 let logger = Arc::new(Mutex::new(Vec::new()));
763
764 {
765 let mut manager = DataConnManager::new();
766
767 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
768 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
769 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
770 manager.add(nnptr);
771
772 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
773 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
774 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
775 manager.add(nnptr);
776
777 assert!(manager.commit_async().await.is_ok());
778 }
779
780 assert_eq!(
781 *logger.lock().unwrap(),
782 &[
783 "SyncDataConn::new 1",
784 "AsyncDataConn::new 2",
785 "SyncDataConn::pre_commit 1",
786 "AsyncDataConn::pre_commit 2",
787 "SyncDataConn::commit 1",
788 "AsyncDataConn::commit 2",
789 "SyncDataConn::post_commit 1",
790 "AsyncDataConn::post_commit 2",
791 "SyncDataConn::close 1",
792 "SyncDataConn::drop 1",
793 "AsyncDataConn::close 2",
794 "AsyncDataConn::drop 2",
795 ]
796 );
797 }
798
799 #[tokio::test]
800 async fn test_commit_with_order() {
801 let logger = Arc::new(Mutex::new(Vec::new()));
802
803 {
804 let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
805
806 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
807 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
808 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
809 manager.add(nnptr);
810
811 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
812 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
813 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
814 manager.add(nnptr);
815
816 let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
817 let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
818 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
819 manager.add(nnptr);
820
821 assert!(manager.commit_async().await.is_ok());
822 }
823
824 assert_eq!(
825 *logger.lock().unwrap(),
826 &[
827 "SyncDataConn::new 1",
828 "AsyncDataConn::new 2",
829 "SyncDataConn::new 3",
830 "SyncDataConn::pre_commit 1",
831 "SyncDataConn::pre_commit 3",
832 "AsyncDataConn::pre_commit 2", "SyncDataConn::commit 1",
834 "SyncDataConn::commit 3",
835 "AsyncDataConn::commit 2", "SyncDataConn::post_commit 1",
837 "SyncDataConn::post_commit 3",
838 "AsyncDataConn::post_commit 2", "AsyncDataConn::close 2",
840 "AsyncDataConn::drop 2",
841 "SyncDataConn::close 1",
842 "SyncDataConn::drop 1",
843 "SyncDataConn::close 3",
844 "SyncDataConn::drop 3",
845 ]
846 );
847 }
848
849 #[tokio::test]
850 async fn test_commit_but_fail_first_sync_pre_commit() {
851 let logger = Arc::new(Mutex::new(Vec::new()));
852
853 {
854 let mut manager = DataConnManager::new();
855
856 let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
857 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
858 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
859 manager.add(nnptr);
860
861 let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
862 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
863 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
864 manager.add(nnptr);
865
866 if let Err(e) = manager.commit_async().await {
867 match e.reason::<DataConnError>() {
868 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
869 assert_eq!(errors.len(), 1);
870 assert_eq!(errors[0].0, "foo".into());
871 assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
872 }
873 _ => panic!(),
874 }
875 } else {
876 panic!();
877 }
878 }
879
880 assert_eq!(
881 *logger.lock().unwrap(),
882 &[
883 "SyncDataConn::new 1",
884 "AsyncDataConn::new 2",
885 "SyncDataConn::pre_commit 1 failed",
886 "SyncDataConn::close 1",
887 "SyncDataConn::drop 1",
888 "AsyncDataConn::close 2",
889 "AsyncDataConn::drop 2",
890 ]
891 );
892 }
893
894 #[tokio::test]
895 async fn test_commit_but_fail_first_async_pre_commit() {
896 let logger = Arc::new(Mutex::new(Vec::new()));
897
898 {
899 let mut manager = DataConnManager::new();
900
901 let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
902 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
903 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
904 manager.add(nnptr);
905
906 let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
907 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
908 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
909 manager.add(nnptr);
910
911 if let Err(e) = manager.commit_async().await {
912 match e.reason::<DataConnError>() {
913 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
914 assert_eq!(errors.len(), 1);
915 assert_eq!(errors[0].0, "foo".into());
916 assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
917 }
918 _ => panic!(),
919 }
920 } else {
921 panic!();
922 }
923 }
924
925 assert_eq!(
926 *logger.lock().unwrap(),
927 &[
928 "SyncDataConn::new 1",
929 "AsyncDataConn::new 2",
930 "SyncDataConn::pre_commit 1 failed",
931 "SyncDataConn::close 1",
932 "SyncDataConn::drop 1",
933 "AsyncDataConn::close 2",
934 "AsyncDataConn::drop 2",
935 ]
936 );
937 }
938
939 #[tokio::test]
940 async fn test_commit_but_fail_second_pre_commit() {
941 let logger = Arc::new(Mutex::new(Vec::new()));
942
943 {
944 let mut manager = DataConnManager::new();
945
946 let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
947 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
948 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
949 manager.add(nnptr);
950
951 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
952 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
953 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
954 manager.add(nnptr);
955
956 if let Err(e) = manager.commit_async().await {
957 match e.reason::<DataConnError>() {
958 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
959 assert_eq!(errors.len(), 1);
960 assert_eq!(errors[0].0, "foo".into());
961 assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
962 }
963 _ => panic!(),
964 }
965 } else {
966 panic!();
967 }
968 }
969
970 assert_eq!(
971 *logger.lock().unwrap(),
972 &[
973 "AsyncDataConn::new 1",
974 "SyncDataConn::new 2",
975 "SyncDataConn::pre_commit 2",
976 "AsyncDataConn::pre_commit 1 failed",
977 "AsyncDataConn::close 1",
978 "AsyncDataConn::drop 1",
979 "SyncDataConn::close 2",
980 "SyncDataConn::drop 2",
981 ]
982 );
983 }
984
985 #[tokio::test]
986 async fn test_commit_but_fail_first_sync_commit() {
987 let logger = Arc::new(Mutex::new(Vec::new()));
988
989 {
990 let mut manager = DataConnManager::new();
991
992 let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
993 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
994 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
995 manager.add(nnptr);
996
997 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
998 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
999 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1000 manager.add(nnptr);
1001
1002 if let Err(e) = manager.commit_async().await {
1003 match e.reason::<DataConnError>() {
1004 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1005 assert_eq!(errors.len(), 1);
1006 assert_eq!(errors[0].0, "foo".into());
1007 assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
1008 }
1009 _ => panic!(),
1010 }
1011 } else {
1012 panic!();
1013 }
1014 }
1015
1016 assert_eq!(
1017 *logger.lock().unwrap(),
1018 &[
1019 "SyncDataConn::new 1",
1020 "AsyncDataConn::new 2",
1021 "SyncDataConn::pre_commit 1",
1022 "AsyncDataConn::pre_commit 2",
1023 "SyncDataConn::commit 1 failed",
1024 "SyncDataConn::close 1",
1025 "SyncDataConn::drop 1",
1026 "AsyncDataConn::close 2",
1027 "AsyncDataConn::drop 2",
1028 ]
1029 );
1030 }
1031
1032 #[tokio::test]
1033 async fn test_commit_but_fail_first_async_commit() {
1034 let logger = Arc::new(Mutex::new(Vec::new()));
1035
1036 {
1037 let mut manager = DataConnManager::new();
1038
1039 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1040 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1041 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1042 manager.add(nnptr);
1043
1044 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1045 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1046 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1047 manager.add(nnptr);
1048
1049 if let Err(e) = manager.commit_async().await {
1050 match e.reason::<DataConnError>() {
1051 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1052 assert_eq!(errors.len(), 1);
1053 assert_eq!(errors[0].0, "foo".into());
1054 assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1055 }
1056 _ => panic!(),
1057 }
1058 } else {
1059 panic!();
1060 }
1061 }
1062
1063 assert_eq!(
1064 *logger.lock().unwrap(),
1065 &[
1066 "AsyncDataConn::new 1",
1067 "SyncDataConn::new 2",
1068 "SyncDataConn::pre_commit 2",
1069 "AsyncDataConn::pre_commit 1",
1070 "SyncDataConn::commit 2",
1071 "AsyncDataConn::commit 1 failed",
1072 "AsyncDataConn::close 1",
1073 "AsyncDataConn::drop 1",
1074 "SyncDataConn::close 2",
1075 "SyncDataConn::drop 2",
1076 ]
1077 );
1078 }
1079
1080 #[tokio::test]
1081 async fn test_commit_but_fail_second_commit() {
1082 let logger = Arc::new(Mutex::new(Vec::new()));
1083
1084 {
1085 let mut manager = DataConnManager::new();
1086
1087 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1088 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1089 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1090 manager.add(nnptr);
1091
1092 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1093 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1094 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1095 manager.add(nnptr);
1096
1097 if let Err(e) = manager.commit_async().await {
1098 match e.reason::<DataConnError>() {
1099 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1100 assert_eq!(errors.len(), 1);
1101 assert_eq!(errors[0].0, "bar".into());
1102 assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1103 }
1104 _ => panic!(),
1105 }
1106 } else {
1107 panic!();
1108 }
1109 }
1110
1111 assert_eq!(
1112 *logger.lock().unwrap(),
1113 &[
1114 "SyncDataConn::new 1",
1115 "AsyncDataConn::new 2",
1116 "SyncDataConn::pre_commit 1",
1117 "AsyncDataConn::pre_commit 2",
1118 "SyncDataConn::commit 1",
1119 "AsyncDataConn::commit 2 failed",
1120 "SyncDataConn::close 1",
1121 "SyncDataConn::drop 1",
1122 "AsyncDataConn::close 2",
1123 "AsyncDataConn::drop 2",
1124 ]
1125 );
1126 }
1127
1128 #[tokio::test]
1129 async fn test_rollback_and_first_is_sync() {
1130 let logger = Arc::new(Mutex::new(Vec::new()));
1131
1132 {
1133 let mut manager = DataConnManager::new();
1134
1135 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1136 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1137 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1138 manager.add(nnptr);
1139
1140 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1141 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1142 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1143 manager.add(nnptr);
1144
1145 manager.rollback_async().await;
1146 }
1147
1148 assert_eq!(
1149 *logger.lock().unwrap(),
1150 &[
1151 "SyncDataConn::new 1",
1152 "AsyncDataConn::new 2",
1153 "SyncDataConn::rollback 1",
1154 "AsyncDataConn::rollback 2",
1155 "SyncDataConn::close 1",
1156 "SyncDataConn::drop 1",
1157 "AsyncDataConn::close 2",
1158 "AsyncDataConn::drop 2",
1159 ]
1160 );
1161 }
1162
1163 #[tokio::test]
1164 async fn test_rollback_and_first_is_async() {
1165 let logger = Arc::new(Mutex::new(Vec::new()));
1166
1167 {
1168 let mut manager = DataConnManager::new();
1169
1170 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1171 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1172 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1173 manager.add(nnptr);
1174
1175 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1176 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1177 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1178 manager.add(nnptr);
1179
1180 manager.rollback_async().await;
1181 }
1182
1183 assert_eq!(
1184 *logger.lock().unwrap(),
1185 &[
1186 "AsyncDataConn::new 1",
1187 "SyncDataConn::new 2",
1188 "SyncDataConn::rollback 2",
1189 "AsyncDataConn::rollback 1",
1190 "AsyncDataConn::close 1",
1191 "AsyncDataConn::drop 1",
1192 "SyncDataConn::close 2",
1193 "SyncDataConn::drop 2",
1194 ]
1195 );
1196 }
1197
1198 #[tokio::test]
1199 async fn test_force_back_and_first_is_sync() {
1200 let logger = Arc::new(Mutex::new(Vec::new()));
1201
1202 {
1203 let mut manager = DataConnManager::new();
1204
1205 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1206 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1207 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1208 manager.add(nnptr);
1209
1210 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1211 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1212 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1213 manager.add(nnptr);
1214
1215 assert!(manager.commit_async().await.is_ok());
1216 manager.rollback_async().await;
1217 }
1218
1219 assert_eq!(
1220 *logger.lock().unwrap(),
1221 &[
1222 "SyncDataConn::new 1",
1223 "AsyncDataConn::new 2",
1224 "SyncDataConn::pre_commit 1",
1225 "AsyncDataConn::pre_commit 2",
1226 "SyncDataConn::commit 1",
1227 "AsyncDataConn::commit 2",
1228 "SyncDataConn::post_commit 1",
1229 "AsyncDataConn::post_commit 2",
1230 "SyncDataConn::force_back 1",
1231 "AsyncDataConn::force_back 2",
1232 "SyncDataConn::close 1",
1233 "SyncDataConn::drop 1",
1234 "AsyncDataConn::close 2",
1235 "AsyncDataConn::drop 2",
1236 ]
1237 );
1238 }
1239
1240 #[tokio::test]
1241 async fn test_force_back_and_first_is_async() {
1242 let logger = Arc::new(Mutex::new(Vec::new()));
1243
1244 {
1245 let mut manager = DataConnManager::new();
1246
1247 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1248 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1249 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1250 manager.add(nnptr);
1251
1252 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1253 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1254 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1255 manager.add(nnptr);
1256
1257 assert!(manager.commit_async().await.is_ok());
1258 manager.rollback_async().await;
1259 }
1260
1261 assert_eq!(
1262 *logger.lock().unwrap(),
1263 &[
1264 "AsyncDataConn::new 1",
1265 "SyncDataConn::new 2",
1266 "SyncDataConn::pre_commit 2",
1267 "AsyncDataConn::pre_commit 1",
1268 "SyncDataConn::commit 2",
1269 "AsyncDataConn::commit 1",
1270 "SyncDataConn::post_commit 2",
1271 "AsyncDataConn::post_commit 1",
1272 "SyncDataConn::force_back 2",
1273 "AsyncDataConn::force_back 1",
1274 "AsyncDataConn::close 1",
1275 "AsyncDataConn::drop 1",
1276 "SyncDataConn::close 2",
1277 "SyncDataConn::drop 2",
1278 ]
1279 );
1280 }
1281 }
1282}