Skip to main content

sabi/tokio/data_src/
mod.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5mod global_setup;
6
7pub(crate) use global_setup::{
8    copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async,
9};
10pub use global_setup::{
11    create_static_data_src_container, setup_async, setup_with_order_async, uses_async,
12};
13
14use crate::tokio::{
15    AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
16    SendSyncNonNull,
17};
18
19use std::collections::HashMap;
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::{any, mem, ptr};
24
/// Represents errors that can occur during data source operations.
///
/// Values of this enum are used as the reason of an `errs::Err`. Within this module the
/// `FailToCastDataConn`, `FailToCreateDataConn` and `NotFoundDataSrcToCreateDataConn` variants
/// are produced by `DataSrcManager::create_data_conn_async`.
#[derive(Debug)]
pub enum DataSrcError {
    /// An error indicating that one or more global data sources failed during their setup process.
    FailToSetupGlobalDataSrcs {
        /// A vector of errors, each pairing the name of a data source with the error raised for it.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// An error indicating that a global data source setup is currently in progress.
    DuringSetupGlobalDataSrcs,

    /// An error indicating that global data sources have already been set up.
    AlreadySetupGlobalDataSrcs,

    /// An error indicating that a data connection could not be cast to the target type, i.e. the
    /// data source found for the request creates a different `DataConn` type than the one asked
    /// for.
    FailToCastDataConn {
        /// The name of the data source that failed to provide the correct connection type.
        name: Arc<str>,
        /// The type name (`std::any::type_name`) of the data connection type that was requested.
        target_type: &'static str,
    },

    /// An error indicating that a data connection could not be created by its data source. The
    /// error returned by the data source is attached as the source of the wrapping `errs::Err`.
    FailToCreateDataConn {
        /// The name of the data source that failed to create a data connection.
        name: Arc<str>,
        /// The type name (`std::any::type_name`) of the data connection type that was requested.
        data_conn_type: &'static str,
    },

    /// An error indicating that no data source was found for the requested data connection.
    NotFoundDataSrcToCreateDataConn {
        /// The name of the data source that was not found.
        name: Arc<str>,
        /// The type name (`std::any::type_name`) of the data connection type that was requested.
        data_conn_type: &'static str,
    },
}
64
65impl<S, C> DataSrcContainer<S, C>
66where
67    S: DataSrc<C> + 'static,
68    C: DataConn + 'static,
69{
70    pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
71        Self {
72            drop_fn: drop_data_src::<S, C>,
73            close_fn: close_data_src::<S, C>,
74            is_data_conn_fn: is_data_conn::<C>,
75
76            setup_fn: setup_data_src_async::<S, C>,
77            create_data_conn_fn: create_data_conn_async::<S, C>,
78
79            local,
80            name: name.into(),
81            data_src,
82        }
83    }
84}
85
86fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
87where
88    S: DataSrc<C>,
89    C: DataConn + 'static,
90{
91    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
92    drop(unsafe { Box::from_raw(typed_ptr) });
93}
94
95fn close_data_src<S, C>(ptr: *const DataSrcContainer)
96where
97    S: DataSrc<C>,
98    C: DataConn + 'static,
99{
100    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
101    unsafe { (*typed_ptr).data_src.close() };
102}
103
104fn is_data_conn<C>(type_id: any::TypeId) -> bool
105where
106    C: DataConn + 'static,
107{
108    any::TypeId::of::<C>() == type_id
109}
110
111fn setup_data_src_async<S, C>(
112    ptr: *const DataSrcContainer,
113    ag: &mut AsyncGroup,
114) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + '_>>
115where
116    S: DataSrc<C> + 'static,
117    C: DataConn + 'static,
118{
119    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
120    let data_src = unsafe { &mut (*typed_ptr).data_src };
121    Box::pin(data_src.setup_async(ag))
122}
123
124#[allow(clippy::type_complexity)]
125fn create_data_conn_async<'a, S, C>(
126    ptr: *const DataSrcContainer,
127) -> Pin<Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'a>>
128where
129    S: DataSrc<C> + 'a,
130    C: DataConn + 'static,
131{
132    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
133    let data_src = unsafe { &mut (*typed_ptr).data_src };
134    let name = unsafe { &(*typed_ptr).name };
135    Box::pin(async move {
136        let conn: Box<C> = data_src.create_data_conn_async().await?;
137        Ok(Box::new(DataConnContainer::<C>::new(
138            name.to_string(),
139            conn,
140        )))
141    })
142}
143
144impl DataSrcManager {
145    pub(crate) const fn new(local: bool) -> Self {
146        Self {
147            vec_unready: Vec::new(),
148            vec_ready: Vec::new(),
149            local,
150        }
151    }
152
153    pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
154        self.vec_unready.splice(0..0, vec);
155    }
156
157    pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
158    where
159        S: DataSrc<C> + 'static,
160        C: DataConn + 'static,
161    {
162        let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
163        let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
164        self.vec_unready.push(SendSyncNonNull::new(ptr));
165    }
166
167    pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
168        let extracted_vec: Vec<_> = self
169            .vec_ready
170            .extract_if(.., |ssnnptr| {
171                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
172            })
173            .collect();
174
175        for ssnnptr in extracted_vec.iter().rev() {
176            let ptr = ssnnptr.non_null_ptr.as_ptr();
177            let close_fn = unsafe { (*ptr).close_fn };
178            let drop_fn = unsafe { (*ptr).drop_fn };
179            close_fn(ptr);
180            drop_fn(ptr);
181        }
182
183        let extracted_vec: Vec<_> = self
184            .vec_unready
185            .extract_if(.., |ssnnptr| {
186                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
187            })
188            .collect();
189
190        for ssnnptr in extracted_vec.iter().rev() {
191            let ptr = ssnnptr.non_null_ptr.as_ptr();
192            let drop_fn = unsafe { (*ptr).drop_fn };
193            drop_fn(ptr);
194        }
195    }
196
197    pub(crate) fn close(&mut self) {
198        let vec = mem::take(&mut self.vec_ready);
199        for ssnnptr in vec.into_iter().rev() {
200            let ptr = ssnnptr.non_null_ptr.as_ptr();
201            let close_fn = unsafe { (*ptr).close_fn };
202            let drop_fn = unsafe { (*ptr).drop_fn };
203            close_fn(ptr);
204            drop_fn(ptr);
205        }
206        let vec = mem::take(&mut self.vec_unready);
207        for ssnnptr in vec.into_iter().rev() {
208            let ptr = ssnnptr.non_null_ptr.as_ptr();
209            let drop_fn = unsafe { (*ptr).drop_fn };
210            drop_fn(ptr);
211        }
212    }
213
214    pub(crate) async fn setup_async(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
215        if self.vec_unready.is_empty() {
216            return;
217        }
218
219        let mut n_done = 0;
220        let mut ag = AsyncGroup::new();
221        for ssnnptr in self.vec_unready.iter() {
222            n_done += 1;
223            let ptr = ssnnptr.non_null_ptr.as_ptr();
224            let setup_fn = unsafe { (*ptr).setup_fn };
225            ag._name = unsafe { (*ptr).name.clone() };
226            if let Err(err) = setup_fn(ptr, &mut ag).await {
227                errors.push((ag._name.clone(), err));
228                break;
229            }
230        }
231        ag.join_and_collect_errors_async(errors).await;
232
233        if errors.is_empty() {
234            self.vec_ready.append(&mut self.vec_unready);
235        } else {
236            for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
237                let ptr = ssnnptr.non_null_ptr.as_ptr();
238                let close_fn = unsafe { (*ptr).close_fn };
239                close_fn(ptr);
240            }
241        }
242    }
243
244    pub(crate) async fn setup_with_order_async(
245        &mut self,
246        names: &[&str],
247        errors: &mut Vec<(Arc<str>, errs::Err)>,
248    ) {
249        if self.vec_unready.is_empty() {
250            return;
251        }
252
253        let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
254        // To overwrite later indexed elements with eariler ones when names overlap
255        for (i, nm) in names.iter().rev().enumerate() {
256            index_map.insert(*nm, names.len() - 1 - i);
257        }
258
259        let vec_unready = mem::take(&mut self.vec_unready);
260
261        let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
262            vec![None; index_map.len()];
263        for ssnnptr in vec_unready.into_iter() {
264            let ptr = ssnnptr.non_null_ptr.as_ptr();
265            let name = unsafe { (*ptr).name.clone() };
266            if let Some(index) = index_map.remove(name.as_ref()) {
267                ordered_vec[index] = Some(ssnnptr);
268            } else {
269                ordered_vec.push(Some(ssnnptr));
270            }
271        }
272
273        let mut n_done = 0;
274        let mut ag = AsyncGroup::new();
275        for ssnnptr_opt in ordered_vec.iter() {
276            n_done += 1;
277            if let Some(ssnnptr) = ssnnptr_opt {
278                let ptr = ssnnptr.non_null_ptr.as_ptr();
279                let setup_fn = unsafe { (*ptr).setup_fn };
280                ag._name = unsafe { (*ptr).name.clone() };
281                if let Err(err) = setup_fn(ptr, &mut ag).await {
282                    errors.push((ag._name.clone(), err));
283                    break;
284                }
285            }
286        }
287        ag.join_and_collect_errors_async(errors).await;
288
289        if errors.is_empty() {
290            for ssnnptr in ordered_vec.into_iter().flatten() {
291                self.vec_ready.push(ssnnptr);
292            }
293        } else {
294            for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
295                let ptr = ssnnptr.non_null_ptr.as_ptr();
296                let close_fn = unsafe { (*ptr).close_fn };
297                close_fn(ptr);
298            }
299            for ssnnptr in ordered_vec.into_iter().flatten() {
300                self.vec_unready.push(ssnnptr);
301            }
302        }
303    }
304
305    pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
306        for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
307            let ptr = ssnnptr.non_null_ptr.as_ptr();
308            let name = unsafe { (*ptr).name.clone() };
309            index_map.insert(name, (self.local, i));
310        }
311    }
312
313    pub(crate) async fn create_data_conn_async<C>(
314        &self,
315        index: usize,
316        name: impl AsRef<str>,
317    ) -> errs::Result<Box<DataConnContainer>>
318    where
319        C: DataConn + 'static,
320    {
321        if let Some(ssnnptr) = self.vec_ready.get(index) {
322            let ptr = ssnnptr.non_null_ptr.as_ptr();
323            let type_id = any::TypeId::of::<C>();
324            let is_fn = unsafe { (*ptr).is_data_conn_fn };
325            let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
326            if !is_fn(type_id) {
327                Err(errs::Err::new(DataSrcError::FailToCastDataConn {
328                    name: name.as_ref().into(),
329                    target_type: any::type_name::<C>(),
330                }))
331            } else {
332                match create_data_conn_fn(ptr).await {
333                    Ok(boxed) => Ok(boxed),
334                    Err(err) => Err(errs::Err::with_source(
335                        DataSrcError::FailToCreateDataConn {
336                            name: name.as_ref().into(),
337                            data_conn_type: any::type_name::<C>(),
338                        },
339                        err,
340                    )),
341                }
342            }
343        } else {
344            Err(errs::Err::new(
345                DataSrcError::NotFoundDataSrcToCreateDataConn {
346                    name: name.as_ref().into(),
347                    data_conn_type: any::type_name::<C>(),
348                },
349            ))
350        }
351    }
352}
353
354impl Drop for DataSrcManager {
355    fn drop(&mut self) {
356        self.close();
357    }
358}
359
360#[cfg(test)]
361mod tests_of_data_src {
362    use super::*;
363    use std::sync::Arc;
364    use tokio::sync::Mutex;
365
366    struct SyncDataConn {}
367    impl SyncDataConn {
368        fn new() -> Self {
369            Self {}
370        }
371    }
372    impl DataConn for SyncDataConn {
373        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
374            Ok(())
375        }
376        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
377        fn close(&mut self) {}
378    }
379
380    struct AsyncDataConn {}
381    impl AsyncDataConn {
382        fn new() -> Self {
383            Self {}
384        }
385    }
386    impl DataConn for AsyncDataConn {
387        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
388            Ok(())
389        }
390        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
391        fn close(&mut self) {}
392    }
393
394    struct SyncDataSrc {
395        id: i8,
396        logger: Arc<Mutex<Vec<String>>>,
397        fail_to_setup: bool,
398        fail_to_create_data_conn: bool,
399    }
400    impl SyncDataSrc {
401        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
402            let logger_clone = logger.clone();
403            tokio::spawn(async move {
404                logger_clone
405                    .lock()
406                    .await
407                    .push(format!("SyncDataSrc::new {}", id));
408            });
409            Self {
410                id,
411                logger: logger,
412                fail_to_setup,
413                fail_to_create_data_conn: false,
414            }
415        }
416        fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
417            Self {
418                id,
419                logger: logger,
420                fail_to_setup: false,
421                fail_to_create_data_conn: true,
422            }
423        }
424    }
425    impl Drop for SyncDataSrc {
426        fn drop(&mut self) {
427            let logger = self.logger.clone();
428            let id = self.id;
429            tokio::spawn(async move {
430                logger
431                    .lock()
432                    .await
433                    .push(format!("SyncDataSrc::drop {}", id));
434            });
435        }
436    }
437    impl DataSrc<SyncDataConn> for SyncDataSrc {
438        async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
439            let fail = self.fail_to_setup;
440            let id = self.id;
441            let logger = self.logger.clone();
442
443            if fail {
444                logger
445                    .lock()
446                    .await
447                    .push(format!("SyncDataSrc::setup {} failed", id));
448                return Err(errs::Err::new("XXX".to_string()));
449            }
450            logger
451                .lock()
452                .await
453                .push(format!("SyncDataSrc::setup {}", id));
454            Ok(())
455        }
456        fn close(&mut self) {
457            let logger = self.logger.clone();
458            let id = self.id;
459            tokio::spawn(async move {
460                logger
461                    .lock()
462                    .await
463                    .push(format!("SyncDataSrc::close {}", id));
464            });
465        }
466        async fn create_data_conn_async(&mut self) -> errs::Result<Box<SyncDataConn>> {
467            let id = self.id;
468            let logger = self.logger.clone();
469            {
470                logger
471                    .lock()
472                    .await
473                    .push(format!("SyncDataSrc::create_data_conn {}", id));
474            }
475            if self.fail_to_create_data_conn {
476                return Err(errs::Err::new("eeee".to_string()));
477            }
478            let conn = SyncDataConn::new();
479            Ok(Box::new(conn))
480        }
481    }
482
483    struct AsyncDataSrc {
484        id: i8,
485        fail: bool,
486        logger: Arc<Mutex<Vec<String>>>,
487        wait: u64,
488    }
489    impl AsyncDataSrc {
490        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
491            let logger_clone = logger.clone();
492            tokio::spawn(async move {
493                logger_clone
494                    .lock()
495                    .await
496                    .push(format!("AsyncDataSrc::new {}", id));
497            });
498            Self {
499                id,
500                fail,
501                logger,
502                wait,
503            }
504        }
505    }
506    impl Drop for AsyncDataSrc {
507        fn drop(&mut self) {
508            let logger = self.logger.clone();
509            let id = self.id;
510            tokio::spawn(async move {
511                logger
512                    .lock()
513                    .await
514                    .push(format!("AsyncDataSrc::drop {}", id));
515            });
516        }
517    }
518    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
519        async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
520            let logger = self.logger.clone();
521            let fail = self.fail;
522            let id = self.id;
523            let wait = self.wait;
524
525            ag.add(async move {
526                tokio::time::sleep(std::time::Duration::from_millis(wait)).await;
527                let mut logger = logger.lock().await;
528                if fail {
529                    logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
530                    return Err(errs::Err::new("XXX".to_string()));
531                }
532                logger.push(format!("AsyncDataSrc::setup {}", id));
533                Ok(())
534            });
535            Ok(())
536        }
537        fn close(&mut self) {
538            let logger = self.logger.clone();
539            let id = self.id;
540            tokio::spawn(async move {
541                logger
542                    .lock()
543                    .await
544                    .push(format!("AsyncDataSrc::close {}", id));
545            });
546        }
547        async fn create_data_conn_async(&mut self) -> errs::Result<Box<AsyncDataConn>> {
548            let logger = self.logger.clone();
549            {
550                logger
551                    .lock()
552                    .await
553                    .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
554            }
555            let conn = AsyncDataConn::new();
556            Ok(Box::new(conn))
557        }
558    }
559
560    #[tokio::test]
561    async fn test_of_new() {
562        let manager = DataSrcManager::new(true);
563        assert!(manager.local);
564        assert_eq!(manager.vec_unready.len(), 0);
565        assert_eq!(manager.vec_ready.len(), 0);
566
567        let manager = DataSrcManager::new(false);
568        assert!(!manager.local);
569        assert_eq!(manager.vec_unready.len(), 0);
570        assert_eq!(manager.vec_ready.len(), 0);
571    }
572
573    #[tokio::test]
574    async fn test_of_prepend() {
575        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
576
577        {
578            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
579
580            let ds = SyncDataSrc::new(1, logger.clone(), false);
581            let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
582            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
583            vec.push(SendSyncNonNull::new(ptr));
584
585            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
586            let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
587            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
588            vec.push(SendSyncNonNull::new(ptr));
589
590            let mut manager = DataSrcManager::new(true);
591            manager.prepend(vec);
592
593            assert!(manager.local);
594            assert_eq!(manager.vec_unready.len(), 2);
595            assert_eq!(manager.vec_ready.len(), 0);
596
597            assert_eq!(
598                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
599                "foo".into()
600            );
601            assert_eq!(
602                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
603                "bar".into()
604            );
605
606            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
607
608            let ds = SyncDataSrc::new(3, logger.clone(), false);
609            let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
610            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
611            vec.push(SendSyncNonNull::new(ptr));
612
613            let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
614            let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
615            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
616            vec.push(SendSyncNonNull::new(ptr));
617
618            manager.prepend(vec);
619
620            assert!(manager.local);
621            assert_eq!(manager.vec_unready.len(), 4);
622            assert_eq!(manager.vec_ready.len(), 0);
623
624            assert_eq!(
625                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
626                "baz".into()
627            );
628            assert_eq!(
629                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
630                "qux".into()
631            );
632            assert_eq!(
633                unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
634                "foo".into()
635            );
636            assert_eq!(
637                unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
638                "bar".into()
639            );
640        }
641
642        // Give some time for drop to be called
643        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
644
645        let locked = logger.lock().await;
646        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
647        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
648        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
649        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
650        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
651        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
652        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
653        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
654    }
655
656    #[tokio::test]
657    async fn test_of_add() {
658        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
659
660        {
661            let mut manager = DataSrcManager::new(true);
662
663            let ds = SyncDataSrc::new(1, logger.clone(), false);
664            manager.add("foo", ds);
665
666            assert!(manager.local);
667            assert_eq!(manager.vec_unready.len(), 1);
668            assert_eq!(manager.vec_ready.len(), 0);
669
670            assert_eq!(
671                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
672                "foo".into()
673            );
674
675            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
676            manager.add("bar", ds);
677
678            assert!(manager.local);
679            assert_eq!(manager.vec_unready.len(), 2);
680            assert_eq!(manager.vec_ready.len(), 0);
681
682            assert_eq!(
683                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
684                "foo".into()
685            );
686            assert_eq!(
687                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
688                "bar".into()
689            );
690        }
691
692        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
693
694        let locked = logger.lock().await;
695        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
696        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
697        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
698        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
699    }
700
701    #[tokio::test]
702    async fn test_of_remove() {
703        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
704
705        {
706            let mut manager = DataSrcManager::new(true);
707
708            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
709            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
710            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
711            manager.vec_unready.push(SendSyncNonNull::new(ptr));
712
713            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
714            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
715            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
716            manager.vec_unready.push(SendSyncNonNull::new(ptr));
717
718            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
719            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
720            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
721            manager.vec_ready.push(SendSyncNonNull::new(ptr));
722
723            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
724            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
725            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
726            manager.vec_ready.push(SendSyncNonNull::new(ptr));
727
728            assert!(manager.local);
729            assert_eq!(manager.vec_unready.len(), 2);
730            assert_eq!(manager.vec_ready.len(), 2);
731
732            manager.remove("baz");
733            manager.remove("foo");
734            manager.remove("qux");
735            manager.remove("bar");
736        }
737
738        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
739
740        let locked = logger.lock().await;
741        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
742        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
743        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
744        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
745        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
746        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
747        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
748        assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
749        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
750        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
751    }
752
753    #[tokio::test]
754    async fn test_of_close() {
755        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
756
757        {
758            let mut manager = DataSrcManager::new(true);
759
760            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
761            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
762            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
763            manager.vec_unready.push(SendSyncNonNull::new(ptr));
764
765            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
766            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
767            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
768            manager.vec_unready.push(SendSyncNonNull::new(ptr));
769
770            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
771            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
772            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
773            manager.vec_ready.push(SendSyncNonNull::new(ptr));
774
775            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
776            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
777            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
778            manager.vec_ready.push(SendSyncNonNull::new(ptr));
779
780            assert!(manager.local);
781            assert_eq!(manager.vec_unready.len(), 2);
782            assert_eq!(manager.vec_ready.len(), 2);
783
784            manager.close();
785        }
786
787        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
788
789        let locked = logger.lock().await;
790        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
791        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
792        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
793        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
794        assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
795        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
796        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
797        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
798        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
799        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
800    }
801
802    #[tokio::test]
803    async fn test_of_setup_async_and_ok() {
804        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
805
806        {
807            let mut manager = DataSrcManager::new(true);
808
809            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
810            manager.add("foo", ds1);
811
812            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
813            manager.add("bar", ds2);
814
815            assert!(manager.local);
816            assert_eq!(manager.vec_unready.len(), 2);
817            assert_eq!(manager.vec_ready.len(), 0);
818
819            let mut vec = Vec::new();
820            manager.setup_async(&mut vec).await;
821
822            assert!(manager.local);
823            assert_eq!(manager.vec_unready.len(), 0);
824            assert_eq!(manager.vec_ready.len(), 2);
825        }
826
827        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
828
829        let locked = logger.lock().await;
830        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
831        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
832        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
833        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
834        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
835        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
836        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
837        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
838    }
839
840    #[tokio::test]
841    async fn test_of_setup_but_error() {
842        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
843
844        {
845            let mut manager = DataSrcManager::new(true);
846
847            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
848            manager.add("foo", ds1);
849
850            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
851            manager.add("bar", ds2);
852
853            let ds3 = SyncDataSrc::new(3, logger.clone(), true);
854            manager.add("bar", ds3);
855
856            assert!(manager.local);
857            assert_eq!(manager.vec_unready.len(), 3);
858            assert_eq!(manager.vec_ready.len(), 0);
859
860            let mut vec = Vec::new();
861            manager.setup_async(&mut vec).await;
862
863            assert!(manager.local);
864            assert_eq!(manager.vec_unready.len(), 3);
865            assert_eq!(manager.vec_ready.len(), 0);
866        }
867
868        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
869
870        let locked = logger.lock().await;
871        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
872        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
873        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
874        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
875        assert!(locked.contains(&"SyncDataSrc::setup 2 failed".to_string()));
876        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
877        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
878        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
879        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
880        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
881    }
882
883    #[tokio::test]
884    async fn test_of_setup_with_order_and_ok() {
885        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
886
887        {
888            let mut manager = DataSrcManager::new(true);
889
890            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
891            manager.add("foo", ds1);
892
893            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
894            manager.add("bar", ds2);
895
896            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
897            manager.add("baz", ds3);
898
899            assert!(manager.local);
900            assert_eq!(manager.vec_unready.len(), 3);
901            assert_eq!(manager.vec_ready.len(), 0);
902
903            let mut vec = Vec::new();
904            manager
905                .setup_with_order_async(&["baz", "foo"], &mut vec)
906                .await;
907
908            assert!(manager.local);
909            assert_eq!(manager.vec_unready.len(), 0);
910            assert_eq!(manager.vec_ready.len(), 3);
911        }
912
913        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
914
915        let locked = logger.lock().await;
916        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
917        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
918        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
919        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
920        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
921        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
922        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
923        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
924        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
925        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
926        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
927        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
928    }
929
930    #[tokio::test]
931    async fn test_of_setup_with_order_but_fail() {
932        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
933
934        {
935            let mut manager = DataSrcManager::new(true);
936
937            let ds1 = SyncDataSrc::new(1, logger.clone(), true);
938            manager.add("foo", ds1);
939
940            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
941            manager.add("bar", ds2);
942
943            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
944            manager.add("baz", ds3);
945
946            assert!(manager.local);
947            assert_eq!(manager.vec_unready.len(), 3);
948            assert_eq!(manager.vec_ready.len(), 0);
949
950            let mut vec = Vec::new();
951            manager
952                .setup_with_order_async(&["baz", "foo"], &mut vec)
953                .await;
954
955            assert!(manager.local);
956            assert_eq!(manager.vec_unready.len(), 3);
957            assert_eq!(manager.vec_ready.len(), 0);
958        }
959
960        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
961
962        let locked = logger.lock().await;
963        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
964        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
965        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
966        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
967        assert!(locked.contains(&"SyncDataSrc::setup 1 failed".to_string()));
968        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
969        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
970        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
971        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
972        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
973    }
974
975    #[tokio::test]
976    async fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
977        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
978
979        {
980            let mut manager = DataSrcManager::new(true);
981
982            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
983            manager.add("foo", ds1);
984
985            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
986            manager.add("bar", ds2);
987
988            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
989            manager.add("baz", ds3);
990
991            assert!(manager.local);
992            assert_eq!(manager.vec_unready.len(), 3);
993            assert_eq!(manager.vec_ready.len(), 0);
994
995            let mut vec = Vec::new();
996            manager
997                .setup_with_order_async(&["baz", "foo", "baz"], &mut vec)
998                .await;
999
1000            assert!(manager.local);
1001            assert_eq!(manager.vec_unready.len(), 0);
1002            assert_eq!(manager.vec_ready.len(), 3);
1003        }
1004
1005        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1006
1007        let locked = logger.lock().await;
1008        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
1009        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
1010        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
1011        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
1012        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
1013        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
1014        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
1015        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
1016        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
1017        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
1018        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
1019        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
1020    }
1021
1022    #[tokio::test]
1023    async fn test_of_copy_ds_ready_to_map() {
1024        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1025        let mut errors = Vec::new();
1026
1027        let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
1028
1029        let manager = DataSrcManager::new(true);
1030        manager.copy_ds_ready_to_map(&mut index_map);
1031        assert!(index_map.is_empty());
1032
1033        let mut manager = DataSrcManager::new(true);
1034        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1035        manager.add("foo", ds1);
1036        manager.setup_async(&mut errors).await;
1037        assert!(errors.is_empty());
1038        manager.copy_ds_ready_to_map(&mut index_map);
1039        assert_eq!(index_map.len(), 1);
1040        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1041
1042        let mut manager = DataSrcManager::new(false);
1043        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1044        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1045        manager.add("bar", ds2);
1046        manager.add("baz", ds3);
1047        manager.setup_async(&mut errors).await;
1048        assert!(errors.is_empty());
1049        manager.copy_ds_ready_to_map(&mut index_map);
1050        assert_eq!(index_map.len(), 3);
1051        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1052        assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1053        assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1054    }
1055
1056    #[tokio::test]
1057    async fn test_of_create_data_conn_and_ok() {
1058        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1059        let mut errors = Vec::new();
1060
1061        let mut manager = DataSrcManager::new(true);
1062        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1063        manager.add("foo", ds1);
1064        manager.setup_async(&mut errors).await;
1065
1066        if let Ok(boxed) = manager
1067            .create_data_conn_async::<SyncDataConn>(0, "foo")
1068            .await
1069        {
1070            assert_eq!(boxed.name.clone(), "foo".into());
1071        } else {
1072            panic!();
1073        }
1074    }
1075
1076    #[tokio::test]
1077    async fn test_of_create_data_conn_but_not_found() {
1078        let mut errors = Vec::new();
1079
1080        let mut manager = DataSrcManager::new(true);
1081        manager.setup_async(&mut errors).await;
1082
1083        if let Err(err) = manager
1084            .create_data_conn_async::<SyncDataConn>(0, "foo")
1085            .await
1086        {
1087            match err.reason::<DataSrcError>() {
1088                Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1089                    name,
1090                    data_conn_type,
1091                }) => {
1092                    assert_eq!(*name, "foo".into());
1093                    assert_eq!(
1094                        *data_conn_type,
1095                        "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1096                    );
1097                }
1098                _ => panic!(),
1099            }
1100        } else {
1101            panic!();
1102        }
1103    }
1104
1105    #[tokio::test]
1106    async fn test_of_create_data_conn_but_fail_to_cast() {
1107        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1108        let mut errors = Vec::new();
1109
1110        let mut manager = DataSrcManager::new(true);
1111        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1112        manager.add("foo", ds1);
1113        manager.setup_async(&mut errors).await;
1114
1115        if let Err(err) = manager
1116            .create_data_conn_async::<AsyncDataConn>(0, "foo")
1117            .await
1118        {
1119            match err.reason::<DataSrcError>() {
1120                Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1121                    assert_eq!(*name, "foo".into());
1122                    assert_eq!(
1123                        *target_type,
1124                        "sabi::tokio::data_src::tests_of_data_src::AsyncDataConn"
1125                    );
1126                }
1127                _ => panic!(),
1128            }
1129        } else {
1130            panic!();
1131        }
1132    }
1133
1134    #[tokio::test]
1135    async fn test_of_create_data_conn_but_fail_to_create() {
1136        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1137        let mut errors = Vec::new();
1138
1139        let mut manager = DataSrcManager::new(true);
1140        let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1141        manager.add("foo", ds1);
1142        manager.setup_async(&mut errors).await;
1143
1144        if let Err(err) = manager
1145            .create_data_conn_async::<SyncDataConn>(0, "foo")
1146            .await
1147        {
1148            match err.reason::<DataSrcError>() {
1149                Ok(DataSrcError::FailToCreateDataConn {
1150                    name,
1151                    data_conn_type,
1152                }) => {
1153                    assert_eq!(*name, "foo".into());
1154                    assert_eq!(
1155                        *data_conn_type,
1156                        "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1157                    );
1158                }
1159                _ => panic!(),
1160            }
1161        } else {
1162            panic!();
1163        }
1164    }
1165}