Skip to main content

sabi/
data_conn.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use crate::{AsyncGroup, DataConn, DataConnContainer, DataConnManager, SendSyncNonNull};
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::{any, mem};
10
11/// An enum type representing the reasons for errors that can occur within `DataConn` operations.
12#[allow(clippy::enum_variant_names)]
13#[derive(Debug)]
14pub enum DataConnError {
15    /// Indicates a failure during the pre-commit process of one or more [`DataConn`] instances
16    /// involved in a transaction.
17    /// Contains a vector of data connection names and their corresponding errors.
18    FailToPreCommitDataConn {
19        /// The vector contains errors that occurred in each [`DataConn`] object.
20        errors: Vec<(Arc<str>, errs::Err)>,
21    },
22
23    /// Indicates a failure during the commit process of one or more [`DataConn`] instances
24    /// involved in a transaction.
25    /// Contains a vector of data connection names and their corresponding errors.
26    FailToCommitDataConn {
27        /// The vector contains errors that occurred in each [`DataConn`] object.
28        errors: Vec<(Arc<str>, errs::Err)>,
29    },
30
31    /// Indicates a failure to cast a retrieved [`DataConn`] to the expected type.
32    FailToCastDataConn {
33        /// The name of the data connection that failed to cast.
34        name: Arc<str>,
35
36        /// The type name to which the [`DataConn`] attempted to cast.
37        target_type: &'static str,
38    },
39}
40
41impl<C> DataConnContainer<C>
42where
43    C: DataConn + 'static,
44{
45    pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
46        Self {
47            drop_fn: drop_data_conn::<C>,
48            is_fn: is_data_conn::<C>,
49            commit_fn: commit_data_conn::<C>,
50            pre_commit_fn: pre_commit_data_conn::<C>,
51            post_commit_fn: post_commit_data_conn::<C>,
52            should_force_back_fn: should_force_back_data_conn::<C>,
53            rollback_fn: rollback_data_conn::<C>,
54            force_back_fn: force_back_data_conn::<C>,
55            close_fn: close_data_conn::<C>,
56
57            name: name.into(),
58            data_conn,
59        }
60    }
61}
62
63fn drop_data_conn<C>(ptr: *const DataConnContainer)
64where
65    C: DataConn + 'static,
66{
67    let typed_ptr = ptr as *mut DataConnContainer<C>;
68    unsafe {
69        drop(Box::from_raw(typed_ptr));
70    }
71}
72
73fn is_data_conn<C>(type_id: any::TypeId) -> bool
74where
75    C: DataConn + 'static,
76{
77    any::TypeId::of::<C>() == type_id
78}
79
80fn commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
81where
82    C: DataConn + 'static,
83{
84    let typed_ptr = ptr as *mut DataConnContainer<C>;
85    unsafe { (*typed_ptr).data_conn.commit(ag) }
86}
87
88fn pre_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
89where
90    C: DataConn + 'static,
91{
92    let typed_ptr = ptr as *mut DataConnContainer<C>;
93    unsafe { (*typed_ptr).data_conn.pre_commit(ag) }
94}
95
96fn post_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
97where
98    C: DataConn + 'static,
99{
100    let typed_ptr = ptr as *mut DataConnContainer<C>;
101    unsafe {
102        (*typed_ptr).data_conn.post_commit(ag);
103    }
104}
105
106fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
107where
108    C: DataConn + 'static,
109{
110    let typed_ptr = ptr as *mut DataConnContainer<C>;
111    unsafe { (*typed_ptr).data_conn.should_force_back() }
112}
113
114fn rollback_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
115where
116    C: DataConn + 'static,
117{
118    let typed_ptr = ptr as *mut DataConnContainer<C>;
119    unsafe {
120        (*typed_ptr).data_conn.rollback(ag);
121    }
122}
123
124fn force_back_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
125where
126    C: DataConn + 'static,
127{
128    let typed_ptr = ptr as *mut DataConnContainer<C>;
129    unsafe {
130        (*typed_ptr).data_conn.force_back(ag);
131    }
132}
133
134fn close_data_conn<C>(ptr: *const DataConnContainer)
135where
136    C: DataConn + 'static,
137{
138    let typed_ptr = ptr as *mut DataConnContainer<C>;
139    unsafe {
140        (*typed_ptr).data_conn.close();
141    }
142}
143
144impl DataConnManager {
145    pub(crate) fn new() -> Self {
146        Self {
147            vec: Vec::new(),
148            index_map: HashMap::new(),
149        }
150    }
151
152    pub(crate) fn with_commit_order(names: &[&str]) -> Self {
153        let mut index_map = HashMap::with_capacity(names.len());
154        // To overwrite later indexed elements with eariler ones when names overlap
155        for (i, nm) in names.iter().rev().enumerate() {
156            index_map.insert((*nm).into(), names.len() - 1 - i);
157        }
158
159        Self {
160            vec: vec![None; names.len()],
161            index_map,
162        }
163    }
164
165    pub(crate) fn add(&mut self, ssnnptr: SendSyncNonNull<DataConnContainer>) {
166        let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
167        if let Some(index) = self.index_map.get(&name) {
168            self.vec[*index] = Some(ssnnptr);
169        } else {
170            let index = self.vec.len();
171            self.vec.push(Some(ssnnptr));
172            self.index_map.insert(name.clone(), index);
173        }
174    }
175
176    pub(crate) fn find_by_name(
177        &self,
178        name: impl AsRef<str>,
179    ) -> Option<SendSyncNonNull<DataConnContainer>> {
180        if let Some(index) = self.index_map.get(name.as_ref()) {
181            if *index < self.vec.len() {
182                if let Some(ssnnptr) = &self.vec[*index] {
183                    let ptr = ssnnptr.non_null_ptr.as_ptr();
184                    let cont_name = unsafe { &(*ptr).name };
185                    if cont_name.as_ref() == name.as_ref() {
186                        return Some(ssnnptr.clone());
187                    }
188                }
189            }
190        }
191
192        None
193    }
194
195    pub(crate) fn to_typed_ptr<C>(
196        ssnnptr: &SendSyncNonNull<DataConnContainer>,
197    ) -> errs::Result<*mut DataConnContainer<C>>
198    where
199        C: DataConn + 'static,
200    {
201        let ptr = ssnnptr.non_null_ptr.as_ptr();
202        let name = unsafe { &(*ptr).name };
203        let type_id = any::TypeId::of::<C>();
204        let is_fn = unsafe { (*ptr).is_fn };
205
206        if !is_fn(type_id) {
207            return Err(errs::Err::new(DataConnError::FailToCastDataConn {
208                name: name.clone(),
209                target_type: any::type_name::<C>(),
210            }));
211        }
212
213        let typed_ptr = ptr as *mut DataConnContainer<C>;
214        Ok(typed_ptr)
215    }
216
217    pub(crate) fn commit(&self) -> errs::Result<()> {
218        let mut errors = Vec::new();
219
220        let mut ag = AsyncGroup::new();
221        for ssnnptr in self.vec.iter().flatten() {
222            let ptr = ssnnptr.non_null_ptr.as_ptr();
223            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
224            ag._name = unsafe { (*ptr).name.clone() };
225            if let Err(err) = pre_commit_fn(ptr, &mut ag) {
226                errors.push((ag._name.clone(), err));
227                break;
228            }
229        }
230        ag.join_and_collect_errors(&mut errors);
231
232        if !errors.is_empty() {
233            return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
234                errors,
235            }));
236        }
237
238        let mut ag = AsyncGroup::new();
239        for ssnnptr in self.vec.iter().flatten() {
240            let ptr = ssnnptr.non_null_ptr.as_ptr();
241            let commit_fn = unsafe { (*ptr).commit_fn };
242            ag._name = unsafe { (*ptr).name.clone() };
243            if let Err(err) = commit_fn(ptr, &mut ag) {
244                errors.push((ag._name.clone(), err));
245                break;
246            }
247        }
248        ag.join_and_collect_errors(&mut errors);
249
250        if !errors.is_empty() {
251            return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
252                errors,
253            }));
254        }
255
256        let mut ag = AsyncGroup::new();
257        for ssnnptr in self.vec.iter().flatten() {
258            let ptr = ssnnptr.non_null_ptr.as_ptr();
259            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
260            ag._name = unsafe { (*ptr).name.clone() };
261            post_commit_fn(ptr, &mut ag);
262        }
263        ag.join_and_ignore_errors();
264
265        Ok(())
266    }
267
268    pub(crate) fn rollback(&mut self) {
269        let mut ag = AsyncGroup::new();
270        for ssnnptr in self.vec.iter().flatten() {
271            let ptr = ssnnptr.non_null_ptr.as_ptr();
272            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
273            let force_back_fn = unsafe { (*ptr).force_back_fn };
274            let rollback_fn = unsafe { (*ptr).rollback_fn };
275            ag._name = unsafe { (*ptr).name.clone() };
276
277            if should_force_back_fn(ptr) {
278                force_back_fn(ptr, &mut ag);
279            } else {
280                rollback_fn(ptr, &mut ag);
281            }
282        }
283        ag.join_and_ignore_errors();
284    }
285
286    pub(crate) fn close(&mut self) {
287        self.index_map.clear();
288
289        let vec: Vec<Option<SendSyncNonNull<DataConnContainer>>> = mem::take(&mut self.vec);
290
291        for ssnnptr in vec.iter().flatten() {
292            let ptr = ssnnptr.non_null_ptr.as_ptr();
293            let close_fn = unsafe { (*ptr).close_fn };
294            let drop_fn = unsafe { (*ptr).drop_fn };
295            close_fn(ptr);
296            drop_fn(ptr);
297        }
298    }
299}
300
301impl Drop for DataConnManager {
302    fn drop(&mut self) {
303        self.close();
304    }
305}
306
307#[cfg(test)]
308mod tests_of_data_conn {
309    use super::*;
310    use std::sync::{
311        atomic::{AtomicBool, Ordering},
312        Arc, Mutex,
313    };
314    use std::{ptr, thread, time};
315
316    #[derive(PartialEq, Copy, Clone)]
317    enum Fail {
318        Not,
319        Commit,
320        PreCommit,
321    }
322
323    struct SyncDataConn {
324        id: i8,
325        committed: bool,
326        fail: Fail,
327        logger: Arc<Mutex<Vec<String>>>,
328    }
329    impl SyncDataConn {
330        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
331            logger
332                .lock()
333                .unwrap()
334                .push(format!("SyncDataConn::new {}", id));
335            Self {
336                id,
337                committed: false,
338                fail,
339                logger,
340            }
341        }
342    }
343    impl Drop for SyncDataConn {
344        fn drop(&mut self) {
345            self.logger
346                .lock()
347                .unwrap()
348                .push(format!("SyncDataConn::drop {}", self.id));
349        }
350    }
351    impl DataConn for SyncDataConn {
352        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
353            if self.fail == Fail::Commit {
354                self.logger
355                    .lock()
356                    .unwrap()
357                    .push(format!("SyncDataConn::commit {} failed", self.id));
358                return Err(errs::Err::new("ZZZ".to_string()));
359            }
360            self.committed = true;
361            self.logger
362                .lock()
363                .unwrap()
364                .push(format!("SyncDataConn::commit {}", self.id));
365            Ok(())
366        }
367        fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
368            if self.fail == Fail::PreCommit {
369                self.logger
370                    .lock()
371                    .unwrap()
372                    .push(format!("SyncDataConn::pre_commit {} failed", self.id));
373                return Err(errs::Err::new("zzz".to_string()));
374            }
375            self.logger
376                .lock()
377                .unwrap()
378                .push(format!("SyncDataConn::pre_commit {}", self.id));
379            Ok(())
380        }
381        fn post_commit(&mut self, _ag: &mut AsyncGroup) {
382            self.logger
383                .lock()
384                .unwrap()
385                .push(format!("SyncDataConn::post_commit {}", self.id));
386        }
387        fn should_force_back(&self) -> bool {
388            self.committed
389        }
390        fn rollback(&mut self, _ag: &mut AsyncGroup) {
391            self.logger
392                .lock()
393                .unwrap()
394                .push(format!("SyncDataConn::rollback {}", self.id));
395        }
396        fn force_back(&mut self, _ag: &mut AsyncGroup) {
397            self.logger
398                .lock()
399                .unwrap()
400                .push(format!("SyncDataConn::force_back {}", self.id));
401        }
402        fn close(&mut self) {
403            self.logger
404                .lock()
405                .unwrap()
406                .push(format!("SyncDataConn::close {}", self.id));
407        }
408    }
409
410    struct AsyncDataConn {
411        id: i8,
412        committed: Arc<AtomicBool>,
413        fail: Fail,
414        logger: Arc<Mutex<Vec<String>>>,
415    }
416    impl AsyncDataConn {
417        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
418            logger
419                .lock()
420                .unwrap()
421                .push(format!("AsyncDataConn::new {}", id));
422            Self {
423                id,
424                committed: Arc::new(AtomicBool::new(false)),
425                fail,
426                logger,
427            }
428        }
429    }
430    impl Drop for AsyncDataConn {
431        fn drop(&mut self) {
432            self.logger
433                .lock()
434                .unwrap()
435                .push(format!("AsyncDataConn::drop {}", self.id));
436        }
437    }
438    impl DataConn for AsyncDataConn {
439        fn commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
440            let fail = self.fail;
441            let logger = self.logger.clone();
442            let id = self.id;
443            let committed = self.committed.clone();
444            ag.add(move || {
445                thread::sleep(time::Duration::from_millis(100));
446                if fail == Fail::Commit {
447                    logger
448                        .lock()
449                        .unwrap()
450                        .push(format!("AsyncDataConn::commit {} failed", id));
451                    return Err(errs::Err::new("YYY".to_string()));
452                }
453                committed.store(true, Ordering::Release);
454                logger
455                    .lock()
456                    .unwrap()
457                    .push(format!("AsyncDataConn::commit {}", id));
458                Ok(())
459            });
460            Ok(())
461        }
462        fn pre_commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
463            let fail = self.fail;
464            let logger = self.logger.clone();
465            let id = self.id;
466            ag.add(move || {
467                thread::sleep(time::Duration::from_millis(100));
468                if fail == Fail::PreCommit {
469                    logger
470                        .lock()
471                        .unwrap()
472                        .push(format!("AsyncDataConn::pre_commit {} failed", id));
473                    return Err(errs::Err::new("yyy".to_string()));
474                }
475                logger
476                    .lock()
477                    .unwrap()
478                    .push(format!("AsyncDataConn::pre_commit {}", id));
479                Ok(())
480            });
481            Ok(())
482        }
483        fn post_commit(&mut self, ag: &mut AsyncGroup) {
484            let logger = self.logger.clone();
485            let id = self.id;
486            ag.add(move || {
487                thread::sleep(time::Duration::from_millis(100));
488                logger
489                    .lock()
490                    .unwrap()
491                    .push(format!("AsyncDataConn::post_commit {}", id));
492                Ok(())
493            });
494        }
495        fn should_force_back(&self) -> bool {
496            self.committed.load(Ordering::Acquire)
497        }
498        fn rollback(&mut self, ag: &mut AsyncGroup) {
499            let logger = self.logger.clone();
500            let id = self.id;
501            ag.add(move || {
502                thread::sleep(time::Duration::from_millis(100));
503                logger
504                    .lock()
505                    .unwrap()
506                    .push(format!("AsyncDataConn::rollback {}", id));
507                Ok(())
508            });
509        }
510        fn force_back(&mut self, ag: &mut AsyncGroup) {
511            let logger = self.logger.clone();
512            let id = self.id;
513            ag.add(move || {
514                thread::sleep(time::Duration::from_millis(100));
515                logger
516                    .lock()
517                    .unwrap()
518                    .push(format!("AsyncDataConn::force_back {}", id));
519                Ok(())
520            });
521        }
522        fn close(&mut self) {
523            self.logger
524                .lock()
525                .unwrap()
526                .push(format!("AsyncDataConn::close {}", self.id));
527        }
528    }
529
530    mod tests_of_data_conn_manager {
531        use super::*;
532
533        #[test]
534        fn test_new() {
535            let manager = DataConnManager::new();
536            assert!(manager.vec.is_empty());
537            assert!(manager.index_map.is_empty());
538        }
539
540        #[test]
541        fn test_with_commit_order() {
542            let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
543            assert_eq!(manager.vec.len(), 3);
544            assert!(manager.vec[0].is_none());
545            assert!(manager.vec[1].is_none());
546            assert!(manager.vec[2].is_none());
547            assert_eq!(manager.index_map.len(), 3);
548            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
549            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
550            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
551        }
552
553        #[test]
554        fn test_new_and_add() {
555            let logger = Arc::new(Mutex::new(Vec::new()));
556
557            let mut manager = DataConnManager::new();
558            assert!(manager.vec.is_empty());
559            assert!(manager.index_map.is_empty());
560
561            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
562            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
563            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
564            let ssnnptr = SendSyncNonNull::new(nnptr);
565            manager.add(ssnnptr);
566            assert_eq!(manager.vec.len(), 1);
567            assert_eq!(manager.index_map.len(), 1);
568            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
569
570            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
571            let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
572            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
573            let ssnnptr = SendSyncNonNull::new(nnptr);
574            manager.add(ssnnptr);
575            assert_eq!(manager.vec.len(), 2);
576            assert_eq!(manager.index_map.len(), 2);
577            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
578            assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
579        }
580
581        #[test]
582        fn test_with_commit_order_and_add() {
583            let logger = Arc::new(Mutex::new(Vec::new()));
584
585            let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
586            assert_eq!(manager.vec.len(), 3);
587            assert!(manager.vec[0].is_none());
588            assert!(manager.vec[1].is_none());
589            assert!(manager.vec[2].is_none());
590            assert_eq!(manager.index_map.len(), 3);
591            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
592            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
593            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
594
595            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
596            let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
597            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
598            let ssnnptr = SendSyncNonNull::new(nnptr);
599            manager.add(ssnnptr);
600            assert_eq!(manager.vec.len(), 3);
601            assert_eq!(manager.index_map.len(), 3);
602            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
603
604            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
605            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
606            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
607            let ssnnptr = SendSyncNonNull::new(nnptr);
608            manager.add(ssnnptr);
609            assert_eq!(manager.vec.len(), 3);
610            assert_eq!(manager.index_map.len(), 3);
611            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
612            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
613
614            let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
615            let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
616            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
617            let ssnnptr = SendSyncNonNull::new(nnptr);
618            manager.add(ssnnptr);
619            assert_eq!(manager.vec.len(), 4);
620            assert_eq!(manager.index_map.len(), 4);
621            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
622            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
623            assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
624        }
625
626        #[test]
627        fn test_find_by_name_but_none() {
628            let manager = DataConnManager::new();
629            assert!(manager.find_by_name("foo").is_none());
630            assert!(manager.find_by_name("bar").is_none());
631        }
632
633        #[test]
634        fn test_find_by_name_and_found() {
635            let logger = Arc::new(Mutex::new(Vec::new()));
636
637            let mut manager = DataConnManager::new();
638
639            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
640            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
641            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
642            let ssnnptr = SendSyncNonNull::new(nnptr);
643            manager.add(ssnnptr);
644
645            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
646            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
647            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
648            let ssnnptr = SendSyncNonNull::new(nnptr);
649            manager.add(ssnnptr);
650
651            if let Some(ssnnptr) = manager.find_by_name("foo") {
652                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
653                assert_eq!(name.as_ref(), "foo");
654            } else {
655                panic!();
656            }
657
658            if let Some(ssnnptr) = manager.find_by_name("bar") {
659                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
660                assert_eq!(name.as_ref(), "bar");
661            } else {
662                panic!();
663            }
664        }
665
666        #[test]
667        fn test_to_typed_ptr() {
668            let logger = Arc::new(Mutex::new(Vec::new()));
669
670            let mut manager = DataConnManager::new();
671
672            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
673            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
674            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
675            let ssnnptr = SendSyncNonNull::new(nnptr);
676            manager.add(ssnnptr);
677
678            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
679            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
680            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
681            let ssnnptr = SendSyncNonNull::new(nnptr);
682            manager.add(ssnnptr);
683
684            let ssnnptr = manager.find_by_name("foo").unwrap();
685            if let Ok(typed_ssnnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&ssnnptr) {
686                assert_eq!(any::type_name_of_val(&typed_ssnnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::SyncDataConn>");
687                assert_eq!(unsafe { (*typed_ssnnptr).name.clone() }, "foo".into());
688            } else {
689                panic!();
690            }
691
692            let ssnnptr = manager.find_by_name("bar").unwrap();
693            if let Ok(typed_ssnnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&ssnnptr) {
694                assert_eq!(any::type_name_of_val(&typed_ssnnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::AsyncDataConn>");
695                assert_eq!(unsafe { (*typed_ssnnptr).name.clone() }, "bar".into());
696            } else {
697                panic!();
698            }
699        }
700
701        #[test]
702        fn test_to_typed_ptr_but_fail() {
703            let logger = Arc::new(Mutex::new(Vec::new()));
704
705            let mut manager = DataConnManager::new();
706
707            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
708            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
709            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
710            let ssnnptr = SendSyncNonNull::new(nnptr);
711            manager.add(ssnnptr);
712
713            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
714            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
715            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
716            let ssnnptr = SendSyncNonNull::new(nnptr);
717            manager.add(ssnnptr);
718
719            let ssnnptr = manager.find_by_name("foo").unwrap();
720            if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&ssnnptr) {
721                match err.reason::<DataConnError>() {
722                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
723                        assert_eq!(name.as_ref(), "foo");
724                        assert_eq!(
725                            *target_type,
726                            "sabi::data_conn::tests_of_data_conn::AsyncDataConn"
727                        );
728                    }
729                    _ => panic!(),
730                }
731            } else {
732                panic!();
733            }
734
735            let ssnnptr = manager.find_by_name("bar").unwrap();
736            if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&ssnnptr) {
737                match err.reason::<DataConnError>() {
738                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
739                        assert_eq!(name.as_ref(), "bar");
740                        assert_eq!(
741                            *target_type,
742                            "sabi::data_conn::tests_of_data_conn::SyncDataConn"
743                        );
744                    }
745                    _ => panic!(),
746                }
747            } else {
748                panic!();
749            }
750        }
751
752        #[test]
753        fn test_commit_ok() {
754            let logger = Arc::new(Mutex::new(Vec::new()));
755
756            {
757                let mut manager = DataConnManager::new();
758
759                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
760                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
761                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
762                let ssnnptr = SendSyncNonNull::new(nnptr);
763                manager.add(ssnnptr);
764
765                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
766                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
767                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
768                let ssnnptr = SendSyncNonNull::new(nnptr);
769                manager.add(ssnnptr);
770
771                assert!(manager.commit().is_ok());
772            }
773
774            assert_eq!(
775                *logger.lock().unwrap(),
776                &[
777                    "SyncDataConn::new 1",
778                    "AsyncDataConn::new 2",
779                    "SyncDataConn::pre_commit 1",
780                    "AsyncDataConn::pre_commit 2",
781                    "SyncDataConn::commit 1",
782                    "AsyncDataConn::commit 2",
783                    "SyncDataConn::post_commit 1",
784                    "AsyncDataConn::post_commit 2",
785                    "SyncDataConn::close 1",
786                    "SyncDataConn::drop 1",
787                    "AsyncDataConn::close 2",
788                    "AsyncDataConn::drop 2",
789                ]
790            );
791        }
792
793        #[test]
794        fn test_commit_with_order() {
795            let logger = Arc::new(Mutex::new(Vec::new()));
796
797            {
798                let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
799
800                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
801                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
802                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
803                let ssnnptr = SendSyncNonNull::new(nnptr);
804                manager.add(ssnnptr);
805
806                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
807                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
808                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
809                let ssnnptr = SendSyncNonNull::new(nnptr);
810                manager.add(ssnnptr);
811
812                let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
813                let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
814                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
815                let ssnnptr = SendSyncNonNull::new(nnptr);
816                manager.add(ssnnptr);
817
818                assert!(manager.commit().is_ok());
819            }
820
821            assert_eq!(
822                *logger.lock().unwrap(),
823                &[
824                    "SyncDataConn::new 1",
825                    "AsyncDataConn::new 2",
826                    "SyncDataConn::new 3",
827                    "SyncDataConn::pre_commit 1",
828                    "SyncDataConn::pre_commit 3",
829                    "AsyncDataConn::pre_commit 2", // because of async
830                    "SyncDataConn::commit 1",
831                    "SyncDataConn::commit 3",
832                    "AsyncDataConn::commit 2", // because of async
833                    "SyncDataConn::post_commit 1",
834                    "SyncDataConn::post_commit 3",
835                    "AsyncDataConn::post_commit 2", // because of async
836                    "AsyncDataConn::close 2",
837                    "AsyncDataConn::drop 2",
838                    "SyncDataConn::close 1",
839                    "SyncDataConn::drop 1",
840                    "SyncDataConn::close 3",
841                    "SyncDataConn::drop 3",
842                ]
843            );
844        }
845
846        #[test]
847        fn test_commit_but_fail_first_sync_pre_commit() {
848            let logger = Arc::new(Mutex::new(Vec::new()));
849
850            {
851                let mut manager = DataConnManager::new();
852
853                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
854                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
855                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
856                let ssnnptr = SendSyncNonNull::new(nnptr);
857                manager.add(ssnnptr);
858
859                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
860                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
861                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
862                let ssnnptr = SendSyncNonNull::new(nnptr);
863                manager.add(ssnnptr);
864
865                if let Err(e) = manager.commit() {
866                    match e.reason::<DataConnError>() {
867                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
868                            assert_eq!(errors.len(), 1);
869                            assert_eq!(errors[0].0, "foo".into());
870                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
871                        }
872                        _ => panic!(),
873                    }
874                } else {
875                    panic!();
876                }
877            }
878
879            assert_eq!(
880                *logger.lock().unwrap(),
881                &[
882                    "SyncDataConn::new 1",
883                    "AsyncDataConn::new 2",
884                    "SyncDataConn::pre_commit 1 failed",
885                    "SyncDataConn::close 1",
886                    "SyncDataConn::drop 1",
887                    "AsyncDataConn::close 2",
888                    "AsyncDataConn::drop 2",
889                ]
890            );
891        }
892
893        #[test]
894        fn test_commit_but_fail_first_async_pre_commit() {
895            let logger = Arc::new(Mutex::new(Vec::new()));
896
897            {
898                let mut manager = DataConnManager::new();
899
900                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
901                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
902                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
903                let ssnnptr = SendSyncNonNull::new(nnptr);
904                manager.add(ssnnptr);
905
906                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
907                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
908                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
909                let ssnnptr = SendSyncNonNull::new(nnptr);
910                manager.add(ssnnptr);
911
912                if let Err(e) = manager.commit() {
913                    match e.reason::<DataConnError>() {
914                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
915                            assert_eq!(errors.len(), 1);
916                            assert_eq!(errors[0].0, "foo".into());
917                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
918                        }
919                        _ => panic!(),
920                    }
921                } else {
922                    panic!();
923                }
924            }
925
926            assert_eq!(
927                *logger.lock().unwrap(),
928                &[
929                    "SyncDataConn::new 1",
930                    "AsyncDataConn::new 2",
931                    "SyncDataConn::pre_commit 1 failed",
932                    "SyncDataConn::close 1",
933                    "SyncDataConn::drop 1",
934                    "AsyncDataConn::close 2",
935                    "AsyncDataConn::drop 2",
936                ]
937            );
938        }
939
940        #[test]
941        fn test_commit_but_fail_second_pre_commit() {
942            let logger = Arc::new(Mutex::new(Vec::new()));
943
944            {
945                let mut manager = DataConnManager::new();
946
947                let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
948                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
949                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
950                let ssnnptr = SendSyncNonNull::new(nnptr);
951                manager.add(ssnnptr);
952
953                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
954                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
955                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
956                let ssnnptr = SendSyncNonNull::new(nnptr);
957                manager.add(ssnnptr);
958
959                if let Err(e) = manager.commit() {
960                    match e.reason::<DataConnError>() {
961                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
962                            assert_eq!(errors.len(), 1);
963                            assert_eq!(errors[0].0, "foo".into());
964                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
965                        }
966                        _ => panic!(),
967                    }
968                } else {
969                    panic!();
970                }
971            }
972
973            assert_eq!(
974                *logger.lock().unwrap(),
975                &[
976                    "AsyncDataConn::new 1",
977                    "SyncDataConn::new 2",
978                    "SyncDataConn::pre_commit 2",
979                    "AsyncDataConn::pre_commit 1 failed",
980                    "AsyncDataConn::close 1",
981                    "AsyncDataConn::drop 1",
982                    "SyncDataConn::close 2",
983                    "SyncDataConn::drop 2",
984                ]
985            );
986        }
987
988        #[test]
989        fn test_commit_but_fail_first_sync_commit() {
990            let logger = Arc::new(Mutex::new(Vec::new()));
991
992            {
993                let mut manager = DataConnManager::new();
994
995                let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
996                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
997                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
998                let ssnnptr = SendSyncNonNull::new(nnptr);
999                manager.add(ssnnptr);
1000
1001                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1002                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1003                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1004                let ssnnptr = SendSyncNonNull::new(nnptr);
1005                manager.add(ssnnptr);
1006
1007                if let Err(e) = manager.commit() {
1008                    match e.reason::<DataConnError>() {
1009                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1010                            assert_eq!(errors.len(), 1);
1011                            assert_eq!(errors[0].0, "foo".into());
1012                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
1013                        }
1014                        _ => panic!(),
1015                    }
1016                } else {
1017                    panic!();
1018                }
1019            }
1020
1021            assert_eq!(
1022                *logger.lock().unwrap(),
1023                &[
1024                    "SyncDataConn::new 1",
1025                    "AsyncDataConn::new 2",
1026                    "SyncDataConn::pre_commit 1",
1027                    "AsyncDataConn::pre_commit 2",
1028                    "SyncDataConn::commit 1 failed",
1029                    "SyncDataConn::close 1",
1030                    "SyncDataConn::drop 1",
1031                    "AsyncDataConn::close 2",
1032                    "AsyncDataConn::drop 2",
1033                ]
1034            );
1035        }
1036
1037        #[test]
1038        fn test_commit_but_fail_first_async_commit() {
1039            let logger = Arc::new(Mutex::new(Vec::new()));
1040
1041            {
1042                let mut manager = DataConnManager::new();
1043
1044                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1045                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1046                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1047                let ssnnptr = SendSyncNonNull::new(nnptr);
1048                manager.add(ssnnptr);
1049
1050                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1051                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1052                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1053                let ssnnptr = SendSyncNonNull::new(nnptr);
1054                manager.add(ssnnptr);
1055
1056                if let Err(e) = manager.commit() {
1057                    match e.reason::<DataConnError>() {
1058                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1059                            assert_eq!(errors.len(), 1);
1060                            assert_eq!(errors[0].0, "foo".into());
1061                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1062                        }
1063                        _ => panic!(),
1064                    }
1065                } else {
1066                    panic!();
1067                }
1068            }
1069
1070            assert_eq!(
1071                *logger.lock().unwrap(),
1072                &[
1073                    "AsyncDataConn::new 1",
1074                    "SyncDataConn::new 2",
1075                    "SyncDataConn::pre_commit 2",
1076                    "AsyncDataConn::pre_commit 1",
1077                    "SyncDataConn::commit 2",
1078                    "AsyncDataConn::commit 1 failed",
1079                    "AsyncDataConn::close 1",
1080                    "AsyncDataConn::drop 1",
1081                    "SyncDataConn::close 2",
1082                    "SyncDataConn::drop 2",
1083                ]
1084            );
1085        }
1086
1087        #[test]
1088        fn test_commit_but_fail_second_commit() {
1089            let logger = Arc::new(Mutex::new(Vec::new()));
1090
1091            {
1092                let mut manager = DataConnManager::new();
1093
1094                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1095                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1096                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1097                let ssnnptr = SendSyncNonNull::new(nnptr);
1098                manager.add(ssnnptr);
1099
1100                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1101                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1102                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1103                let ssnnptr = SendSyncNonNull::new(nnptr);
1104                manager.add(ssnnptr);
1105
1106                if let Err(e) = manager.commit() {
1107                    match e.reason::<DataConnError>() {
1108                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1109                            assert_eq!(errors.len(), 1);
1110                            assert_eq!(errors[0].0, "bar".into());
1111                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1112                        }
1113                        _ => panic!(),
1114                    }
1115                } else {
1116                    panic!();
1117                }
1118            }
1119
1120            assert_eq!(
1121                *logger.lock().unwrap(),
1122                &[
1123                    "SyncDataConn::new 1",
1124                    "AsyncDataConn::new 2",
1125                    "SyncDataConn::pre_commit 1",
1126                    "AsyncDataConn::pre_commit 2",
1127                    "SyncDataConn::commit 1",
1128                    "AsyncDataConn::commit 2 failed",
1129                    "SyncDataConn::close 1",
1130                    "SyncDataConn::drop 1",
1131                    "AsyncDataConn::close 2",
1132                    "AsyncDataConn::drop 2",
1133                ]
1134            );
1135        }
1136
1137        #[test]
1138        fn test_rollback_and_first_is_sync() {
1139            let logger = Arc::new(Mutex::new(Vec::new()));
1140
1141            {
1142                let mut manager = DataConnManager::new();
1143
1144                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1145                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1146                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1147                let ssnnptr = SendSyncNonNull::new(nnptr);
1148                manager.add(ssnnptr);
1149
1150                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1151                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1152                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1153                let ssnnptr = SendSyncNonNull::new(nnptr);
1154                manager.add(ssnnptr);
1155
1156                manager.rollback();
1157            }
1158
1159            assert_eq!(
1160                *logger.lock().unwrap(),
1161                &[
1162                    "SyncDataConn::new 1",
1163                    "AsyncDataConn::new 2",
1164                    "SyncDataConn::rollback 1",
1165                    "AsyncDataConn::rollback 2",
1166                    "SyncDataConn::close 1",
1167                    "SyncDataConn::drop 1",
1168                    "AsyncDataConn::close 2",
1169                    "AsyncDataConn::drop 2",
1170                ]
1171            );
1172        }
1173
1174        #[test]
1175        fn test_rollback_and_first_is_async() {
1176            let logger = Arc::new(Mutex::new(Vec::new()));
1177
1178            {
1179                let mut manager = DataConnManager::new();
1180
1181                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1182                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1183                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1184                let ssnnptr = SendSyncNonNull::new(nnptr);
1185                manager.add(ssnnptr);
1186
1187                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1188                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1189                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1190                let ssnnptr = SendSyncNonNull::new(nnptr);
1191                manager.add(ssnnptr);
1192
1193                manager.rollback();
1194            }
1195
1196            assert_eq!(
1197                *logger.lock().unwrap(),
1198                &[
1199                    "AsyncDataConn::new 1",
1200                    "SyncDataConn::new 2",
1201                    "SyncDataConn::rollback 2",
1202                    "AsyncDataConn::rollback 1",
1203                    "AsyncDataConn::close 1",
1204                    "AsyncDataConn::drop 1",
1205                    "SyncDataConn::close 2",
1206                    "SyncDataConn::drop 2",
1207                ]
1208            );
1209        }
1210
1211        #[test]
1212        fn test_force_back_and_first_is_sync() {
1213            let logger = Arc::new(Mutex::new(Vec::new()));
1214
1215            {
1216                let mut manager = DataConnManager::new();
1217
1218                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1219                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1220                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1221                let ssnnptr = SendSyncNonNull::new(nnptr);
1222                manager.add(ssnnptr);
1223
1224                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1225                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1226                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1227                let ssnnptr = SendSyncNonNull::new(nnptr);
1228                manager.add(ssnnptr);
1229
1230                assert!(manager.commit().is_ok());
1231                manager.rollback();
1232            }
1233
1234            assert_eq!(
1235                *logger.lock().unwrap(),
1236                &[
1237                    "SyncDataConn::new 1",
1238                    "AsyncDataConn::new 2",
1239                    "SyncDataConn::pre_commit 1",
1240                    "AsyncDataConn::pre_commit 2",
1241                    "SyncDataConn::commit 1",
1242                    "AsyncDataConn::commit 2",
1243                    "SyncDataConn::post_commit 1",
1244                    "AsyncDataConn::post_commit 2",
1245                    "SyncDataConn::force_back 1",
1246                    "AsyncDataConn::force_back 2",
1247                    "SyncDataConn::close 1",
1248                    "SyncDataConn::drop 1",
1249                    "AsyncDataConn::close 2",
1250                    "AsyncDataConn::drop 2",
1251                ]
1252            );
1253        }
1254
1255        #[test]
1256        fn test_force_back_and_first_is_async() {
1257            let logger = Arc::new(Mutex::new(Vec::new()));
1258
1259            {
1260                let mut manager = DataConnManager::new();
1261
1262                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1263                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1264                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1265                let ssnnptr = SendSyncNonNull::new(nnptr);
1266                manager.add(ssnnptr);
1267
1268                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1269                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1270                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1271                let ssnnptr = SendSyncNonNull::new(nnptr);
1272                manager.add(ssnnptr);
1273
1274                assert!(manager.commit().is_ok());
1275                manager.rollback();
1276            }
1277
1278            assert_eq!(
1279                *logger.lock().unwrap(),
1280                &[
1281                    "AsyncDataConn::new 1",
1282                    "SyncDataConn::new 2",
1283                    "SyncDataConn::pre_commit 2",
1284                    "AsyncDataConn::pre_commit 1",
1285                    "SyncDataConn::commit 2",
1286                    "AsyncDataConn::commit 1",
1287                    "SyncDataConn::post_commit 2",
1288                    "AsyncDataConn::post_commit 1",
1289                    "SyncDataConn::force_back 2",
1290                    "AsyncDataConn::force_back 1",
1291                    "AsyncDataConn::close 1",
1292                    "AsyncDataConn::drop 1",
1293                    "SyncDataConn::close 2",
1294                    "SyncDataConn::drop 2",
1295                ]
1296            );
1297        }
1298    }
1299}