Skip to main content

sabi/
data_hub.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use crate::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src};
6use crate::{DataConn, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8#[allow(unused)] // for rustdoc
9use crate::DataAcc;
10
11use crate::DataConnContainer;
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::{any, ptr};
16
/// An enum type representing the reasons for errors that can occur within [`DataHub`] operations.
///
/// Values of this enum are used as the reason of an [`errs::Err`] returned by [`DataHub`]
/// methods: one variant reports setup failures of session-local data sources, the other reports
/// a request for a [`DataConn`] whose [`DataSrc`] is not registered under the given name.
#[derive(Debug)]
pub enum DataHubError {
    /// Indicates a failure during the setup process of one or more session-local data sources.
    /// Contains a vector of data source names and their corresponding errors.
    FailToSetupLocalDataSrcs {
        /// The vector contains errors that occurred in each [`DataSrc`] object, each paired
        /// with the name under which that data source was registered.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Indicates that no [`DataSrc`] was found to create a [`DataConn`] for the specified name
    /// and type.
    NoDataSrcToCreateDataConn {
        /// The name of the data source that could not be found.
        name: Arc<str>,
        /// The type name of the [`DataConn`] that was requested
        /// (as produced by [`std::any::type_name`]).
        data_conn_type: &'static str,
    },
}
36
37impl DataHub {
38    /// Creates a new [`DataHub`] instance.
39    ///
40    /// Upon creation, it collects references to globally set-up data sources
41    /// into its internal map for quick access.
42    #[allow(clippy::new_without_default)]
43    pub fn new() -> Self {
44        let mut data_src_map = HashMap::new();
45        copy_global_data_srcs_to_map(&mut data_src_map);
46
47        Self {
48            local_data_src_manager: DataSrcManager::new(true),
49            data_src_map,
50            data_conn_manager: DataConnManager::new(),
51            fixed: false,
52        }
53    }
54
55    /// Creates a new [`DataHub`] instance with a specified commit order for data connections.
56    ///
57    /// This constructor allows defining a specific order for pre-commit, commit, and post-commit
58    /// operations for named data connections. Data connections not specified in `names` will
59    /// be processed after the named ones, in their order of acquisition.
60    ///
61    /// Upon creation, it collects references to globally set-up data sources
62    /// into its internal map for quick access.
63    ///
64    /// # Parameters
65    ///
66    /// * `names`: A slice of `&str` representing the names of data connections to commit in a
67    ///   specific order.
68    pub fn with_commit_order(names: &[&str]) -> Self {
69        let mut data_src_map = HashMap::new();
70        copy_global_data_srcs_to_map(&mut data_src_map);
71
72        Self {
73            local_data_src_manager: DataSrcManager::new(true),
74            data_src_map,
75            data_conn_manager: DataConnManager::with_commit_order(names),
76            fixed: false,
77        }
78    }
79
80    #[allow(rustdoc::broken_intra_doc_links)]
81    /// Registers a session-local data source with this [`DataHub`] instance.
82    ///
83    /// This method is similar to the global [`uses!`] macro but registers a data source
84    /// that is local to this specific [`DataHub`] session. Once the [`DataHub`]'s state is
85    /// "fixed" (while [`DataHub::run`] or [`DataHub::txn`] method is executing),
86    /// further calls to `uses` are ignored. However, after the method completes,
87    /// the [`DataHub`]'s "fixed" state is reset, allowing for new data sources to be
88    /// registered or removed via [`DataHub::disuses`] method in subsequent operations.
89    ///
90    /// # Parameters
91    ///
92    /// * `name`: The unique name for the local data source.
93    /// * `ds`: The [`DataSrc`] instance to register.
94    pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
95    where
96        S: DataSrc<C>,
97        C: DataConn + 'static,
98    {
99        if self.fixed {
100            return;
101        }
102        self.local_data_src_manager.add(name, ds);
103    }
104
105    /// Unregisters and drops a session-local data source by its name.
106    ///
107    /// This method removes a data source that was previously registered via [`DataHub::uses`].
108    /// This operation is ignored if the [`DataHub`]'s state is already "fixed".
109    ///
110    /// # Parameters
111    ///
112    /// * `name`: The name of the local data source to unregister.
113    pub fn disuses(&mut self, name: impl AsRef<str>) {
114        if self.fixed {
115            return;
116        }
117        self.data_src_map.remove(name.as_ref());
118        self.local_data_src_manager.remove(name);
119    }
120
121    #[inline]
122    fn begin(&mut self) -> errs::Result<()> {
123        self.fixed = true;
124
125        let mut errors = Vec::new();
126
127        self.local_data_src_manager.setup(&mut errors);
128        if errors.is_empty() {
129            self.local_data_src_manager
130                .copy_ds_ready_to_map(&mut self.data_src_map);
131            Ok(())
132        } else {
133            Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
134                errors,
135            }))
136        }
137    }
138
139    #[inline]
140    fn commit(&mut self) -> errs::Result<()> {
141        self.data_conn_manager.commit()
142    }
143
144    #[inline]
145    fn rollback(&mut self) {
146        self.data_conn_manager.rollback()
147    }
148
149    #[inline]
150    fn end(&mut self) {
151        self.data_conn_manager.close();
152        self.fixed = false;
153    }
154
155    /// Executes a given logic function without transaction control.
156    ///
157    /// This method sets up local data sources, runs the provided closure,
158    /// and then cleans up the [`DataHub`]'s session resources. It does not
159    /// perform commit or rollback operations.
160    ///
161    /// # Parameters
162    ///
163    /// * `logic_fn`: A closure that encapsulates the business logic to be executed.
164    ///   It takes a mutable reference to [`DataHub`] as an argument.
165    ///
166    /// # Returns
167    ///
168    /// * `errs::Result<()>`: The result of the logic function's execution,
169    ///   or an error if executing `logic_fn` fails.
170    pub fn run<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
171    where
172        F: FnMut(&mut DataHub) -> errs::Result<()>,
173    {
174        let mut r = self.begin();
175        if r.is_ok() {
176            r = logic_fn(self);
177        }
178        self.end();
179        r
180    }
181
182    /// Executes a given logic function within a transaction.
183    ///
184    /// This method first sets up local data sources, then runs the provided closure.
185    /// If the closure returns `Ok`, it attempts to commit all changes. If the commit fails,
186    /// or if the logic function itself returns an [`errs::Err`], a rollback operation
187    /// is performed. After succeeding `pre_commit` and `commit` methods of all [`DataConn`]s,
188    /// `post_commit` methods of all [`DataConn`]s are executed.
189    /// Finally, it cleans up the [`DataHub`]'s session resources.
190    ///
191    /// # Parameters
192    ///
193    /// * `logic_fn`: A closure that encapsulates the business logic to be executed.
194    ///   It takes a mutable reference to [`DataHub`] as an argument.
195    ///
196    /// # Returns
197    ///
198    /// * `errs::Result<()>`: The final result of the transaction (success or failure of
199    ///   logic/commit), or an error if executing `logic_fn` fails.
200    pub fn txn<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
201    where
202        F: FnMut(&mut DataHub) -> errs::Result<()>,
203    {
204        let mut r = self.begin();
205        if r.is_ok() {
206            r = logic_fn(self);
207        }
208        if r.is_ok() {
209            r = self.commit();
210        }
211        if r.is_err() {
212            self.rollback();
213        }
214        self.end();
215        r
216    }
217
218    /// Retrieves a mutable reference to a [`DataConn`] object by name, creating it if necessary.
219    ///
220    /// This is the core method used by [`DataAcc`] implementations to obtain connections
221    /// to external data services. It first checks if a [`DataConn`] with the given name
222    /// already exists in the [`DataHub`]'s session. If not, it attempts to find a
223    /// corresponding [`DataSrc`] and create a new [`DataConn`] from it.
224    ///
225    /// # Type Parameters
226    ///
227    /// * `C`: The concrete type of [`DataConn`] expected.
228    ///
229    /// # Parameters
230    ///
231    /// * `name`: The name of the data source/connection to retrieve.
232    ///
233    /// # Returns
234    ///
235    /// * `errs::Result<&mut C>`: A mutable reference to the [`DataConn`] instance if successful,
236    ///   or an [`errs::Err`] if the data source is not found, or if the retrieved/created
237    ///   [`DataConn`] cannot be cast to the specified type `C`.
238    pub fn get_data_conn<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
239    where
240        C: DataConn + 'static,
241    {
242        if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
243            let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
244            return Ok(unsafe { &mut (*typed_nnptr).data_conn });
245        }
246
247        if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
248            let boxed = if *local {
249                self.local_data_src_manager
250                    .create_data_conn::<C>(*index, name.as_ref())?
251            } else {
252                create_data_conn_from_global_data_src::<C>(*index, name.as_ref())?
253            };
254
255            let ptr = Box::into_raw(boxed);
256            if let Some(nnptr) = ptr::NonNull::new(ptr) {
257                self.data_conn_manager
258                    .add(nnptr.cast::<DataConnContainer>());
259
260                let typed_ptr = ptr.cast::<DataConnContainer<C>>();
261                return Ok(unsafe { &mut (*typed_ptr).data_conn });
262            } else {
263                // impossible case.
264            }
265        }
266
267        Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
268            name: name.as_ref().into(),
269            data_conn_type: any::type_name::<C>(),
270        }))
271    }
272}
273
274#[cfg(test)]
275mod tests_of_data_hub {
276    use super::*;
277    use crate::AsyncGroup;
278    use std::sync::Mutex;
279
    /// Selects which operation of the test fixtures (`MyDataSrc` / `MyDataConn`) should fail.
    #[derive(Clone, Copy, PartialEq)]
    enum Failure {
        /// No operation fails.
        None,
        /// `MyDataConn::pre_commit` returns an error.
        FailToPreCommit,
        /// `MyDataConn::commit` returns an error.
        FailToCommit,
        /// `MyDataSrc::setup` returns an error.
        FailToSetup,
        /// `MyDataSrc::create_data_conn` returns an error.
        FailToCreateDataConn,
    }
288
    /// Test data connection that records every lifecycle call into a shared log.
    struct MyDataConn {
        /// Identifier included in every log entry written by this connection.
        id: i8,
        /// Which operation, if any, this connection is configured to fail.
        failure: Failure,
        /// Set to `true` when `commit` succeeds.
        committed: bool,
        /// Shared log that the test inspects after the hub is dropped.
        logger: Arc<Mutex<Vec<String>>>,
    }
295    impl MyDataConn {
296        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
297            logger
298                .lock()
299                .unwrap()
300                .push(format!("MyDataConn::new {}", id));
301            Self {
302                id,
303                failure,
304                committed: false,
305                logger,
306            }
307        }
308    }
309    impl Drop for MyDataConn {
310        fn drop(&mut self) {
311            self.logger
312                .lock()
313                .unwrap()
314                .push(format!("MyDataConn::drop {}", self.id));
315        }
316    }
317    impl DataConn for MyDataConn {
318        fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
319            if self.failure == Failure::FailToPreCommit {
320                self.logger
321                    .lock()
322                    .unwrap()
323                    .push(format!("MyDataConn::pre_commit {} failed", self.id));
324                Err(errs::Err::new("pre commit error"))
325            } else {
326                self.logger
327                    .lock()
328                    .unwrap()
329                    .push(format!("MyDataConn::pre_commit {}", self.id));
330                Ok(())
331            }
332        }
333        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
334            if self.failure == Failure::FailToCommit {
335                self.logger
336                    .lock()
337                    .unwrap()
338                    .push(format!("MyDataConn::commit {} failed", self.id));
339                Err(errs::Err::new("commit error"))
340            } else {
341                self.logger
342                    .lock()
343                    .unwrap()
344                    .push(format!("MyDataConn::commit {}", self.id));
345                self.committed = true;
346                Ok(())
347            }
348        }
349        fn post_commit(&mut self, _ag: &mut AsyncGroup) {
350            self.logger
351                .lock()
352                .unwrap()
353                .push(format!("MyDataConn::post_commit {}", self.id));
354        }
355        fn rollback(&mut self, _ag: &mut AsyncGroup) {
356            self.logger
357                .lock()
358                .unwrap()
359                .push(format!("MyDataConn::rollback {}", self.id));
360        }
361        fn force_back(&mut self, _ag: &mut AsyncGroup) {
362            self.logger
363                .lock()
364                .unwrap()
365                .push(format!("MyDataConn::force_back {}", self.id));
366        }
367        fn close(&mut self) {
368            self.logger
369                .lock()
370                .unwrap()
371                .push(format!("MyDataConn::close {}", self.id));
372        }
373    }
374
    /// Test data source that records every lifecycle call into a shared log.
    struct MyDataSrc {
        /// Identifier included in every log entry; also passed to the connections it creates.
        id: i8,
        /// Which operation, if any, should fail; also passed to the connections it creates.
        failure: Failure,
        /// Shared log that the test inspects after the hub is dropped.
        logger: Arc<Mutex<Vec<String>>>,
    }
380    impl MyDataSrc {
381        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
382            logger
383                .lock()
384                .unwrap()
385                .push(format!("MyDataSrc::new {}", id));
386            Self {
387                id,
388                failure,
389                logger,
390            }
391        }
392    }
393    impl Drop for MyDataSrc {
394        fn drop(&mut self) {
395            self.logger
396                .lock()
397                .unwrap()
398                .push(format!("MyDataSrc::drop {}", self.id));
399        }
400    }
401    impl DataSrc<MyDataConn> for MyDataSrc {
402        fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
403            if self.failure == Failure::FailToSetup {
404                self.logger
405                    .lock()
406                    .unwrap()
407                    .push(format!("MyDataSrc::setup {} failed", self.id));
408                Err(errs::Err::new("setup error".to_string()))
409            } else {
410                self.logger
411                    .lock()
412                    .unwrap()
413                    .push(format!("MyDataSrc::setup {}", self.id));
414                Ok(())
415            }
416        }
417        fn close(&mut self) {
418            self.logger
419                .lock()
420                .unwrap()
421                .push(format!("MyDataSrc::close {}", self.id));
422        }
423        fn create_data_conn(&mut self) -> errs::Result<Box<MyDataConn>> {
424            if self.failure == Failure::FailToCreateDataConn {
425                self.logger
426                    .lock()
427                    .unwrap()
428                    .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
429                return Err(errs::Err::new("eeee".to_string()));
430            }
431            {
432                self.logger
433                    .lock()
434                    .unwrap()
435                    .push(format!("MyDataSrc::create_data_conn {}", self.id));
436            }
437            let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
438            Ok(Box::new(conn))
439        }
440    }
441
442    #[test]
443    fn test_new() {
444        let hub = DataHub::new();
445        assert!(hub.local_data_src_manager.vec_unready.is_empty());
446        assert!(hub.local_data_src_manager.vec_ready.is_empty());
447        assert!(hub.local_data_src_manager.local);
448        assert!(hub.data_src_map.is_empty());
449        assert!(hub.data_conn_manager.vec.is_empty());
450        assert!(hub.data_conn_manager.index_map.is_empty());
451        assert!(!hub.fixed);
452    }
453
454    #[test]
455    fn test_with_commit_order() {
456        let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
457        assert!(hub.local_data_src_manager.vec_unready.is_empty());
458        assert!(hub.local_data_src_manager.vec_ready.is_empty());
459        assert!(hub.local_data_src_manager.local);
460        assert!(hub.data_src_map.is_empty());
461        assert_eq!(hub.data_conn_manager.vec.len(), 3);
462        assert_eq!(hub.data_conn_manager.index_map.len(), 3);
463        assert!(!hub.fixed);
464    }
465
466    #[test]
467    fn test_uses_and_ok() {
468        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
469
470        let mut hub = DataHub::new();
471        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
472        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
473
474        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
475        assert!(hub.local_data_src_manager.vec_ready.is_empty());
476        assert!(hub.local_data_src_manager.local);
477        assert!(hub.data_src_map.is_empty());
478        assert_eq!(hub.data_conn_manager.vec.len(), 0);
479        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
480        assert!(!hub.fixed);
481
482        assert!(hub.begin().is_ok());
483
484        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
485        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
486        assert!(hub.local_data_src_manager.local);
487        assert_eq!(hub.data_src_map.len(), 2);
488        assert_eq!(hub.data_conn_manager.vec.len(), 0);
489        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
490        assert!(hub.fixed);
491    }
492
493    #[test]
494    fn test_uses_but_already_fixed() {
495        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
496
497        let mut hub = DataHub::new();
498        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
499
500        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
501        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
502        assert!(hub.local_data_src_manager.local);
503        assert_eq!(hub.data_src_map.len(), 0);
504        assert_eq!(hub.data_conn_manager.vec.len(), 0);
505        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
506        assert!(!hub.fixed);
507
508        assert!(hub.begin().is_ok());
509
510        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
511        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
512        assert!(hub.local_data_src_manager.local);
513        assert_eq!(hub.data_src_map.len(), 1);
514        assert_eq!(hub.data_conn_manager.vec.len(), 0);
515        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
516        assert!(hub.fixed);
517
518        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
519
520        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
521        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
522        assert!(hub.local_data_src_manager.local);
523        assert_eq!(hub.data_src_map.len(), 1);
524        assert_eq!(hub.data_conn_manager.vec.len(), 0);
525        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
526        assert!(hub.fixed);
527    }
528
529    #[test]
530    fn test_disuses_and_ok() {
531        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
532
533        let mut hub = DataHub::new();
534        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
535        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
536
537        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
538        assert!(hub.local_data_src_manager.vec_ready.is_empty());
539        assert!(hub.local_data_src_manager.local);
540        assert!(hub.data_src_map.is_empty());
541        assert_eq!(hub.data_conn_manager.vec.len(), 0);
542        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
543        assert!(!hub.fixed);
544
545        hub.disuses("foo");
546
547        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
548        assert!(hub.local_data_src_manager.vec_ready.is_empty());
549        assert!(hub.local_data_src_manager.local);
550        assert!(hub.data_src_map.is_empty());
551        assert_eq!(hub.data_conn_manager.vec.len(), 0);
552        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
553        assert!(!hub.fixed);
554
555        hub.disuses("bar");
556
557        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
558        assert!(hub.local_data_src_manager.vec_ready.is_empty());
559        assert!(hub.local_data_src_manager.local);
560        assert!(hub.data_src_map.is_empty());
561        assert_eq!(hub.data_conn_manager.vec.len(), 0);
562        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
563        assert!(!hub.fixed);
564    }
565
566    #[test]
567    fn test_disuses_and_fix() {
568        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
569
570        let mut hub = DataHub::new();
571        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
572        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
573
574        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
575        assert!(hub.local_data_src_manager.vec_ready.is_empty());
576        assert!(hub.local_data_src_manager.local);
577        assert!(hub.data_src_map.is_empty());
578        assert_eq!(hub.data_conn_manager.vec.len(), 0);
579        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
580        assert!(!hub.fixed);
581
582        hub.disuses("foo");
583
584        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
585        assert!(hub.local_data_src_manager.vec_ready.is_empty());
586        assert!(hub.local_data_src_manager.local);
587        assert!(hub.data_src_map.is_empty());
588        assert_eq!(hub.data_conn_manager.vec.len(), 0);
589        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
590        assert!(!hub.fixed);
591
592        hub.disuses("bar");
593
594        assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
595        assert!(hub.local_data_src_manager.vec_ready.is_empty());
596        assert!(hub.local_data_src_manager.local);
597        assert!(hub.data_src_map.is_empty());
598        assert_eq!(hub.data_conn_manager.vec.len(), 0);
599        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
600        assert!(!hub.fixed);
601
602        hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
603        hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
604
605        assert!(hub.begin().is_ok());
606
607        assert!(hub.local_data_src_manager.vec_unready.is_empty());
608        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
609        assert!(hub.local_data_src_manager.local);
610        assert_eq!(hub.data_src_map.len(), 2);
611        assert_eq!(hub.data_conn_manager.vec.len(), 0);
612        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
613        assert!(hub.fixed);
614
615        hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
616
617        assert!(hub.local_data_src_manager.vec_unready.is_empty());
618        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
619        assert!(hub.local_data_src_manager.local);
620        assert_eq!(hub.data_src_map.len(), 2);
621        assert_eq!(hub.data_conn_manager.vec.len(), 0);
622        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
623        assert!(hub.fixed);
624
625        hub.disuses("bar");
626
627        assert!(hub.local_data_src_manager.vec_unready.is_empty());
628        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
629        assert!(hub.local_data_src_manager.local);
630        assert_eq!(hub.data_src_map.len(), 2);
631        assert_eq!(hub.data_conn_manager.vec.len(), 0);
632        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
633        assert!(hub.fixed);
634
635        hub.end();
636
637        assert!(hub.local_data_src_manager.vec_unready.is_empty());
638        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
639        assert!(hub.local_data_src_manager.local);
640        assert_eq!(hub.data_src_map.len(), 2);
641        assert_eq!(hub.data_conn_manager.vec.len(), 0);
642        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
643        assert!(!hub.fixed);
644
645        hub.disuses("bar");
646
647        assert!(hub.local_data_src_manager.vec_unready.is_empty());
648        assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
649        assert!(hub.local_data_src_manager.local);
650        assert_eq!(hub.data_src_map.len(), 1);
651        assert_eq!(hub.data_conn_manager.vec.len(), 0);
652        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
653        assert!(!hub.fixed);
654
655        hub.disuses("foo");
656
657        assert!(hub.local_data_src_manager.vec_unready.is_empty());
658        assert!(hub.local_data_src_manager.vec_ready.is_empty());
659        assert!(hub.local_data_src_manager.local);
660        assert_eq!(hub.data_src_map.len(), 0);
661        assert_eq!(hub.data_conn_manager.vec.len(), 0);
662        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
663        assert!(!hub.fixed);
664    }
665
666    #[test]
667    fn test_begin_if_empty() {
668        let mut hub = DataHub::new();
669        assert!(hub.begin().is_ok());
670
671        assert!(hub.local_data_src_manager.vec_unready.is_empty());
672        assert!(hub.local_data_src_manager.vec_ready.is_empty());
673        assert!(hub.local_data_src_manager.local);
674        assert_eq!(hub.data_src_map.len(), 0);
675        assert_eq!(hub.data_conn_manager.vec.len(), 0);
676        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
677        assert!(hub.fixed);
678
679        hub.end();
680
681        assert!(hub.local_data_src_manager.vec_unready.is_empty());
682        assert!(hub.local_data_src_manager.vec_ready.is_empty());
683        assert!(hub.local_data_src_manager.local);
684        assert_eq!(hub.data_src_map.len(), 0);
685        assert_eq!(hub.data_conn_manager.vec.len(), 0);
686        assert_eq!(hub.data_conn_manager.index_map.len(), 0);
687        assert!(!hub.fixed);
688    }
689
690    #[test]
691    fn test_begin_and_ok() {
692        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
693
694        {
695            let mut hub = DataHub::new();
696
697            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
698            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
699
700            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
701            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
702            assert_eq!(hub.local_data_src_manager.local, true);
703            assert_eq!(hub.data_src_map.len(), 0);
704            assert_eq!(hub.data_conn_manager.vec.len(), 0);
705            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
706            assert_eq!(hub.fixed, false);
707
708            assert_eq!(hub.begin().is_ok(), true);
709
710            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
711            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
712            assert_eq!(hub.local_data_src_manager.local, true);
713            assert_eq!(hub.data_src_map.len(), 2);
714            assert_eq!(hub.data_conn_manager.vec.len(), 0);
715            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
716            assert_eq!(hub.fixed, true);
717
718            hub.end();
719
720            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
721            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
722            assert_eq!(hub.local_data_src_manager.local, true);
723            assert_eq!(hub.data_src_map.len(), 2);
724            assert_eq!(hub.data_conn_manager.vec.len(), 0);
725            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
726            assert_eq!(hub.fixed, false);
727        }
728
729        assert_eq!(
730            *logger.lock().unwrap(),
731            &[
732                "MyDataSrc::new 1",
733                "MyDataSrc::new 2",
734                "MyDataSrc::setup 1",
735                "MyDataSrc::setup 2",
736                "MyDataSrc::close 2",
737                "MyDataSrc::drop 2",
738                "MyDataSrc::close 1",
739                "MyDataSrc::drop 1",
740            ]
741        );
742    }
743
744    #[test]
745    fn test_begin_but_failed() {
746        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
747
748        {
749            let mut hub = DataHub::new();
750
751            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
752            hub.uses(
753                "bar",
754                MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
755            );
756            hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
757
758            assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
759            assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
760            assert_eq!(hub.local_data_src_manager.local, true);
761            assert_eq!(hub.data_src_map.len(), 0);
762            assert_eq!(hub.data_conn_manager.vec.len(), 0);
763            assert_eq!(hub.data_conn_manager.index_map.len(), 0);
764            assert_eq!(hub.fixed, false);
765
766            if let Err(err) = hub.begin() {
767                match err.reason::<DataHubError>() {
768                    Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
769                        assert_eq!(errors.len(), 1);
770                        assert_eq!(errors[0].0, "bar".into());
771                        assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
772                    }
773                    _ => panic!(),
774                }
775            } else {
776                panic!();
777            }
778
779            hub.end();
780        }
781
782        assert_eq!(
783            *logger.lock().unwrap(),
784            &[
785                "MyDataSrc::new 1",
786                "MyDataSrc::new 2",
787                "MyDataSrc::new 3",
788                "MyDataSrc::setup 1",
789                "MyDataSrc::setup 2 failed",
790                "MyDataSrc::close 2",
791                "MyDataSrc::close 1",
792                "MyDataSrc::drop 3",
793                "MyDataSrc::drop 2",
794                "MyDataSrc::drop 1",
795            ]
796        );
797    }
798
799    #[test]
800    fn test_run_and_ok() {
801        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
802        {
803            let mut hub = DataHub::new();
804
805            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
806            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
807
808            let logger_clone = logger.clone();
809            assert!(hub
810                .run(move |_data| {
811                    logger_clone
812                        .lock()
813                        .unwrap()
814                        .push("execute logic".to_string());
815                    Ok(())
816                })
817                .is_ok());
818        }
819
820        assert_eq!(
821            *logger.lock().unwrap(),
822            &[
823                "MyDataSrc::new 1",
824                "MyDataSrc::new 2",
825                "MyDataSrc::setup 1",
826                "MyDataSrc::setup 2",
827                "execute logic",
828                "MyDataSrc::close 2",
829                "MyDataSrc::drop 2",
830                "MyDataSrc::close 1",
831                "MyDataSrc::drop 1",
832            ]
833        );
834    }
835
836    #[test]
837    fn test_run_but_failed() {
838        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
839        {
840            let mut hub = DataHub::new();
841
842            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
843            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
844
845            let logger_clone = logger.clone();
846            if let Err(err) = hub.run(move |_data| {
847                logger_clone
848                    .lock()
849                    .unwrap()
850                    .push("execute logic but fail".to_string());
851                Err(errs::Err::new("logic error".to_string()))
852            }) {
853                match err.reason::<String>() {
854                    Ok(s) => assert_eq!(s, "logic error"),
855                    _ => panic!(),
856                }
857            } else {
858                panic!();
859            }
860        }
861
862        assert_eq!(
863            *logger.lock().unwrap(),
864            &[
865                "MyDataSrc::new 1",
866                "MyDataSrc::new 2",
867                "MyDataSrc::setup 1",
868                "MyDataSrc::setup 2",
869                "execute logic but fail",
870                "MyDataSrc::close 2",
871                "MyDataSrc::drop 2",
872                "MyDataSrc::close 1",
873                "MyDataSrc::drop 1",
874            ]
875        );
876    }
877
878    #[test]
879    fn test_txn_and_no_data_access_and_ok() {
880        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
881        {
882            let mut hub = DataHub::new();
883
884            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
885            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
886
887            let logger_clone = logger.clone();
888            assert!(hub
889                .txn(move |_data| {
890                    logger_clone
891                        .lock()
892                        .unwrap()
893                        .push("execute logic".to_string());
894                    Ok(())
895                })
896                .is_ok());
897        }
898
899        assert_eq!(
900            *logger.lock().unwrap(),
901            &[
902                "MyDataSrc::new 1",
903                "MyDataSrc::new 2",
904                "MyDataSrc::setup 1",
905                "MyDataSrc::setup 2",
906                "execute logic",
907                "MyDataSrc::close 2",
908                "MyDataSrc::drop 2",
909                "MyDataSrc::close 1",
910                "MyDataSrc::drop 1",
911            ]
912        );
913    }
914
915    #[test]
916    fn test_txn_and_has_data_access_and_ok() {
917        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
918        {
919            let mut hub = DataHub::new();
920
921            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
922            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
923
924            let logger_clone = logger.clone();
925            hub.txn(move |data| {
926                logger_clone
927                    .lock()
928                    .unwrap()
929                    .push("execute logic".to_string());
930                let _conn1 = data.get_data_conn::<MyDataConn>("foo")?;
931                let _conn2 = data.get_data_conn::<MyDataConn>("bar")?;
932                Ok(())
933            })
934            .unwrap();
935        }
936
937        assert_eq!(
938            *logger.lock().unwrap(),
939            &[
940                "MyDataSrc::new 1",
941                "MyDataSrc::new 2",
942                "MyDataSrc::setup 1",
943                "MyDataSrc::setup 2",
944                "execute logic",
945                "MyDataSrc::create_data_conn 1",
946                "MyDataConn::new 1",
947                "MyDataSrc::create_data_conn 2",
948                "MyDataConn::new 2",
949                "MyDataConn::pre_commit 1",
950                "MyDataConn::pre_commit 2",
951                "MyDataConn::commit 1",
952                "MyDataConn::commit 2",
953                "MyDataConn::post_commit 1",
954                "MyDataConn::post_commit 2",
955                "MyDataConn::close 1",
956                "MyDataConn::drop 1",
957                "MyDataConn::close 2",
958                "MyDataConn::drop 2",
959                "MyDataSrc::close 2",
960                "MyDataSrc::drop 2",
961                "MyDataSrc::close 1",
962                "MyDataSrc::drop 1",
963            ]
964        );
965    }
966
967    #[test]
968    fn test_txn_but_failed() {
969        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
970        {
971            let mut hub = DataHub::new();
972
973            hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
974            hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
975
976            let logger_clone = logger.clone();
977            if let Err(e) = hub.txn(move |data| {
978                logger_clone
979                    .lock()
980                    .unwrap()
981                    .push("execute logic".to_string());
982                let _conn1 = data.get_data_conn::<MyDataConn>("foo")?;
983                let _conn2 = data.get_data_conn::<MyDataConn>("bar")?;
984                Err(errs::Err::new("logic error"))
985            }) {
986                match e.reason::<&str>() {
987                    Ok(s) => assert_eq!(s, &"logic error"),
988                    _ => panic!(),
989                }
990            }
991        }
992
993        assert_eq!(
994            *logger.lock().unwrap(),
995            &[
996                "MyDataSrc::new 1",
997                "MyDataSrc::new 2",
998                "MyDataSrc::setup 1",
999                "MyDataSrc::setup 2",
1000                "execute logic",
1001                "MyDataSrc::create_data_conn 1",
1002                "MyDataConn::new 1",
1003                "MyDataSrc::create_data_conn 2",
1004                "MyDataConn::new 2",
1005                "MyDataConn::rollback 1",
1006                "MyDataConn::rollback 2",
1007                "MyDataConn::close 1",
1008                "MyDataConn::drop 1",
1009                "MyDataConn::close 2",
1010                "MyDataConn::drop 2",
1011                "MyDataSrc::close 2",
1012                "MyDataSrc::drop 2",
1013                "MyDataSrc::close 1",
1014                "MyDataSrc::drop 1",
1015            ]
1016        );
1017    }
1018
1019    #[test]
1020    fn test_txn_with_commit_order() {
1021        let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1022        {
1023            let mut _hub = DataHub::new();
1024        }
1025    }
1026
1027    #[test]
1028    fn test_get_data_conn_and_ok() {
1029        let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1030        {
1031            let mut _hub = DataHub::new();
1032        }
1033    }
1034
1035    #[test]
1036    fn test_get_data_conn_and_failed() {
1037        let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1038        {
1039            let mut _hub = DataHub::new();
1040        }
1041    }
1042}