Skip to main content

sabi/tokio/
data_conn.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::{AsyncGroup, DataConn, DataConnContainer, DataConnManager};
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::{any, mem, ptr};
12
/// Represents errors that can occur during data connection operations.
///
/// In this file the variants are wrapped with `errs::Err::new` and returned by
/// `DataConnManager::commit_async` (pre-commit / commit failures) and by
/// `DataConnManager::to_typed_ptr` (cast failure).
// Every variant name has the `FailTo…DataConn` shape, which is what
// `clippy::enum_variant_names` reports; the lint is allowed for this enum.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum DataConnError {
    /// An error indicating that one or more data connections failed during the pre-commit process.
    FailToPreCommitDataConn {
        /// A vector of errors, each containing the name of the data connection and the error itself.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// An error indicating that one or more data connections failed during the commit process.
    FailToCommitDataConn {
        /// A vector of errors, each containing the name of the data connection and the error itself.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// An error indicating that a data connection could not be cast to the target type.
    FailToCastDataConn {
        /// The name of the data connection that failed to cast.
        name: Arc<str>,
        /// The string representation of the target type to which the connection could not be cast
        /// (produced with `std::any::type_name`).
        target_type: &'static str,
    },
}
37
38impl<C> DataConnContainer<C>
39where
40    C: DataConn + 'static,
41{
42    pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
43        Self {
44            drop_fn: drop_data_conn::<C>,
45            is_fn: is_data_conn::<C>,
46            commit_fn: commit_data_conn_async::<C>,
47            pre_commit_fn: pre_commit_data_conn_async::<C>,
48            post_commit_fn: post_commit_data_conn_async::<C>,
49            should_force_back_fn: should_force_back_data_conn::<C>,
50            rollback_fn: rollback_data_conn_async::<C>,
51            force_back_fn: force_back_data_conn_async::<C>,
52            close_fn: close_data_conn::<C>,
53
54            name: name.into(),
55            data_conn,
56        }
57    }
58}
59
60fn drop_data_conn<C>(ptr: *const DataConnContainer)
61where
62    C: DataConn + 'static,
63{
64    unsafe {
65        drop(Box::from_raw(ptr as *mut DataConnContainer<C>));
66    }
67}
68
69fn is_data_conn<C>(type_id: any::TypeId) -> bool
70where
71    C: DataConn + 'static,
72{
73    any::TypeId::of::<C>() == type_id
74}
75
76fn commit_data_conn_async<C>(
77    ptr: *const DataConnContainer,
78    ag: &mut AsyncGroup,
79) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
80where
81    C: DataConn + 'static,
82{
83    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
84    Box::pin(container.data_conn.commit_async(ag))
85}
86
87fn pre_commit_data_conn_async<C>(
88    ptr: *const DataConnContainer,
89    ag: &mut AsyncGroup,
90) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
91where
92    C: DataConn + 'static,
93{
94    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
95    Box::pin(container.data_conn.pre_commit_async(ag))
96}
97
98fn post_commit_data_conn_async<C>(
99    ptr: *const DataConnContainer,
100    ag: &mut AsyncGroup,
101) -> Pin<Box<dyn Future<Output = ()> + '_>>
102where
103    C: DataConn + 'static,
104{
105    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
106    Box::pin(container.data_conn.post_commit_async(ag))
107}
108
109fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
110where
111    C: DataConn + 'static,
112{
113    let container = unsafe { &*(ptr as *const DataConnContainer<C>) };
114    container.data_conn.should_force_back()
115}
116
117fn rollback_data_conn_async<C>(
118    ptr: *const DataConnContainer,
119    ag: &mut AsyncGroup,
120) -> Pin<Box<dyn Future<Output = ()> + '_>>
121where
122    C: DataConn + 'static,
123{
124    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
125    Box::pin(container.data_conn.rollback_async(ag))
126}
127
128fn force_back_data_conn_async<C>(
129    ptr: *const DataConnContainer,
130    ag: &mut AsyncGroup,
131) -> Pin<Box<dyn Future<Output = ()> + '_>>
132where
133    C: DataConn + 'static,
134{
135    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
136    Box::pin(container.data_conn.force_back_async(ag))
137}
138
139fn close_data_conn<C>(ptr: *const DataConnContainer)
140where
141    C: DataConn + 'static,
142{
143    let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
144    container.data_conn.close();
145}
146
147impl DataConnManager {
148    pub(crate) fn new() -> Self {
149        Self {
150            vec: Vec::new(),
151            index_map: HashMap::new(),
152        }
153    }
154
155    pub(crate) fn with_commit_order(names: &[&str]) -> Self {
156        let mut index_map = HashMap::with_capacity(names.len());
157        // To overwrite later indexed elements with eariler ones when names overlap
158        for (i, nm) in names.iter().rev().enumerate() {
159            index_map.insert((*nm).into(), names.len() - 1 - i);
160        }
161
162        Self {
163            vec: vec![None; names.len()],
164            index_map,
165        }
166    }
167
168    pub(crate) fn add(&mut self, nnptr: ptr::NonNull<DataConnContainer>) {
169        let name = unsafe { (*nnptr.as_ptr()).name.clone() };
170        if let Some(index) = self.index_map.get(&name) {
171            self.vec[*index] = Some(nnptr);
172        } else {
173            let index = self.vec.len();
174            self.vec.push(Some(nnptr));
175            self.index_map.insert(name.clone(), index);
176        }
177    }
178
179    pub(crate) fn find_by_name(
180        &self,
181        name: impl AsRef<str>,
182    ) -> Option<ptr::NonNull<DataConnContainer>> {
183        if let Some(index) = self.index_map.get(name.as_ref()) {
184            if *index < self.vec.len() {
185                if let Some(nnptr) = self.vec[*index] {
186                    let ptr = nnptr.as_ptr();
187                    let cont_name = unsafe { &(*ptr).name };
188                    if cont_name.as_ref() == name.as_ref() {
189                        return Some(nnptr);
190                    }
191                }
192            }
193        }
194
195        None
196    }
197
198    pub(crate) fn to_typed_ptr<C>(
199        nnptr: &ptr::NonNull<DataConnContainer>,
200    ) -> errs::Result<*mut DataConnContainer<C>>
201    where
202        C: DataConn + 'static,
203    {
204        let ptr = nnptr.as_ptr();
205        let name = unsafe { &(*ptr).name };
206        let type_id = any::TypeId::of::<C>();
207        let is_fn = unsafe { (*ptr).is_fn };
208
209        if !is_fn(type_id) {
210            return Err(errs::Err::new(DataConnError::FailToCastDataConn {
211                name: name.clone(),
212                target_type: any::type_name::<C>(),
213            }));
214        }
215
216        let typed_ptr = ptr as *mut DataConnContainer<C>;
217        Ok(typed_ptr)
218    }
219
220    pub(crate) async fn commit_async(&self) -> errs::Result<()> {
221        let mut errors = Vec::new();
222
223        let mut ag = AsyncGroup::new();
224        for nnptr in self.vec.iter().flatten() {
225            let ptr = nnptr.as_ptr();
226            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
227            ag._name = unsafe { (*ptr).name.clone() };
228            if let Err(err) = pre_commit_fn(ptr, &mut ag).await {
229                errors.push((ag._name.clone(), err));
230                break;
231            }
232        }
233        ag.join_and_collect_errors_async(&mut errors).await;
234
235        if !errors.is_empty() {
236            return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
237                errors,
238            }));
239        }
240
241        let mut ag = AsyncGroup::new();
242        for nnptr in self.vec.iter().flatten() {
243            let ptr = nnptr.as_ptr();
244            let commit_fn = unsafe { (*ptr).commit_fn };
245            ag._name = unsafe { (*ptr).name.clone() };
246            if let Err(err) = commit_fn(ptr, &mut ag).await {
247                errors.push((ag._name.clone(), err));
248                break;
249            }
250        }
251        ag.join_and_collect_errors_async(&mut errors).await;
252
253        if !errors.is_empty() {
254            return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
255                errors,
256            }));
257        }
258
259        let mut ag = AsyncGroup::new();
260        for nnptr in self.vec.iter().flatten() {
261            let ptr = nnptr.as_ptr();
262            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
263            ag._name = unsafe { (*ptr).name.clone() };
264            post_commit_fn(ptr, &mut ag).await;
265        }
266        ag.join_and_ignore_errors_async().await;
267
268        Ok(())
269    }
270
271    pub(crate) async fn rollback_async(&mut self) {
272        let mut ag = AsyncGroup::new();
273        for nnptr in self.vec.iter().flatten() {
274            let ptr = nnptr.as_ptr();
275            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
276            let force_back_fn = unsafe { (*ptr).force_back_fn };
277            let rollback_fn = unsafe { (*ptr).rollback_fn };
278            ag._name = unsafe { (*ptr).name.clone() };
279
280            if should_force_back_fn(ptr) {
281                force_back_fn(ptr, &mut ag).await;
282            } else {
283                rollback_fn(ptr, &mut ag).await;
284            }
285        }
286        ag.join_and_ignore_errors_async().await;
287    }
288
289    pub(crate) fn close(&mut self) {
290        self.index_map.clear();
291
292        let vec: Vec<Option<ptr::NonNull<DataConnContainer>>> = mem::take(&mut self.vec);
293
294        for nnptr in vec.iter().flatten() {
295            let ptr = nnptr.as_ptr();
296            let close_fn = unsafe { (*ptr).close_fn };
297            let drop_fn = unsafe { (*ptr).drop_fn };
298            close_fn(ptr);
299            drop_fn(ptr);
300        }
301    }
302}
303
304impl Drop for DataConnManager {
305    fn drop(&mut self) {
306        self.close();
307    }
308}
309
310#[cfg(test)]
311mod tests_of_data_conn {
312    use super::*;
313    use std::sync::{
314        atomic::{AtomicBool, Ordering},
315        Arc, Mutex,
316    };
317    use tokio::time;
318
    /// Selects which phase, if any, a test connection should fail in.
    #[derive(PartialEq, Copy, Clone)]
    enum Fail {
        /// Never fail.
        Not,
        /// Fail in `commit_async`.
        Commit,
        /// Fail in `pre_commit_async`.
        PreCommit,
    }
325
    /// Test connection that performs all of its work inline, without adding tasks to the
    /// `AsyncGroup`.
    struct SyncDataConn {
        // Identifier included in every log line.
        id: i8,
        // Set once `commit_async` succeeds; read by `should_force_back`.
        committed: AtomicBool,
        // Phase in which this connection reports an error.
        fail: Fail,
        // Shared event log inspected by the tests.
        logger: Arc<Mutex<Vec<String>>>,
    }
332    impl SyncDataConn {
333        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
334            logger
335                .lock()
336                .unwrap()
337                .push(format!("SyncDataConn::new {}", id));
338            Self {
339                id,
340                committed: AtomicBool::new(false),
341                fail,
342                logger,
343            }
344        }
345    }
346    impl Drop for SyncDataConn {
347        fn drop(&mut self) {
348            self.logger
349                .lock()
350                .unwrap()
351                .push(format!("SyncDataConn::drop {}", self.id));
352        }
353    }
354    impl DataConn for SyncDataConn {
355        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
356            let fail = self.fail;
357            let id = self.id;
358            let logger = self.logger.clone();
359            let committed = &self.committed;
360
361            if fail == Fail::Commit {
362                logger
363                    .lock()
364                    .unwrap()
365                    .push(format!("SyncDataConn::commit {} failed", id));
366                return Err(errs::Err::new("ZZZ".to_string()));
367            }
368            committed.store(true, Ordering::Release);
369            logger
370                .lock()
371                .unwrap()
372                .push(format!("SyncDataConn::commit {}", id));
373            Ok(())
374        }
375        async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
376            let fail = self.fail;
377            let id = self.id;
378            let logger = self.logger.clone();
379
380            if fail == Fail::PreCommit {
381                logger
382                    .lock()
383                    .unwrap()
384                    .push(format!("SyncDataConn::pre_commit {} failed", id));
385                return Err(errs::Err::new("zzz".to_string()));
386            }
387            logger
388                .lock()
389                .unwrap()
390                .push(format!("SyncDataConn::pre_commit {}", id));
391            Ok(())
392        }
393        async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
394            let id = self.id;
395            let logger = self.logger.clone();
396
397            logger
398                .lock()
399                .unwrap()
400                .push(format!("SyncDataConn::post_commit {}", id));
401        }
402        fn should_force_back(&self) -> bool {
403            self.committed.load(Ordering::Acquire)
404        }
405        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
406            let id = self.id;
407            let logger = self.logger.clone();
408
409            logger
410                .lock()
411                .unwrap()
412                .push(format!("SyncDataConn::rollback {}", id));
413        }
414        async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
415            let id = self.id;
416            let logger = self.logger.clone();
417
418            logger
419                .lock()
420                .unwrap()
421                .push(format!("SyncDataConn::force_back {}", id));
422        }
423        fn close(&mut self) {
424            self.logger
425                .lock()
426                .unwrap()
427                .push(format!("SyncDataConn::close {}", self.id));
428        }
429    }
430
    /// Test connection that hands its work to the `AsyncGroup` as tasks instead of doing it
    /// inline.
    struct AsyncDataConn {
        // Identifier included in every log line.
        id: i8,
        // Set by the commit task on success; shared so the task can own a handle to it.
        committed: Arc<AtomicBool>,
        // Phase in which this connection's task reports an error.
        fail: Fail,
        // Shared event log inspected by the tests.
        logger: Arc<Mutex<Vec<String>>>,
    }
437    impl AsyncDataConn {
438        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
439            logger
440                .lock()
441                .unwrap()
442                .push(format!("AsyncDataConn::new {}", id));
443            Self {
444                id,
445                committed: Arc::new(AtomicBool::new(false)),
446                fail,
447                logger,
448            }
449        }
450    }
451    impl Drop for AsyncDataConn {
452        fn drop(&mut self) {
453            self.logger
454                .lock()
455                .unwrap()
456                .push(format!("AsyncDataConn::drop {}", self.id));
457        }
458    }
459    impl DataConn for AsyncDataConn {
460        async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
461            let fail = self.fail;
462            let id = self.id;
463            let logger = self.logger.clone();
464            let committed = self.committed.clone();
465
466            ag.add(async move {
467                time::sleep(time::Duration::from_millis(100)).await;
468                if fail == Fail::Commit {
469                    logger
470                        .lock()
471                        .unwrap()
472                        .push(format!("AsyncDataConn::commit {} failed", id));
473                    return Err(errs::Err::new("YYY".to_string()));
474                }
475                committed.store(true, Ordering::Release);
476                logger
477                    .lock()
478                    .unwrap()
479                    .push(format!("AsyncDataConn::commit {}", id));
480                Ok(())
481            });
482            Ok(())
483        }
484        async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
485            let fail = self.fail;
486            let id = self.id;
487            let logger = self.logger.clone();
488
489            ag.add(async move {
490                time::sleep(time::Duration::from_millis(100)).await;
491                if fail == Fail::PreCommit {
492                    logger
493                        .lock()
494                        .unwrap()
495                        .push(format!("AsyncDataConn::pre_commit {} failed", id));
496                    return Err(errs::Err::new("yyy".to_string()));
497                }
498                logger
499                    .lock()
500                    .unwrap()
501                    .push(format!("AsyncDataConn::pre_commit {}", id));
502                Ok(())
503            });
504            Ok(())
505        }
506        async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {
507            let logger = self.logger.clone();
508            let id = self.id;
509
510            ag.add(async move {
511                time::sleep(time::Duration::from_millis(100)).await;
512                logger
513                    .lock()
514                    .unwrap()
515                    .push(format!("AsyncDataConn::post_commit {}", id));
516                Ok(())
517            });
518        }
519        fn should_force_back(&self) -> bool {
520            self.committed.load(Ordering::Acquire)
521        }
522        async fn rollback_async(&mut self, ag: &mut AsyncGroup) {
523            let logger = self.logger.clone();
524            let id = self.id;
525
526            ag.add(async move {
527                time::sleep(time::Duration::from_millis(100)).await;
528                logger
529                    .lock()
530                    .unwrap()
531                    .push(format!("AsyncDataConn::rollback {}", id));
532                Ok(())
533            });
534        }
535        async fn force_back_async(&mut self, ag: &mut AsyncGroup) {
536            let logger = self.logger.clone();
537            let id = self.id;
538
539            ag.add(async move {
540                time::sleep(time::Duration::from_millis(100)).await;
541                logger
542                    .lock()
543                    .unwrap()
544                    .push(format!("AsyncDataConn::force_back {}", id));
545                Ok(())
546            });
547        }
548        fn close(&mut self) {
549            self.logger
550                .lock()
551                .unwrap()
552                .push(format!("AsyncDataConn::close {}", self.id));
553        }
554    }
555
556    mod tests_of_data_conn_manager {
557        use super::*;
558
559        #[tokio::test]
560        async fn test_new() {
561            let manager = DataConnManager::new();
562            assert!(manager.vec.is_empty());
563        }
564
565        #[tokio::test]
566        async fn test_with_commit_order() {
567            let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
568            assert_eq!(manager.vec, vec![None, None, None]);
569            assert_eq!(manager.index_map.len(), 3);
570            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
571            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
572            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
573        }
574
575        #[test]
576        fn test_new_and_add() {
577            let logger = Arc::new(Mutex::new(Vec::new()));
578
579            let mut manager = DataConnManager::new();
580            assert!(manager.vec.is_empty());
581            assert!(manager.index_map.is_empty());
582
583            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
584            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
585            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
586            manager.add(nnptr);
587            assert_eq!(manager.vec.len(), 1);
588            assert_eq!(manager.index_map.len(), 1);
589            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
590
591            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
592            let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
593            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
594            manager.add(nnptr);
595            assert_eq!(manager.vec.len(), 2);
596            assert_eq!(manager.index_map.len(), 2);
597            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
598            assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
599        }
600
601        #[test]
602        fn test_with_commit_order_and_add() {
603            let logger = Arc::new(Mutex::new(Vec::new()));
604
605            let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
606            assert_eq!(manager.vec, vec![None, None, None]);
607            assert_eq!(manager.index_map.len(), 3);
608            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
609            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
610            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
611
612            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
613            let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
614            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
615            manager.add(nnptr);
616            assert_eq!(manager.vec.len(), 3);
617            assert_eq!(manager.index_map.len(), 3);
618            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
619
620            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
621            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
622            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
623            manager.add(nnptr);
624            assert_eq!(manager.vec.len(), 3);
625            assert_eq!(manager.index_map.len(), 3);
626            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
627            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
628
629            let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
630            let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
631            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
632            manager.add(nnptr);
633            assert_eq!(manager.vec.len(), 4);
634            assert_eq!(manager.index_map.len(), 4);
635            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
636            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
637            assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
638        }
639
640        #[test]
641        fn test_find_by_name_but_none() {
642            let manager = DataConnManager::new();
643            assert!(manager.find_by_name("foo").is_none());
644            assert!(manager.find_by_name("bar").is_none());
645        }
646
647        #[test]
648        fn test_find_by_name_and_found() {
649            let logger = Arc::new(Mutex::new(Vec::new()));
650
651            let mut manager = DataConnManager::new();
652
653            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
654            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
655            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
656            manager.add(nnptr);
657
658            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
659            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
660            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
661            manager.add(nnptr);
662
663            if let Some(nnptr) = manager.find_by_name("foo") {
664                let name = unsafe { (*nnptr.as_ptr()).name.clone() };
665                assert_eq!(name.as_ref(), "foo");
666            } else {
667                panic!();
668            }
669
670            if let Some(nnptr) = manager.find_by_name("bar") {
671                let name = unsafe { (*nnptr.as_ptr()).name.clone() };
672                assert_eq!(name.as_ref(), "bar");
673            } else {
674                panic!();
675            }
676        }
677
678        #[test]
679        fn test_to_typed_ptr() {
680            let logger = Arc::new(Mutex::new(Vec::new()));
681
682            let mut manager = DataConnManager::new();
683
684            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
685            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
686            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
687            manager.add(nnptr);
688
689            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
690            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
691            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
692            manager.add(nnptr);
693
694            let nnptr = manager.find_by_name("foo").unwrap();
695            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
696                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn>");
697                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
698            } else {
699                panic!();
700            }
701
702            let nnptr = manager.find_by_name("bar").unwrap();
703            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
704                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn>");
705                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
706            } else {
707                panic!();
708            }
709        }
710
711        #[test]
712        fn test_to_typed_ptr_but_fail() {
713            let logger = Arc::new(Mutex::new(Vec::new()));
714
715            let mut manager = DataConnManager::new();
716
717            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
718            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
719            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
720            manager.add(nnptr);
721
722            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
723            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
724            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
725            manager.add(nnptr);
726
727            let nnptr = manager.find_by_name("foo").unwrap();
728            if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
729                match err.reason::<DataConnError>() {
730                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
731                        assert_eq!(name.as_ref(), "foo");
732                        assert_eq!(
733                            *target_type,
734                            "sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn"
735                        );
736                    }
737                    _ => panic!(),
738                }
739            } else {
740                panic!();
741            }
742
743            let nnptr = manager.find_by_name("bar").unwrap();
744            if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
745                match err.reason::<DataConnError>() {
746                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
747                        assert_eq!(name.as_ref(), "bar");
748                        assert_eq!(
749                            *target_type,
750                            "sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn"
751                        );
752                    }
753                    _ => panic!(),
754                }
755            } else {
756                panic!();
757            }
758        }
759
760        #[tokio::test]
761        async fn test_commit_ok() {
762            let logger = Arc::new(Mutex::new(Vec::new()));
763
764            {
765                let mut manager = DataConnManager::new();
766
767                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
768                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
769                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
770                manager.add(nnptr);
771
772                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
773                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
774                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
775                manager.add(nnptr);
776
777                assert!(manager.commit_async().await.is_ok());
778            }
779
780            assert_eq!(
781                *logger.lock().unwrap(),
782                &[
783                    "SyncDataConn::new 1",
784                    "AsyncDataConn::new 2",
785                    "SyncDataConn::pre_commit 1",
786                    "AsyncDataConn::pre_commit 2",
787                    "SyncDataConn::commit 1",
788                    "AsyncDataConn::commit 2",
789                    "SyncDataConn::post_commit 1",
790                    "AsyncDataConn::post_commit 2",
791                    "SyncDataConn::close 1",
792                    "SyncDataConn::drop 1",
793                    "AsyncDataConn::close 2",
794                    "AsyncDataConn::drop 2",
795                ]
796            );
797        }
798
799        #[tokio::test]
800        async fn test_commit_with_order() {
801            let logger = Arc::new(Mutex::new(Vec::new()));
802
803            {
804                let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
805
806                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
807                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
808                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
809                manager.add(nnptr);
810
811                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
812                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
813                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
814                manager.add(nnptr);
815
816                let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
817                let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
818                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
819                manager.add(nnptr);
820
821                assert!(manager.commit_async().await.is_ok());
822            }
823
824            assert_eq!(
825                *logger.lock().unwrap(),
826                &[
827                    "SyncDataConn::new 1",
828                    "AsyncDataConn::new 2",
829                    "SyncDataConn::new 3",
830                    "SyncDataConn::pre_commit 1",
831                    "SyncDataConn::pre_commit 3",
832                    "AsyncDataConn::pre_commit 2", // because of async
833                    "SyncDataConn::commit 1",
834                    "SyncDataConn::commit 3",
835                    "AsyncDataConn::commit 2", // because of async
836                    "SyncDataConn::post_commit 1",
837                    "SyncDataConn::post_commit 3",
838                    "AsyncDataConn::post_commit 2", // because of async
839                    "AsyncDataConn::close 2",
840                    "AsyncDataConn::drop 2",
841                    "SyncDataConn::close 1",
842                    "SyncDataConn::drop 1",
843                    "SyncDataConn::close 3",
844                    "SyncDataConn::drop 3",
845                ]
846            );
847        }
848
849        #[tokio::test]
850        async fn test_commit_but_fail_first_sync_pre_commit() {
851            let logger = Arc::new(Mutex::new(Vec::new()));
852
853            {
854                let mut manager = DataConnManager::new();
855
856                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
857                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
858                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
859                manager.add(nnptr);
860
861                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
862                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
863                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
864                manager.add(nnptr);
865
866                if let Err(e) = manager.commit_async().await {
867                    match e.reason::<DataConnError>() {
868                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
869                            assert_eq!(errors.len(), 1);
870                            assert_eq!(errors[0].0, "foo".into());
871                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
872                        }
873                        _ => panic!(),
874                    }
875                } else {
876                    panic!();
877                }
878            }
879
880            assert_eq!(
881                *logger.lock().unwrap(),
882                &[
883                    "SyncDataConn::new 1",
884                    "AsyncDataConn::new 2",
885                    "SyncDataConn::pre_commit 1 failed",
886                    "SyncDataConn::close 1",
887                    "SyncDataConn::drop 1",
888                    "AsyncDataConn::close 2",
889                    "AsyncDataConn::drop 2",
890                ]
891            );
892        }
893
894        #[tokio::test]
895        async fn test_commit_but_fail_first_async_pre_commit() {
896            let logger = Arc::new(Mutex::new(Vec::new()));
897
898            {
899                let mut manager = DataConnManager::new();
900
901                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
902                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
903                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
904                manager.add(nnptr);
905
906                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
907                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
908                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
909                manager.add(nnptr);
910
911                if let Err(e) = manager.commit_async().await {
912                    match e.reason::<DataConnError>() {
913                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
914                            assert_eq!(errors.len(), 1);
915                            assert_eq!(errors[0].0, "foo".into());
916                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
917                        }
918                        _ => panic!(),
919                    }
920                } else {
921                    panic!();
922                }
923            }
924
925            assert_eq!(
926                *logger.lock().unwrap(),
927                &[
928                    "SyncDataConn::new 1",
929                    "AsyncDataConn::new 2",
930                    "SyncDataConn::pre_commit 1 failed",
931                    "SyncDataConn::close 1",
932                    "SyncDataConn::drop 1",
933                    "AsyncDataConn::close 2",
934                    "AsyncDataConn::drop 2",
935                ]
936            );
937        }
938
939        #[tokio::test]
940        async fn test_commit_but_fail_second_pre_commit() {
941            let logger = Arc::new(Mutex::new(Vec::new()));
942
943            {
944                let mut manager = DataConnManager::new();
945
946                let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
947                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
948                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
949                manager.add(nnptr);
950
951                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
952                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
953                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
954                manager.add(nnptr);
955
956                if let Err(e) = manager.commit_async().await {
957                    match e.reason::<DataConnError>() {
958                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
959                            assert_eq!(errors.len(), 1);
960                            assert_eq!(errors[0].0, "foo".into());
961                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
962                        }
963                        _ => panic!(),
964                    }
965                } else {
966                    panic!();
967                }
968            }
969
970            assert_eq!(
971                *logger.lock().unwrap(),
972                &[
973                    "AsyncDataConn::new 1",
974                    "SyncDataConn::new 2",
975                    "SyncDataConn::pre_commit 2",
976                    "AsyncDataConn::pre_commit 1 failed",
977                    "AsyncDataConn::close 1",
978                    "AsyncDataConn::drop 1",
979                    "SyncDataConn::close 2",
980                    "SyncDataConn::drop 2",
981                ]
982            );
983        }
984
985        #[tokio::test]
986        async fn test_commit_but_fail_first_sync_commit() {
987            let logger = Arc::new(Mutex::new(Vec::new()));
988
989            {
990                let mut manager = DataConnManager::new();
991
992                let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
993                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
994                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
995                manager.add(nnptr);
996
997                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
998                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
999                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1000                manager.add(nnptr);
1001
1002                if let Err(e) = manager.commit_async().await {
1003                    match e.reason::<DataConnError>() {
1004                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1005                            assert_eq!(errors.len(), 1);
1006                            assert_eq!(errors[0].0, "foo".into());
1007                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
1008                        }
1009                        _ => panic!(),
1010                    }
1011                } else {
1012                    panic!();
1013                }
1014            }
1015
1016            assert_eq!(
1017                *logger.lock().unwrap(),
1018                &[
1019                    "SyncDataConn::new 1",
1020                    "AsyncDataConn::new 2",
1021                    "SyncDataConn::pre_commit 1",
1022                    "AsyncDataConn::pre_commit 2",
1023                    "SyncDataConn::commit 1 failed",
1024                    "SyncDataConn::close 1",
1025                    "SyncDataConn::drop 1",
1026                    "AsyncDataConn::close 2",
1027                    "AsyncDataConn::drop 2",
1028                ]
1029            );
1030        }
1031
1032        #[tokio::test]
1033        async fn test_commit_but_fail_first_async_commit() {
1034            let logger = Arc::new(Mutex::new(Vec::new()));
1035
1036            {
1037                let mut manager = DataConnManager::new();
1038
1039                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1040                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1041                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1042                manager.add(nnptr);
1043
1044                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1045                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1046                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1047                manager.add(nnptr);
1048
1049                if let Err(e) = manager.commit_async().await {
1050                    match e.reason::<DataConnError>() {
1051                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1052                            assert_eq!(errors.len(), 1);
1053                            assert_eq!(errors[0].0, "foo".into());
1054                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1055                        }
1056                        _ => panic!(),
1057                    }
1058                } else {
1059                    panic!();
1060                }
1061            }
1062
1063            assert_eq!(
1064                *logger.lock().unwrap(),
1065                &[
1066                    "AsyncDataConn::new 1",
1067                    "SyncDataConn::new 2",
1068                    "SyncDataConn::pre_commit 2",
1069                    "AsyncDataConn::pre_commit 1",
1070                    "SyncDataConn::commit 2",
1071                    "AsyncDataConn::commit 1 failed",
1072                    "AsyncDataConn::close 1",
1073                    "AsyncDataConn::drop 1",
1074                    "SyncDataConn::close 2",
1075                    "SyncDataConn::drop 2",
1076                ]
1077            );
1078        }
1079
1080        #[tokio::test]
1081        async fn test_commit_but_fail_second_commit() {
1082            let logger = Arc::new(Mutex::new(Vec::new()));
1083
1084            {
1085                let mut manager = DataConnManager::new();
1086
1087                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1088                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1089                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1090                manager.add(nnptr);
1091
1092                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1093                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1094                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1095                manager.add(nnptr);
1096
1097                if let Err(e) = manager.commit_async().await {
1098                    match e.reason::<DataConnError>() {
1099                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
1100                            assert_eq!(errors.len(), 1);
1101                            assert_eq!(errors[0].0, "bar".into());
1102                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1103                        }
1104                        _ => panic!(),
1105                    }
1106                } else {
1107                    panic!();
1108                }
1109            }
1110
1111            assert_eq!(
1112                *logger.lock().unwrap(),
1113                &[
1114                    "SyncDataConn::new 1",
1115                    "AsyncDataConn::new 2",
1116                    "SyncDataConn::pre_commit 1",
1117                    "AsyncDataConn::pre_commit 2",
1118                    "SyncDataConn::commit 1",
1119                    "AsyncDataConn::commit 2 failed",
1120                    "SyncDataConn::close 1",
1121                    "SyncDataConn::drop 1",
1122                    "AsyncDataConn::close 2",
1123                    "AsyncDataConn::drop 2",
1124                ]
1125            );
1126        }
1127
1128        #[tokio::test]
1129        async fn test_rollback_and_first_is_sync() {
1130            let logger = Arc::new(Mutex::new(Vec::new()));
1131
1132            {
1133                let mut manager = DataConnManager::new();
1134
1135                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1136                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1137                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1138                manager.add(nnptr);
1139
1140                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1141                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1142                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1143                manager.add(nnptr);
1144
1145                manager.rollback_async().await;
1146            }
1147
1148            assert_eq!(
1149                *logger.lock().unwrap(),
1150                &[
1151                    "SyncDataConn::new 1",
1152                    "AsyncDataConn::new 2",
1153                    "SyncDataConn::rollback 1",
1154                    "AsyncDataConn::rollback 2",
1155                    "SyncDataConn::close 1",
1156                    "SyncDataConn::drop 1",
1157                    "AsyncDataConn::close 2",
1158                    "AsyncDataConn::drop 2",
1159                ]
1160            );
1161        }
1162
1163        #[tokio::test]
1164        async fn test_rollback_and_first_is_async() {
1165            let logger = Arc::new(Mutex::new(Vec::new()));
1166
1167            {
1168                let mut manager = DataConnManager::new();
1169
1170                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1171                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1172                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1173                manager.add(nnptr);
1174
1175                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1176                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1177                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1178                manager.add(nnptr);
1179
1180                manager.rollback_async().await;
1181            }
1182
1183            assert_eq!(
1184                *logger.lock().unwrap(),
1185                &[
1186                    "AsyncDataConn::new 1",
1187                    "SyncDataConn::new 2",
1188                    "SyncDataConn::rollback 2",
1189                    "AsyncDataConn::rollback 1",
1190                    "AsyncDataConn::close 1",
1191                    "AsyncDataConn::drop 1",
1192                    "SyncDataConn::close 2",
1193                    "SyncDataConn::drop 2",
1194                ]
1195            );
1196        }
1197
1198        #[tokio::test]
1199        async fn test_force_back_and_first_is_sync() {
1200            let logger = Arc::new(Mutex::new(Vec::new()));
1201
1202            {
1203                let mut manager = DataConnManager::new();
1204
1205                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1206                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1207                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1208                manager.add(nnptr);
1209
1210                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1211                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1212                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1213                manager.add(nnptr);
1214
1215                assert!(manager.commit_async().await.is_ok());
1216                manager.rollback_async().await;
1217            }
1218
1219            assert_eq!(
1220                *logger.lock().unwrap(),
1221                &[
1222                    "SyncDataConn::new 1",
1223                    "AsyncDataConn::new 2",
1224                    "SyncDataConn::pre_commit 1",
1225                    "AsyncDataConn::pre_commit 2",
1226                    "SyncDataConn::commit 1",
1227                    "AsyncDataConn::commit 2",
1228                    "SyncDataConn::post_commit 1",
1229                    "AsyncDataConn::post_commit 2",
1230                    "SyncDataConn::force_back 1",
1231                    "AsyncDataConn::force_back 2",
1232                    "SyncDataConn::close 1",
1233                    "SyncDataConn::drop 1",
1234                    "AsyncDataConn::close 2",
1235                    "AsyncDataConn::drop 2",
1236                ]
1237            );
1238        }
1239
1240        #[tokio::test]
1241        async fn test_force_back_and_first_is_async() {
1242            let logger = Arc::new(Mutex::new(Vec::new()));
1243
1244            {
1245                let mut manager = DataConnManager::new();
1246
1247                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1248                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1249                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1250                manager.add(nnptr);
1251
1252                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1253                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1254                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1255                manager.add(nnptr);
1256
1257                assert!(manager.commit_async().await.is_ok());
1258                manager.rollback_async().await;
1259            }
1260
1261            assert_eq!(
1262                *logger.lock().unwrap(),
1263                &[
1264                    "AsyncDataConn::new 1",
1265                    "SyncDataConn::new 2",
1266                    "SyncDataConn::pre_commit 2",
1267                    "AsyncDataConn::pre_commit 1",
1268                    "SyncDataConn::commit 2",
1269                    "AsyncDataConn::commit 1",
1270                    "SyncDataConn::post_commit 2",
1271                    "AsyncDataConn::post_commit 1",
1272                    "SyncDataConn::force_back 2",
1273                    "AsyncDataConn::force_back 1",
1274                    "AsyncDataConn::close 1",
1275                    "AsyncDataConn::drop 1",
1276                    "SyncDataConn::close 2",
1277                    "SyncDataConn::drop 2",
1278                ]
1279            );
1280        }
1281    }
1282}