Skip to main content

sabi/data_src/
mod.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5mod global_setup;
6
7pub(crate) use global_setup::{
8    copy_global_data_srcs_to_map, create_data_conn_from_global_data_src,
9};
10pub use global_setup::{create_static_data_src_container, setup, setup_with_order, uses};
11
12use crate::{
13    AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
14    SendSyncNonNull,
15};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::{any, mem, ptr};
20
21/// An enum type representing the reasons for errors that can occur within [`DataSrc`] operations.
22#[derive(Debug)]
23pub enum DataSrcError {
24    /// Indicates a failure during the setup process of one or more global data sources.
25    /// Contains a vector of data source names and their corresponding errors.
26    FailToSetupGlobalDataSrcs {
27        /// The vector contains errors that occurred in each [`DataSrc`] object.
28        errors: Vec<(Arc<str>, errs::Err)>,
29    },
30
31    /// Indicates that a setup process for global data sources is currently ongoing.
32    DuringSetupGlobalDataSrcs,
33
34    /// Indicates that global data sources have already been set up.
35    AlreadySetupGlobalDataSrcs,
36
37    /// Indicates a failure to cast a retrieved [`DataConn`] to the expected type.
38    FailToCastDataConn {
39        /// The name of the data connection that failed to cast.
40        name: Arc<str>,
41        /// The type name to which the [`DataConn`] attempted to cast.
42        target_type: &'static str,
43    },
44
45    /// Indicates a failure to create a [`DataConn`] object from its [`DataSrc`].
46    FailToCreateDataConn {
47        /// The name of the data source that failed to be created.
48        name: Arc<str>,
49        /// The type name of the [`DataConn`] that failed to be created.
50        data_conn_type: &'static str,
51    },
52
53    /// Indicates that no [`DataSrc`] was found to create a [`DataConn`] for the specified name
54    /// and type.
55    NotFoundDataSrcToCreateDataConn {
56        /// The name of the data source that could not be found.
57        name: Arc<str>,
58        /// The type name of the [`DataConn`] that was requested.
59        data_conn_type: &'static str,
60    },
61}
62
63impl<S, C> DataSrcContainer<S, C>
64where
65    S: DataSrc<C>,
66    C: DataConn + 'static,
67{
68    pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
69        Self {
70            drop_fn: drop_data_src::<S, C>,
71            setup_fn: setup_data_src::<S, C>,
72            close_fn: close_data_src::<S, C>,
73            create_data_conn_fn: create_data_conn::<S, C>,
74            is_data_conn_fn: is_data_conn::<C>,
75
76            local,
77            name: name.into(),
78            data_src,
79        }
80    }
81}
82
83fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
84where
85    S: DataSrc<C>,
86    C: DataConn + 'static,
87{
88    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
89    drop(unsafe { Box::from_raw(typed_ptr) });
90}
91
92fn setup_data_src<S, C>(ptr: *const DataSrcContainer, ag: &mut AsyncGroup) -> errs::Result<()>
93where
94    S: DataSrc<C>,
95    C: DataConn + 'static,
96{
97    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
98    unsafe { (*typed_ptr).data_src.setup(ag) }
99}
100
101fn close_data_src<S, C>(ptr: *const DataSrcContainer)
102where
103    S: DataSrc<C>,
104    C: DataConn + 'static,
105{
106    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
107    unsafe { (*typed_ptr).data_src.close() };
108}
109
110fn create_data_conn<S, C>(ptr: *const DataSrcContainer) -> errs::Result<Box<DataConnContainer<C>>>
111where
112    S: DataSrc<C>,
113    C: DataConn + 'static,
114{
115    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
116    let conn: Box<C> = unsafe { (*typed_ptr).data_src.create_data_conn() }?;
117    let name = unsafe { &(*typed_ptr).name };
118    Ok(Box::new(DataConnContainer::<C>::new(
119        name.to_string(),
120        conn,
121    )))
122}
123
124fn is_data_conn<C>(type_id: any::TypeId) -> bool
125where
126    C: DataConn + 'static,
127{
128    any::TypeId::of::<C>() == type_id
129}
130
131impl DataSrcManager {
132    pub(crate) const fn new(local: bool) -> Self {
133        Self {
134            vec_unready: Vec::new(),
135            vec_ready: Vec::new(),
136            local,
137        }
138    }
139
140    pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
141        self.vec_unready.splice(0..0, vec);
142    }
143
144    pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
145    where
146        S: DataSrc<C>,
147        C: DataConn + 'static,
148    {
149        let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
150        let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
151        self.vec_unready.push(SendSyncNonNull::new(ptr));
152    }
153
154    pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
155        let extracted_vec: Vec<_> = self
156            .vec_ready
157            .extract_if(.., |ssnnptr| {
158                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
159            })
160            .collect();
161
162        for ssnnptr in extracted_vec.iter().rev() {
163            let ptr = ssnnptr.non_null_ptr.as_ptr();
164            let close_fn = unsafe { (*ptr).close_fn };
165            let drop_fn = unsafe { (*ptr).drop_fn };
166            close_fn(ptr);
167            drop_fn(ptr);
168        }
169
170        let extracted_vec: Vec<_> = self
171            .vec_unready
172            .extract_if(.., |ssnnptr| {
173                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
174            })
175            .collect();
176
177        for ssnnptr in extracted_vec.iter().rev() {
178            let ptr = ssnnptr.non_null_ptr.as_ptr();
179            let drop_fn = unsafe { (*ptr).drop_fn };
180            drop_fn(ptr);
181        }
182    }
183
184    pub(crate) fn close(&mut self) {
185        let vec = mem::take(&mut self.vec_ready);
186        for ssnnptr in vec.into_iter().rev() {
187            let ptr = ssnnptr.non_null_ptr.as_ptr();
188            let close_fn = unsafe { (*ptr).close_fn };
189            let drop_fn = unsafe { (*ptr).drop_fn };
190            close_fn(ptr);
191            drop_fn(ptr);
192        }
193        let vec = mem::take(&mut self.vec_unready);
194        for ssnnptr in vec.into_iter().rev() {
195            let ptr = ssnnptr.non_null_ptr.as_ptr();
196            let drop_fn = unsafe { (*ptr).drop_fn };
197            drop_fn(ptr);
198        }
199    }
200
201    pub(crate) fn setup(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
202        if self.vec_unready.is_empty() {
203            return;
204        }
205
206        let mut n_done = 0;
207        let mut ag = AsyncGroup::new();
208        for ssnnptr in self.vec_unready.iter() {
209            n_done += 1;
210            let ptr = ssnnptr.non_null_ptr.as_ptr();
211            let setup_fn = unsafe { (*ptr).setup_fn };
212            ag._name = unsafe { (*ptr).name.clone() };
213            if let Err(err) = setup_fn(ptr, &mut ag) {
214                errors.push((ag._name.clone(), err));
215                break;
216            }
217        }
218        ag.join_and_collect_errors(errors);
219
220        if errors.is_empty() {
221            self.vec_ready.append(&mut self.vec_unready);
222        } else {
223            for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
224                let ptr = ssnnptr.non_null_ptr.as_ptr();
225                let close_fn = unsafe { (*ptr).close_fn };
226                close_fn(ptr);
227            }
228        }
229    }
230
231    pub(crate) fn setup_with_order(
232        &mut self,
233        names: &[&str],
234        errors: &mut Vec<(Arc<str>, errs::Err)>,
235    ) {
236        if self.vec_unready.is_empty() {
237            return;
238        }
239
240        let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
241        // To overwrite later indexed elements with eariler ones when names overlap
242        for (i, nm) in names.iter().rev().enumerate() {
243            index_map.insert(*nm, names.len() - 1 - i);
244        }
245
246        let vec_unready = mem::take(&mut self.vec_unready);
247
248        let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
249            vec![None; index_map.len()];
250        for ssnnptr in vec_unready.into_iter() {
251            let ptr = ssnnptr.non_null_ptr.as_ptr();
252            let name = unsafe { (*ptr).name.clone() };
253            if let Some(index) = index_map.remove(name.as_ref()) {
254                ordered_vec[index] = Some(ssnnptr);
255            } else {
256                ordered_vec.push(Some(ssnnptr));
257            }
258        }
259
260        let mut n_done = 0;
261        let mut ag = AsyncGroup::new();
262        for ssnnptr_opt in ordered_vec.iter() {
263            n_done += 1;
264            if let Some(ssnnptr) = ssnnptr_opt {
265                let ptr = ssnnptr.non_null_ptr.as_ptr();
266                let setup_fn = unsafe { (*ptr).setup_fn };
267                ag._name = unsafe { (*ptr).name.clone() };
268                if let Err(err) = setup_fn(ptr, &mut ag) {
269                    errors.push((ag._name.clone(), err));
270                    break;
271                }
272            }
273        }
274        ag.join_and_collect_errors(errors);
275
276        if errors.is_empty() {
277            for ssnnptr in ordered_vec.into_iter().flatten() {
278                self.vec_ready.push(ssnnptr);
279            }
280        } else {
281            for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
282                let ptr = ssnnptr.non_null_ptr.as_ptr();
283                let close_fn = unsafe { (*ptr).close_fn };
284                close_fn(ptr);
285            }
286            for ssnnptr in ordered_vec.into_iter().flatten() {
287                self.vec_unready.push(ssnnptr);
288            }
289        }
290    }
291
292    pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
293        for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
294            let ptr = ssnnptr.non_null_ptr.as_ptr();
295            let name = unsafe { (*ptr).name.clone() };
296            index_map.insert(name, (self.local, i));
297        }
298    }
299
300    pub(crate) fn create_data_conn<C>(
301        &self,
302        index: usize,
303        name: impl AsRef<str>,
304    ) -> errs::Result<Box<DataConnContainer>>
305    where
306        C: DataConn + 'static,
307    {
308        if let Some(ssnnptr) = self.vec_ready.get(index) {
309            let ptr = ssnnptr.non_null_ptr.as_ptr();
310            let type_id = any::TypeId::of::<C>();
311            let is_fn = unsafe { (*ptr).is_data_conn_fn };
312            let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
313            if !is_fn(type_id) {
314                Err(errs::Err::new(DataSrcError::FailToCastDataConn {
315                    name: name.as_ref().into(),
316                    target_type: any::type_name::<C>(),
317                }))
318            } else {
319                match create_data_conn_fn(ptr) {
320                    Ok(boxed) => Ok(boxed),
321                    Err(err) => Err(errs::Err::with_source(
322                        DataSrcError::FailToCreateDataConn {
323                            name: name.as_ref().into(),
324                            data_conn_type: any::type_name::<C>(),
325                        },
326                        err,
327                    )),
328                }
329            }
330        } else {
331            Err(errs::Err::new(
332                DataSrcError::NotFoundDataSrcToCreateDataConn {
333                    name: name.as_ref().into(),
334                    data_conn_type: any::type_name::<C>(),
335                },
336            ))
337        }
338    }
339}
340
341impl Drop for DataSrcManager {
342    fn drop(&mut self) {
343        self.close();
344    }
345}
346
347#[cfg(test)]
348mod tests_of_data_src {
349    use super::*;
350    use std::sync::{Arc, Mutex};
351
352    struct SyncDataConn {}
353    impl SyncDataConn {
354        fn new() -> Self {
355            Self {}
356        }
357    }
358    impl DataConn for SyncDataConn {
359        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
360            Ok(())
361        }
362        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
363        fn close(&mut self) {}
364    }
365
366    struct AsyncDataConn {}
367    impl AsyncDataConn {
368        fn new() -> Self {
369            Self {}
370        }
371    }
372    impl DataConn for AsyncDataConn {
373        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
374            Ok(())
375        }
376        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
377        fn close(&mut self) {}
378    }
379
380    struct SyncDataSrc {
381        id: i8,
382        logger: Arc<Mutex<Vec<String>>>,
383        fail_to_setup: bool,
384        fail_to_create_data_conn: bool,
385    }
386    impl SyncDataSrc {
387        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
388            logger
389                .lock()
390                .unwrap()
391                .push(format!("SyncDataSrc::new {}", id));
392            Self {
393                id,
394                logger: logger,
395                fail_to_setup,
396                fail_to_create_data_conn: false,
397            }
398        }
399        fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
400            Self {
401                id,
402                logger: logger,
403                fail_to_setup: false,
404                fail_to_create_data_conn: true,
405            }
406        }
407    }
408    impl Drop for SyncDataSrc {
409        fn drop(&mut self) {
410            self.logger
411                .lock()
412                .unwrap()
413                .push(format!("SyncDataSrc::drop {}", self.id));
414        }
415    }
416    impl DataSrc<SyncDataConn> for SyncDataSrc {
417        fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
418            if self.fail_to_setup {
419                self.logger
420                    .lock()
421                    .unwrap()
422                    .push(format!("SyncDataSrc::setup {} failed", self.id));
423                return Err(errs::Err::new("XXX".to_string()));
424            }
425            self.logger
426                .lock()
427                .unwrap()
428                .push(format!("SyncDataSrc::setup {}", self.id));
429            Ok(())
430        }
431        fn close(&mut self) {
432            self.logger
433                .lock()
434                .unwrap()
435                .push(format!("SyncDataSrc::close {}", self.id));
436        }
437        fn create_data_conn(&mut self) -> errs::Result<Box<SyncDataConn>> {
438            {
439                self.logger
440                    .lock()
441                    .unwrap()
442                    .push(format!("SyncDataSrc::create_data_conn {}", self.id));
443            }
444            if self.fail_to_create_data_conn {
445                return Err(errs::Err::new("eeee".to_string()));
446            }
447            let conn = SyncDataConn::new();
448            Ok(Box::new(conn))
449        }
450    }
451
452    struct AsyncDataSrc {
453        id: i8,
454        fail: bool,
455        logger: Arc<Mutex<Vec<String>>>,
456        wait: u64,
457    }
458    impl AsyncDataSrc {
459        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
460            logger
461                .lock()
462                .unwrap()
463                .push(format!("AsyncDataSrc::new {}", id));
464            Self {
465                id,
466                fail,
467                logger,
468                wait,
469            }
470        }
471    }
472    impl Drop for AsyncDataSrc {
473        fn drop(&mut self) {
474            self.logger
475                .lock()
476                .unwrap()
477                .push(format!("AsyncDataSrc::drop {}", self.id));
478        }
479    }
480    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
481        fn setup(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
482            let logger = self.logger.clone();
483            let fail = self.fail;
484            let id = self.id;
485            let wait = self.wait;
486            ag.add(move || {
487                std::thread::sleep(std::time::Duration::from_millis(wait));
488                let mut logger = logger.lock().unwrap();
489                if fail {
490                    logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
491                    return Err(errs::Err::new("XXX".to_string()));
492                }
493                logger.push(format!("AsyncDataSrc::setup {}", id));
494                Ok(())
495            });
496            Ok(())
497        }
498        fn close(&mut self) {
499            self.logger
500                .lock()
501                .unwrap()
502                .push(format!("AsyncDataSrc::close {}", self.id));
503        }
504        fn create_data_conn(&mut self) -> errs::Result<Box<AsyncDataConn>> {
505            {
506                self.logger
507                    .lock()
508                    .unwrap()
509                    .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
510            }
511            let conn = AsyncDataConn::new();
512            Ok(Box::new(conn))
513        }
514    }
515
516    #[test]
517    fn test_of_new() {
518        let manager = DataSrcManager::new(true);
519        assert!(manager.local);
520        assert_eq!(manager.vec_unready.len(), 0);
521        assert_eq!(manager.vec_ready.len(), 0);
522
523        let manager = DataSrcManager::new(false);
524        assert!(!manager.local);
525        assert_eq!(manager.vec_unready.len(), 0);
526        assert_eq!(manager.vec_ready.len(), 0);
527    }
528
529    #[test]
530    fn test_of_prepend() {
531        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
532
533        {
534            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
535
536            let ds = SyncDataSrc::new(1, logger.clone(), false);
537            let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
538            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
539            vec.push(SendSyncNonNull::new(ptr));
540
541            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
542            let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
543            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
544            vec.push(SendSyncNonNull::new(ptr));
545
546            let mut manager = DataSrcManager::new(true);
547            manager.prepend(vec);
548
549            assert!(manager.local);
550            assert_eq!(manager.vec_unready.len(), 2);
551            assert_eq!(manager.vec_ready.len(), 0);
552
553            assert_eq!(
554                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
555                "foo".into()
556            );
557            assert_eq!(
558                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
559                "bar".into()
560            );
561
562            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
563
564            let ds = SyncDataSrc::new(3, logger.clone(), false);
565            let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
566            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
567            vec.push(SendSyncNonNull::new(ptr));
568
569            let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
570            let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
571            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
572            vec.push(SendSyncNonNull::new(ptr));
573
574            manager.prepend(vec);
575
576            assert!(manager.local);
577            assert_eq!(manager.vec_unready.len(), 4);
578            assert_eq!(manager.vec_ready.len(), 0);
579
580            assert_eq!(
581                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
582                "baz".into()
583            );
584            assert_eq!(
585                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
586                "qux".into()
587            );
588            assert_eq!(
589                unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
590                "foo".into()
591            );
592            assert_eq!(
593                unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
594                "bar".into()
595            );
596        }
597
598        assert_eq!(
599            *logger.lock().unwrap(),
600            vec![
601                "SyncDataSrc::new 1",
602                "AsyncDataSrc::new 2",
603                "SyncDataSrc::new 3",
604                "AsyncDataSrc::new 4",
605                "AsyncDataSrc::drop 2",
606                "SyncDataSrc::drop 1",
607                "AsyncDataSrc::drop 4",
608                "SyncDataSrc::drop 3",
609            ],
610        );
611    }
612
613    #[test]
614    fn test_of_add() {
615        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
616
617        {
618            let mut manager = DataSrcManager::new(true);
619
620            let ds = SyncDataSrc::new(1, logger.clone(), false);
621            manager.add("foo", ds);
622
623            assert!(manager.local);
624            assert_eq!(manager.vec_unready.len(), 1);
625            assert_eq!(manager.vec_ready.len(), 0);
626
627            assert_eq!(
628                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
629                "foo".into()
630            );
631
632            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
633            manager.add("bar", ds);
634
635            assert!(manager.local);
636            assert_eq!(manager.vec_unready.len(), 2);
637            assert_eq!(manager.vec_ready.len(), 0);
638
639            assert_eq!(
640                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
641                "foo".into()
642            );
643            assert_eq!(
644                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
645                "bar".into()
646            );
647        }
648
649        assert_eq!(
650            *logger.lock().unwrap(),
651            vec![
652                "SyncDataSrc::new 1",
653                "AsyncDataSrc::new 2",
654                "AsyncDataSrc::drop 2",
655                "SyncDataSrc::drop 1",
656            ],
657        );
658    }
659
660    #[test]
661    fn test_of_remove() {
662        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
663
664        {
665            let mut manager = DataSrcManager::new(true);
666
667            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
668            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
669            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
670            manager.vec_unready.push(SendSyncNonNull::new(ptr));
671
672            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
673            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
674            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
675            manager.vec_unready.push(SendSyncNonNull::new(ptr));
676
677            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
678            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
679            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
680            manager.vec_ready.push(SendSyncNonNull::new(ptr));
681
682            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
683            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
684            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
685            manager.vec_ready.push(SendSyncNonNull::new(ptr));
686
687            assert!(manager.local);
688            assert_eq!(manager.vec_unready.len(), 2);
689            assert_eq!(manager.vec_ready.len(), 2);
690
691            manager.remove("baz");
692            manager.remove("foo");
693            manager.remove("qux");
694            manager.remove("bar");
695        }
696
697        assert_eq!(
698            *logger.lock().unwrap(),
699            vec![
700                "SyncDataSrc::new 1",
701                "AsyncDataSrc::new 2",
702                "SyncDataSrc::new 3",
703                "AsyncDataSrc::new 4",
704                "SyncDataSrc::close 3",
705                "SyncDataSrc::drop 3",
706                "SyncDataSrc::drop 1",
707                "AsyncDataSrc::close 4",
708                "AsyncDataSrc::drop 4",
709                "AsyncDataSrc::drop 2",
710            ],
711        );
712    }
713
714    #[test]
715    fn test_of_close() {
716        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
717
718        {
719            let mut manager = DataSrcManager::new(true);
720
721            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
722            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
723            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
724            manager.vec_unready.push(SendSyncNonNull::new(ptr));
725
726            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
727            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
728            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
729            manager.vec_unready.push(SendSyncNonNull::new(ptr));
730
731            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
732            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
733            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
734            manager.vec_ready.push(SendSyncNonNull::new(ptr));
735
736            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
737            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
738            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
739            manager.vec_ready.push(SendSyncNonNull::new(ptr));
740
741            assert!(manager.local);
742            assert_eq!(manager.vec_unready.len(), 2);
743            assert_eq!(manager.vec_ready.len(), 2);
744
745            manager.close();
746        }
747
748        assert_eq!(
749            *logger.lock().unwrap(),
750            vec![
751                "SyncDataSrc::new 1",
752                "AsyncDataSrc::new 2",
753                "SyncDataSrc::new 3",
754                "AsyncDataSrc::new 4",
755                "AsyncDataSrc::close 4",
756                "AsyncDataSrc::drop 4",
757                "SyncDataSrc::close 3",
758                "SyncDataSrc::drop 3",
759                "AsyncDataSrc::drop 2",
760                "SyncDataSrc::drop 1",
761            ],
762        );
763    }
764
765    #[test]
766    fn test_of_setup_and_ok() {
767        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
768
769        {
770            let mut manager = DataSrcManager::new(true);
771
772            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
773            manager.add("foo", ds1);
774
775            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
776            manager.add("bar", ds2);
777
778            assert!(manager.local);
779            assert_eq!(manager.vec_unready.len(), 2);
780            assert_eq!(manager.vec_ready.len(), 0);
781
782            let mut vec = Vec::new();
783            manager.setup(&mut vec);
784
785            assert!(manager.local);
786            assert_eq!(manager.vec_unready.len(), 0);
787            assert_eq!(manager.vec_ready.len(), 2);
788        }
789
790        assert_eq!(
791            *logger.lock().unwrap(),
792            vec![
793                "SyncDataSrc::new 1",
794                "SyncDataSrc::new 2",
795                "SyncDataSrc::setup 1",
796                "SyncDataSrc::setup 2",
797                "SyncDataSrc::close 2",
798                "SyncDataSrc::drop 2",
799                "SyncDataSrc::close 1",
800                "SyncDataSrc::drop 1",
801            ],
802        );
803    }
804
805    #[test]
806    fn test_of_setup_but_error() {
807        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
808
809        {
810            let mut manager = DataSrcManager::new(true);
811
812            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
813            manager.add("foo", ds1);
814
815            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
816            manager.add("bar", ds2);
817
818            let ds3 = SyncDataSrc::new(3, logger.clone(), true);
819            manager.add("bar", ds3);
820
821            assert!(manager.local);
822            assert_eq!(manager.vec_unready.len(), 3);
823            assert_eq!(manager.vec_ready.len(), 0);
824
825            let mut vec = Vec::new();
826            manager.setup(&mut vec);
827
828            assert!(manager.local);
829            assert_eq!(manager.vec_unready.len(), 3);
830            assert_eq!(manager.vec_ready.len(), 0);
831        }
832
833        assert_eq!(
834            *logger.lock().unwrap(),
835            vec![
836                "SyncDataSrc::new 1",
837                "SyncDataSrc::new 2",
838                "SyncDataSrc::new 3",
839                "SyncDataSrc::setup 1",
840                "SyncDataSrc::setup 2 failed",
841                "SyncDataSrc::close 2",
842                "SyncDataSrc::close 1",
843                "SyncDataSrc::drop 3",
844                "SyncDataSrc::drop 2",
845                "SyncDataSrc::drop 1",
846            ],
847        );
848    }
849
850    #[test]
851    fn test_of_setup_with_order_and_ok() {
852        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
853
854        {
855            let mut manager = DataSrcManager::new(true);
856
857            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
858            manager.add("foo", ds1);
859
860            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
861            manager.add("bar", ds2);
862
863            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
864            manager.add("baz", ds3);
865
866            assert!(manager.local);
867            assert_eq!(manager.vec_unready.len(), 3);
868            assert_eq!(manager.vec_ready.len(), 0);
869
870            let mut vec = Vec::new();
871            manager.setup_with_order(&["baz", "foo"], &mut vec);
872
873            assert!(manager.local);
874            assert_eq!(manager.vec_unready.len(), 0);
875            assert_eq!(manager.vec_ready.len(), 3);
876        }
877
878        assert_eq!(
879            *logger.lock().unwrap(),
880            vec![
881                "SyncDataSrc::new 1",
882                "SyncDataSrc::new 2",
883                "SyncDataSrc::new 3",
884                "SyncDataSrc::setup 3",
885                "SyncDataSrc::setup 1",
886                "SyncDataSrc::setup 2",
887                "SyncDataSrc::close 2",
888                "SyncDataSrc::drop 2",
889                "SyncDataSrc::close 1",
890                "SyncDataSrc::drop 1",
891                "SyncDataSrc::close 3",
892                "SyncDataSrc::drop 3",
893            ],
894        );
895    }
896
897    #[test]
898    fn test_of_setup_with_order_but_fail() {
899        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
900
901        {
902            let mut manager = DataSrcManager::new(true);
903
904            let ds1 = SyncDataSrc::new(1, logger.clone(), true);
905            manager.add("foo", ds1);
906
907            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
908            manager.add("bar", ds2);
909
910            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
911            manager.add("baz", ds3);
912
913            assert!(manager.local);
914            assert_eq!(manager.vec_unready.len(), 3);
915            assert_eq!(manager.vec_ready.len(), 0);
916
917            let mut vec = Vec::new();
918            manager.setup_with_order(&["baz", "foo"], &mut vec);
919
920            assert!(manager.local);
921            assert_eq!(manager.vec_unready.len(), 3);
922            assert_eq!(manager.vec_ready.len(), 0);
923        }
924
925        assert_eq!(
926            *logger.lock().unwrap(),
927            vec![
928                "SyncDataSrc::new 1",
929                "SyncDataSrc::new 2",
930                "SyncDataSrc::new 3",
931                "SyncDataSrc::setup 3",
932                "SyncDataSrc::setup 1 failed",
933                "SyncDataSrc::close 1",
934                "SyncDataSrc::close 3",
935                "SyncDataSrc::drop 2",
936                "SyncDataSrc::drop 1",
937                "SyncDataSrc::drop 3",
938            ],
939        );
940    }
941
942    #[test]
943    fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
944        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
945
946        {
947            let mut manager = DataSrcManager::new(true);
948
949            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
950            manager.add("foo", ds1);
951
952            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
953            manager.add("bar", ds2);
954
955            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
956            manager.add("baz", ds3);
957
958            assert!(manager.local);
959            assert_eq!(manager.vec_unready.len(), 3);
960            assert_eq!(manager.vec_ready.len(), 0);
961
962            let mut vec = Vec::new();
963            manager.setup_with_order(&["baz", "foo", "baz"], &mut vec);
964
965            assert!(manager.local);
966            assert_eq!(manager.vec_unready.len(), 0);
967            assert_eq!(manager.vec_ready.len(), 3);
968        }
969
970        assert_eq!(
971            *logger.lock().unwrap(),
972            vec![
973                "SyncDataSrc::new 1",
974                "SyncDataSrc::new 2",
975                "SyncDataSrc::new 3",
976                "SyncDataSrc::setup 3",
977                "SyncDataSrc::setup 1",
978                "SyncDataSrc::setup 2",
979                "SyncDataSrc::close 2",
980                "SyncDataSrc::drop 2",
981                "SyncDataSrc::close 1",
982                "SyncDataSrc::drop 1",
983                "SyncDataSrc::close 3",
984                "SyncDataSrc::drop 3",
985            ],
986        );
987    }
988
989    #[test]
990    fn test_of_copy_ds_ready_to_map() {
991        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
992        let mut errors = Vec::new();
993
994        let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
995
996        let manager = DataSrcManager::new(true);
997        manager.copy_ds_ready_to_map(&mut index_map);
998        assert!(index_map.is_empty());
999
1000        let mut manager = DataSrcManager::new(true);
1001        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1002        manager.add("foo", ds1);
1003        manager.setup(&mut errors);
1004        assert!(errors.is_empty());
1005        manager.copy_ds_ready_to_map(&mut index_map);
1006        assert_eq!(index_map.len(), 1);
1007        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1008
1009        let mut manager = DataSrcManager::new(false);
1010        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1011        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1012        manager.add("bar", ds2);
1013        manager.add("baz", ds3);
1014        manager.setup(&mut errors);
1015        assert!(errors.is_empty());
1016        manager.copy_ds_ready_to_map(&mut index_map);
1017        assert_eq!(index_map.len(), 3);
1018        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1019        assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1020        assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1021    }
1022
1023    #[test]
1024    fn test_of_create_data_conn_and_ok() {
1025        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1026        let mut errors = Vec::new();
1027
1028        let mut manager = DataSrcManager::new(true);
1029        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1030        manager.add("foo", ds1);
1031        manager.setup(&mut errors);
1032
1033        if let Ok(boxed) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1034            assert_eq!(boxed.name.clone(), "foo".into());
1035        } else {
1036            panic!();
1037        }
1038    }
1039
1040    #[test]
1041    fn test_of_create_data_conn_but_not_found() {
1042        let mut errors = Vec::new();
1043
1044        let mut manager = DataSrcManager::new(true);
1045        manager.setup(&mut errors);
1046
1047        if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1048            match err.reason::<DataSrcError>() {
1049                Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1050                    name,
1051                    data_conn_type,
1052                }) => {
1053                    assert_eq!(*name, "foo".into());
1054                    assert_eq!(
1055                        *data_conn_type,
1056                        "sabi::data_src::tests_of_data_src::SyncDataConn"
1057                    );
1058                }
1059                _ => panic!(),
1060            }
1061        } else {
1062            panic!();
1063        }
1064    }
1065
1066    #[test]
1067    fn test_of_create_data_conn_but_fail_to_cast() {
1068        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1069        let mut errors = Vec::new();
1070
1071        let mut manager = DataSrcManager::new(true);
1072        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1073        manager.add("foo", ds1);
1074        manager.setup(&mut errors);
1075
1076        if let Err(err) = manager.create_data_conn::<AsyncDataConn>(0, "foo") {
1077            match err.reason::<DataSrcError>() {
1078                Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1079                    assert_eq!(*name, "foo".into());
1080                    assert_eq!(
1081                        *target_type,
1082                        "sabi::data_src::tests_of_data_src::AsyncDataConn"
1083                    );
1084                }
1085                _ => panic!(),
1086            }
1087        } else {
1088            panic!();
1089        }
1090    }
1091
1092    #[test]
1093    fn test_of_create_data_conn_but_fail_to_create() {
1094        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1095        let mut errors = Vec::new();
1096
1097        let mut manager = DataSrcManager::new(true);
1098        let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1099        manager.add("foo", ds1);
1100        manager.setup(&mut errors);
1101
1102        if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1103            match err.reason::<DataSrcError>() {
1104                Ok(DataSrcError::FailToCreateDataConn {
1105                    name,
1106                    data_conn_type,
1107                }) => {
1108                    assert_eq!(*name, "foo".into());
1109                    assert_eq!(
1110                        *data_conn_type,
1111                        "sabi::data_src::tests_of_data_src::SyncDataConn"
1112                    );
1113                }
1114                _ => panic!(),
1115            }
1116        } else {
1117            panic!();
1118        }
1119    }
1120}