sabi/
data_hub.rs

1// Copyright (C) 2024-2025 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use std::any;
6use std::collections::HashMap;
7use std::sync;
8
9use errs::Err;
10
11use crate::data_conn::DataConnList;
12use crate::data_src::DataSrcList;
13use crate::{AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer};
14
// Registry of the global data sources. It is accessed through `unsafe` blocks by the global
// `uses`, `setup` and `setup_async` functions, by `DataHub::new` and by `AutoShutdown::drop`.
// NOTE(review): the `false` argument presumably marks the list as non-local (`DataHub` builds
// its own list with `true`) — confirm against `DataSrcList::new`.
static mut GLOBAL_DATA_SRC_LIST: DataSrcList = DataSrcList::new(false);

// Flag recording that the global registry is "fixed": once it is set, the global `uses`
// function ignores further registrations.
#[cfg(not(test))]
static GLOBAL_DATA_SRCS_FIXED: sync::OnceLock<()> = sync::OnceLock::new();
// Test builds use a resettable flag instead, so that `clear_global_data_srcs_fixed` can
// re-open registration.
#[cfg(test)]
static GLOBAL_DATA_SRCS_FIXED: sync::atomic::AtomicBool = sync::atomic::AtomicBool::new(false);
21
/// The reasons for errors that can occur within `DataHub` operations.
///
/// Each variant is wrapped into an `Err` by the functions and methods of this module.
#[derive(Debug)]
pub enum DataHubError {
    /// One or more global data sources failed to set up
    /// (reported by `setup` and `setup_async`).
    FailToSetupGlobalDataSrcs {
        /// Maps the name of each failed `DataSrc` to the error it produced.
        errors: HashMap<String, Err>,
    },

    /// One or more session-local data sources failed to set up
    /// (reported by `DataHub::begin` and `DataHub::begin_async`).
    FailToSetupLocalDataSrcs {
        /// Maps the name of each failed `DataSrc` to the error it produced.
        errors: HashMap<String, Err>,
    },

    /// The commit step of one or more `DataConn` instances involved in a transaction failed.
    FailToCommitDataConn {
        /// Maps the name of each failed `DataConn` to the error it produced.
        errors: HashMap<String, Err>,
    },

    /// The pre-commit step of one or more `DataConn` instances involved in a transaction
    /// failed.
    FailToPreCommitDataConn {
        /// Maps the name of each failed `DataConn` to the error it produced.
        errors: HashMap<String, Err>,
    },

    /// No `DataSrc` is registered under the requested name, so no `DataConn` could be
    /// created.
    NoDataSrcToCreateDataConn {
        /// The name of the data source that could not be found.
        name: String,

        /// The type name of the `DataConn` that was requested.
        data_conn_type: &'static str,
    },

    /// The `DataSrc` was found but failed to create a `DataConn` object.
    /// The underlying error is attached as the source of the `Err`.
    FailToCreateDataConn {
        /// The name of the data source whose `DataConn` could not be created.
        name: String,

        /// The type name of the `DataConn` that failed to be created.
        data_conn_type: &'static str,
    },

    /// The `DataConn` (or the `DataSrc` that would create it) does not match the
    /// requested type.
    FailToCastDataConn {
        /// The name of the data connection that failed to cast.
        name: String,

        /// The type name to which the `DataConn` attempted to cast.
        cast_to_type: &'static str,
    },
}
81
82/// Registers a global data source that can be used throughout the application.
83///
84/// This function associates a given `DataSrc` implementation with a unique name.
85/// This name will later be used to retrieve session-specific `DataConn` instances
86/// from this data source.
87///
88/// Global data sources are set up once via the `setup` function and are available
89/// to all `DataHub` instances.
90///
91/// # Parameters
92///
93/// * `name`: The unique name for the data source.
94/// * `ds`: The `DataSrc` instance to register.
95pub fn uses<S, C>(name: &str, ds: S)
96where
97    S: DataSrc<C>,
98    C: DataConn + 'static,
99{
100    #[cfg(not(test))]
101    let fixed = GLOBAL_DATA_SRCS_FIXED.get().is_some();
102    #[cfg(test)]
103    let fixed = GLOBAL_DATA_SRCS_FIXED.load(sync::atomic::Ordering::Relaxed);
104
105    if !fixed {
106        #[allow(static_mut_refs)]
107        unsafe {
108            GLOBAL_DATA_SRC_LIST.add_data_src(name.to_string(), ds);
109        }
110    }
111}
112
113/// Executes the setup process for all globally registered data sources.
114///
115/// This setup typically involves tasks such as creating connection pools,
116/// opening global connections, or performing initial configurations necessary
117/// for creating session-specific connections. The setup can run synchronously
118/// or asynchronously using an `AsyncGroup` if operations are time-consuming.
119///
120/// If any data source fails to set up, this function returns an `Err` with
121/// `DataHubError::FailToSetupGlobalDataSrcs`, containing a map of the names
122/// of the failed data sources and their corresponding `Err` objects. In such a case,
123/// all global data sources that were successfully set up are also closed and dropped.
124///
125/// If all data source setups are successful, the `Result::Ok` which contains an object
126/// is returned. This object is designed to close and drop global data sources when
127/// it's dropped.
128/// Thanks to Rust's ownership mechanism, this ensures that the global data sources are
129/// automatically cleaned up when the return value goes out of scope.
130///
131/// **NOTE:** Do not receive the `Result` or its inner object into an anonymous
132/// variable using `let _ = ...`.
133/// If you do, the inner object is dropped immediately at that point.
134///
135/// # Returns
136///
137/// * `Result<AutoShutdown, Err>`: An `AutoShutdown` if all global data sources are
138///   set up successfully, or an `Err` if any setup fails.
139pub fn setup() -> Result<AutoShutdown, Err> {
140    #[cfg(not(test))]
141    let ok = GLOBAL_DATA_SRCS_FIXED.set(()).is_ok();
142    #[cfg(test)]
143    let ok = GLOBAL_DATA_SRCS_FIXED
144        .compare_exchange(
145            false,
146            true,
147            sync::atomic::Ordering::Relaxed,
148            sync::atomic::Ordering::Relaxed,
149        )
150        .is_ok();
151
152    if ok {
153        #[allow(static_mut_refs)]
154        let err_map = unsafe { GLOBAL_DATA_SRC_LIST.setup_data_srcs() };
155        if err_map.len() > 0 {
156            #[allow(static_mut_refs)]
157            unsafe {
158                GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
159            }
160            return Err(Err::new(DataHubError::FailToSetupGlobalDataSrcs {
161                errors: err_map,
162            }));
163        }
164    }
165
166    Ok(AutoShutdown {})
167}
168
169/// Executes asynchronously the setup process for all globally registered data sources.
170///
171/// This setup typically involves tasks such as creating connection pools,
172/// opening global connections, or performing initial configurations necessary
173/// for creating session-specific connections. The setup can run synchronously
174/// or asynchronously using an `AsyncGroup` if operations are time-consuming.
175///
176/// If any data source fails to set up, this function returns an `Err` with
177/// `DataHubError::FailToSetupGlobalDataSrcs`, containing a map of the names
178/// of the failed data sources and their corresponding `Err` objects. In such a case,
179/// all global data sources that were successfully set up are also closed and dropped.
180///
181/// If all data source setups are successful, the `Result::Ok` which contains an object
182/// is returned. This object is designed to close and drop global data sources when
183/// it's dropped.
184/// Thanks to Rust's ownership mechanism, this ensures that the global data sources are
185/// automatically cleaned up when the return value goes out of scope.
186///
187/// **NOTE:** Do not receive the `Result` or its inner object into an anonymous
188/// variable using `let _ = ...`.
189/// If you do, the inner object is dropped immediately at that point.
190///
191/// # Returns
192///
193/// * `Result<AutoShutdown, Err>`: An `AutoShutdown` if all global data sources are
194///   set up successfully, or an `Err` if any setup fails.
195pub async fn setup_async() -> Result<AutoShutdown, Err> {
196    #[cfg(not(test))]
197    let ok = GLOBAL_DATA_SRCS_FIXED.set(()).is_ok();
198    #[cfg(test)]
199    let ok = GLOBAL_DATA_SRCS_FIXED
200        .compare_exchange(
201            false,
202            true,
203            sync::atomic::Ordering::Relaxed,
204            sync::atomic::Ordering::Relaxed,
205        )
206        .is_ok();
207
208    if ok {
209        #[allow(static_mut_refs)]
210        let err_map = unsafe { GLOBAL_DATA_SRC_LIST.setup_data_srcs_async() }.await;
211        if err_map.len() > 0 {
212            #[allow(static_mut_refs)]
213            unsafe {
214                GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
215            }
216            return Err(Err::new(DataHubError::FailToSetupGlobalDataSrcs {
217                errors: err_map,
218            }));
219        }
220    }
221
222    Ok(AutoShutdown {})
223}
224
/// A guard that ensures the global data sources are closed and dropped when it goes
/// out of scope.
///
/// This struct implements the `Drop` trait, and its `drop` method closes and drops the
/// registered global data sources. Keeping the value alive until the end of a scope
/// therefore runs that cleanup automatically when the scope ends.
///
/// **NOTE:** Do not bind an instance of this struct with `let _ = ...`: such a binding
/// drops the value immediately. Bind it to a named variable (e.g. `let _guard = ...`)
/// instead. The `#[must_use]` attribute makes the compiler warn when the guard is
/// discarded without being bound at all.
#[must_use = "dropping this guard immediately closes and drops the global data sources"]
pub struct AutoShutdown {}
235
236impl Drop for AutoShutdown {
237    fn drop(&mut self) {
238        #[allow(static_mut_refs)]
239        unsafe {
240            GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
241        }
242    }
243}
244
/// The struct that acts as a central hub for data input/output operations, integrating
/// multiple *Data* traits (which are passed to business logic functions as their arguments) with
/// `DataAcc` traits (which implement default data I/O methods for external services).
///
/// It facilitates data access by providing `DataConn` objects, created from
/// both global data sources (registered via the global `uses` function) and
/// session-local data sources (registered via `DataHub::uses` method).
///
/// The `DataHub` is capable of performing aggregated transactional operations
/// on all `DataConn` objects created from its registered `DataSrc` instances.
pub struct DataHub {
    // Session-local data sources registered via `DataHub::uses`.
    local_data_src_list: DataSrcList,
    // Name -> container pointer of the set-up data sources: the global ones are copied
    // in by `new`, the local ones by `begin` / `begin_async`.
    data_src_map: HashMap<String, *mut DataSrcContainer>,
    // Connections created by `get_data_conn` during the current session.
    data_conn_list: DataConnList,
    // Name -> container pointer of the connections in `data_conn_list`; cleared by `end`.
    data_conn_map: HashMap<String, *mut DataConnContainer>,
    // `true` between `begin` / `begin_async` and `end`; while set, `uses` and `disuses`
    // are ignored.
    fixed: bool,
}
262
263impl DataHub {
264    /// Creates a new `DataHub` instance.
265    ///
266    /// Upon creation, it attempts to "fix" the global data sources (making them immutable
267    /// for further registration) and copies references to already set-up global data
268    /// sources into its internal map for quick access.
269    pub fn new() -> Self {
270        #[cfg(not(test))]
271        let _ = GLOBAL_DATA_SRCS_FIXED.set(());
272        #[cfg(test)]
273        GLOBAL_DATA_SRCS_FIXED.store(true, sync::atomic::Ordering::Relaxed);
274
275        let mut data_src_map = HashMap::new();
276
277        #[allow(static_mut_refs)]
278        unsafe {
279            GLOBAL_DATA_SRC_LIST.copy_container_ptrs_did_setup_into(&mut data_src_map);
280        }
281
282        Self {
283            local_data_src_list: DataSrcList::new(true),
284            data_src_map,
285            data_conn_list: DataConnList::new(),
286            data_conn_map: HashMap::new(),
287            fixed: false,
288        }
289    }
290
291    /// Registers a session-local data source with this `DataHub` instance.
292    ///
293    /// This method is similar to the global `uses` function but registers a data source
294    /// that is local to this specific `DataHub` session. Once the `DataHub`'s state is
295    /// "fixed" (while `run!` or `txn!` macro is executing), further calls
296    /// to `uses` are ignored. However, after `run!` or `txn!` completes, the `DataHub`'s
297    /// "fixed" state is reset, allowing for new data sources to be registered or removed
298    /// via `disuses` method in subsequent operations.
299    ///
300    /// # Parameters
301    ///
302    /// * `name`: The unique name for the local data source.
303    /// * `ds`: The `DataSrc` instance to register.
304    pub fn uses<S, C>(&mut self, name: &str, ds: S)
305    where
306        S: DataSrc<C>,
307        C: DataConn + 'static,
308    {
309        if self.fixed {
310            return;
311        }
312
313        self.local_data_src_list.add_data_src(name.to_string(), ds);
314    }
315
316    /// Unregisters and drops a session-local data source by its name.
317    ///
318    /// This method removes a data source that was previously registered via `DataHub::uses`.
319    /// This operation is ignored if the `DataHub`'s state is already "fixed".
320    ///
321    /// # Parameters
322    ///
323    /// * `name`: The name of the local data source to unregister.
324    pub fn disuses(&mut self, name: &str) {
325        if self.fixed {
326            return;
327        }
328
329        self.data_src_map
330            .retain(|nm, p| unsafe { !(*(*p)).local } || nm != name);
331        self.local_data_src_list
332            .remove_and_drop_container_ptr_did_setup_by_name(name);
333        self.local_data_src_list
334            .remove_and_drop_container_ptr_not_setup_by_name(name);
335    }
336
337    #[doc(hidden)]
338    pub fn begin(&mut self) -> Result<(), Err> {
339        self.fixed = true;
340
341        let err_map = self.local_data_src_list.setup_data_srcs();
342
343        self.local_data_src_list
344            .copy_container_ptrs_did_setup_into(&mut self.data_src_map);
345
346        if err_map.len() > 0 {
347            return Err(Err::new(DataHubError::FailToSetupLocalDataSrcs {
348                errors: err_map,
349            }));
350        }
351
352        Ok(())
353    }
354
355    #[doc(hidden)]
356    pub async fn begin_async(&mut self) -> Result<(), Err> {
357        self.fixed = true;
358
359        let err_map = self.local_data_src_list.setup_data_srcs_async().await;
360
361        self.local_data_src_list
362            .copy_container_ptrs_did_setup_into(&mut self.data_src_map);
363
364        if err_map.len() > 0 {
365            return Err(Err::new(DataHubError::FailToSetupLocalDataSrcs {
366                errors: err_map,
367            }));
368        }
369
370        Ok(())
371    }
372
373    #[doc(hidden)]
374    pub fn commit(&mut self) -> Result<(), Err> {
375        let mut err_map = HashMap::new();
376
377        let mut ag = AsyncGroup::new();
378
379        let mut ptr = self.data_conn_list.head();
380        while !ptr.is_null() {
381            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
382            let name = unsafe { &(*ptr).name };
383            let next = unsafe { (*ptr).next };
384
385            ag.name = name;
386
387            if let Err(err) = pre_commit_fn(ptr, &mut ag) {
388                err_map.insert(name.to_string(), err);
389                break;
390            }
391
392            ptr = next;
393        }
394
395        ag.join_and_collect_errors(&mut err_map);
396
397        if !err_map.is_empty() {
398            return Err(Err::new(DataHubError::FailToPreCommitDataConn {
399                errors: err_map,
400            }));
401        }
402
403        let mut ag = AsyncGroup::new();
404
405        let mut ptr = self.data_conn_list.head();
406        while !ptr.is_null() {
407            let commit_fn = unsafe { (*ptr).commit_fn };
408            let name = unsafe { &(*ptr).name };
409            let next = unsafe { (*ptr).next };
410
411            ag.name = name;
412
413            if let Err(err) = commit_fn(ptr, &mut ag) {
414                err_map.insert(name.to_string(), err);
415                break;
416            }
417
418            ptr = next;
419        }
420
421        ag.join_and_collect_errors(&mut err_map);
422
423        if !err_map.is_empty() {
424            return Err(Err::new(DataHubError::FailToCommitDataConn {
425                errors: err_map,
426            }));
427        }
428
429        let mut ag = AsyncGroup::new();
430
431        let mut ptr = self.data_conn_list.head();
432        while !ptr.is_null() {
433            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
434            let name = unsafe { &(*ptr).name };
435            let next = unsafe { (*ptr).next };
436
437            ag.name = name;
438
439            post_commit_fn(ptr, &mut ag);
440
441            ptr = next;
442        }
443
444        ag.join_and_ignore_errors();
445
446        return Ok(());
447    }
448
449    #[doc(hidden)]
450    pub async fn commit_async(&mut self) -> Result<(), Err> {
451        let mut err_map = HashMap::new();
452
453        let mut ag = AsyncGroup::new();
454
455        let mut ptr = self.data_conn_list.head();
456        while !ptr.is_null() {
457            let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
458            let name = unsafe { &(*ptr).name };
459            let next = unsafe { (*ptr).next };
460
461            ag.name = name;
462
463            if let Err(err) = pre_commit_fn(ptr, &mut ag) {
464                err_map.insert(name.to_string(), err);
465                break;
466            }
467
468            ptr = next;
469        }
470
471        ag.join_and_collect_errors_async(&mut err_map).await;
472
473        if !err_map.is_empty() {
474            return Err(Err::new(DataHubError::FailToPreCommitDataConn {
475                errors: err_map,
476            }));
477        }
478
479        let mut ag = AsyncGroup::new();
480
481        let mut ptr = self.data_conn_list.head();
482        while !ptr.is_null() {
483            let commit_fn = unsafe { (*ptr).commit_fn };
484            let name = unsafe { &(*ptr).name };
485            let next = unsafe { (*ptr).next };
486
487            ag.name = name;
488
489            if let Err(err) = commit_fn(ptr, &mut ag) {
490                err_map.insert(name.to_string(), err);
491                break;
492            }
493
494            ptr = next;
495        }
496
497        ag.join_and_collect_errors_async(&mut err_map).await;
498
499        if !err_map.is_empty() {
500            return Err(Err::new(DataHubError::FailToCommitDataConn {
501                errors: err_map,
502            }));
503        }
504
505        let mut ag = AsyncGroup::new();
506
507        let mut ptr = self.data_conn_list.head();
508        while !ptr.is_null() {
509            let post_commit_fn = unsafe { (*ptr).post_commit_fn };
510            let name = unsafe { &(*ptr).name };
511            let next = unsafe { (*ptr).next };
512
513            ag.name = name;
514
515            post_commit_fn(ptr, &mut ag);
516
517            ptr = next;
518        }
519
520        ag.join_and_ignore_errors_async().await;
521
522        return Ok(());
523    }
524
525    #[doc(hidden)]
526    pub fn rollback(&mut self) {
527        let mut ag = AsyncGroup::new();
528
529        let mut ptr = self.data_conn_list.head();
530        while !ptr.is_null() {
531            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
532            let force_back_fn = unsafe { (*ptr).force_back_fn };
533            let rollback_fn = unsafe { (*ptr).rollback_fn };
534            let name = unsafe { &(*ptr).name };
535            let next = unsafe { (*ptr).next };
536
537            ag.name = name;
538
539            if should_force_back_fn(ptr) {
540                force_back_fn(ptr, &mut ag);
541            } else {
542                rollback_fn(ptr, &mut ag);
543            }
544
545            ptr = next;
546        }
547
548        ag.join_and_ignore_errors();
549    }
550
551    #[doc(hidden)]
552    pub async fn rollback_async(&mut self) {
553        let mut ag = AsyncGroup::new();
554
555        let mut ptr = self.data_conn_list.head();
556        while !ptr.is_null() {
557            let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
558            let force_back_fn = unsafe { (*ptr).force_back_fn };
559            let rollback_fn = unsafe { (*ptr).rollback_fn };
560            let name = unsafe { &(*ptr).name };
561            let next = unsafe { (*ptr).next };
562
563            ag.name = name;
564
565            if should_force_back_fn(ptr) {
566                force_back_fn(ptr, &mut ag);
567            } else {
568                rollback_fn(ptr, &mut ag);
569            }
570
571            ptr = next;
572        }
573
574        ag.join_and_ignore_errors_async().await;
575    }
576
    /// Ends a session: forgets and closes the `DataConn` objects created since `begin`,
    /// then clears the "fixed" state so that `uses` / `disuses` take effect again.
    #[doc(hidden)]
    pub fn end(&mut self) {
        // Clear the name -> pointer map before the list releases the connections.
        self.data_conn_map.clear();
        self.data_conn_list.close_and_drop_data_conns();
        self.fixed = false;
    }
583
584    /// Retrieves a mutable reference to a `DataConn` object by name, creating it if necessary.
585    ///
586    /// This is the core method used by `DataAcc` implementations to obtain connections
587    /// to external data services. It first checks if a `DataConn` with the given name
588    /// already exists in the `DataHub`'s session. If not, it attempts to find a
589    /// corresponding `DataSrc` and create a new `DataConn` from it.
590    ///
591    /// # Type Parameters
592    ///
593    /// * `C`: The concrete type of `DataConn` expected.
594    ///
595    /// # Parameters
596    ///
597    /// * `name`: The name of the data source/connection to retrieve.
598    ///
599    /// # Returns
600    ///
601    /// * `Result<&mut C, Err>`: A mutable reference to the `DataConn` instance if successful,
602    ///   or an `Err` if the data source is not found, or if the retrieved/created `DataConn`
603    ///   cannot be cast to the specified type `C`.
604    pub fn get_data_conn<C>(&mut self, name: &str) -> Result<&mut C, Err>
605    where
606        C: DataConn + 'static,
607    {
608        match self.data_conn_map.get(name) {
609            Some(conn_ptr) => {
610                let type_id = any::TypeId::of::<C>();
611                let is_fn = unsafe { (*(*conn_ptr)).is_fn };
612                if !is_fn(type_id) {
613                    return Err(Err::new(DataHubError::FailToCastDataConn {
614                        name: name.to_string(),
615                        cast_to_type: any::type_name::<C>(),
616                    }));
617                }
618                let typed_ptr = (*conn_ptr) as *mut DataConnContainer<C>;
619                return Ok(unsafe { &mut ((*typed_ptr).data_conn) });
620            }
621            None => match self.data_src_map.get(name) {
622                Some(src_ptr) => {
623                    let type_id = any::TypeId::of::<C>();
624                    let is_data_conn_fn = unsafe { (*(*src_ptr)).is_data_conn_fn };
625                    if !is_data_conn_fn(type_id) {
626                        return Err(Err::new(DataHubError::FailToCastDataConn {
627                            name: name.to_string(),
628                            cast_to_type: any::type_name::<C>(),
629                        }));
630                    }
631
632                    let create_data_conn_fn = unsafe { (*(*src_ptr)).create_data_conn_fn };
633                    match create_data_conn_fn(*src_ptr) {
634                        Ok(boxed) => {
635                            let raw_ptr = Box::into_raw(boxed);
636                            let conn_ptr = raw_ptr.cast::<DataConnContainer>();
637
638                            self.data_conn_list.append_container_ptr(conn_ptr);
639                            self.data_conn_map.insert(name.to_string(), conn_ptr);
640
641                            let typed_ptr = raw_ptr.cast::<DataConnContainer<C>>();
642                            return Ok(unsafe { &mut (*typed_ptr).data_conn });
643                        }
644                        Err(err) => {
645                            return Err(Err::with_source(
646                                DataHubError::FailToCreateDataConn {
647                                    name: name.to_string(),
648                                    data_conn_type: any::type_name::<C>(),
649                                },
650                                err,
651                            ));
652                        }
653                    }
654                }
655                None => {
656                    return Err(Err::new(DataHubError::NoDataSrcToCreateDataConn {
657                        name: name.to_string(),
658                        data_conn_type: any::type_name::<C>(),
659                    }));
660                }
661            },
662        }
663    }
664}
665
/// Releases everything the hub still holds: first the session's `DataConn` objects,
/// then the session-local data sources.
impl Drop for DataHub {
    fn drop(&mut self) {
        // Connections go first, before the data sources are closed.
        self.data_conn_map.clear();
        self.data_conn_list.close_and_drop_data_conns();

        // The map only holds pointers; the local list closes and drops its data sources.
        self.data_src_map.clear();
        self.local_data_src_list.close_and_drop_data_srcs();
    }
}
675
/// Runs a logic function against a hub without transaction control.
///
/// The macro calls `begin` on the hub, invokes the logic function only when `begin`
/// succeeded, and finally calls `end` on the hub in either case. No commit or rollback
/// is performed.
///
/// # Parameters
///
/// * `logic_fn`: A closure or function holding the business logic.
///   It receives a mutable reference to the hub.
/// * `hub`: A hub struct instance for data input/output operations.
///
/// # Returns
///
/// * `Result<(), Err>`: The error of `begin` if it failed, otherwise the result of
///   the logic function.
#[macro_export]
macro_rules! run {
    ($logic_fn:expr, $hub:expr) => {{
        let session = &mut ($hub);
        let mut outcome = session.begin();
        if outcome.is_ok() {
            outcome = ($logic_fn)(session);
        }
        session.end();
        outcome
    }};
}
704
/// Runs a logic function against a hub within a transaction.
///
/// The macro calls `begin` on the hub and, when that succeeds, invokes the logic
/// function. When the logic function returns `Ok`, `commit` is called. If any of these
/// steps returned an error — including `begin` itself — `rollback` is called.
/// Finally `end` is called on the hub in every case.
///
/// # Parameters
///
/// * `logic_fn`: A closure or function holding the business logic.
///   It receives a mutable reference to the hub.
/// * `hub`: A hub struct instance for data input/output operations.
///
/// # Returns
///
/// * `Result<(), Err>`: The first error among `begin`, the logic function and
///   `commit`, or `Ok(())` when all of them succeeded.
#[macro_export]
macro_rules! txn {
    ($logic_fn:expr, $hub:expr) => {{
        let session = &mut ($hub);
        let mut outcome = session.begin();
        if outcome.is_ok() {
            outcome = ($logic_fn)(session);
            if outcome.is_ok() {
                outcome = session.commit();
            }
        }
        if outcome.is_err() {
            session.rollback();
        }
        session.end();
        outcome
    }};
}
742
/// Asynchronously runs a logic function against a hub without transaction control.
///
/// The macro expands to an `async` block that awaits `begin_async` on the hub, awaits
/// the logic function only when `begin_async` succeeded, and finally calls `end` on
/// the hub in either case. No commit or rollback is performed.
///
/// # Parameters
///
/// * `logic_fn`: An async closure or function holding the business logic.
///   It receives a mutable reference to the hub.
/// * `hub`: A hub struct instance for data input/output operations.
///
/// # Returns
///
/// * `Result<(), Err>`: The error of `begin_async` if it failed, otherwise the result
///   of the logic function.
#[macro_export]
macro_rules! run_async {
    ($logic_fn:expr, $hub:expr) => {
        async {
            let session = &mut ($hub);
            let mut outcome = session.begin_async().await;
            if outcome.is_ok() {
                outcome = ($logic_fn)(session).await;
            }
            session.end();
            outcome
        }
    };
}
773
/// Asynchronously runs a logic function against a hub within a transaction.
///
/// The macro expands to an `async` block that awaits `begin_async` on the hub and, when
/// that succeeds, awaits the logic function. When the logic function returns `Ok`,
/// `commit_async` is awaited. If any of these steps returned an error — including
/// `begin_async` itself — `rollback_async` is awaited. Finally `end` is called on the
/// hub in every case.
///
/// # Parameters
///
/// * `logic_fn`: An async closure or function holding the business logic.
///   It receives a mutable reference to the hub.
/// * `hub`: A hub struct instance for data input/output operations.
///
/// # Returns
///
/// * `Result<(), Err>`: The first error among `begin_async`, the logic function and
///   `commit_async`, or `Ok(())` when all of them succeeded.
#[macro_export]
macro_rules! txn_async {
    ($logic_fn:expr, $hub:expr) => {
        async {
            let session = &mut ($hub);
            let mut outcome = session.begin_async().await;
            if outcome.is_ok() {
                outcome = ($logic_fn)(session).await;
                if outcome.is_ok() {
                    outcome = session.commit_async().await;
                }
            }
            if outcome.is_err() {
                session.rollback_async().await;
            }
            session.end();
            outcome
        }
    };
}
813
/// Test-only helper that clears the "fixed" flag of the global registry, so that the
/// global `uses` function accepts registrations again.
#[cfg(test)]
pub(crate) fn clear_global_data_srcs_fixed() {
    use std::sync::atomic::Ordering;

    GLOBAL_DATA_SRCS_FIXED.store(false, Ordering::Relaxed);
}
818
// Test-only, lazily initialised mutex shared across the crate.
// NOTE(review): presumably locked by tests that touch the global registry so that they
// run one at a time — confirm against the test code that uses it.
#[cfg(test)]
pub(crate) static TEST_SEQ: sync::LazyLock<sync::Mutex<()>> =
    sync::LazyLock::new(|| sync::Mutex::new(()));
822
823#[cfg(test)]
824mod tests_data_hub {
825    use super::*;
826    use std::sync::{Arc, Mutex};
827    use tokio::time;
828
    /// Selects the step at which a test data source (or the connection it creates)
    /// reports a failure.
    #[derive(PartialEq, Copy, Clone)]
    enum Fail {
        /// The `setup` and `create_data_conn` methods of the test data sources do not
        /// fail for this value.
        Not,
        /// `DataSrc::setup` fails.
        Setup,
        /// `DataSrc::create_data_conn` fails.
        CreateDataConn,
        /// Presumably makes the connection's commit step fail — handled outside this
        /// excerpt, TODO confirm.
        Commit,
        /// Presumably makes the connection's pre-commit step fail — handled outside this
        /// excerpt, TODO confirm.
        PreCommit,
    }
837
    /// Test data source whose `setup` does its work without using the `AsyncGroup` and
    /// which records every event in a shared log.
    struct SyncDataSrc {
        // Identifier embedded in every log message.
        id: i8,
        // Step at which this data source (or its connection) should fail.
        fail: Fail,
        // Shared event log the test can inspect.
        logger: Arc<Mutex<Vec<String>>>,
    }
843
844    impl SyncDataSrc {
845        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
846            Self { id, fail, logger }
847        }
848    }
849
850    impl Drop for SyncDataSrc {
851        fn drop(&mut self) {
852            let mut logger = self.logger.lock().unwrap();
853            logger.push(format!("SyncDataSrc {} dropped", self.id));
854        }
855    }
856
857    impl DataSrc<SyncDataConn> for SyncDataSrc {
858        fn setup(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
859            let mut logger = self.logger.lock().unwrap();
860            if self.fail == Fail::Setup {
861                logger.push(format!("SyncDataSrc {} failed to setup", self.id));
862                return Err(Err::new("XXX".to_string()));
863            }
864            logger.push(format!("SyncDataSrc {} setupped", self.id));
865            Ok(())
866        }
867
868        fn close(&mut self) {
869            let mut logger = self.logger.lock().unwrap();
870            logger.push(format!("SyncDataSrc {} closed", self.id));
871        }
872
873        fn create_data_conn(&mut self) -> Result<Box<SyncDataConn>, Err> {
874            let mut logger = self.logger.lock().unwrap();
875            if self.fail == Fail::CreateDataConn {
876                logger.push(format!(
877                    "SyncDataSrc {} failed to create a DataConn",
878                    self.id
879                ));
880                return Err(Err::new("xxx".to_string()));
881            }
882            logger.push(format!("SyncDataSrc {} created DataConn", self.id));
883            let conn = SyncDataConn::new(self.id, self.logger.clone(), self.fail);
884            Ok(Box::new(conn))
885        }
886    }
887
888    struct AsyncDataSrc {
889        id: i8,
890        fail: Fail,
891        logger: Arc<Mutex<Vec<String>>>,
892    }
893
894    impl AsyncDataSrc {
895        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
896            Self { id, fail, logger }
897        }
898    }
899
900    impl Drop for AsyncDataSrc {
901        fn drop(&mut self) {
902            let mut logger = self.logger.lock().unwrap();
903            logger.push(format!("AsyncDataSrc {} dropped", self.id));
904        }
905    }
906
907    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
908        fn setup(&mut self, ag: &mut AsyncGroup) -> Result<(), Err> {
909            let fail = self.fail;
910            let logger = self.logger.clone();
911            let id = self.id;
912
913            ag.add(async move {
914                // The `.await` must be executed outside the Mutex lock.
915                let _ = time::sleep(time::Duration::from_millis(100)).await;
916
917                if fail == Fail::Setup {
918                    logger
919                        .lock()
920                        .unwrap()
921                        .push(format!("AsyncDataSrc {} failed to setup", id));
922                    return Err(Err::new("YYY".to_string()));
923                }
924
925                logger
926                    .lock()
927                    .unwrap()
928                    .push(format!("AsyncDataSrc {} setupped", id));
929                Ok(())
930            });
931            Ok(())
932        }
933
934        fn close(&mut self) {
935            let mut logger = self.logger.lock().unwrap();
936            logger.push(format!("AsyncDataSrc {} closed", self.id));
937        }
938
939        fn create_data_conn(&mut self) -> Result<Box<AsyncDataConn>, Err> {
940            let mut logger = self.logger.lock().unwrap();
941            logger.push(format!("AsyncDataSrc {} created DataConn", self.id));
942            let conn = AsyncDataConn::new(self.id, self.logger.clone(), self.fail);
943            Ok(Box::new(conn))
944        }
945    }
946
947    struct SyncDataConn {
948        id: i8,
949        committed: bool,
950        fail: Fail,
951        logger: Arc<Mutex<Vec<String>>>,
952    }
953
954    impl SyncDataConn {
955        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
956            Self {
957                id,
958                committed: false,
959                fail,
960                logger,
961            }
962        }
963    }
964
965    impl Drop for SyncDataConn {
966        fn drop(&mut self) {
967            let mut logger = self.logger.lock().unwrap();
968            logger.push(format!("SyncDataConn {} dropped", self.id));
969        }
970    }
971
972    impl DataConn for SyncDataConn {
973        fn commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
974            let mut logger = self.logger.lock().unwrap();
975            if self.fail == Fail::Commit {
976                logger.push(format!("SyncDataConn {} failed to commit", self.id));
977                return Err(Err::new("ZZZ".to_string()));
978            }
979            self.committed = true;
980            logger.push(format!("SyncDataConn {} committed", self.id));
981            Ok(())
982        }
983
984        fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
985            let mut logger = self.logger.lock().unwrap();
986            if self.fail == Fail::PreCommit {
987                logger.push(format!("SyncDataConn {} failed to pre commit", self.id));
988                return Err(Err::new("zzz".to_string()));
989            }
990            logger.push(format!("SyncDataConn {} pre committed", self.id));
991            Ok(())
992        }
993
994        fn post_commit(&mut self, _ag: &mut AsyncGroup) {
995            let mut logger = self.logger.lock().unwrap();
996            logger.push(format!("SyncDataConn {} post committed", self.id));
997        }
998
999        fn should_force_back(&self) -> bool {
1000            self.committed
1001        }
1002
1003        fn rollback(&mut self, _ag: &mut AsyncGroup) {
1004            let mut logger = self.logger.lock().unwrap();
1005            logger.push(format!("SyncDataConn {} rollbacked", self.id));
1006        }
1007
1008        fn force_back(&mut self, _ag: &mut AsyncGroup) {
1009            let mut logger = self.logger.lock().unwrap();
1010            logger.push(format!("SyncDataConn {} forced back", self.id));
1011        }
1012
1013        fn close(&mut self) {
1014            let mut logger = self.logger.lock().unwrap();
1015            logger.push(format!("SyncDataConn {} closed", self.id));
1016        }
1017    }
1018
1019    struct AsyncDataConn {
1020        id: i8,
1021        committed: bool,
1022        fail: Fail,
1023        logger: Arc<Mutex<Vec<String>>>,
1024    }
1025
1026    impl AsyncDataConn {
1027        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
1028            Self {
1029                id,
1030                committed: false,
1031                fail,
1032                logger,
1033            }
1034        }
1035    }
1036
1037    impl Drop for AsyncDataConn {
1038        fn drop(&mut self) {
1039            let mut logger = self.logger.lock().unwrap();
1040            logger.push(format!("AsyncDataConn {} dropped", self.id));
1041        }
1042    }
1043
1044    impl DataConn for AsyncDataConn {
1045        fn commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
1046            let mut logger = self.logger.lock().unwrap();
1047            if self.fail == Fail::Commit {
1048                logger.push(format!("AsyncDataConn {} failed to commit", self.id));
1049                return Err(Err::new("VVV".to_string()));
1050            }
1051            self.committed = true;
1052            logger.push(format!("AsyncDataConn {} committed", self.id));
1053            Ok(())
1054        }
1055
1056        fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
1057            let mut logger = self.logger.lock().unwrap();
1058            if self.fail == Fail::PreCommit {
1059                logger.push(format!("AsyncDataConn {} failed to pre commit", self.id));
1060                return Err(Err::new("vvv".to_string()));
1061            }
1062            logger.push(format!("AsyncDataConn {} pre committed", self.id));
1063            Ok(())
1064        }
1065
1066        fn post_commit(&mut self, _ag: &mut AsyncGroup) {
1067            let mut logger = self.logger.lock().unwrap();
1068            logger.push(format!("AsyncDataConn {} post committed", self.id));
1069        }
1070
1071        fn should_force_back(&self) -> bool {
1072            self.committed
1073        }
1074
1075        fn rollback(&mut self, _ag: &mut AsyncGroup) {
1076            let mut logger = self.logger.lock().unwrap();
1077            logger.push(format!("AsyncDataConn {} rollbacked", self.id));
1078        }
1079
1080        fn force_back(&mut self, _ag: &mut AsyncGroup) {
1081            let mut logger = self.logger.lock().unwrap();
1082            logger.push(format!("AsyncDataConn {} forced back", self.id));
1083        }
1084
1085        fn close(&mut self) {
1086            let mut logger = self.logger.lock().unwrap();
1087            logger.push(format!("AsyncDataConn {} closed", self.id));
1088        }
1089    }
1090
1091    mod tests_of_global_functions {
1092        use super::*;
1093
1094        #[test]
1095        fn test_setup_and_shutdown() {
1096            let _unused = TEST_SEQ.lock().unwrap();
1097            clear_global_data_srcs_fixed();
1098
1099            #[allow(static_mut_refs)]
1100            unsafe {
1101                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1102                assert!(ptr.is_null());
1103
1104                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1105                assert!(ptr.is_null());
1106            }
1107
1108            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1109
1110            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1111            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1112
1113            #[allow(static_mut_refs)]
1114            unsafe {
1115                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1116                assert!(!ptr.is_null());
1117                assert_eq!((*ptr).name, "foo");
1118                ptr = (*ptr).next;
1119                assert!(!ptr.is_null());
1120                assert_eq!((*ptr).name, "bar");
1121                ptr = (*ptr).next;
1122                assert!(ptr.is_null());
1123
1124                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1125                assert!(ptr.is_null());
1126            }
1127
1128            {
1129                let result = setup();
1130                assert!(result.is_ok());
1131
1132                #[allow(static_mut_refs)]
1133                unsafe {
1134                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1135                    assert!(ptr.is_null());
1136
1137                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1138                    assert!(!ptr.is_null());
1139                    assert_eq!((*ptr).name, "foo");
1140                    ptr = (*ptr).next;
1141                    assert!(!ptr.is_null());
1142                    assert_eq!((*ptr).name, "bar");
1143                    ptr = (*ptr).next;
1144                    assert!(ptr.is_null());
1145                }
1146            }
1147
1148            #[allow(static_mut_refs)]
1149            unsafe {
1150                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1151                assert!(ptr.is_null());
1152
1153                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1154                assert!(ptr.is_null());
1155            }
1156
1157            assert_eq!(
1158                *logger.lock().unwrap(),
1159                vec![
1160                    "SyncDataSrc 2 setupped",
1161                    "AsyncDataSrc 1 setupped",
1162                    "SyncDataSrc 2 closed",
1163                    "SyncDataSrc 2 dropped",
1164                    "AsyncDataSrc 1 closed",
1165                    "AsyncDataSrc 1 dropped",
1166                ],
1167            );
1168        }
1169
1170        #[test]
1171        fn test_shutdown_later() {
1172            let _unused = TEST_SEQ.lock().unwrap();
1173            clear_global_data_srcs_fixed();
1174
1175            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1176
1177            {
1178                #[allow(static_mut_refs)]
1179                unsafe {
1180                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1181                    assert!(ptr.is_null());
1182
1183                    let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1184                    assert!(ptr.is_null());
1185                }
1186
1187                uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1188                uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1189
1190                #[allow(static_mut_refs)]
1191                unsafe {
1192                    let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1193                    assert!(!ptr.is_null());
1194                    assert_eq!((*ptr).name, "foo");
1195                    ptr = (*ptr).next;
1196                    assert!(!ptr.is_null());
1197                    assert_eq!((*ptr).name, "bar");
1198                    ptr = (*ptr).next;
1199                    assert!(ptr.is_null());
1200
1201                    let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1202                    assert!(ptr.is_null());
1203                }
1204
1205                let result = setup();
1206                assert!(result.is_ok());
1207
1208                #[allow(static_mut_refs)]
1209                unsafe {
1210                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1211                    assert!(ptr.is_null());
1212
1213                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1214                    assert!(!ptr.is_null());
1215                    assert_eq!((*ptr).name, "foo");
1216                    ptr = (*ptr).next;
1217                    assert!(!ptr.is_null());
1218                    assert_eq!((*ptr).name, "bar");
1219                    ptr = (*ptr).next;
1220                    assert!(ptr.is_null());
1221                }
1222            }
1223
1224            #[allow(static_mut_refs)]
1225            unsafe {
1226                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1227                assert!(ptr.is_null());
1228
1229                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1230                assert!(ptr.is_null());
1231            }
1232
1233            assert_eq!(
1234                *logger.lock().unwrap(),
1235                vec![
1236                    "SyncDataSrc 2 setupped",
1237                    "AsyncDataSrc 1 setupped",
1238                    "SyncDataSrc 2 closed",
1239                    "SyncDataSrc 2 dropped",
1240                    "AsyncDataSrc 1 closed",
1241                    "AsyncDataSrc 1 dropped",
1242                ],
1243            );
1244        }
1245
1246        #[test]
1247        fn test_fail_to_setup() {
1248            let _unused = TEST_SEQ.lock().unwrap();
1249            clear_global_data_srcs_fixed();
1250
1251            #[allow(static_mut_refs)]
1252            unsafe {
1253                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1254                assert!(ptr.is_null());
1255
1256                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1257                assert!(ptr.is_null());
1258            }
1259
1260            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1261
1262            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
1263            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Setup));
1264
1265            #[allow(static_mut_refs)]
1266            unsafe {
1267                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1268                assert!(!ptr.is_null());
1269                assert_eq!((*ptr).name, "foo");
1270                ptr = (*ptr).next;
1271                assert!(!ptr.is_null());
1272                assert_eq!((*ptr).name, "bar");
1273                ptr = (*ptr).next;
1274                assert!(ptr.is_null());
1275
1276                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1277                assert!(ptr.is_null());
1278            }
1279
1280            match setup() {
1281                Ok(_) => panic!(),
1282                Err(err) => match err.reason::<DataHubError>() {
1283                    Ok(r) => match r {
1284                        DataHubError::FailToSetupGlobalDataSrcs { errors } => {
1285                            let err = errors.get("foo").unwrap();
1286                            match err.reason::<String>() {
1287                                Ok(s) => assert_eq!(s, "YYY"),
1288                                Err(_) => panic!(),
1289                            }
1290                            let err = errors.get("bar").unwrap();
1291                            match err.reason::<String>() {
1292                                Ok(s) => assert_eq!(s, "XXX"),
1293                                Err(_) => panic!(),
1294                            }
1295                        }
1296                        _ => panic!(),
1297                    },
1298                    Err(_) => panic!(),
1299                },
1300            }
1301
1302            #[allow(static_mut_refs)]
1303            unsafe {
1304                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1305                assert!(ptr.is_null());
1306
1307                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1308                assert!(ptr.is_null());
1309            }
1310
1311            assert_eq!(
1312                logger.lock().unwrap().clone(),
1313                vec![
1314                    "SyncDataSrc 2 failed to setup",
1315                    "AsyncDataSrc 1 failed to setup",
1316                    "SyncDataSrc 2 dropped",
1317                    "AsyncDataSrc 1 dropped",
1318                ]
1319            );
1320        }
1321
1322        #[test]
1323        fn test_cannot_add_global_data_src_after_setup() {
1324            let _unused = TEST_SEQ.lock().unwrap();
1325            clear_global_data_srcs_fixed();
1326
1327            #[allow(static_mut_refs)]
1328            unsafe {
1329                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1330                assert!(ptr.is_null());
1331
1332                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1333                assert!(ptr.is_null());
1334            }
1335
1336            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1337
1338            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1339
1340            #[allow(static_mut_refs)]
1341            unsafe {
1342                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1343                assert!(!ptr.is_null());
1344                assert_eq!((*ptr).name, "foo");
1345                ptr = (*ptr).next;
1346                assert!(ptr.is_null());
1347
1348                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1349                assert!(ptr.is_null());
1350            }
1351
1352            {
1353                let result = setup();
1354                assert!(result.is_ok());
1355
1356                #[allow(static_mut_refs)]
1357                unsafe {
1358                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1359                    assert!(ptr.is_null());
1360
1361                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1362                    assert!(!ptr.is_null());
1363                    assert_eq!((*ptr).name, "foo");
1364                    ptr = (*ptr).next;
1365                    assert!(ptr.is_null());
1366                }
1367
1368                uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1369
1370                #[allow(static_mut_refs)]
1371                unsafe {
1372                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1373                    assert!(ptr.is_null());
1374
1375                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1376                    assert!(!ptr.is_null());
1377                    assert_eq!((*ptr).name, "foo");
1378                    ptr = (*ptr).next;
1379                    assert!(ptr.is_null());
1380                }
1381            }
1382
1383            assert_eq!(
1384                *logger.lock().unwrap(),
1385                vec![
1386                    "AsyncDataSrc 1 setupped",
1387                    "SyncDataSrc 2 dropped",
1388                    "AsyncDataSrc 1 closed",
1389                    "AsyncDataSrc 1 dropped",
1390                ]
1391            );
1392        }
1393
1394        #[test]
1395        fn test_do_nothing_if_executing_setup_twice() {
1396            let _unused = TEST_SEQ.lock().unwrap();
1397            clear_global_data_srcs_fixed();
1398
1399            #[allow(static_mut_refs)]
1400            unsafe {
1401                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1402                assert!(ptr.is_null());
1403
1404                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1405                assert!(ptr.is_null());
1406            }
1407
1408            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1409
1410            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1411
1412            #[allow(static_mut_refs)]
1413            unsafe {
1414                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1415                assert!(!ptr.is_null());
1416                assert_eq!((*ptr).name, "foo");
1417                ptr = (*ptr).next;
1418                assert!(ptr.is_null());
1419
1420                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1421                assert!(ptr.is_null());
1422            }
1423
1424            {
1425                let result = setup();
1426                assert!(result.is_ok());
1427
1428                #[allow(static_mut_refs)]
1429                unsafe {
1430                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1431                    assert!(ptr.is_null());
1432
1433                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1434                    assert!(!ptr.is_null());
1435                    assert_eq!((*ptr).name, "foo");
1436                    ptr = (*ptr).next;
1437                    assert!(ptr.is_null());
1438                }
1439
1440                let result = setup();
1441                assert!(result.is_ok());
1442
1443                #[allow(static_mut_refs)]
1444                unsafe {
1445                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1446                    assert!(ptr.is_null());
1447
1448                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1449                    assert!(!ptr.is_null());
1450                    assert_eq!((*ptr).name, "foo");
1451                    ptr = (*ptr).next;
1452                    assert!(ptr.is_null());
1453                }
1454            }
1455
1456            assert_eq!(
1457                logger.lock().unwrap().clone(),
1458                vec![
1459                    "AsyncDataSrc 1 setupped",
1460                    "AsyncDataSrc 1 closed",
1461                    "AsyncDataSrc 1 dropped",
1462                ]
1463            );
1464        }
1465
1466        #[tokio::test]
1467        async fn async_test_setup_and_shutdown() {
1468            let _unused = TEST_SEQ.lock().unwrap();
1469            clear_global_data_srcs_fixed();
1470
1471            #[allow(static_mut_refs)]
1472            unsafe {
1473                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1474                assert!(ptr.is_null());
1475
1476                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1477                assert!(ptr.is_null());
1478            }
1479
1480            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1481
1482            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1483            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1484
1485            #[allow(static_mut_refs)]
1486            unsafe {
1487                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1488                assert!(!ptr.is_null());
1489                assert_eq!((*ptr).name, "foo");
1490                ptr = (*ptr).next;
1491                assert!(!ptr.is_null());
1492                assert_eq!((*ptr).name, "bar");
1493                ptr = (*ptr).next;
1494                assert!(ptr.is_null());
1495
1496                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1497                assert!(ptr.is_null());
1498            }
1499
1500            {
1501                let result = setup_async().await;
1502                assert!(result.is_ok());
1503
1504                #[allow(static_mut_refs)]
1505                unsafe {
1506                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1507                    assert!(ptr.is_null());
1508
1509                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1510                    assert!(!ptr.is_null());
1511                    assert_eq!((*ptr).name, "foo");
1512                    ptr = (*ptr).next;
1513                    assert!(!ptr.is_null());
1514                    assert_eq!((*ptr).name, "bar");
1515                    ptr = (*ptr).next;
1516                    assert!(ptr.is_null());
1517                }
1518            }
1519
1520            #[allow(static_mut_refs)]
1521            unsafe {
1522                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1523                assert!(ptr.is_null());
1524
1525                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1526                assert!(ptr.is_null());
1527            }
1528
1529            assert_eq!(
1530                *logger.lock().unwrap(),
1531                vec![
1532                    "SyncDataSrc 2 setupped",
1533                    "AsyncDataSrc 1 setupped",
1534                    "SyncDataSrc 2 closed",
1535                    "SyncDataSrc 2 dropped",
1536                    "AsyncDataSrc 1 closed",
1537                    "AsyncDataSrc 1 dropped",
1538                ],
1539            );
1540        }
1541
1542        #[tokio::test]
1543        async fn async_test_shutdown_later() {
1544            let _unused = TEST_SEQ.lock().unwrap();
1545            clear_global_data_srcs_fixed();
1546
1547            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1548
1549            {
1550                #[allow(static_mut_refs)]
1551                unsafe {
1552                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1553                    assert!(ptr.is_null());
1554
1555                    let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1556                    assert!(ptr.is_null());
1557                }
1558
1559                uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1560                uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1561
1562                #[allow(static_mut_refs)]
1563                unsafe {
1564                    let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1565                    assert!(!ptr.is_null());
1566                    assert_eq!((*ptr).name, "foo");
1567                    ptr = (*ptr).next;
1568                    assert!(!ptr.is_null());
1569                    assert_eq!((*ptr).name, "bar");
1570                    ptr = (*ptr).next;
1571                    assert!(ptr.is_null());
1572
1573                    let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1574                    assert!(ptr.is_null());
1575                }
1576
1577                let result = setup_async().await;
1578                assert!(result.is_ok());
1579
1580                #[allow(static_mut_refs)]
1581                unsafe {
1582                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1583                    assert!(ptr.is_null());
1584
1585                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1586                    assert!(!ptr.is_null());
1587                    assert_eq!((*ptr).name, "foo");
1588                    ptr = (*ptr).next;
1589                    assert!(!ptr.is_null());
1590                    assert_eq!((*ptr).name, "bar");
1591                    ptr = (*ptr).next;
1592                    assert!(ptr.is_null());
1593                }
1594            }
1595
1596            #[allow(static_mut_refs)]
1597            unsafe {
1598                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1599                assert!(ptr.is_null());
1600
1601                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1602                assert!(ptr.is_null());
1603            }
1604
1605            assert_eq!(
1606                *logger.lock().unwrap(),
1607                vec![
1608                    "SyncDataSrc 2 setupped",
1609                    "AsyncDataSrc 1 setupped",
1610                    "SyncDataSrc 2 closed",
1611                    "SyncDataSrc 2 dropped",
1612                    "AsyncDataSrc 1 closed",
1613                    "AsyncDataSrc 1 dropped",
1614                ],
1615            );
1616        }
1617
1618        #[tokio::test]
1619        async fn async_test_fail_to_setup() {
1620            let _unused = TEST_SEQ.lock().unwrap();
1621            clear_global_data_srcs_fixed();
1622
1623            #[allow(static_mut_refs)]
1624            unsafe {
1625                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1626                assert!(ptr.is_null());
1627
1628                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1629                assert!(ptr.is_null());
1630            }
1631
1632            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1633
1634            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
1635            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Setup));
1636
1637            #[allow(static_mut_refs)]
1638            unsafe {
1639                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1640                assert!(!ptr.is_null());
1641                assert_eq!((*ptr).name, "foo");
1642                ptr = (*ptr).next;
1643                assert!(!ptr.is_null());
1644                assert_eq!((*ptr).name, "bar");
1645                ptr = (*ptr).next;
1646                assert!(ptr.is_null());
1647
1648                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1649                assert!(ptr.is_null());
1650            }
1651
1652            match setup_async().await {
1653                Ok(_) => panic!(),
1654                Err(err) => match err.reason::<DataHubError>() {
1655                    Ok(r) => match r {
1656                        DataHubError::FailToSetupGlobalDataSrcs { errors } => {
1657                            let err = errors.get("foo").unwrap();
1658                            match err.reason::<String>() {
1659                                Ok(s) => assert_eq!(s, "YYY"),
1660                                Err(_) => panic!(),
1661                            }
1662                            let err = errors.get("bar").unwrap();
1663                            match err.reason::<String>() {
1664                                Ok(s) => assert_eq!(s, "XXX"),
1665                                Err(_) => panic!(),
1666                            }
1667                        }
1668                        _ => panic!(),
1669                    },
1670                    Err(_) => panic!(),
1671                },
1672            }
1673
1674            #[allow(static_mut_refs)]
1675            unsafe {
1676                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1677                assert!(ptr.is_null());
1678
1679                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1680                assert!(ptr.is_null());
1681            }
1682
1683            assert_eq!(
1684                logger.lock().unwrap().clone(),
1685                vec![
1686                    "SyncDataSrc 2 failed to setup",
1687                    "AsyncDataSrc 1 failed to setup",
1688                    "SyncDataSrc 2 dropped",
1689                    "AsyncDataSrc 1 dropped",
1690                ]
1691            );
1692        }
1693
1694        #[tokio::test]
1695        async fn async_test_cannot_add_global_data_src_after_setup() {
1696            let _unused = TEST_SEQ.lock().unwrap();
1697            clear_global_data_srcs_fixed();
1698
1699            #[allow(static_mut_refs)]
1700            unsafe {
1701                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1702                assert!(ptr.is_null());
1703
1704                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1705                assert!(ptr.is_null());
1706            }
1707
1708            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1709
1710            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1711
1712            #[allow(static_mut_refs)]
1713            unsafe {
1714                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1715                assert!(!ptr.is_null());
1716                assert_eq!((*ptr).name, "foo");
1717                ptr = (*ptr).next;
1718                assert!(ptr.is_null());
1719
1720                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1721                assert!(ptr.is_null());
1722            }
1723
1724            {
1725                let result = setup_async().await;
1726                assert!(result.is_ok());
1727
1728                #[allow(static_mut_refs)]
1729                unsafe {
1730                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1731                    assert!(ptr.is_null());
1732
1733                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1734                    assert!(!ptr.is_null());
1735                    assert_eq!((*ptr).name, "foo");
1736                    ptr = (*ptr).next;
1737                    assert!(ptr.is_null());
1738                }
1739
1740                uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1741
1742                #[allow(static_mut_refs)]
1743                unsafe {
1744                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1745                    assert!(ptr.is_null());
1746
1747                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1748                    assert!(!ptr.is_null());
1749                    assert_eq!((*ptr).name, "foo");
1750                    ptr = (*ptr).next;
1751                    assert!(ptr.is_null());
1752                }
1753            }
1754
1755            assert_eq!(
1756                *logger.lock().unwrap(),
1757                vec![
1758                    "AsyncDataSrc 1 setupped",
1759                    "SyncDataSrc 2 dropped",
1760                    "AsyncDataSrc 1 closed",
1761                    "AsyncDataSrc 1 dropped",
1762                ]
1763            );
1764        }
1765
1766        #[tokio::test]
1767        async fn async_test_do_nothing_if_executing_setup_twice() {
1768            let _unused = TEST_SEQ.lock().unwrap();
1769            clear_global_data_srcs_fixed();
1770
1771            #[allow(static_mut_refs)]
1772            unsafe {
1773                let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1774                assert!(ptr.is_null());
1775
1776                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1777                assert!(ptr.is_null());
1778            }
1779
1780            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1781
1782            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1783
1784            #[allow(static_mut_refs)]
1785            unsafe {
1786                let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1787                assert!(!ptr.is_null());
1788                assert_eq!((*ptr).name, "foo");
1789                ptr = (*ptr).next;
1790                assert!(ptr.is_null());
1791
1792                let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1793                assert!(ptr.is_null());
1794            }
1795
1796            {
1797                let result = setup_async().await;
1798                assert!(result.is_ok());
1799
1800                #[allow(static_mut_refs)]
1801                unsafe {
1802                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1803                    assert!(ptr.is_null());
1804
1805                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1806                    assert!(!ptr.is_null());
1807                    assert_eq!((*ptr).name, "foo");
1808                    ptr = (*ptr).next;
1809                    assert!(ptr.is_null());
1810                }
1811
1812                let result = setup();
1813                assert!(result.is_ok());
1814
1815                #[allow(static_mut_refs)]
1816                unsafe {
1817                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1818                    assert!(ptr.is_null());
1819
1820                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1821                    assert!(!ptr.is_null());
1822                    assert_eq!((*ptr).name, "foo");
1823                    ptr = (*ptr).next;
1824                    assert!(ptr.is_null());
1825                }
1826            }
1827
1828            assert_eq!(
1829                logger.lock().unwrap().clone(),
1830                vec![
1831                    "AsyncDataSrc 1 setupped",
1832                    "AsyncDataSrc 1 closed",
1833                    "AsyncDataSrc 1 dropped",
1834                ]
1835            );
1836        }
1837    }
1838
1839    mod tests_of_data_hub_local {
1840        use super::*;
1841        use std::error::Error;
1842
1843        #[test]
1844        fn test_new_and_close_with_no_global_data_srcs() {
1845            let _unused = TEST_SEQ.lock().unwrap();
1846
1847            let hub = DataHub::new();
1848
1849            assert!(hub.local_data_src_list.not_setup_head().is_null());
1850            assert!(hub.local_data_src_list.did_setup_head().is_null());
1851            assert!(hub.data_conn_list.head().is_null());
1852            assert_eq!(hub.data_src_map.len(), 0);
1853            assert_eq!(hub.data_conn_map.len(), 0);
1854            assert_eq!(hub.fixed, false);
1855        }
1856
1857        #[test]
1858        fn test_new_and_close_with_global_data_srcs() {
1859            let _unused = TEST_SEQ.lock().unwrap();
1860            clear_global_data_srcs_fixed();
1861
1862            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1863
1864            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1865            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1866
1867            if let Ok(_auto_shutdown) = setup() {
1868                #[allow(static_mut_refs)]
1869                unsafe {
1870                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1871                    assert!(ptr.is_null());
1872
1873                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1874                    assert!(!ptr.is_null());
1875                    assert_eq!((*ptr).name, "foo");
1876                    ptr = (*ptr).next;
1877                    assert!(!ptr.is_null());
1878                    assert_eq!((*ptr).name, "bar");
1879                    ptr = (*ptr).next;
1880                    assert!(ptr.is_null());
1881                }
1882
1883                let hub = DataHub::new();
1884
1885                assert!(hub.local_data_src_list.not_setup_head().is_null());
1886                assert!(hub.local_data_src_list.did_setup_head().is_null());
1887                assert!(hub.data_conn_list.head().is_null());
1888                assert_eq!(hub.data_src_map.len(), 2);
1889                assert_eq!(hub.data_conn_map.len(), 0);
1890                assert_eq!(hub.fixed, false);
1891
1892                #[allow(static_mut_refs)]
1893                let mut ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
1894                assert!(!ptr.is_null());
1895                ptr = unsafe { (*ptr).next };
1896                assert!(!ptr.is_null());
1897                ptr = unsafe { (*ptr).next };
1898                assert!(ptr.is_null());
1899                #[allow(static_mut_refs)]
1900                let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
1901                assert!(ptr.is_null());
1902            } else {
1903                panic!();
1904            }
1905
1906            #[allow(static_mut_refs)]
1907            let ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
1908            assert!(ptr.is_null());
1909            #[allow(static_mut_refs)]
1910            let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
1911            assert!(ptr.is_null());
1912
1913            assert_eq!(
1914                *logger.lock().unwrap(),
1915                vec![
1916                    "SyncDataSrc 2 setupped",
1917                    "AsyncDataSrc 1 setupped",
1918                    "SyncDataSrc 2 closed",
1919                    "SyncDataSrc 2 dropped",
1920                    "AsyncDataSrc 1 closed",
1921                    "AsyncDataSrc 1 dropped",
1922                ]
1923            );
1924        }
1925
1926        #[test]
1927        fn test_uses_and_disuses() {
1928            let _unused = TEST_SEQ.lock().unwrap();
1929            clear_global_data_srcs_fixed();
1930
1931            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1932
1933            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1934            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1935
1936            if let Ok(_auto_shutdown) = setup() {
1937                let mut hub = DataHub::new();
1938
1939                assert!(hub.local_data_src_list.not_setup_head().is_null());
1940                assert!(hub.local_data_src_list.did_setup_head().is_null());
1941                assert!(hub.data_conn_list.head().is_null());
1942                assert_eq!(hub.data_src_map.len(), 2);
1943                assert_eq!(hub.data_conn_map.len(), 0);
1944                assert_eq!(hub.fixed, false);
1945
1946                hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
1947                let mut ptr = hub.local_data_src_list.not_setup_head();
1948                assert!(!ptr.is_null());
1949                ptr = unsafe { (*ptr).next };
1950                assert!(ptr.is_null());
1951                assert!(hub.local_data_src_list.did_setup_head().is_null());
1952                assert!(hub.data_conn_list.head().is_null());
1953                assert_eq!(hub.data_src_map.len(), 2);
1954                assert_eq!(hub.data_conn_map.len(), 0);
1955                assert_eq!(hub.fixed, false);
1956
1957                hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
1958                let mut ptr = hub.local_data_src_list.not_setup_head();
1959                assert!(!ptr.is_null());
1960                ptr = unsafe { (*ptr).next };
1961                assert!(!ptr.is_null());
1962                ptr = unsafe { (*ptr).next };
1963                assert!(ptr.is_null());
1964                assert!(hub.local_data_src_list.did_setup_head().is_null());
1965                assert!(hub.data_conn_list.head().is_null());
1966                assert_eq!(hub.data_src_map.len(), 2);
1967                assert_eq!(hub.data_conn_map.len(), 0);
1968                assert_eq!(hub.fixed, false);
1969
1970                hub.disuses("foo"); // do nothing because of global
1971                hub.disuses("bar"); // do nothing because of global
1972                let mut ptr = hub.local_data_src_list.not_setup_head();
1973                assert!(!ptr.is_null());
1974                ptr = unsafe { (*ptr).next };
1975                assert!(!ptr.is_null());
1976                ptr = unsafe { (*ptr).next };
1977                assert!(ptr.is_null());
1978                assert!(hub.local_data_src_list.did_setup_head().is_null());
1979                assert!(hub.data_conn_list.head().is_null());
1980                assert_eq!(hub.data_src_map.len(), 2);
1981                assert_eq!(hub.data_conn_map.len(), 0);
1982                assert_eq!(hub.fixed, false);
1983
1984                hub.disuses("baz");
1985                let mut ptr = hub.local_data_src_list.not_setup_head();
1986                assert!(!ptr.is_null());
1987                ptr = unsafe { (*ptr).next };
1988                assert!(ptr.is_null());
1989                assert!(hub.local_data_src_list.did_setup_head().is_null());
1990                assert!(hub.data_conn_list.head().is_null());
1991                assert_eq!(hub.data_src_map.len(), 2);
1992                assert_eq!(hub.data_conn_map.len(), 0);
1993                assert_eq!(hub.fixed, false);
1994
1995                hub.disuses("qux");
1996                let ptr = hub.local_data_src_list.not_setup_head();
1997                assert!(ptr.is_null());
1998                assert!(hub.local_data_src_list.did_setup_head().is_null());
1999                assert!(hub.data_conn_list.head().is_null());
2000                assert_eq!(hub.data_src_map.len(), 2);
2001                assert_eq!(hub.data_conn_map.len(), 0);
2002                assert_eq!(hub.fixed, false);
2003            } else {
2004                panic!();
2005            }
2006
2007            assert_eq!(
2008                *logger.lock().unwrap(),
2009                vec![
2010                    "SyncDataSrc 2 setupped",
2011                    "AsyncDataSrc 1 setupped",
2012                    "SyncDataSrc 3 closed",
2013                    "SyncDataSrc 3 dropped",
2014                    "AsyncDataSrc 4 closed",
2015                    "AsyncDataSrc 4 dropped",
2016                    "SyncDataSrc 2 closed",
2017                    "SyncDataSrc 2 dropped",
2018                    "AsyncDataSrc 1 closed",
2019                    "AsyncDataSrc 1 dropped",
2020                ]
2021            );
2022        }
2023
2024        #[test]
2025        fn test_cannot_add_and_remove_data_src_between_begin_and_end() {
2026            let _unused = TEST_SEQ.lock().unwrap();
2027            clear_global_data_srcs_fixed();
2028
2029            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2030
2031            if let Ok(_auto_shutdown) = setup() {
2032                let mut hub = DataHub::new();
2033
2034                let ptr = hub.local_data_src_list.not_setup_head();
2035                assert!(ptr.is_null());
2036                let ptr = hub.local_data_src_list.did_setup_head();
2037                assert!(ptr.is_null());
2038                assert!(hub.data_conn_list.head().is_null());
2039                assert_eq!(hub.data_src_map.len(), 0);
2040                assert_eq!(hub.data_conn_map.len(), 0);
2041                assert_eq!(hub.fixed, false);
2042
2043                hub.uses("baz", SyncDataSrc::new(1, logger.clone(), Fail::Not));
2044
2045                let mut ptr = hub.local_data_src_list.not_setup_head();
2046                assert!(!ptr.is_null());
2047                ptr = unsafe { (*ptr).next };
2048                assert!(ptr.is_null());
2049                let ptr = hub.local_data_src_list.did_setup_head();
2050                assert!(ptr.is_null());
2051                assert!(hub.data_conn_list.head().is_null());
2052                assert_eq!(hub.data_src_map.len(), 0);
2053                assert_eq!(hub.data_conn_map.len(), 0);
2054                assert_eq!(hub.fixed, false);
2055
2056                assert!(hub.begin().is_ok());
2057
2058                let ptr = hub.local_data_src_list.not_setup_head();
2059                assert!(ptr.is_null());
2060                let mut ptr = hub.local_data_src_list.did_setup_head();
2061                assert!(!ptr.is_null());
2062                ptr = unsafe { (*ptr).next };
2063                assert!(ptr.is_null());
2064                assert!(hub.data_conn_list.head().is_null());
2065                assert_eq!(hub.data_src_map.len(), 1);
2066                assert_eq!(hub.data_conn_map.len(), 0);
2067                assert_eq!(hub.fixed, true);
2068
2069                hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
2070
2071                let ptr = hub.local_data_src_list.not_setup_head();
2072                assert!(ptr.is_null());
2073                let mut ptr = hub.local_data_src_list.did_setup_head();
2074                assert!(!ptr.is_null());
2075                ptr = unsafe { (*ptr).next };
2076                assert!(ptr.is_null());
2077                assert!(hub.data_conn_list.head().is_null());
2078                assert_eq!(hub.data_src_map.len(), 1);
2079                assert_eq!(hub.data_conn_map.len(), 0);
2080                assert_eq!(hub.fixed, true);
2081
2082                hub.disuses("baz");
2083
2084                let ptr = hub.local_data_src_list.not_setup_head();
2085                assert!(ptr.is_null());
2086                let mut ptr = hub.local_data_src_list.did_setup_head();
2087                assert!(!ptr.is_null());
2088                ptr = unsafe { (*ptr).next };
2089                assert!(ptr.is_null());
2090                assert!(hub.data_conn_list.head().is_null());
2091                assert_eq!(hub.data_src_map.len(), 1);
2092                assert_eq!(hub.data_conn_map.len(), 0);
2093                assert_eq!(hub.fixed, true);
2094
2095                hub.end();
2096
2097                let ptr = hub.local_data_src_list.not_setup_head();
2098                assert!(ptr.is_null());
2099                let mut ptr = hub.local_data_src_list.did_setup_head();
2100                assert!(!ptr.is_null());
2101                ptr = unsafe { (*ptr).next };
2102                assert!(ptr.is_null());
2103                assert!(hub.data_conn_list.head().is_null());
2104                assert_eq!(hub.data_src_map.len(), 1);
2105                assert_eq!(hub.data_conn_map.len(), 0);
2106                assert_eq!(hub.fixed, false);
2107
2108                hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
2109
2110                let mut ptr = hub.local_data_src_list.not_setup_head();
2111                assert!(!ptr.is_null());
2112                ptr = unsafe { (*ptr).next };
2113                assert!(ptr.is_null());
2114                let mut ptr = hub.local_data_src_list.did_setup_head();
2115                assert!(!ptr.is_null());
2116                ptr = unsafe { (*ptr).next };
2117                assert!(ptr.is_null());
2118                assert!(hub.data_conn_list.head().is_null());
2119                assert_eq!(hub.data_src_map.len(), 1);
2120                assert_eq!(hub.data_conn_map.len(), 0);
2121                assert_eq!(hub.fixed, false);
2122
2123                hub.disuses("baz");
2124
2125                let mut ptr = hub.local_data_src_list.not_setup_head();
2126                assert!(!ptr.is_null());
2127                ptr = unsafe { (*ptr).next };
2128                assert!(ptr.is_null());
2129                let ptr = hub.local_data_src_list.did_setup_head();
2130                assert!(ptr.is_null());
2131                assert!(hub.data_conn_list.head().is_null());
2132                assert_eq!(hub.data_src_map.len(), 0);
2133                assert_eq!(hub.data_conn_map.len(), 0);
2134                assert_eq!(hub.fixed, false);
2135            }
2136        }
2137
2138        #[test]
2139        fn test_begin_and_end() {
2140            let _unused = TEST_SEQ.lock().unwrap();
2141            clear_global_data_srcs_fixed();
2142
2143            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2144
2145            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2146            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2147
2148            if let Ok(_auto_shutdown) = setup() {
2149                let mut hub = DataHub::new();
2150
2151                hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
2152                hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
2153
2154                let mut ptr = hub.local_data_src_list.not_setup_head();
2155                assert!(!ptr.is_null());
2156                ptr = unsafe { (*ptr).next };
2157                assert!(!ptr.is_null());
2158                ptr = unsafe { (*ptr).next };
2159                assert!(ptr.is_null());
2160                assert!(hub.local_data_src_list.did_setup_head().is_null());
2161                assert!(hub.data_conn_list.head().is_null());
2162                assert_eq!(hub.data_src_map.len(), 2);
2163                assert_eq!(hub.data_conn_map.len(), 0);
2164                assert_eq!(hub.fixed, false);
2165
2166                assert!(hub.begin().is_ok());
2167
2168                assert!(hub.local_data_src_list.not_setup_head().is_null());
2169                let mut ptr = hub.local_data_src_list.did_setup_head();
2170                assert!(!ptr.is_null());
2171                ptr = unsafe { (*ptr).next };
2172                assert!(!ptr.is_null());
2173                ptr = unsafe { (*ptr).next };
2174                assert!(ptr.is_null());
2175                assert!(hub.data_conn_list.head().is_null());
2176                assert_eq!(hub.data_src_map.len(), 4);
2177                assert_eq!(hub.data_conn_map.len(), 0);
2178                assert_eq!(hub.fixed, true);
2179
2180                hub.end();
2181
2182                assert!(hub.local_data_src_list.not_setup_head().is_null());
2183                let mut ptr = hub.local_data_src_list.did_setup_head();
2184                assert!(!ptr.is_null());
2185                ptr = unsafe { (*ptr).next };
2186                assert!(!ptr.is_null());
2187                ptr = unsafe { (*ptr).next };
2188                assert!(ptr.is_null());
2189                assert!(hub.data_conn_list.head().is_null());
2190                assert_eq!(hub.data_src_map.len(), 4);
2191                assert_eq!(hub.data_conn_map.len(), 0);
2192                assert_eq!(hub.fixed, false);
2193            }
2194
2195            assert_eq!(
2196                *logger.lock().unwrap(),
2197                vec![
2198                    "SyncDataSrc 2 setupped",
2199                    "AsyncDataSrc 1 setupped",
2200                    "SyncDataSrc 3 setupped",
2201                    "AsyncDataSrc 4 setupped",
2202                    "AsyncDataSrc 4 closed",
2203                    "AsyncDataSrc 4 dropped",
2204                    "SyncDataSrc 3 closed",
2205                    "SyncDataSrc 3 dropped",
2206                    "SyncDataSrc 2 closed",
2207                    "SyncDataSrc 2 dropped",
2208                    "AsyncDataSrc 1 closed",
2209                    "AsyncDataSrc 1 dropped",
2210                ]
2211            );
2212        }
2213
2214        #[test]
2215        fn test_begin_and_end_but_fail_sync() {
2216            let _unused = TEST_SEQ.lock().unwrap();
2217            clear_global_data_srcs_fixed();
2218
2219            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2220
2221            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2222            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2223
2224            if let Ok(_auto_shutdown) = setup() {
2225                let mut hub = DataHub::new();
2226                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2227                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Setup));
2228
2229                if let Err(err) = hub.begin() {
2230                    match err.reason::<DataHubError>() {
2231                        Ok(r) => match r {
2232                            DataHubError::FailToSetupLocalDataSrcs { errors } => {
2233                                assert_eq!(errors.len(), 1);
2234                                if let Some(err) = errors.get("qux") {
2235                                    match err.reason::<String>() {
2236                                        Ok(s) => assert_eq!(s, "XXX"),
2237                                        Err(_) => panic!(),
2238                                    }
2239                                } else {
2240                                    panic!();
2241                                }
2242                            }
2243                            _ => panic!(),
2244                        },
2245                        Err(_) => panic!(),
2246                    }
2247                } else {
2248                    panic!();
2249                }
2250
2251                let mut ptr = hub.local_data_src_list.not_setup_head();
2252                assert!(!ptr.is_null());
2253                ptr = unsafe { (*ptr).next };
2254                assert!(ptr.is_null());
2255                let mut ptr = hub.local_data_src_list.did_setup_head();
2256                assert!(!ptr.is_null());
2257                ptr = unsafe { (*ptr).next };
2258                assert!(ptr.is_null());
2259                assert!(hub.data_conn_list.head().is_null());
2260                assert_eq!(hub.data_src_map.len(), 3);
2261                assert_eq!(hub.data_conn_map.len(), 0);
2262                assert_eq!(hub.fixed, true);
2263
2264                hub.end();
2265
2266                let mut ptr = hub.local_data_src_list.not_setup_head();
2267                assert!(!ptr.is_null());
2268                ptr = unsafe { (*ptr).next };
2269                assert!(ptr.is_null());
2270                let mut ptr = hub.local_data_src_list.did_setup_head();
2271                assert!(!ptr.is_null());
2272                ptr = unsafe { (*ptr).next };
2273                assert!(ptr.is_null());
2274                assert!(hub.data_conn_list.head().is_null());
2275                assert_eq!(hub.data_src_map.len(), 3);
2276                assert_eq!(hub.data_conn_map.len(), 0);
2277                assert_eq!(hub.fixed, false);
2278            } else {
2279                panic!();
2280            }
2281        }
2282
2283        #[test]
2284        fn test_begin_and_end_but_fail_async() {
2285            let _unused = TEST_SEQ.lock().unwrap();
2286            clear_global_data_srcs_fixed();
2287
2288            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2289
2290            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2291            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2292
2293            if let Ok(_auto_shutdown) = setup() {
2294                let mut hub = DataHub::new();
2295                hub.uses("baz", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
2296                hub.uses("qux", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2297
2298                if let Err(err) = hub.begin() {
2299                    match err.reason::<DataHubError>() {
2300                        Ok(r) => match r {
2301                            DataHubError::FailToSetupLocalDataSrcs { errors } => {
2302                                assert_eq!(errors.len(), 1);
2303                                if let Some(err) = errors.get("baz") {
2304                                    match err.reason::<String>() {
2305                                        Ok(s) => assert_eq!(s, "YYY"),
2306                                        Err(_) => panic!(),
2307                                    }
2308                                } else {
2309                                    panic!();
2310                                }
2311                            }
2312                            _ => panic!(),
2313                        },
2314                        Err(_) => panic!(),
2315                    }
2316                } else {
2317                    panic!();
2318                }
2319
2320                let mut ptr = hub.local_data_src_list.not_setup_head();
2321                assert!(!ptr.is_null());
2322                ptr = unsafe { (*ptr).next };
2323                assert!(ptr.is_null());
2324                let mut ptr = hub.local_data_src_list.did_setup_head();
2325                assert!(!ptr.is_null());
2326                ptr = unsafe { (*ptr).next };
2327                assert!(ptr.is_null());
2328                assert!(hub.data_conn_list.head().is_null());
2329                assert_eq!(hub.data_src_map.len(), 3);
2330                assert_eq!(hub.data_conn_map.len(), 0);
2331                assert_eq!(hub.fixed, true);
2332
2333                hub.end();
2334
2335                let mut ptr = hub.local_data_src_list.not_setup_head();
2336                assert!(!ptr.is_null());
2337                ptr = unsafe { (*ptr).next };
2338                assert!(ptr.is_null());
2339                let mut ptr = hub.local_data_src_list.did_setup_head();
2340                assert!(!ptr.is_null());
2341                ptr = unsafe { (*ptr).next };
2342                assert!(ptr.is_null());
2343                assert!(hub.data_conn_list.head().is_null());
2344                assert_eq!(hub.data_src_map.len(), 3);
2345                assert_eq!(hub.data_conn_map.len(), 0);
2346                assert_eq!(hub.fixed, false);
2347            } else {
2348                panic!();
2349            }
2350        }
2351
2352        #[test]
2353        fn test_commit_and_post_commit() {
2354            let _unused = TEST_SEQ.lock().unwrap();
2355            clear_global_data_srcs_fixed();
2356
2357            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2358
2359            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2360            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2361
2362            if let Ok(_auto_shutdown) = setup() {
2363                let mut hub = DataHub::new();
2364                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2365                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2366
2367                if let Ok(_) = hub.begin() {
2368                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2369                        assert_eq!(
2370                            any::type_name_of_val(conn1),
2371                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2372                        );
2373                    } else {
2374                        panic!();
2375                    }
2376                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2377                        assert_eq!(
2378                            any::type_name_of_val(conn2),
2379                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2380                        );
2381                    } else {
2382                        panic!();
2383                    }
2384                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2385                        assert_eq!(
2386                            any::type_name_of_val(conn3),
2387                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2388                        );
2389                    } else {
2390                        panic!();
2391                    }
2392                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2393                        assert_eq!(
2394                            any::type_name_of_val(conn4),
2395                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2396                        );
2397                    } else {
2398                        panic!();
2399                    }
2400
2401                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2402                        assert_eq!(
2403                            any::type_name_of_val(conn1),
2404                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2405                        );
2406                    } else {
2407                        panic!();
2408                    }
2409                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2410                        assert_eq!(
2411                            any::type_name_of_val(conn2),
2412                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2413                        );
2414                    } else {
2415                        panic!();
2416                    }
2417                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2418                        assert_eq!(
2419                            any::type_name_of_val(conn3),
2420                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2421                        );
2422                    } else {
2423                        panic!();
2424                    }
2425                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2426                        assert_eq!(
2427                            any::type_name_of_val(conn4),
2428                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2429                        );
2430                    } else {
2431                        panic!();
2432                    }
2433
2434                    assert!(hub.commit().is_ok());
2435                    hub.end();
2436                } else {
2437                    panic!();
2438                }
2439            } else {
2440                panic!();
2441            }
2442
2443            assert_eq!(
2444                *logger.lock().unwrap(),
2445                vec![
2446                    "SyncDataSrc 2 setupped",
2447                    "AsyncDataSrc 1 setupped",
2448                    "SyncDataSrc 4 setupped",
2449                    "AsyncDataSrc 3 setupped",
2450                    "AsyncDataSrc 1 created DataConn",
2451                    "SyncDataSrc 2 created DataConn",
2452                    "AsyncDataSrc 3 created DataConn",
2453                    "SyncDataSrc 4 created DataConn",
2454                    "AsyncDataConn 1 pre committed",
2455                    "SyncDataConn 2 pre committed",
2456                    "AsyncDataConn 3 pre committed",
2457                    "SyncDataConn 4 pre committed",
2458                    "AsyncDataConn 1 committed",
2459                    "SyncDataConn 2 committed",
2460                    "AsyncDataConn 3 committed",
2461                    "SyncDataConn 4 committed",
2462                    "AsyncDataConn 1 post committed",
2463                    "SyncDataConn 2 post committed",
2464                    "AsyncDataConn 3 post committed",
2465                    "SyncDataConn 4 post committed",
2466                    "SyncDataConn 4 closed",
2467                    "SyncDataConn 4 dropped",
2468                    "AsyncDataConn 3 closed",
2469                    "AsyncDataConn 3 dropped",
2470                    "SyncDataConn 2 closed",
2471                    "SyncDataConn 2 dropped",
2472                    "AsyncDataConn 1 closed",
2473                    "AsyncDataConn 1 dropped",
2474                    "SyncDataSrc 4 closed",
2475                    "SyncDataSrc 4 dropped",
2476                    "AsyncDataSrc 3 closed",
2477                    "AsyncDataSrc 3 dropped",
2478                    "SyncDataSrc 2 closed",
2479                    "SyncDataSrc 2 dropped",
2480                    "AsyncDataSrc 1 closed",
2481                    "AsyncDataSrc 1 dropped",
2482                ],
2483            );
2484        }
2485
2486        #[test]
2487        fn test_fail_to_cast_new_data_conn() {
2488            let _unused = TEST_SEQ.lock().unwrap();
2489            clear_global_data_srcs_fixed();
2490
2491            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2492
2493            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2494
2495            if let Ok(_auto_shutdown) = setup() {
2496                let mut hub = DataHub::new();
2497                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2498
2499                if let Ok(_) = hub.begin() {
2500                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
2501                        match err.reason::<DataHubError>() {
2502                            Ok(r) => match r {
2503                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
2504                                    assert_eq!(name, "foo");
2505                                    assert_eq!(
2506                                        *cast_to_type,
2507                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
2508                                    );
2509                                }
2510                                _ => panic!(),
2511                            },
2512                            Err(_) => panic!(),
2513                        }
2514                    } else {
2515                        panic!();
2516                    }
2517
2518                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
2519                        match err.reason::<DataHubError>() {
2520                            Ok(r) => match r {
2521                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
2522                                    assert_eq!(name, "bar");
2523                                    assert_eq!(
2524                                        *cast_to_type,
2525                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
2526                                    );
2527                                }
2528                                _ => panic!(),
2529                            },
2530                            Err(_) => panic!(),
2531                        }
2532                    } else {
2533                        panic!();
2534                    }
2535                } else {
2536                    panic!();
2537                }
2538            } else {
2539                panic!();
2540            }
2541
2542            assert_eq!(
2543                *logger.lock().unwrap(),
2544                vec![
2545                    "AsyncDataSrc 1 setupped",
2546                    "SyncDataSrc 2 setupped",
2547                    "SyncDataSrc 2 closed",
2548                    "SyncDataSrc 2 dropped",
2549                    "AsyncDataSrc 1 closed",
2550                    "AsyncDataSrc 1 dropped",
2551                ],
2552            );
2553        }
2554
2555        #[test]
2556        fn test_fail_to_cast_reused_data_conn() {
2557            let _unused = TEST_SEQ.lock().unwrap();
2558            clear_global_data_srcs_fixed();
2559
2560            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2561
2562            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2563
2564            if let Ok(_auto_shutdown) = setup() {
2565                let mut hub = DataHub::new();
2566                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2567
2568                if let Ok(_) = hub.begin() {
2569                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2570                        assert_eq!(
2571                            any::type_name_of_val(conn1),
2572                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2573                        );
2574                    } else {
2575                        panic!();
2576                    }
2577                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2578                        assert_eq!(
2579                            any::type_name_of_val(conn2),
2580                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2581                        );
2582                    } else {
2583                        panic!();
2584                    }
2585
2586                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
2587                        match err.reason::<DataHubError>() {
2588                            Ok(r) => match r {
2589                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
2590                                    assert_eq!(name, "foo");
2591                                    assert_eq!(
2592                                        *cast_to_type,
2593                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
2594                                    );
2595                                }
2596                                _ => panic!(),
2597                            },
2598                            Err(_) => panic!(),
2599                        }
2600                    } else {
2601                        panic!();
2602                    }
2603
2604                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
2605                        match err.reason::<DataHubError>() {
2606                            Ok(r) => match r {
2607                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
2608                                    assert_eq!(name, "bar");
2609                                    assert_eq!(
2610                                        *cast_to_type,
2611                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
2612                                    );
2613                                }
2614                                _ => panic!(),
2615                            },
2616                            Err(_) => panic!(),
2617                        }
2618                    } else {
2619                        panic!();
2620                    }
2621                } else {
2622                    panic!();
2623                }
2624            } else {
2625                panic!();
2626            }
2627
2628            assert_eq!(
2629                *logger.lock().unwrap(),
2630                vec![
2631                    "AsyncDataSrc 1 setupped",
2632                    "SyncDataSrc 2 setupped",
2633                    "AsyncDataSrc 1 created DataConn",
2634                    "SyncDataSrc 2 created DataConn",
2635                    "SyncDataConn 2 closed",
2636                    "SyncDataConn 2 dropped",
2637                    "AsyncDataConn 1 closed",
2638                    "AsyncDataConn 1 dropped",
2639                    "SyncDataSrc 2 closed",
2640                    "SyncDataSrc 2 dropped",
2641                    "AsyncDataSrc 1 closed",
2642                    "AsyncDataSrc 1 dropped",
2643                ],
2644            );
2645        }
2646
2647        #[test]
2648        fn test_fail_to_create_data_conn() {
2649            let _unused = TEST_SEQ.lock().unwrap();
2650            clear_global_data_srcs_fixed();
2651
2652            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2653
2654            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2655
2656            if let Ok(_auto_shutdown) = setup() {
2657                let mut hub = DataHub::new();
2658                hub.uses(
2659                    "bar",
2660                    SyncDataSrc::new(2, logger.clone(), Fail::CreateDataConn),
2661                );
2662
2663                if let Ok(_) = hub.begin() {
2664                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2665                        assert_eq!(
2666                            any::type_name_of_val(conn1),
2667                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2668                        );
2669                    } else {
2670                        panic!();
2671                    }
2672
2673                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("bar") {
2674                        match err.reason::<DataHubError>() {
2675                            Ok(r) => match r {
2676                                DataHubError::FailToCreateDataConn {
2677                                    name,
2678                                    data_conn_type,
2679                                } => {
2680                                    assert_eq!(name, "bar");
2681                                    assert_eq!(
2682                                        *data_conn_type,
2683                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
2684                                    );
2685                                }
2686                                _ => panic!(),
2687                            },
2688                            Err(_) => panic!(),
2689                        }
2690                        match err.source() {
2691                            Some(e) => {
2692                                assert_eq!(
2693                                    e.downcast_ref::<errs::Err>()
2694                                        .unwrap()
2695                                        .reason::<String>()
2696                                        .unwrap(),
2697                                    "xxx"
2698                                );
2699                            }
2700                            None => panic!(),
2701                        }
2702                    } else {
2703                        panic!();
2704                    }
2705                } else {
2706                    panic!();
2707                }
2708            } else {
2709                panic!();
2710            }
2711
2712            assert_eq!(
2713                *logger.lock().unwrap(),
2714                vec![
2715                    "AsyncDataSrc 1 setupped",
2716                    "SyncDataSrc 2 setupped",
2717                    "AsyncDataSrc 1 created DataConn",
2718                    "SyncDataSrc 2 failed to create a DataConn",
2719                    "AsyncDataConn 1 closed",
2720                    "AsyncDataConn 1 dropped",
2721                    "SyncDataSrc 2 closed",
2722                    "SyncDataSrc 2 dropped",
2723                    "AsyncDataSrc 1 closed",
2724                    "AsyncDataSrc 1 dropped",
2725                ],
2726            );
2727        }
2728
2729        #[test]
2730        fn test_fail_to_create_data_conn_because_of_no_data_src() {
2731            let _unused = TEST_SEQ.lock().unwrap();
2732            clear_global_data_srcs_fixed();
2733
2734            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2735
2736            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2737
2738            if let Ok(_auto_shutdown) = setup() {
2739                let mut hub = DataHub::new();
2740                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2741
2742                if let Ok(_) = hub.begin() {
2743                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("baz") {
2744                        match err.reason::<DataHubError>() {
2745                            Ok(r) => match r {
2746                                DataHubError::NoDataSrcToCreateDataConn {
2747                                    name,
2748                                    data_conn_type,
2749                                } => {
2750                                    assert_eq!(name, "baz");
2751                                    assert_eq!(
2752                                        *data_conn_type,
2753                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
2754                                    );
2755                                }
2756                                _ => panic!(),
2757                            },
2758                            Err(_) => panic!(),
2759                        }
2760                    } else {
2761                        panic!();
2762                    }
2763
2764                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("qux") {
2765                        match err.reason::<DataHubError>() {
2766                            Ok(r) => match r {
2767                                DataHubError::NoDataSrcToCreateDataConn {
2768                                    name,
2769                                    data_conn_type,
2770                                } => {
2771                                    assert_eq!(name, "qux");
2772                                    assert_eq!(
2773                                        *data_conn_type,
2774                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
2775                                    );
2776                                }
2777                                _ => panic!(),
2778                            },
2779                            Err(_) => panic!(),
2780                        }
2781                    } else {
2782                        panic!();
2783                    }
2784                } else {
2785                    panic!();
2786                }
2787            } else {
2788                panic!();
2789            }
2790
2791            assert_eq!(
2792                *logger.lock().unwrap(),
2793                vec![
2794                    "AsyncDataSrc 1 setupped",
2795                    "SyncDataSrc 2 setupped",
2796                    "SyncDataSrc 2 closed",
2797                    "SyncDataSrc 2 dropped",
2798                    "AsyncDataSrc 1 closed",
2799                    "AsyncDataSrc 1 dropped",
2800                ],
2801            );
2802        }
2803
2804        #[test]
2805        fn test_commit_when_no_data_conn() {
2806            let _unused = TEST_SEQ.lock().unwrap();
2807            clear_global_data_srcs_fixed();
2808
2809            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2810
2811            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2812            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2813
2814            if let Ok(_auto_shutdown) = setup() {
2815                let mut hub = DataHub::new();
2816                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2817                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2818
2819                if let Ok(_) = hub.begin() {
2820                    assert!(hub.commit().is_ok());
2821                    hub.end();
2822                } else {
2823                    panic!();
2824                }
2825            } else {
2826                panic!();
2827            }
2828
2829            assert_eq!(
2830                *logger.lock().unwrap(),
2831                vec![
2832                    "SyncDataSrc 2 setupped",
2833                    "AsyncDataSrc 1 setupped",
2834                    "SyncDataSrc 4 setupped",
2835                    "AsyncDataSrc 3 setupped",
2836                    "SyncDataSrc 4 closed",
2837                    "SyncDataSrc 4 dropped",
2838                    "AsyncDataSrc 3 closed",
2839                    "AsyncDataSrc 3 dropped",
2840                    "SyncDataSrc 2 closed",
2841                    "SyncDataSrc 2 dropped",
2842                    "AsyncDataSrc 1 closed",
2843                    "AsyncDataSrc 1 dropped",
2844                ],
2845            );
2846        }
2847
2848        #[test]
2849        fn test_commit_but_fail_global_sync() {
2850            let _unused = TEST_SEQ.lock().unwrap();
2851            clear_global_data_srcs_fixed();
2852
2853            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2854
2855            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2856            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Commit));
2857
2858            if let Ok(_auto_shutdown) = setup() {
2859                let mut hub = DataHub::new();
2860                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2861                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2862
2863                if let Ok(_) = hub.begin() {
2864                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2865                        assert_eq!(
2866                            any::type_name_of_val(conn1),
2867                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2868                        );
2869                    } else {
2870                        panic!();
2871                    }
2872                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2873                        assert_eq!(
2874                            any::type_name_of_val(conn2),
2875                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2876                        );
2877                    } else {
2878                        panic!();
2879                    }
2880                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2881                        assert_eq!(
2882                            any::type_name_of_val(conn3),
2883                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2884                        );
2885                    } else {
2886                        panic!();
2887                    }
2888                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2889                        assert_eq!(
2890                            any::type_name_of_val(conn4),
2891                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2892                        );
2893                    } else {
2894                        panic!();
2895                    }
2896
2897                    match hub.commit() {
2898                        Ok(_) => panic!(),
2899                        Err(err) => match err.reason::<DataHubError>() {
2900                            Ok(r) => match r {
2901                                DataHubError::FailToCommitDataConn { errors } => {
2902                                    assert_eq!(errors.len(), 1);
2903                                    if let Some(e) = errors.get("bar") {
2904                                        if let Ok(s) = e.reason::<String>() {
2905                                            assert_eq!(s, "ZZZ");
2906                                        } else {
2907                                            panic!();
2908                                        }
2909                                    } else {
2910                                        panic!();
2911                                    }
2912                                }
2913                                _ => panic!(),
2914                            },
2915                            Err(_) => panic!(),
2916                        },
2917                    }
2918
2919                    hub.end();
2920                } else {
2921                    panic!();
2922                }
2923            } else {
2924                panic!();
2925            }
2926
2927            assert_eq!(
2928                *logger.lock().unwrap(),
2929                vec![
2930                    "SyncDataSrc 2 setupped",
2931                    "AsyncDataSrc 1 setupped",
2932                    "SyncDataSrc 4 setupped",
2933                    "AsyncDataSrc 3 setupped",
2934                    "AsyncDataSrc 1 created DataConn",
2935                    "SyncDataSrc 2 created DataConn",
2936                    "AsyncDataSrc 3 created DataConn",
2937                    "SyncDataSrc 4 created DataConn",
2938                    "AsyncDataConn 1 pre committed",
2939                    "SyncDataConn 2 pre committed",
2940                    "AsyncDataConn 3 pre committed",
2941                    "SyncDataConn 4 pre committed",
2942                    "AsyncDataConn 1 committed",
2943                    "SyncDataConn 2 failed to commit",
2944                    "SyncDataConn 4 closed",
2945                    "SyncDataConn 4 dropped",
2946                    "AsyncDataConn 3 closed",
2947                    "AsyncDataConn 3 dropped",
2948                    "SyncDataConn 2 closed",
2949                    "SyncDataConn 2 dropped",
2950                    "AsyncDataConn 1 closed",
2951                    "AsyncDataConn 1 dropped",
2952                    "SyncDataSrc 4 closed",
2953                    "SyncDataSrc 4 dropped",
2954                    "AsyncDataSrc 3 closed",
2955                    "AsyncDataSrc 3 dropped",
2956                    "SyncDataSrc 2 closed",
2957                    "SyncDataSrc 2 dropped",
2958                    "AsyncDataSrc 1 closed",
2959                    "AsyncDataSrc 1 dropped",
2960                ],
2961            );
2962        }
2963
2964        #[test]
2965        fn test_commit_but_fail_global_async() {
2966            let _unused = TEST_SEQ.lock().unwrap();
2967            clear_global_data_srcs_fixed();
2968
2969            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2970
2971            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Commit));
2972            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2973
2974            if let Ok(_auto_shutdown) = setup() {
2975                let mut hub = DataHub::new();
2976                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2977                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2978
2979                if let Ok(_) = hub.begin() {
2980                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2981                        assert_eq!(
2982                            any::type_name_of_val(conn1),
2983                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
2984                        );
2985                    } else {
2986                        panic!();
2987                    }
2988                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2989                        assert_eq!(
2990                            any::type_name_of_val(conn2),
2991                            "sabi::data_hub::tests_data_hub::SyncDataConn"
2992                        );
2993                    } else {
2994                        panic!();
2995                    }
2996                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2997                        assert_eq!(
2998                            any::type_name_of_val(conn3),
2999                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3000                        );
3001                    } else {
3002                        panic!();
3003                    }
3004                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3005                        assert_eq!(
3006                            any::type_name_of_val(conn4),
3007                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3008                        );
3009                    } else {
3010                        panic!();
3011                    }
3012
3013                    match hub.commit() {
3014                        Ok(_) => panic!(),
3015                        Err(err) => match err.reason::<DataHubError>() {
3016                            Ok(r) => match r {
3017                                DataHubError::FailToCommitDataConn { errors } => {
3018                                    assert_eq!(errors.len(), 1);
3019                                    if let Some(e) = errors.get("foo") {
3020                                        if let Ok(s) = e.reason::<String>() {
3021                                            assert_eq!(s, "VVV");
3022                                        } else {
3023                                            panic!();
3024                                        }
3025                                    } else {
3026                                        panic!();
3027                                    }
3028                                }
3029                                _ => panic!(),
3030                            },
3031                            Err(_) => panic!(),
3032                        },
3033                    }
3034
3035                    hub.end();
3036                } else {
3037                    panic!();
3038                }
3039            } else {
3040                panic!();
3041            }
3042
3043            assert_eq!(
3044                *logger.lock().unwrap(),
3045                vec![
3046                    "SyncDataSrc 2 setupped",
3047                    "AsyncDataSrc 1 setupped",
3048                    "SyncDataSrc 4 setupped",
3049                    "AsyncDataSrc 3 setupped",
3050                    "AsyncDataSrc 1 created DataConn",
3051                    "SyncDataSrc 2 created DataConn",
3052                    "AsyncDataSrc 3 created DataConn",
3053                    "SyncDataSrc 4 created DataConn",
3054                    "AsyncDataConn 1 pre committed",
3055                    "SyncDataConn 2 pre committed",
3056                    "AsyncDataConn 3 pre committed",
3057                    "SyncDataConn 4 pre committed",
3058                    "AsyncDataConn 1 failed to commit",
3059                    "SyncDataConn 4 closed",
3060                    "SyncDataConn 4 dropped",
3061                    "AsyncDataConn 3 closed",
3062                    "AsyncDataConn 3 dropped",
3063                    "SyncDataConn 2 closed",
3064                    "SyncDataConn 2 dropped",
3065                    "AsyncDataConn 1 closed",
3066                    "AsyncDataConn 1 dropped",
3067                    "SyncDataSrc 4 closed",
3068                    "SyncDataSrc 4 dropped",
3069                    "AsyncDataSrc 3 closed",
3070                    "AsyncDataSrc 3 dropped",
3071                    "SyncDataSrc 2 closed",
3072                    "SyncDataSrc 2 dropped",
3073                    "AsyncDataSrc 1 closed",
3074                    "AsyncDataSrc 1 dropped",
3075                ],
3076            );
3077        }
3078
3079        #[test]
3080        fn test_commit_but_fail_local_sync() {
3081            let _unused = TEST_SEQ.lock().unwrap();
3082            clear_global_data_srcs_fixed();
3083
3084            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3085
3086            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3087            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3088
3089            if let Ok(_auto_shutdown) = setup() {
3090                let mut hub = DataHub::new();
3091                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3092                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Commit));
3093
3094                if let Ok(_) = hub.begin() {
3095                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3096                        assert_eq!(
3097                            any::type_name_of_val(conn1),
3098                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3099                        );
3100                    } else {
3101                        panic!();
3102                    }
3103                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3104                        assert_eq!(
3105                            any::type_name_of_val(conn2),
3106                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3107                        );
3108                    } else {
3109                        panic!();
3110                    }
3111                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3112                        assert_eq!(
3113                            any::type_name_of_val(conn3),
3114                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3115                        );
3116                    } else {
3117                        panic!();
3118                    }
3119                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3120                        assert_eq!(
3121                            any::type_name_of_val(conn4),
3122                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3123                        );
3124                    } else {
3125                        panic!();
3126                    }
3127
3128                    match hub.commit() {
3129                        Ok(_) => panic!(),
3130                        Err(err) => match err.reason::<DataHubError>() {
3131                            Ok(r) => match r {
3132                                DataHubError::FailToCommitDataConn { errors } => {
3133                                    assert_eq!(errors.len(), 1);
3134                                    if let Some(e) = errors.get("qux") {
3135                                        if let Ok(s) = e.reason::<String>() {
3136                                            assert_eq!(s, "ZZZ");
3137                                        } else {
3138                                            panic!();
3139                                        }
3140                                    } else {
3141                                        panic!();
3142                                    }
3143                                }
3144                                _ => panic!(),
3145                            },
3146                            Err(_) => panic!(),
3147                        },
3148                    }
3149
3150                    hub.end();
3151                } else {
3152                    panic!();
3153                }
3154            } else {
3155                panic!();
3156            }
3157
3158            assert_eq!(
3159                *logger.lock().unwrap(),
3160                vec![
3161                    "SyncDataSrc 2 setupped",
3162                    "AsyncDataSrc 1 setupped",
3163                    "SyncDataSrc 4 setupped",
3164                    "AsyncDataSrc 3 setupped",
3165                    "AsyncDataSrc 1 created DataConn",
3166                    "SyncDataSrc 2 created DataConn",
3167                    "AsyncDataSrc 3 created DataConn",
3168                    "SyncDataSrc 4 created DataConn",
3169                    "AsyncDataConn 1 pre committed",
3170                    "SyncDataConn 2 pre committed",
3171                    "AsyncDataConn 3 pre committed",
3172                    "SyncDataConn 4 pre committed",
3173                    "AsyncDataConn 1 committed",
3174                    "SyncDataConn 2 committed",
3175                    "AsyncDataConn 3 committed",
3176                    "SyncDataConn 4 failed to commit",
3177                    "SyncDataConn 4 closed",
3178                    "SyncDataConn 4 dropped",
3179                    "AsyncDataConn 3 closed",
3180                    "AsyncDataConn 3 dropped",
3181                    "SyncDataConn 2 closed",
3182                    "SyncDataConn 2 dropped",
3183                    "AsyncDataConn 1 closed",
3184                    "AsyncDataConn 1 dropped",
3185                    "SyncDataSrc 4 closed",
3186                    "SyncDataSrc 4 dropped",
3187                    "AsyncDataSrc 3 closed",
3188                    "AsyncDataSrc 3 dropped",
3189                    "SyncDataSrc 2 closed",
3190                    "SyncDataSrc 2 dropped",
3191                    "AsyncDataSrc 1 closed",
3192                    "AsyncDataSrc 1 dropped",
3193                ],
3194            );
3195        }
3196
3197        #[test]
3198        fn test_commit_but_fail_local_async() {
3199            let _unused = TEST_SEQ.lock().unwrap();
3200            clear_global_data_srcs_fixed();
3201
3202            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3203
3204            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3205            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3206
3207            if let Ok(_auto_shutdown) = setup() {
3208                let mut hub = DataHub::new();
3209                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Commit));
3210                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3211
3212                if let Ok(_) = hub.begin() {
3213                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3214                        assert_eq!(
3215                            any::type_name_of_val(conn1),
3216                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3217                        );
3218                    } else {
3219                        panic!();
3220                    }
3221                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3222                        assert_eq!(
3223                            any::type_name_of_val(conn2),
3224                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3225                        );
3226                    } else {
3227                        panic!();
3228                    }
3229                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3230                        assert_eq!(
3231                            any::type_name_of_val(conn3),
3232                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3233                        );
3234                    } else {
3235                        panic!();
3236                    }
3237                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3238                        assert_eq!(
3239                            any::type_name_of_val(conn4),
3240                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3241                        );
3242                    } else {
3243                        panic!();
3244                    }
3245
3246                    match hub.commit() {
3247                        Ok(_) => panic!(),
3248                        Err(err) => match err.reason::<DataHubError>() {
3249                            Ok(r) => match r {
3250                                DataHubError::FailToCommitDataConn { errors } => {
3251                                    assert_eq!(errors.len(), 1);
3252                                    if let Some(e) = errors.get("baz") {
3253                                        if let Ok(s) = e.reason::<String>() {
3254                                            assert_eq!(s, "VVV");
3255                                        } else {
3256                                            panic!();
3257                                        }
3258                                    } else {
3259                                        panic!();
3260                                    }
3261                                }
3262                                _ => panic!(),
3263                            },
3264                            Err(_) => panic!(),
3265                        },
3266                    }
3267
3268                    hub.end();
3269                } else {
3270                    panic!();
3271                }
3272            } else {
3273                panic!();
3274            }
3275
3276            assert_eq!(
3277                *logger.lock().unwrap(),
3278                vec![
3279                    "SyncDataSrc 2 setupped",
3280                    "AsyncDataSrc 1 setupped",
3281                    "SyncDataSrc 4 setupped",
3282                    "AsyncDataSrc 3 setupped",
3283                    "AsyncDataSrc 1 created DataConn",
3284                    "SyncDataSrc 2 created DataConn",
3285                    "AsyncDataSrc 3 created DataConn",
3286                    "SyncDataSrc 4 created DataConn",
3287                    "AsyncDataConn 1 pre committed",
3288                    "SyncDataConn 2 pre committed",
3289                    "AsyncDataConn 3 pre committed",
3290                    "SyncDataConn 4 pre committed",
3291                    "AsyncDataConn 1 committed",
3292                    "SyncDataConn 2 committed",
3293                    "AsyncDataConn 3 failed to commit",
3294                    "SyncDataConn 4 closed",
3295                    "SyncDataConn 4 dropped",
3296                    "AsyncDataConn 3 closed",
3297                    "AsyncDataConn 3 dropped",
3298                    "SyncDataConn 2 closed",
3299                    "SyncDataConn 2 dropped",
3300                    "AsyncDataConn 1 closed",
3301                    "AsyncDataConn 1 dropped",
3302                    "SyncDataSrc 4 closed",
3303                    "SyncDataSrc 4 dropped",
3304                    "AsyncDataSrc 3 closed",
3305                    "AsyncDataSrc 3 dropped",
3306                    "SyncDataSrc 2 closed",
3307                    "SyncDataSrc 2 dropped",
3308                    "AsyncDataSrc 1 closed",
3309                    "AsyncDataSrc 1 dropped",
3310                ],
3311            );
3312        }
3313
3314        #[test]
3315        fn test_pre_commit_but_fail_global_sync() {
3316            let _unused = TEST_SEQ.lock().unwrap();
3317            clear_global_data_srcs_fixed();
3318
3319            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3320
3321            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3322            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::PreCommit));
3323
3324            if let Ok(_auto_shutdown) = setup() {
3325                let mut hub = DataHub::new();
3326                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3327                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3328
3329                if let Ok(_) = hub.begin() {
3330                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3331                        assert_eq!(
3332                            any::type_name_of_val(conn1),
3333                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3334                        );
3335                    } else {
3336                        panic!();
3337                    }
3338                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3339                        assert_eq!(
3340                            any::type_name_of_val(conn2),
3341                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3342                        );
3343                    } else {
3344                        panic!();
3345                    }
3346                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3347                        assert_eq!(
3348                            any::type_name_of_val(conn3),
3349                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3350                        );
3351                    } else {
3352                        panic!();
3353                    }
3354                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3355                        assert_eq!(
3356                            any::type_name_of_val(conn4),
3357                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3358                        );
3359                    } else {
3360                        panic!();
3361                    }
3362
3363                    match hub.commit() {
3364                        Ok(_) => panic!(),
3365                        Err(err) => match err.reason::<DataHubError>() {
3366                            Ok(r) => match r {
3367                                DataHubError::FailToPreCommitDataConn { errors } => {
3368                                    assert_eq!(errors.len(), 1);
3369                                    if let Some(e) = errors.get("bar") {
3370                                        if let Ok(s) = e.reason::<String>() {
3371                                            assert_eq!(s, "zzz");
3372                                        } else {
3373                                            panic!();
3374                                        }
3375                                    } else {
3376                                        panic!();
3377                                    }
3378                                }
3379                                _ => panic!(),
3380                            },
3381                            Err(_) => panic!(),
3382                        },
3383                    }
3384
3385                    hub.end();
3386                } else {
3387                    panic!();
3388                }
3389            } else {
3390                panic!();
3391            }
3392
3393            assert_eq!(
3394                *logger.lock().unwrap(),
3395                vec![
3396                    "SyncDataSrc 2 setupped",
3397                    "AsyncDataSrc 1 setupped",
3398                    "SyncDataSrc 4 setupped",
3399                    "AsyncDataSrc 3 setupped",
3400                    "AsyncDataSrc 1 created DataConn",
3401                    "SyncDataSrc 2 created DataConn",
3402                    "AsyncDataSrc 3 created DataConn",
3403                    "SyncDataSrc 4 created DataConn",
3404                    "AsyncDataConn 1 pre committed",
3405                    "SyncDataConn 2 failed to pre commit",
3406                    "SyncDataConn 4 closed",
3407                    "SyncDataConn 4 dropped",
3408                    "AsyncDataConn 3 closed",
3409                    "AsyncDataConn 3 dropped",
3410                    "SyncDataConn 2 closed",
3411                    "SyncDataConn 2 dropped",
3412                    "AsyncDataConn 1 closed",
3413                    "AsyncDataConn 1 dropped",
3414                    "SyncDataSrc 4 closed",
3415                    "SyncDataSrc 4 dropped",
3416                    "AsyncDataSrc 3 closed",
3417                    "AsyncDataSrc 3 dropped",
3418                    "SyncDataSrc 2 closed",
3419                    "SyncDataSrc 2 dropped",
3420                    "AsyncDataSrc 1 closed",
3421                    "AsyncDataSrc 1 dropped",
3422                ],
3423            );
3424        }
3425
3426        #[test]
3427        fn test_pre_commit_but_fail_global_async() {
3428            let _unused = TEST_SEQ.lock().unwrap();
3429            clear_global_data_srcs_fixed();
3430
3431            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3432
3433            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::PreCommit));
3434            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3435
3436            if let Ok(_auto_shutdown) = setup() {
3437                let mut hub = DataHub::new();
3438                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3439                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3440
3441                if let Ok(_) = hub.begin() {
3442                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3443                        assert_eq!(
3444                            any::type_name_of_val(conn1),
3445                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3446                        );
3447                    } else {
3448                        panic!();
3449                    }
3450                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3451                        assert_eq!(
3452                            any::type_name_of_val(conn2),
3453                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3454                        );
3455                    } else {
3456                        panic!();
3457                    }
3458                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3459                        assert_eq!(
3460                            any::type_name_of_val(conn3),
3461                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3462                        );
3463                    } else {
3464                        panic!();
3465                    }
3466                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3467                        assert_eq!(
3468                            any::type_name_of_val(conn4),
3469                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3470                        );
3471                    } else {
3472                        panic!();
3473                    }
3474
3475                    match hub.commit() {
3476                        Ok(_) => panic!(),
3477                        Err(err) => match err.reason::<DataHubError>() {
3478                            Ok(r) => match r {
3479                                DataHubError::FailToPreCommitDataConn { errors } => {
3480                                    assert_eq!(errors.len(), 1);
3481                                    if let Some(e) = errors.get("foo") {
3482                                        if let Ok(s) = e.reason::<String>() {
3483                                            assert_eq!(s, "vvv");
3484                                        } else {
3485                                            panic!();
3486                                        }
3487                                    } else {
3488                                        panic!();
3489                                    }
3490                                }
3491                                _ => panic!(),
3492                            },
3493                            Err(_) => panic!(),
3494                        },
3495                    }
3496
3497                    hub.end();
3498                } else {
3499                    panic!();
3500                }
3501            } else {
3502                panic!();
3503            }
3504
3505            assert_eq!(
3506                *logger.lock().unwrap(),
3507                vec![
3508                    "SyncDataSrc 2 setupped",
3509                    "AsyncDataSrc 1 setupped",
3510                    "SyncDataSrc 4 setupped",
3511                    "AsyncDataSrc 3 setupped",
3512                    "AsyncDataSrc 1 created DataConn",
3513                    "SyncDataSrc 2 created DataConn",
3514                    "AsyncDataSrc 3 created DataConn",
3515                    "SyncDataSrc 4 created DataConn",
3516                    "AsyncDataConn 1 failed to pre commit",
3517                    "SyncDataConn 4 closed",
3518                    "SyncDataConn 4 dropped",
3519                    "AsyncDataConn 3 closed",
3520                    "AsyncDataConn 3 dropped",
3521                    "SyncDataConn 2 closed",
3522                    "SyncDataConn 2 dropped",
3523                    "AsyncDataConn 1 closed",
3524                    "AsyncDataConn 1 dropped",
3525                    "SyncDataSrc 4 closed",
3526                    "SyncDataSrc 4 dropped",
3527                    "AsyncDataSrc 3 closed",
3528                    "AsyncDataSrc 3 dropped",
3529                    "SyncDataSrc 2 closed",
3530                    "SyncDataSrc 2 dropped",
3531                    "AsyncDataSrc 1 closed",
3532                    "AsyncDataSrc 1 dropped",
3533                ],
3534            );
3535        }
3536
3537        #[test]
3538        fn test_pre_commit_but_fail_local_sync() {
3539            let _unused = TEST_SEQ.lock().unwrap();
3540            clear_global_data_srcs_fixed();
3541
3542            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3543
3544            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3545            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3546
3547            if let Ok(_auto_shutdown) = setup() {
3548                let mut hub = DataHub::new();
3549                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3550                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::PreCommit));
3551
3552                if let Ok(_) = hub.begin() {
3553                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3554                        assert_eq!(
3555                            any::type_name_of_val(conn1),
3556                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3557                        );
3558                    } else {
3559                        panic!();
3560                    }
3561                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3562                        assert_eq!(
3563                            any::type_name_of_val(conn2),
3564                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3565                        );
3566                    } else {
3567                        panic!();
3568                    }
3569                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3570                        assert_eq!(
3571                            any::type_name_of_val(conn3),
3572                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3573                        );
3574                    } else {
3575                        panic!();
3576                    }
3577                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3578                        assert_eq!(
3579                            any::type_name_of_val(conn4),
3580                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3581                        );
3582                    } else {
3583                        panic!();
3584                    }
3585
3586                    match hub.commit() {
3587                        Ok(_) => panic!(),
3588                        Err(err) => match err.reason::<DataHubError>() {
3589                            Ok(r) => match r {
3590                                DataHubError::FailToPreCommitDataConn { errors } => {
3591                                    assert_eq!(errors.len(), 1);
3592                                    if let Some(e) = errors.get("qux") {
3593                                        if let Ok(s) = e.reason::<String>() {
3594                                            assert_eq!(s, "zzz");
3595                                        } else {
3596                                            panic!();
3597                                        }
3598                                    } else {
3599                                        panic!();
3600                                    }
3601                                }
3602                                _ => panic!(),
3603                            },
3604                            Err(_) => panic!(),
3605                        },
3606                    }
3607
3608                    hub.end();
3609                } else {
3610                    panic!();
3611                }
3612            } else {
3613                panic!();
3614            }
3615
3616            assert_eq!(
3617                *logger.lock().unwrap(),
3618                vec![
3619                    "SyncDataSrc 2 setupped",
3620                    "AsyncDataSrc 1 setupped",
3621                    "SyncDataSrc 4 setupped",
3622                    "AsyncDataSrc 3 setupped",
3623                    "AsyncDataSrc 1 created DataConn",
3624                    "SyncDataSrc 2 created DataConn",
3625                    "AsyncDataSrc 3 created DataConn",
3626                    "SyncDataSrc 4 created DataConn",
3627                    "AsyncDataConn 1 pre committed",
3628                    "SyncDataConn 2 pre committed",
3629                    "AsyncDataConn 3 pre committed",
3630                    "SyncDataConn 4 failed to pre commit",
3631                    "SyncDataConn 4 closed",
3632                    "SyncDataConn 4 dropped",
3633                    "AsyncDataConn 3 closed",
3634                    "AsyncDataConn 3 dropped",
3635                    "SyncDataConn 2 closed",
3636                    "SyncDataConn 2 dropped",
3637                    "AsyncDataConn 1 closed",
3638                    "AsyncDataConn 1 dropped",
3639                    "SyncDataSrc 4 closed",
3640                    "SyncDataSrc 4 dropped",
3641                    "AsyncDataSrc 3 closed",
3642                    "AsyncDataSrc 3 dropped",
3643                    "SyncDataSrc 2 closed",
3644                    "SyncDataSrc 2 dropped",
3645                    "AsyncDataSrc 1 closed",
3646                    "AsyncDataSrc 1 dropped",
3647                ],
3648            );
3649        }
3650
3651        #[test]
3652        fn test_pre_commit_but_fail_local_async() {
3653            let _unused = TEST_SEQ.lock().unwrap();
3654            clear_global_data_srcs_fixed();
3655
3656            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3657
3658            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3659            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3660
3661            if let Ok(_auto_shutdown) = setup() {
3662                let mut hub = DataHub::new();
3663                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::PreCommit));
3664                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3665
3666                if let Ok(_) = hub.begin() {
3667                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3668                        assert_eq!(
3669                            any::type_name_of_val(conn1),
3670                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3671                        );
3672                    } else {
3673                        panic!();
3674                    }
3675                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3676                        assert_eq!(
3677                            any::type_name_of_val(conn2),
3678                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3679                        );
3680                    } else {
3681                        panic!();
3682                    }
3683                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3684                        assert_eq!(
3685                            any::type_name_of_val(conn3),
3686                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3687                        );
3688                    } else {
3689                        panic!();
3690                    }
3691                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3692                        assert_eq!(
3693                            any::type_name_of_val(conn4),
3694                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3695                        );
3696                    } else {
3697                        panic!();
3698                    }
3699
3700                    match hub.commit() {
3701                        Ok(_) => panic!(),
3702                        Err(err) => match err.reason::<DataHubError>() {
3703                            Ok(r) => match r {
3704                                DataHubError::FailToPreCommitDataConn { errors } => {
3705                                    assert_eq!(errors.len(), 1);
3706                                    if let Some(e) = errors.get("baz") {
3707                                        if let Ok(s) = e.reason::<String>() {
3708                                            assert_eq!(s, "vvv");
3709                                        } else {
3710                                            panic!();
3711                                        }
3712                                    } else {
3713                                        panic!();
3714                                    }
3715                                }
3716                                _ => panic!(),
3717                            },
3718                            Err(_) => panic!(),
3719                        },
3720                    }
3721
3722                    hub.end();
3723                } else {
3724                    panic!();
3725                }
3726            } else {
3727                panic!();
3728            }
3729
3730            assert_eq!(
3731                *logger.lock().unwrap(),
3732                vec![
3733                    "SyncDataSrc 2 setupped",
3734                    "AsyncDataSrc 1 setupped",
3735                    "SyncDataSrc 4 setupped",
3736                    "AsyncDataSrc 3 setupped",
3737                    "AsyncDataSrc 1 created DataConn",
3738                    "SyncDataSrc 2 created DataConn",
3739                    "AsyncDataSrc 3 created DataConn",
3740                    "SyncDataSrc 4 created DataConn",
3741                    "AsyncDataConn 1 pre committed",
3742                    "SyncDataConn 2 pre committed",
3743                    "AsyncDataConn 3 failed to pre commit",
3744                    "SyncDataConn 4 closed",
3745                    "SyncDataConn 4 dropped",
3746                    "AsyncDataConn 3 closed",
3747                    "AsyncDataConn 3 dropped",
3748                    "SyncDataConn 2 closed",
3749                    "SyncDataConn 2 dropped",
3750                    "AsyncDataConn 1 closed",
3751                    "AsyncDataConn 1 dropped",
3752                    "SyncDataSrc 4 closed",
3753                    "SyncDataSrc 4 dropped",
3754                    "AsyncDataSrc 3 closed",
3755                    "AsyncDataSrc 3 dropped",
3756                    "SyncDataSrc 2 closed",
3757                    "SyncDataSrc 2 dropped",
3758                    "AsyncDataSrc 1 closed",
3759                    "AsyncDataSrc 1 dropped",
3760                ],
3761            );
3762        }
3763
3764        #[test]
3765        fn test_rollback() {
3766            let _unused = TEST_SEQ.lock().unwrap();
3767            clear_global_data_srcs_fixed();
3768
3769            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3770
3771            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3772            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3773
3774            if let Ok(_auto_shutdown) = setup() {
3775                let mut hub = DataHub::new();
3776                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3777                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3778
3779                if let Ok(_) = hub.begin() {
3780                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3781                        assert_eq!(
3782                            any::type_name_of_val(conn1),
3783                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3784                        );
3785                    } else {
3786                        panic!();
3787                    }
3788                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3789                        assert_eq!(
3790                            any::type_name_of_val(conn2),
3791                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3792                        );
3793                    } else {
3794                        panic!();
3795                    }
3796                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3797                        assert_eq!(
3798                            any::type_name_of_val(conn3),
3799                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3800                        );
3801                    } else {
3802                        panic!();
3803                    }
3804                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3805                        assert_eq!(
3806                            any::type_name_of_val(conn4),
3807                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3808                        );
3809                    } else {
3810                        panic!();
3811                    }
3812
3813                    hub.rollback();
3814                    hub.end();
3815                } else {
3816                    panic!();
3817                }
3818            } else {
3819                panic!();
3820            }
3821
3822            assert_eq!(
3823                *logger.lock().unwrap(),
3824                vec![
3825                    "SyncDataSrc 2 setupped",
3826                    "AsyncDataSrc 1 setupped",
3827                    "SyncDataSrc 4 setupped",
3828                    "AsyncDataSrc 3 setupped",
3829                    "AsyncDataSrc 1 created DataConn",
3830                    "SyncDataSrc 2 created DataConn",
3831                    "AsyncDataSrc 3 created DataConn",
3832                    "SyncDataSrc 4 created DataConn",
3833                    "AsyncDataConn 1 rollbacked",
3834                    "SyncDataConn 2 rollbacked",
3835                    "AsyncDataConn 3 rollbacked",
3836                    "SyncDataConn 4 rollbacked",
3837                    "SyncDataConn 4 closed",
3838                    "SyncDataConn 4 dropped",
3839                    "AsyncDataConn 3 closed",
3840                    "AsyncDataConn 3 dropped",
3841                    "SyncDataConn 2 closed",
3842                    "SyncDataConn 2 dropped",
3843                    "AsyncDataConn 1 closed",
3844                    "AsyncDataConn 1 dropped",
3845                    "SyncDataSrc 4 closed",
3846                    "SyncDataSrc 4 dropped",
3847                    "AsyncDataSrc 3 closed",
3848                    "AsyncDataSrc 3 dropped",
3849                    "SyncDataSrc 2 closed",
3850                    "SyncDataSrc 2 dropped",
3851                    "AsyncDataSrc 1 closed",
3852                    "AsyncDataSrc 1 dropped",
3853                ],
3854            );
3855        }
3856
3857        #[test]
3858        fn test_force_back() {
3859            let _unused = TEST_SEQ.lock().unwrap();
3860            clear_global_data_srcs_fixed();
3861
3862            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3863
3864            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3865            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3866
3867            if let Ok(_auto_shutdown) = setup() {
3868                let mut hub = DataHub::new();
3869                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3870                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3871
3872                if let Ok(_) = hub.begin() {
3873                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3874                        assert_eq!(
3875                            any::type_name_of_val(conn1),
3876                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3877                        );
3878                    } else {
3879                        panic!();
3880                    }
3881                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3882                        assert_eq!(
3883                            any::type_name_of_val(conn2),
3884                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3885                        );
3886                    } else {
3887                        panic!();
3888                    }
3889                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3890                        assert_eq!(
3891                            any::type_name_of_val(conn3),
3892                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
3893                        );
3894                    } else {
3895                        panic!();
3896                    }
3897                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3898                        assert_eq!(
3899                            any::type_name_of_val(conn4),
3900                            "sabi::data_hub::tests_data_hub::SyncDataConn"
3901                        );
3902                    } else {
3903                        panic!();
3904                    }
3905
3906                    assert!(hub.commit().is_ok());
3907                    hub.rollback();
3908                    hub.end();
3909                } else {
3910                    panic!();
3911                }
3912            } else {
3913                panic!();
3914            }
3915
3916            assert_eq!(
3917                *logger.lock().unwrap(),
3918                vec![
3919                    "SyncDataSrc 2 setupped",
3920                    "AsyncDataSrc 1 setupped",
3921                    "SyncDataSrc 4 setupped",
3922                    "AsyncDataSrc 3 setupped",
3923                    "AsyncDataSrc 1 created DataConn",
3924                    "SyncDataSrc 2 created DataConn",
3925                    "AsyncDataSrc 3 created DataConn",
3926                    "SyncDataSrc 4 created DataConn",
3927                    "AsyncDataConn 1 pre committed",
3928                    "SyncDataConn 2 pre committed",
3929                    "AsyncDataConn 3 pre committed",
3930                    "SyncDataConn 4 pre committed",
3931                    "AsyncDataConn 1 committed",
3932                    "SyncDataConn 2 committed",
3933                    "AsyncDataConn 3 committed",
3934                    "SyncDataConn 4 committed",
3935                    "AsyncDataConn 1 post committed",
3936                    "SyncDataConn 2 post committed",
3937                    "AsyncDataConn 3 post committed",
3938                    "SyncDataConn 4 post committed",
3939                    "AsyncDataConn 1 forced back",
3940                    "SyncDataConn 2 forced back",
3941                    "AsyncDataConn 3 forced back",
3942                    "SyncDataConn 4 forced back",
3943                    "SyncDataConn 4 closed",
3944                    "SyncDataConn 4 dropped",
3945                    "AsyncDataConn 3 closed",
3946                    "AsyncDataConn 3 dropped",
3947                    "SyncDataConn 2 closed",
3948                    "SyncDataConn 2 dropped",
3949                    "AsyncDataConn 1 closed",
3950                    "AsyncDataConn 1 dropped",
3951                    "SyncDataSrc 4 closed",
3952                    "SyncDataSrc 4 dropped",
3953                    "AsyncDataSrc 3 closed",
3954                    "AsyncDataSrc 3 dropped",
3955                    "SyncDataSrc 2 closed",
3956                    "SyncDataSrc 2 dropped",
3957                    "AsyncDataSrc 1 closed",
3958                    "AsyncDataSrc 1 dropped",
3959                ],
3960            );
3961        }
3962
3963        #[tokio::test]
3964        async fn async_test_new_and_close_with_global_data_srcs() {
3965            let _unused = TEST_SEQ.lock().unwrap();
3966            clear_global_data_srcs_fixed();
3967
3968            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3969
3970            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3971            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3972
3973            if let Ok(_auto_shutdown) = setup_async().await {
3974                #[allow(static_mut_refs)]
3975                unsafe {
3976                    let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
3977                    assert!(ptr.is_null());
3978
3979                    let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
3980                    assert!(!ptr.is_null());
3981                    assert_eq!((*ptr).name, "foo");
3982                    ptr = (*ptr).next;
3983                    assert!(!ptr.is_null());
3984                    assert_eq!((*ptr).name, "bar");
3985                    ptr = (*ptr).next;
3986                    assert!(ptr.is_null());
3987                }
3988
3989                let hub = DataHub::new();
3990
3991                assert!(hub.local_data_src_list.not_setup_head().is_null());
3992                assert!(hub.local_data_src_list.did_setup_head().is_null());
3993                assert!(hub.data_conn_list.head().is_null());
3994                assert_eq!(hub.data_src_map.len(), 2);
3995                assert_eq!(hub.data_conn_map.len(), 0);
3996                assert_eq!(hub.fixed, false);
3997
3998                #[allow(static_mut_refs)]
3999                let mut ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
4000                assert!(!ptr.is_null());
4001                ptr = unsafe { (*ptr).next };
4002                assert!(!ptr.is_null());
4003                ptr = unsafe { (*ptr).next };
4004                assert!(ptr.is_null());
4005                #[allow(static_mut_refs)]
4006                let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
4007                assert!(ptr.is_null());
4008            } else {
4009                panic!();
4010            }
4011
4012            #[allow(static_mut_refs)]
4013            let ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
4014            assert!(ptr.is_null());
4015            #[allow(static_mut_refs)]
4016            let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
4017            assert!(ptr.is_null());
4018
4019            assert_eq!(
4020                *logger.lock().unwrap(),
4021                vec![
4022                    "SyncDataSrc 2 setupped",
4023                    "AsyncDataSrc 1 setupped",
4024                    "SyncDataSrc 2 closed",
4025                    "SyncDataSrc 2 dropped",
4026                    "AsyncDataSrc 1 closed",
4027                    "AsyncDataSrc 1 dropped",
4028                ]
4029            );
4030        }
4031
4032        #[tokio::test]
4033        async fn async_test_uses_and_disuses() {
4034            let _unused = TEST_SEQ.lock().unwrap();
4035            clear_global_data_srcs_fixed();
4036
4037            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4038
4039            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4040            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4041
4042            if let Ok(_auto_shutdown) = setup_async().await {
4043                let mut hub = DataHub::new();
4044
4045                assert!(hub.local_data_src_list.not_setup_head().is_null());
4046                assert!(hub.local_data_src_list.did_setup_head().is_null());
4047                assert!(hub.data_conn_list.head().is_null());
4048                assert_eq!(hub.data_src_map.len(), 2);
4049                assert_eq!(hub.data_conn_map.len(), 0);
4050                assert_eq!(hub.fixed, false);
4051
4052                hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
4053                let mut ptr = hub.local_data_src_list.not_setup_head();
4054                assert!(!ptr.is_null());
4055                ptr = unsafe { (*ptr).next };
4056                assert!(ptr.is_null());
4057                assert!(hub.local_data_src_list.did_setup_head().is_null());
4058                assert!(hub.data_conn_list.head().is_null());
4059                assert_eq!(hub.data_src_map.len(), 2);
4060                assert_eq!(hub.data_conn_map.len(), 0);
4061                assert_eq!(hub.fixed, false);
4062
4063                hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
4064                let mut ptr = hub.local_data_src_list.not_setup_head();
4065                assert!(!ptr.is_null());
4066                ptr = unsafe { (*ptr).next };
4067                assert!(!ptr.is_null());
4068                ptr = unsafe { (*ptr).next };
4069                assert!(ptr.is_null());
4070                assert!(hub.local_data_src_list.did_setup_head().is_null());
4071                assert!(hub.data_conn_list.head().is_null());
4072                assert_eq!(hub.data_src_map.len(), 2);
4073                assert_eq!(hub.data_conn_map.len(), 0);
4074                assert_eq!(hub.fixed, false);
4075
4076                hub.disuses("foo"); // do nothing because of global
4077                hub.disuses("bar"); // do nothing because of global
4078                let mut ptr = hub.local_data_src_list.not_setup_head();
4079                assert!(!ptr.is_null());
4080                ptr = unsafe { (*ptr).next };
4081                assert!(!ptr.is_null());
4082                ptr = unsafe { (*ptr).next };
4083                assert!(ptr.is_null());
4084                assert!(hub.local_data_src_list.did_setup_head().is_null());
4085                assert!(hub.data_conn_list.head().is_null());
4086                assert_eq!(hub.data_src_map.len(), 2);
4087                assert_eq!(hub.data_conn_map.len(), 0);
4088                assert_eq!(hub.fixed, false);
4089
4090                hub.disuses("baz");
4091                let mut ptr = hub.local_data_src_list.not_setup_head();
4092                assert!(!ptr.is_null());
4093                ptr = unsafe { (*ptr).next };
4094                assert!(ptr.is_null());
4095                assert!(hub.local_data_src_list.did_setup_head().is_null());
4096                assert!(hub.data_conn_list.head().is_null());
4097                assert_eq!(hub.data_src_map.len(), 2);
4098                assert_eq!(hub.data_conn_map.len(), 0);
4099                assert_eq!(hub.fixed, false);
4100
4101                hub.disuses("qux");
4102                let ptr = hub.local_data_src_list.not_setup_head();
4103                assert!(ptr.is_null());
4104                assert!(hub.local_data_src_list.did_setup_head().is_null());
4105                assert!(hub.data_conn_list.head().is_null());
4106                assert_eq!(hub.data_src_map.len(), 2);
4107                assert_eq!(hub.data_conn_map.len(), 0);
4108                assert_eq!(hub.fixed, false);
4109            } else {
4110                panic!();
4111            }
4112
4113            assert_eq!(
4114                *logger.lock().unwrap(),
4115                vec![
4116                    "SyncDataSrc 2 setupped",
4117                    "AsyncDataSrc 1 setupped",
4118                    "SyncDataSrc 3 closed",
4119                    "SyncDataSrc 3 dropped",
4120                    "AsyncDataSrc 4 closed",
4121                    "AsyncDataSrc 4 dropped",
4122                    "SyncDataSrc 2 closed",
4123                    "SyncDataSrc 2 dropped",
4124                    "AsyncDataSrc 1 closed",
4125                    "AsyncDataSrc 1 dropped",
4126                ]
4127            );
4128        }
4129
4130        #[tokio::test]
4131        async fn async_test_cannot_add_and_remove_data_src_between_begin_and_end() {
4132            let _unused = TEST_SEQ.lock().unwrap();
4133            clear_global_data_srcs_fixed();
4134
4135            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4136
4137            if let Ok(_auto_shutdown) = setup_async().await {
4138                let mut hub = DataHub::new();
4139
4140                let ptr = hub.local_data_src_list.not_setup_head();
4141                assert!(ptr.is_null());
4142                let ptr = hub.local_data_src_list.did_setup_head();
4143                assert!(ptr.is_null());
4144                assert!(hub.data_conn_list.head().is_null());
4145                assert_eq!(hub.data_src_map.len(), 0);
4146                assert_eq!(hub.data_conn_map.len(), 0);
4147                assert_eq!(hub.fixed, false);
4148
4149                hub.uses("baz", SyncDataSrc::new(1, logger.clone(), Fail::Not));
4150
4151                let mut ptr = hub.local_data_src_list.not_setup_head();
4152                assert!(!ptr.is_null());
4153                ptr = unsafe { (*ptr).next };
4154                assert!(ptr.is_null());
4155                let ptr = hub.local_data_src_list.did_setup_head();
4156                assert!(ptr.is_null());
4157                assert!(hub.data_conn_list.head().is_null());
4158                assert_eq!(hub.data_src_map.len(), 0);
4159                assert_eq!(hub.data_conn_map.len(), 0);
4160                assert_eq!(hub.fixed, false);
4161
4162                assert!(hub.begin_async().await.is_ok());
4163
4164                let ptr = hub.local_data_src_list.not_setup_head();
4165                assert!(ptr.is_null());
4166                let mut ptr = hub.local_data_src_list.did_setup_head();
4167                assert!(!ptr.is_null());
4168                ptr = unsafe { (*ptr).next };
4169                assert!(ptr.is_null());
4170                assert!(hub.data_conn_list.head().is_null());
4171                assert_eq!(hub.data_src_map.len(), 1);
4172                assert_eq!(hub.data_conn_map.len(), 0);
4173                assert_eq!(hub.fixed, true);
4174
4175                hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
4176
4177                let ptr = hub.local_data_src_list.not_setup_head();
4178                assert!(ptr.is_null());
4179                let mut ptr = hub.local_data_src_list.did_setup_head();
4180                assert!(!ptr.is_null());
4181                ptr = unsafe { (*ptr).next };
4182                assert!(ptr.is_null());
4183                assert!(hub.data_conn_list.head().is_null());
4184                assert_eq!(hub.data_src_map.len(), 1);
4185                assert_eq!(hub.data_conn_map.len(), 0);
4186                assert_eq!(hub.fixed, true);
4187
4188                hub.disuses("baz");
4189
4190                let ptr = hub.local_data_src_list.not_setup_head();
4191                assert!(ptr.is_null());
4192                let mut ptr = hub.local_data_src_list.did_setup_head();
4193                assert!(!ptr.is_null());
4194                ptr = unsafe { (*ptr).next };
4195                assert!(ptr.is_null());
4196                assert!(hub.data_conn_list.head().is_null());
4197                assert_eq!(hub.data_src_map.len(), 1);
4198                assert_eq!(hub.data_conn_map.len(), 0);
4199                assert_eq!(hub.fixed, true);
4200
4201                hub.end();
4202
4203                let ptr = hub.local_data_src_list.not_setup_head();
4204                assert!(ptr.is_null());
4205                let mut ptr = hub.local_data_src_list.did_setup_head();
4206                assert!(!ptr.is_null());
4207                ptr = unsafe { (*ptr).next };
4208                assert!(ptr.is_null());
4209                assert!(hub.data_conn_list.head().is_null());
4210                assert_eq!(hub.data_src_map.len(), 1);
4211                assert_eq!(hub.data_conn_map.len(), 0);
4212                assert_eq!(hub.fixed, false);
4213
4214                hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
4215
4216                let mut ptr = hub.local_data_src_list.not_setup_head();
4217                assert!(!ptr.is_null());
4218                ptr = unsafe { (*ptr).next };
4219                assert!(ptr.is_null());
4220                let mut ptr = hub.local_data_src_list.did_setup_head();
4221                assert!(!ptr.is_null());
4222                ptr = unsafe { (*ptr).next };
4223                assert!(ptr.is_null());
4224                assert!(hub.data_conn_list.head().is_null());
4225                assert_eq!(hub.data_src_map.len(), 1);
4226                assert_eq!(hub.data_conn_map.len(), 0);
4227                assert_eq!(hub.fixed, false);
4228
4229                hub.disuses("baz");
4230
4231                let mut ptr = hub.local_data_src_list.not_setup_head();
4232                assert!(!ptr.is_null());
4233                ptr = unsafe { (*ptr).next };
4234                assert!(ptr.is_null());
4235                let ptr = hub.local_data_src_list.did_setup_head();
4236                assert!(ptr.is_null());
4237                assert!(hub.data_conn_list.head().is_null());
4238                assert_eq!(hub.data_src_map.len(), 0);
4239                assert_eq!(hub.data_conn_map.len(), 0);
4240                assert_eq!(hub.fixed, false);
4241            }
4242        }
4243
4244        #[tokio::test]
4245        async fn async_test_begin_and_end() {
4246            let _unused = TEST_SEQ.lock().unwrap();
4247            clear_global_data_srcs_fixed();
4248
4249            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4250
4251            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4252            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4253
4254            if let Ok(_auto_shutdown) = setup_async().await {
4255                let mut hub = DataHub::new();
4256
4257                hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
4258                hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
4259
4260                let mut ptr = hub.local_data_src_list.not_setup_head();
4261                assert!(!ptr.is_null());
4262                ptr = unsafe { (*ptr).next };
4263                assert!(!ptr.is_null());
4264                ptr = unsafe { (*ptr).next };
4265                assert!(ptr.is_null());
4266                assert!(hub.local_data_src_list.did_setup_head().is_null());
4267                assert!(hub.data_conn_list.head().is_null());
4268                assert_eq!(hub.data_src_map.len(), 2);
4269                assert_eq!(hub.data_conn_map.len(), 0);
4270                assert_eq!(hub.fixed, false);
4271
4272                assert!(hub.begin_async().await.is_ok());
4273
4274                assert!(hub.local_data_src_list.not_setup_head().is_null());
4275                let mut ptr = hub.local_data_src_list.did_setup_head();
4276                assert!(!ptr.is_null());
4277                ptr = unsafe { (*ptr).next };
4278                assert!(!ptr.is_null());
4279                ptr = unsafe { (*ptr).next };
4280                assert!(ptr.is_null());
4281                assert!(hub.data_conn_list.head().is_null());
4282                assert_eq!(hub.data_src_map.len(), 4);
4283                assert_eq!(hub.data_conn_map.len(), 0);
4284                assert_eq!(hub.fixed, true);
4285
4286                hub.end();
4287
4288                assert!(hub.local_data_src_list.not_setup_head().is_null());
4289                let mut ptr = hub.local_data_src_list.did_setup_head();
4290                assert!(!ptr.is_null());
4291                ptr = unsafe { (*ptr).next };
4292                assert!(!ptr.is_null());
4293                ptr = unsafe { (*ptr).next };
4294                assert!(ptr.is_null());
4295                assert!(hub.data_conn_list.head().is_null());
4296                assert_eq!(hub.data_src_map.len(), 4);
4297                assert_eq!(hub.data_conn_map.len(), 0);
4298                assert_eq!(hub.fixed, false);
4299            }
4300
4301            assert_eq!(
4302                *logger.lock().unwrap(),
4303                vec![
4304                    "SyncDataSrc 2 setupped",
4305                    "AsyncDataSrc 1 setupped",
4306                    "SyncDataSrc 3 setupped",
4307                    "AsyncDataSrc 4 setupped",
4308                    "AsyncDataSrc 4 closed",
4309                    "AsyncDataSrc 4 dropped",
4310                    "SyncDataSrc 3 closed",
4311                    "SyncDataSrc 3 dropped",
4312                    "SyncDataSrc 2 closed",
4313                    "SyncDataSrc 2 dropped",
4314                    "AsyncDataSrc 1 closed",
4315                    "AsyncDataSrc 1 dropped",
4316                ]
4317            );
4318        }
4319
4320        #[tokio::test]
4321        async fn async_test_begin_and_end_but_fail_sync() {
4322            let _unused = TEST_SEQ.lock().unwrap();
4323            clear_global_data_srcs_fixed();
4324
4325            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4326
4327            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4328            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4329
4330            if let Ok(_auto_shutdown) = setup_async().await {
4331                let mut hub = DataHub::new();
4332                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4333                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Setup));
4334
4335                if let Err(err) = hub.begin_async().await {
4336                    match err.reason::<DataHubError>() {
4337                        Ok(r) => match r {
4338                            DataHubError::FailToSetupLocalDataSrcs { errors } => {
4339                                assert_eq!(errors.len(), 1);
4340                                if let Some(err) = errors.get("qux") {
4341                                    match err.reason::<String>() {
4342                                        Ok(s) => assert_eq!(s, "XXX"),
4343                                        Err(_) => panic!(),
4344                                    }
4345                                } else {
4346                                    panic!();
4347                                }
4348                            }
4349                            _ => panic!(),
4350                        },
4351                        Err(_) => panic!(),
4352                    }
4353                } else {
4354                    panic!();
4355                }
4356
4357                let mut ptr = hub.local_data_src_list.not_setup_head();
4358                assert!(!ptr.is_null());
4359                ptr = unsafe { (*ptr).next };
4360                assert!(ptr.is_null());
4361                let mut ptr = hub.local_data_src_list.did_setup_head();
4362                assert!(!ptr.is_null());
4363                ptr = unsafe { (*ptr).next };
4364                assert!(ptr.is_null());
4365                assert!(hub.data_conn_list.head().is_null());
4366                assert_eq!(hub.data_src_map.len(), 3);
4367                assert_eq!(hub.data_conn_map.len(), 0);
4368                assert_eq!(hub.fixed, true);
4369
4370                hub.end();
4371
4372                let mut ptr = hub.local_data_src_list.not_setup_head();
4373                assert!(!ptr.is_null());
4374                ptr = unsafe { (*ptr).next };
4375                assert!(ptr.is_null());
4376                let mut ptr = hub.local_data_src_list.did_setup_head();
4377                assert!(!ptr.is_null());
4378                ptr = unsafe { (*ptr).next };
4379                assert!(ptr.is_null());
4380                assert!(hub.data_conn_list.head().is_null());
4381                assert_eq!(hub.data_src_map.len(), 3);
4382                assert_eq!(hub.data_conn_map.len(), 0);
4383                assert_eq!(hub.fixed, false);
4384            } else {
4385                panic!();
4386            }
4387        }
4388
4389        #[tokio::test]
4390        async fn async_test_begin_and_end_but_fail_async() {
4391            let _unused = TEST_SEQ.lock().unwrap();
4392            clear_global_data_srcs_fixed();
4393
4394            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4395
4396            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4397            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4398
4399            if let Ok(_auto_shutdown) = setup_async().await {
4400                let mut hub = DataHub::new();
4401                hub.uses("baz", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
4402                hub.uses("qux", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4403
4404                if let Err(err) = hub.begin_async().await {
4405                    match err.reason::<DataHubError>() {
4406                        Ok(r) => match r {
4407                            DataHubError::FailToSetupLocalDataSrcs { errors } => {
4408                                assert_eq!(errors.len(), 1);
4409                                if let Some(err) = errors.get("baz") {
4410                                    match err.reason::<String>() {
4411                                        Ok(s) => assert_eq!(s, "YYY"),
4412                                        Err(_) => panic!(),
4413                                    }
4414                                } else {
4415                                    panic!();
4416                                }
4417                            }
4418                            _ => panic!(),
4419                        },
4420                        Err(_) => panic!(),
4421                    }
4422                } else {
4423                    panic!();
4424                }
4425
4426                let mut ptr = hub.local_data_src_list.not_setup_head();
4427                assert!(!ptr.is_null());
4428                ptr = unsafe { (*ptr).next };
4429                assert!(ptr.is_null());
4430                let mut ptr = hub.local_data_src_list.did_setup_head();
4431                assert!(!ptr.is_null());
4432                ptr = unsafe { (*ptr).next };
4433                assert!(ptr.is_null());
4434                assert!(hub.data_conn_list.head().is_null());
4435                assert_eq!(hub.data_src_map.len(), 3);
4436                assert_eq!(hub.data_conn_map.len(), 0);
4437                assert_eq!(hub.fixed, true);
4438
4439                hub.end();
4440
4441                let mut ptr = hub.local_data_src_list.not_setup_head();
4442                assert!(!ptr.is_null());
4443                ptr = unsafe { (*ptr).next };
4444                assert!(ptr.is_null());
4445                let mut ptr = hub.local_data_src_list.did_setup_head();
4446                assert!(!ptr.is_null());
4447                ptr = unsafe { (*ptr).next };
4448                assert!(ptr.is_null());
4449                assert!(hub.data_conn_list.head().is_null());
4450                assert_eq!(hub.data_src_map.len(), 3);
4451                assert_eq!(hub.data_conn_map.len(), 0);
4452                assert_eq!(hub.fixed, false);
4453            } else {
4454                panic!();
4455            }
4456        }
4457
4458        #[tokio::test]
4459        async fn async_test_commit_and_post_commit() {
4460            let _unused = TEST_SEQ.lock().unwrap();
4461            clear_global_data_srcs_fixed();
4462
4463            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4464
4465            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4466            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4467
4468            if let Ok(_auto_shutdown) = setup_async().await {
4469                let mut hub = DataHub::new();
4470                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4471                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4472
4473                if let Ok(_) = hub.begin_async().await {
4474                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4475                        assert_eq!(
4476                            any::type_name_of_val(conn1),
4477                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4478                        );
4479                    } else {
4480                        panic!();
4481                    }
4482                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4483                        assert_eq!(
4484                            any::type_name_of_val(conn2),
4485                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4486                        );
4487                    } else {
4488                        panic!();
4489                    }
4490                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4491                        assert_eq!(
4492                            any::type_name_of_val(conn3),
4493                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4494                        );
4495                    } else {
4496                        panic!();
4497                    }
4498                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4499                        assert_eq!(
4500                            any::type_name_of_val(conn4),
4501                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4502                        );
4503                    } else {
4504                        panic!();
4505                    }
4506
4507                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4508                        assert_eq!(
4509                            any::type_name_of_val(conn1),
4510                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4511                        );
4512                    } else {
4513                        panic!();
4514                    }
4515                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4516                        assert_eq!(
4517                            any::type_name_of_val(conn2),
4518                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4519                        );
4520                    } else {
4521                        panic!();
4522                    }
4523                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4524                        assert_eq!(
4525                            any::type_name_of_val(conn3),
4526                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4527                        );
4528                    } else {
4529                        panic!();
4530                    }
4531                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4532                        assert_eq!(
4533                            any::type_name_of_val(conn4),
4534                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4535                        );
4536                    } else {
4537                        panic!();
4538                    }
4539
4540                    assert!(hub.commit_async().await.is_ok());
4541                    hub.end();
4542                } else {
4543                    panic!();
4544                }
4545            } else {
4546                panic!();
4547            }
4548
4549            assert_eq!(
4550                *logger.lock().unwrap(),
4551                vec![
4552                    "SyncDataSrc 2 setupped",
4553                    "AsyncDataSrc 1 setupped",
4554                    "SyncDataSrc 4 setupped",
4555                    "AsyncDataSrc 3 setupped",
4556                    "AsyncDataSrc 1 created DataConn",
4557                    "SyncDataSrc 2 created DataConn",
4558                    "AsyncDataSrc 3 created DataConn",
4559                    "SyncDataSrc 4 created DataConn",
4560                    "AsyncDataConn 1 pre committed",
4561                    "SyncDataConn 2 pre committed",
4562                    "AsyncDataConn 3 pre committed",
4563                    "SyncDataConn 4 pre committed",
4564                    "AsyncDataConn 1 committed",
4565                    "SyncDataConn 2 committed",
4566                    "AsyncDataConn 3 committed",
4567                    "SyncDataConn 4 committed",
4568                    "AsyncDataConn 1 post committed",
4569                    "SyncDataConn 2 post committed",
4570                    "AsyncDataConn 3 post committed",
4571                    "SyncDataConn 4 post committed",
4572                    "SyncDataConn 4 closed",
4573                    "SyncDataConn 4 dropped",
4574                    "AsyncDataConn 3 closed",
4575                    "AsyncDataConn 3 dropped",
4576                    "SyncDataConn 2 closed",
4577                    "SyncDataConn 2 dropped",
4578                    "AsyncDataConn 1 closed",
4579                    "AsyncDataConn 1 dropped",
4580                    "SyncDataSrc 4 closed",
4581                    "SyncDataSrc 4 dropped",
4582                    "AsyncDataSrc 3 closed",
4583                    "AsyncDataSrc 3 dropped",
4584                    "SyncDataSrc 2 closed",
4585                    "SyncDataSrc 2 dropped",
4586                    "AsyncDataSrc 1 closed",
4587                    "AsyncDataSrc 1 dropped",
4588                ],
4589            );
4590        }
4591
4592        #[tokio::test]
4593        async fn async_test_fail_to_cast_new_data_conn() {
4594            let _unused = TEST_SEQ.lock().unwrap();
4595            clear_global_data_srcs_fixed();
4596
4597            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4598
4599            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4600
4601            if let Ok(_auto_shutdown) = setup_async().await {
4602                let mut hub = DataHub::new();
4603                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4604
4605                if let Ok(_) = hub.begin_async().await {
4606                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
4607                        match err.reason::<DataHubError>() {
4608                            Ok(r) => match r {
4609                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
4610                                    assert_eq!(name, "foo");
4611                                    assert_eq!(
4612                                        *cast_to_type,
4613                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
4614                                    );
4615                                }
4616                                _ => panic!(),
4617                            },
4618                            Err(_) => panic!(),
4619                        }
4620                    } else {
4621                        panic!();
4622                    }
4623
4624                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
4625                        match err.reason::<DataHubError>() {
4626                            Ok(r) => match r {
4627                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
4628                                    assert_eq!(name, "bar");
4629                                    assert_eq!(
4630                                        *cast_to_type,
4631                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
4632                                    );
4633                                }
4634                                _ => panic!(),
4635                            },
4636                            Err(_) => panic!(),
4637                        }
4638                    } else {
4639                        panic!();
4640                    }
4641                } else {
4642                    panic!();
4643                }
4644            } else {
4645                panic!();
4646            }
4647
4648            assert_eq!(
4649                *logger.lock().unwrap(),
4650                vec![
4651                    "AsyncDataSrc 1 setupped",
4652                    "SyncDataSrc 2 setupped",
4653                    "SyncDataSrc 2 closed",
4654                    "SyncDataSrc 2 dropped",
4655                    "AsyncDataSrc 1 closed",
4656                    "AsyncDataSrc 1 dropped",
4657                ],
4658            );
4659        }
4660
4661        #[tokio::test]
4662        async fn async_test_fail_to_cast_reused_data_conn() {
4663            let _unused = TEST_SEQ.lock().unwrap();
4664            clear_global_data_srcs_fixed();
4665
4666            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4667
4668            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4669
4670            if let Ok(_auto_shutdown) = setup_async().await {
4671                let mut hub = DataHub::new();
4672                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4673
4674                if let Ok(_) = hub.begin_async().await {
4675                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4676                        assert_eq!(
4677                            any::type_name_of_val(conn1),
4678                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4679                        );
4680                    } else {
4681                        panic!();
4682                    }
4683                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4684                        assert_eq!(
4685                            any::type_name_of_val(conn2),
4686                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4687                        );
4688                    } else {
4689                        panic!();
4690                    }
4691
4692                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
4693                        match err.reason::<DataHubError>() {
4694                            Ok(r) => match r {
4695                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
4696                                    assert_eq!(name, "foo");
4697                                    assert_eq!(
4698                                        *cast_to_type,
4699                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
4700                                    );
4701                                }
4702                                _ => panic!(),
4703                            },
4704                            Err(_) => panic!(),
4705                        }
4706                    } else {
4707                        panic!();
4708                    }
4709
4710                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
4711                        match err.reason::<DataHubError>() {
4712                            Ok(r) => match r {
4713                                DataHubError::FailToCastDataConn { name, cast_to_type } => {
4714                                    assert_eq!(name, "bar");
4715                                    assert_eq!(
4716                                        *cast_to_type,
4717                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
4718                                    );
4719                                }
4720                                _ => panic!(),
4721                            },
4722                            Err(_) => panic!(),
4723                        }
4724                    } else {
4725                        panic!();
4726                    }
4727                } else {
4728                    panic!();
4729                }
4730            } else {
4731                panic!();
4732            }
4733
4734            assert_eq!(
4735                *logger.lock().unwrap(),
4736                vec![
4737                    "AsyncDataSrc 1 setupped",
4738                    "SyncDataSrc 2 setupped",
4739                    "AsyncDataSrc 1 created DataConn",
4740                    "SyncDataSrc 2 created DataConn",
4741                    "SyncDataConn 2 closed",
4742                    "SyncDataConn 2 dropped",
4743                    "AsyncDataConn 1 closed",
4744                    "AsyncDataConn 1 dropped",
4745                    "SyncDataSrc 2 closed",
4746                    "SyncDataSrc 2 dropped",
4747                    "AsyncDataSrc 1 closed",
4748                    "AsyncDataSrc 1 dropped",
4749                ],
4750            );
4751        }
4752
4753        #[tokio::test]
4754        async fn async_test_fail_to_create_data_conn() {
4755            let _unused = TEST_SEQ.lock().unwrap();
4756            clear_global_data_srcs_fixed();
4757
4758            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4759
4760            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4761
4762            if let Ok(_auto_shutdown) = setup_async().await {
4763                let mut hub = DataHub::new();
4764                hub.uses(
4765                    "bar",
4766                    SyncDataSrc::new(2, logger.clone(), Fail::CreateDataConn),
4767                );
4768
4769                if let Ok(_) = hub.begin_async().await {
4770                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4771                        assert_eq!(
4772                            any::type_name_of_val(conn1),
4773                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4774                        );
4775                    } else {
4776                        panic!();
4777                    }
4778
4779                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("bar") {
4780                        match err.reason::<DataHubError>() {
4781                            Ok(r) => match r {
4782                                DataHubError::FailToCreateDataConn {
4783                                    name,
4784                                    data_conn_type,
4785                                } => {
4786                                    assert_eq!(name, "bar");
4787                                    assert_eq!(
4788                                        *data_conn_type,
4789                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
4790                                    );
4791                                }
4792                                _ => panic!(),
4793                            },
4794                            Err(_) => panic!(),
4795                        }
4796                        match err.source() {
4797                            Some(e) => {
4798                                assert_eq!(
4799                                    e.downcast_ref::<errs::Err>()
4800                                        .unwrap()
4801                                        .reason::<String>()
4802                                        .unwrap(),
4803                                    "xxx"
4804                                );
4805                            }
4806                            None => panic!(),
4807                        }
4808                    } else {
4809                        panic!();
4810                    }
4811                } else {
4812                    panic!();
4813                }
4814            } else {
4815                panic!();
4816            }
4817
4818            assert_eq!(
4819                *logger.lock().unwrap(),
4820                vec![
4821                    "AsyncDataSrc 1 setupped",
4822                    "SyncDataSrc 2 setupped",
4823                    "AsyncDataSrc 1 created DataConn",
4824                    "SyncDataSrc 2 failed to create a DataConn",
4825                    "AsyncDataConn 1 closed",
4826                    "AsyncDataConn 1 dropped",
4827                    "SyncDataSrc 2 closed",
4828                    "SyncDataSrc 2 dropped",
4829                    "AsyncDataSrc 1 closed",
4830                    "AsyncDataSrc 1 dropped",
4831                ],
4832            );
4833        }
4834
4835        #[tokio::test]
4836        async fn async_test_fail_to_create_data_conn_because_of_no_data_src() {
4837            let _unused = TEST_SEQ.lock().unwrap();
4838            clear_global_data_srcs_fixed();
4839
4840            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4841
4842            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4843
4844            if let Ok(_auto_shutdown) = setup_async().await {
4845                let mut hub = DataHub::new();
4846                hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4847
4848                if let Ok(_) = hub.begin_async().await {
4849                    if let Err(err) = hub.get_data_conn::<SyncDataConn>("baz") {
4850                        match err.reason::<DataHubError>() {
4851                            Ok(r) => match r {
4852                                DataHubError::NoDataSrcToCreateDataConn {
4853                                    name,
4854                                    data_conn_type,
4855                                } => {
4856                                    assert_eq!(name, "baz");
4857                                    assert_eq!(
4858                                        *data_conn_type,
4859                                        "sabi::data_hub::tests_data_hub::SyncDataConn"
4860                                    );
4861                                }
4862                                _ => panic!(),
4863                            },
4864                            Err(_) => panic!(),
4865                        }
4866                    } else {
4867                        panic!();
4868                    }
4869
4870                    if let Err(err) = hub.get_data_conn::<AsyncDataConn>("qux") {
4871                        match err.reason::<DataHubError>() {
4872                            Ok(r) => match r {
4873                                DataHubError::NoDataSrcToCreateDataConn {
4874                                    name,
4875                                    data_conn_type,
4876                                } => {
4877                                    assert_eq!(name, "qux");
4878                                    assert_eq!(
4879                                        *data_conn_type,
4880                                        "sabi::data_hub::tests_data_hub::AsyncDataConn"
4881                                    );
4882                                }
4883                                _ => panic!(),
4884                            },
4885                            Err(_) => panic!(),
4886                        }
4887                    } else {
4888                        panic!();
4889                    }
4890                } else {
4891                    panic!();
4892                }
4893            } else {
4894                panic!();
4895            }
4896
4897            assert_eq!(
4898                *logger.lock().unwrap(),
4899                vec![
4900                    "AsyncDataSrc 1 setupped",
4901                    "SyncDataSrc 2 setupped",
4902                    "SyncDataSrc 2 closed",
4903                    "SyncDataSrc 2 dropped",
4904                    "AsyncDataSrc 1 closed",
4905                    "AsyncDataSrc 1 dropped",
4906                ],
4907            );
4908        }
4909
4910        #[tokio::test]
4911        async fn async_test_commit_and_post_commit_when_no_data_conn() {
4912            let _unused = TEST_SEQ.lock().unwrap();
4913            clear_global_data_srcs_fixed();
4914
4915            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4916
4917            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4918            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4919
4920            if let Ok(_auto_shutdown) = setup_async().await {
4921                let mut hub = DataHub::new();
4922                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4923                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4924
4925                if let Ok(_) = hub.begin_async().await {
4926                    assert!(hub.commit_async().await.is_ok());
4927                    hub.end();
4928                } else {
4929                    panic!();
4930                }
4931            } else {
4932                panic!();
4933            }
4934
4935            assert_eq!(
4936                *logger.lock().unwrap(),
4937                vec![
4938                    "SyncDataSrc 2 setupped",
4939                    "AsyncDataSrc 1 setupped",
4940                    "SyncDataSrc 4 setupped",
4941                    "AsyncDataSrc 3 setupped",
4942                    "SyncDataSrc 4 closed",
4943                    "SyncDataSrc 4 dropped",
4944                    "AsyncDataSrc 3 closed",
4945                    "AsyncDataSrc 3 dropped",
4946                    "SyncDataSrc 2 closed",
4947                    "SyncDataSrc 2 dropped",
4948                    "AsyncDataSrc 1 closed",
4949                    "AsyncDataSrc 1 dropped",
4950                ],
4951            );
4952        }
4953
4954        #[tokio::test]
4955        async fn async_test_commit_and_but_fail_global_sync() {
4956            let _unused = TEST_SEQ.lock().unwrap();
4957            clear_global_data_srcs_fixed();
4958
4959            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4960
4961            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4962            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Commit));
4963
4964            if let Ok(_auto_shutdown) = setup_async().await {
4965                let mut hub = DataHub::new();
4966                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4967                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4968
4969                if let Ok(_) = hub.begin_async().await {
4970                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4971                        assert_eq!(
4972                            any::type_name_of_val(conn1),
4973                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4974                        );
4975                    } else {
4976                        panic!();
4977                    }
4978                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4979                        assert_eq!(
4980                            any::type_name_of_val(conn2),
4981                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4982                        );
4983                    } else {
4984                        panic!();
4985                    }
4986                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4987                        assert_eq!(
4988                            any::type_name_of_val(conn3),
4989                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
4990                        );
4991                    } else {
4992                        panic!();
4993                    }
4994                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4995                        assert_eq!(
4996                            any::type_name_of_val(conn4),
4997                            "sabi::data_hub::tests_data_hub::SyncDataConn"
4998                        );
4999                    } else {
5000                        panic!();
5001                    }
5002
5003                    match hub.commit_async().await {
5004                        Ok(_) => panic!(),
5005                        Err(err) => match err.reason::<DataHubError>() {
5006                            Ok(r) => match r {
5007                                DataHubError::FailToCommitDataConn { errors } => {
5008                                    assert_eq!(errors.len(), 1);
5009                                    if let Some(e) = errors.get("bar") {
5010                                        if let Ok(s) = e.reason::<String>() {
5011                                            assert_eq!(s, "ZZZ");
5012                                        } else {
5013                                            panic!();
5014                                        }
5015                                    } else {
5016                                        panic!();
5017                                    }
5018                                }
5019                                _ => panic!(),
5020                            },
5021                            Err(_) => panic!(),
5022                        },
5023                    }
5024
5025                    hub.end();
5026                } else {
5027                    panic!();
5028                }
5029            } else {
5030                panic!();
5031            }
5032
5033            assert_eq!(
5034                *logger.lock().unwrap(),
5035                vec![
5036                    "SyncDataSrc 2 setupped",
5037                    "AsyncDataSrc 1 setupped",
5038                    "SyncDataSrc 4 setupped",
5039                    "AsyncDataSrc 3 setupped",
5040                    "AsyncDataSrc 1 created DataConn",
5041                    "SyncDataSrc 2 created DataConn",
5042                    "AsyncDataSrc 3 created DataConn",
5043                    "SyncDataSrc 4 created DataConn",
5044                    "AsyncDataConn 1 pre committed",
5045                    "SyncDataConn 2 pre committed",
5046                    "AsyncDataConn 3 pre committed",
5047                    "SyncDataConn 4 pre committed",
5048                    "AsyncDataConn 1 committed",
5049                    "SyncDataConn 2 failed to commit",
5050                    "SyncDataConn 4 closed",
5051                    "SyncDataConn 4 dropped",
5052                    "AsyncDataConn 3 closed",
5053                    "AsyncDataConn 3 dropped",
5054                    "SyncDataConn 2 closed",
5055                    "SyncDataConn 2 dropped",
5056                    "AsyncDataConn 1 closed",
5057                    "AsyncDataConn 1 dropped",
5058                    "SyncDataSrc 4 closed",
5059                    "SyncDataSrc 4 dropped",
5060                    "AsyncDataSrc 3 closed",
5061                    "AsyncDataSrc 3 dropped",
5062                    "SyncDataSrc 2 closed",
5063                    "SyncDataSrc 2 dropped",
5064                    "AsyncDataSrc 1 closed",
5065                    "AsyncDataSrc 1 dropped",
5066                ],
5067            );
5068        }
5069
5070        #[tokio::test]
5071        async fn async_test_commit_but_fail_global_async() {
5072            let _unused = TEST_SEQ.lock().unwrap();
5073            clear_global_data_srcs_fixed();
5074
5075            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5076
5077            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Commit));
5078            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5079
5080            if let Ok(_auto_shutdown) = setup_async().await {
5081                let mut hub = DataHub::new();
5082                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5083                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5084
5085                if let Ok(_) = hub.begin_async().await {
5086                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5087                        assert_eq!(
5088                            any::type_name_of_val(conn1),
5089                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5090                        );
5091                    } else {
5092                        panic!();
5093                    }
5094                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5095                        assert_eq!(
5096                            any::type_name_of_val(conn2),
5097                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5098                        );
5099                    } else {
5100                        panic!();
5101                    }
5102                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5103                        assert_eq!(
5104                            any::type_name_of_val(conn3),
5105                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5106                        );
5107                    } else {
5108                        panic!();
5109                    }
5110                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5111                        assert_eq!(
5112                            any::type_name_of_val(conn4),
5113                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5114                        );
5115                    } else {
5116                        panic!();
5117                    }
5118
5119                    match hub.commit_async().await {
5120                        Ok(_) => panic!(),
5121                        Err(err) => match err.reason::<DataHubError>() {
5122                            Ok(r) => match r {
5123                                DataHubError::FailToCommitDataConn { errors } => {
5124                                    assert_eq!(errors.len(), 1);
5125                                    if let Some(e) = errors.get("foo") {
5126                                        if let Ok(s) = e.reason::<String>() {
5127                                            assert_eq!(s, "VVV");
5128                                        } else {
5129                                            panic!();
5130                                        }
5131                                    } else {
5132                                        panic!();
5133                                    }
5134                                }
5135                                _ => panic!(),
5136                            },
5137                            Err(_) => panic!(),
5138                        },
5139                    }
5140
5141                    hub.end();
5142                } else {
5143                    panic!();
5144                }
5145            } else {
5146                panic!();
5147            }
5148
5149            assert_eq!(
5150                *logger.lock().unwrap(),
5151                vec![
5152                    "SyncDataSrc 2 setupped",
5153                    "AsyncDataSrc 1 setupped",
5154                    "SyncDataSrc 4 setupped",
5155                    "AsyncDataSrc 3 setupped",
5156                    "AsyncDataSrc 1 created DataConn",
5157                    "SyncDataSrc 2 created DataConn",
5158                    "AsyncDataSrc 3 created DataConn",
5159                    "SyncDataSrc 4 created DataConn",
5160                    "AsyncDataConn 1 pre committed",
5161                    "SyncDataConn 2 pre committed",
5162                    "AsyncDataConn 3 pre committed",
5163                    "SyncDataConn 4 pre committed",
5164                    "AsyncDataConn 1 failed to commit",
5165                    "SyncDataConn 4 closed",
5166                    "SyncDataConn 4 dropped",
5167                    "AsyncDataConn 3 closed",
5168                    "AsyncDataConn 3 dropped",
5169                    "SyncDataConn 2 closed",
5170                    "SyncDataConn 2 dropped",
5171                    "AsyncDataConn 1 closed",
5172                    "AsyncDataConn 1 dropped",
5173                    "SyncDataSrc 4 closed",
5174                    "SyncDataSrc 4 dropped",
5175                    "AsyncDataSrc 3 closed",
5176                    "AsyncDataSrc 3 dropped",
5177                    "SyncDataSrc 2 closed",
5178                    "SyncDataSrc 2 dropped",
5179                    "AsyncDataSrc 1 closed",
5180                    "AsyncDataSrc 1 dropped",
5181                ],
5182            );
5183        }
5184
5185        #[tokio::test]
5186        async fn async_test_commit_but_fail_local_sync() {
5187            let _unused = TEST_SEQ.lock().unwrap();
5188            clear_global_data_srcs_fixed();
5189
5190            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5191
5192            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5193            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5194
5195            if let Ok(_auto_shutdown) = setup_async().await {
5196                let mut hub = DataHub::new();
5197                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5198                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Commit));
5199
5200                if let Ok(_) = hub.begin_async().await {
5201                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5202                        assert_eq!(
5203                            any::type_name_of_val(conn1),
5204                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5205                        );
5206                    } else {
5207                        panic!();
5208                    }
5209                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5210                        assert_eq!(
5211                            any::type_name_of_val(conn2),
5212                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5213                        );
5214                    } else {
5215                        panic!();
5216                    }
5217                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5218                        assert_eq!(
5219                            any::type_name_of_val(conn3),
5220                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5221                        );
5222                    } else {
5223                        panic!();
5224                    }
5225                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5226                        assert_eq!(
5227                            any::type_name_of_val(conn4),
5228                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5229                        );
5230                    } else {
5231                        panic!();
5232                    }
5233
5234                    match hub.commit_async().await {
5235                        Ok(_) => panic!(),
5236                        Err(err) => match err.reason::<DataHubError>() {
5237                            Ok(r) => match r {
5238                                DataHubError::FailToCommitDataConn { errors } => {
5239                                    assert_eq!(errors.len(), 1);
5240                                    if let Some(e) = errors.get("qux") {
5241                                        if let Ok(s) = e.reason::<String>() {
5242                                            assert_eq!(s, "ZZZ");
5243                                        } else {
5244                                            panic!();
5245                                        }
5246                                    } else {
5247                                        panic!();
5248                                    }
5249                                }
5250                                _ => panic!(),
5251                            },
5252                            Err(_) => panic!(),
5253                        },
5254                    }
5255
5256                    hub.end();
5257                } else {
5258                    panic!();
5259                }
5260            } else {
5261                panic!();
5262            }
5263
5264            assert_eq!(
5265                *logger.lock().unwrap(),
5266                vec![
5267                    "SyncDataSrc 2 setupped",
5268                    "AsyncDataSrc 1 setupped",
5269                    "SyncDataSrc 4 setupped",
5270                    "AsyncDataSrc 3 setupped",
5271                    "AsyncDataSrc 1 created DataConn",
5272                    "SyncDataSrc 2 created DataConn",
5273                    "AsyncDataSrc 3 created DataConn",
5274                    "SyncDataSrc 4 created DataConn",
5275                    "AsyncDataConn 1 pre committed",
5276                    "SyncDataConn 2 pre committed",
5277                    "AsyncDataConn 3 pre committed",
5278                    "SyncDataConn 4 pre committed",
5279                    "AsyncDataConn 1 committed",
5280                    "SyncDataConn 2 committed",
5281                    "AsyncDataConn 3 committed",
5282                    "SyncDataConn 4 failed to commit",
5283                    "SyncDataConn 4 closed",
5284                    "SyncDataConn 4 dropped",
5285                    "AsyncDataConn 3 closed",
5286                    "AsyncDataConn 3 dropped",
5287                    "SyncDataConn 2 closed",
5288                    "SyncDataConn 2 dropped",
5289                    "AsyncDataConn 1 closed",
5290                    "AsyncDataConn 1 dropped",
5291                    "SyncDataSrc 4 closed",
5292                    "SyncDataSrc 4 dropped",
5293                    "AsyncDataSrc 3 closed",
5294                    "AsyncDataSrc 3 dropped",
5295                    "SyncDataSrc 2 closed",
5296                    "SyncDataSrc 2 dropped",
5297                    "AsyncDataSrc 1 closed",
5298                    "AsyncDataSrc 1 dropped",
5299                ],
5300            );
5301        }
5302
5303        #[tokio::test]
5304        async fn async_test_commit_but_fail_local_async() {
5305            let _unused = TEST_SEQ.lock().unwrap();
5306            clear_global_data_srcs_fixed();
5307
5308            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5309
5310            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5311            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5312
5313            if let Ok(_auto_shutdown) = setup_async().await {
5314                let mut hub = DataHub::new();
5315                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Commit));
5316                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5317
5318                if let Ok(_) = hub.begin_async().await {
5319                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5320                        assert_eq!(
5321                            any::type_name_of_val(conn1),
5322                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5323                        );
5324                    } else {
5325                        panic!();
5326                    }
5327                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5328                        assert_eq!(
5329                            any::type_name_of_val(conn2),
5330                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5331                        );
5332                    } else {
5333                        panic!();
5334                    }
5335                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5336                        assert_eq!(
5337                            any::type_name_of_val(conn3),
5338                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5339                        );
5340                    } else {
5341                        panic!();
5342                    }
5343                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5344                        assert_eq!(
5345                            any::type_name_of_val(conn4),
5346                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5347                        );
5348                    } else {
5349                        panic!();
5350                    }
5351
5352                    match hub.commit_async().await {
5353                        Ok(_) => panic!(),
5354                        Err(err) => match err.reason::<DataHubError>() {
5355                            Ok(r) => match r {
5356                                DataHubError::FailToCommitDataConn { errors } => {
5357                                    assert_eq!(errors.len(), 1);
5358                                    if let Some(e) = errors.get("baz") {
5359                                        if let Ok(s) = e.reason::<String>() {
5360                                            assert_eq!(s, "VVV");
5361                                        } else {
5362                                            panic!();
5363                                        }
5364                                    } else {
5365                                        panic!();
5366                                    }
5367                                }
5368                                _ => panic!(),
5369                            },
5370                            Err(_) => panic!(),
5371                        },
5372                    }
5373
5374                    hub.end();
5375                } else {
5376                    panic!();
5377                }
5378            } else {
5379                panic!();
5380            }
5381
5382            assert_eq!(
5383                *logger.lock().unwrap(),
5384                vec![
5385                    "SyncDataSrc 2 setupped",
5386                    "AsyncDataSrc 1 setupped",
5387                    "SyncDataSrc 4 setupped",
5388                    "AsyncDataSrc 3 setupped",
5389                    "AsyncDataSrc 1 created DataConn",
5390                    "SyncDataSrc 2 created DataConn",
5391                    "AsyncDataSrc 3 created DataConn",
5392                    "SyncDataSrc 4 created DataConn",
5393                    "AsyncDataConn 1 pre committed",
5394                    "SyncDataConn 2 pre committed",
5395                    "AsyncDataConn 3 pre committed",
5396                    "SyncDataConn 4 pre committed",
5397                    "AsyncDataConn 1 committed",
5398                    "SyncDataConn 2 committed",
5399                    "AsyncDataConn 3 failed to commit",
5400                    "SyncDataConn 4 closed",
5401                    "SyncDataConn 4 dropped",
5402                    "AsyncDataConn 3 closed",
5403                    "AsyncDataConn 3 dropped",
5404                    "SyncDataConn 2 closed",
5405                    "SyncDataConn 2 dropped",
5406                    "AsyncDataConn 1 closed",
5407                    "AsyncDataConn 1 dropped",
5408                    "SyncDataSrc 4 closed",
5409                    "SyncDataSrc 4 dropped",
5410                    "AsyncDataSrc 3 closed",
5411                    "AsyncDataSrc 3 dropped",
5412                    "SyncDataSrc 2 closed",
5413                    "SyncDataSrc 2 dropped",
5414                    "AsyncDataSrc 1 closed",
5415                    "AsyncDataSrc 1 dropped",
5416                ],
5417            );
5418        }
5419
5420        #[tokio::test]
5421        async fn async_test_pre_commit_but_fail_global_sync() {
5422            let _unused = TEST_SEQ.lock().unwrap();
5423            clear_global_data_srcs_fixed();
5424
5425            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5426
5427            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5428            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::PreCommit));
5429
5430            if let Ok(_auto_shutdown) = setup_async().await {
5431                let mut hub = DataHub::new();
5432                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5433                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5434
5435                if let Ok(_) = hub.begin_async().await {
5436                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5437                        assert_eq!(
5438                            any::type_name_of_val(conn1),
5439                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5440                        );
5441                    } else {
5442                        panic!();
5443                    }
5444                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5445                        assert_eq!(
5446                            any::type_name_of_val(conn2),
5447                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5448                        );
5449                    } else {
5450                        panic!();
5451                    }
5452                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5453                        assert_eq!(
5454                            any::type_name_of_val(conn3),
5455                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5456                        );
5457                    } else {
5458                        panic!();
5459                    }
5460                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5461                        assert_eq!(
5462                            any::type_name_of_val(conn4),
5463                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5464                        );
5465                    } else {
5466                        panic!();
5467                    }
5468
5469                    match hub.commit_async().await {
5470                        Ok(_) => panic!(),
5471                        Err(err) => match err.reason::<DataHubError>() {
5472                            Ok(r) => match r {
5473                                DataHubError::FailToPreCommitDataConn { errors } => {
5474                                    assert_eq!(errors.len(), 1);
5475                                    if let Some(e) = errors.get("bar") {
5476                                        if let Ok(s) = e.reason::<String>() {
5477                                            assert_eq!(s, "zzz");
5478                                        } else {
5479                                            panic!();
5480                                        }
5481                                    } else {
5482                                        panic!();
5483                                    }
5484                                }
5485                                _ => panic!(),
5486                            },
5487                            Err(_) => panic!(),
5488                        },
5489                    }
5490
5491                    hub.end();
5492                } else {
5493                    panic!();
5494                }
5495            } else {
5496                panic!();
5497            }
5498
5499            assert_eq!(
5500                *logger.lock().unwrap(),
5501                vec![
5502                    "SyncDataSrc 2 setupped",
5503                    "AsyncDataSrc 1 setupped",
5504                    "SyncDataSrc 4 setupped",
5505                    "AsyncDataSrc 3 setupped",
5506                    "AsyncDataSrc 1 created DataConn",
5507                    "SyncDataSrc 2 created DataConn",
5508                    "AsyncDataSrc 3 created DataConn",
5509                    "SyncDataSrc 4 created DataConn",
5510                    "AsyncDataConn 1 pre committed",
5511                    "SyncDataConn 2 failed to pre commit",
5512                    "SyncDataConn 4 closed",
5513                    "SyncDataConn 4 dropped",
5514                    "AsyncDataConn 3 closed",
5515                    "AsyncDataConn 3 dropped",
5516                    "SyncDataConn 2 closed",
5517                    "SyncDataConn 2 dropped",
5518                    "AsyncDataConn 1 closed",
5519                    "AsyncDataConn 1 dropped",
5520                    "SyncDataSrc 4 closed",
5521                    "SyncDataSrc 4 dropped",
5522                    "AsyncDataSrc 3 closed",
5523                    "AsyncDataSrc 3 dropped",
5524                    "SyncDataSrc 2 closed",
5525                    "SyncDataSrc 2 dropped",
5526                    "AsyncDataSrc 1 closed",
5527                    "AsyncDataSrc 1 dropped",
5528                ],
5529            );
5530        }
5531
5532        #[tokio::test]
5533        async fn async_test_pre_commit_but_fail_global_async() {
5534            let _unused = TEST_SEQ.lock().unwrap();
5535            clear_global_data_srcs_fixed();
5536
5537            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5538
5539            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::PreCommit));
5540            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5541
5542            if let Ok(_auto_shutdown) = setup_async().await {
5543                let mut hub = DataHub::new();
5544                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5545                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5546
5547                if let Ok(_) = hub.begin_async().await {
5548                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5549                        assert_eq!(
5550                            any::type_name_of_val(conn1),
5551                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5552                        );
5553                    } else {
5554                        panic!();
5555                    }
5556                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5557                        assert_eq!(
5558                            any::type_name_of_val(conn2),
5559                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5560                        );
5561                    } else {
5562                        panic!();
5563                    }
5564                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5565                        assert_eq!(
5566                            any::type_name_of_val(conn3),
5567                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5568                        );
5569                    } else {
5570                        panic!();
5571                    }
5572                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5573                        assert_eq!(
5574                            any::type_name_of_val(conn4),
5575                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5576                        );
5577                    } else {
5578                        panic!();
5579                    }
5580
5581                    match hub.commit_async().await {
5582                        Ok(_) => panic!(),
5583                        Err(err) => match err.reason::<DataHubError>() {
5584                            Ok(r) => match r {
5585                                DataHubError::FailToPreCommitDataConn { errors } => {
5586                                    assert_eq!(errors.len(), 1);
5587                                    if let Some(e) = errors.get("foo") {
5588                                        if let Ok(s) = e.reason::<String>() {
5589                                            assert_eq!(s, "vvv");
5590                                        } else {
5591                                            panic!();
5592                                        }
5593                                    } else {
5594                                        panic!();
5595                                    }
5596                                }
5597                                _ => panic!(),
5598                            },
5599                            Err(_) => panic!(),
5600                        },
5601                    }
5602
5603                    hub.end();
5604                } else {
5605                    panic!();
5606                }
5607            } else {
5608                panic!();
5609            }
5610
5611            assert_eq!(
5612                *logger.lock().unwrap(),
5613                vec![
5614                    "SyncDataSrc 2 setupped",
5615                    "AsyncDataSrc 1 setupped",
5616                    "SyncDataSrc 4 setupped",
5617                    "AsyncDataSrc 3 setupped",
5618                    "AsyncDataSrc 1 created DataConn",
5619                    "SyncDataSrc 2 created DataConn",
5620                    "AsyncDataSrc 3 created DataConn",
5621                    "SyncDataSrc 4 created DataConn",
5622                    "AsyncDataConn 1 failed to pre commit",
5623                    "SyncDataConn 4 closed",
5624                    "SyncDataConn 4 dropped",
5625                    "AsyncDataConn 3 closed",
5626                    "AsyncDataConn 3 dropped",
5627                    "SyncDataConn 2 closed",
5628                    "SyncDataConn 2 dropped",
5629                    "AsyncDataConn 1 closed",
5630                    "AsyncDataConn 1 dropped",
5631                    "SyncDataSrc 4 closed",
5632                    "SyncDataSrc 4 dropped",
5633                    "AsyncDataSrc 3 closed",
5634                    "AsyncDataSrc 3 dropped",
5635                    "SyncDataSrc 2 closed",
5636                    "SyncDataSrc 2 dropped",
5637                    "AsyncDataSrc 1 closed",
5638                    "AsyncDataSrc 1 dropped",
5639                ],
5640            );
5641        }
5642
5643        #[tokio::test]
5644        async fn async_test_pre_commit_but_fail_local_sync() {
5645            let _unused = TEST_SEQ.lock().unwrap();
5646            clear_global_data_srcs_fixed();
5647
5648            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5649
5650            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5651            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5652
5653            if let Ok(_auto_shutdown) = setup_async().await {
5654                let mut hub = DataHub::new();
5655                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5656                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::PreCommit));
5657
5658                if let Ok(_) = hub.begin_async().await {
5659                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5660                        assert_eq!(
5661                            any::type_name_of_val(conn1),
5662                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5663                        );
5664                    } else {
5665                        panic!();
5666                    }
5667                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5668                        assert_eq!(
5669                            any::type_name_of_val(conn2),
5670                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5671                        );
5672                    } else {
5673                        panic!();
5674                    }
5675                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5676                        assert_eq!(
5677                            any::type_name_of_val(conn3),
5678                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5679                        );
5680                    } else {
5681                        panic!();
5682                    }
5683                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5684                        assert_eq!(
5685                            any::type_name_of_val(conn4),
5686                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5687                        );
5688                    } else {
5689                        panic!();
5690                    }
5691
5692                    match hub.commit_async().await {
5693                        Ok(_) => panic!(),
5694                        Err(err) => match err.reason::<DataHubError>() {
5695                            Ok(r) => match r {
5696                                DataHubError::FailToPreCommitDataConn { errors } => {
5697                                    assert_eq!(errors.len(), 1);
5698                                    if let Some(e) = errors.get("qux") {
5699                                        if let Ok(s) = e.reason::<String>() {
5700                                            assert_eq!(s, "zzz");
5701                                        } else {
5702                                            panic!();
5703                                        }
5704                                    } else {
5705                                        panic!();
5706                                    }
5707                                }
5708                                _ => panic!(),
5709                            },
5710                            Err(_) => panic!(),
5711                        },
5712                    }
5713
5714                    hub.end();
5715                } else {
5716                    panic!();
5717                }
5718            } else {
5719                panic!();
5720            }
5721
5722            assert_eq!(
5723                *logger.lock().unwrap(),
5724                vec![
5725                    "SyncDataSrc 2 setupped",
5726                    "AsyncDataSrc 1 setupped",
5727                    "SyncDataSrc 4 setupped",
5728                    "AsyncDataSrc 3 setupped",
5729                    "AsyncDataSrc 1 created DataConn",
5730                    "SyncDataSrc 2 created DataConn",
5731                    "AsyncDataSrc 3 created DataConn",
5732                    "SyncDataSrc 4 created DataConn",
5733                    "AsyncDataConn 1 pre committed",
5734                    "SyncDataConn 2 pre committed",
5735                    "AsyncDataConn 3 pre committed",
5736                    "SyncDataConn 4 failed to pre commit",
5737                    "SyncDataConn 4 closed",
5738                    "SyncDataConn 4 dropped",
5739                    "AsyncDataConn 3 closed",
5740                    "AsyncDataConn 3 dropped",
5741                    "SyncDataConn 2 closed",
5742                    "SyncDataConn 2 dropped",
5743                    "AsyncDataConn 1 closed",
5744                    "AsyncDataConn 1 dropped",
5745                    "SyncDataSrc 4 closed",
5746                    "SyncDataSrc 4 dropped",
5747                    "AsyncDataSrc 3 closed",
5748                    "AsyncDataSrc 3 dropped",
5749                    "SyncDataSrc 2 closed",
5750                    "SyncDataSrc 2 dropped",
5751                    "AsyncDataSrc 1 closed",
5752                    "AsyncDataSrc 1 dropped",
5753                ],
5754            );
5755        }
5756
5757        #[tokio::test]
5758        async fn async_test_pre_commit_but_fail_local_async() {
5759            let _unused = TEST_SEQ.lock().unwrap();
5760            clear_global_data_srcs_fixed();
5761
5762            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5763
5764            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5765            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5766
5767            if let Ok(_auto_shutdown) = setup_async().await {
5768                let mut hub = DataHub::new();
5769                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::PreCommit));
5770                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5771
5772                if let Ok(_) = hub.begin_async().await {
5773                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5774                        assert_eq!(
5775                            any::type_name_of_val(conn1),
5776                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5777                        );
5778                    } else {
5779                        panic!();
5780                    }
5781                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5782                        assert_eq!(
5783                            any::type_name_of_val(conn2),
5784                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5785                        );
5786                    } else {
5787                        panic!();
5788                    }
5789                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5790                        assert_eq!(
5791                            any::type_name_of_val(conn3),
5792                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5793                        );
5794                    } else {
5795                        panic!();
5796                    }
5797                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5798                        assert_eq!(
5799                            any::type_name_of_val(conn4),
5800                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5801                        );
5802                    } else {
5803                        panic!();
5804                    }
5805
5806                    match hub.commit_async().await {
5807                        Ok(_) => panic!(),
5808                        Err(err) => match err.reason::<DataHubError>() {
5809                            Ok(r) => match r {
5810                                DataHubError::FailToPreCommitDataConn { errors } => {
5811                                    assert_eq!(errors.len(), 1);
5812                                    if let Some(e) = errors.get("baz") {
5813                                        if let Ok(s) = e.reason::<String>() {
5814                                            assert_eq!(s, "vvv");
5815                                        } else {
5816                                            panic!();
5817                                        }
5818                                    } else {
5819                                        panic!();
5820                                    }
5821                                }
5822                                _ => panic!(),
5823                            },
5824                            Err(_) => panic!(),
5825                        },
5826                    }
5827
5828                    hub.end();
5829                } else {
5830                    panic!();
5831                }
5832            } else {
5833                panic!();
5834            }
5835
5836            assert_eq!(
5837                *logger.lock().unwrap(),
5838                vec![
5839                    "SyncDataSrc 2 setupped",
5840                    "AsyncDataSrc 1 setupped",
5841                    "SyncDataSrc 4 setupped",
5842                    "AsyncDataSrc 3 setupped",
5843                    "AsyncDataSrc 1 created DataConn",
5844                    "SyncDataSrc 2 created DataConn",
5845                    "AsyncDataSrc 3 created DataConn",
5846                    "SyncDataSrc 4 created DataConn",
5847                    "AsyncDataConn 1 pre committed",
5848                    "SyncDataConn 2 pre committed",
5849                    "AsyncDataConn 3 failed to pre commit",
5850                    "SyncDataConn 4 closed",
5851                    "SyncDataConn 4 dropped",
5852                    "AsyncDataConn 3 closed",
5853                    "AsyncDataConn 3 dropped",
5854                    "SyncDataConn 2 closed",
5855                    "SyncDataConn 2 dropped",
5856                    "AsyncDataConn 1 closed",
5857                    "AsyncDataConn 1 dropped",
5858                    "SyncDataSrc 4 closed",
5859                    "SyncDataSrc 4 dropped",
5860                    "AsyncDataSrc 3 closed",
5861                    "AsyncDataSrc 3 dropped",
5862                    "SyncDataSrc 2 closed",
5863                    "SyncDataSrc 2 dropped",
5864                    "AsyncDataSrc 1 closed",
5865                    "AsyncDataSrc 1 dropped",
5866                ],
5867            );
5868        }
5869
5870        #[tokio::test]
5871        async fn async_test_rollback() {
5872            let _unused = TEST_SEQ.lock().unwrap();
5873            clear_global_data_srcs_fixed();
5874
5875            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5876
5877            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5878            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5879
5880            if let Ok(_auto_shutdown) = setup_async().await {
5881                let mut hub = DataHub::new();
5882                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5883                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5884
5885                if let Ok(_) = hub.begin_async().await {
5886                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5887                        assert_eq!(
5888                            any::type_name_of_val(conn1),
5889                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5890                        );
5891                    } else {
5892                        panic!();
5893                    }
5894                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5895                        assert_eq!(
5896                            any::type_name_of_val(conn2),
5897                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5898                        );
5899                    } else {
5900                        panic!();
5901                    }
5902                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5903                        assert_eq!(
5904                            any::type_name_of_val(conn3),
5905                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5906                        );
5907                    } else {
5908                        panic!();
5909                    }
5910                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5911                        assert_eq!(
5912                            any::type_name_of_val(conn4),
5913                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5914                        );
5915                    } else {
5916                        panic!();
5917                    }
5918
5919                    hub.rollback_async().await;
5920                    hub.end();
5921                } else {
5922                    panic!();
5923                }
5924            } else {
5925                panic!();
5926            }
5927
5928            assert_eq!(
5929                *logger.lock().unwrap(),
5930                vec![
5931                    "SyncDataSrc 2 setupped",
5932                    "AsyncDataSrc 1 setupped",
5933                    "SyncDataSrc 4 setupped",
5934                    "AsyncDataSrc 3 setupped",
5935                    "AsyncDataSrc 1 created DataConn",
5936                    "SyncDataSrc 2 created DataConn",
5937                    "AsyncDataSrc 3 created DataConn",
5938                    "SyncDataSrc 4 created DataConn",
5939                    "AsyncDataConn 1 rollbacked",
5940                    "SyncDataConn 2 rollbacked",
5941                    "AsyncDataConn 3 rollbacked",
5942                    "SyncDataConn 4 rollbacked",
5943                    "SyncDataConn 4 closed",
5944                    "SyncDataConn 4 dropped",
5945                    "AsyncDataConn 3 closed",
5946                    "AsyncDataConn 3 dropped",
5947                    "SyncDataConn 2 closed",
5948                    "SyncDataConn 2 dropped",
5949                    "AsyncDataConn 1 closed",
5950                    "AsyncDataConn 1 dropped",
5951                    "SyncDataSrc 4 closed",
5952                    "SyncDataSrc 4 dropped",
5953                    "AsyncDataSrc 3 closed",
5954                    "AsyncDataSrc 3 dropped",
5955                    "SyncDataSrc 2 closed",
5956                    "SyncDataSrc 2 dropped",
5957                    "AsyncDataSrc 1 closed",
5958                    "AsyncDataSrc 1 dropped",
5959                ],
5960            );
5961        }
5962
5963        #[tokio::test]
5964        async fn async_test_force_back() {
5965            let _unused = TEST_SEQ.lock().unwrap();
5966            clear_global_data_srcs_fixed();
5967
5968            let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5969
5970            uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5971            uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5972
5973            if let Ok(_auto_shutdown) = setup_async().await {
5974                let mut hub = DataHub::new();
5975                hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5976                hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5977
5978                if let Ok(_) = hub.begin_async().await {
5979                    if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5980                        assert_eq!(
5981                            any::type_name_of_val(conn1),
5982                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5983                        );
5984                    } else {
5985                        panic!();
5986                    }
5987                    if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5988                        assert_eq!(
5989                            any::type_name_of_val(conn2),
5990                            "sabi::data_hub::tests_data_hub::SyncDataConn"
5991                        );
5992                    } else {
5993                        panic!();
5994                    }
5995                    if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5996                        assert_eq!(
5997                            any::type_name_of_val(conn3),
5998                            "sabi::data_hub::tests_data_hub::AsyncDataConn"
5999                        );
6000                    } else {
6001                        panic!();
6002                    }
6003                    if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
6004                        assert_eq!(
6005                            any::type_name_of_val(conn4),
6006                            "sabi::data_hub::tests_data_hub::SyncDataConn"
6007                        );
6008                    } else {
6009                        panic!();
6010                    }
6011
6012                    assert!(hub.commit_async().await.is_ok());
6013                    hub.rollback_async().await;
6014                    hub.end();
6015                } else {
6016                    panic!();
6017                }
6018            } else {
6019                panic!();
6020            }
6021
6022            assert_eq!(
6023                *logger.lock().unwrap(),
6024                vec![
6025                    "SyncDataSrc 2 setupped",
6026                    "AsyncDataSrc 1 setupped",
6027                    "SyncDataSrc 4 setupped",
6028                    "AsyncDataSrc 3 setupped",
6029                    "AsyncDataSrc 1 created DataConn",
6030                    "SyncDataSrc 2 created DataConn",
6031                    "AsyncDataSrc 3 created DataConn",
6032                    "SyncDataSrc 4 created DataConn",
6033                    "AsyncDataConn 1 pre committed",
6034                    "SyncDataConn 2 pre committed",
6035                    "AsyncDataConn 3 pre committed",
6036                    "SyncDataConn 4 pre committed",
6037                    "AsyncDataConn 1 committed",
6038                    "SyncDataConn 2 committed",
6039                    "AsyncDataConn 3 committed",
6040                    "SyncDataConn 4 committed",
6041                    "AsyncDataConn 1 post committed",
6042                    "SyncDataConn 2 post committed",
6043                    "AsyncDataConn 3 post committed",
6044                    "SyncDataConn 4 post committed",
6045                    "AsyncDataConn 1 forced back",
6046                    "SyncDataConn 2 forced back",
6047                    "AsyncDataConn 3 forced back",
6048                    "SyncDataConn 4 forced back",
6049                    "SyncDataConn 4 closed",
6050                    "SyncDataConn 4 dropped",
6051                    "AsyncDataConn 3 closed",
6052                    "AsyncDataConn 3 dropped",
6053                    "SyncDataConn 2 closed",
6054                    "SyncDataConn 2 dropped",
6055                    "AsyncDataConn 1 closed",
6056                    "AsyncDataConn 1 dropped",
6057                    "SyncDataSrc 4 closed",
6058                    "SyncDataSrc 4 dropped",
6059                    "AsyncDataSrc 3 closed",
6060                    "AsyncDataSrc 3 dropped",
6061                    "SyncDataSrc 2 closed",
6062                    "SyncDataSrc 2 dropped",
6063                    "AsyncDataSrc 1 closed",
6064                    "AsyncDataSrc 1 dropped",
6065                ],
6066            );
6067        }
6068    }
6069}