// sabi/data_src/mod.rs

// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
// This program is free software under MIT License.
// See the file LICENSE in this distribution for more details.
4
5mod global_setup;
6
7pub(crate) use global_setup::{
8    copy_global_data_srcs_to_map, create_data_conn_from_global_data_src,
9};
10pub use global_setup::{create_static_data_src_container, setup, setup_with_order, uses};
11
12use crate::{
13    AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
14    SendSyncNonNull,
15};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::{any, mem, ptr};
20
21/// An enum type representing the reasons for errors that can occur within [`DataSrc`] operations.
22#[derive(Debug)]
23pub enum DataSrcError {
24    /// Indicates a failure to register a global data source.
25    /// This can happen if the global data source manager is in an invalid state.
26    FailToRegisterGlobalDataSrc {
27        /// The name of the data source that failed to register.
28        name: Arc<str>,
29    },
30
31    /// Indicates a failure during the setup process of one or more global data sources.
32    /// Contains a vector of data source names and their corresponding errors.
33    FailToSetupGlobalDataSrcs {
34        /// The vector contains errors that occurred in each [`DataSrc`] object.
35        errors: Vec<(Arc<str>, errs::Err)>,
36    },
37
38    /// Indicates that a setup process for global data sources is currently ongoing.
39    DuringSetupGlobalDataSrcs,
40
41    /// Indicates that global data sources have already been set up.
42    AlreadySetupGlobalDataSrcs,
43
44    /// Indicates a failure to cast a retrieved [`DataConn`] to the expected type.
45    FailToCastDataConn {
46        /// The name of the data connection that failed to cast.
47        name: Arc<str>,
48        /// The type name to which the [`DataConn`] attempted to cast.
49        target_type: &'static str,
50    },
51
52    /// Indicates a failure to create a [`DataConn`] object from its [`DataSrc`].
53    FailToCreateDataConn {
54        /// The name of the data source that failed to be created.
55        name: Arc<str>,
56        /// The type name of the [`DataConn`] that failed to be created.
57        data_conn_type: &'static str,
58    },
59
60    /// Indicates that no [`DataSrc`] was found to create a [`DataConn`] for the specified name
61    /// and type.
62    NotFoundDataSrcToCreateDataConn {
63        /// The name of the data source that could not be found.
64        name: Arc<str>,
65        /// The type name of the [`DataConn`] that was requested.
66        data_conn_type: &'static str,
67    },
68}
69
70impl<S, C> DataSrcContainer<S, C>
71where
72    S: DataSrc<C>,
73    C: DataConn + 'static,
74{
75    pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
76        Self {
77            drop_fn: drop_data_src::<S, C>,
78            setup_fn: setup_data_src::<S, C>,
79            close_fn: close_data_src::<S, C>,
80            create_data_conn_fn: create_data_conn::<S, C>,
81            is_data_conn_fn: is_data_conn::<C>,
82
83            local,
84            name: name.into(),
85            data_src,
86        }
87    }
88}
89
90fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
91where
92    S: DataSrc<C>,
93    C: DataConn + 'static,
94{
95    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
96    drop(unsafe { Box::from_raw(typed_ptr) });
97}
98
99fn setup_data_src<S, C>(ptr: *const DataSrcContainer, ag: &mut AsyncGroup) -> errs::Result<()>
100where
101    S: DataSrc<C>,
102    C: DataConn + 'static,
103{
104    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
105    unsafe { (*typed_ptr).data_src.setup(ag) }
106}
107
108fn close_data_src<S, C>(ptr: *const DataSrcContainer)
109where
110    S: DataSrc<C>,
111    C: DataConn + 'static,
112{
113    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
114    unsafe { (*typed_ptr).data_src.close() };
115}
116
117fn create_data_conn<S, C>(ptr: *const DataSrcContainer) -> errs::Result<Box<DataConnContainer<C>>>
118where
119    S: DataSrc<C>,
120    C: DataConn + 'static,
121{
122    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
123    let conn: Box<C> = unsafe { (*typed_ptr).data_src.create_data_conn() }?;
124    let name = unsafe { &(*typed_ptr).name };
125    Ok(Box::new(DataConnContainer::<C>::new(
126        name.to_string(),
127        conn,
128    )))
129}
130
131fn is_data_conn<C>(type_id: any::TypeId) -> bool
132where
133    C: DataConn + 'static,
134{
135    any::TypeId::of::<C>() == type_id
136}
137
138impl DataSrcManager {
139    pub(crate) const fn new(local: bool) -> Self {
140        Self {
141            vec_unready: Vec::new(),
142            vec_ready: Vec::new(),
143            local,
144        }
145    }
146
147    pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
148        self.vec_unready.splice(0..0, vec);
149    }
150
151    pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
152    where
153        S: DataSrc<C>,
154        C: DataConn + 'static,
155    {
156        let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
157        let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
158        self.vec_unready.push(SendSyncNonNull::new(ptr));
159    }
160
161    pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
162        let extracted_vec: Vec<_> = self
163            .vec_ready
164            .extract_if(.., |ssnnptr| {
165                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
166            })
167            .collect();
168
169        for ssnnptr in extracted_vec.iter().rev() {
170            let ptr = ssnnptr.non_null_ptr.as_ptr();
171            let close_fn = unsafe { (*ptr).close_fn };
172            let drop_fn = unsafe { (*ptr).drop_fn };
173            close_fn(ptr);
174            drop_fn(ptr);
175        }
176
177        let extracted_vec: Vec<_> = self
178            .vec_unready
179            .extract_if(.., |ssnnptr| {
180                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
181            })
182            .collect();
183
184        for ssnnptr in extracted_vec.iter().rev() {
185            let ptr = ssnnptr.non_null_ptr.as_ptr();
186            let drop_fn = unsafe { (*ptr).drop_fn };
187            drop_fn(ptr);
188        }
189    }
190
191    pub(crate) fn close(&mut self) {
192        let vec = mem::take(&mut self.vec_ready);
193        for ssnnptr in vec.into_iter().rev() {
194            let ptr = ssnnptr.non_null_ptr.as_ptr();
195            let close_fn = unsafe { (*ptr).close_fn };
196            let drop_fn = unsafe { (*ptr).drop_fn };
197            close_fn(ptr);
198            drop_fn(ptr);
199        }
200        let vec = mem::take(&mut self.vec_unready);
201        for ssnnptr in vec.into_iter().rev() {
202            let ptr = ssnnptr.non_null_ptr.as_ptr();
203            let drop_fn = unsafe { (*ptr).drop_fn };
204            drop_fn(ptr);
205        }
206    }
207
208    pub(crate) fn setup(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
209        if self.vec_unready.is_empty() {
210            return;
211        }
212
213        let mut n_done = 0;
214        let mut ag = AsyncGroup::new();
215        for ssnnptr in self.vec_unready.iter() {
216            n_done += 1;
217            let ptr = ssnnptr.non_null_ptr.as_ptr();
218            let setup_fn = unsafe { (*ptr).setup_fn };
219            ag._name = unsafe { (*ptr).name.clone() };
220            if let Err(err) = setup_fn(ptr, &mut ag) {
221                errors.push((ag._name.clone(), err));
222                break;
223            }
224        }
225        ag.join_and_collect_errors(errors);
226
227        if errors.is_empty() {
228            self.vec_ready.append(&mut self.vec_unready);
229        } else {
230            for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
231                let ptr = ssnnptr.non_null_ptr.as_ptr();
232                let close_fn = unsafe { (*ptr).close_fn };
233                close_fn(ptr);
234            }
235        }
236    }
237
238    pub(crate) fn setup_with_order(
239        &mut self,
240        names: &[&str],
241        errors: &mut Vec<(Arc<str>, errs::Err)>,
242    ) {
243        if self.vec_unready.is_empty() {
244            return;
245        }
246
247        let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
248        // To overwrite later indexed elements with eariler ones when names overlap
249        for (i, nm) in names.iter().rev().enumerate() {
250            index_map.insert(*nm, names.len() - 1 - i);
251        }
252
253        let vec_unready = mem::take(&mut self.vec_unready);
254
255        let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
256            vec![None; index_map.len()];
257        for ssnnptr in vec_unready.into_iter() {
258            let ptr = ssnnptr.non_null_ptr.as_ptr();
259            let name = unsafe { (*ptr).name.clone() };
260            if let Some(index) = index_map.remove(name.as_ref()) {
261                ordered_vec[index] = Some(ssnnptr);
262            } else {
263                ordered_vec.push(Some(ssnnptr));
264            }
265        }
266
267        let mut n_done = 0;
268        let mut ag = AsyncGroup::new();
269        for ssnnptr_opt in ordered_vec.iter() {
270            n_done += 1;
271            if let Some(ssnnptr) = ssnnptr_opt {
272                let ptr = ssnnptr.non_null_ptr.as_ptr();
273                let setup_fn = unsafe { (*ptr).setup_fn };
274                ag._name = unsafe { (*ptr).name.clone() };
275                if let Err(err) = setup_fn(ptr, &mut ag) {
276                    errors.push((ag._name.clone(), err));
277                    break;
278                }
279            }
280        }
281        ag.join_and_collect_errors(errors);
282
283        if errors.is_empty() {
284            for ssnnptr in ordered_vec.into_iter().flatten() {
285                self.vec_ready.push(ssnnptr);
286            }
287        } else {
288            for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
289                let ptr = ssnnptr.non_null_ptr.as_ptr();
290                let close_fn = unsafe { (*ptr).close_fn };
291                close_fn(ptr);
292            }
293            for ssnnptr in ordered_vec.into_iter().flatten() {
294                self.vec_unready.push(ssnnptr);
295            }
296        }
297    }
298
299    pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
300        for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
301            let ptr = ssnnptr.non_null_ptr.as_ptr();
302            let name = unsafe { (*ptr).name.clone() };
303            index_map.insert(name, (self.local, i));
304        }
305    }
306
307    pub(crate) fn create_data_conn<C>(
308        &self,
309        index: usize,
310        name: impl AsRef<str>,
311    ) -> errs::Result<Box<DataConnContainer>>
312    where
313        C: DataConn + 'static,
314    {
315        if let Some(ssnnptr) = self.vec_ready.get(index) {
316            let ptr = ssnnptr.non_null_ptr.as_ptr();
317            let type_id = any::TypeId::of::<C>();
318            let is_fn = unsafe { (*ptr).is_data_conn_fn };
319            let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
320            if !is_fn(type_id) {
321                Err(errs::Err::new(DataSrcError::FailToCastDataConn {
322                    name: name.as_ref().into(),
323                    target_type: any::type_name::<C>(),
324                }))
325            } else {
326                match create_data_conn_fn(ptr) {
327                    Ok(boxed) => Ok(boxed),
328                    Err(err) => Err(errs::Err::with_source(
329                        DataSrcError::FailToCreateDataConn {
330                            name: name.as_ref().into(),
331                            data_conn_type: any::type_name::<C>(),
332                        },
333                        err,
334                    )),
335                }
336            }
337        } else {
338            Err(errs::Err::new(
339                DataSrcError::NotFoundDataSrcToCreateDataConn {
340                    name: name.as_ref().into(),
341                    data_conn_type: any::type_name::<C>(),
342                },
343            ))
344        }
345    }
346}
347
348impl Drop for DataSrcManager {
349    fn drop(&mut self) {
350        self.close();
351    }
352}
353
354#[cfg(test)]
355mod tests_of_data_src {
356    use super::*;
357    use std::sync::{Arc, Mutex};
358
359    struct SyncDataConn {}
360    impl SyncDataConn {
361        fn new() -> Self {
362            Self {}
363        }
364    }
365    impl DataConn for SyncDataConn {
366        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
367            Ok(())
368        }
369        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
370        fn close(&mut self) {}
371    }
372
373    struct AsyncDataConn {}
374    impl AsyncDataConn {
375        fn new() -> Self {
376            Self {}
377        }
378    }
379    impl DataConn for AsyncDataConn {
380        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
381            Ok(())
382        }
383        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
384        fn close(&mut self) {}
385    }
386
387    struct SyncDataSrc {
388        id: i8,
389        logger: Arc<Mutex<Vec<String>>>,
390        fail_to_setup: bool,
391        fail_to_create_data_conn: bool,
392    }
393    impl SyncDataSrc {
394        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
395            logger
396                .lock()
397                .unwrap()
398                .push(format!("SyncDataSrc::new {}", id));
399            Self {
400                id,
401                logger: logger,
402                fail_to_setup,
403                fail_to_create_data_conn: false,
404            }
405        }
406        fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
407            Self {
408                id,
409                logger: logger,
410                fail_to_setup: false,
411                fail_to_create_data_conn: true,
412            }
413        }
414    }
415    impl Drop for SyncDataSrc {
416        fn drop(&mut self) {
417            self.logger
418                .lock()
419                .unwrap()
420                .push(format!("SyncDataSrc::drop {}", self.id));
421        }
422    }
423    impl DataSrc<SyncDataConn> for SyncDataSrc {
424        fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
425            if self.fail_to_setup {
426                self.logger
427                    .lock()
428                    .unwrap()
429                    .push(format!("SyncDataSrc::setup {} failed", self.id));
430                return Err(errs::Err::new("XXX".to_string()));
431            }
432            self.logger
433                .lock()
434                .unwrap()
435                .push(format!("SyncDataSrc::setup {}", self.id));
436            Ok(())
437        }
438        fn close(&mut self) {
439            self.logger
440                .lock()
441                .unwrap()
442                .push(format!("SyncDataSrc::close {}", self.id));
443        }
444        fn create_data_conn(&mut self) -> errs::Result<Box<SyncDataConn>> {
445            {
446                self.logger
447                    .lock()
448                    .unwrap()
449                    .push(format!("SyncDataSrc::create_data_conn {}", self.id));
450            }
451            if self.fail_to_create_data_conn {
452                return Err(errs::Err::new("eeee".to_string()));
453            }
454            let conn = SyncDataConn::new();
455            Ok(Box::new(conn))
456        }
457    }
458
459    struct AsyncDataSrc {
460        id: i8,
461        fail: bool,
462        logger: Arc<Mutex<Vec<String>>>,
463        wait: u64,
464    }
465    impl AsyncDataSrc {
466        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
467            logger
468                .lock()
469                .unwrap()
470                .push(format!("AsyncDataSrc::new {}", id));
471            Self {
472                id,
473                fail,
474                logger,
475                wait,
476            }
477        }
478    }
479    impl Drop for AsyncDataSrc {
480        fn drop(&mut self) {
481            self.logger
482                .lock()
483                .unwrap()
484                .push(format!("AsyncDataSrc::drop {}", self.id));
485        }
486    }
487    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
488        fn setup(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
489            let logger = self.logger.clone();
490            let fail = self.fail;
491            let id = self.id;
492            let wait = self.wait;
493            ag.add(move || {
494                std::thread::sleep(std::time::Duration::from_millis(wait));
495                let mut logger = logger.lock().unwrap();
496                if fail {
497                    logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
498                    return Err(errs::Err::new("XXX".to_string()));
499                }
500                logger.push(format!("AsyncDataSrc::setup {}", id));
501                Ok(())
502            });
503            Ok(())
504        }
505        fn close(&mut self) {
506            self.logger
507                .lock()
508                .unwrap()
509                .push(format!("AsyncDataSrc::close {}", self.id));
510        }
511        fn create_data_conn(&mut self) -> errs::Result<Box<AsyncDataConn>> {
512            {
513                self.logger
514                    .lock()
515                    .unwrap()
516                    .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
517            }
518            let conn = AsyncDataConn::new();
519            Ok(Box::new(conn))
520        }
521    }
522
523    #[test]
524    fn test_of_new() {
525        let manager = DataSrcManager::new(true);
526        assert!(manager.local);
527        assert_eq!(manager.vec_unready.len(), 0);
528        assert_eq!(manager.vec_ready.len(), 0);
529
530        let manager = DataSrcManager::new(false);
531        assert!(!manager.local);
532        assert_eq!(manager.vec_unready.len(), 0);
533        assert_eq!(manager.vec_ready.len(), 0);
534    }
535
536    #[test]
537    fn test_of_prepend() {
538        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
539
540        {
541            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
542
543            let ds = SyncDataSrc::new(1, logger.clone(), false);
544            let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
545            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
546            vec.push(SendSyncNonNull::new(ptr));
547
548            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
549            let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
550            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
551            vec.push(SendSyncNonNull::new(ptr));
552
553            let mut manager = DataSrcManager::new(true);
554            manager.prepend(vec);
555
556            assert!(manager.local);
557            assert_eq!(manager.vec_unready.len(), 2);
558            assert_eq!(manager.vec_ready.len(), 0);
559
560            assert_eq!(
561                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
562                "foo".into()
563            );
564            assert_eq!(
565                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
566                "bar".into()
567            );
568
569            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
570
571            let ds = SyncDataSrc::new(3, logger.clone(), false);
572            let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
573            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
574            vec.push(SendSyncNonNull::new(ptr));
575
576            let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
577            let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
578            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
579            vec.push(SendSyncNonNull::new(ptr));
580
581            manager.prepend(vec);
582
583            assert!(manager.local);
584            assert_eq!(manager.vec_unready.len(), 4);
585            assert_eq!(manager.vec_ready.len(), 0);
586
587            assert_eq!(
588                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
589                "baz".into()
590            );
591            assert_eq!(
592                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
593                "qux".into()
594            );
595            assert_eq!(
596                unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
597                "foo".into()
598            );
599            assert_eq!(
600                unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
601                "bar".into()
602            );
603        }
604
605        assert_eq!(
606            *logger.lock().unwrap(),
607            vec![
608                "SyncDataSrc::new 1",
609                "AsyncDataSrc::new 2",
610                "SyncDataSrc::new 3",
611                "AsyncDataSrc::new 4",
612                "AsyncDataSrc::drop 2",
613                "SyncDataSrc::drop 1",
614                "AsyncDataSrc::drop 4",
615                "SyncDataSrc::drop 3",
616            ],
617        );
618    }
619
620    #[test]
621    fn test_of_add() {
622        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
623
624        {
625            let mut manager = DataSrcManager::new(true);
626
627            let ds = SyncDataSrc::new(1, logger.clone(), false);
628            manager.add("foo", ds);
629
630            assert!(manager.local);
631            assert_eq!(manager.vec_unready.len(), 1);
632            assert_eq!(manager.vec_ready.len(), 0);
633
634            assert_eq!(
635                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
636                "foo".into()
637            );
638
639            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
640            manager.add("bar", ds);
641
642            assert!(manager.local);
643            assert_eq!(manager.vec_unready.len(), 2);
644            assert_eq!(manager.vec_ready.len(), 0);
645
646            assert_eq!(
647                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
648                "foo".into()
649            );
650            assert_eq!(
651                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
652                "bar".into()
653            );
654        }
655
656        assert_eq!(
657            *logger.lock().unwrap(),
658            vec![
659                "SyncDataSrc::new 1",
660                "AsyncDataSrc::new 2",
661                "AsyncDataSrc::drop 2",
662                "SyncDataSrc::drop 1",
663            ],
664        );
665    }
666
667    #[test]
668    fn test_of_remove() {
669        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
670
671        {
672            let mut manager = DataSrcManager::new(true);
673
674            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
675            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
676            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
677            manager.vec_unready.push(SendSyncNonNull::new(ptr));
678
679            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
680            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
681            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
682            manager.vec_unready.push(SendSyncNonNull::new(ptr));
683
684            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
685            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
686            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
687            manager.vec_ready.push(SendSyncNonNull::new(ptr));
688
689            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
690            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
691            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
692            manager.vec_ready.push(SendSyncNonNull::new(ptr));
693
694            assert!(manager.local);
695            assert_eq!(manager.vec_unready.len(), 2);
696            assert_eq!(manager.vec_ready.len(), 2);
697
698            manager.remove("baz");
699            manager.remove("foo");
700            manager.remove("qux");
701            manager.remove("bar");
702        }
703
704        assert_eq!(
705            *logger.lock().unwrap(),
706            vec![
707                "SyncDataSrc::new 1",
708                "AsyncDataSrc::new 2",
709                "SyncDataSrc::new 3",
710                "AsyncDataSrc::new 4",
711                "SyncDataSrc::close 3",
712                "SyncDataSrc::drop 3",
713                "SyncDataSrc::drop 1",
714                "AsyncDataSrc::close 4",
715                "AsyncDataSrc::drop 4",
716                "AsyncDataSrc::drop 2",
717            ],
718        );
719    }
720
721    #[test]
722    fn test_of_close() {
723        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
724
725        {
726            let mut manager = DataSrcManager::new(true);
727
728            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
729            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
730            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
731            manager.vec_unready.push(SendSyncNonNull::new(ptr));
732
733            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
734            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
735            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
736            manager.vec_unready.push(SendSyncNonNull::new(ptr));
737
738            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
739            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
740            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
741            manager.vec_ready.push(SendSyncNonNull::new(ptr));
742
743            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
744            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
745            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
746            manager.vec_ready.push(SendSyncNonNull::new(ptr));
747
748            assert!(manager.local);
749            assert_eq!(manager.vec_unready.len(), 2);
750            assert_eq!(manager.vec_ready.len(), 2);
751
752            manager.close();
753        }
754
755        assert_eq!(
756            *logger.lock().unwrap(),
757            vec![
758                "SyncDataSrc::new 1",
759                "AsyncDataSrc::new 2",
760                "SyncDataSrc::new 3",
761                "AsyncDataSrc::new 4",
762                "AsyncDataSrc::close 4",
763                "AsyncDataSrc::drop 4",
764                "SyncDataSrc::close 3",
765                "SyncDataSrc::drop 3",
766                "AsyncDataSrc::drop 2",
767                "SyncDataSrc::drop 1",
768            ],
769        );
770    }
771
772    #[test]
773    fn test_of_setup_and_ok() {
774        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
775
776        {
777            let mut manager = DataSrcManager::new(true);
778
779            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
780            manager.add("foo", ds1);
781
782            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
783            manager.add("bar", ds2);
784
785            assert!(manager.local);
786            assert_eq!(manager.vec_unready.len(), 2);
787            assert_eq!(manager.vec_ready.len(), 0);
788
789            let mut vec = Vec::new();
790            manager.setup(&mut vec);
791
792            assert!(manager.local);
793            assert_eq!(manager.vec_unready.len(), 0);
794            assert_eq!(manager.vec_ready.len(), 2);
795        }
796
797        assert_eq!(
798            *logger.lock().unwrap(),
799            vec![
800                "SyncDataSrc::new 1",
801                "SyncDataSrc::new 2",
802                "SyncDataSrc::setup 1",
803                "SyncDataSrc::setup 2",
804                "SyncDataSrc::close 2",
805                "SyncDataSrc::drop 2",
806                "SyncDataSrc::close 1",
807                "SyncDataSrc::drop 1",
808            ],
809        );
810    }
811
812    #[test]
813    fn test_of_setup_but_error() {
814        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
815
816        {
817            let mut manager = DataSrcManager::new(true);
818
819            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
820            manager.add("foo", ds1);
821
822            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
823            manager.add("bar", ds2);
824
825            let ds3 = SyncDataSrc::new(3, logger.clone(), true);
826            manager.add("bar", ds3);
827
828            assert!(manager.local);
829            assert_eq!(manager.vec_unready.len(), 3);
830            assert_eq!(manager.vec_ready.len(), 0);
831
832            let mut vec = Vec::new();
833            manager.setup(&mut vec);
834
835            assert!(manager.local);
836            assert_eq!(manager.vec_unready.len(), 3);
837            assert_eq!(manager.vec_ready.len(), 0);
838        }
839
840        assert_eq!(
841            *logger.lock().unwrap(),
842            vec![
843                "SyncDataSrc::new 1",
844                "SyncDataSrc::new 2",
845                "SyncDataSrc::new 3",
846                "SyncDataSrc::setup 1",
847                "SyncDataSrc::setup 2 failed",
848                "SyncDataSrc::close 2",
849                "SyncDataSrc::close 1",
850                "SyncDataSrc::drop 3",
851                "SyncDataSrc::drop 2",
852                "SyncDataSrc::drop 1",
853            ],
854        );
855    }
856
857    #[test]
858    fn test_of_setup_with_order_and_ok() {
859        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
860
861        {
862            let mut manager = DataSrcManager::new(true);
863
864            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
865            manager.add("foo", ds1);
866
867            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
868            manager.add("bar", ds2);
869
870            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
871            manager.add("baz", ds3);
872
873            assert!(manager.local);
874            assert_eq!(manager.vec_unready.len(), 3);
875            assert_eq!(manager.vec_ready.len(), 0);
876
877            let mut vec = Vec::new();
878            manager.setup_with_order(&["baz", "foo"], &mut vec);
879
880            assert!(manager.local);
881            assert_eq!(manager.vec_unready.len(), 0);
882            assert_eq!(manager.vec_ready.len(), 3);
883        }
884
885        assert_eq!(
886            *logger.lock().unwrap(),
887            vec![
888                "SyncDataSrc::new 1",
889                "SyncDataSrc::new 2",
890                "SyncDataSrc::new 3",
891                "SyncDataSrc::setup 3",
892                "SyncDataSrc::setup 1",
893                "SyncDataSrc::setup 2",
894                "SyncDataSrc::close 2",
895                "SyncDataSrc::drop 2",
896                "SyncDataSrc::close 1",
897                "SyncDataSrc::drop 1",
898                "SyncDataSrc::close 3",
899                "SyncDataSrc::drop 3",
900            ],
901        );
902    }
903
904    #[test]
905    fn test_of_setup_with_order_but_fail() {
906        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
907
908        {
909            let mut manager = DataSrcManager::new(true);
910
911            let ds1 = SyncDataSrc::new(1, logger.clone(), true);
912            manager.add("foo", ds1);
913
914            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
915            manager.add("bar", ds2);
916
917            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
918            manager.add("baz", ds3);
919
920            assert!(manager.local);
921            assert_eq!(manager.vec_unready.len(), 3);
922            assert_eq!(manager.vec_ready.len(), 0);
923
924            let mut vec = Vec::new();
925            manager.setup_with_order(&["baz", "foo"], &mut vec);
926
927            assert!(manager.local);
928            assert_eq!(manager.vec_unready.len(), 3);
929            assert_eq!(manager.vec_ready.len(), 0);
930        }
931
932        assert_eq!(
933            *logger.lock().unwrap(),
934            vec![
935                "SyncDataSrc::new 1",
936                "SyncDataSrc::new 2",
937                "SyncDataSrc::new 3",
938                "SyncDataSrc::setup 3",
939                "SyncDataSrc::setup 1 failed",
940                "SyncDataSrc::close 1",
941                "SyncDataSrc::close 3",
942                "SyncDataSrc::drop 2",
943                "SyncDataSrc::drop 1",
944                "SyncDataSrc::drop 3",
945            ],
946        );
947    }
948
949    #[test]
950    fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
951        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
952
953        {
954            let mut manager = DataSrcManager::new(true);
955
956            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
957            manager.add("foo", ds1);
958
959            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
960            manager.add("bar", ds2);
961
962            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
963            manager.add("baz", ds3);
964
965            assert!(manager.local);
966            assert_eq!(manager.vec_unready.len(), 3);
967            assert_eq!(manager.vec_ready.len(), 0);
968
969            let mut vec = Vec::new();
970            manager.setup_with_order(&["baz", "foo", "baz"], &mut vec);
971
972            assert!(manager.local);
973            assert_eq!(manager.vec_unready.len(), 0);
974            assert_eq!(manager.vec_ready.len(), 3);
975        }
976
977        assert_eq!(
978            *logger.lock().unwrap(),
979            vec![
980                "SyncDataSrc::new 1",
981                "SyncDataSrc::new 2",
982                "SyncDataSrc::new 3",
983                "SyncDataSrc::setup 3",
984                "SyncDataSrc::setup 1",
985                "SyncDataSrc::setup 2",
986                "SyncDataSrc::close 2",
987                "SyncDataSrc::drop 2",
988                "SyncDataSrc::close 1",
989                "SyncDataSrc::drop 1",
990                "SyncDataSrc::close 3",
991                "SyncDataSrc::drop 3",
992            ],
993        );
994    }
995
996    #[test]
997    fn test_of_copy_ds_ready_to_map() {
998        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
999        let mut errors = Vec::new();
1000
1001        let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
1002
1003        let manager = DataSrcManager::new(true);
1004        manager.copy_ds_ready_to_map(&mut index_map);
1005        assert!(index_map.is_empty());
1006
1007        let mut manager = DataSrcManager::new(true);
1008        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1009        manager.add("foo", ds1);
1010        manager.setup(&mut errors);
1011        assert!(errors.is_empty());
1012        manager.copy_ds_ready_to_map(&mut index_map);
1013        assert_eq!(index_map.len(), 1);
1014        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1015
1016        let mut manager = DataSrcManager::new(false);
1017        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1018        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1019        manager.add("bar", ds2);
1020        manager.add("baz", ds3);
1021        manager.setup(&mut errors);
1022        assert!(errors.is_empty());
1023        manager.copy_ds_ready_to_map(&mut index_map);
1024        assert_eq!(index_map.len(), 3);
1025        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1026        assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1027        assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1028    }
1029
1030    #[test]
1031    fn test_of_create_data_conn_and_ok() {
1032        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1033        let mut errors = Vec::new();
1034
1035        let mut manager = DataSrcManager::new(true);
1036        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1037        manager.add("foo", ds1);
1038        manager.setup(&mut errors);
1039
1040        if let Ok(boxed) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1041            assert_eq!(boxed.name.clone(), "foo".into());
1042        } else {
1043            panic!();
1044        }
1045    }
1046
1047    #[test]
1048    fn test_of_create_data_conn_but_not_found() {
1049        let mut errors = Vec::new();
1050
1051        let mut manager = DataSrcManager::new(true);
1052        manager.setup(&mut errors);
1053
1054        if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1055            match err.reason::<DataSrcError>() {
1056                Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1057                    name,
1058                    data_conn_type,
1059                }) => {
1060                    assert_eq!(*name, "foo".into());
1061                    assert_eq!(
1062                        *data_conn_type,
1063                        "sabi::data_src::tests_of_data_src::SyncDataConn"
1064                    );
1065                }
1066                _ => panic!(),
1067            }
1068        } else {
1069            panic!();
1070        }
1071    }
1072
1073    #[test]
1074    fn test_of_create_data_conn_but_fail_to_cast() {
1075        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1076        let mut errors = Vec::new();
1077
1078        let mut manager = DataSrcManager::new(true);
1079        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1080        manager.add("foo", ds1);
1081        manager.setup(&mut errors);
1082
1083        if let Err(err) = manager.create_data_conn::<AsyncDataConn>(0, "foo") {
1084            match err.reason::<DataSrcError>() {
1085                Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1086                    assert_eq!(*name, "foo".into());
1087                    assert_eq!(
1088                        *target_type,
1089                        "sabi::data_src::tests_of_data_src::AsyncDataConn"
1090                    );
1091                }
1092                _ => panic!(),
1093            }
1094        } else {
1095            panic!();
1096        }
1097    }
1098
1099    #[test]
1100    fn test_of_create_data_conn_but_fail_to_create() {
1101        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1102        let mut errors = Vec::new();
1103
1104        let mut manager = DataSrcManager::new(true);
1105        let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1106        manager.add("foo", ds1);
1107        manager.setup(&mut errors);
1108
1109        if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1110            match err.reason::<DataSrcError>() {
1111                Ok(DataSrcError::FailToCreateDataConn {
1112                    name,
1113                    data_conn_type,
1114                }) => {
1115                    assert_eq!(*name, "foo".into());
1116                    assert_eq!(
1117                        *data_conn_type,
1118                        "sabi::data_src::tests_of_data_src::SyncDataConn"
1119                    );
1120                }
1121                _ => panic!(),
1122            }
1123        } else {
1124            panic!();
1125        }
1126    }
1127}