Skip to main content

sabi/tokio/data_src/
mod.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5mod global_setup;
6
7pub(crate) use global_setup::{
8    copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async,
9};
10pub use global_setup::{
11    create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
12};
13
14use crate::tokio::{
15    AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
16    SendSyncNonNull,
17};
18
19use std::collections::HashMap;
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::{any, mem, ptr};
24
25/// Represents errors that can occur during data source operations.
26#[derive(Debug)]
27pub enum DataSrcError {
28    /// An error indicating a failure to register a global data source.
29    /// This can happen if the global data source manager is in an invalid state.
30    FailToRegisterGlobalDataSrc {
31        /// The name of the data source that failed to register.
32        name: Arc<str>,
33    },
34
35    /// An error indicating that one or more global data sources failed during their setup process.
36    FailToSetupGlobalDataSrcs {
37        /// A vector of errors, each containing the name of the data source and the error itself.
38        errors: Vec<(Arc<str>, errs::Err)>,
39    },
40
41    /// An error indicating that a global data source setup is currently in progress.
42    DuringSetupGlobalDataSrcs,
43
44    /// An error indicating that global data sources have already been set up.
45    AlreadySetupGlobalDataSrcs,
46
47    /// An error indicating that a data connection could not be cast to the target type.
48    FailToCastDataConn {
49        /// The name of the data source that failed to provide the correct connection type.
50        name: Arc<str>,
51        /// The string representation of the target data connection type that was requested.
52        target_type: &'static str,
53    },
54
55    /// An error indicating that a data connection could not be created by its data source.
56    FailToCreateDataConn {
57        /// The name of the data source that failed to create a data connection.
58        name: Arc<str>,
59        /// The string representation of the data connection type that was requested.
60        data_conn_type: &'static str,
61    },
62
63    /// An error indicating that no data source was found for the requested data connection.
64    NotFoundDataSrcToCreateDataConn {
65        /// The name of the data source that was not found.
66        name: Arc<str>,
67        /// The string representation of the data connection type that was requested.
68        data_conn_type: &'static str,
69    },
70}
71
72impl<S, C> DataSrcContainer<S, C>
73where
74    S: DataSrc<C> + 'static,
75    C: DataConn + 'static,
76{
77    pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
78        Self {
79            drop_fn: drop_data_src::<S, C>,
80            close_fn: close_data_src::<S, C>,
81            is_data_conn_fn: is_data_conn::<C>,
82
83            setup_fn: setup_data_src_async::<S, C>,
84            create_data_conn_fn: create_data_conn_async::<S, C>,
85
86            local,
87            name: name.into(),
88            data_src,
89        }
90    }
91}
92
93fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
94where
95    S: DataSrc<C>,
96    C: DataConn + 'static,
97{
98    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
99    drop(unsafe { Box::from_raw(typed_ptr) });
100}
101
102fn close_data_src<S, C>(ptr: *const DataSrcContainer)
103where
104    S: DataSrc<C>,
105    C: DataConn + 'static,
106{
107    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
108    unsafe { (*typed_ptr).data_src.close() };
109}
110
111fn is_data_conn<C>(type_id: any::TypeId) -> bool
112where
113    C: DataConn + 'static,
114{
115    any::TypeId::of::<C>() == type_id
116}
117
118fn setup_data_src_async<S, C>(
119    ptr: *const DataSrcContainer,
120    ag: &mut AsyncGroup,
121) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + '_>>
122where
123    S: DataSrc<C> + 'static,
124    C: DataConn + 'static,
125{
126    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
127    let data_src = unsafe { &mut (*typed_ptr).data_src };
128    Box::pin(data_src.setup_async(ag))
129}
130
131#[allow(clippy::type_complexity)]
132fn create_data_conn_async<'a, S, C>(
133    ptr: *const DataSrcContainer,
134) -> Pin<Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'a>>
135where
136    S: DataSrc<C> + 'a,
137    C: DataConn + 'static,
138{
139    let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
140    let data_src = unsafe { &mut (*typed_ptr).data_src };
141    let name = unsafe { &(*typed_ptr).name };
142    Box::pin(async move {
143        let conn: Box<C> = data_src.create_data_conn_async().await?;
144        Ok(Box::new(DataConnContainer::<C>::new(
145            name.to_string(),
146            conn,
147        )))
148    })
149}
150
151impl DataSrcManager {
152    pub(crate) const fn new(local: bool) -> Self {
153        Self {
154            vec_unready: Vec::new(),
155            vec_ready: Vec::new(),
156            local,
157        }
158    }
159
160    pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
161        self.vec_unready.splice(0..0, vec);
162    }
163
164    pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
165    where
166        S: DataSrc<C> + 'static,
167        C: DataConn + 'static,
168    {
169        let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
170        let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
171        self.vec_unready.push(SendSyncNonNull::new(ptr));
172    }
173
174    pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
175        let extracted_vec: Vec<_> = self
176            .vec_ready
177            .extract_if(.., |ssnnptr| {
178                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
179            })
180            .collect();
181
182        for ssnnptr in extracted_vec.iter().rev() {
183            let ptr = ssnnptr.non_null_ptr.as_ptr();
184            let close_fn = unsafe { (*ptr).close_fn };
185            let drop_fn = unsafe { (*ptr).drop_fn };
186            close_fn(ptr);
187            drop_fn(ptr);
188        }
189
190        let extracted_vec: Vec<_> = self
191            .vec_unready
192            .extract_if(.., |ssnnptr| {
193                unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
194            })
195            .collect();
196
197        for ssnnptr in extracted_vec.iter().rev() {
198            let ptr = ssnnptr.non_null_ptr.as_ptr();
199            let drop_fn = unsafe { (*ptr).drop_fn };
200            drop_fn(ptr);
201        }
202    }
203
204    pub(crate) fn close(&mut self) {
205        let vec = mem::take(&mut self.vec_ready);
206        for ssnnptr in vec.into_iter().rev() {
207            let ptr = ssnnptr.non_null_ptr.as_ptr();
208            let close_fn = unsafe { (*ptr).close_fn };
209            let drop_fn = unsafe { (*ptr).drop_fn };
210            close_fn(ptr);
211            drop_fn(ptr);
212        }
213        let vec = mem::take(&mut self.vec_unready);
214        for ssnnptr in vec.into_iter().rev() {
215            let ptr = ssnnptr.non_null_ptr.as_ptr();
216            let drop_fn = unsafe { (*ptr).drop_fn };
217            drop_fn(ptr);
218        }
219    }
220
221    pub(crate) async fn setup_async(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
222        if self.vec_unready.is_empty() {
223            return;
224        }
225
226        let mut n_done = 0;
227        let mut ag = AsyncGroup::new();
228        for ssnnptr in self.vec_unready.iter() {
229            n_done += 1;
230            let ptr = ssnnptr.non_null_ptr.as_ptr();
231            let setup_fn = unsafe { (*ptr).setup_fn };
232            ag._name = unsafe { (*ptr).name.clone() };
233            if let Err(err) = setup_fn(ptr, &mut ag).await {
234                errors.push((ag._name.clone(), err));
235                break;
236            }
237        }
238        ag.join_and_collect_errors_async(errors).await;
239
240        if errors.is_empty() {
241            self.vec_ready.append(&mut self.vec_unready);
242        } else {
243            for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
244                let ptr = ssnnptr.non_null_ptr.as_ptr();
245                let close_fn = unsafe { (*ptr).close_fn };
246                close_fn(ptr);
247            }
248        }
249    }
250
251    pub(crate) async fn setup_with_order_async(
252        &mut self,
253        names: &[&str],
254        errors: &mut Vec<(Arc<str>, errs::Err)>,
255    ) {
256        if self.vec_unready.is_empty() {
257            return;
258        }
259
260        let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
261        // To overwrite later indexed elements with eariler ones when names overlap
262        for (i, nm) in names.iter().rev().enumerate() {
263            index_map.insert(*nm, names.len() - 1 - i);
264        }
265
266        let vec_unready = mem::take(&mut self.vec_unready);
267
268        let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
269            vec![None; index_map.len()];
270        for ssnnptr in vec_unready.into_iter() {
271            let ptr = ssnnptr.non_null_ptr.as_ptr();
272            let name = unsafe { (*ptr).name.clone() };
273            if let Some(index) = index_map.remove(name.as_ref()) {
274                ordered_vec[index] = Some(ssnnptr);
275            } else {
276                ordered_vec.push(Some(ssnnptr));
277            }
278        }
279
280        let mut n_done = 0;
281        let mut ag = AsyncGroup::new();
282        for ssnnptr_opt in ordered_vec.iter() {
283            n_done += 1;
284            if let Some(ssnnptr) = ssnnptr_opt {
285                let ptr = ssnnptr.non_null_ptr.as_ptr();
286                let setup_fn = unsafe { (*ptr).setup_fn };
287                ag._name = unsafe { (*ptr).name.clone() };
288                if let Err(err) = setup_fn(ptr, &mut ag).await {
289                    errors.push((ag._name.clone(), err));
290                    break;
291                }
292            }
293        }
294        ag.join_and_collect_errors_async(errors).await;
295
296        if errors.is_empty() {
297            for ssnnptr in ordered_vec.into_iter().flatten() {
298                self.vec_ready.push(ssnnptr);
299            }
300        } else {
301            for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
302                let ptr = ssnnptr.non_null_ptr.as_ptr();
303                let close_fn = unsafe { (*ptr).close_fn };
304                close_fn(ptr);
305            }
306            for ssnnptr in ordered_vec.into_iter().flatten() {
307                self.vec_unready.push(ssnnptr);
308            }
309        }
310    }
311
312    pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
313        for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
314            let ptr = ssnnptr.non_null_ptr.as_ptr();
315            let name = unsafe { (*ptr).name.clone() };
316            index_map.insert(name, (self.local, i));
317        }
318    }
319
320    pub(crate) async fn create_data_conn_async<C>(
321        &self,
322        index: usize,
323        name: impl AsRef<str>,
324    ) -> errs::Result<Box<DataConnContainer>>
325    where
326        C: DataConn + 'static,
327    {
328        if let Some(ssnnptr) = self.vec_ready.get(index) {
329            let ptr = ssnnptr.non_null_ptr.as_ptr();
330            let type_id = any::TypeId::of::<C>();
331            let is_fn = unsafe { (*ptr).is_data_conn_fn };
332            let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
333            if !is_fn(type_id) {
334                Err(errs::Err::new(DataSrcError::FailToCastDataConn {
335                    name: name.as_ref().into(),
336                    target_type: any::type_name::<C>(),
337                }))
338            } else {
339                match create_data_conn_fn(ptr).await {
340                    Ok(boxed) => Ok(boxed),
341                    Err(err) => Err(errs::Err::with_source(
342                        DataSrcError::FailToCreateDataConn {
343                            name: name.as_ref().into(),
344                            data_conn_type: any::type_name::<C>(),
345                        },
346                        err,
347                    )),
348                }
349            }
350        } else {
351            Err(errs::Err::new(
352                DataSrcError::NotFoundDataSrcToCreateDataConn {
353                    name: name.as_ref().into(),
354                    data_conn_type: any::type_name::<C>(),
355                },
356            ))
357        }
358    }
359}
360
361impl Drop for DataSrcManager {
362    fn drop(&mut self) {
363        self.close();
364    }
365}
366
367#[cfg(test)]
368mod tests_of_data_src {
369    use super::*;
370    use std::sync::Arc;
371    use tokio::sync::Mutex;
372
373    struct SyncDataConn {}
374    impl SyncDataConn {
375        fn new() -> Self {
376            Self {}
377        }
378    }
379    impl DataConn for SyncDataConn {
380        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
381            Ok(())
382        }
383        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
384        fn close(&mut self) {}
385    }
386
387    struct AsyncDataConn {}
388    impl AsyncDataConn {
389        fn new() -> Self {
390            Self {}
391        }
392    }
393    impl DataConn for AsyncDataConn {
394        async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
395            Ok(())
396        }
397        async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
398        fn close(&mut self) {}
399    }
400
401    struct SyncDataSrc {
402        id: i8,
403        logger: Arc<Mutex<Vec<String>>>,
404        fail_to_setup: bool,
405        fail_to_create_data_conn: bool,
406    }
407    impl SyncDataSrc {
408        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
409            let logger_clone = logger.clone();
410            tokio::spawn(async move {
411                logger_clone
412                    .lock()
413                    .await
414                    .push(format!("SyncDataSrc::new {}", id));
415            });
416            Self {
417                id,
418                logger: logger,
419                fail_to_setup,
420                fail_to_create_data_conn: false,
421            }
422        }
423        fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
424            Self {
425                id,
426                logger: logger,
427                fail_to_setup: false,
428                fail_to_create_data_conn: true,
429            }
430        }
431    }
432    impl Drop for SyncDataSrc {
433        fn drop(&mut self) {
434            let logger = self.logger.clone();
435            let id = self.id;
436            tokio::spawn(async move {
437                logger
438                    .lock()
439                    .await
440                    .push(format!("SyncDataSrc::drop {}", id));
441            });
442        }
443    }
444    impl DataSrc<SyncDataConn> for SyncDataSrc {
445        async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
446            let fail = self.fail_to_setup;
447            let id = self.id;
448            let logger = self.logger.clone();
449
450            if fail {
451                logger
452                    .lock()
453                    .await
454                    .push(format!("SyncDataSrc::setup {} failed", id));
455                return Err(errs::Err::new("XXX".to_string()));
456            }
457            logger
458                .lock()
459                .await
460                .push(format!("SyncDataSrc::setup {}", id));
461            Ok(())
462        }
463        fn close(&mut self) {
464            let logger = self.logger.clone();
465            let id = self.id;
466            tokio::spawn(async move {
467                logger
468                    .lock()
469                    .await
470                    .push(format!("SyncDataSrc::close {}", id));
471            });
472        }
473        async fn create_data_conn_async(&mut self) -> errs::Result<Box<SyncDataConn>> {
474            let id = self.id;
475            let logger = self.logger.clone();
476            {
477                logger
478                    .lock()
479                    .await
480                    .push(format!("SyncDataSrc::create_data_conn {}", id));
481            }
482            if self.fail_to_create_data_conn {
483                return Err(errs::Err::new("eeee".to_string()));
484            }
485            let conn = SyncDataConn::new();
486            Ok(Box::new(conn))
487        }
488    }
489
490    struct AsyncDataSrc {
491        id: i8,
492        fail: bool,
493        logger: Arc<Mutex<Vec<String>>>,
494        wait: u64,
495    }
496    impl AsyncDataSrc {
497        fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
498            let logger_clone = logger.clone();
499            tokio::spawn(async move {
500                logger_clone
501                    .lock()
502                    .await
503                    .push(format!("AsyncDataSrc::new {}", id));
504            });
505            Self {
506                id,
507                fail,
508                logger,
509                wait,
510            }
511        }
512    }
513    impl Drop for AsyncDataSrc {
514        fn drop(&mut self) {
515            let logger = self.logger.clone();
516            let id = self.id;
517            tokio::spawn(async move {
518                logger
519                    .lock()
520                    .await
521                    .push(format!("AsyncDataSrc::drop {}", id));
522            });
523        }
524    }
525    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
526        async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
527            let logger = self.logger.clone();
528            let fail = self.fail;
529            let id = self.id;
530            let wait = self.wait;
531
532            ag.add(async move {
533                tokio::time::sleep(std::time::Duration::from_millis(wait)).await;
534                let mut logger = logger.lock().await;
535                if fail {
536                    logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
537                    return Err(errs::Err::new("XXX".to_string()));
538                }
539                logger.push(format!("AsyncDataSrc::setup {}", id));
540                Ok(())
541            });
542            Ok(())
543        }
544        fn close(&mut self) {
545            let logger = self.logger.clone();
546            let id = self.id;
547            tokio::spawn(async move {
548                logger
549                    .lock()
550                    .await
551                    .push(format!("AsyncDataSrc::close {}", id));
552            });
553        }
554        async fn create_data_conn_async(&mut self) -> errs::Result<Box<AsyncDataConn>> {
555            let logger = self.logger.clone();
556            {
557                logger
558                    .lock()
559                    .await
560                    .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
561            }
562            let conn = AsyncDataConn::new();
563            Ok(Box::new(conn))
564        }
565    }
566
567    #[tokio::test]
568    async fn test_of_new() {
569        let manager = DataSrcManager::new(true);
570        assert!(manager.local);
571        assert_eq!(manager.vec_unready.len(), 0);
572        assert_eq!(manager.vec_ready.len(), 0);
573
574        let manager = DataSrcManager::new(false);
575        assert!(!manager.local);
576        assert_eq!(manager.vec_unready.len(), 0);
577        assert_eq!(manager.vec_ready.len(), 0);
578    }
579
580    #[tokio::test]
581    async fn test_of_prepend() {
582        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
583
584        {
585            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
586
587            let ds = SyncDataSrc::new(1, logger.clone(), false);
588            let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
589            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
590            vec.push(SendSyncNonNull::new(ptr));
591
592            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
593            let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
594            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
595            vec.push(SendSyncNonNull::new(ptr));
596
597            let mut manager = DataSrcManager::new(true);
598            manager.prepend(vec);
599
600            assert!(manager.local);
601            assert_eq!(manager.vec_unready.len(), 2);
602            assert_eq!(manager.vec_ready.len(), 0);
603
604            assert_eq!(
605                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
606                "foo".into()
607            );
608            assert_eq!(
609                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
610                "bar".into()
611            );
612
613            let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
614
615            let ds = SyncDataSrc::new(3, logger.clone(), false);
616            let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
617            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
618            vec.push(SendSyncNonNull::new(ptr));
619
620            let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
621            let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
622            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
623            vec.push(SendSyncNonNull::new(ptr));
624
625            manager.prepend(vec);
626
627            assert!(manager.local);
628            assert_eq!(manager.vec_unready.len(), 4);
629            assert_eq!(manager.vec_ready.len(), 0);
630
631            assert_eq!(
632                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
633                "baz".into()
634            );
635            assert_eq!(
636                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
637                "qux".into()
638            );
639            assert_eq!(
640                unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
641                "foo".into()
642            );
643            assert_eq!(
644                unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
645                "bar".into()
646            );
647        }
648
649        // Give some time for drop to be called
650        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
651
652        let locked = logger.lock().await;
653        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
654        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
655        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
656        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
657        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
658        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
659        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
660        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
661    }
662
663    #[tokio::test]
664    async fn test_of_add() {
665        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
666
667        {
668            let mut manager = DataSrcManager::new(true);
669
670            let ds = SyncDataSrc::new(1, logger.clone(), false);
671            manager.add("foo", ds);
672
673            assert!(manager.local);
674            assert_eq!(manager.vec_unready.len(), 1);
675            assert_eq!(manager.vec_ready.len(), 0);
676
677            assert_eq!(
678                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
679                "foo".into()
680            );
681
682            let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
683            manager.add("bar", ds);
684
685            assert!(manager.local);
686            assert_eq!(manager.vec_unready.len(), 2);
687            assert_eq!(manager.vec_ready.len(), 0);
688
689            assert_eq!(
690                unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
691                "foo".into()
692            );
693            assert_eq!(
694                unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
695                "bar".into()
696            );
697        }
698
699        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
700
701        let locked = logger.lock().await;
702        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
703        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
704        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
705        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
706    }
707
708    #[tokio::test]
709    async fn test_of_remove() {
710        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
711
712        {
713            let mut manager = DataSrcManager::new(true);
714
715            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
716            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
717            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
718            manager.vec_unready.push(SendSyncNonNull::new(ptr));
719
720            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
721            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
722            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
723            manager.vec_unready.push(SendSyncNonNull::new(ptr));
724
725            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
726            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
727            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
728            manager.vec_ready.push(SendSyncNonNull::new(ptr));
729
730            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
731            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
732            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
733            manager.vec_ready.push(SendSyncNonNull::new(ptr));
734
735            assert!(manager.local);
736            assert_eq!(manager.vec_unready.len(), 2);
737            assert_eq!(manager.vec_ready.len(), 2);
738
739            manager.remove("baz");
740            manager.remove("foo");
741            manager.remove("qux");
742            manager.remove("bar");
743        }
744
745        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
746
747        let locked = logger.lock().await;
748        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
749        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
750        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
751        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
752        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
753        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
754        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
755        assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
756        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
757        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
758    }
759
760    #[tokio::test]
761    async fn test_of_close() {
762        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
763
764        {
765            let mut manager = DataSrcManager::new(true);
766
767            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
768            let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
769            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
770            manager.vec_unready.push(SendSyncNonNull::new(ptr));
771
772            let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
773            let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
774            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
775            manager.vec_unready.push(SendSyncNonNull::new(ptr));
776
777            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
778            let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
779            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
780            manager.vec_ready.push(SendSyncNonNull::new(ptr));
781
782            let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
783            let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
784            let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
785            manager.vec_ready.push(SendSyncNonNull::new(ptr));
786
787            assert!(manager.local);
788            assert_eq!(manager.vec_unready.len(), 2);
789            assert_eq!(manager.vec_ready.len(), 2);
790
791            manager.close();
792        }
793
794        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
795
796        let locked = logger.lock().await;
797        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
798        assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
799        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
800        assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
801        assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
802        assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
803        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
804        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
805        assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
806        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
807    }
808
809    #[tokio::test]
810    async fn test_of_setup_async_and_ok() {
811        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
812
813        {
814            let mut manager = DataSrcManager::new(true);
815
816            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
817            manager.add("foo", ds1);
818
819            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
820            manager.add("bar", ds2);
821
822            assert!(manager.local);
823            assert_eq!(manager.vec_unready.len(), 2);
824            assert_eq!(manager.vec_ready.len(), 0);
825
826            let mut vec = Vec::new();
827            manager.setup_async(&mut vec).await;
828
829            assert!(manager.local);
830            assert_eq!(manager.vec_unready.len(), 0);
831            assert_eq!(manager.vec_ready.len(), 2);
832        }
833
834        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
835
836        let locked = logger.lock().await;
837        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
838        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
839        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
840        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
841        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
842        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
843        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
844        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
845    }
846
847    #[tokio::test]
848    async fn test_of_setup_but_error() {
849        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
850
851        {
852            let mut manager = DataSrcManager::new(true);
853
854            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
855            manager.add("foo", ds1);
856
857            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
858            manager.add("bar", ds2);
859
860            let ds3 = SyncDataSrc::new(3, logger.clone(), true);
861            manager.add("bar", ds3);
862
863            assert!(manager.local);
864            assert_eq!(manager.vec_unready.len(), 3);
865            assert_eq!(manager.vec_ready.len(), 0);
866
867            let mut vec = Vec::new();
868            manager.setup_async(&mut vec).await;
869
870            assert!(manager.local);
871            assert_eq!(manager.vec_unready.len(), 3);
872            assert_eq!(manager.vec_ready.len(), 0);
873        }
874
875        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
876
877        let locked = logger.lock().await;
878        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
879        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
880        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
881        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
882        assert!(locked.contains(&"SyncDataSrc::setup 2 failed".to_string()));
883        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
884        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
885        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
886        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
887        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
888    }
889
890    #[tokio::test]
891    async fn test_of_setup_with_order_and_ok() {
892        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
893
894        {
895            let mut manager = DataSrcManager::new(true);
896
897            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
898            manager.add("foo", ds1);
899
900            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
901            manager.add("bar", ds2);
902
903            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
904            manager.add("baz", ds3);
905
906            assert!(manager.local);
907            assert_eq!(manager.vec_unready.len(), 3);
908            assert_eq!(manager.vec_ready.len(), 0);
909
910            let mut vec = Vec::new();
911            manager
912                .setup_with_order_async(&["baz", "foo"], &mut vec)
913                .await;
914
915            assert!(manager.local);
916            assert_eq!(manager.vec_unready.len(), 0);
917            assert_eq!(manager.vec_ready.len(), 3);
918        }
919
920        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
921
922        let locked = logger.lock().await;
923        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
924        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
925        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
926        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
927        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
928        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
929        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
930        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
931        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
932        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
933        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
934        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
935    }
936
937    #[tokio::test]
938    async fn test_of_setup_with_order_but_fail() {
939        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
940
941        {
942            let mut manager = DataSrcManager::new(true);
943
944            let ds1 = SyncDataSrc::new(1, logger.clone(), true);
945            manager.add("foo", ds1);
946
947            let ds2 = SyncDataSrc::new(2, logger.clone(), true);
948            manager.add("bar", ds2);
949
950            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
951            manager.add("baz", ds3);
952
953            assert!(manager.local);
954            assert_eq!(manager.vec_unready.len(), 3);
955            assert_eq!(manager.vec_ready.len(), 0);
956
957            let mut vec = Vec::new();
958            manager
959                .setup_with_order_async(&["baz", "foo"], &mut vec)
960                .await;
961
962            assert!(manager.local);
963            assert_eq!(manager.vec_unready.len(), 3);
964            assert_eq!(manager.vec_ready.len(), 0);
965        }
966
967        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
968
969        let locked = logger.lock().await;
970        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
971        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
972        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
973        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
974        assert!(locked.contains(&"SyncDataSrc::setup 1 failed".to_string()));
975        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
976        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
977        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
978        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
979        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
980    }
981
982    #[tokio::test]
983    async fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
984        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
985
986        {
987            let mut manager = DataSrcManager::new(true);
988
989            let ds1 = SyncDataSrc::new(1, logger.clone(), false);
990            manager.add("foo", ds1);
991
992            let ds2 = SyncDataSrc::new(2, logger.clone(), false);
993            manager.add("bar", ds2);
994
995            let ds3 = SyncDataSrc::new(3, logger.clone(), false);
996            manager.add("baz", ds3);
997
998            assert!(manager.local);
999            assert_eq!(manager.vec_unready.len(), 3);
1000            assert_eq!(manager.vec_ready.len(), 0);
1001
1002            let mut vec = Vec::new();
1003            manager
1004                .setup_with_order_async(&["baz", "foo", "baz"], &mut vec)
1005                .await;
1006
1007            assert!(manager.local);
1008            assert_eq!(manager.vec_unready.len(), 0);
1009            assert_eq!(manager.vec_ready.len(), 3);
1010        }
1011
1012        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1013
1014        let locked = logger.lock().await;
1015        assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
1016        assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
1017        assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
1018        assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
1019        assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
1020        assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
1021        assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
1022        assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
1023        assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
1024        assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
1025        assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
1026        assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
1027    }
1028
1029    #[tokio::test]
1030    async fn test_of_copy_ds_ready_to_map() {
1031        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1032        let mut errors = Vec::new();
1033
1034        let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
1035
1036        let manager = DataSrcManager::new(true);
1037        manager.copy_ds_ready_to_map(&mut index_map);
1038        assert!(index_map.is_empty());
1039
1040        let mut manager = DataSrcManager::new(true);
1041        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1042        manager.add("foo", ds1);
1043        manager.setup_async(&mut errors).await;
1044        assert!(errors.is_empty());
1045        manager.copy_ds_ready_to_map(&mut index_map);
1046        assert_eq!(index_map.len(), 1);
1047        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1048
1049        let mut manager = DataSrcManager::new(false);
1050        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1051        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1052        manager.add("bar", ds2);
1053        manager.add("baz", ds3);
1054        manager.setup_async(&mut errors).await;
1055        assert!(errors.is_empty());
1056        manager.copy_ds_ready_to_map(&mut index_map);
1057        assert_eq!(index_map.len(), 3);
1058        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1059        assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1060        assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1061    }
1062
1063    #[tokio::test]
1064    async fn test_of_create_data_conn_and_ok() {
1065        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1066        let mut errors = Vec::new();
1067
1068        let mut manager = DataSrcManager::new(true);
1069        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1070        manager.add("foo", ds1);
1071        manager.setup_async(&mut errors).await;
1072
1073        if let Ok(boxed) = manager
1074            .create_data_conn_async::<SyncDataConn>(0, "foo")
1075            .await
1076        {
1077            assert_eq!(boxed.name.clone(), "foo".into());
1078        } else {
1079            panic!();
1080        }
1081    }
1082
1083    #[tokio::test]
1084    async fn test_of_create_data_conn_but_not_found() {
1085        let mut errors = Vec::new();
1086
1087        let mut manager = DataSrcManager::new(true);
1088        manager.setup_async(&mut errors).await;
1089
1090        if let Err(err) = manager
1091            .create_data_conn_async::<SyncDataConn>(0, "foo")
1092            .await
1093        {
1094            match err.reason::<DataSrcError>() {
1095                Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1096                    name,
1097                    data_conn_type,
1098                }) => {
1099                    assert_eq!(*name, "foo".into());
1100                    assert_eq!(
1101                        *data_conn_type,
1102                        "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1103                    );
1104                }
1105                _ => panic!(),
1106            }
1107        } else {
1108            panic!();
1109        }
1110    }
1111
1112    #[tokio::test]
1113    async fn test_of_create_data_conn_but_fail_to_cast() {
1114        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1115        let mut errors = Vec::new();
1116
1117        let mut manager = DataSrcManager::new(true);
1118        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1119        manager.add("foo", ds1);
1120        manager.setup_async(&mut errors).await;
1121
1122        if let Err(err) = manager
1123            .create_data_conn_async::<AsyncDataConn>(0, "foo")
1124            .await
1125        {
1126            match err.reason::<DataSrcError>() {
1127                Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1128                    assert_eq!(*name, "foo".into());
1129                    assert_eq!(
1130                        *target_type,
1131                        "sabi::tokio::data_src::tests_of_data_src::AsyncDataConn"
1132                    );
1133                }
1134                _ => panic!(),
1135            }
1136        } else {
1137            panic!();
1138        }
1139    }
1140
1141    #[tokio::test]
1142    async fn test_of_create_data_conn_but_fail_to_create() {
1143        let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1144        let mut errors = Vec::new();
1145
1146        let mut manager = DataSrcManager::new(true);
1147        let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1148        manager.add("foo", ds1);
1149        manager.setup_async(&mut errors).await;
1150
1151        if let Err(err) = manager
1152            .create_data_conn_async::<SyncDataConn>(0, "foo")
1153            .await
1154        {
1155            match err.reason::<DataSrcError>() {
1156                Ok(DataSrcError::FailToCreateDataConn {
1157                    name,
1158                    data_conn_type,
1159                }) => {
1160                    assert_eq!(*name, "foo".into());
1161                    assert_eq!(
1162                        *data_conn_type,
1163                        "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1164                    );
1165                }
1166                _ => panic!(),
1167            }
1168        } else {
1169            panic!();
1170        }
1171    }
1172}