1mod global_setup;
6
7pub(crate) use global_setup::{
8 copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async,
9};
10pub use global_setup::{
11 create_static_data_src_container, setup_async, setup_with_order_async, uses_async,
12};
13
14use crate::tokio::{
15 AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
16 SendSyncNonNull,
17};
18
19use std::collections::HashMap;
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::{any, mem, ptr};
24
25#[derive(Debug)]
27pub enum DataSrcError {
28 FailToSetupGlobalDataSrcs {
30 errors: Vec<(Arc<str>, errs::Err)>,
32 },
33
34 DuringSetupGlobalDataSrcs,
36
37 AlreadySetupGlobalDataSrcs,
39
40 FailToCastDataConn {
42 name: Arc<str>,
44 target_type: &'static str,
46 },
47
48 FailToCreateDataConn {
50 name: Arc<str>,
52 data_conn_type: &'static str,
54 },
55
56 NotFoundDataSrcToCreateDataConn {
58 name: Arc<str>,
60 data_conn_type: &'static str,
62 },
63}
64
65impl<S, C> DataSrcContainer<S, C>
66where
67 S: DataSrc<C> + 'static,
68 C: DataConn + 'static,
69{
70 pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
71 Self {
72 drop_fn: drop_data_src::<S, C>,
73 close_fn: close_data_src::<S, C>,
74 is_data_conn_fn: is_data_conn::<C>,
75
76 setup_fn: setup_data_src_async::<S, C>,
77 create_data_conn_fn: create_data_conn_async::<S, C>,
78
79 local,
80 name: name.into(),
81 data_src,
82 }
83 }
84}
85
86fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
87where
88 S: DataSrc<C>,
89 C: DataConn + 'static,
90{
91 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
92 drop(unsafe { Box::from_raw(typed_ptr) });
93}
94
95fn close_data_src<S, C>(ptr: *const DataSrcContainer)
96where
97 S: DataSrc<C>,
98 C: DataConn + 'static,
99{
100 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
101 unsafe { (*typed_ptr).data_src.close() };
102}
103
104fn is_data_conn<C>(type_id: any::TypeId) -> bool
105where
106 C: DataConn + 'static,
107{
108 any::TypeId::of::<C>() == type_id
109}
110
111fn setup_data_src_async<S, C>(
112 ptr: *const DataSrcContainer,
113 ag: &mut AsyncGroup,
114) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + '_>>
115where
116 S: DataSrc<C> + 'static,
117 C: DataConn + 'static,
118{
119 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
120 let data_src = unsafe { &mut (*typed_ptr).data_src };
121 Box::pin(data_src.setup_async(ag))
122}
123
124#[allow(clippy::type_complexity)]
125fn create_data_conn_async<'a, S, C>(
126 ptr: *const DataSrcContainer,
127) -> Pin<Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'a>>
128where
129 S: DataSrc<C> + 'a,
130 C: DataConn + 'static,
131{
132 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
133 let data_src = unsafe { &mut (*typed_ptr).data_src };
134 let name = unsafe { &(*typed_ptr).name };
135 Box::pin(async move {
136 let conn: Box<C> = data_src.create_data_conn_async().await?;
137 Ok(Box::new(DataConnContainer::<C>::new(
138 name.to_string(),
139 conn,
140 )))
141 })
142}
143
144impl DataSrcManager {
145 pub(crate) const fn new(local: bool) -> Self {
146 Self {
147 vec_unready: Vec::new(),
148 vec_ready: Vec::new(),
149 local,
150 }
151 }
152
153 pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
154 self.vec_unready.splice(0..0, vec);
155 }
156
157 pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
158 where
159 S: DataSrc<C> + 'static,
160 C: DataConn + 'static,
161 {
162 let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
163 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
164 self.vec_unready.push(SendSyncNonNull::new(ptr));
165 }
166
167 pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
168 let extracted_vec: Vec<_> = self
169 .vec_ready
170 .extract_if(.., |ssnnptr| {
171 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
172 })
173 .collect();
174
175 for ssnnptr in extracted_vec.iter().rev() {
176 let ptr = ssnnptr.non_null_ptr.as_ptr();
177 let close_fn = unsafe { (*ptr).close_fn };
178 let drop_fn = unsafe { (*ptr).drop_fn };
179 close_fn(ptr);
180 drop_fn(ptr);
181 }
182
183 let extracted_vec: Vec<_> = self
184 .vec_unready
185 .extract_if(.., |ssnnptr| {
186 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
187 })
188 .collect();
189
190 for ssnnptr in extracted_vec.iter().rev() {
191 let ptr = ssnnptr.non_null_ptr.as_ptr();
192 let drop_fn = unsafe { (*ptr).drop_fn };
193 drop_fn(ptr);
194 }
195 }
196
197 pub(crate) fn close(&mut self) {
198 let vec = mem::take(&mut self.vec_ready);
199 for ssnnptr in vec.into_iter().rev() {
200 let ptr = ssnnptr.non_null_ptr.as_ptr();
201 let close_fn = unsafe { (*ptr).close_fn };
202 let drop_fn = unsafe { (*ptr).drop_fn };
203 close_fn(ptr);
204 drop_fn(ptr);
205 }
206 let vec = mem::take(&mut self.vec_unready);
207 for ssnnptr in vec.into_iter().rev() {
208 let ptr = ssnnptr.non_null_ptr.as_ptr();
209 let drop_fn = unsafe { (*ptr).drop_fn };
210 drop_fn(ptr);
211 }
212 }
213
214 pub(crate) async fn setup_async(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
215 if self.vec_unready.is_empty() {
216 return;
217 }
218
219 let mut n_done = 0;
220 let mut ag = AsyncGroup::new();
221 for ssnnptr in self.vec_unready.iter() {
222 n_done += 1;
223 let ptr = ssnnptr.non_null_ptr.as_ptr();
224 let setup_fn = unsafe { (*ptr).setup_fn };
225 ag._name = unsafe { (*ptr).name.clone() };
226 if let Err(err) = setup_fn(ptr, &mut ag).await {
227 errors.push((ag._name.clone(), err));
228 break;
229 }
230 }
231 ag.join_and_collect_errors_async(errors).await;
232
233 if errors.is_empty() {
234 self.vec_ready.append(&mut self.vec_unready);
235 } else {
236 for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
237 let ptr = ssnnptr.non_null_ptr.as_ptr();
238 let close_fn = unsafe { (*ptr).close_fn };
239 close_fn(ptr);
240 }
241 }
242 }
243
244 pub(crate) async fn setup_with_order_async(
245 &mut self,
246 names: &[&str],
247 errors: &mut Vec<(Arc<str>, errs::Err)>,
248 ) {
249 if self.vec_unready.is_empty() {
250 return;
251 }
252
253 let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
254 for (i, nm) in names.iter().rev().enumerate() {
256 index_map.insert(*nm, names.len() - 1 - i);
257 }
258
259 let vec_unready = mem::take(&mut self.vec_unready);
260
261 let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
262 vec![None; index_map.len()];
263 for ssnnptr in vec_unready.into_iter() {
264 let ptr = ssnnptr.non_null_ptr.as_ptr();
265 let name = unsafe { (*ptr).name.clone() };
266 if let Some(index) = index_map.remove(name.as_ref()) {
267 ordered_vec[index] = Some(ssnnptr);
268 } else {
269 ordered_vec.push(Some(ssnnptr));
270 }
271 }
272
273 let mut n_done = 0;
274 let mut ag = AsyncGroup::new();
275 for ssnnptr_opt in ordered_vec.iter() {
276 n_done += 1;
277 if let Some(ssnnptr) = ssnnptr_opt {
278 let ptr = ssnnptr.non_null_ptr.as_ptr();
279 let setup_fn = unsafe { (*ptr).setup_fn };
280 ag._name = unsafe { (*ptr).name.clone() };
281 if let Err(err) = setup_fn(ptr, &mut ag).await {
282 errors.push((ag._name.clone(), err));
283 break;
284 }
285 }
286 }
287 ag.join_and_collect_errors_async(errors).await;
288
289 if errors.is_empty() {
290 for ssnnptr in ordered_vec.into_iter().flatten() {
291 self.vec_ready.push(ssnnptr);
292 }
293 } else {
294 for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
295 let ptr = ssnnptr.non_null_ptr.as_ptr();
296 let close_fn = unsafe { (*ptr).close_fn };
297 close_fn(ptr);
298 }
299 for ssnnptr in ordered_vec.into_iter().flatten() {
300 self.vec_unready.push(ssnnptr);
301 }
302 }
303 }
304
305 pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
306 for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
307 let ptr = ssnnptr.non_null_ptr.as_ptr();
308 let name = unsafe { (*ptr).name.clone() };
309 index_map.insert(name, (self.local, i));
310 }
311 }
312
313 pub(crate) async fn create_data_conn_async<C>(
314 &self,
315 index: usize,
316 name: impl AsRef<str>,
317 ) -> errs::Result<Box<DataConnContainer>>
318 where
319 C: DataConn + 'static,
320 {
321 if let Some(ssnnptr) = self.vec_ready.get(index) {
322 let ptr = ssnnptr.non_null_ptr.as_ptr();
323 let type_id = any::TypeId::of::<C>();
324 let is_fn = unsafe { (*ptr).is_data_conn_fn };
325 let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
326 if !is_fn(type_id) {
327 Err(errs::Err::new(DataSrcError::FailToCastDataConn {
328 name: name.as_ref().into(),
329 target_type: any::type_name::<C>(),
330 }))
331 } else {
332 match create_data_conn_fn(ptr).await {
333 Ok(boxed) => Ok(boxed),
334 Err(err) => Err(errs::Err::with_source(
335 DataSrcError::FailToCreateDataConn {
336 name: name.as_ref().into(),
337 data_conn_type: any::type_name::<C>(),
338 },
339 err,
340 )),
341 }
342 }
343 } else {
344 Err(errs::Err::new(
345 DataSrcError::NotFoundDataSrcToCreateDataConn {
346 name: name.as_ref().into(),
347 data_conn_type: any::type_name::<C>(),
348 },
349 ))
350 }
351 }
352}
353
354impl Drop for DataSrcManager {
355 fn drop(&mut self) {
356 self.close();
357 }
358}
359
360#[cfg(test)]
361mod tests_of_data_src {
362 use super::*;
363 use std::sync::Arc;
364 use tokio::sync::Mutex;
365
366 struct SyncDataConn {}
367 impl SyncDataConn {
368 fn new() -> Self {
369 Self {}
370 }
371 }
372 impl DataConn for SyncDataConn {
373 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
374 Ok(())
375 }
376 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
377 fn close(&mut self) {}
378 }
379
380 struct AsyncDataConn {}
381 impl AsyncDataConn {
382 fn new() -> Self {
383 Self {}
384 }
385 }
386 impl DataConn for AsyncDataConn {
387 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
388 Ok(())
389 }
390 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
391 fn close(&mut self) {}
392 }
393
394 struct SyncDataSrc {
395 id: i8,
396 logger: Arc<Mutex<Vec<String>>>,
397 fail_to_setup: bool,
398 fail_to_create_data_conn: bool,
399 }
400 impl SyncDataSrc {
401 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
402 let logger_clone = logger.clone();
403 tokio::spawn(async move {
404 logger_clone
405 .lock()
406 .await
407 .push(format!("SyncDataSrc::new {}", id));
408 });
409 Self {
410 id,
411 logger: logger,
412 fail_to_setup,
413 fail_to_create_data_conn: false,
414 }
415 }
416 fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
417 Self {
418 id,
419 logger: logger,
420 fail_to_setup: false,
421 fail_to_create_data_conn: true,
422 }
423 }
424 }
425 impl Drop for SyncDataSrc {
426 fn drop(&mut self) {
427 let logger = self.logger.clone();
428 let id = self.id;
429 tokio::spawn(async move {
430 logger
431 .lock()
432 .await
433 .push(format!("SyncDataSrc::drop {}", id));
434 });
435 }
436 }
437 impl DataSrc<SyncDataConn> for SyncDataSrc {
438 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
439 let fail = self.fail_to_setup;
440 let id = self.id;
441 let logger = self.logger.clone();
442
443 if fail {
444 logger
445 .lock()
446 .await
447 .push(format!("SyncDataSrc::setup {} failed", id));
448 return Err(errs::Err::new("XXX".to_string()));
449 }
450 logger
451 .lock()
452 .await
453 .push(format!("SyncDataSrc::setup {}", id));
454 Ok(())
455 }
456 fn close(&mut self) {
457 let logger = self.logger.clone();
458 let id = self.id;
459 tokio::spawn(async move {
460 logger
461 .lock()
462 .await
463 .push(format!("SyncDataSrc::close {}", id));
464 });
465 }
466 async fn create_data_conn_async(&mut self) -> errs::Result<Box<SyncDataConn>> {
467 let id = self.id;
468 let logger = self.logger.clone();
469 {
470 logger
471 .lock()
472 .await
473 .push(format!("SyncDataSrc::create_data_conn {}", id));
474 }
475 if self.fail_to_create_data_conn {
476 return Err(errs::Err::new("eeee".to_string()));
477 }
478 let conn = SyncDataConn::new();
479 Ok(Box::new(conn))
480 }
481 }
482
483 struct AsyncDataSrc {
484 id: i8,
485 fail: bool,
486 logger: Arc<Mutex<Vec<String>>>,
487 wait: u64,
488 }
489 impl AsyncDataSrc {
490 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
491 let logger_clone = logger.clone();
492 tokio::spawn(async move {
493 logger_clone
494 .lock()
495 .await
496 .push(format!("AsyncDataSrc::new {}", id));
497 });
498 Self {
499 id,
500 fail,
501 logger,
502 wait,
503 }
504 }
505 }
506 impl Drop for AsyncDataSrc {
507 fn drop(&mut self) {
508 let logger = self.logger.clone();
509 let id = self.id;
510 tokio::spawn(async move {
511 logger
512 .lock()
513 .await
514 .push(format!("AsyncDataSrc::drop {}", id));
515 });
516 }
517 }
518 impl DataSrc<AsyncDataConn> for AsyncDataSrc {
519 async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
520 let logger = self.logger.clone();
521 let fail = self.fail;
522 let id = self.id;
523 let wait = self.wait;
524
525 ag.add(async move {
526 tokio::time::sleep(std::time::Duration::from_millis(wait)).await;
527 let mut logger = logger.lock().await;
528 if fail {
529 logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
530 return Err(errs::Err::new("XXX".to_string()));
531 }
532 logger.push(format!("AsyncDataSrc::setup {}", id));
533 Ok(())
534 });
535 Ok(())
536 }
537 fn close(&mut self) {
538 let logger = self.logger.clone();
539 let id = self.id;
540 tokio::spawn(async move {
541 logger
542 .lock()
543 .await
544 .push(format!("AsyncDataSrc::close {}", id));
545 });
546 }
547 async fn create_data_conn_async(&mut self) -> errs::Result<Box<AsyncDataConn>> {
548 let logger = self.logger.clone();
549 {
550 logger
551 .lock()
552 .await
553 .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
554 }
555 let conn = AsyncDataConn::new();
556 Ok(Box::new(conn))
557 }
558 }
559
560 #[tokio::test]
561 async fn test_of_new() {
562 let manager = DataSrcManager::new(true);
563 assert!(manager.local);
564 assert_eq!(manager.vec_unready.len(), 0);
565 assert_eq!(manager.vec_ready.len(), 0);
566
567 let manager = DataSrcManager::new(false);
568 assert!(!manager.local);
569 assert_eq!(manager.vec_unready.len(), 0);
570 assert_eq!(manager.vec_ready.len(), 0);
571 }
572
573 #[tokio::test]
574 async fn test_of_prepend() {
575 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
576
577 {
578 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
579
580 let ds = SyncDataSrc::new(1, logger.clone(), false);
581 let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
582 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
583 vec.push(SendSyncNonNull::new(ptr));
584
585 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
586 let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
587 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
588 vec.push(SendSyncNonNull::new(ptr));
589
590 let mut manager = DataSrcManager::new(true);
591 manager.prepend(vec);
592
593 assert!(manager.local);
594 assert_eq!(manager.vec_unready.len(), 2);
595 assert_eq!(manager.vec_ready.len(), 0);
596
597 assert_eq!(
598 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
599 "foo".into()
600 );
601 assert_eq!(
602 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
603 "bar".into()
604 );
605
606 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
607
608 let ds = SyncDataSrc::new(3, logger.clone(), false);
609 let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
610 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
611 vec.push(SendSyncNonNull::new(ptr));
612
613 let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
614 let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
615 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
616 vec.push(SendSyncNonNull::new(ptr));
617
618 manager.prepend(vec);
619
620 assert!(manager.local);
621 assert_eq!(manager.vec_unready.len(), 4);
622 assert_eq!(manager.vec_ready.len(), 0);
623
624 assert_eq!(
625 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
626 "baz".into()
627 );
628 assert_eq!(
629 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
630 "qux".into()
631 );
632 assert_eq!(
633 unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
634 "foo".into()
635 );
636 assert_eq!(
637 unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
638 "bar".into()
639 );
640 }
641
642 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
644
645 let locked = logger.lock().await;
646 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
647 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
648 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
649 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
650 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
651 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
652 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
653 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
654 }
655
656 #[tokio::test]
657 async fn test_of_add() {
658 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
659
660 {
661 let mut manager = DataSrcManager::new(true);
662
663 let ds = SyncDataSrc::new(1, logger.clone(), false);
664 manager.add("foo", ds);
665
666 assert!(manager.local);
667 assert_eq!(manager.vec_unready.len(), 1);
668 assert_eq!(manager.vec_ready.len(), 0);
669
670 assert_eq!(
671 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
672 "foo".into()
673 );
674
675 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
676 manager.add("bar", ds);
677
678 assert!(manager.local);
679 assert_eq!(manager.vec_unready.len(), 2);
680 assert_eq!(manager.vec_ready.len(), 0);
681
682 assert_eq!(
683 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
684 "foo".into()
685 );
686 assert_eq!(
687 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
688 "bar".into()
689 );
690 }
691
692 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
693
694 let locked = logger.lock().await;
695 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
696 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
697 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
698 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
699 }
700
701 #[tokio::test]
702 async fn test_of_remove() {
703 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
704
705 {
706 let mut manager = DataSrcManager::new(true);
707
708 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
709 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
710 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
711 manager.vec_unready.push(SendSyncNonNull::new(ptr));
712
713 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
714 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
715 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
716 manager.vec_unready.push(SendSyncNonNull::new(ptr));
717
718 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
719 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
720 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
721 manager.vec_ready.push(SendSyncNonNull::new(ptr));
722
723 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
724 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
725 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
726 manager.vec_ready.push(SendSyncNonNull::new(ptr));
727
728 assert!(manager.local);
729 assert_eq!(manager.vec_unready.len(), 2);
730 assert_eq!(manager.vec_ready.len(), 2);
731
732 manager.remove("baz");
733 manager.remove("foo");
734 manager.remove("qux");
735 manager.remove("bar");
736 }
737
738 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
739
740 let locked = logger.lock().await;
741 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
742 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
743 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
744 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
745 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
746 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
747 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
748 assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
749 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
750 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
751 }
752
753 #[tokio::test]
754 async fn test_of_close() {
755 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
756
757 {
758 let mut manager = DataSrcManager::new(true);
759
760 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
761 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
762 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
763 manager.vec_unready.push(SendSyncNonNull::new(ptr));
764
765 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
766 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
767 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
768 manager.vec_unready.push(SendSyncNonNull::new(ptr));
769
770 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
771 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
772 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
773 manager.vec_ready.push(SendSyncNonNull::new(ptr));
774
775 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
776 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
777 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
778 manager.vec_ready.push(SendSyncNonNull::new(ptr));
779
780 assert!(manager.local);
781 assert_eq!(manager.vec_unready.len(), 2);
782 assert_eq!(manager.vec_ready.len(), 2);
783
784 manager.close();
785 }
786
787 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
788
789 let locked = logger.lock().await;
790 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
791 assert!(locked.contains(&"AsyncDataSrc::new 2".to_string()));
792 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
793 assert!(locked.contains(&"AsyncDataSrc::new 4".to_string()));
794 assert!(locked.contains(&"AsyncDataSrc::close 4".to_string()));
795 assert!(locked.contains(&"AsyncDataSrc::drop 4".to_string()));
796 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
797 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
798 assert!(locked.contains(&"AsyncDataSrc::drop 2".to_string()));
799 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
800 }
801
802 #[tokio::test]
803 async fn test_of_setup_async_and_ok() {
804 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
805
806 {
807 let mut manager = DataSrcManager::new(true);
808
809 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
810 manager.add("foo", ds1);
811
812 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
813 manager.add("bar", ds2);
814
815 assert!(manager.local);
816 assert_eq!(manager.vec_unready.len(), 2);
817 assert_eq!(manager.vec_ready.len(), 0);
818
819 let mut vec = Vec::new();
820 manager.setup_async(&mut vec).await;
821
822 assert!(manager.local);
823 assert_eq!(manager.vec_unready.len(), 0);
824 assert_eq!(manager.vec_ready.len(), 2);
825 }
826
827 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
828
829 let locked = logger.lock().await;
830 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
831 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
832 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
833 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
834 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
835 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
836 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
837 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
838 }
839
840 #[tokio::test]
841 async fn test_of_setup_but_error() {
842 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
843
844 {
845 let mut manager = DataSrcManager::new(true);
846
847 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
848 manager.add("foo", ds1);
849
850 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
851 manager.add("bar", ds2);
852
853 let ds3 = SyncDataSrc::new(3, logger.clone(), true);
854 manager.add("bar", ds3);
855
856 assert!(manager.local);
857 assert_eq!(manager.vec_unready.len(), 3);
858 assert_eq!(manager.vec_ready.len(), 0);
859
860 let mut vec = Vec::new();
861 manager.setup_async(&mut vec).await;
862
863 assert!(manager.local);
864 assert_eq!(manager.vec_unready.len(), 3);
865 assert_eq!(manager.vec_ready.len(), 0);
866 }
867
868 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
869
870 let locked = logger.lock().await;
871 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
872 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
873 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
874 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
875 assert!(locked.contains(&"SyncDataSrc::setup 2 failed".to_string()));
876 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
877 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
878 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
879 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
880 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
881 }
882
883 #[tokio::test]
884 async fn test_of_setup_with_order_and_ok() {
885 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
886
887 {
888 let mut manager = DataSrcManager::new(true);
889
890 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
891 manager.add("foo", ds1);
892
893 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
894 manager.add("bar", ds2);
895
896 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
897 manager.add("baz", ds3);
898
899 assert!(manager.local);
900 assert_eq!(manager.vec_unready.len(), 3);
901 assert_eq!(manager.vec_ready.len(), 0);
902
903 let mut vec = Vec::new();
904 manager
905 .setup_with_order_async(&["baz", "foo"], &mut vec)
906 .await;
907
908 assert!(manager.local);
909 assert_eq!(manager.vec_unready.len(), 0);
910 assert_eq!(manager.vec_ready.len(), 3);
911 }
912
913 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
914
915 let locked = logger.lock().await;
916 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
917 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
918 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
919 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
920 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
921 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
922 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
923 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
924 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
925 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
926 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
927 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
928 }
929
930 #[tokio::test]
931 async fn test_of_setup_with_order_but_fail() {
932 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
933
934 {
935 let mut manager = DataSrcManager::new(true);
936
937 let ds1 = SyncDataSrc::new(1, logger.clone(), true);
938 manager.add("foo", ds1);
939
940 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
941 manager.add("bar", ds2);
942
943 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
944 manager.add("baz", ds3);
945
946 assert!(manager.local);
947 assert_eq!(manager.vec_unready.len(), 3);
948 assert_eq!(manager.vec_ready.len(), 0);
949
950 let mut vec = Vec::new();
951 manager
952 .setup_with_order_async(&["baz", "foo"], &mut vec)
953 .await;
954
955 assert!(manager.local);
956 assert_eq!(manager.vec_unready.len(), 3);
957 assert_eq!(manager.vec_ready.len(), 0);
958 }
959
960 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
961
962 let locked = logger.lock().await;
963 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
964 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
965 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
966 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
967 assert!(locked.contains(&"SyncDataSrc::setup 1 failed".to_string()));
968 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
969 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
970 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
971 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
972 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
973 }
974
975 #[tokio::test]
976 async fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
977 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
978
979 {
980 let mut manager = DataSrcManager::new(true);
981
982 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
983 manager.add("foo", ds1);
984
985 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
986 manager.add("bar", ds2);
987
988 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
989 manager.add("baz", ds3);
990
991 assert!(manager.local);
992 assert_eq!(manager.vec_unready.len(), 3);
993 assert_eq!(manager.vec_ready.len(), 0);
994
995 let mut vec = Vec::new();
996 manager
997 .setup_with_order_async(&["baz", "foo", "baz"], &mut vec)
998 .await;
999
1000 assert!(manager.local);
1001 assert_eq!(manager.vec_unready.len(), 0);
1002 assert_eq!(manager.vec_ready.len(), 3);
1003 }
1004
1005 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1006
1007 let locked = logger.lock().await;
1008 assert!(locked.contains(&"SyncDataSrc::new 1".to_string()));
1009 assert!(locked.contains(&"SyncDataSrc::new 2".to_string()));
1010 assert!(locked.contains(&"SyncDataSrc::new 3".to_string()));
1011 assert!(locked.contains(&"SyncDataSrc::setup 3".to_string()));
1012 assert!(locked.contains(&"SyncDataSrc::setup 1".to_string()));
1013 assert!(locked.contains(&"SyncDataSrc::setup 2".to_string()));
1014 assert!(locked.contains(&"SyncDataSrc::close 2".to_string()));
1015 assert!(locked.contains(&"SyncDataSrc::drop 2".to_string()));
1016 assert!(locked.contains(&"SyncDataSrc::close 1".to_string()));
1017 assert!(locked.contains(&"SyncDataSrc::drop 1".to_string()));
1018 assert!(locked.contains(&"SyncDataSrc::close 3".to_string()));
1019 assert!(locked.contains(&"SyncDataSrc::drop 3".to_string()));
1020 }
1021
1022 #[tokio::test]
1023 async fn test_of_copy_ds_ready_to_map() {
1024 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1025 let mut errors = Vec::new();
1026
1027 let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
1028
1029 let manager = DataSrcManager::new(true);
1030 manager.copy_ds_ready_to_map(&mut index_map);
1031 assert!(index_map.is_empty());
1032
1033 let mut manager = DataSrcManager::new(true);
1034 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1035 manager.add("foo", ds1);
1036 manager.setup_async(&mut errors).await;
1037 assert!(errors.is_empty());
1038 manager.copy_ds_ready_to_map(&mut index_map);
1039 assert_eq!(index_map.len(), 1);
1040 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1041
1042 let mut manager = DataSrcManager::new(false);
1043 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1044 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1045 manager.add("bar", ds2);
1046 manager.add("baz", ds3);
1047 manager.setup_async(&mut errors).await;
1048 assert!(errors.is_empty());
1049 manager.copy_ds_ready_to_map(&mut index_map);
1050 assert_eq!(index_map.len(), 3);
1051 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1052 assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1053 assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1054 }
1055
1056 #[tokio::test]
1057 async fn test_of_create_data_conn_and_ok() {
1058 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1059 let mut errors = Vec::new();
1060
1061 let mut manager = DataSrcManager::new(true);
1062 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1063 manager.add("foo", ds1);
1064 manager.setup_async(&mut errors).await;
1065
1066 if let Ok(boxed) = manager
1067 .create_data_conn_async::<SyncDataConn>(0, "foo")
1068 .await
1069 {
1070 assert_eq!(boxed.name.clone(), "foo".into());
1071 } else {
1072 panic!();
1073 }
1074 }
1075
1076 #[tokio::test]
1077 async fn test_of_create_data_conn_but_not_found() {
1078 let mut errors = Vec::new();
1079
1080 let mut manager = DataSrcManager::new(true);
1081 manager.setup_async(&mut errors).await;
1082
1083 if let Err(err) = manager
1084 .create_data_conn_async::<SyncDataConn>(0, "foo")
1085 .await
1086 {
1087 match err.reason::<DataSrcError>() {
1088 Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1089 name,
1090 data_conn_type,
1091 }) => {
1092 assert_eq!(*name, "foo".into());
1093 assert_eq!(
1094 *data_conn_type,
1095 "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1096 );
1097 }
1098 _ => panic!(),
1099 }
1100 } else {
1101 panic!();
1102 }
1103 }
1104
1105 #[tokio::test]
1106 async fn test_of_create_data_conn_but_fail_to_cast() {
1107 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1108 let mut errors = Vec::new();
1109
1110 let mut manager = DataSrcManager::new(true);
1111 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1112 manager.add("foo", ds1);
1113 manager.setup_async(&mut errors).await;
1114
1115 if let Err(err) = manager
1116 .create_data_conn_async::<AsyncDataConn>(0, "foo")
1117 .await
1118 {
1119 match err.reason::<DataSrcError>() {
1120 Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1121 assert_eq!(*name, "foo".into());
1122 assert_eq!(
1123 *target_type,
1124 "sabi::tokio::data_src::tests_of_data_src::AsyncDataConn"
1125 );
1126 }
1127 _ => panic!(),
1128 }
1129 } else {
1130 panic!();
1131 }
1132 }
1133
1134 #[tokio::test]
1135 async fn test_of_create_data_conn_but_fail_to_create() {
1136 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1137 let mut errors = Vec::new();
1138
1139 let mut manager = DataSrcManager::new(true);
1140 let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1141 manager.add("foo", ds1);
1142 manager.setup_async(&mut errors).await;
1143
1144 if let Err(err) = manager
1145 .create_data_conn_async::<SyncDataConn>(0, "foo")
1146 .await
1147 {
1148 match err.reason::<DataSrcError>() {
1149 Ok(DataSrcError::FailToCreateDataConn {
1150 name,
1151 data_conn_type,
1152 }) => {
1153 assert_eq!(*name, "foo".into());
1154 assert_eq!(
1155 *data_conn_type,
1156 "sabi::tokio::data_src::tests_of_data_src::SyncDataConn"
1157 );
1158 }
1159 _ => panic!(),
1160 }
1161 } else {
1162 panic!();
1163 }
1164 }
1165}