Skip to main content

sabi/tokio/
data_hub.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7use crate::SendSyncNonNull;
8
9use std::collections::HashMap;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::{any, ptr};
14
15/// Represents errors that can occur within the `DataHub`.
16#[derive(Debug)]
17pub enum DataHubError {
18    /// An error indicating that one or more local data sources failed during their setup processes.
19    FailToSetupLocalDataSrcs {
20        /// A vector of errors, each containing the name of the data source and the error itself.
21        errors: Vec<(Arc<str>, errs::Err)>,
22    },
23
24    /// An error indicating that no suitable data source was found to create a data connection
25    /// with the specified name and type.
26    NoDataSrcToCreateDataConn {
27        /// The name of the data connection that could not be created.
28        name: Arc<str>,
29        /// The string representation of the data connection type that was requested.
30        data_conn_type: &'static str,
31    },
32}
33
34impl DataHub {
35    /// Creates a new `DataHub` instance.
36    ///
37    /// This initializes the `DataHub` with no local data sources and an empty data connection manager.
38    /// Global data sources, if any, are copied into the `data_src_map`.
39    #[allow(clippy::new_without_default)]
40    pub fn new() -> Self {
41        let mut data_src_map = HashMap::new();
42        copy_global_data_srcs_to_map(&mut data_src_map);
43
44        Self {
45            local_data_src_manager: DataSrcManager::new(true),
46            data_src_map,
47            data_conn_manager: DataConnManager::new(),
48            fixed: false,
49        }
50    }
51
52    /// Creates a new `DataHub` instance with a specified commit order for data connections.
53    ///
54    /// This allows defining the order in which data connections will be committed. Connections
55    /// not specified in `names` will be committed after the specified ones, in an undefined order.
56    /// Global data sources are copied into the `data_src_map`.
57    ///
58    /// # Arguments
59    ///
60    /// * `names` - An array of string slices specifying the desired commit order by data connection name.
61    pub fn with_commit_order(names: &[&str]) -> Self {
62        let mut data_src_map = HashMap::new();
63        copy_global_data_srcs_to_map(&mut data_src_map);
64
65        Self {
66            local_data_src_manager: DataSrcManager::new(true),
67            data_src_map,
68            data_conn_manager: DataConnManager::with_commit_order(names),
69            fixed: false,
70        }
71    }
72
73    /// Registers a local data source with the `DataHub`.
74    ///
75    /// This method allows adding a custom data source which can provide data connections.
76    /// Data sources can only be added before `run_async` or `txn_async` are called.
77    ///
78    /// # Arguments
79    ///
80    /// * `name` - The name to associate with this data source.
81    /// * `ds` - The data source instance, which must implement `DataSrc` and have a `'static` lifetime.
82    ///   If this `DataHub` is moved between threads, `ds` must also implement `Send`.
83    ///
84    /// # Type Parameters
85    ///
86    /// * `S` - The type of the data source.
87    /// * `C` - The type of the data connection provided by the data source.
88    pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
89    where
90        S: DataSrc<C> + 'static,
91        C: DataConn + 'static,
92    {
93        if self.fixed {
94            return;
95        }
96        self.local_data_src_manager.add(name, ds);
97    }
98
99    /// Deregisters a local data source from the `DataHub`.
100    ///
101    /// This removes a data source previously added with `uses`. Data sources can only be
102    /// removed before `run_async` or `txn_async` are called.
103    ///
104    /// # Arguments
105    ///
106    /// * `name` - The name of the data source to remove.
107    pub fn disuses(&mut self, name: impl AsRef<str>) {
108        if self.fixed {
109            return;
110        }
111        self.data_src_map.remove(name.as_ref());
112        self.local_data_src_manager.remove(name);
113    }
114
115    #[inline]
116    async fn begin_async(&mut self) -> errs::Result<()> {
117        self.fixed = true;
118
119        let mut errors = Vec::new();
120
121        self.local_data_src_manager.setup_async(&mut errors).await;
122        if errors.is_empty() {
123            self.local_data_src_manager
124                .copy_ds_ready_to_map(&mut self.data_src_map);
125            Ok(())
126        } else {
127            Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
128                errors,
129            }))
130        }
131    }
132
133    #[inline]
134    async fn commit_async(&mut self) -> errs::Result<()> {
135        self.data_conn_manager.commit_async().await
136    }
137
138    #[inline]
139    async fn rollback_async(&mut self) {
140        self.data_conn_manager.rollback_async().await
141    }
142
143    #[inline]
144    fn end(&mut self) {
145        self.data_conn_manager.close();
146        self.fixed = false;
147    }
148
149    /// Executes an asynchronous logic function with the `DataHub` and handles setup and cleanup.
150    ///
151    /// This method sets up local data sources, runs the provided `logic_fn`, and then
152    /// cleans up all data connections and sources. It does *not* automatically commit
153    /// or rollback any transactions.
154    ///
155    /// # Arguments
156    ///
157    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
158    ///                and returns a `Result`. This function contains the application's logic.
159    ///                The returned `Future` must implement `Send`.
160    ///
161    /// # Type Parameters
162    ///
163    /// * `F` - The type of the asynchronous logic function.
164    ///
165    /// # Returns
166    ///
167    /// A `Result` indicating the success or failure of the `logic_fn` execution or
168    /// the setup of data sources.
169    #[allow(clippy::doc_overindented_list_items)]
170    pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
171    where
172        for<'a> F:
173            FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'a>>,
174    {
175        let mut r = self.begin_async().await;
176        if r.is_ok() {
177            r = logic_fn(self).await;
178        }
179        self.end();
180        r
181    }
182
183    /// Executes an asynchronous transactional logic function with the `DataHub`, managing
184    /// setup, commit, and rollback for data connections.
185    ///
186    /// This method sets up local data sources, runs the provided `logic_fn`. If the logic
187    /// function succeeds, it attempts to commit all created data connections. If either
188    /// the logic function or the commit fails, it rolls back all data connections.
189    /// Finally, it cleans up all data connections and sources.
190    ///
191    /// # Arguments
192    ///
193    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
194    ///                and returns a `Result`. This function contains the application's transactional logic.
195    ///                The returned `Future` must implement `Send`.
196    ///
197    /// # Type Parameters
198    ///
199    /// * `F` - The type of the asynchronous transactional logic function.
200    ///
201    /// # Returns
202    ///
203    /// A `Result` indicating the success or failure of the `logic_fn` execution,
204    /// the setup of data sources, or the commit/rollback operations.
205    #[allow(clippy::doc_overindented_list_items)]
206    pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
207    where
208        for<'a> F:
209            FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'a>>,
210    {
211        let mut r = self.begin_async().await;
212        if r.is_ok() {
213            r = logic_fn(self).await;
214        }
215        if r.is_ok() {
216            r = self.commit_async().await;
217        }
218        if r.is_err() {
219            self.rollback_async().await;
220        }
221        self.end();
222        r
223    }
224
225    /// Retrieves an existing data connection or creates a new one if it doesn't exist.
226    ///
227    /// This asynchronous method first checks if a data connection with the given `name`
228    /// and type `C` already exists. If not, it attempts to find a suitable data source
229    /// (local or global) to create a new data connection.
230    ///
231    /// # Arguments
232    ///
233    /// * `name` - The name of the data connection to retrieve or create.
234    ///
235    /// # Type Parameters
236    ///
237    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
238    ///
239    /// # Returns
240    ///
241    /// A `Result` which is `Ok` containing a mutable reference to the data connection
242    /// if found or successfully created, or an `Err` if no suitable data source is found
243    /// or connection creation fails.
244    pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
245    where
246        C: DataConn + 'static,
247    {
248        if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
249            let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
250            return Ok(unsafe { &mut (*typed_nnptr).data_conn });
251        }
252
253        if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
254            let boxed = if *local {
255                self.local_data_src_manager
256                    .create_data_conn_async::<C>(*index, name.as_ref())
257                    .await?
258            } else {
259                create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
260            };
261
262            let ptr = Box::into_raw(boxed);
263            if let Some(nnptr) = ptr::NonNull::new(ptr) {
264                let ssnnptr = SendSyncNonNull::new(nnptr);
265                self.data_conn_manager.add(ssnnptr);
266
267                let typed_ptr = ptr.cast::<DataConnContainer<C>>();
268                return Ok(unsafe { &mut (*typed_ptr).data_conn });
269            } else {
270                // impossible case.
271            }
272        }
273
274        Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
275            name: name.as_ref().into(),
276            data_conn_type: any::type_name::<C>(),
277        }))
278    }
279}
280
// Wraps an async logic function into a closure that returns a boxed, pinned, `Send` future
// yielding `errs::Result<()>`.
//
// Standard-library items are referenced through absolute `::std` paths so the expansion does
// not depend on what the call site has in scope under the names `std` or `Box`. `errs` is
// still resolved at the call site, exactly as before.
#[macro_export]
#[doc(hidden)]
macro_rules! _logic {
    ($f:expr) => {
        |data| {
            // The explicit annotation coerces the concrete future into a trait object.
            let fut: ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = errs::Result<()>> + Send,
                >,
            > = ::std::boxed::Box::pin(async move { $f(data).await });
            fut
        }
    };
}
292
293#[cfg(test)]
294mod tests_of_data_hub {
295    use super::*;
296    use crate::tokio::AsyncGroup;
297    use std::sync::Mutex;
298
299    #[derive(Clone, Copy, PartialEq)]
300    enum Failure {
301        None,
302        FailToPreCommit,
303        FailToCommit,
304        FailToSetup,
305        FailToCreateDataConn,
306    }
307
308    struct MyDataConn {
309        id: i8,
310        failure: Failure,
311        committed: bool,
312        logger: Arc<Mutex<Vec<String>>>,
313    }
314    impl MyDataConn {
315        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
316            logger
317                .lock()
318                .unwrap()
319                .push(format!("MyDataConn::new {}", id));
320            Self {
321                id,
322                failure,
323                committed: false,
324                logger,
325            }
326        }
327    }
328    impl Drop for MyDataConn {
329        fn drop(&mut self) {
330            self.logger
331                .lock()
332                .unwrap()
333                .push(format!("MyDataConn::drop {}", self.id));
334        }
335    }
336    impl DataConn for MyDataConn {
337        async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
338            if self.failure == Failure::FailToPreCommit {
339                self.logger
340                    .lock()
341                    .unwrap()
342                    .push(format!("MyDataConn::pre_commit {} failed", self.id));
343                Err(errs::Err::new("pre commit error"))
344            } else {
345                self.logger
346                    .lock()
347                    .unwrap()
348                    .push(format!("MyDataConn::pre_commit {}", self.id));
349                Ok(())
350            }
351        }
352        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
353            if self.failure == Failure::FailToCommit {
354                self.logger
355                    .lock()
356                    .unwrap()
357                    .push(format!("MyDataConn::commit {} failed", self.id));
358                Err(errs::Err::new("commit error"))
359            } else {
360                self.logger
361                    .lock()
362                    .unwrap()
363                    .push(format!("MyDataConn::commit {}", self.id));
364                self.committed = true;
365                Ok(())
366            }
367        }
368        async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
369            self.logger
370                .lock()
371                .unwrap()
372                .push(format!("MyDataConn::post_commit {}", self.id));
373        }
374        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
375            self.logger
376                .lock()
377                .unwrap()
378                .push(format!("MyDataConn::rollback {}", self.id));
379        }
380        async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
381            self.logger
382                .lock()
383                .unwrap()
384                .push(format!("MyDataConn::force_back {}", self.id));
385        }
386        fn close(&mut self) {
387            self.logger
388                .lock()
389                .unwrap()
390                .push(format!("MyDataConn::close {}", self.id));
391        }
392    }
393
394    struct MyDataSrc {
395        id: i8,
396        failure: Failure,
397        logger: Arc<Mutex<Vec<String>>>,
398    }
399    impl MyDataSrc {
400        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
401            logger
402                .lock()
403                .unwrap()
404                .push(format!("MyDataSrc::new {}", id));
405            Self {
406                id,
407                failure,
408                logger,
409            }
410        }
411    }
412    impl Drop for MyDataSrc {
413        fn drop(&mut self) {
414            self.logger
415                .lock()
416                .unwrap()
417                .push(format!("MyDataSrc::drop {}", self.id));
418        }
419    }
420    impl DataSrc<MyDataConn> for MyDataSrc {
421        async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
422            if self.failure == Failure::FailToSetup {
423                self.logger
424                    .lock()
425                    .unwrap()
426                    .push(format!("MyDataSrc::setup {} failed", self.id));
427                Err(errs::Err::new("setup error".to_string()))
428            } else {
429                self.logger
430                    .lock()
431                    .unwrap()
432                    .push(format!("MyDataSrc::setup {}", self.id));
433                Ok(())
434            }
435        }
436        fn close(&mut self) {
437            self.logger
438                .lock()
439                .unwrap()
440                .push(format!("MyDataSrc::close {}", self.id));
441        }
442        async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
443            if self.failure == Failure::FailToCreateDataConn {
444                self.logger
445                    .lock()
446                    .unwrap()
447                    .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
448                return Err(errs::Err::new("eeee".to_string()));
449            }
450            {
451                self.logger
452                    .lock()
453                    .unwrap()
454                    .push(format!("MyDataSrc::create_data_conn {}", self.id));
455            }
456            let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
457            Ok(Box::new(conn))
458        }
459    }
460
461    #[test]
462    fn test_new() {
463        let hub = DataHub::new();
464        assert!(hub.local_data_src_manager.vec_unready.is_empty());
465        assert!(hub.local_data_src_manager.vec_ready.is_empty());
466        assert!(hub.local_data_src_manager.local);
467        assert!(hub.data_src_map.is_empty());
468        assert!(hub.data_conn_manager.vec.is_empty());
469        assert!(hub.data_conn_manager.index_map.is_empty());
470        assert!(!hub.fixed);
471    }
472
473    #[test]
474    fn test_with_commit_order() {
475        let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
476        assert!(hub.local_data_src_manager.vec_unready.is_empty());
477        assert!(hub.local_data_src_manager.vec_ready.is_empty());
478        assert!(hub.local_data_src_manager.local);
479        assert!(hub.data_src_map.is_empty());
480        assert_eq!(hub.data_conn_manager.vec.len(), 3);
481        assert_eq!(hub.data_conn_manager.index_map.len(), 3);
482        assert!(!hub.fixed);
483    }
484
485    #[tokio::test]
486    async fn test_uses_and_ok() {
487        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
488
489        let mut hub = DataHub::new();
490        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
491        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
492
493        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
494        assert!(hub.local_data_src_manager.vec_ready.is_empty());
495        assert!(hub.local_data_src_manager.local);
496        assert!(hub.data_src_map.is_empty());
497        assert_eq!(hub.data_conn_manager.vec.len(), 0);
498        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
499        assert!(!hub.fixed);
500
501        assert!(hub.begin_async().await.is_ok());
502
503        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
504        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
505        assert!(hub.local_data_src_manager.local);
506        assert_eq!(hub.data_src_map.len(), 2);
507        assert_eq!(hub.data_conn_manager.vec.len(), 0);
508        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
509        assert!(hub.fixed);
510    }
511
512    #[tokio::test]
513    async fn test_uses_but_already_fixed() {
514        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
515
516        let mut hub = DataHub::new();
517        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
518
519        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
520        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
521        assert!(hub.local_data_src_manager.local);
522        assert_eq!(hub.data_src_map.len(), 0);
523        assert_eq!(hub.data_conn_manager.vec.len(), 0);
524        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
525        assert!(!hub.fixed);
526
527        assert!(hub.begin_async().await.is_ok());
528
529        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
530        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
531        assert!(hub.local_data_src_manager.local);
532        assert_eq!(hub.data_src_map.len(), 1);
533        assert_eq!(hub.data_conn_manager.vec.len(), 0);
534        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
535        assert!(hub.fixed);
536
537        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
538
539        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
540        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
541        assert!(hub.local_data_src_manager.local);
542        assert_eq!(hub.data_src_map.len(), 1);
543        assert_eq!(hub.data_conn_manager.vec.len(), 0);
544        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
545        assert!(hub.fixed);
546    }
547
548    #[test]
549    fn test_disuses_and_ok() {
550        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
551
552        let mut hub = DataHub::new();
553        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
554        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
555
556        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
557        assert!(hub.local_data_src_manager.vec_ready.is_empty());
558        assert!(hub.local_data_src_manager.local);
559        assert!(hub.data_src_map.is_empty());
560        assert_eq!(hub.data_conn_manager.vec.len(), 0);
561        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
562        assert!(!hub.fixed);
563
564        hub.disuses("foo");
565
566        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
567        assert!(hub.local_data_src_manager.vec_ready.is_empty());
568        assert!(hub.local_data_src_manager.local);
569        assert!(hub.data_src_map.is_empty());
570        assert_eq!(hub.data_conn_manager.vec.len(), 0);
571        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
572        assert!(!hub.fixed);
573
574        hub.disuses("bar");
575
576        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
577        assert!(hub.local_data_src_manager.vec_ready.is_empty());
578        assert!(hub.local_data_src_manager.local);
579        assert!(hub.data_src_map.is_empty());
580        assert_eq!(hub.data_conn_manager.vec.len(), 0);
581        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
582        assert!(!hub.fixed);
583    }
584
585    #[tokio::test]
586    async fn test_disuses_and_fix() {
587        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
588
589        let mut hub = DataHub::new();
590        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
591        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
592
593        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
594        assert!(hub.local_data_src_manager.vec_ready.is_empty());
595        assert!(hub.local_data_src_manager.local);
596        assert!(hub.data_src_map.is_empty());
597        assert_eq!(hub.data_conn_manager.vec.len(), 0);
598        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
599        assert!(!hub.fixed);
600
601        hub.disuses("foo");
602
603        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
604        assert!(hub.local_data_src_manager.vec_ready.is_empty());
605        assert!(hub.local_data_src_manager.local);
606        assert!(hub.data_src_map.is_empty());
607        assert_eq!(hub.data_conn_manager.vec.len(), 0);
608        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
609        assert!(!hub.fixed);
610
611        hub.disuses("bar");
612
613        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
614        assert!(hub.local_data_src_manager.vec_ready.is_empty());
615        assert!(hub.local_data_src_manager.local);
616        assert!(hub.data_src_map.is_empty());
617        assert_eq!(hub.data_conn_manager.vec.len(), 0);
618        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
619        assert!(!hub.fixed);
620
621        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
622        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
623
624        assert!(hub.begin_async().await.is_ok());
625
626        assert!(hub.local_data_src_manager.vec_unready.is_empty());
627        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
628        assert!(hub.local_data_src_manager.local);
629        assert_eq!(hub.data_src_map.len(), 2);
630        assert_eq!(hub.data_conn_manager.vec.len(), 0);
631        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
632        assert!(hub.fixed);
633
634        hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
635
636        assert!(hub.local_data_src_manager.vec_unready.is_empty());
637        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
638        assert!(hub.local_data_src_manager.local);
639        assert_eq!(hub.data_src_map.len(), 2);
640        assert_eq!(hub.data_conn_manager.vec.len(), 0);
641        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
642        assert!(hub.fixed);
643
644        hub.disuses("bar");
645
646        assert!(hub.local_data_src_manager.vec_unready.is_empty());
647        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
648        assert!(hub.local_data_src_manager.local);
649        assert_eq!(hub.data_src_map.len(), 2);
650        assert_eq!(hub.data_conn_manager.vec.len(), 0);
651        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
652        assert!(hub.fixed);
653
654        hub.end();
655
656        assert!(hub.local_data_src_manager.vec_unready.is_empty());
657        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
658        assert!(hub.local_data_src_manager.local);
659        assert_eq!(hub.data_src_map.len(), 2);
660        assert_eq!(hub.data_conn_manager.vec.len(), 0);
661        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
662        assert!(!hub.fixed);
663
664        hub.disuses("bar");
665
666        assert!(hub.local_data_src_manager.vec_unready.is_empty());
667        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
668        assert!(hub.local_data_src_manager.local);
669        assert_eq!(hub.data_src_map.len(), 1);
670        assert_eq!(hub.data_conn_manager.vec.len(), 0);
671        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
672        assert!(!hub.fixed);
673
674        hub.disuses("foo");
675
676        assert!(hub.local_data_src_manager.vec_unready.is_empty());
677        assert!(hub.local_data_src_manager.vec_ready.is_empty());
678        assert!(hub.local_data_src_manager.local);
679        assert_eq!(hub.data_src_map.len(), 0);
680        assert_eq!(hub.data_conn_manager.vec.len(), 0);
681        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
682        assert!(!hub.fixed);
683    }
684
685    #[tokio::test]
686    async fn test_begin_if_empty() {
687        let mut hub = DataHub::new();
688        assert!(hub.begin_async().await.is_ok());
689
690        assert!(hub.local_data_src_manager.vec_unready.is_empty());
691        assert!(hub.local_data_src_manager.vec_ready.is_empty());
692        assert!(hub.local_data_src_manager.local);
693        assert_eq!(hub.data_src_map.len(), 0);
694        assert_eq!(hub.data_conn_manager.vec.len(), 0);
695        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
696        assert!(hub.fixed);
697
698        hub.end();
699
700        assert!(hub.local_data_src_manager.vec_unready.is_empty());
701        assert!(hub.local_data_src_manager.vec_ready.is_empty());
702        assert!(hub.local_data_src_manager.local);
703        assert_eq!(hub.data_src_map.len(), 0);
704        assert_eq!(hub.data_conn_manager.vec.len(), 0);
705        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
706        assert!(!hub.fixed);
707    }
708
709    #[tokio::test]
710    async fn test_begin_and_ok() {
711        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
712
713        {
714            let mut hub = DataHub::new();
715
716            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
717            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
718
719            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
720            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
721            assert_eq!(hub.local_data_src_manager.local, true);
722            assert_eq!(hub.data_src_map.len(), 0);
723            assert_eq!(hub.data_conn_manager.vec.len(), 0);
724            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
725            assert_eq!(hub.fixed, false);
726
727            assert_eq!(hub.begin_async().await.is_ok(), true);
728
729            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
730            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
731            assert_eq!(hub.local_data_src_manager.local, true);
732            assert_eq!(hub.data_src_map.len(), 2);
733            assert_eq!(hub.data_conn_manager.vec.len(), 0);
734            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
735            assert_eq!(hub.fixed, true);
736
737            hub.end();
738
739            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
740            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
741            assert_eq!(hub.local_data_src_manager.local, true);
742            assert_eq!(hub.data_src_map.len(), 2);
743            assert_eq!(hub.data_conn_manager.vec.len(), 0);
744            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
745            assert_eq!(hub.fixed, false);
746        }
747
748        assert_eq!(
749            *logger.lock().unwrap(),
750            &[
751                "MyDataSrc::new 1",
752                "MyDataSrc::new 2",
753                "MyDataSrc::setup 1",
754                "MyDataSrc::setup 2",
755                "MyDataSrc::close 2",
756                "MyDataSrc::drop 2",
757                "MyDataSrc::close 1",
758                "MyDataSrc::drop 1",
759            ]
760        );
761    }
762
763    #[tokio::test]
764    async fn test_begin_but_failed() {
765        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
766
767        {
768            let mut hub = DataHub::new();
769
770            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
771            hub.uses(
772                "bar",
773                MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
774            );
775            hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
776
777            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
778            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
779            assert_eq!(hub.local_data_src_manager.local, true);
780            assert_eq!(hub.data_src_map.len(), 0);
781            assert_eq!(hub.data_conn_manager.vec.len(), 0);
782            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
783            assert_eq!(hub.fixed, false);
784
785            if let Err(err) = hub.begin_async().await {
786                match err.reason::<DataHubError>() {
787                    Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
788                        assert_eq!(errors.len(), 1);
789                        assert_eq!(errors[0].0, "bar".into());
790                        assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
791                    }
792                    _ => panic!(),
793                }
794            } else {
795                panic!();
796            }
797
798            hub.end();
799        }
800
801        assert_eq!(
802            *logger.lock().unwrap(),
803            &[
804                "MyDataSrc::new 1",
805                "MyDataSrc::new 2",
806                "MyDataSrc::new 3",
807                "MyDataSrc::setup 1",
808                "MyDataSrc::setup 2 failed",
809                "MyDataSrc::close 2",
810                "MyDataSrc::close 1",
811                "MyDataSrc::drop 3",
812                "MyDataSrc::drop 2",
813                "MyDataSrc::drop 1",
814            ]
815        );
816    }
817
818    #[tokio::test]
819    async fn test_run_and_ok() {
820        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
821        {
822            let mut hub = DataHub::new();
823
824            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
825            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
826
827            let logger_clone = logger.clone();
828            assert!(hub
829                .run_async(|_data| {
830                    let logger_clone2 = logger_clone.clone();
831                    Box::pin(async move {
832                        logger_clone2
833                            .lock()
834                            .unwrap()
835                            .push("execute logic".to_string());
836                        Ok(())
837                    })
838                })
839                .await
840                .is_ok());
841        }
842
843        assert_eq!(
844            *logger.lock().unwrap(),
845            &[
846                "MyDataSrc::new 1",
847                "MyDataSrc::new 2",
848                "MyDataSrc::setup 1",
849                "MyDataSrc::setup 2",
850                "execute logic",
851                "MyDataSrc::close 2",
852                "MyDataSrc::drop 2",
853                "MyDataSrc::close 1",
854                "MyDataSrc::drop 1",
855            ]
856        );
857    }
858
859    #[tokio::test]
860    async fn test_run_but_failed() {
861        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
862        {
863            let mut hub = DataHub::new();
864
865            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
866            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
867
868            let logger_clone = logger.clone();
869            if let Err(err) = hub
870                .run_async(|_data| {
871                    let logger_clone2 = logger_clone.clone();
872                    Box::pin(async move {
873                        logger_clone2
874                            .lock()
875                            .unwrap()
876                            .push("execute logic but fail".to_string());
877                        Err(errs::Err::new("logic error".to_string()))
878                    })
879                })
880                .await
881            {
882                match err.reason::<String>() {
883                    Ok(s) => assert_eq!(s, "logic error"),
884                    _ => panic!(),
885                }
886            } else {
887                panic!();
888            }
889        }
890
891        assert_eq!(
892            *logger.lock().unwrap(),
893            &[
894                "MyDataSrc::new 1",
895                "MyDataSrc::new 2",
896                "MyDataSrc::setup 1",
897                "MyDataSrc::setup 2",
898                "execute logic but fail",
899                "MyDataSrc::close 2",
900                "MyDataSrc::drop 2",
901                "MyDataSrc::close 1",
902                "MyDataSrc::drop 1",
903            ]
904        );
905    }
906
907    #[tokio::test]
908    async fn test_txn_and_no_data_access_and_ok() {
909        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
910        {
911            let mut hub = DataHub::new();
912
913            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
914            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
915
916            let logger_clone = logger.clone();
917            assert!(hub
918                .txn_async(|_data| {
919                    let logger_clone2 = logger_clone.clone();
920                    Box::pin(async move {
921                        logger_clone2
922                            .lock()
923                            .unwrap()
924                            .push("execute logic".to_string());
925                        Ok(())
926                    })
927                })
928                .await
929                .is_ok());
930        }
931
932        assert_eq!(
933            *logger.lock().unwrap(),
934            &[
935                "MyDataSrc::new 1",
936                "MyDataSrc::new 2",
937                "MyDataSrc::setup 1",
938                "MyDataSrc::setup 2",
939                "execute logic",
940                "MyDataSrc::close 2",
941                "MyDataSrc::drop 2",
942                "MyDataSrc::close 1",
943                "MyDataSrc::drop 1",
944            ]
945        );
946    }
947
948    #[tokio::test]
949    async fn test_txn_and_has_data_access_and_ok() {
950        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
951        {
952            let mut hub = DataHub::new();
953
954            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
955            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
956
957            let logger_clone = logger.clone();
958            hub.txn_async(move |data| {
959                let logger_clone2 = logger_clone.clone();
960                Box::pin(async move {
961                    logger_clone2
962                        .lock()
963                        .unwrap()
964                        .push("execute logic".to_string());
965                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
966                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
967                    Ok(())
968                })
969            })
970            .await
971            .unwrap()
972        }
973
974        assert_eq!(
975            *logger.lock().unwrap(),
976            &[
977                "MyDataSrc::new 1",
978                "MyDataSrc::new 2",
979                "MyDataSrc::setup 1",
980                "MyDataSrc::setup 2",
981                "execute logic",
982                "MyDataSrc::create_data_conn 1",
983                "MyDataConn::new 1",
984                "MyDataSrc::create_data_conn 2",
985                "MyDataConn::new 2",
986                "MyDataConn::pre_commit 1",
987                "MyDataConn::pre_commit 2",
988                "MyDataConn::commit 1",
989                "MyDataConn::commit 2",
990                "MyDataConn::post_commit 1",
991                "MyDataConn::post_commit 2",
992                "MyDataConn::close 1",
993                "MyDataConn::drop 1",
994                "MyDataConn::close 2",
995                "MyDataConn::drop 2",
996                "MyDataSrc::close 2",
997                "MyDataSrc::drop 2",
998                "MyDataSrc::close 1",
999                "MyDataSrc::drop 1",
1000            ]
1001        );
1002    }
1003
1004    #[tokio::test]
1005    async fn test_txn_but_failed() {
1006        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1007        {
1008            let mut hub = DataHub::new();
1009
1010            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1011            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1012
1013            let logger_clone = logger.clone();
1014            if let Err(e) = hub
1015                .txn_async(move |data| {
1016                    let logger_clone2 = logger_clone.clone();
1017                    Box::pin(async move {
1018                        logger_clone2
1019                            .lock()
1020                            .unwrap()
1021                            .push("execute logic".to_string());
1022                        let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1023                        let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1024                        Err(errs::Err::new("logic error"))
1025                    })
1026                })
1027                .await
1028            {
1029                match e.reason::<&str>() {
1030                    Ok(s) => assert_eq!(s, &"logic error"),
1031                    _ => panic!(),
1032                }
1033            }
1034        }
1035
1036        assert_eq!(
1037            *logger.lock().unwrap(),
1038            &[
1039                "MyDataSrc::new 1",
1040                "MyDataSrc::new 2",
1041                "MyDataSrc::setup 1",
1042                "MyDataSrc::setup 2",
1043                "execute logic",
1044                "MyDataSrc::create_data_conn 1",
1045                "MyDataConn::new 1",
1046                "MyDataSrc::create_data_conn 2",
1047                "MyDataConn::new 2",
1048                "MyDataConn::rollback 1",
1049                "MyDataConn::rollback 2",
1050                "MyDataConn::close 1",
1051                "MyDataConn::drop 1",
1052                "MyDataConn::close 2",
1053                "MyDataConn::drop 2",
1054                "MyDataSrc::close 2",
1055                "MyDataSrc::drop 2",
1056                "MyDataSrc::close 1",
1057                "MyDataSrc::drop 1",
1058            ]
1059        );
1060    }
1061
1062    #[tokio::test]
1063    async fn test_txn_with_commit_order() {
1064        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1065        {
1066            let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1067
1068            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1069            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1070
1071            let logger_clone = logger.clone();
1072            hub.txn_async(move |data| {
1073                let logger_clone2 = logger_clone.clone();
1074                Box::pin(async move {
1075                    logger_clone2
1076                        .lock()
1077                        .unwrap()
1078                        .push("execute logic".to_string());
1079                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1080                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1081                    Ok(())
1082                })
1083            })
1084            .await
1085            .unwrap();
1086        }
1087
1088        assert_eq!(
1089            *logger.lock().unwrap(),
1090            &[
1091                "MyDataSrc::new 1",
1092                "MyDataSrc::new 2",
1093                "MyDataSrc::setup 1",
1094                "MyDataSrc::setup 2",
1095                "execute logic",
1096                "MyDataSrc::create_data_conn 1",
1097                "MyDataConn::new 1",
1098                "MyDataSrc::create_data_conn 2",
1099                "MyDataConn::new 2",
1100                "MyDataConn::pre_commit 2",
1101                "MyDataConn::pre_commit 1",
1102                "MyDataConn::commit 2",
1103                "MyDataConn::commit 1",
1104                "MyDataConn::post_commit 2",
1105                "MyDataConn::post_commit 1",
1106                "MyDataConn::close 2",
1107                "MyDataConn::drop 2",
1108                "MyDataConn::close 1",
1109                "MyDataConn::drop 1",
1110                "MyDataSrc::close 2",
1111                "MyDataSrc::drop 2",
1112                "MyDataSrc::close 1",
1113                "MyDataSrc::drop 1",
1114            ]
1115        );
1116    }
1117
1118    #[tokio::test]
1119    async fn test_get_data_conn_and_failed() {
1120        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1121        {
1122            let mut hub = DataHub::new();
1123
1124            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1125            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1126
1127            let logger_clone = logger.clone();
1128            let err = hub
1129                .txn_async(move |data| {
1130                    let logger_clone2 = logger_clone.clone();
1131                    Box::pin(async move {
1132                        logger_clone2
1133                            .lock()
1134                            .unwrap()
1135                            .push("execute logic".to_string());
1136                        let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1137                        Ok(())
1138                    })
1139                })
1140                .await
1141                .unwrap_err();
1142
1143            match err.reason::<DataHubError>() {
1144                Ok(r) => match r {
1145                    DataHubError::NoDataSrcToCreateDataConn {
1146                        name,
1147                        data_conn_type,
1148                    } => {
1149                        assert_eq!(name.as_ref(), "fxx");
1150                        assert_eq!(
1151                            data_conn_type,
1152                            &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1153                        );
1154                    }
1155                    _ => panic!(),
1156                },
1157                _ => panic!(),
1158            }
1159        }
1160
1161        assert_eq!(
1162            *logger.lock().unwrap(),
1163            &[
1164                "MyDataSrc::new 1",
1165                "MyDataSrc::new 2",
1166                "MyDataSrc::setup 1",
1167                "MyDataSrc::setup 2",
1168                "execute logic",
1169                "MyDataSrc::close 2",
1170                "MyDataSrc::drop 2",
1171                "MyDataSrc::close 1",
1172                "MyDataSrc::drop 1",
1173            ]
1174        );
1175    }
1176
    // Empty test-only trait used as the bound of `process_async`'s parameter.
    trait Data {}
    // Lets a `DataHub` be passed wherever an `impl Data` is expected.
    impl Data for DataHub {}
1179
    // No-op logic function: accepts any `Data` implementor, ignores it and
    // returns `Ok(())`.
    // NOTE(review): the exact signature is presumably what `_logic!` expects;
    // the macro is defined elsewhere, so confirm before changing it.
    async fn process_async(_data: &mut impl Data) -> errs::Result<()> {
        Ok(())
    }
1183
1184    #[tokio::test]
1185    async fn data_hub_implements_send_trait() {
1186        let handle = tokio::spawn(async {
1187            let mut data = DataHub::new();
1188            data.run_async(_logic!(process_async)).await.unwrap();
1189        });
1190
1191        handle.await.unwrap();
1192    }
1193}