Skip to main content

sabi/tokio/
data_hub.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::{any, ptr};
13
/// Represents errors that can occur within the `DataHub`.
///
/// Values of this enum are wrapped with `errs::Err::new` and returned as the reason of
/// the errors produced by `DataHub` methods in this file.
#[derive(Debug)]
pub enum DataHubError {
    /// An error indicating that one or more local data sources failed during their setup processes.
    ///
    /// Produced by the setup step that runs at the beginning of `run_async` and `txn_async`
    /// when the list of collected setup errors is not empty.
    FailToSetupLocalDataSrcs {
        /// A vector of errors, each containing the name of the data source and the error itself.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// An error indicating that no suitable data source was found to create a data connection
    /// with the specified name and type.
    ///
    /// Returned by `get_data_conn_async` when the name matches neither an already managed
    /// data connection nor an entry of the data source map.
    NoDataSrcToCreateDataConn {
        /// The name of the data connection that could not be created.
        name: Arc<str>,
        /// The string representation of the data connection type that was requested
        /// (the value of `std::any::type_name::<C>()`).
        data_conn_type: &'static str,
    },
}
32
33impl DataHub {
34    /// Creates a new `DataHub` instance.
35    ///
36    /// This initializes the `DataHub` with no local data sources and an empty data connection manager.
37    /// Global data sources, if any, are copied into the `data_src_map`.
38    #[allow(clippy::new_without_default)]
39    pub fn new() -> Self {
40        let mut data_src_map = HashMap::new();
41        copy_global_data_srcs_to_map(&mut data_src_map);
42
43        Self {
44            local_data_src_manager: DataSrcManager::new(true),
45            data_src_map,
46            data_conn_manager: DataConnManager::new(),
47            fixed: false,
48        }
49    }
50
51    /// Creates a new `DataHub` instance with a specified commit order for data connections.
52    ///
53    /// This allows defining the order in which data connections will be committed. Connections
54    /// not specified in `names` will be committed after the specified ones, in an undefined order.
55    /// Global data sources are copied into the `data_src_map`.
56    ///
57    /// # Arguments
58    ///
59    /// * `names` - An array of string slices specifying the desired commit order by data connection name.
60    pub fn with_commit_order(names: &[&str]) -> Self {
61        let mut data_src_map = HashMap::new();
62        copy_global_data_srcs_to_map(&mut data_src_map);
63
64        Self {
65            local_data_src_manager: DataSrcManager::new(true),
66            data_src_map,
67            data_conn_manager: DataConnManager::with_commit_order(names),
68            fixed: false,
69        }
70    }
71
72    /// Registers a local data source with the `DataHub`.
73    ///
74    /// This method allows adding a custom data source which can provide data connections.
75    /// Data sources can only be added before `run_async` or `txn_async` are called.
76    ///
77    /// # Arguments
78    ///
79    /// * `name` - The name to associate with this data source.
80    /// * `ds` - The data source instance, which must implement `DataSrc` and have a `'static` lifetime.
81    ///
82    /// # Type Parameters
83    ///
84    /// * `S` - The type of the data source.
85    /// * `C` - The type of the data connection provided by the data source.
86    pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
87    where
88        S: DataSrc<C> + 'static,
89        C: DataConn + 'static,
90    {
91        if self.fixed {
92            return;
93        }
94        self.local_data_src_manager.add(name, ds);
95    }
96
97    /// Deregisters a local data source from the `DataHub`.
98    ///
99    /// This removes a data source previously added with `uses`. Data sources can only be
100    /// removed before `run_async` or `txn_async` are called.
101    ///
102    /// # Arguments
103    ///
104    /// * `name` - The name of the data source to remove.
105    pub fn disuses(&mut self, name: impl AsRef<str>) {
106        if self.fixed {
107            return;
108        }
109        self.data_src_map.remove(name.as_ref());
110        self.local_data_src_manager.remove(name);
111    }
112
113    #[inline]
114    async fn begin_async(&mut self) -> errs::Result<()> {
115        self.fixed = true;
116
117        let mut errors = Vec::new();
118
119        self.local_data_src_manager.setup_async(&mut errors).await;
120        if errors.is_empty() {
121            self.local_data_src_manager
122                .copy_ds_ready_to_map(&mut self.data_src_map);
123            Ok(())
124        } else {
125            Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
126                errors,
127            }))
128        }
129    }
130
131    #[inline]
132    async fn commit_async(&mut self) -> errs::Result<()> {
133        self.data_conn_manager.commit_async().await
134    }
135
136    #[inline]
137    async fn rollback_async(&mut self) {
138        self.data_conn_manager.rollback_async().await
139    }
140
141    #[inline]
142    fn end(&mut self) {
143        self.data_conn_manager.close();
144        self.fixed = false;
145    }
146
147    /// Executes an asynchronous logic function with the `DataHub` and handles setup and cleanup.
148    ///
149    /// This method sets up local data sources, runs the provided `logic_fn`, and then
150    /// cleans up all data connections and sources. It does *not* automatically commit
151    /// or rollback any transactions.
152    ///
153    /// # Arguments
154    ///
155    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
156    ///                and returns a `Result`. This function contains the application's logic.
157    ///
158    /// # Type Parameters
159    ///
160    /// * `F` - The type of the asynchronous logic function.
161    ///
162    /// # Returns
163    ///
164    /// A `Result` indicating the success or failure of the `logic_fn` execution or
165    /// the setup of data sources.
166    #[allow(clippy::doc_overindented_list_items)]
167    pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
168    where
169        for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
170    {
171        let mut r = self.begin_async().await;
172        if r.is_ok() {
173            r = logic_fn(self).await;
174        }
175        self.end();
176        r
177    }
178
179    /// Executes an asynchronous transactional logic function with the `DataHub`, managing
180    /// setup, commit, and rollback for data connections.
181    ///
182    /// This method sets up local data sources, runs the provided `logic_fn`. If the logic
183    /// function succeeds, it attempts to commit all created data connections. If either
184    /// the logic function or the commit fails, it rolls back all data connections.
185    /// Finally, it cleans up all data connections and sources.
186    ///
187    /// # Arguments
188    ///
189    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
190    ///                and returns a `Result`. This function contains the application's transactional logic.
191    ///
192    /// # Type Parameters
193    ///
194    /// * `F` - The type of the asynchronous transactional logic function.
195    ///
196    /// # Returns
197    ///
198    /// A `Result` indicating the success or failure of the `logic_fn` execution,
199    /// the setup of data sources, or the commit/rollback operations.
200    #[allow(clippy::doc_overindented_list_items)]
201    pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
202    where
203        for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
204    {
205        let mut r = self.begin_async().await;
206        if r.is_ok() {
207            r = logic_fn(self).await;
208        }
209        if r.is_ok() {
210            r = self.commit_async().await;
211        }
212        if r.is_err() {
213            self.rollback_async().await;
214        }
215        self.end();
216        r
217    }
218
    /// Retrieves an existing data connection or creates a new one if it doesn't exist.
    ///
    /// This asynchronous method first asks the data connection manager for a connection
    /// registered under `name`. If none is found, it looks `name` up in the data source
    /// map and creates a new data connection from the matching local or global data
    /// source, then hands the new connection to the data connection manager.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the data connection to retrieve or create.
    ///
    /// # Type Parameters
    ///
    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
    ///
    /// # Returns
    ///
    /// A `Result` which is `Ok` containing a mutable reference to the data connection
    /// if found or successfully created.
    ///
    /// # Errors
    ///
    /// * Any error returned by `DataConnManager::to_typed_ptr::<C>` for an existing
    ///   connection (presumably a type mismatch — confirm against `DataConnManager`).
    /// * Any error returned while creating the connection from the data source.
    /// * An error with reason `DataHubError::NoDataSrcToCreateDataConn` when `name` is
    ///   not present in the data source map.
    pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
    where
        C: DataConn + 'static,
    {
        // Fast path: the manager already holds a connection under this name.
        if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
            let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
            // SAFETY (assumed, not verifiable from this file): `to_typed_ptr` is expected to
            // return a valid pointer to a live `DataConnContainer<C>` — TODO confirm.
            return Ok(unsafe { &mut (*typed_nnptr).data_conn });
        }

        // Each map entry is a `(local, index)` pair: `local` selects between the local
        // data source manager and the global data sources, `index` is passed to the creator.
        if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
            let boxed = if *local {
                self.local_data_src_manager
                    .create_data_conn_async::<C>(*index, name.as_ref())
                    .await?
            } else {
                create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
            };

            // The box is turned into a raw pointer and handed to the manager; presumably the
            // manager becomes responsible for freeing it — verify in `DataConnManager`.
            let ptr = Box::into_raw(boxed);
            if let Some(nnptr) = ptr::NonNull::new(ptr) {
                self.data_conn_manager
                    .add(nnptr.cast::<DataConnContainer>());

                let typed_ptr = ptr.cast::<DataConnContainer<C>>();
                // SAFETY (assumed): `ptr` was just produced by `Box::into_raw`; this relies on
                // the boxed value being a `DataConnContainer<C>` — TODO confirm the return
                // type of the create functions.
                return Ok(unsafe { &mut (*typed_ptr).data_conn });
            } else {
                // Impossible case: a pointer obtained from `Box::into_raw` is never null.
                // If it ever happened, control would fall through to the error below.
            }
        }

        Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
            name: name.as_ref().into(),
            data_conn_type: any::type_name::<C>(),
        }))
    }
273}
274
/// A convenience macro that turns a function into a closure which calls it and returns
/// the result boxed and pinned, as required by `DataHub`'s `run_async` and `txn_async`
/// methods.
///
/// The expansion is `|data_hub| ::std::boxed::Box::pin($f(data_hub))`. `Box` is named by
/// its absolute path so the macro keeps working when the calling scope defines its own
/// item called `Box`.
///
/// # Example
///
/// ```ignore
/// async fn my_logic(data_hub: &mut DataHub) -> errs::Result<()> {
///     // ... some logic using data_hub
///     Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let mut hub = DataHub::new();
///     hub.txn_async(logic!(my_logic)).await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! logic {
    ($f:expr) => {
        |data_hub| ::std::boxed::Box::pin($f(data_hub))
    };
}
300
301#[cfg(test)]
302mod tests_of_data_hub {
303    use super::*;
304    use crate::tokio::AsyncGroup;
305    use std::sync::Mutex;
306
    /// Selects which operation of the test doubles below should fail.
    #[derive(Clone, Copy, PartialEq)]
    enum Failure {
        /// No failure is injected.
        None,
        /// `MyDataConn::pre_commit_async` returns an error.
        FailToPreCommit,
        /// `MyDataConn::commit_async` returns an error.
        FailToCommit,
        /// `MyDataSrc::setup_async` returns an error.
        FailToSetup,
        /// `MyDataSrc::create_data_conn_async` returns an error.
        FailToCreateDataConn,
    }
315
    /// A `DataConn` test double that records every lifecycle call into a shared log.
    struct MyDataConn {
        /// Identifier written into every log entry.
        id: i8,
        /// The failure to inject, if any.
        failure: Failure,
        /// Set to `true` when `commit_async` succeeds.
        committed: bool,
        /// Shared log the test inspects afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
322    impl MyDataConn {
323        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
324            logger
325                .lock()
326                .unwrap()
327                .push(format!("MyDataConn::new {}", id));
328            Self {
329                id,
330                failure,
331                committed: false,
332                logger,
333            }
334        }
335    }
336    impl Drop for MyDataConn {
337        fn drop(&mut self) {
338            self.logger
339                .lock()
340                .unwrap()
341                .push(format!("MyDataConn::drop {}", self.id));
342        }
343    }
344    impl DataConn for MyDataConn {
345        async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
346            if self.failure == Failure::FailToPreCommit {
347                self.logger
348                    .lock()
349                    .unwrap()
350                    .push(format!("MyDataConn::pre_commit {} failed", self.id));
351                Err(errs::Err::new("pre commit error"))
352            } else {
353                self.logger
354                    .lock()
355                    .unwrap()
356                    .push(format!("MyDataConn::pre_commit {}", self.id));
357                Ok(())
358            }
359        }
360        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
361            if self.failure == Failure::FailToCommit {
362                self.logger
363                    .lock()
364                    .unwrap()
365                    .push(format!("MyDataConn::commit {} failed", self.id));
366                Err(errs::Err::new("commit error"))
367            } else {
368                self.logger
369                    .lock()
370                    .unwrap()
371                    .push(format!("MyDataConn::commit {}", self.id));
372                self.committed = true;
373                Ok(())
374            }
375        }
376        async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
377            self.logger
378                .lock()
379                .unwrap()
380                .push(format!("MyDataConn::post_commit {}", self.id));
381        }
382        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
383            self.logger
384                .lock()
385                .unwrap()
386                .push(format!("MyDataConn::rollback {}", self.id));
387        }
388        async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
389            self.logger
390                .lock()
391                .unwrap()
392                .push(format!("MyDataConn::force_back {}", self.id));
393        }
394        fn close(&mut self) {
395            self.logger
396                .lock()
397                .unwrap()
398                .push(format!("MyDataConn::close {}", self.id));
399        }
400    }
401
    /// A `DataSrc` test double that records every lifecycle call into a shared log
    /// and creates `MyDataConn` instances.
    struct MyDataSrc {
        /// Identifier written into every log entry and passed on to created connections.
        id: i8,
        /// The failure to inject, if any; also passed on to created connections.
        failure: Failure,
        /// Shared log the test inspects afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
407    impl MyDataSrc {
408        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
409            logger
410                .lock()
411                .unwrap()
412                .push(format!("MyDataSrc::new {}", id));
413            Self {
414                id,
415                failure,
416                logger,
417            }
418        }
419    }
420    impl Drop for MyDataSrc {
421        fn drop(&mut self) {
422            self.logger
423                .lock()
424                .unwrap()
425                .push(format!("MyDataSrc::drop {}", self.id));
426        }
427    }
428    impl DataSrc<MyDataConn> for MyDataSrc {
429        async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
430            if self.failure == Failure::FailToSetup {
431                self.logger
432                    .lock()
433                    .unwrap()
434                    .push(format!("MyDataSrc::setup {} failed", self.id));
435                Err(errs::Err::new("setup error".to_string()))
436            } else {
437                self.logger
438                    .lock()
439                    .unwrap()
440                    .push(format!("MyDataSrc::setup {}", self.id));
441                Ok(())
442            }
443        }
444        fn close(&mut self) {
445            self.logger
446                .lock()
447                .unwrap()
448                .push(format!("MyDataSrc::close {}", self.id));
449        }
450        async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
451            if self.failure == Failure::FailToCreateDataConn {
452                self.logger
453                    .lock()
454                    .unwrap()
455                    .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
456                return Err(errs::Err::new("eeee".to_string()));
457            }
458            {
459                self.logger
460                    .lock()
461                    .unwrap()
462                    .push(format!("MyDataSrc::create_data_conn {}", self.id));
463            }
464            let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
465            Ok(Box::new(conn))
466        }
467    }
468
469    #[test]
470    fn test_new() {
471        let hub = DataHub::new();
472        assert!(hub.local_data_src_manager.vec_unready.is_empty());
473        assert!(hub.local_data_src_manager.vec_ready.is_empty());
474        assert!(hub.local_data_src_manager.local);
475        assert!(hub.data_src_map.is_empty());
476        assert!(hub.data_conn_manager.vec.is_empty());
477        assert!(hub.data_conn_manager.index_map.is_empty());
478        assert!(!hub.fixed);
479    }
480
481    #[test]
482    fn test_with_commit_order() {
483        let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
484        assert!(hub.local_data_src_manager.vec_unready.is_empty());
485        assert!(hub.local_data_src_manager.vec_ready.is_empty());
486        assert!(hub.local_data_src_manager.local);
487        assert!(hub.data_src_map.is_empty());
488        assert_eq!(hub.data_conn_manager.vec.len(), 3);
489        assert_eq!(hub.data_conn_manager.index_map.len(), 3);
490        assert!(!hub.fixed);
491    }
492
493    #[tokio::test]
494    async fn test_uses_and_ok() {
495        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
496
497        let mut hub = DataHub::new();
498        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
499        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
500
501        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
502        assert!(hub.local_data_src_manager.vec_ready.is_empty());
503        assert!(hub.local_data_src_manager.local);
504        assert!(hub.data_src_map.is_empty());
505        assert_eq!(hub.data_conn_manager.vec.len(), 0);
506        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
507        assert!(!hub.fixed);
508
509        assert!(hub.begin_async().await.is_ok());
510
511        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
512        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
513        assert!(hub.local_data_src_manager.local);
514        assert_eq!(hub.data_src_map.len(), 2);
515        assert_eq!(hub.data_conn_manager.vec.len(), 0);
516        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
517        assert!(hub.fixed);
518    }
519
520    #[tokio::test]
521    async fn test_uses_but_already_fixed() {
522        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
523
524        let mut hub = DataHub::new();
525        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
526
527        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
528        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
529        assert!(hub.local_data_src_manager.local);
530        assert_eq!(hub.data_src_map.len(), 0);
531        assert_eq!(hub.data_conn_manager.vec.len(), 0);
532        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
533        assert!(!hub.fixed);
534
535        assert!(hub.begin_async().await.is_ok());
536
537        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
538        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
539        assert!(hub.local_data_src_manager.local);
540        assert_eq!(hub.data_src_map.len(), 1);
541        assert_eq!(hub.data_conn_manager.vec.len(), 0);
542        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
543        assert!(hub.fixed);
544
545        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
546
547        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
548        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
549        assert!(hub.local_data_src_manager.local);
550        assert_eq!(hub.data_src_map.len(), 1);
551        assert_eq!(hub.data_conn_manager.vec.len(), 0);
552        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
553        assert!(hub.fixed);
554    }
555
556    #[test]
557    fn test_disuses_and_ok() {
558        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
559
560        let mut hub = DataHub::new();
561        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
562        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
563
564        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
565        assert!(hub.local_data_src_manager.vec_ready.is_empty());
566        assert!(hub.local_data_src_manager.local);
567        assert!(hub.data_src_map.is_empty());
568        assert_eq!(hub.data_conn_manager.vec.len(), 0);
569        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
570        assert!(!hub.fixed);
571
572        hub.disuses("foo");
573
574        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
575        assert!(hub.local_data_src_manager.vec_ready.is_empty());
576        assert!(hub.local_data_src_manager.local);
577        assert!(hub.data_src_map.is_empty());
578        assert_eq!(hub.data_conn_manager.vec.len(), 0);
579        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
580        assert!(!hub.fixed);
581
582        hub.disuses("bar");
583
584        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
585        assert!(hub.local_data_src_manager.vec_ready.is_empty());
586        assert!(hub.local_data_src_manager.local);
587        assert!(hub.data_src_map.is_empty());
588        assert_eq!(hub.data_conn_manager.vec.len(), 0);
589        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
590        assert!(!hub.fixed);
591    }
592
593    #[tokio::test]
594    async fn test_disuses_and_fix() {
595        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
596
597        let mut hub = DataHub::new();
598        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
599        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
600
601        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
602        assert!(hub.local_data_src_manager.vec_ready.is_empty());
603        assert!(hub.local_data_src_manager.local);
604        assert!(hub.data_src_map.is_empty());
605        assert_eq!(hub.data_conn_manager.vec.len(), 0);
606        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
607        assert!(!hub.fixed);
608
609        hub.disuses("foo");
610
611        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
612        assert!(hub.local_data_src_manager.vec_ready.is_empty());
613        assert!(hub.local_data_src_manager.local);
614        assert!(hub.data_src_map.is_empty());
615        assert_eq!(hub.data_conn_manager.vec.len(), 0);
616        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
617        assert!(!hub.fixed);
618
619        hub.disuses("bar");
620
621        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
622        assert!(hub.local_data_src_manager.vec_ready.is_empty());
623        assert!(hub.local_data_src_manager.local);
624        assert!(hub.data_src_map.is_empty());
625        assert_eq!(hub.data_conn_manager.vec.len(), 0);
626        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
627        assert!(!hub.fixed);
628
629        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
630        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
631
632        assert!(hub.begin_async().await.is_ok());
633
634        assert!(hub.local_data_src_manager.vec_unready.is_empty());
635        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
636        assert!(hub.local_data_src_manager.local);
637        assert_eq!(hub.data_src_map.len(), 2);
638        assert_eq!(hub.data_conn_manager.vec.len(), 0);
639        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
640        assert!(hub.fixed);
641
642        hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
643
644        assert!(hub.local_data_src_manager.vec_unready.is_empty());
645        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
646        assert!(hub.local_data_src_manager.local);
647        assert_eq!(hub.data_src_map.len(), 2);
648        assert_eq!(hub.data_conn_manager.vec.len(), 0);
649        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
650        assert!(hub.fixed);
651
652        hub.disuses("bar");
653
654        assert!(hub.local_data_src_manager.vec_unready.is_empty());
655        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
656        assert!(hub.local_data_src_manager.local);
657        assert_eq!(hub.data_src_map.len(), 2);
658        assert_eq!(hub.data_conn_manager.vec.len(), 0);
659        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
660        assert!(hub.fixed);
661
662        hub.end();
663
664        assert!(hub.local_data_src_manager.vec_unready.is_empty());
665        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
666        assert!(hub.local_data_src_manager.local);
667        assert_eq!(hub.data_src_map.len(), 2);
668        assert_eq!(hub.data_conn_manager.vec.len(), 0);
669        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
670        assert!(!hub.fixed);
671
672        hub.disuses("bar");
673
674        assert!(hub.local_data_src_manager.vec_unready.is_empty());
675        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
676        assert!(hub.local_data_src_manager.local);
677        assert_eq!(hub.data_src_map.len(), 1);
678        assert_eq!(hub.data_conn_manager.vec.len(), 0);
679        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
680        assert!(!hub.fixed);
681
682        hub.disuses("foo");
683
684        assert!(hub.local_data_src_manager.vec_unready.is_empty());
685        assert!(hub.local_data_src_manager.vec_ready.is_empty());
686        assert!(hub.local_data_src_manager.local);
687        assert_eq!(hub.data_src_map.len(), 0);
688        assert_eq!(hub.data_conn_manager.vec.len(), 0);
689        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
690        assert!(!hub.fixed);
691    }
692
693    #[tokio::test]
694    async fn test_begin_if_empty() {
695        let mut hub = DataHub::new();
696        assert!(hub.begin_async().await.is_ok());
697
698        assert!(hub.local_data_src_manager.vec_unready.is_empty());
699        assert!(hub.local_data_src_manager.vec_ready.is_empty());
700        assert!(hub.local_data_src_manager.local);
701        assert_eq!(hub.data_src_map.len(), 0);
702        assert_eq!(hub.data_conn_manager.vec.len(), 0);
703        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
704        assert!(hub.fixed);
705
706        hub.end();
707
708        assert!(hub.local_data_src_manager.vec_unready.is_empty());
709        assert!(hub.local_data_src_manager.vec_ready.is_empty());
710        assert!(hub.local_data_src_manager.local);
711        assert_eq!(hub.data_src_map.len(), 0);
712        assert_eq!(hub.data_conn_manager.vec.len(), 0);
713        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
714        assert!(!hub.fixed);
715    }
716
717    #[tokio::test]
718    async fn test_begin_and_ok() {
719        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
720
721        {
722            let mut hub = DataHub::new();
723
724            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
725            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
726
727            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
728            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
729            assert_eq!(hub.local_data_src_manager.local, true);
730            assert_eq!(hub.data_src_map.len(), 0);
731            assert_eq!(hub.data_conn_manager.vec.len(), 0);
732            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
733            assert_eq!(hub.fixed, false);
734
735            assert_eq!(hub.begin_async().await.is_ok(), true);
736
737            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
738            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
739            assert_eq!(hub.local_data_src_manager.local, true);
740            assert_eq!(hub.data_src_map.len(), 2);
741            assert_eq!(hub.data_conn_manager.vec.len(), 0);
742            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
743            assert_eq!(hub.fixed, true);
744
745            hub.end();
746
747            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
748            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
749            assert_eq!(hub.local_data_src_manager.local, true);
750            assert_eq!(hub.data_src_map.len(), 2);
751            assert_eq!(hub.data_conn_manager.vec.len(), 0);
752            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
753            assert_eq!(hub.fixed, false);
754        }
755
756        assert_eq!(
757            *logger.lock().unwrap(),
758            &[
759                "MyDataSrc::new 1",
760                "MyDataSrc::new 2",
761                "MyDataSrc::setup 1",
762                "MyDataSrc::setup 2",
763                "MyDataSrc::close 2",
764                "MyDataSrc::drop 2",
765                "MyDataSrc::close 1",
766                "MyDataSrc::drop 1",
767            ]
768        );
769    }
770
771    #[tokio::test]
772    async fn test_begin_but_failed() {
773        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
774
775        {
776            let mut hub = DataHub::new();
777
778            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
779            hub.uses(
780                "bar",
781                MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
782            );
783            hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
784
785            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
786            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
787            assert_eq!(hub.local_data_src_manager.local, true);
788            assert_eq!(hub.data_src_map.len(), 0);
789            assert_eq!(hub.data_conn_manager.vec.len(), 0);
790            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
791            assert_eq!(hub.fixed, false);
792
793            if let Err(err) = hub.begin_async().await {
794                match err.reason::<DataHubError>() {
795                    Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
796                        assert_eq!(errors.len(), 1);
797                        assert_eq!(errors[0].0, "bar".into());
798                        assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
799                    }
800                    _ => panic!(),
801                }
802            } else {
803                panic!();
804            }
805
806            hub.end();
807        }
808
809        assert_eq!(
810            *logger.lock().unwrap(),
811            &[
812                "MyDataSrc::new 1",
813                "MyDataSrc::new 2",
814                "MyDataSrc::new 3",
815                "MyDataSrc::setup 1",
816                "MyDataSrc::setup 2 failed",
817                "MyDataSrc::close 2",
818                "MyDataSrc::close 1",
819                "MyDataSrc::drop 3",
820                "MyDataSrc::drop 2",
821                "MyDataSrc::drop 1",
822            ]
823        );
824    }
825
826    #[tokio::test]
827    async fn test_run_and_ok() {
828        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
829        {
830            let mut hub = DataHub::new();
831
832            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
833            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
834
835            let logger_clone = logger.clone();
836            assert!(hub
837                .run_async(|_data| {
838                    let logger_clone2 = logger_clone.clone();
839                    Box::pin(async move {
840                        logger_clone2
841                            .lock()
842                            .unwrap()
843                            .push("execute logic".to_string());
844                        Ok(())
845                    })
846                })
847                .await
848                .is_ok());
849        }
850
851        assert_eq!(
852            *logger.lock().unwrap(),
853            &[
854                "MyDataSrc::new 1",
855                "MyDataSrc::new 2",
856                "MyDataSrc::setup 1",
857                "MyDataSrc::setup 2",
858                "execute logic",
859                "MyDataSrc::close 2",
860                "MyDataSrc::drop 2",
861                "MyDataSrc::close 1",
862                "MyDataSrc::drop 1",
863            ]
864        );
865    }
866
867    #[tokio::test]
868    async fn test_run_but_failed() {
869        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
870        {
871            let mut hub = DataHub::new();
872
873            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
874            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
875
876            let logger_clone = logger.clone();
877            if let Err(err) = hub
878                .run_async(|_data| {
879                    let logger_clone2 = logger_clone.clone();
880                    Box::pin(async move {
881                        logger_clone2
882                            .lock()
883                            .unwrap()
884                            .push("execute logic but fail".to_string());
885                        Err(errs::Err::new("logic error".to_string()))
886                    })
887                })
888                .await
889            {
890                match err.reason::<String>() {
891                    Ok(s) => assert_eq!(s, "logic error"),
892                    _ => panic!(),
893                }
894            } else {
895                panic!();
896            }
897        }
898
899        assert_eq!(
900            *logger.lock().unwrap(),
901            &[
902                "MyDataSrc::new 1",
903                "MyDataSrc::new 2",
904                "MyDataSrc::setup 1",
905                "MyDataSrc::setup 2",
906                "execute logic but fail",
907                "MyDataSrc::close 2",
908                "MyDataSrc::drop 2",
909                "MyDataSrc::close 1",
910                "MyDataSrc::drop 1",
911            ]
912        );
913    }
914
915    #[tokio::test]
916    async fn test_txn_and_no_data_access_and_ok() {
917        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
918        {
919            let mut hub = DataHub::new();
920
921            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
922            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
923
924            let logger_clone = logger.clone();
925            assert!(hub
926                .txn_async(|_data| {
927                    let logger_clone2 = logger_clone.clone();
928                    Box::pin(async move {
929                        logger_clone2
930                            .lock()
931                            .unwrap()
932                            .push("execute logic".to_string());
933                        Ok(())
934                    })
935                })
936                .await
937                .is_ok());
938        }
939
940        assert_eq!(
941            *logger.lock().unwrap(),
942            &[
943                "MyDataSrc::new 1",
944                "MyDataSrc::new 2",
945                "MyDataSrc::setup 1",
946                "MyDataSrc::setup 2",
947                "execute logic",
948                "MyDataSrc::close 2",
949                "MyDataSrc::drop 2",
950                "MyDataSrc::close 1",
951                "MyDataSrc::drop 1",
952            ]
953        );
954    }
955
956    #[tokio::test]
957    async fn test_txn_and_has_data_access_and_ok() {
958        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
959        {
960            let mut hub = DataHub::new();
961
962            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
963            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
964
965            let logger_clone = logger.clone();
966            hub.txn_async(move |data| {
967                let logger_clone2 = logger_clone.clone();
968                Box::pin(async move {
969                    logger_clone2
970                        .lock()
971                        .unwrap()
972                        .push("execute logic".to_string());
973                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
974                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
975                    Ok(())
976                })
977            })
978            .await
979            .unwrap()
980        }
981
982        assert_eq!(
983            *logger.lock().unwrap(),
984            &[
985                "MyDataSrc::new 1",
986                "MyDataSrc::new 2",
987                "MyDataSrc::setup 1",
988                "MyDataSrc::setup 2",
989                "execute logic",
990                "MyDataSrc::create_data_conn 1",
991                "MyDataConn::new 1",
992                "MyDataSrc::create_data_conn 2",
993                "MyDataConn::new 2",
994                "MyDataConn::pre_commit 1",
995                "MyDataConn::pre_commit 2",
996                "MyDataConn::commit 1",
997                "MyDataConn::commit 2",
998                "MyDataConn::post_commit 1",
999                "MyDataConn::post_commit 2",
1000                "MyDataConn::close 1",
1001                "MyDataConn::drop 1",
1002                "MyDataConn::close 2",
1003                "MyDataConn::drop 2",
1004                "MyDataSrc::close 2",
1005                "MyDataSrc::drop 2",
1006                "MyDataSrc::close 1",
1007                "MyDataSrc::drop 1",
1008            ]
1009        );
1010    }
1011
1012    #[tokio::test]
1013    async fn test_txn_but_failed() {
1014        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1015        {
1016            let mut hub = DataHub::new();
1017
1018            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1019            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1020
1021            let logger_clone = logger.clone();
1022            if let Err(e) = hub
1023                .txn_async(move |data| {
1024                    let logger_clone2 = logger_clone.clone();
1025                    Box::pin(async move {
1026                        logger_clone2
1027                            .lock()
1028                            .unwrap()
1029                            .push("execute logic".to_string());
1030                        let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1031                        let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1032                        Err(errs::Err::new("logic error"))
1033                    })
1034                })
1035                .await
1036            {
1037                match e.reason::<&str>() {
1038                    Ok(s) => assert_eq!(s, &"logic error"),
1039                    _ => panic!(),
1040                }
1041            }
1042        }
1043
1044        assert_eq!(
1045            *logger.lock().unwrap(),
1046            &[
1047                "MyDataSrc::new 1",
1048                "MyDataSrc::new 2",
1049                "MyDataSrc::setup 1",
1050                "MyDataSrc::setup 2",
1051                "execute logic",
1052                "MyDataSrc::create_data_conn 1",
1053                "MyDataConn::new 1",
1054                "MyDataSrc::create_data_conn 2",
1055                "MyDataConn::new 2",
1056                "MyDataConn::rollback 1",
1057                "MyDataConn::rollback 2",
1058                "MyDataConn::close 1",
1059                "MyDataConn::drop 1",
1060                "MyDataConn::close 2",
1061                "MyDataConn::drop 2",
1062                "MyDataSrc::close 2",
1063                "MyDataSrc::drop 2",
1064                "MyDataSrc::close 1",
1065                "MyDataSrc::drop 1",
1066            ]
1067        );
1068    }
1069
1070    #[tokio::test]
1071    async fn test_txn_with_commit_order() {
1072        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1073        {
1074            let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1075
1076            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1077            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1078
1079            let logger_clone = logger.clone();
1080            hub.txn_async(move |data| {
1081                let logger_clone2 = logger_clone.clone();
1082                Box::pin(async move {
1083                    logger_clone2
1084                        .lock()
1085                        .unwrap()
1086                        .push("execute logic".to_string());
1087                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1088                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1089                    Ok(())
1090                })
1091            })
1092            .await
1093            .unwrap();
1094        }
1095
1096        assert_eq!(
1097            *logger.lock().unwrap(),
1098            &[
1099                "MyDataSrc::new 1",
1100                "MyDataSrc::new 2",
1101                "MyDataSrc::setup 1",
1102                "MyDataSrc::setup 2",
1103                "execute logic",
1104                "MyDataSrc::create_data_conn 1",
1105                "MyDataConn::new 1",
1106                "MyDataSrc::create_data_conn 2",
1107                "MyDataConn::new 2",
1108                "MyDataConn::pre_commit 2",
1109                "MyDataConn::pre_commit 1",
1110                "MyDataConn::commit 2",
1111                "MyDataConn::commit 1",
1112                "MyDataConn::post_commit 2",
1113                "MyDataConn::post_commit 1",
1114                "MyDataConn::close 2",
1115                "MyDataConn::drop 2",
1116                "MyDataConn::close 1",
1117                "MyDataConn::drop 1",
1118                "MyDataSrc::close 2",
1119                "MyDataSrc::drop 2",
1120                "MyDataSrc::close 1",
1121                "MyDataSrc::drop 1",
1122            ]
1123        );
1124    }
1125
1126    #[tokio::test]
1127    async fn test_get_data_conn_and_failed() {
1128        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1129        {
1130            let mut hub = DataHub::new();
1131
1132            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1133            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1134
1135            let logger_clone = logger.clone();
1136            let err = hub
1137                .txn_async(move |data| {
1138                    let logger_clone2 = logger_clone.clone();
1139                    Box::pin(async move {
1140                        logger_clone2
1141                            .lock()
1142                            .unwrap()
1143                            .push("execute logic".to_string());
1144                        let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1145                        Ok(())
1146                    })
1147                })
1148                .await
1149                .unwrap_err();
1150
1151            match err.reason::<DataHubError>() {
1152                Ok(r) => match r {
1153                    DataHubError::NoDataSrcToCreateDataConn {
1154                        name,
1155                        data_conn_type,
1156                    } => {
1157                        assert_eq!(name.as_ref(), "fxx");
1158                        assert_eq!(
1159                            data_conn_type,
1160                            &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1161                        );
1162                    }
1163                    _ => panic!(),
1164                },
1165                _ => panic!(),
1166            }
1167        }
1168
1169        assert_eq!(
1170            *logger.lock().unwrap(),
1171            &[
1172                "MyDataSrc::new 1",
1173                "MyDataSrc::new 2",
1174                "MyDataSrc::setup 1",
1175                "MyDataSrc::setup 2",
1176                "execute logic",
1177                "MyDataSrc::close 2",
1178                "MyDataSrc::drop 2",
1179                "MyDataSrc::close 1",
1180                "MyDataSrc::drop 1",
1181            ]
1182        );
1183    }
1184}