Skip to main content

sabi/tokio/
data_hub.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::{any, ptr};
13
/// Represents errors that can occur within the `DataHub`.
///
/// Values of this enum are passed to `errs::Err::new` as the reason when a `DataHub`
/// operation in this file fails.
#[derive(Debug)]
pub enum DataHubError {
    /// An error indicating that one or more local data sources failed during their setup processes.
    FailToSetupLocalDataSrcs {
        /// A vector of errors, each pairing the name of a failed data source with the error
        /// it reported.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// An error indicating that no suitable data source was found to create a data connection
    /// with the specified name and type.
    NoDataSrcToCreateDataConn {
        /// The name of the data connection that could not be created.
        name: Arc<str>,
        /// The type name of the requested data connection, as produced by
        /// `std::any::type_name`.
        data_conn_type: &'static str,
    },
}
32
33impl DataHub {
34    /// Creates a new `DataHub` instance.
35    ///
36    /// This initializes the `DataHub` with no local data sources and an empty data connection manager.
37    /// Global data sources, if any, are copied into the `data_src_map`.
38    #[allow(clippy::new_without_default)]
39    pub fn new() -> Self {
40        let mut data_src_map = HashMap::new();
41        copy_global_data_srcs_to_map(&mut data_src_map);
42
43        Self {
44            local_data_src_manager: DataSrcManager::new(true),
45            data_src_map,
46            data_conn_manager: DataConnManager::new(),
47            fixed: false,
48        }
49    }
50
51    /// Creates a new `DataHub` instance with a specified commit order for data connections.
52    ///
53    /// This allows defining the order in which data connections will be committed. Connections
54    /// not specified in `names` will be committed after the specified ones, in an undefined order.
55    /// Global data sources are copied into the `data_src_map`.
56    ///
57    /// # Arguments
58    ///
59    /// * `names` - An array of string slices specifying the desired commit order by data connection name.
60    pub fn with_commit_order(names: &[&str]) -> Self {
61        let mut data_src_map = HashMap::new();
62        copy_global_data_srcs_to_map(&mut data_src_map);
63
64        Self {
65            local_data_src_manager: DataSrcManager::new(true),
66            data_src_map,
67            data_conn_manager: DataConnManager::with_commit_order(names),
68            fixed: false,
69        }
70    }
71
72    /// Registers a local data source with the `DataHub`.
73    ///
74    /// This method allows adding a custom data source which can provide data connections.
75    /// Data sources can only be added before `run_async` or `txn_async` are called.
76    ///
77    /// # Arguments
78    ///
79    /// * `name` - The name to associate with this data source.
80    /// * `ds` - The data source instance, which must implement `DataSrc` and have a `'static` lifetime.
81    ///
82    /// # Type Parameters
83    ///
84    /// * `S` - The type of the data source.
85    /// * `C` - The type of the data connection provided by the data source.
86    pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
87    where
88        S: DataSrc<C> + 'static,
89        C: DataConn + 'static,
90    {
91        if self.fixed {
92            return;
93        }
94        self.local_data_src_manager.add(name, ds);
95    }
96
97    /// Deregisters a local data source from the `DataHub`.
98    ///
99    /// This removes a data source previously added with `uses`. Data sources can only be
100    /// removed before `run_async` or `txn_async` are called.
101    ///
102    /// # Arguments
103    ///
104    /// * `name` - The name of the data source to remove.
105    pub fn disuses(&mut self, name: impl AsRef<str>) {
106        if self.fixed {
107            return;
108        }
109        self.data_src_map.remove(name.as_ref());
110        self.local_data_src_manager.remove(name);
111    }
112
113    #[inline]
114    async fn begin_async(&mut self) -> errs::Result<()> {
115        self.fixed = true;
116
117        let mut errors = Vec::new();
118
119        self.local_data_src_manager.setup_async(&mut errors).await;
120        if errors.is_empty() {
121            self.local_data_src_manager
122                .copy_ds_ready_to_map(&mut self.data_src_map);
123            Ok(())
124        } else {
125            Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
126                errors,
127            }))
128        }
129    }
130
131    #[inline]
132    async fn commit_async(&mut self) -> errs::Result<()> {
133        self.data_conn_manager.commit_async().await
134    }
135
136    #[inline]
137    async fn rollback_async(&mut self) {
138        self.data_conn_manager.rollback_async().await
139    }
140
141    #[inline]
142    fn end(&mut self) {
143        self.data_conn_manager.close();
144        self.fixed = false;
145    }
146
147    /// Executes an asynchronous logic function with the `DataHub` and handles setup and cleanup.
148    ///
149    /// This method sets up local data sources, runs the provided `logic_fn`, and then
150    /// cleans up all data connections and sources. It does *not* automatically commit
151    /// or rollback any transactions.
152    ///
153    /// # Arguments
154    ///
155    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
156    ///                and returns a `Result`. This function contains the application's logic.
157    ///
158    /// # Type Parameters
159    ///
160    /// * `F` - The type of the asynchronous logic function.
161    ///
162    /// # Returns
163    ///
164    /// A `Result` indicating the success or failure of the `logic_fn` execution or
165    /// the setup of data sources.
166    #[allow(clippy::doc_overindented_list_items)]
167    pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
168    where
169        for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
170    {
171        let mut r = self.begin_async().await;
172        if r.is_ok() {
173            r = logic_fn(self).await;
174        }
175        self.end();
176        r
177    }
178
179    /// Executes an asynchronous transactional logic function with the `DataHub`, managing
180    /// setup, commit, and rollback for data connections.
181    ///
182    /// This method sets up local data sources, runs the provided `logic_fn`. If the logic
183    /// function succeeds, it attempts to commit all created data connections. If either
184    /// the logic function or the commit fails, it rolls back all data connections.
185    /// Finally, it cleans up all data connections and sources.
186    ///
187    /// # Arguments
188    ///
189    /// * `logic_fn` - An asynchronous function that takes a mutable reference to `DataHub`
190    ///                and returns a `Result`. This function contains the application's transactional logic.
191    ///
192    /// # Type Parameters
193    ///
194    /// * `F` - The type of the asynchronous transactional logic function.
195    ///
196    /// # Returns
197    ///
198    /// A `Result` indicating the success or failure of the `logic_fn` execution,
199    /// the setup of data sources, or the commit/rollback operations.
200    #[allow(clippy::doc_overindented_list_items)]
201    pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
202    where
203        for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
204    {
205        let mut r = self.begin_async().await;
206        if r.is_ok() {
207            r = logic_fn(self).await;
208        }
209        if r.is_ok() {
210            r = self.commit_async().await;
211        }
212        if r.is_err() {
213            self.rollback_async().await;
214        }
215        self.end();
216        r
217    }
218
219    /// Retrieves an existing data connection or creates a new one if it doesn't exist.
220    ///
221    /// This asynchronous method first checks if a data connection with the given `name`
222    /// and type `C` already exists. If not, it attempts to find a suitable data source
223    /// (local or global) to create a new data connection.
224    ///
225    /// # Arguments
226    ///
227    /// * `name` - The name of the data connection to retrieve or create.
228    ///
229    /// # Type Parameters
230    ///
231    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
232    ///
233    /// # Returns
234    ///
235    /// A `Result` which is `Ok` containing a mutable reference to the data connection
236    /// if found or successfully created, or an `Err` if no suitable data source is found
237    /// or connection creation fails.
238    pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
239    where
240        C: DataConn + 'static,
241    {
242        if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
243            let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
244            return Ok(unsafe { &mut (*typed_nnptr).data_conn });
245        }
246
247        if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
248            let boxed = if *local {
249                self.local_data_src_manager
250                    .create_data_conn_async::<C>(*index, name.as_ref())
251                    .await?
252            } else {
253                create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
254            };
255
256            let ptr = Box::into_raw(boxed);
257            if let Some(nnptr) = ptr::NonNull::new(ptr) {
258                self.data_conn_manager
259                    .add(nnptr.cast::<DataConnContainer>());
260
261                let typed_ptr = ptr.cast::<DataConnContainer<C>>();
262                return Ok(unsafe { &mut (*typed_ptr).data_conn });
263            } else {
264                // impossible case.
265            }
266        }
267
268        Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
269            name: name.as_ref().into(),
270            data_conn_type: any::type_name::<C>(),
271        }))
272    }
273}
274
// Wraps the given function expression in a closure that calls it with the closure's
// single argument and returns the resulting future boxed and pinned.
// Presumably meant for building the `logic_fn` argument of `run_async` / `txn_async`.
#[macro_export]
#[doc(hidden)]
macro_rules! _logic {
    ($logic_fn:expr) => {
        |data_hub| Box::pin($logic_fn(data_hub))
    };
}
282
283#[cfg(test)]
284mod tests_of_data_hub {
285    use super::*;
286    use crate::tokio::AsyncGroup;
287    use std::sync::Mutex;
288
    // Selects which step of the test doubles below should fail.
    #[derive(Clone, Copy, PartialEq)]
    enum Failure {
        // No step fails.
        None,
        // `MyDataConn::pre_commit_async` returns an error.
        FailToPreCommit,
        // `MyDataConn::commit_async` returns an error.
        FailToCommit,
        // `MyDataSrc::setup_async` returns an error.
        FailToSetup,
        // `MyDataSrc::create_data_conn_async` returns an error.
        FailToCreateDataConn,
    }
297
298    struct MyDataConn {
299        id: i8,
300        failure: Failure,
301        committed: bool,
302        logger: Arc<Mutex<Vec<String>>>,
303    }
304    impl MyDataConn {
305        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
306            logger
307                .lock()
308                .unwrap()
309                .push(format!("MyDataConn::new {}", id));
310            Self {
311                id,
312                failure,
313                committed: false,
314                logger,
315            }
316        }
317    }
318    impl Drop for MyDataConn {
319        fn drop(&mut self) {
320            self.logger
321                .lock()
322                .unwrap()
323                .push(format!("MyDataConn::drop {}", self.id));
324        }
325    }
326    impl DataConn for MyDataConn {
327        async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
328            if self.failure == Failure::FailToPreCommit {
329                self.logger
330                    .lock()
331                    .unwrap()
332                    .push(format!("MyDataConn::pre_commit {} failed", self.id));
333                Err(errs::Err::new("pre commit error"))
334            } else {
335                self.logger
336                    .lock()
337                    .unwrap()
338                    .push(format!("MyDataConn::pre_commit {}", self.id));
339                Ok(())
340            }
341        }
342        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
343            if self.failure == Failure::FailToCommit {
344                self.logger
345                    .lock()
346                    .unwrap()
347                    .push(format!("MyDataConn::commit {} failed", self.id));
348                Err(errs::Err::new("commit error"))
349            } else {
350                self.logger
351                    .lock()
352                    .unwrap()
353                    .push(format!("MyDataConn::commit {}", self.id));
354                self.committed = true;
355                Ok(())
356            }
357        }
358        async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
359            self.logger
360                .lock()
361                .unwrap()
362                .push(format!("MyDataConn::post_commit {}", self.id));
363        }
364        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
365            self.logger
366                .lock()
367                .unwrap()
368                .push(format!("MyDataConn::rollback {}", self.id));
369        }
370        async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
371            self.logger
372                .lock()
373                .unwrap()
374                .push(format!("MyDataConn::force_back {}", self.id));
375        }
376        fn close(&mut self) {
377            self.logger
378                .lock()
379                .unwrap()
380                .push(format!("MyDataConn::close {}", self.id));
381        }
382    }
383
384    struct MyDataSrc {
385        id: i8,
386        failure: Failure,
387        logger: Arc<Mutex<Vec<String>>>,
388    }
389    impl MyDataSrc {
390        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
391            logger
392                .lock()
393                .unwrap()
394                .push(format!("MyDataSrc::new {}", id));
395            Self {
396                id,
397                failure,
398                logger,
399            }
400        }
401    }
402    impl Drop for MyDataSrc {
403        fn drop(&mut self) {
404            self.logger
405                .lock()
406                .unwrap()
407                .push(format!("MyDataSrc::drop {}", self.id));
408        }
409    }
410    impl DataSrc<MyDataConn> for MyDataSrc {
411        async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
412            if self.failure == Failure::FailToSetup {
413                self.logger
414                    .lock()
415                    .unwrap()
416                    .push(format!("MyDataSrc::setup {} failed", self.id));
417                Err(errs::Err::new("setup error".to_string()))
418            } else {
419                self.logger
420                    .lock()
421                    .unwrap()
422                    .push(format!("MyDataSrc::setup {}", self.id));
423                Ok(())
424            }
425        }
426        fn close(&mut self) {
427            self.logger
428                .lock()
429                .unwrap()
430                .push(format!("MyDataSrc::close {}", self.id));
431        }
432        async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
433            if self.failure == Failure::FailToCreateDataConn {
434                self.logger
435                    .lock()
436                    .unwrap()
437                    .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
438                return Err(errs::Err::new("eeee".to_string()));
439            }
440            {
441                self.logger
442                    .lock()
443                    .unwrap()
444                    .push(format!("MyDataSrc::create_data_conn {}", self.id));
445            }
446            let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
447            Ok(Box::new(conn))
448        }
449    }
450
451    #[test]
452    fn test_new() {
453        let hub = DataHub::new();
454        assert!(hub.local_data_src_manager.vec_unready.is_empty());
455        assert!(hub.local_data_src_manager.vec_ready.is_empty());
456        assert!(hub.local_data_src_manager.local);
457        assert!(hub.data_src_map.is_empty());
458        assert!(hub.data_conn_manager.vec.is_empty());
459        assert!(hub.data_conn_manager.index_map.is_empty());
460        assert!(!hub.fixed);
461    }
462
463    #[test]
464    fn test_with_commit_order() {
465        let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
466        assert!(hub.local_data_src_manager.vec_unready.is_empty());
467        assert!(hub.local_data_src_manager.vec_ready.is_empty());
468        assert!(hub.local_data_src_manager.local);
469        assert!(hub.data_src_map.is_empty());
470        assert_eq!(hub.data_conn_manager.vec.len(), 3);
471        assert_eq!(hub.data_conn_manager.index_map.len(), 3);
472        assert!(!hub.fixed);
473    }
474
475    #[tokio::test]
476    async fn test_uses_and_ok() {
477        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
478
479        let mut hub = DataHub::new();
480        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
481        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
482
483        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
484        assert!(hub.local_data_src_manager.vec_ready.is_empty());
485        assert!(hub.local_data_src_manager.local);
486        assert!(hub.data_src_map.is_empty());
487        assert_eq!(hub.data_conn_manager.vec.len(), 0);
488        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
489        assert!(!hub.fixed);
490
491        assert!(hub.begin_async().await.is_ok());
492
493        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
494        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
495        assert!(hub.local_data_src_manager.local);
496        assert_eq!(hub.data_src_map.len(), 2);
497        assert_eq!(hub.data_conn_manager.vec.len(), 0);
498        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
499        assert!(hub.fixed);
500    }
501
502    #[tokio::test]
503    async fn test_uses_but_already_fixed() {
504        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
505
506        let mut hub = DataHub::new();
507        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
508
509        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
510        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
511        assert!(hub.local_data_src_manager.local);
512        assert_eq!(hub.data_src_map.len(), 0);
513        assert_eq!(hub.data_conn_manager.vec.len(), 0);
514        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
515        assert!(!hub.fixed);
516
517        assert!(hub.begin_async().await.is_ok());
518
519        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
520        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
521        assert!(hub.local_data_src_manager.local);
522        assert_eq!(hub.data_src_map.len(), 1);
523        assert_eq!(hub.data_conn_manager.vec.len(), 0);
524        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
525        assert!(hub.fixed);
526
527        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
528
529        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
530        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
531        assert!(hub.local_data_src_manager.local);
532        assert_eq!(hub.data_src_map.len(), 1);
533        assert_eq!(hub.data_conn_manager.vec.len(), 0);
534        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
535        assert!(hub.fixed);
536    }
537
538    #[test]
539    fn test_disuses_and_ok() {
540        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
541
542        let mut hub = DataHub::new();
543        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
544        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
545
546        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
547        assert!(hub.local_data_src_manager.vec_ready.is_empty());
548        assert!(hub.local_data_src_manager.local);
549        assert!(hub.data_src_map.is_empty());
550        assert_eq!(hub.data_conn_manager.vec.len(), 0);
551        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
552        assert!(!hub.fixed);
553
554        hub.disuses("foo");
555
556        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
557        assert!(hub.local_data_src_manager.vec_ready.is_empty());
558        assert!(hub.local_data_src_manager.local);
559        assert!(hub.data_src_map.is_empty());
560        assert_eq!(hub.data_conn_manager.vec.len(), 0);
561        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
562        assert!(!hub.fixed);
563
564        hub.disuses("bar");
565
566        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
567        assert!(hub.local_data_src_manager.vec_ready.is_empty());
568        assert!(hub.local_data_src_manager.local);
569        assert!(hub.data_src_map.is_empty());
570        assert_eq!(hub.data_conn_manager.vec.len(), 0);
571        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
572        assert!(!hub.fixed);
573    }
574
575    #[tokio::test]
576    async fn test_disuses_and_fix() {
577        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
578
579        let mut hub = DataHub::new();
580        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
581        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
582
583        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
584        assert!(hub.local_data_src_manager.vec_ready.is_empty());
585        assert!(hub.local_data_src_manager.local);
586        assert!(hub.data_src_map.is_empty());
587        assert_eq!(hub.data_conn_manager.vec.len(), 0);
588        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
589        assert!(!hub.fixed);
590
591        hub.disuses("foo");
592
593        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
594        assert!(hub.local_data_src_manager.vec_ready.is_empty());
595        assert!(hub.local_data_src_manager.local);
596        assert!(hub.data_src_map.is_empty());
597        assert_eq!(hub.data_conn_manager.vec.len(), 0);
598        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
599        assert!(!hub.fixed);
600
601        hub.disuses("bar");
602
603        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
604        assert!(hub.local_data_src_manager.vec_ready.is_empty());
605        assert!(hub.local_data_src_manager.local);
606        assert!(hub.data_src_map.is_empty());
607        assert_eq!(hub.data_conn_manager.vec.len(), 0);
608        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
609        assert!(!hub.fixed);
610
611        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
612        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
613
614        assert!(hub.begin_async().await.is_ok());
615
616        assert!(hub.local_data_src_manager.vec_unready.is_empty());
617        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
618        assert!(hub.local_data_src_manager.local);
619        assert_eq!(hub.data_src_map.len(), 2);
620        assert_eq!(hub.data_conn_manager.vec.len(), 0);
621        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
622        assert!(hub.fixed);
623
624        hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
625
626        assert!(hub.local_data_src_manager.vec_unready.is_empty());
627        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
628        assert!(hub.local_data_src_manager.local);
629        assert_eq!(hub.data_src_map.len(), 2);
630        assert_eq!(hub.data_conn_manager.vec.len(), 0);
631        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
632        assert!(hub.fixed);
633
634        hub.disuses("bar");
635
636        assert!(hub.local_data_src_manager.vec_unready.is_empty());
637        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
638        assert!(hub.local_data_src_manager.local);
639        assert_eq!(hub.data_src_map.len(), 2);
640        assert_eq!(hub.data_conn_manager.vec.len(), 0);
641        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
642        assert!(hub.fixed);
643
644        hub.end();
645
646        assert!(hub.local_data_src_manager.vec_unready.is_empty());
647        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
648        assert!(hub.local_data_src_manager.local);
649        assert_eq!(hub.data_src_map.len(), 2);
650        assert_eq!(hub.data_conn_manager.vec.len(), 0);
651        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
652        assert!(!hub.fixed);
653
654        hub.disuses("bar");
655
656        assert!(hub.local_data_src_manager.vec_unready.is_empty());
657        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
658        assert!(hub.local_data_src_manager.local);
659        assert_eq!(hub.data_src_map.len(), 1);
660        assert_eq!(hub.data_conn_manager.vec.len(), 0);
661        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
662        assert!(!hub.fixed);
663
664        hub.disuses("foo");
665
666        assert!(hub.local_data_src_manager.vec_unready.is_empty());
667        assert!(hub.local_data_src_manager.vec_ready.is_empty());
668        assert!(hub.local_data_src_manager.local);
669        assert_eq!(hub.data_src_map.len(), 0);
670        assert_eq!(hub.data_conn_manager.vec.len(), 0);
671        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
672        assert!(!hub.fixed);
673    }
674
675    #[tokio::test]
676    async fn test_begin_if_empty() {
677        let mut hub = DataHub::new();
678        assert!(hub.begin_async().await.is_ok());
679
680        assert!(hub.local_data_src_manager.vec_unready.is_empty());
681        assert!(hub.local_data_src_manager.vec_ready.is_empty());
682        assert!(hub.local_data_src_manager.local);
683        assert_eq!(hub.data_src_map.len(), 0);
684        assert_eq!(hub.data_conn_manager.vec.len(), 0);
685        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
686        assert!(hub.fixed);
687
688        hub.end();
689
690        assert!(hub.local_data_src_manager.vec_unready.is_empty());
691        assert!(hub.local_data_src_manager.vec_ready.is_empty());
692        assert!(hub.local_data_src_manager.local);
693        assert_eq!(hub.data_src_map.len(), 0);
694        assert_eq!(hub.data_conn_manager.vec.len(), 0);
695        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
696        assert!(!hub.fixed);
697    }
698
699    #[tokio::test]
700    async fn test_begin_and_ok() {
701        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
702
703        {
704            let mut hub = DataHub::new();
705
706            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
707            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
708
709            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
710            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
711            assert_eq!(hub.local_data_src_manager.local, true);
712            assert_eq!(hub.data_src_map.len(), 0);
713            assert_eq!(hub.data_conn_manager.vec.len(), 0);
714            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
715            assert_eq!(hub.fixed, false);
716
717            assert_eq!(hub.begin_async().await.is_ok(), true);
718
719            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
720            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
721            assert_eq!(hub.local_data_src_manager.local, true);
722            assert_eq!(hub.data_src_map.len(), 2);
723            assert_eq!(hub.data_conn_manager.vec.len(), 0);
724            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
725            assert_eq!(hub.fixed, true);
726
727            hub.end();
728
729            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
730            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
731            assert_eq!(hub.local_data_src_manager.local, true);
732            assert_eq!(hub.data_src_map.len(), 2);
733            assert_eq!(hub.data_conn_manager.vec.len(), 0);
734            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
735            assert_eq!(hub.fixed, false);
736        }
737
738        assert_eq!(
739            *logger.lock().unwrap(),
740            &[
741                "MyDataSrc::new 1",
742                "MyDataSrc::new 2",
743                "MyDataSrc::setup 1",
744                "MyDataSrc::setup 2",
745                "MyDataSrc::close 2",
746                "MyDataSrc::drop 2",
747                "MyDataSrc::close 1",
748                "MyDataSrc::drop 1",
749            ]
750        );
751    }
752
753    #[tokio::test]
754    async fn test_begin_but_failed() {
755        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
756
757        {
758            let mut hub = DataHub::new();
759
760            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
761            hub.uses(
762                "bar",
763                MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
764            );
765            hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
766
767            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
768            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
769            assert_eq!(hub.local_data_src_manager.local, true);
770            assert_eq!(hub.data_src_map.len(), 0);
771            assert_eq!(hub.data_conn_manager.vec.len(), 0);
772            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
773            assert_eq!(hub.fixed, false);
774
775            if let Err(err) = hub.begin_async().await {
776                match err.reason::<DataHubError>() {
777                    Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
778                        assert_eq!(errors.len(), 1);
779                        assert_eq!(errors[0].0, "bar".into());
780                        assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
781                    }
782                    _ => panic!(),
783                }
784            } else {
785                panic!();
786            }
787
788            hub.end();
789        }
790
791        assert_eq!(
792            *logger.lock().unwrap(),
793            &[
794                "MyDataSrc::new 1",
795                "MyDataSrc::new 2",
796                "MyDataSrc::new 3",
797                "MyDataSrc::setup 1",
798                "MyDataSrc::setup 2 failed",
799                "MyDataSrc::close 2",
800                "MyDataSrc::close 1",
801                "MyDataSrc::drop 3",
802                "MyDataSrc::drop 2",
803                "MyDataSrc::drop 1",
804            ]
805        );
806    }
807
808    #[tokio::test]
809    async fn test_run_and_ok() {
810        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
811        {
812            let mut hub = DataHub::new();
813
814            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
815            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
816
817            let logger_clone = logger.clone();
818            assert!(hub
819                .run_async(|_data| {
820                    let logger_clone2 = logger_clone.clone();
821                    Box::pin(async move {
822                        logger_clone2
823                            .lock()
824                            .unwrap()
825                            .push("execute logic".to_string());
826                        Ok(())
827                    })
828                })
829                .await
830                .is_ok());
831        }
832
833        assert_eq!(
834            *logger.lock().unwrap(),
835            &[
836                "MyDataSrc::new 1",
837                "MyDataSrc::new 2",
838                "MyDataSrc::setup 1",
839                "MyDataSrc::setup 2",
840                "execute logic",
841                "MyDataSrc::close 2",
842                "MyDataSrc::drop 2",
843                "MyDataSrc::close 1",
844                "MyDataSrc::drop 1",
845            ]
846        );
847    }
848
849    #[tokio::test]
850    async fn test_run_but_failed() {
851        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
852        {
853            let mut hub = DataHub::new();
854
855            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
856            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
857
858            let logger_clone = logger.clone();
859            if let Err(err) = hub
860                .run_async(|_data| {
861                    let logger_clone2 = logger_clone.clone();
862                    Box::pin(async move {
863                        logger_clone2
864                            .lock()
865                            .unwrap()
866                            .push("execute logic but fail".to_string());
867                        Err(errs::Err::new("logic error".to_string()))
868                    })
869                })
870                .await
871            {
872                match err.reason::<String>() {
873                    Ok(s) => assert_eq!(s, "logic error"),
874                    _ => panic!(),
875                }
876            } else {
877                panic!();
878            }
879        }
880
881        assert_eq!(
882            *logger.lock().unwrap(),
883            &[
884                "MyDataSrc::new 1",
885                "MyDataSrc::new 2",
886                "MyDataSrc::setup 1",
887                "MyDataSrc::setup 2",
888                "execute logic but fail",
889                "MyDataSrc::close 2",
890                "MyDataSrc::drop 2",
891                "MyDataSrc::close 1",
892                "MyDataSrc::drop 1",
893            ]
894        );
895    }
896
897    #[tokio::test]
898    async fn test_txn_and_no_data_access_and_ok() {
899        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
900        {
901            let mut hub = DataHub::new();
902
903            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
904            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
905
906            let logger_clone = logger.clone();
907            assert!(hub
908                .txn_async(|_data| {
909                    let logger_clone2 = logger_clone.clone();
910                    Box::pin(async move {
911                        logger_clone2
912                            .lock()
913                            .unwrap()
914                            .push("execute logic".to_string());
915                        Ok(())
916                    })
917                })
918                .await
919                .is_ok());
920        }
921
922        assert_eq!(
923            *logger.lock().unwrap(),
924            &[
925                "MyDataSrc::new 1",
926                "MyDataSrc::new 2",
927                "MyDataSrc::setup 1",
928                "MyDataSrc::setup 2",
929                "execute logic",
930                "MyDataSrc::close 2",
931                "MyDataSrc::drop 2",
932                "MyDataSrc::close 1",
933                "MyDataSrc::drop 1",
934            ]
935        );
936    }
937
938    #[tokio::test]
939    async fn test_txn_and_has_data_access_and_ok() {
940        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
941        {
942            let mut hub = DataHub::new();
943
944            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
945            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
946
947            let logger_clone = logger.clone();
948            hub.txn_async(move |data| {
949                let logger_clone2 = logger_clone.clone();
950                Box::pin(async move {
951                    logger_clone2
952                        .lock()
953                        .unwrap()
954                        .push("execute logic".to_string());
955                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
956                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
957                    Ok(())
958                })
959            })
960            .await
961            .unwrap()
962        }
963
964        assert_eq!(
965            *logger.lock().unwrap(),
966            &[
967                "MyDataSrc::new 1",
968                "MyDataSrc::new 2",
969                "MyDataSrc::setup 1",
970                "MyDataSrc::setup 2",
971                "execute logic",
972                "MyDataSrc::create_data_conn 1",
973                "MyDataConn::new 1",
974                "MyDataSrc::create_data_conn 2",
975                "MyDataConn::new 2",
976                "MyDataConn::pre_commit 1",
977                "MyDataConn::pre_commit 2",
978                "MyDataConn::commit 1",
979                "MyDataConn::commit 2",
980                "MyDataConn::post_commit 1",
981                "MyDataConn::post_commit 2",
982                "MyDataConn::close 1",
983                "MyDataConn::drop 1",
984                "MyDataConn::close 2",
985                "MyDataConn::drop 2",
986                "MyDataSrc::close 2",
987                "MyDataSrc::drop 2",
988                "MyDataSrc::close 1",
989                "MyDataSrc::drop 1",
990            ]
991        );
992    }
993
994    #[tokio::test]
995    async fn test_txn_but_failed() {
996        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
997        {
998            let mut hub = DataHub::new();
999
1000            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1001            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1002
1003            let logger_clone = logger.clone();
1004            if let Err(e) = hub
1005                .txn_async(move |data| {
1006                    let logger_clone2 = logger_clone.clone();
1007                    Box::pin(async move {
1008                        logger_clone2
1009                            .lock()
1010                            .unwrap()
1011                            .push("execute logic".to_string());
1012                        let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1013                        let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1014                        Err(errs::Err::new("logic error"))
1015                    })
1016                })
1017                .await
1018            {
1019                match e.reason::<&str>() {
1020                    Ok(s) => assert_eq!(s, &"logic error"),
1021                    _ => panic!(),
1022                }
1023            }
1024        }
1025
1026        assert_eq!(
1027            *logger.lock().unwrap(),
1028            &[
1029                "MyDataSrc::new 1",
1030                "MyDataSrc::new 2",
1031                "MyDataSrc::setup 1",
1032                "MyDataSrc::setup 2",
1033                "execute logic",
1034                "MyDataSrc::create_data_conn 1",
1035                "MyDataConn::new 1",
1036                "MyDataSrc::create_data_conn 2",
1037                "MyDataConn::new 2",
1038                "MyDataConn::rollback 1",
1039                "MyDataConn::rollback 2",
1040                "MyDataConn::close 1",
1041                "MyDataConn::drop 1",
1042                "MyDataConn::close 2",
1043                "MyDataConn::drop 2",
1044                "MyDataSrc::close 2",
1045                "MyDataSrc::drop 2",
1046                "MyDataSrc::close 1",
1047                "MyDataSrc::drop 1",
1048            ]
1049        );
1050    }
1051
1052    #[tokio::test]
1053    async fn test_txn_with_commit_order() {
1054        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1055        {
1056            let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1057
1058            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1059            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1060
1061            let logger_clone = logger.clone();
1062            hub.txn_async(move |data| {
1063                let logger_clone2 = logger_clone.clone();
1064                Box::pin(async move {
1065                    logger_clone2
1066                        .lock()
1067                        .unwrap()
1068                        .push("execute logic".to_string());
1069                    let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1070                    let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1071                    Ok(())
1072                })
1073            })
1074            .await
1075            .unwrap();
1076        }
1077
1078        assert_eq!(
1079            *logger.lock().unwrap(),
1080            &[
1081                "MyDataSrc::new 1",
1082                "MyDataSrc::new 2",
1083                "MyDataSrc::setup 1",
1084                "MyDataSrc::setup 2",
1085                "execute logic",
1086                "MyDataSrc::create_data_conn 1",
1087                "MyDataConn::new 1",
1088                "MyDataSrc::create_data_conn 2",
1089                "MyDataConn::new 2",
1090                "MyDataConn::pre_commit 2",
1091                "MyDataConn::pre_commit 1",
1092                "MyDataConn::commit 2",
1093                "MyDataConn::commit 1",
1094                "MyDataConn::post_commit 2",
1095                "MyDataConn::post_commit 1",
1096                "MyDataConn::close 2",
1097                "MyDataConn::drop 2",
1098                "MyDataConn::close 1",
1099                "MyDataConn::drop 1",
1100                "MyDataSrc::close 2",
1101                "MyDataSrc::drop 2",
1102                "MyDataSrc::close 1",
1103                "MyDataSrc::drop 1",
1104            ]
1105        );
1106    }
1107
1108    #[tokio::test]
1109    async fn test_get_data_conn_and_failed() {
1110        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1111        {
1112            let mut hub = DataHub::new();
1113
1114            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1115            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1116
1117            let logger_clone = logger.clone();
1118            let err = hub
1119                .txn_async(move |data| {
1120                    let logger_clone2 = logger_clone.clone();
1121                    Box::pin(async move {
1122                        logger_clone2
1123                            .lock()
1124                            .unwrap()
1125                            .push("execute logic".to_string());
1126                        let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1127                        Ok(())
1128                    })
1129                })
1130                .await
1131                .unwrap_err();
1132
1133            match err.reason::<DataHubError>() {
1134                Ok(r) => match r {
1135                    DataHubError::NoDataSrcToCreateDataConn {
1136                        name,
1137                        data_conn_type,
1138                    } => {
1139                        assert_eq!(name.as_ref(), "fxx");
1140                        assert_eq!(
1141                            data_conn_type,
1142                            &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1143                        );
1144                    }
1145                    _ => panic!(),
1146                },
1147                _ => panic!(),
1148            }
1149        }
1150
1151        assert_eq!(
1152            *logger.lock().unwrap(),
1153            &[
1154                "MyDataSrc::new 1",
1155                "MyDataSrc::new 2",
1156                "MyDataSrc::setup 1",
1157                "MyDataSrc::setup 2",
1158                "execute logic",
1159                "MyDataSrc::close 2",
1160                "MyDataSrc::drop 2",
1161                "MyDataSrc::close 1",
1162                "MyDataSrc::drop 1",
1163            ]
1164        );
1165    }
1166}