Skip to main content

sabi/tokio/
data_conn.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::{AsyncGroup, DataConn, DataConnContainer, DataConnManager, SendSyncNonNull};
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::{any, mem};
12
13/// Represents errors that can occur during data connection operations.
14#[allow(clippy::enum_variant_names)]
15#[derive(Debug)]
16pub enum DataConnError {
17    /// An error indicating that one or more data connections failed during the pre-commit process.
18    FailToPreCommitDataConn {
19        /// A vector of errors, each containing the name of the data connection and the error itself.
20        errors: Vec<(Arc<str>, errs::Err)>,
21    },
22
23    /// An error indicating that one or more data connections failed during the commit process.
24    FailToCommitDataConn {
25        /// A vector of errors, each containing the name of the data connection and the error itself.
26        errors: Vec<(Arc<str>, errs::Err)>,
27    },
28
29    /// An error indicating that a data connection could not be cast to the target type.
30    FailToCastDataConn {
31        /// The name of the data connection that failed to cast.
32        name: Arc<str>,
33        /// The string representation of the target type to which the connection could not be cast.
34        target_type: &'static str,
35    },
36}
37
38impl<C> DataConnContainer<C>
39where
40    C: DataConn + 'static,
41{
42    pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
43        Self {
44            drop_fn: drop_data_conn::<C>,
45            is_fn: is_data_conn::<C>,
46            commit_fn: commit_data_conn_async::<C>,
47            pre_commit_fn: pre_commit_data_conn_async::<C>,
48            post_commit_fn: post_commit_data_conn_async::<C>,
49            should_force_back_fn: should_force_back_data_conn::<C>,
50            rollback_fn: rollback_data_conn_async::<C>,
51            force_back_fn: force_back_data_conn_async::<C>,
52            close_fn: close_data_conn::<C>,
53
54            name: name.into(),
55            data_conn,
56        }
57    }
58}
59
60fn drop_data_conn<C>(ptr: *const DataConnContainer)
61where
62    C: DataConn + 'static,
63{
64    unsafe {
65        drop(Box::from_raw(ptr as *mut DataConnContainer<C>));
66    }
67}
68
69fn is_data_conn<C>(type_id: any::TypeId) -> bool
70where
71    C: DataConn + 'static,
72{
73    any::TypeId::of::<C>() == type_id
74}
75
76fn commit_data_conn_async<C>(
77    ptr: *const DataConnContainer,
78    ag: &mut AsyncGroup,
79) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
80where
81    C: DataConn + 'static,
82{
83    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
84    Box::pin(container.data_conn.commit_async(ag))
85}
86
87fn pre_commit_data_conn_async<C>(
88    ptr: *const DataConnContainer,
89    ag: &mut AsyncGroup,
90) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
91where
92    C: DataConn + 'static,
93{
94    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
95    Box::pin(container.data_conn.pre_commit_async(ag))
96}
97
98fn post_commit_data_conn_async<C>(
99    ptr: *const DataConnContainer,
100    ag: &mut AsyncGroup,
101) -> Pin<Box<dyn Future<Output = ()> + '_>>
102where
103    C: DataConn + 'static,
104{
105    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
106    Box::pin(container.data_conn.post_commit_async(ag))
107}
108
109fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
110where
111    C: DataConn + 'static,
112{
113    let container = unsafe { &*(ptr as *const DataConnContainer<C>) };
114    container.data_conn.should_force_back()
115}
116
117fn rollback_data_conn_async<C>(
118    ptr: *const DataConnContainer,
119    ag: &mut AsyncGroup,
120) -> Pin<Box<dyn Future<Output = ()> + '_>>
121where
122    C: DataConn + 'static,
123{
124    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
125    Box::pin(container.data_conn.rollback_async(ag))
126}
127
128fn force_back_data_conn_async<C>(
129    ptr: *const DataConnContainer,
130    ag: &mut AsyncGroup,
131) -> Pin<Box<dyn Future<Output = ()> + '_>>
132where
133    C: DataConn + 'static,
134{
135    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
136    Box::pin(container.data_conn.force_back_async(ag))
137}
138
139fn close_data_conn<C>(ptr: *const DataConnContainer)
140where
141    C: DataConn + 'static,
142{
143    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
144    container.data_conn.close();
145}
146
147impl DataConnManager {
148    pub(crate) fn new() -> Self {
149        Self {
150            vec: Vec::new(),
151            index_map: HashMap::new(),
152        }
153    }
154
155    pub(crate) fn with_commit_order(names: &[&str]) -> Self {
156        let mut index_map = HashMap::with_capacity(names.len());
157        // To overwrite later indexed elements with eariler ones when names overlap
158        for (i, nm) in names.iter().rev().enumerate() {
159            index_map.insert((*nm).into(), names.len() - 1 - i);
160        }
161
162        Self {
163            vec: vec![None; names.len()],
164            index_map,
165        }
166    }
167
168    pub(crate) fn add(&mut self, ssnnptr: SendSyncNonNull<DataConnContainer>) {
169        let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
170        if let Some(index) = self.index_map.get(&name) {
171            self.vec[*index] = Some(ssnnptr);
172        } else {
173            let index = self.vec.len();
174            self.vec.push(Some(ssnnptr));
175            self.index_map.insert(name.clone(), index);
176        }
177    }
178
179    pub(crate) fn find_by_name(
180        &self,
181        name: impl AsRef<str>,
182    ) -> Option<SendSyncNonNull<DataConnContainer>> {
183        if let Some(index) = self.index_map.get(name.as_ref()) {
184            if *index < self.vec.len() {
185                if let Some(ssnnptr) = &self.vec[*index] {
186                    let ptr = ssnnptr.non_null_ptr.as_ptr();
187                    let cont_name = unsafe { &(*ptr).name };
188                    if cont_name.as_ref() == name.as_ref() {
189                        return Some(ssnnptr.clone());
190                    }
191                }
192            }
193        }
194
195        None
196    }
197
198    pub(crate) fn to_typed_ptr<C>(
199        ssnnptr: &SendSyncNonNull<DataConnContainer>,
200    ) -> errs::Result<*mut DataConnContainer<C>>
201    where
202        C: DataConn + 'static,
203    {
204        let ptr = ssnnptr.non_null_ptr.as_ptr();
205        let name = unsafe { &(*ptr).name };
206        let type_id = any::TypeId::of::<C>();
207        let is_fn = unsafe { (*ptr).is_fn };
208
209        if !is_fn(type_id) {
210            return Err(errs::Err::new(DataConnError::FailToCastDataConn {
211                name: name.clone(),
212                target_type: any::type_name::<C>(),
213            }));
214        }
215
216        let typed_ptr = ptr as *mut DataConnContainer<C>;
217        Ok(typed_ptr)
218    }
219
220    pub(crate) async fn commit_async(&self) -> errs::Result<()> {
221        let mut errors = Vec::new();
222
223        let mut ag = AsyncGroup::new();
224        for ssnnptr in self.vec.iter().flatten() {
225            let ptr = ssnnptr.non_null_ptr.as_ptr();
226            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
227            ag._name = unsafe { (*ptr).name.clone() };
228            if let Err(err) = pre_commit_fn(ptr, &mut ag).await {
229                errors.push((ag._name.clone(), err));
230                break;
231            }
232        }
233        ag.join_and_collect_errors_async(&mut errors).await;
234
235        if !errors.is_empty() {
236            return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
237                errors,
238            }));
239        }
240
241        let mut ag = AsyncGroup::new();
242        for ssnnptr in self.vec.iter().flatten() {
243            let ptr = ssnnptr.non_null_ptr.as_ptr();
244            let commit_fn = unsafe { (*ptr).commit_fn };
245            ag._name = unsafe { (*ptr).name.clone() };
246            if let Err(err) = commit_fn(ptr, &mut ag).await {
247                errors.push((ag._name.clone(), err));
248                break;
249            }
250        }
251        ag.join_and_collect_errors_async(&mut errors).await;
252
253        if !errors.is_empty() {
254            return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
255                errors,
256            }));
257        }
258
259        let mut ag = AsyncGroup::new();
260        for ssnnptr in self.vec.iter().flatten() {
261            let ptr = ssnnptr.non_null_ptr.as_ptr();
262            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
263            ag._name = unsafe { (*ptr).name.clone() };
264            post_commit_fn(ptr, &mut ag).await;
265        }
266        ag.join_and_ignore_errors_async().await;
267
268        Ok(())
269    }
270
271    pub(crate) async fn rollback_async(&mut self) {
272        let mut ag = AsyncGroup::new();
273        for ssnnptr in self.vec.iter().flatten() {
274            let ptr = ssnnptr.non_null_ptr.as_ptr();
275            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
276            let force_back_fn = unsafe { (*ptr).force_back_fn };
277            let rollback_fn = unsafe { (*ptr).rollback_fn };
278            ag._name = unsafe { (*ptr).name.clone() };
279
280            if should_force_back_fn(ptr) {
281                force_back_fn(ptr, &mut ag).await;
282            } else {
283                rollback_fn(ptr, &mut ag).await;
284            }
285        }
286        ag.join_and_ignore_errors_async().await;
287    }
288
289    pub(crate) fn close(&mut self) {
290        self.index_map.clear();
291
292        let vec: Vec<Option<SendSyncNonNull<DataConnContainer>>> = mem::take(&mut self.vec);
293
294        for ssnnptr in vec.iter().flatten() {
295            let ptr = ssnnptr.non_null_ptr.as_ptr();
296            let close_fn = unsafe { (*ptr).close_fn };
297            let drop_fn = unsafe { (*ptr).drop_fn };
298            close_fn(ptr);
299            drop_fn(ptr);
300        }
301    }
302}
303
304impl Drop for DataConnManager {
305    fn drop(&mut self) {
306        self.close();
307    }
308}
309
310#[cfg(test)]
311mod tests_of_data_conn {
312    use super::*;
313    use std::ptr;
314    use std::sync::{
315        atomic::{AtomicBool, Ordering},
316        Arc, Mutex,
317    };
318    use tokio::time;
319
320    #[derive(PartialEq, Copy, Clone)]
321    enum Fail {
322        Not,
323        Commit,
324        PreCommit,
325    }
326
327    struct SyncDataConn {
328        id: i8,
329        committed: AtomicBool,
330        fail: Fail,
331        logger: Arc<Mutex<Vec<String>>>,
332    }
333    impl SyncDataConn {
334        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
335            logger
336                .lock()
337                .unwrap()
338                .push(format!("SyncDataConn::new {}", id));
339            Self {
340                id,
341                committed: AtomicBool::new(false),
342                fail,
343                logger,
344            }
345        }
346    }
347    impl Drop for SyncDataConn {
348        fn drop(&mut self) {
349            self.logger
350                .lock()
351                .unwrap()
352                .push(format!("SyncDataConn::drop {}", self.id));
353        }
354    }
355    impl DataConn for SyncDataConn {
356        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
357            let fail = self.fail;
358            let id = self.id;
359            let logger = self.logger.clone();
360            let committed = &self.committed;
361
362            if fail == Fail::Commit {
363                logger
364                    .lock()
365                    .unwrap()
366                    .push(format!("SyncDataConn::commit {} failed", id));
367                return Err(errs::Err::new("ZZZ".to_string()));
368            }
369            committed.store(true, Ordering::Release);
370            logger
371                .lock()
372                .unwrap()
373                .push(format!("SyncDataConn::commit {}", id));
374            Ok(())
375        }
376        async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
377            let fail = self.fail;
378            let id = self.id;
379            let logger = self.logger.clone();
380
381            if fail == Fail::PreCommit {
382                logger
383                    .lock()
384                    .unwrap()
385                    .push(format!("SyncDataConn::pre_commit {} failed", id));
386                return Err(errs::Err::new("zzz".to_string()));
387            }
388            logger
389                .lock()
390                .unwrap()
391                .push(format!("SyncDataConn::pre_commit {}", id));
392            Ok(())
393        }
394        async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
395            let id = self.id;
396            let logger = self.logger.clone();
397
398            logger
399                .lock()
400                .unwrap()
401                .push(format!("SyncDataConn::post_commit {}", id));
402        }
403        fn should_force_back(&self) -> bool {
404            self.committed.load(Ordering::Acquire)
405        }
406        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
407            let id = self.id;
408            let logger = self.logger.clone();
409
410            logger
411                .lock()
412                .unwrap()
413                .push(format!("SyncDataConn::rollback {}", id));
414        }
415        async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
416            let id = self.id;
417            let logger = self.logger.clone();
418
419            logger
420                .lock()
421                .unwrap()
422                .push(format!("SyncDataConn::force_back {}", id));
423        }
424        fn close(&mut self) {
425            self.logger
426                .lock()
427                .unwrap()
428                .push(format!("SyncDataConn::close {}", self.id));
429        }
430    }
431
432    struct AsyncDataConn {
433        id: i8,
434        committed: Arc<AtomicBool>,
435        fail: Fail,
436        logger: Arc<Mutex<Vec<String>>>,
437    }
438    impl AsyncDataConn {
439        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
440            logger
441                .lock()
442                .unwrap()
443                .push(format!("AsyncDataConn::new {}", id));
444            Self {
445                id,
446                committed: Arc::new(AtomicBool::new(false)),
447                fail,
448                logger,
449            }
450        }
451    }
452    impl Drop for AsyncDataConn {
453        fn drop(&mut self) {
454            self.logger
455                .lock()
456                .unwrap()
457                .push(format!("AsyncDataConn::drop {}", self.id));
458        }
459    }
460    impl DataConn for AsyncDataConn {
461        async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
462            let fail = self.fail;
463            let id = self.id;
464            let logger = self.logger.clone();
465            let committed = self.committed.clone();
466
467            ag.add(async move {
468                time::sleep(time::Duration::from_millis(100)).await;
469                if fail == Fail::Commit {
470                    logger
471                        .lock()
472                        .unwrap()
473                        .push(format!("AsyncDataConn::commit {} failed", id));
474                    return Err(errs::Err::new("YYY".to_string()));
475                }
476                committed.store(true, Ordering::Release);
477                logger
478                    .lock()
479                    .unwrap()
480                    .push(format!("AsyncDataConn::commit {}", id));
481                Ok(())
482            });
483            Ok(())
484        }
485        async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
486            let fail = self.fail;
487            let id = self.id;
488            let logger = self.logger.clone();
489
490            ag.add(async move {
491                time::sleep(time::Duration::from_millis(100)).await;
492                if fail == Fail::PreCommit {
493                    logger
494                        .lock()
495                        .unwrap()
496                        .push(format!("AsyncDataConn::pre_commit {} failed", id));
497                    return Err(errs::Err::new("yyy".to_string()));
498                }
499                logger
500                    .lock()
501                    .unwrap()
502                    .push(format!("AsyncDataConn::pre_commit {}", id));
503                Ok(())
504            });
505            Ok(())
506        }
507        async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {
508            let logger = self.logger.clone();
509            let id = self.id;
510
511            ag.add(async move {
512                time::sleep(time::Duration::from_millis(100)).await;
513                logger
514                    .lock()
515                    .unwrap()
516                    .push(format!("AsyncDataConn::post_commit {}", id));
517                Ok(())
518            });
519        }
520        fn should_force_back(&self) -> bool {
521            self.committed.load(Ordering::Acquire)
522        }
523        async fn rollback_async(&mut self, ag: &mut AsyncGroup) {
524            let logger = self.logger.clone();
525            let id = self.id;
526
527            ag.add(async move {
528                time::sleep(time::Duration::from_millis(100)).await;
529                logger
530                    .lock()
531                    .unwrap()
532                    .push(format!("AsyncDataConn::rollback {}", id));
533                Ok(())
534            });
535        }
536        async fn force_back_async(&mut self, ag: &mut AsyncGroup) {
537            let logger = self.logger.clone();
538            let id = self.id;
539
540            ag.add(async move {
541                time::sleep(time::Duration::from_millis(100)).await;
542                logger
543                    .lock()
544                    .unwrap()
545                    .push(format!("AsyncDataConn::force_back {}", id));
546                Ok(())
547            });
548        }
549        fn close(&mut self) {
550            self.logger
551                .lock()
552                .unwrap()
553                .push(format!("AsyncDataConn::close {}", self.id));
554        }
555    }
556
557    mod tests_of_data_conn_manager {
558        use super::*;
559
560        #[tokio::test]
561        async fn test_new() {
562            let manager = DataConnManager::new();
563            assert!(manager.vec.is_empty());
564        }
565
566        #[tokio::test]
567        async fn test_with_commit_order() {
568            let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
569            assert_eq!(manager.vec.len(), 3);
570            assert!(manager.vec[0].is_none());
571            assert!(manager.vec[1].is_none());
572            assert!(manager.vec[2].is_none());
573            assert_eq!(manager.index_map.len(), 3);
574            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
575            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
576            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
577        }
578
579        #[test]
580        fn test_new_and_add() {
581            let logger = Arc::new(Mutex::new(Vec::new()));
582
583            let mut manager = DataConnManager::new();
584            assert!(manager.vec.is_empty());
585            assert!(manager.index_map.is_empty());
586
587            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
588            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
589            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
590            let ssnnptr = SendSyncNonNull::new(nnptr);
591            manager.add(ssnnptr);
592            assert_eq!(manager.vec.len(), 1);
593            assert_eq!(manager.index_map.len(), 1);
594            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
595
596            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
597            let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
598            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
599            let ssnnptr = SendSyncNonNull::new(nnptr);
600            manager.add(ssnnptr);
601            assert_eq!(manager.vec.len(), 2);
602            assert_eq!(manager.index_map.len(), 2);
603            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
604            assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
605        }
606
607        #[test]
608        fn test_with_commit_order_and_add() {
609            let logger = Arc::new(Mutex::new(Vec::new()));
610
611            let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
612            assert_eq!(manager.vec.len(), 3);
613            assert!(manager.vec[0].is_none());
614            assert!(manager.vec[1].is_none());
615            assert!(manager.vec[2].is_none());
616            assert_eq!(manager.index_map.len(), 3);
617            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
618            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
619            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
620
621            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
622            let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
623            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
624            let ssnnptr = SendSyncNonNull::new(nnptr);
625            manager.add(ssnnptr);
626            assert_eq!(manager.vec.len(), 3);
627            assert_eq!(manager.index_map.len(), 3);
628            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
629
630            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
631            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
632            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
633            let ssnnptr = SendSyncNonNull::new(nnptr);
634            manager.add(ssnnptr);
635            assert_eq!(manager.vec.len(), 3);
636            assert_eq!(manager.index_map.len(), 3);
637            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
638            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
639
640            let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
641            let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
642            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
643            let ssnnptr = SendSyncNonNull::new(nnptr);
644            manager.add(ssnnptr);
645            assert_eq!(manager.vec.len(), 4);
646            assert_eq!(manager.index_map.len(), 4);
647            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
648            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
649            assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
650        }
651
652        #[test]
653        fn test_find_by_name_but_none() {
654            let manager = DataConnManager::new();
655            assert!(manager.find_by_name("foo").is_none());
656            assert!(manager.find_by_name("bar").is_none());
657        }
658
659        #[test]
660        fn test_find_by_name_and_found() {
661            let logger = Arc::new(Mutex::new(Vec::new()));
662
663            let mut manager = DataConnManager::new();
664
665            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
666            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
667            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
668            let ssnnptr = SendSyncNonNull::new(nnptr);
669            manager.add(ssnnptr);
670
671            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
672            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
673            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
674            let ssnnptr = SendSyncNonNull::new(nnptr);
675            manager.add(ssnnptr);
676
677            if let Some(ssnnptr) = manager.find_by_name("foo") {
678                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
679                assert_eq!(name.as_ref(), "foo");
680            } else {
681                panic!();
682            }
683
684            if let Some(ssnnptr) = manager.find_by_name("bar") {
685                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
686                assert_eq!(name.as_ref(), "bar");
687            } else {
688                panic!();
689            }
690        }
691
692        #[test]
693        fn test_to_typed_ptr() {
694            let logger = Arc::new(Mutex::new(Vec::new()));
695
696            let mut manager = DataConnManager::new();
697
698            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
699            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
700            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
701            let ssnnptr = SendSyncNonNull::new(nnptr);
702            manager.add(ssnnptr);
703
704            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
705            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
706            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
707            let ssnnptr = SendSyncNonNull::new(nnptr);
708            manager.add(ssnnptr);
709
710            let nnptr = manager.find_by_name("foo").unwrap();
711            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
712                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn>");
713                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
714            } else {
715                panic!();
716            }
717
718            let nnptr = manager.find_by_name("bar").unwrap();
719            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
720                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn>");
721                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
722            } else {
723                panic!();
724            }
725        }
726
727        #[test]
728        fn test_to_typed_ptr_but_fail() {
729            let logger = Arc::new(Mutex::new(Vec::new()));
730
731            let mut manager = DataConnManager::new();
732
733            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
734            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
735            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
736            let ssnnptr = SendSyncNonNull::new(nnptr);
737            manager.add(ssnnptr);
738
739            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
740            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
741            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
742            let ssnnptr = SendSyncNonNull::new(nnptr);
743            manager.add(ssnnptr);
744
745            let ssnnptr = manager.find_by_name("foo").unwrap();
746            if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&ssnnptr) {
747                match err.reason::<DataConnError>() {
748                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
749                        assert_eq!(name.as_ref(), "foo");
750                        assert_eq!(
751                            *target_type,
752                            "sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn"
753                        );
754                    }
755                    _ => panic!(),
756                }
757            } else {
758                panic!();
759            }
760
761            let ssnnptr = manager.find_by_name("bar").unwrap();
762            if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&ssnnptr) {
763                match err.reason::<DataConnError>() {
764                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
765                        assert_eq!(name.as_ref(), "bar");
766                        assert_eq!(
767                            *target_type,
768                            "sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn"
769                        );
770                    }
771                    _ => panic!(),
772                }
773            } else {
774                panic!();
775            }
776        }
777
778        #[tokio::test]
779        async fn test_commit_ok() {
780            let logger = Arc::new(Mutex::new(Vec::new()));
781
782            {
783                let mut manager = DataConnManager::new();
784
785                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
786                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
787                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
788                let ssnnptr = SendSyncNonNull::new(nnptr);
789                manager.add(ssnnptr);
790
791                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
792                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
793                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
794                let ssnnptr = SendSyncNonNull::new(nnptr);
795                manager.add(ssnnptr);
796
797                assert!(manager.commit_async().await.is_ok());
798            }
799
800            assert_eq!(
801                *logger.lock().unwrap(),
802                &[
803                    "SyncDataConn::new 1",
804                    "AsyncDataConn::new 2",
805                    "SyncDataConn::pre_commit 1",
806                    "AsyncDataConn::pre_commit 2",
807                    "SyncDataConn::commit 1",
808                    "AsyncDataConn::commit 2",
809                    "SyncDataConn::post_commit 1",
810                    "AsyncDataConn::post_commit 2",
811                    "SyncDataConn::close 1",
812                    "SyncDataConn::drop 1",
813                    "AsyncDataConn::close 2",
814                    "AsyncDataConn::drop 2",
815                ]
816            );
817        }
818
819        #[tokio::test]
820        async fn test_commit_with_order() {
821            let logger = Arc::new(Mutex::new(Vec::new()));
822
823            {
824                let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
825
826                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
827                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
828                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
829                let ssnnptr = SendSyncNonNull::new(nnptr);
830                manager.add(ssnnptr);
831
832                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
833                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
834                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
835                let ssnnptr = SendSyncNonNull::new(nnptr);
836                manager.add(ssnnptr);
837
838                let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
839                let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
840                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
841                let ssnnptr = SendSyncNonNull::new(nnptr);
842                manager.add(ssnnptr);
843
844                assert!(manager.commit_async().await.is_ok());
845            }
846
847            assert_eq!(
848                *logger.lock().unwrap(),
849                &[
850                    "SyncDataConn::new 1",
851                    "AsyncDataConn::new 2",
852                    "SyncDataConn::new 3",
853                    "SyncDataConn::pre_commit 1",
854                    "SyncDataConn::pre_commit 3",
855                    "AsyncDataConn::pre_commit 2", // because of async
856                    "SyncDataConn::commit 1",
857                    "SyncDataConn::commit 3",
858                    "AsyncDataConn::commit 2", // because of async
859                    "SyncDataConn::post_commit 1",
860                    "SyncDataConn::post_commit 3",
861                    "AsyncDataConn::post_commit 2", // because of async
862                    "AsyncDataConn::close 2",
863                    "AsyncDataConn::drop 2",
864                    "SyncDataConn::close 1",
865                    "SyncDataConn::drop 1",
866                    "SyncDataConn::close 3",
867                    "SyncDataConn::drop 3",
868                ]
869            );
870        }
871
872        #[tokio::test]
873        async fn test_commit_but_fail_first_sync_pre_commit() {
874            let logger = Arc::new(Mutex::new(Vec::new()));
875
876            {
877                let mut manager = DataConnManager::new();
878
879                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
880                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
881                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
882                let ssnnptr = SendSyncNonNull::new(nnptr);
883                manager.add(ssnnptr);
884
885                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
886                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
887                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
888                let ssnnptr = SendSyncNonNull::new(nnptr);
889                manager.add(ssnnptr);
890
891                if let Err(e) = manager.commit_async().await {
892                    match e.reason::<DataConnError>() {
893                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
894                            assert_eq!(errors.len(), 1);
895                            assert_eq!(errors[0].0, "foo".into());
896                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
897                        }
898                        _ => panic!(),
899                    }
900                } else {
901                    panic!();
902                }
903            }
904
905            assert_eq!(
906                *logger.lock().unwrap(),
907                &[
908                    "SyncDataConn::new 1",
909                    "AsyncDataConn::new 2",
910                    "SyncDataConn::pre_commit 1 failed",
911                    "SyncDataConn::close 1",
912                    "SyncDataConn::drop 1",
913                    "AsyncDataConn::close 2",
914                    "AsyncDataConn::drop 2",
915                ]
916            );
917        }
918
919        #[tokio::test]
920        async fn test_commit_but_fail_first_async_pre_commit() {
921            let logger = Arc::new(Mutex::new(Vec::new()));
922
923            {
924                let mut manager = DataConnManager::new();
925
926                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
927                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
928                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
929                let ssnnptr = SendSyncNonNull::new(nnptr);
930                manager.add(ssnnptr);
931
932                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
933                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
934                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
935                let ssnnptr = SendSyncNonNull::new(nnptr);
936                manager.add(ssnnptr);
937
938                if let Err(e) = manager.commit_async().await {
939                    match e.reason::<DataConnError>() {
940                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
941                            assert_eq!(errors.len(), 1);
942                            assert_eq!(errors[0].0, "foo".into());
943                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
944                        }
945                        _ => panic!(),
946                    }
947                } else {
948                    panic!();
949                }
950            }
951
952            assert_eq!(
953                *logger.lock().unwrap(),
954                &[
955                    "SyncDataConn::new 1",
956                    "AsyncDataConn::new 2",
957                    "SyncDataConn::pre_commit 1 failed",
958                    "SyncDataConn::close 1",
959                    "SyncDataConn::drop 1",
960                    "AsyncDataConn::close 2",
961                    "AsyncDataConn::drop 2",
962                ]
963            );
964        }
965
966        #[tokio::test]
967        async fn test_commit_but_fail_second_pre_commit() {
968            let logger = Arc::new(Mutex::new(Vec::new()));
969
970            {
971                let mut manager = DataConnManager::new();
972
973                let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
974                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
975                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
976                let ssnnptr = SendSyncNonNull::new(nnptr);
977                manager.add(ssnnptr);
978
979                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
980                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
981                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
982                let ssnnptr = SendSyncNonNull::new(nnptr);
983                manager.add(ssnnptr);
984
985                if let Err(e) = manager.commit_async().await {
986                    match e.reason::<DataConnError>() {
987                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
988                            assert_eq!(errors.len(), 1);
989                            assert_eq!(errors[0].0, "foo".into());
990                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
991                        }
992                        _ => panic!(),
993                    }
994                } else {
995                    panic!();
996                }
997            }
998
999            assert_eq!(
1000                *logger.lock().unwrap(),
1001                &[
1002                    "AsyncDataConn::new 1",
1003                    "SyncDataConn::new 2",
1004                    "SyncDataConn::pre_commit 2",
1005                    "AsyncDataConn::pre_commit 1 failed",
1006                    "AsyncDataConn::close 1",
1007                    "AsyncDataConn::drop 1",
1008                    "SyncDataConn::close 2",
1009                    "SyncDataConn::drop 2",
1010                ]
1011            );
1012        }
1013
1014        #[tokio::test]
1015        async fn test_commit_but_fail_first_sync_commit() {
1016            let logger = Arc::new(Mutex::new(Vec::new()));
1017
1018            {
1019                let mut manager = DataConnManager::new();
1020
1021                let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
1022                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1023                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1024                let ssnnptr = SendSyncNonNull::new(nnptr);
1025                manager.add(ssnnptr);
1026
1027                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1028                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1029                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1030                let ssnnptr = SendSyncNonNull::new(nnptr);
1031                manager.add(ssnnptr);
1032
1033                if let Err(e) = manager.commit_async().await {
1034                    match e.reason::<DataConnError>() {
1035                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1036                            assert_eq!(errors.len(), 1);
1037                            assert_eq!(errors[0].0, "foo".into());
1038                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
1039                        }
1040                        _ => panic!(),
1041                    }
1042                } else {
1043                    panic!();
1044                }
1045            }
1046
1047            assert_eq!(
1048                *logger.lock().unwrap(),
1049                &[
1050                    "SyncDataConn::new 1",
1051                    "AsyncDataConn::new 2",
1052                    "SyncDataConn::pre_commit 1",
1053                    "AsyncDataConn::pre_commit 2",
1054                    "SyncDataConn::commit 1 failed",
1055                    "SyncDataConn::close 1",
1056                    "SyncDataConn::drop 1",
1057                    "AsyncDataConn::close 2",
1058                    "AsyncDataConn::drop 2",
1059                ]
1060            );
1061        }
1062
1063        #[tokio::test]
1064        async fn test_commit_but_fail_first_async_commit() {
1065            let logger = Arc::new(Mutex::new(Vec::new()));
1066
1067            {
1068                let mut manager = DataConnManager::new();
1069
1070                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1071                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1072                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1073                let ssnnptr = SendSyncNonNull::new(nnptr);
1074                manager.add(ssnnptr);
1075
1076                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1077                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1078                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1079                let ssnnptr = SendSyncNonNull::new(nnptr);
1080                manager.add(ssnnptr);
1081
1082                if let Err(e) = manager.commit_async().await {
1083                    match e.reason::<DataConnError>() {
1084                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1085                            assert_eq!(errors.len(), 1);
1086                            assert_eq!(errors[0].0, "foo".into());
1087                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1088                        }
1089                        _ => panic!(),
1090                    }
1091                } else {
1092                    panic!();
1093                }
1094            }
1095
1096            assert_eq!(
1097                *logger.lock().unwrap(),
1098                &[
1099                    "AsyncDataConn::new 1",
1100                    "SyncDataConn::new 2",
1101                    "SyncDataConn::pre_commit 2",
1102                    "AsyncDataConn::pre_commit 1",
1103                    "SyncDataConn::commit 2",
1104                    "AsyncDataConn::commit 1 failed",
1105                    "AsyncDataConn::close 1",
1106                    "AsyncDataConn::drop 1",
1107                    "SyncDataConn::close 2",
1108                    "SyncDataConn::drop 2",
1109                ]
1110            );
1111        }
1112
1113        #[tokio::test]
1114        async fn test_commit_but_fail_second_commit() {
1115            let logger = Arc::new(Mutex::new(Vec::new()));
1116
1117            {
1118                let mut manager = DataConnManager::new();
1119
1120                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1121                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1122                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1123                let ssnnptr = SendSyncNonNull::new(nnptr);
1124                manager.add(ssnnptr);
1125
1126                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1127                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1128                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1129                let ssnnptr = SendSyncNonNull::new(nnptr);
1130                manager.add(ssnnptr);
1131
1132                if let Err(e) = manager.commit_async().await {
1133                    match e.reason::<DataConnError>() {
1134                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1135                            assert_eq!(errors.len(), 1);
1136                            assert_eq!(errors[0].0, "bar".into());
1137                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1138                        }
1139                        _ => panic!(),
1140                    }
1141                } else {
1142                    panic!();
1143                }
1144            }
1145
1146            assert_eq!(
1147                *logger.lock().unwrap(),
1148                &[
1149                    "SyncDataConn::new 1",
1150                    "AsyncDataConn::new 2",
1151                    "SyncDataConn::pre_commit 1",
1152                    "AsyncDataConn::pre_commit 2",
1153                    "SyncDataConn::commit 1",
1154                    "AsyncDataConn::commit 2 failed",
1155                    "SyncDataConn::close 1",
1156                    "SyncDataConn::drop 1",
1157                    "AsyncDataConn::close 2",
1158                    "AsyncDataConn::drop 2",
1159                ]
1160            );
1161        }
1162
1163        #[tokio::test]
1164        async fn test_rollback_and_first_is_sync() {
1165            let logger = Arc::new(Mutex::new(Vec::new()));
1166
1167            {
1168                let mut manager = DataConnManager::new();
1169
1170                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1171                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1172                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1173                let ssnnptr = SendSyncNonNull::new(nnptr);
1174                manager.add(ssnnptr);
1175
1176                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1177                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1178                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1179                let ssnnptr = SendSyncNonNull::new(nnptr);
1180                manager.add(ssnnptr);
1181
1182                manager.rollback_async().await;
1183            }
1184
1185            assert_eq!(
1186                *logger.lock().unwrap(),
1187                &[
1188                    "SyncDataConn::new 1",
1189                    "AsyncDataConn::new 2",
1190                    "SyncDataConn::rollback 1",
1191                    "AsyncDataConn::rollback 2",
1192                    "SyncDataConn::close 1",
1193                    "SyncDataConn::drop 1",
1194                    "AsyncDataConn::close 2",
1195                    "AsyncDataConn::drop 2",
1196                ]
1197            );
1198        }
1199
1200        #[tokio::test]
1201        async fn test_rollback_and_first_is_async() {
1202            let logger = Arc::new(Mutex::new(Vec::new()));
1203
1204            {
1205                let mut manager = DataConnManager::new();
1206
1207                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1208                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1209                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1210                let ssnnptr = SendSyncNonNull::new(nnptr);
1211                manager.add(ssnnptr);
1212
1213                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1214                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1215                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1216                let ssnnptr = SendSyncNonNull::new(nnptr);
1217                manager.add(ssnnptr);
1218
1219                manager.rollback_async().await;
1220            }
1221
1222            assert_eq!(
1223                *logger.lock().unwrap(),
1224                &[
1225                    "AsyncDataConn::new 1",
1226                    "SyncDataConn::new 2",
1227                    "SyncDataConn::rollback 2",
1228                    "AsyncDataConn::rollback 1",
1229                    "AsyncDataConn::close 1",
1230                    "AsyncDataConn::drop 1",
1231                    "SyncDataConn::close 2",
1232                    "SyncDataConn::drop 2",
1233                ]
1234            );
1235        }
1236
1237        #[tokio::test]
1238        async fn test_force_back_and_first_is_sync() {
1239            let logger = Arc::new(Mutex::new(Vec::new()));
1240
1241            {
1242                let mut manager = DataConnManager::new();
1243
1244                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1245                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1246                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1247                let ssnnptr = SendSyncNonNull::new(nnptr);
1248                manager.add(ssnnptr);
1249
1250                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1251                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1252                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1253                let ssnnptr = SendSyncNonNull::new(nnptr);
1254                manager.add(ssnnptr);
1255
1256                assert!(manager.commit_async().await.is_ok());
1257                manager.rollback_async().await;
1258            }
1259
1260            assert_eq!(
1261                *logger.lock().unwrap(),
1262                &[
1263                    "SyncDataConn::new 1",
1264                    "AsyncDataConn::new 2",
1265                    "SyncDataConn::pre_commit 1",
1266                    "AsyncDataConn::pre_commit 2",
1267                    "SyncDataConn::commit 1",
1268                    "AsyncDataConn::commit 2",
1269                    "SyncDataConn::post_commit 1",
1270                    "AsyncDataConn::post_commit 2",
1271                    "SyncDataConn::force_back 1",
1272                    "AsyncDataConn::force_back 2",
1273                    "SyncDataConn::close 1",
1274                    "SyncDataConn::drop 1",
1275                    "AsyncDataConn::close 2",
1276                    "AsyncDataConn::drop 2",
1277                ]
1278            );
1279        }
1280
1281        #[tokio::test]
1282        async fn test_force_back_and_first_is_async() {
1283            let logger = Arc::new(Mutex::new(Vec::new()));
1284
1285            {
1286                let mut manager = DataConnManager::new();
1287
1288                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1289                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1290                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1291                let ssnnptr = SendSyncNonNull::new(nnptr);
1292                manager.add(ssnnptr);
1293
1294                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1295                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1296                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1297                let ssnnptr = SendSyncNonNull::new(nnptr);
1298                manager.add(ssnnptr);
1299
1300                assert!(manager.commit_async().await.is_ok());
1301                manager.rollback_async().await;
1302            }
1303
1304            assert_eq!(
1305                *logger.lock().unwrap(),
1306                &[
1307                    "AsyncDataConn::new 1",
1308                    "SyncDataConn::new 2",
1309                    "SyncDataConn::pre_commit 2",
1310                    "AsyncDataConn::pre_commit 1",
1311                    "SyncDataConn::commit 2",
1312                    "AsyncDataConn::commit 1",
1313                    "SyncDataConn::post_commit 2",
1314                    "AsyncDataConn::post_commit 1",
1315                    "SyncDataConn::force_back 2",
1316                    "AsyncDataConn::force_back 1",
1317                    "AsyncDataConn::close 1",
1318                    "AsyncDataConn::drop 1",
1319                    "SyncDataConn::close 2",
1320                    "SyncDataConn::drop 2",
1321                ]
1322            );
1323        }
1324    }
1325}