1mod global_setup;
6
7pub(crate) use global_setup::{
8 copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async,
9};
10pub use global_setup::{
11 create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
12};
13
14use crate::tokio::{
15 AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
16 SendSyncNonNull,
17};
18
19use std::collections::HashMap;
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::{any, mem, ptr};
24
/// Error reasons reported while registering, setting up, or using data sources.
#[derive(Debug)]
pub enum DataSrcError {
    /// A global data source could not be registered.
    ///
    /// NOTE(review): not constructed in this file; presumably raised by the
    /// `global_setup` module — confirm.
    FailToRegisterGlobalDataSrc {
        /// Name associated with the data source.
        name: Arc<str>,
    },

    /// Setting up the global data sources failed.
    ///
    /// NOTE(review): not constructed in this file — confirm against `global_setup`.
    FailToSetupGlobalDataSrcs {
        /// Pairs of a data source name and the error reported for it.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// NOTE(review): presumably reported when an operation is attempted while the
    /// global data sources are being set up; not constructed in this file — confirm.
    DuringSetupGlobalDataSrcs,

    /// NOTE(review): presumably reported when the global data sources have already
    /// been set up; not constructed in this file — confirm.
    AlreadySetupGlobalDataSrcs,

    /// The data source found does not produce the requested data connection type.
    FailToCastDataConn {
        /// Name passed by the caller that requested the connection.
        name: Arc<str>,
        /// `std::any::type_name` of the requested data connection type.
        target_type: &'static str,
    },

    /// The data source returned an error while creating a data connection.
    FailToCreateDataConn {
        /// Name passed by the caller that requested the connection.
        name: Arc<str>,
        /// `std::any::type_name` of the requested data connection type.
        data_conn_type: &'static str,
    },

    /// No ready data source exists at the requested position.
    NotFoundDataSrcToCreateDataConn {
        /// Name passed by the caller that requested the connection.
        name: Arc<str>,
        /// `std::any::type_name` of the requested data connection type.
        data_conn_type: &'static str,
    },
}
71
72impl<S, C> DataSrcContainer<S, C>
73where
74 S: DataSrc<C> + 'static,
75 C: DataConn + 'static,
76{
77 pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
78 Self {
79 drop_fn: drop_data_src::<S, C>,
80 close_fn: close_data_src::<S, C>,
81 is_data_conn_fn: is_data_conn::<C>,
82
83 setup_fn: setup_data_src_async::<S, C>,
84 create_data_conn_fn: create_data_conn_async::<S, C>,
85
86 local,
87 name: name.into(),
88 data_src,
89 }
90 }
91}
92
93fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
94where
95 S: DataSrc<C>,
96 C: DataConn + 'static,
97{
98 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
99 drop(unsafe { Box::from_raw(typed_ptr) });
100}
101
102fn close_data_src<S, C>(ptr: *const DataSrcContainer)
103where
104 S: DataSrc<C>,
105 C: DataConn + 'static,
106{
107 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
108 unsafe { (*typed_ptr).data_src.close() };
109}
110
111fn is_data_conn<C>(type_id: any::TypeId) -> bool
112where
113 C: DataConn + 'static,
114{
115 any::TypeId::of::<C>() == type_id
116}
117
118fn setup_data_src_async<S, C>(
119 ptr: *const DataSrcContainer,
120 ag: &mut AsyncGroup,
121) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + '_>>
122where
123 S: DataSrc<C> + 'static,
124 C: DataConn + 'static,
125{
126 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
127 let data_src = unsafe { &mut (*typed_ptr).data_src };
128 Box::pin(data_src.setup_async(ag))
129}
130
131#[allow(clippy::type_complexity)]
132fn create_data_conn_async<'a, S, C>(
133 ptr: *const DataSrcContainer,
134) -> Pin<Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'a>>
135where
136 S: DataSrc<C> + 'a,
137 C: DataConn + 'static,
138{
139 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
140 let data_src = unsafe { &mut (*typed_ptr).data_src };
141 let name = unsafe { &(*typed_ptr).name };
142 Box::pin(async move {
143 let conn: Box<C> = data_src.create_data_conn_async().await?;
144 Ok(Box::new(DataConnContainer::<C>::new(
145 name.to_string(),
146 conn,
147 )))
148 })
149}
150
151impl DataSrcManager {
152 pub(crate) const fn new(local: bool) -> Self {
153 Self {
154 vec_unready: Vec::new(),
155 vec_ready: Vec::new(),
156 local,
157 }
158 }
159
160 pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
161 self.vec_unready.splice(0..0, vec);
162 }
163
164 pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
165 where
166 S: DataSrc<C> + 'static,
167 C: DataConn + 'static,
168 {
169 let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
170 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
171 self.vec_unready.push(SendSyncNonNull::new(ptr));
172 }
173
174 pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
175 let extracted_vec: Vec<_> = self
176 .vec_ready
177 .extract_if(.., |ssnnptr| {
178 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
179 })
180 .collect();
181
182 for ssnnptr in extracted_vec.iter().rev() {
183 let ptr = ssnnptr.non_null_ptr.as_ptr();
184 let close_fn = unsafe { (*ptr).close_fn };
185 let drop_fn = unsafe { (*ptr).drop_fn };
186 close_fn(ptr);
187 drop_fn(ptr);
188 }
189
190 let extracted_vec: Vec<_> = self
191 .vec_unready
192 .extract_if(.., |ssnnptr| {
193 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
194 })
195 .collect();
196
197 for ssnnptr in extracted_vec.iter().rev() {
198 let ptr = ssnnptr.non_null_ptr.as_ptr();
199 let drop_fn = unsafe { (*ptr).drop_fn };
200 drop_fn(ptr);
201 }
202 }
203
204 pub(crate) fn close(&mut self) {
205 let vec = mem::take(&mut self.vec_ready);
206 for ssnnptr in vec.into_iter().rev() {
207 let ptr = ssnnptr.non_null_ptr.as_ptr();
208 let close_fn = unsafe { (*ptr).close_fn };
209 let drop_fn = unsafe { (*ptr).drop_fn };
210 close_fn(ptr);
211 drop_fn(ptr);
212 }
213 let vec = mem::take(&mut self.vec_unready);
214 for ssnnptr in vec.into_iter().rev() {
215 let ptr = ssnnptr.non_null_ptr.as_ptr();
216 let drop_fn = unsafe { (*ptr).drop_fn };
217 drop_fn(ptr);
218 }
219 }
220
221 pub(crate) async fn setup_async(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
222 if self.vec_unready.is_empty() {
223 return;
224 }
225
226 let mut n_done = 0;
227 let mut ag = AsyncGroup::new();
228 for ssnnptr in self.vec_unready.iter() {
229 n_done += 1;
230 let ptr = ssnnptr.non_null_ptr.as_ptr();
231 let setup_fn = unsafe { (*ptr).setup_fn };
232 ag._name = unsafe { (*ptr).name.clone() };
233 if let Err(err) = setup_fn(ptr, &mut ag).await {
234 errors.push((ag._name.clone(), err));
235 break;
236 }
237 }
238 ag.join_and_collect_errors_async(errors).await;
239
240 if errors.is_empty() {
241 self.vec_ready.append(&mut self.vec_unready);
242 } else {
243 for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
244 let ptr = ssnnptr.non_null_ptr.as_ptr();
245 let close_fn = unsafe { (*ptr).close_fn };
246 close_fn(ptr);
247 }
248 }
249 }
250
251 pub(crate) async fn setup_with_order_async(
252 &mut self,
253 names: &[&str],
254 errors: &mut Vec<(Arc<str>, errs::Err)>,
255 ) {
256 if self.vec_unready.is_empty() {
257 return;
258 }
259
260 let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
261 for (i, nm) in names.iter().rev().enumerate() {
263 index_map.insert(*nm, names.len() - 1 - i);
264 }
265
266 let vec_unready = mem::take(&mut self.vec_unready);
267
268 let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
269 vec![None; index_map.len()];
270 for ssnnptr in vec_unready.into_iter() {
271 let ptr = ssnnptr.non_null_ptr.as_ptr();
272 let name = unsafe { (*ptr).name.clone() };
273 if let Some(index) = index_map.remove(name.as_ref()) {
274 ordered_vec[index] = Some(ssnnptr);
275 } else {
276 ordered_vec.push(Some(ssnnptr));
277 }
278 }
279
280 let mut n_done = 0;
281 let mut ag = AsyncGroup::new();
282 for ssnnptr_opt in ordered_vec.iter() {
283 n_done += 1;
284 if let Some(ssnnptr) = ssnnptr_opt {
285 let ptr = ssnnptr.non_null_ptr.as_ptr();
286 let setup_fn = unsafe { (*ptr).setup_fn };
287 ag._name = unsafe { (*ptr).name.clone() };
288 if let Err(err) = setup_fn(ptr, &mut ag).await {
289 errors.push((ag._name.clone(), err));
290 break;
291 }
292 }
293 }
294 ag.join_and_collect_errors_async(errors).await;
295
296 if errors.is_empty() {
297 for ssnnptr in ordered_vec.into_iter().flatten() {
298 self.vec_ready.push(ssnnptr);
299 }
300 } else {
301 for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
302 let ptr = ssnnptr.non_null_ptr.as_ptr();
303 let close_fn = unsafe { (*ptr).close_fn };
304 close_fn(ptr);
305 }
306 for ssnnptr in ordered_vec.into_iter().flatten() {
307 self.vec_unready.push(ssnnptr);
308 }
309 }
310 }
311
312 pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
313 for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
314 let ptr = ssnnptr.non_null_ptr.as_ptr();
315 let name = unsafe { (*ptr).name.clone() };
316 index_map.insert(name, (self.local, i));
317 }
318 }
319
320 pub(crate) async fn create_data_conn_async<C>(
321 &self,
322 index: usize,
323 name: impl AsRef<str>,
324 ) -> errs::Result<Box<DataConnContainer>>
325 where
326 C: DataConn + 'static,
327 {
328 if let Some(ssnnptr) = self.vec_ready.get(index) {
329 let ptr = ssnnptr.non_null_ptr.as_ptr();
330 let type_id = any::TypeId::of::<C>();
331 let is_fn = unsafe { (*ptr).is_data_conn_fn };
332 let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
333 if !is_fn(type_id) {
334 Err(errs::Err::new(DataSrcError::FailToCastDataConn {
335 name: name.as_ref().into(),
336 target_type: any::type_name::<C>(),
337 }))
338 } else {
339 match create_data_conn_fn(ptr).await {
340 Ok(boxed) => Ok(boxed),
341 Err(err) => Err(errs::Err::with_source(
342 DataSrcError::FailToCreateDataConn {
343 name: name.as_ref().into(),
344 data_conn_type: any::type_name::<C>(),
345 },
346 err,
347 )),
348 }
349 }
350 } else {
351 Err(errs::Err::new(
352 DataSrcError::NotFoundDataSrcToCreateDataConn {
353 name: name.as_ref().into(),
354 data_conn_type: any::type_name::<C>(),
355 },
356 ))
357 }
358 }
359}
360
361impl Drop for DataSrcManager {
362 fn drop(&mut self) {
363 self.close();
364 }
365}
366
367#[cfg(test)]
368mod tests_of_data_src {
369 use super::*;
370 use std::sync::Arc;
371 use tokio::sync::Mutex;
372
373 struct SyncDataConn {}
374 impl SyncDataConn {
375 fn new() -> Self {
376 Self {}
377 }
378 }
379 impl DataConn for SyncDataConn {
380 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
381 Ok(())
382 }
383 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
384 fn close(&mut self) {}
385 }
386
387 struct AsyncDataConn {}
388 impl AsyncDataConn {
389 fn new() -> Self {
390 Self {}
391 }
392 }
393 impl DataConn for AsyncDataConn {
394 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
395 Ok(())
396 }
397 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
398 fn close(&mut self) {}
399 }
400
401 struct SyncDataSrc {
402 id: i8,
403 logger: Arc<Mutex<Vec<String>>>,
404 fail_to_setup: bool,
405 fail_to_create_data_conn: bool,
406 }
407 impl SyncDataSrc {
408 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
409 let logger_clone = logger.clone();
410 tokio::spawn(async move {
411 logger_clone
412 .lock()
413 .await
414 .push(format!("SyncDataSrc::new {}", id));
415 });
416 Self {
417 id,
418 logger: logger,
419 fail_to_setup,
420 fail_to_create_data_conn: false,
421 }
422 }
423 fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
424 Self {
425 id,
426 logger: logger,
427 fail_to_setup: false,
428 fail_to_create_data_conn: true,
429 }
430 }
431 }
432 impl Drop for SyncDataSrc {
433 fn drop(&mut self) {
434 let logger = self.logger.clone();
435 let id = self.id;
436 tokio::spawn(async move {
437 logger
438 .lock()
439 .await
440 .push(format!("SyncDataSrc::drop {}", id));
441 });
442 }
443 }
444 impl DataSrc<SyncDataConn> for SyncDataSrc {
445 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
446 let fail = self.fail_to_setup;
447 let id = self.id;
448 let logger = self.logger.clone();
449
450 if fail {
451 logger
452 .lock()
453 .await
454 .push(format!("SyncDataSrc::setup {} failed", id));
455 return Err(errs::Err::new("XXX".to_string()));
456 }
457 logger
458 .lock()
459 .await
460 .push(format!("SyncDataSrc::setup {}", id));
461 Ok(())
462 }
463 fn close(&mut self) {
464 let logger = self.logger.clone();
465 let id = self.id;
466 tokio::spawn(async move {
467 logger
468 .lock()
469 .await
470 .push(format!("SyncDataSrc::close {}", id));
471 });
472 }
473 async fn create_data_conn_async(&mut self) -> errs::Result<Box<SyncDataConn>> {
474 let id = self.id;
475 let logger = self.logger.clone();
476 {
477 logger
478 .lock()
479 .await
480 .push(format!("SyncDataSrc::create_data_conn {}", id));
481 }
482 if self.fail_to_create_data_conn {
483 return Err(errs::Err::new("eeee".to_string()));
484 }
485 let conn = SyncDataConn::new();
486 Ok(Box::new(conn))
487 }
488 }
489
490 struct AsyncDataSrc {
491 id: i8,
492 fail: bool,
493 logger: Arc<Mutex<Vec<String>>>,
494 wait: u64,
495 }
496 impl AsyncDataSrc {
497 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
498 let logger_clone = logger.clone();
499 tokio::spawn(async move {
500 logger_clone
501 .lock()
502 .await
503 .push(format!("AsyncDataSrc::new {}", id));
504 });
505 Self {
506 id,
507 fail,
508 logger,
509 wait,
510 }
511 }
512 }
513 impl Drop for AsyncDataSrc {
514 fn drop(&mut self) {
515 let logger = self.logger.clone();
516 let id = self.id;
517 tokio::spawn(async move {
518 logger
519 .lock()
520 .await
521 .push(format!("AsyncDataSrc::drop {}", id));
522 });
523 }
524 }
525 impl DataSrc<AsyncDataConn> for AsyncDataSrc {
526 async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
527 let logger = self.logger.clone();
528 let fail = self.fail;
529 let id = self.id;
530 let wait = self.wait;
531
532 ag.add(async move {
533 tokio::time::sleep(std::time::Duration::from_millis(wait)).await;
534 let mut logger = logger.lock().await;
535 if fail {
536 logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
537 return Err(errs::Err::new("XXX".to_string()));
538 }
539 logger.push(format!("AsyncDataSrc::setup {}", id));
540 Ok(())
541 });
542 Ok(())
543 }
544 fn close(&mut self) {
545 let logger = self.logger.clone();
546 let id = self.id;
547 tokio::spawn(async move {
548 logger
549 .lock()
550 .await
551 .push(format!("AsyncDataSrc::close {}", id));
552 });
553 }
554 async fn create_data_conn_async(&mut self) -> errs::Result<Box<AsyncDataConn>> {
555 let logger = self.logger.clone();
556 {
557 logger
558 .lock()
559 .await
560 .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
561 }
562 let conn = AsyncDataConn::new();
563 Ok(Box::new(conn))
564 }
565 }
566
567 #[tokio::test]
568 async fn test_of_new() {
569 let manager = DataSrcManager::new(true);
570 assert!(manager.local);
571 assert_eq!(manager.vec_unready.len(), 0);
572 assert_eq!(manager.vec_ready.len(), 0);
573
574 let manager = DataSrcManager::new(false);
575 assert!(!manager.local);
576 assert_eq!(manager.vec_unready.len(), 0);
577 assert_eq!(manager.vec_ready.len(), 0);
578 }
579
580 #[tokio::test]
581 async fn test_of_prepend() {
582 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
583
584 {
585 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
586
587 let ds = SyncDataSrc::new(1, logger.clone(), false);
588 let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
589 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
590 vec.push(SendSyncNonNull::new(ptr));
591
592 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
593 let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
594 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
595 vec.push(SendSyncNonNull::new(ptr));
596
597 let mut manager = DataSrcManager::new(true);
598 manager.prepend(vec);
599
600 assert!(manager.local);
601 assert_eq!(manager.vec_unready.len(), 2);
602 assert_eq!(manager.vec_ready.len(), 0);
603
604 assert_eq!(
605 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
606 "foo".into()
607 );
608 assert_eq!(
609 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
610 "bar".into()
611 );
612
613 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
614
615 let ds = SyncDataSrc::new(3, logger.clone(), false);
616 let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
617 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
618 vec.push(SendSyncNonNull::new(ptr));
619
620 let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
621 let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
622 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
623 vec.push(SendSyncNonNull::new(ptr));
624
625 manager.prepend(vec);
626
627 assert!(manager.local);
628 assert_eq!(manager.vec_unready.len(), 4);
629 assert_eq!(manager.vec_ready.len(), 0);
630
631 assert_eq!(
632 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
633 "baz".into()
634 );
635 assert_eq!(
636 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
637 "qux".into()
638 );
639 assert_eq!(
640 unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
641 "foo".into()
642 );
643 assert_eq!(
644 unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
645 "bar".into()
646 );
647 }
648
649 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
651
652 let locked = logger.lock().await;
653 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
654 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
655 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
656 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
657 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
658 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
659 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
660 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
661 }
662
663 #[tokio::test]
664 async fn test_of_add() {
665 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
666
667 {
668 let mut manager = DataSrcManager::new(true);
669
670 let ds = SyncDataSrc::new(1, logger.clone(), false);
671 manager.add("foo", ds);
672
673 assert!(manager.local);
674 assert_eq!(manager.vec_unready.len(), 1);
675 assert_eq!(manager.vec_ready.len(), 0);
676
677 assert_eq!(
678 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
679 "foo".into()
680 );
681
682 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
683 manager.add("bar", ds);
684
685 assert!(manager.local);
686 assert_eq!(manager.vec_unready.len(), 2);
687 assert_eq!(manager.vec_ready.len(), 0);
688
689 assert_eq!(
690 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
691 "foo".into()
692 );
693 assert_eq!(
694 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
695 "bar".into()
696 );
697 }
698
699 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
700
701 let locked = logger.lock().await;
702 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
703 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
704 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
705 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
706 }
707
708 #[tokio::test]
709 async fn test_of_remove() {
710 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
711
712 {
713 let mut manager = DataSrcManager::new(true);
714
715 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
716 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
717 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
718 manager.vec_unready.push(SendSyncNonNull::new(ptr));
719
720 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
721 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
722 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
723 manager.vec_unready.push(SendSyncNonNull::new(ptr));
724
725 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
726 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
727 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
728 manager.vec_ready.push(SendSyncNonNull::new(ptr));
729
730 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
731 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
732 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
733 manager.vec_ready.push(SendSyncNonNull::new(ptr));
734
735 assert!(manager.local);
736 assert_eq!(manager.vec_unready.len(), 2);
737 assert_eq!(manager.vec_ready.len(), 2);
738
739 manager.remove("baz");
740 manager.remove("foo");
741 manager.remove("qux");
742 manager.remove("bar");
743 }
744
745 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
746
747 let locked = logger.lock().await;
748 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
749 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
750 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
751 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
752 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
753 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
754 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
755 assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
756 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
757 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
758 }
759
760 #[tokio::test]
761 async fn test_of_close() {
762 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
763
764 {
765 let mut manager = DataSrcManager::new(true);
766
767 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
768 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
769 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
770 manager.vec_unready.push(SendSyncNonNull::new(ptr));
771
772 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
773 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
774 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
775 manager.vec_unready.push(SendSyncNonNull::new(ptr));
776
777 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
778 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
779 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
780 manager.vec_ready.push(SendSyncNonNull::new(ptr));
781
782 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
783 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
784 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
785 manager.vec_ready.push(SendSyncNonNull::new(ptr));
786
787 assert!(manager.local);
788 assert_eq!(manager.vec_unready.len(), 2);
789 assert_eq!(manager.vec_ready.len(), 2);
790
791 manager.close();
792 }
793
794 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
795
796 let locked = logger.lock().await;
797 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
798 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
799 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
800 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
801 assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
802 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
803 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
804 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
805 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
806 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
807 }
808
809 #[tokio::test]
810 async fn test_of_setup_async_and_ok() {
811 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
812
813 {
814 let mut manager = DataSrcManager::new(true);
815
816 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
817 manager.add("foo", ds1);
818
819 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
820 manager.add("bar", ds2);
821
822 assert!(manager.local);
823 assert_eq!(manager.vec_unready.len(), 2);
824 assert_eq!(manager.vec_ready.len(), 0);
825
826 let mut vec = Vec::new();
827 manager.setup_async(&mut vec).await;
828
829 assert!(manager.local);
830 assert_eq!(manager.vec_unready.len(), 0);
831 assert_eq!(manager.vec_ready.len(), 2);
832 }
833
834 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
835
836 let locked = logger.lock().await;
837 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
838 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
839 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
840 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
841 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
842 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
843 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
844 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
845 }
846
847 #[tokio::test]
848 async fn test_of_setup_but_error() {
849 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
850
851 {
852 let mut manager = DataSrcManager::new(true);
853
854 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
855 manager.add("foo", ds1);
856
857 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
858 manager.add("bar", ds2);
859
860 let ds3 = SyncDataSrc::new(3, logger.clone(), true);
861 manager.add("bar", ds3);
862
863 assert!(manager.local);
864 assert_eq!(manager.vec_unready.len(), 3);
865 assert_eq!(manager.vec_ready.len(), 0);
866
867 let mut vec = Vec::new();
868 manager.setup_async(&mut vec).await;
869
870 assert!(manager.local);
871 assert_eq!(manager.vec_unready.len(), 3);
872 assert_eq!(manager.vec_ready.len(), 0);
873 }
874
875 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
876
877 let locked = logger.lock().await;
878 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
879 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
880 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
881 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
882 assert!(locked.contains(&"SyncDataSrc::setup 2 failed".to_string()));
883 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
884 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
885 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
886 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
887 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
888 }
889
890 #[tokio::test]
891 async fn test_of_setup_with_order_and_ok() {
892 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
893
894 {
895 let mut manager = DataSrcManager::new(true);
896
897 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
898 manager.add("foo", ds1);
899
900 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
901 manager.add("bar", ds2);
902
903 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
904 manager.add("baz", ds3);
905
906 assert!(manager.local);
907 assert_eq!(manager.vec_unready.len(), 3);
908 assert_eq!(manager.vec_ready.len(), 0);
909
910 let mut vec = Vec::new();
911 manager
912 .setup_with_order_async(&["baz", "foo"], &mut vec)
913 .await;
914
915 assert!(manager.local);
916 assert_eq!(manager.vec_unready.len(), 0);
917 assert_eq!(manager.vec_ready.len(), 3);
918 }
919
920 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
921
922 let locked = logger.lock().await;
923 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
924 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
925 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
926 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
927 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
928 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
929 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
930 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
931 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
932 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
933 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
934 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
935 }
936
937 #[tokio::test]
938 async fn test_of_setup_with_order_but_fail() {
939 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
940
941 {
942 let mut manager = DataSrcManager::new(true);
943
944 let ds1 = SyncDataSrc::new(1, logger.clone(), true);
945 manager.add("foo", ds1);
946
947 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
948 manager.add("bar", ds2);
949
950 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
951 manager.add("baz", ds3);
952
953 assert!(manager.local);
954 assert_eq!(manager.vec_unready.len(), 3);
955 assert_eq!(manager.vec_ready.len(), 0);
956
957 let mut vec = Vec::new();
958 manager
959 .setup_with_order_async(&["baz", "foo"], &mut vec)
960 .await;
961
962 assert!(manager.local);
963 assert_eq!(manager.vec_unready.len(), 3);
964 assert_eq!(manager.vec_ready.len(), 0);
965 }
966
967 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
968
969 let locked = logger.lock().await;
970 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
971 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
972 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
973 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
974 assert!(locked.contains(&"SyncDataSrc::setup 1 failed".to_string()));
975 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
976 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
977 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
978 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
979 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
980 }
981
982 #[tokio::test]
983 async fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
984 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
985
986 {
987 let mut manager = DataSrcManager::new(true);
988
989 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
990 manager.add("foo", ds1);
991
992 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
993 manager.add("bar", ds2);
994
995 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
996 manager.add("baz", ds3);
997
998 assert!(manager.local);
999 assert_eq!(manager.vec_unready.len(), 3);
1000 assert_eq!(manager.vec_ready.len(), 0);
1001
1002 let mut vec = Vec::new();
1003 manager
1004 .setup_with_order_async(&["baz", "foo", "baz"], &mut vec)
1005 .await;
1006
1007 assert!(manager.local);
1008 assert_eq!(manager.vec_unready.len(), 0);
1009 assert_eq!(manager.vec_ready.len(), 3);
1010 }
1011
1012 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1013
1014 let locked = logger.lock().await;
1015 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
1016 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
1017 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
1018 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
1019 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
1020 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
1021 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
1022 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
1023 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
1024 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
1025 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
1026 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
1027 }
1028
1029 #[tokio::test]
1030 async fn test_of_copy_ds_ready_to_map() {
1031 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1032 let mut errors = Vec::new();
1033
1034 let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
1035
1036 let manager = DataSrcManager::new(true);
1037 manager.copy_ds_ready_to_map(&mut index_map);
1038 assert!(index_map.is_empty());
1039
1040 let mut manager = DataSrcManager::new(true);
1041 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1042 manager.add("foo", ds1);
1043 manager.setup_async(&mut errors).await;
1044 assert!(errors.is_empty());
1045 manager.copy_ds_ready_to_map(&mut index_map);
1046 assert_eq!(index_map.len(), 1);
1047 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1048
1049 let mut manager = DataSrcManager::new(false);
1050 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1051 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1052 manager.add("bar", ds2);
1053 manager.add("baz", ds3);
1054 manager.setup_async(&mut errors).await;
1055 assert!(errors.is_empty());
1056 manager.copy_ds_ready_to_map(&mut index_map);
1057 assert_eq!(index_map.len(), 3);
1058 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1059 assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1060 assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1061 }
1062
1063 #[tokio::test]
1064 async fn test_of_create_data_conn_and_ok() {
1065 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1066 let mut errors = Vec::new();
1067
1068 let mut manager = DataSrcManager::new(true);
1069 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1070 manager.add("foo", ds1);
1071 manager.setup_async(&mut errors).await;
1072
1073 if let Ok(boxed) = manager
1074 .create_data_conn_async::<SyncDataConn>(0, "foo")
1075 .await
1076 {
1077 assert_eq!(boxed.name.clone(), "foo".into());
1078 } else {
1079 panic!();
1080 }
1081 }
1082
1083 #[tokio::test]
1084 async fn test_of_create_data_conn_but_not_found() {
1085 let mut errors = Vec::new();
1086
1087 let mut manager = DataSrcManager::new(true);
1088 manager.setup_async(&mut errors).await;
1089
1090 if let Err(err) = manager
1091 .create_data_conn_async::<SyncDataConn>(0, "foo")
1092 .await
1093 {
1094 match err.reason::<DataSrcError>() {
1095 Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1096 name,
1097 data_conn_type,
1098 }) => {
1099 assert_eq!(*name, "foo".into());
1100 assert_eq!(
1101 *data_conn_type,
1102 "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1103 );
1104 }
1105 _ => panic!(),
1106 }
1107 } else {
1108 panic!();
1109 }
1110 }
1111
1112 #[tokio::test]
1113 async fn test_of_create_data_conn_but_fail_to_cast() {
1114 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1115 let mut errors = Vec::new();
1116
1117 let mut manager = DataSrcManager::new(true);
1118 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1119 manager.add("foo", ds1);
1120 manager.setup_async(&mut errors).await;
1121
1122 if let Err(err) = manager
1123 .create_data_conn_async::<AsyncDataConn>(0, "foo")
1124 .await
1125 {
1126 match err.reason::<DataSrcError>() {
1127 Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1128 assert_eq!(*name, "foo".into());
1129 assert_eq!(
1130 *target_type,
1131 "sabi::tokio::data_src::tests_of_data_src::AsyncDataConn"
1132 );
1133 }
1134 _ => panic!(),
1135 }
1136 } else {
1137 panic!();
1138 }
1139 }
1140
1141 #[tokio::test]
1142 async fn test_of_create_data_conn_but_fail_to_create() {
1143 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1144 let mut errors = Vec::new();
1145
1146 let mut manager = DataSrcManager::new(true);
1147 let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1148 manager.add("foo", ds1);
1149 manager.setup_async(&mut errors).await;
1150
1151 if let Err(err) = manager
1152 .create_data_conn_async::<SyncDataConn>(0, "foo")
1153 .await
1154 {
1155 match err.reason::<DataSrcError>() {
1156 Ok(DataSrcError::FailToCreateDataConn {
1157 name,
1158 data_conn_type,
1159 }) => {
1160 assert_eq!(*name, "foo".into());
1161 assert_eq!(
1162 *data_conn_type,
1163 "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1164 );
1165 }
1166 _ => panic!(),
1167 }
1168 } else {
1169 panic!();
1170 }
1171 }
1172}