Skip to main content

sabi/
data_conn.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use crate::{AsyncGroup, DataConn, DataConnContainer, DataConnManager};
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::{any, mem, ptr};
10
11/// An enum type representing the reasons for errors that can occur within `DataConn` operations.
12#[allow(clippy::enum_variant_names)]
13#[derive(Debug)]
14pub enum DataConnError {
15    /// Indicates a failure during the pre-commit process of one or more [`DataConn`] instances
16    /// involved in a transaction.
17    /// Contains a vector of data connection names and their corresponding errors.
18    FailToPreCommitDataConn {
19        /// The vector contains errors that occurred in each [`DataConn`] object.
20        errors: Vec<(Arc<str>, errs::Err)>,
21    },
22
23    /// Indicates a failure during the commit process of one or more [`DataConn`] instances
24    /// involved in a transaction.
25    /// Contains a vector of data connection names and their corresponding errors.
26    FailToCommitDataConn {
27        /// The vector contains errors that occurred in each [`DataConn`] object.
28        errors: Vec<(Arc<str>, errs::Err)>,
29    },
30
31    /// Indicates a failure to cast a retrieved [`DataConn`] to the expected type.
32    FailToCastDataConn {
33        /// The name of the data connection that failed to cast.
34        name: Arc<str>,
35
36        /// The type name to which the [`DataConn`] attempted to cast.
37        target_type: &'static str,
38    },
39}
40
41impl<C> DataConnContainer<C>
42where
43    C: DataConn + 'static,
44{
45    pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
46        Self {
47            drop_fn: drop_data_conn::<C>,
48            is_fn: is_data_conn::<C>,
49            commit_fn: commit_data_conn::<C>,
50            pre_commit_fn: pre_commit_data_conn::<C>,
51            post_commit_fn: post_commit_data_conn::<C>,
52            should_force_back_fn: should_force_back_data_conn::<C>,
53            rollback_fn: rollback_data_conn::<C>,
54            force_back_fn: force_back_data_conn::<C>,
55            close_fn: close_data_conn::<C>,
56
57            name: name.into(),
58            data_conn,
59        }
60    }
61}
62
63fn drop_data_conn<C>(ptr: *const DataConnContainer)
64where
65    C: DataConn + 'static,
66{
67    let typed_ptr = ptr as *mut DataConnContainer<C>;
68    unsafe {
69        drop(Box::from_raw(typed_ptr));
70    }
71}
72
73fn is_data_conn<C>(type_id: any::TypeId) -> bool
74where
75    C: DataConn + 'static,
76{
77    any::TypeId::of::<C>() == type_id
78}
79
80fn commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
81where
82    C: DataConn + 'static,
83{
84    let typed_ptr = ptr as *mut DataConnContainer<C>;
85    unsafe { (*typed_ptr).data_conn.commit(ag) }
86}
87
88fn pre_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
89where
90    C: DataConn + 'static,
91{
92    let typed_ptr = ptr as *mut DataConnContainer<C>;
93    unsafe { (*typed_ptr).data_conn.pre_commit(ag) }
94}
95
96fn post_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
97where
98    C: DataConn + 'static,
99{
100    let typed_ptr = ptr as *mut DataConnContainer<C>;
101    unsafe {
102        (*typed_ptr).data_conn.post_commit(ag);
103    }
104}
105
106fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
107where
108    C: DataConn + 'static,
109{
110    let typed_ptr = ptr as *mut DataConnContainer<C>;
111    unsafe { (*typed_ptr).data_conn.should_force_back() }
112}
113
114fn rollback_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
115where
116    C: DataConn + 'static,
117{
118    let typed_ptr = ptr as *mut DataConnContainer<C>;
119    unsafe {
120        (*typed_ptr).data_conn.rollback(ag);
121    }
122}
123
124fn force_back_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
125where
126    C: DataConn + 'static,
127{
128    let typed_ptr = ptr as *mut DataConnContainer<C>;
129    unsafe {
130        (*typed_ptr).data_conn.force_back(ag);
131    }
132}
133
134fn close_data_conn<C>(ptr: *const DataConnContainer)
135where
136    C: DataConn + 'static,
137{
138    let typed_ptr = ptr as *mut DataConnContainer<C>;
139    unsafe {
140        (*typed_ptr).data_conn.close();
141    }
142}
143
144impl DataConnManager {
145    pub(crate) fn new() -> Self {
146        Self {
147            vec: Vec::new(),
148            index_map: HashMap::new(),
149        }
150    }
151
152    pub(crate) fn with_commit_order(names: &[&str]) -> Self {
153        let mut index_map = HashMap::with_capacity(names.len());
154        // To overwrite later indexed elements with eariler ones when names overlap
155        for (i, nm) in names.iter().rev().enumerate() {
156            index_map.insert((*nm).into(), names.len() - 1 - i);
157        }
158
159        Self {
160            vec: vec![None; names.len()],
161            index_map,
162        }
163    }
164
165    pub(crate) fn add(&mut self, nnptr: ptr::NonNull<DataConnContainer>) {
166        let name = unsafe { (*nnptr.as_ptr()).name.clone() };
167        if let Some(index) = self.index_map.get(&name) {
168            self.vec[*index] = Some(nnptr);
169        } else {
170            let index = self.vec.len();
171            self.vec.push(Some(nnptr));
172            self.index_map.insert(name.clone(), index);
173        }
174    }
175
176    pub(crate) fn find_by_name(
177        &self,
178        name: impl AsRef<str>,
179    ) -> Option<ptr::NonNull<DataConnContainer>> {
180        if let Some(index) = self.index_map.get(name.as_ref()) {
181            if *index < self.vec.len() {
182                if let Some(nnptr) = self.vec[*index] {
183                    let ptr = nnptr.as_ptr();
184                    let cont_name = unsafe { &(*ptr).name };
185                    if cont_name.as_ref() == name.as_ref() {
186                        return Some(nnptr);
187                    }
188                }
189            }
190        }
191
192        None
193    }
194
195    pub(crate) fn to_typed_ptr<C>(
196        nnptr: &ptr::NonNull<DataConnContainer>,
197    ) -> errs::Result<*mut DataConnContainer<C>>
198    where
199        C: DataConn + 'static,
200    {
201        let ptr = nnptr.as_ptr();
202        let name = unsafe { &(*ptr).name };
203        let type_id = any::TypeId::of::<C>();
204        let is_fn = unsafe { (*ptr).is_fn };
205
206        if !is_fn(type_id) {
207            return Err(errs::Err::new(DataConnError::FailToCastDataConn {
208                name: name.clone(),
209                target_type: any::type_name::<C>(),
210            }));
211        }
212
213        let typed_ptr = ptr as *mut DataConnContainer<C>;
214        Ok(typed_ptr)
215    }
216
217    pub(crate) fn commit(&self) -> errs::Result<()> {
218        let mut errors = Vec::new();
219
220        let mut ag = AsyncGroup::new();
221        for nnptr in self.vec.iter().flatten() {
222            let ptr = nnptr.as_ptr();
223            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
224            ag._name = unsafe { (*ptr).name.clone() };
225            if let Err(err) = pre_commit_fn(ptr, &mut ag) {
226                errors.push((ag._name.clone(), err));
227                break;
228            }
229        }
230        ag.join_and_collect_errors(&mut errors);
231
232        if !errors.is_empty() {
233            return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
234                errors,
235            }));
236        }
237
238        let mut ag = AsyncGroup::new();
239        for nnptr in self.vec.iter().flatten() {
240            let ptr = nnptr.as_ptr();
241            let commit_fn = unsafe { (*ptr).commit_fn };
242            ag._name = unsafe { (*ptr).name.clone() };
243            if let Err(err) = commit_fn(ptr, &mut ag) {
244                errors.push((ag._name.clone(), err));
245                break;
246            }
247        }
248        ag.join_and_collect_errors(&mut errors);
249
250        if !errors.is_empty() {
251            return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
252                errors,
253            }));
254        }
255
256        let mut ag = AsyncGroup::new();
257        for nnptr in self.vec.iter().flatten() {
258            let ptr = nnptr.as_ptr();
259            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
260            ag._name = unsafe { (*ptr).name.clone() };
261            post_commit_fn(ptr, &mut ag);
262        }
263        ag.join_and_ignore_errors();
264
265        Ok(())
266    }
267
268    pub(crate) fn rollback(&mut self) {
269        let mut ag = AsyncGroup::new();
270        for nnptr in self.vec.iter().flatten() {
271            let ptr = nnptr.as_ptr();
272            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
273            let force_back_fn = unsafe { (*ptr).force_back_fn };
274            let rollback_fn = unsafe { (*ptr).rollback_fn };
275            ag._name = unsafe { (*ptr).name.clone() };
276
277            if should_force_back_fn(ptr) {
278                force_back_fn(ptr, &mut ag);
279            } else {
280                rollback_fn(ptr, &mut ag);
281            }
282        }
283        ag.join_and_ignore_errors();
284    }
285
286    pub(crate) fn close(&mut self) {
287        self.index_map.clear();
288
289        let vec: Vec<Option<ptr::NonNull<DataConnContainer>>> = mem::take(&mut self.vec);
290
291        for nnptr in vec.iter().flatten() {
292            let ptr = nnptr.as_ptr();
293            let close_fn = unsafe { (*ptr).close_fn };
294            let drop_fn = unsafe { (*ptr).drop_fn };
295            close_fn(ptr);
296            drop_fn(ptr);
297        }
298    }
299}
300
301impl Drop for DataConnManager {
302    fn drop(&mut self) {
303        self.close();
304    }
305}
306
307#[cfg(test)]
308mod tests_of_data_conn {
309    use super::*;
310    use std::sync::{
311        atomic::{AtomicBool, Ordering},
312        Arc, Mutex,
313    };
314    use std::{thread, time};
315
316    #[derive(PartialEq, Copy, Clone)]
317    enum Fail {
318        Not,
319        Commit,
320        PreCommit,
321    }
322
323    struct SyncDataConn {
324        id: i8,
325        committed: bool,
326        fail: Fail,
327        logger: Arc<Mutex<Vec<String>>>,
328    }
329    impl SyncDataConn {
330        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
331            logger
332                .lock()
333                .unwrap()
334                .push(format!("SyncDataConn::new {}", id));
335            Self {
336                id,
337                committed: false,
338                fail,
339                logger,
340            }
341        }
342    }
343    impl Drop for SyncDataConn {
344        fn drop(&mut self) {
345            self.logger
346                .lock()
347                .unwrap()
348                .push(format!("SyncDataConn::drop {}", self.id));
349        }
350    }
351    impl DataConn for SyncDataConn {
352        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
353            if self.fail == Fail::Commit {
354                self.logger
355                    .lock()
356                    .unwrap()
357                    .push(format!("SyncDataConn::commit {} failed", self.id));
358                return Err(errs::Err::new("ZZZ".to_string()));
359            }
360            self.committed = true;
361            self.logger
362                .lock()
363                .unwrap()
364                .push(format!("SyncDataConn::commit {}", self.id));
365            Ok(())
366        }
367        fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
368            if self.fail == Fail::PreCommit {
369                self.logger
370                    .lock()
371                    .unwrap()
372                    .push(format!("SyncDataConn::pre_commit {} failed", self.id));
373                return Err(errs::Err::new("zzz".to_string()));
374            }
375            self.logger
376                .lock()
377                .unwrap()
378                .push(format!("SyncDataConn::pre_commit {}", self.id));
379            Ok(())
380        }
381        fn post_commit(&mut self, _ag: &mut AsyncGroup) {
382            self.logger
383                .lock()
384                .unwrap()
385                .push(format!("SyncDataConn::post_commit {}", self.id));
386        }
387        fn should_force_back(&self) -> bool {
388            self.committed
389        }
390        fn rollback(&mut self, _ag: &mut AsyncGroup) {
391            self.logger
392                .lock()
393                .unwrap()
394                .push(format!("SyncDataConn::rollback {}", self.id));
395        }
396        fn force_back(&mut self, _ag: &mut AsyncGroup) {
397            self.logger
398                .lock()
399                .unwrap()
400                .push(format!("SyncDataConn::force_back {}", self.id));
401        }
402        fn close(&mut self) {
403            self.logger
404                .lock()
405                .unwrap()
406                .push(format!("SyncDataConn::close {}", self.id));
407        }
408    }
409
410    struct AsyncDataConn {
411        id: i8,
412        committed: Arc<AtomicBool>,
413        fail: Fail,
414        logger: Arc<Mutex<Vec<String>>>,
415    }
416    impl AsyncDataConn {
417        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
418            logger
419                .lock()
420                .unwrap()
421                .push(format!("AsyncDataConn::new {}", id));
422            Self {
423                id,
424                committed: Arc::new(AtomicBool::new(false)),
425                fail,
426                logger,
427            }
428        }
429    }
430    impl Drop for AsyncDataConn {
431        fn drop(&mut self) {
432            self.logger
433                .lock()
434                .unwrap()
435                .push(format!("AsyncDataConn::drop {}", self.id));
436        }
437    }
438    impl DataConn for AsyncDataConn {
439        fn commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
440            let fail = self.fail;
441            let logger = self.logger.clone();
442            let id = self.id;
443            let committed = self.committed.clone();
444            ag.add(move || {
445                thread::sleep(time::Duration::from_millis(100));
446                if fail == Fail::Commit {
447                    logger
448                        .lock()
449                        .unwrap()
450                        .push(format!("AsyncDataConn::commit {} failed", id));
451                    return Err(errs::Err::new("YYY".to_string()));
452                }
453                committed.store(true, Ordering::Release);
454                logger
455                    .lock()
456                    .unwrap()
457                    .push(format!("AsyncDataConn::commit {}", id));
458                Ok(())
459            });
460            Ok(())
461        }
462        fn pre_commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
463            let fail = self.fail;
464            let logger = self.logger.clone();
465            let id = self.id;
466            ag.add(move || {
467                thread::sleep(time::Duration::from_millis(100));
468                if fail == Fail::PreCommit {
469                    logger
470                        .lock()
471                        .unwrap()
472                        .push(format!("AsyncDataConn::pre_commit {} failed", id));
473                    return Err(errs::Err::new("yyy".to_string()));
474                }
475                logger
476                    .lock()
477                    .unwrap()
478                    .push(format!("AsyncDataConn::pre_commit {}", id));
479                Ok(())
480            });
481            Ok(())
482        }
483        fn post_commit(&mut self, ag: &mut AsyncGroup) {
484            let logger = self.logger.clone();
485            let id = self.id;
486            ag.add(move || {
487                thread::sleep(time::Duration::from_millis(100));
488                logger
489                    .lock()
490                    .unwrap()
491                    .push(format!("AsyncDataConn::post_commit {}", id));
492                Ok(())
493            });
494        }
495        fn should_force_back(&self) -> bool {
496            self.committed.load(Ordering::Acquire)
497        }
498        fn rollback(&mut self, ag: &mut AsyncGroup) {
499            let logger = self.logger.clone();
500            let id = self.id;
501            ag.add(move || {
502                thread::sleep(time::Duration::from_millis(100));
503                logger
504                    .lock()
505                    .unwrap()
506                    .push(format!("AsyncDataConn::rollback {}", id));
507                Ok(())
508            });
509        }
510        fn force_back(&mut self, ag: &mut AsyncGroup) {
511            let logger = self.logger.clone();
512            let id = self.id;
513            ag.add(move || {
514                thread::sleep(time::Duration::from_millis(100));
515                logger
516                    .lock()
517                    .unwrap()
518                    .push(format!("AsyncDataConn::force_back {}", id));
519                Ok(())
520            });
521        }
522        fn close(&mut self) {
523            self.logger
524                .lock()
525                .unwrap()
526                .push(format!("AsyncDataConn::close {}", self.id));
527        }
528    }
529
530    mod tests_of_data_conn_manager {
531        use super::*;
532
533        #[test]
534        fn test_new() {
535            let manager = DataConnManager::new();
536            assert!(manager.vec.is_empty());
537            assert!(manager.index_map.is_empty());
538        }
539
540        #[test]
541        fn test_with_commit_order() {
542            let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
543            assert_eq!(manager.vec, vec![None, None, None]);
544            assert_eq!(manager.index_map.len(), 3);
545            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
546            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
547            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
548        }
549
550        #[test]
551        fn test_new_and_add() {
552            let logger = Arc::new(Mutex::new(Vec::new()));
553
554            let mut manager = DataConnManager::new();
555            assert!(manager.vec.is_empty());
556            assert!(manager.index_map.is_empty());
557
558            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
559            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
560            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
561            manager.add(nnptr);
562            assert_eq!(manager.vec.len(), 1);
563            assert_eq!(manager.index_map.len(), 1);
564            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
565
566            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
567            let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
568            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
569            manager.add(nnptr);
570            assert_eq!(manager.vec.len(), 2);
571            assert_eq!(manager.index_map.len(), 2);
572            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
573            assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
574        }
575
576        #[test]
577        fn test_with_commit_order_and_add() {
578            let logger = Arc::new(Mutex::new(Vec::new()));
579
580            let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
581            assert_eq!(manager.vec, vec![None, None, None]);
582            assert_eq!(manager.index_map.len(), 3);
583            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
584            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
585            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
586
587            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
588            let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
589            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
590            manager.add(nnptr);
591            assert_eq!(manager.vec.len(), 3);
592            assert_eq!(manager.index_map.len(), 3);
593            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
594
595            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
596            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
597            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
598            manager.add(nnptr);
599            assert_eq!(manager.vec.len(), 3);
600            assert_eq!(manager.index_map.len(), 3);
601            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
602            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
603
604            let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
605            let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
606            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
607            manager.add(nnptr);
608            assert_eq!(manager.vec.len(), 4);
609            assert_eq!(manager.index_map.len(), 4);
610            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
611            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
612            assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
613        }
614
615        #[test]
616        fn test_find_by_name_but_none() {
617            let manager = DataConnManager::new();
618            assert!(manager.find_by_name("foo").is_none());
619            assert!(manager.find_by_name("bar").is_none());
620        }
621
622        #[test]
623        fn test_find_by_name_and_found() {
624            let logger = Arc::new(Mutex::new(Vec::new()));
625
626            let mut manager = DataConnManager::new();
627
628            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
629            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
630            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
631            manager.add(nnptr);
632
633            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
634            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
635            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
636            manager.add(nnptr);
637
638            if let Some(nnptr) = manager.find_by_name("foo") {
639                let name = unsafe { (*nnptr.as_ptr()).name.clone() };
640                assert_eq!(name.as_ref(), "foo");
641            } else {
642                panic!();
643            }
644
645            if let Some(nnptr) = manager.find_by_name("bar") {
646                let name = unsafe { (*nnptr.as_ptr()).name.clone() };
647                assert_eq!(name.as_ref(), "bar");
648            } else {
649                panic!();
650            }
651        }
652
653        #[test]
654        fn test_to_typed_ptr() {
655            let logger = Arc::new(Mutex::new(Vec::new()));
656
657            let mut manager = DataConnManager::new();
658
659            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
660            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
661            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
662            manager.add(nnptr);
663
664            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
665            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
666            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
667            manager.add(nnptr);
668
669            let nnptr = manager.find_by_name("foo").unwrap();
670            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
671                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::SyncDataConn>");
672                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
673            } else {
674                panic!();
675            }
676
677            let nnptr = manager.find_by_name("bar").unwrap();
678            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
679                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::AsyncDataConn>");
680                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
681            } else {
682                panic!();
683            }
684        }
685
686        #[test]
687        fn test_to_typed_ptr_but_fail() {
688            let logger = Arc::new(Mutex::new(Vec::new()));
689
690            let mut manager = DataConnManager::new();
691
692            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
693            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
694            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
695            manager.add(nnptr);
696
697            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
698            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
699            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
700            manager.add(nnptr);
701
702            let nnptr = manager.find_by_name("foo").unwrap();
703            if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
704                match err.reason::<DataConnError>() {
705                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
706                        assert_eq!(name.as_ref(), "foo");
707                        assert_eq!(
708                            *target_type,
709                            "sabi::data_conn::tests_of_data_conn::AsyncDataConn"
710                        );
711                    }
712                    _ => panic!(),
713                }
714            } else {
715                panic!();
716            }
717
718            let nnptr = manager.find_by_name("bar").unwrap();
719            if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
720                match err.reason::<DataConnError>() {
721                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
722                        assert_eq!(name.as_ref(), "bar");
723                        assert_eq!(
724                            *target_type,
725                            "sabi::data_conn::tests_of_data_conn::SyncDataConn"
726                        );
727                    }
728                    _ => panic!(),
729                }
730            } else {
731                panic!();
732            }
733        }
734
735        #[test]
736        fn test_commit_ok() {
737            let logger = Arc::new(Mutex::new(Vec::new()));
738
739            {
740                let mut manager = DataConnManager::new();
741
742                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
743                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
744                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
745                manager.add(nnptr);
746
747                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
748                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
749                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
750                manager.add(nnptr);
751
752                assert!(manager.commit().is_ok());
753            }
754
755            assert_eq!(
756                *logger.lock().unwrap(),
757                &[
758                    "SyncDataConn::new 1",
759                    "AsyncDataConn::new 2",
760                    "SyncDataConn::pre_commit 1",
761                    "AsyncDataConn::pre_commit 2",
762                    "SyncDataConn::commit 1",
763                    "AsyncDataConn::commit 2",
764                    "SyncDataConn::post_commit 1",
765                    "AsyncDataConn::post_commit 2",
766                    "SyncDataConn::close 1",
767                    "SyncDataConn::drop 1",
768                    "AsyncDataConn::close 2",
769                    "AsyncDataConn::drop 2",
770                ]
771            );
772        }
773
774        #[test]
775        fn test_commit_with_order() {
776            let logger = Arc::new(Mutex::new(Vec::new()));
777
778            {
779                let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
780
781                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
782                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
783                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
784                manager.add(nnptr);
785
786                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
787                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
788                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
789                manager.add(nnptr);
790
791                let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
792                let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
793                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
794                manager.add(nnptr);
795
796                assert!(manager.commit().is_ok());
797            }
798
799            assert_eq!(
800                *logger.lock().unwrap(),
801                &[
802                    "SyncDataConn::new 1",
803                    "AsyncDataConn::new 2",
804                    "SyncDataConn::new 3",
805                    "SyncDataConn::pre_commit 1",
806                    "SyncDataConn::pre_commit 3",
807                    "AsyncDataConn::pre_commit 2", // because of async
808                    "SyncDataConn::commit 1",
809                    "SyncDataConn::commit 3",
810                    "AsyncDataConn::commit 2", // because of async
811                    "SyncDataConn::post_commit 1",
812                    "SyncDataConn::post_commit 3",
813                    "AsyncDataConn::post_commit 2", // because of async
814                    "AsyncDataConn::close 2",
815                    "AsyncDataConn::drop 2",
816                    "SyncDataConn::close 1",
817                    "SyncDataConn::drop 1",
818                    "SyncDataConn::close 3",
819                    "SyncDataConn::drop 3",
820                ]
821            );
822        }
823
824        #[test]
825        fn test_commit_but_fail_first_sync_pre_commit() {
826            let logger = Arc::new(Mutex::new(Vec::new()));
827
828            {
829                let mut manager = DataConnManager::new();
830
831                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
832                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
833                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
834                manager.add(nnptr);
835
836                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
837                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
838                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
839                manager.add(nnptr);
840
841                if let Err(e) = manager.commit() {
842                    match e.reason::<DataConnError>() {
843                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
844                            assert_eq!(errors.len(), 1);
845                            assert_eq!(errors[0].0, "foo".into());
846                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
847                        }
848                        _ => panic!(),
849                    }
850                } else {
851                    panic!();
852                }
853            }
854
855            assert_eq!(
856                *logger.lock().unwrap(),
857                &[
858                    "SyncDataConn::new 1",
859                    "AsyncDataConn::new 2",
860                    "SyncDataConn::pre_commit 1 failed",
861                    "SyncDataConn::close 1",
862                    "SyncDataConn::drop 1",
863                    "AsyncDataConn::close 2",
864                    "AsyncDataConn::drop 2",
865                ]
866            );
867        }
868
869        #[test]
870        fn test_commit_but_fail_first_async_pre_commit() {
871            let logger = Arc::new(Mutex::new(Vec::new()));
872
873            {
874                let mut manager = DataConnManager::new();
875
876                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
877                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
878                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
879                manager.add(nnptr);
880
881                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
882                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
883                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
884                manager.add(nnptr);
885
886                if let Err(e) = manager.commit() {
887                    match e.reason::<DataConnError>() {
888                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
889                            assert_eq!(errors.len(), 1);
890                            assert_eq!(errors[0].0, "foo".into());
891                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
892                        }
893                        _ => panic!(),
894                    }
895                } else {
896                    panic!();
897                }
898            }
899
900            assert_eq!(
901                *logger.lock().unwrap(),
902                &[
903                    "SyncDataConn::new 1",
904                    "AsyncDataConn::new 2",
905                    "SyncDataConn::pre_commit 1 failed",
906                    "SyncDataConn::close 1",
907                    "SyncDataConn::drop 1",
908                    "AsyncDataConn::close 2",
909                    "AsyncDataConn::drop 2",
910                ]
911            );
912        }
913
914        #[test]
915        fn test_commit_but_fail_second_pre_commit() {
916            let logger = Arc::new(Mutex::new(Vec::new()));
917
918            {
919                let mut manager = DataConnManager::new();
920
921                let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
922                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
923                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
924                manager.add(nnptr);
925
926                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
927                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
928                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
929                manager.add(nnptr);
930
931                if let Err(e) = manager.commit() {
932                    match e.reason::<DataConnError>() {
933                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
934                            assert_eq!(errors.len(), 1);
935                            assert_eq!(errors[0].0, "foo".into());
936                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
937                        }
938                        _ => panic!(),
939                    }
940                } else {
941                    panic!();
942                }
943            }
944
945            assert_eq!(
946                *logger.lock().unwrap(),
947                &[
948                    "AsyncDataConn::new 1",
949                    "SyncDataConn::new 2",
950                    "SyncDataConn::pre_commit 2",
951                    "AsyncDataConn::pre_commit 1 failed",
952                    "AsyncDataConn::close 1",
953                    "AsyncDataConn::drop 1",
954                    "SyncDataConn::close 2",
955                    "SyncDataConn::drop 2",
956                ]
957            );
958        }
959
960        #[test]
961        fn test_commit_but_fail_first_sync_commit() {
962            let logger = Arc::new(Mutex::new(Vec::new()));
963
964            {
965                let mut manager = DataConnManager::new();
966
967                let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
968                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
969                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
970                manager.add(nnptr);
971
972                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
973                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
974                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
975                manager.add(nnptr);
976
977                if let Err(e) = manager.commit() {
978                    match e.reason::<DataConnError>() {
979                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
980                            assert_eq!(errors.len(), 1);
981                            assert_eq!(errors[0].0, "foo".into());
982                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
983                        }
984                        _ => panic!(),
985                    }
986                } else {
987                    panic!();
988                }
989            }
990
991            assert_eq!(
992                *logger.lock().unwrap(),
993                &[
994                    "SyncDataConn::new 1",
995                    "AsyncDataConn::new 2",
996                    "SyncDataConn::pre_commit 1",
997                    "AsyncDataConn::pre_commit 2",
998                    "SyncDataConn::commit 1 failed",
999                    "SyncDataConn::close 1",
1000                    "SyncDataConn::drop 1",
1001                    "AsyncDataConn::close 2",
1002                    "AsyncDataConn::drop 2",
1003                ]
1004            );
1005        }
1006
1007        #[test]
1008        fn test_commit_but_fail_first_async_commit() {
1009            let logger = Arc::new(Mutex::new(Vec::new()));
1010
1011            {
1012                let mut manager = DataConnManager::new();
1013
1014                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1015                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1016                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1017                manager.add(nnptr);
1018
1019                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1020                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1021                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1022                manager.add(nnptr);
1023
1024                if let Err(e) = manager.commit() {
1025                    match e.reason::<DataConnError>() {
1026                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1027                            assert_eq!(errors.len(), 1);
1028                            assert_eq!(errors[0].0, "foo".into());
1029                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1030                        }
1031                        _ => panic!(),
1032                    }
1033                } else {
1034                    panic!();
1035                }
1036            }
1037
1038            assert_eq!(
1039                *logger.lock().unwrap(),
1040                &[
1041                    "AsyncDataConn::new 1",
1042                    "SyncDataConn::new 2",
1043                    "SyncDataConn::pre_commit 2",
1044                    "AsyncDataConn::pre_commit 1",
1045                    "SyncDataConn::commit 2",
1046                    "AsyncDataConn::commit 1 failed",
1047                    "AsyncDataConn::close 1",
1048                    "AsyncDataConn::drop 1",
1049                    "SyncDataConn::close 2",
1050                    "SyncDataConn::drop 2",
1051                ]
1052            );
1053        }
1054
1055        #[test]
1056        fn test_commit_but_fail_second_commit() {
1057            let logger = Arc::new(Mutex::new(Vec::new()));
1058
1059            {
1060                let mut manager = DataConnManager::new();
1061
1062                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1063                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1064                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1065                manager.add(nnptr);
1066
1067                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1068                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1069                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1070                manager.add(nnptr);
1071
1072                if let Err(e) = manager.commit() {
1073                    match e.reason::<DataConnError>() {
1074                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1075                            assert_eq!(errors.len(), 1);
1076                            assert_eq!(errors[0].0, "bar".into());
1077                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1078                        }
1079                        _ => panic!(),
1080                    }
1081                } else {
1082                    panic!();
1083                }
1084            }
1085
1086            assert_eq!(
1087                *logger.lock().unwrap(),
1088                &[
1089                    "SyncDataConn::new 1",
1090                    "AsyncDataConn::new 2",
1091                    "SyncDataConn::pre_commit 1",
1092                    "AsyncDataConn::pre_commit 2",
1093                    "SyncDataConn::commit 1",
1094                    "AsyncDataConn::commit 2 failed",
1095                    "SyncDataConn::close 1",
1096                    "SyncDataConn::drop 1",
1097                    "AsyncDataConn::close 2",
1098                    "AsyncDataConn::drop 2",
1099                ]
1100            );
1101        }
1102
1103        #[test]
1104        fn test_rollback_and_first_is_sync() {
1105            let logger = Arc::new(Mutex::new(Vec::new()));
1106
1107            {
1108                let mut manager = DataConnManager::new();
1109
1110                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1111                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1112                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1113                manager.add(nnptr);
1114
1115                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1116                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1117                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1118                manager.add(nnptr);
1119
1120                manager.rollback();
1121            }
1122
1123            assert_eq!(
1124                *logger.lock().unwrap(),
1125                &[
1126                    "SyncDataConn::new 1",
1127                    "AsyncDataConn::new 2",
1128                    "SyncDataConn::rollback 1",
1129                    "AsyncDataConn::rollback 2",
1130                    "SyncDataConn::close 1",
1131                    "SyncDataConn::drop 1",
1132                    "AsyncDataConn::close 2",
1133                    "AsyncDataConn::drop 2",
1134                ]
1135            );
1136        }
1137
1138        #[test]
1139        fn test_rollback_and_first_is_async() {
1140            let logger = Arc::new(Mutex::new(Vec::new()));
1141
1142            {
1143                let mut manager = DataConnManager::new();
1144
1145                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1146                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1147                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1148                manager.add(nnptr);
1149
1150                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1151                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1152                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1153                manager.add(nnptr);
1154
1155                manager.rollback();
1156            }
1157
1158            assert_eq!(
1159                *logger.lock().unwrap(),
1160                &[
1161                    "AsyncDataConn::new 1",
1162                    "SyncDataConn::new 2",
1163                    "SyncDataConn::rollback 2",
1164                    "AsyncDataConn::rollback 1",
1165                    "AsyncDataConn::close 1",
1166                    "AsyncDataConn::drop 1",
1167                    "SyncDataConn::close 2",
1168                    "SyncDataConn::drop 2",
1169                ]
1170            );
1171        }
1172
1173        #[test]
1174        fn test_force_back_and_first_is_sync() {
1175            let logger = Arc::new(Mutex::new(Vec::new()));
1176
1177            {
1178                let mut manager = DataConnManager::new();
1179
1180                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1181                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1182                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1183                manager.add(nnptr);
1184
1185                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1186                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1187                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1188                manager.add(nnptr);
1189
1190                assert!(manager.commit().is_ok());
1191                manager.rollback();
1192            }
1193
1194            assert_eq!(
1195                *logger.lock().unwrap(),
1196                &[
1197                    "SyncDataConn::new 1",
1198                    "AsyncDataConn::new 2",
1199                    "SyncDataConn::pre_commit 1",
1200                    "AsyncDataConn::pre_commit 2",
1201                    "SyncDataConn::commit 1",
1202                    "AsyncDataConn::commit 2",
1203                    "SyncDataConn::post_commit 1",
1204                    "AsyncDataConn::post_commit 2",
1205                    "SyncDataConn::force_back 1",
1206                    "AsyncDataConn::force_back 2",
1207                    "SyncDataConn::close 1",
1208                    "SyncDataConn::drop 1",
1209                    "AsyncDataConn::close 2",
1210                    "AsyncDataConn::drop 2",
1211                ]
1212            );
1213        }
1214
1215        #[test]
1216        fn test_force_back_and_first_is_async() {
1217            let logger = Arc::new(Mutex::new(Vec::new()));
1218
1219            {
1220                let mut manager = DataConnManager::new();
1221
1222                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1223                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1224                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1225                manager.add(nnptr);
1226
1227                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1228                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1229                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1230                manager.add(nnptr);
1231
1232                assert!(manager.commit().is_ok());
1233                manager.rollback();
1234            }
1235
1236            assert_eq!(
1237                *logger.lock().unwrap(),
1238                &[
1239                    "AsyncDataConn::new 1",
1240                    "SyncDataConn::new 2",
1241                    "SyncDataConn::pre_commit 2",
1242                    "AsyncDataConn::pre_commit 1",
1243                    "SyncDataConn::commit 2",
1244                    "AsyncDataConn::commit 1",
1245                    "SyncDataConn::post_commit 2",
1246                    "AsyncDataConn::post_commit 1",
1247                    "SyncDataConn::force_back 2",
1248                    "AsyncDataConn::force_back 1",
1249                    "AsyncDataConn::close 1",
1250                    "AsyncDataConn::drop 1",
1251                    "SyncDataConn::close 2",
1252                    "SyncDataConn::drop 2",
1253                ]
1254            );
1255        }
1256    }
1257}