1mod global_setup;
6
7pub(crate) use global_setup::{
8 copy_global_data_srcs_to_map, create_data_conn_from_global_data_src,
9};
10pub use global_setup::{create_static_data_src_container, setup, setup_with_order, uses};
11
12use crate::{
13 AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
14 SendSyncNonNull,
15};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::{any, mem, ptr};
20
21#[derive(Debug)]
23pub enum DataSrcError {
24 FailToRegisterGlobalDataSrc {
27 name: Arc<str>,
29 },
30
31 FailToSetupGlobalDataSrcs {
34 errors: Vec<(Arc<str>, errs::Err)>,
36 },
37
38 DuringSetupGlobalDataSrcs,
40
41 AlreadySetupGlobalDataSrcs,
43
44 FailToCastDataConn {
46 name: Arc<str>,
48 target_type: &'static str,
50 },
51
52 FailToCreateDataConn {
54 name: Arc<str>,
56 data_conn_type: &'static str,
58 },
59
60 NotFoundDataSrcToCreateDataConn {
63 name: Arc<str>,
65 data_conn_type: &'static str,
67 },
68}
69
70impl<S, C> DataSrcContainer<S, C>
71where
72 S: DataSrc<C>,
73 C: DataConn + 'static,
74{
75 pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
76 Self {
77 drop_fn: drop_data_src::<S, C>,
78 setup_fn: setup_data_src::<S, C>,
79 close_fn: close_data_src::<S, C>,
80 create_data_conn_fn: create_data_conn::<S, C>,
81 is_data_conn_fn: is_data_conn::<C>,
82
83 local,
84 name: name.into(),
85 data_src,
86 }
87 }
88}
89
90fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
91where
92 S: DataSrc<C>,
93 C: DataConn + 'static,
94{
95 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
96 drop(unsafe { Box::from_raw(typed_ptr) });
97}
98
99fn setup_data_src<S, C>(ptr: *const DataSrcContainer, ag: &mut AsyncGroup) -> errs::Result<()>
100where
101 S: DataSrc<C>,
102 C: DataConn + 'static,
103{
104 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
105 unsafe { (*typed_ptr).data_src.setup(ag) }
106}
107
108fn close_data_src<S, C>(ptr: *const DataSrcContainer)
109where
110 S: DataSrc<C>,
111 C: DataConn + 'static,
112{
113 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
114 unsafe { (*typed_ptr).data_src.close() };
115}
116
117fn create_data_conn<S, C>(ptr: *const DataSrcContainer) -> errs::Result<Box<DataConnContainer<C>>>
118where
119 S: DataSrc<C>,
120 C: DataConn + 'static,
121{
122 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
123 let conn: Box<C> = unsafe { (*typed_ptr).data_src.create_data_conn() }?;
124 let name = unsafe { &(*typed_ptr).name };
125 Ok(Box::new(DataConnContainer::<C>::new(
126 name.to_string(),
127 conn,
128 )))
129}
130
131fn is_data_conn<C>(type_id: any::TypeId) -> bool
132where
133 C: DataConn + 'static,
134{
135 any::TypeId::of::<C>() == type_id
136}
137
138impl DataSrcManager {
139 pub(crate) const fn new(local: bool) -> Self {
140 Self {
141 vec_unready: Vec::new(),
142 vec_ready: Vec::new(),
143 local,
144 }
145 }
146
147 pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
148 self.vec_unready.splice(0..0, vec);
149 }
150
151 pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
152 where
153 S: DataSrc<C>,
154 C: DataConn + 'static,
155 {
156 let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
157 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
158 self.vec_unready.push(SendSyncNonNull::new(ptr));
159 }
160
161 pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
162 let extracted_vec: Vec<_> = self
163 .vec_ready
164 .extract_if(.., |ssnnptr| {
165 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
166 })
167 .collect();
168
169 for ssnnptr in extracted_vec.iter().rev() {
170 let ptr = ssnnptr.non_null_ptr.as_ptr();
171 let close_fn = unsafe { (*ptr).close_fn };
172 let drop_fn = unsafe { (*ptr).drop_fn };
173 close_fn(ptr);
174 drop_fn(ptr);
175 }
176
177 let extracted_vec: Vec<_> = self
178 .vec_unready
179 .extract_if(.., |ssnnptr| {
180 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
181 })
182 .collect();
183
184 for ssnnptr in extracted_vec.iter().rev() {
185 let ptr = ssnnptr.non_null_ptr.as_ptr();
186 let drop_fn = unsafe { (*ptr).drop_fn };
187 drop_fn(ptr);
188 }
189 }
190
191 pub(crate) fn close(&mut self) {
192 let vec = mem::take(&mut self.vec_ready);
193 for ssnnptr in vec.into_iter().rev() {
194 let ptr = ssnnptr.non_null_ptr.as_ptr();
195 let close_fn = unsafe { (*ptr).close_fn };
196 let drop_fn = unsafe { (*ptr).drop_fn };
197 close_fn(ptr);
198 drop_fn(ptr);
199 }
200 let vec = mem::take(&mut self.vec_unready);
201 for ssnnptr in vec.into_iter().rev() {
202 let ptr = ssnnptr.non_null_ptr.as_ptr();
203 let drop_fn = unsafe { (*ptr).drop_fn };
204 drop_fn(ptr);
205 }
206 }
207
208 pub(crate) fn setup(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
209 if self.vec_unready.is_empty() {
210 return;
211 }
212
213 let mut n_done = 0;
214 let mut ag = AsyncGroup::new();
215 for ssnnptr in self.vec_unready.iter() {
216 n_done += 1;
217 let ptr = ssnnptr.non_null_ptr.as_ptr();
218 let setup_fn = unsafe { (*ptr).setup_fn };
219 ag._name = unsafe { (*ptr).name.clone() };
220 if let Err(err) = setup_fn(ptr, &mut ag) {
221 errors.push((ag._name.clone(), err));
222 break;
223 }
224 }
225 ag.join_and_collect_errors(errors);
226
227 if errors.is_empty() {
228 self.vec_ready.append(&mut self.vec_unready);
229 } else {
230 for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
231 let ptr = ssnnptr.non_null_ptr.as_ptr();
232 let close_fn = unsafe { (*ptr).close_fn };
233 close_fn(ptr);
234 }
235 }
236 }
237
238 pub(crate) fn setup_with_order(
239 &mut self,
240 names: &[&str],
241 errors: &mut Vec<(Arc<str>, errs::Err)>,
242 ) {
243 if self.vec_unready.is_empty() {
244 return;
245 }
246
247 let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
248 for (i, nm) in names.iter().rev().enumerate() {
250 index_map.insert(*nm, names.len() - 1 - i);
251 }
252
253 let vec_unready = mem::take(&mut self.vec_unready);
254
255 let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
256 vec![None; index_map.len()];
257 for ssnnptr in vec_unready.into_iter() {
258 let ptr = ssnnptr.non_null_ptr.as_ptr();
259 let name = unsafe { (*ptr).name.clone() };
260 if let Some(index) = index_map.remove(name.as_ref()) {
261 ordered_vec[index] = Some(ssnnptr);
262 } else {
263 ordered_vec.push(Some(ssnnptr));
264 }
265 }
266
267 let mut n_done = 0;
268 let mut ag = AsyncGroup::new();
269 for ssnnptr_opt in ordered_vec.iter() {
270 n_done += 1;
271 if let Some(ssnnptr) = ssnnptr_opt {
272 let ptr = ssnnptr.non_null_ptr.as_ptr();
273 let setup_fn = unsafe { (*ptr).setup_fn };
274 ag._name = unsafe { (*ptr).name.clone() };
275 if let Err(err) = setup_fn(ptr, &mut ag) {
276 errors.push((ag._name.clone(), err));
277 break;
278 }
279 }
280 }
281 ag.join_and_collect_errors(errors);
282
283 if errors.is_empty() {
284 for ssnnptr in ordered_vec.into_iter().flatten() {
285 self.vec_ready.push(ssnnptr);
286 }
287 } else {
288 for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
289 let ptr = ssnnptr.non_null_ptr.as_ptr();
290 let close_fn = unsafe { (*ptr).close_fn };
291 close_fn(ptr);
292 }
293 for ssnnptr in ordered_vec.into_iter().flatten() {
294 self.vec_unready.push(ssnnptr);
295 }
296 }
297 }
298
299 pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
300 for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
301 let ptr = ssnnptr.non_null_ptr.as_ptr();
302 let name = unsafe { (*ptr).name.clone() };
303 index_map.insert(name, (self.local, i));
304 }
305 }
306
307 pub(crate) fn create_data_conn<C>(
308 &self,
309 index: usize,
310 name: impl AsRef<str>,
311 ) -> errs::Result<Box<DataConnContainer>>
312 where
313 C: DataConn + 'static,
314 {
315 if let Some(ssnnptr) = self.vec_ready.get(index) {
316 let ptr = ssnnptr.non_null_ptr.as_ptr();
317 let type_id = any::TypeId::of::<C>();
318 let is_fn = unsafe { (*ptr).is_data_conn_fn };
319 let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
320 if !is_fn(type_id) {
321 Err(errs::Err::new(DataSrcError::FailToCastDataConn {
322 name: name.as_ref().into(),
323 target_type: any::type_name::<C>(),
324 }))
325 } else {
326 match create_data_conn_fn(ptr) {
327 Ok(boxed) => Ok(boxed),
328 Err(err) => Err(errs::Err::with_source(
329 DataSrcError::FailToCreateDataConn {
330 name: name.as_ref().into(),
331 data_conn_type: any::type_name::<C>(),
332 },
333 err,
334 )),
335 }
336 }
337 } else {
338 Err(errs::Err::new(
339 DataSrcError::NotFoundDataSrcToCreateDataConn {
340 name: name.as_ref().into(),
341 data_conn_type: any::type_name::<C>(),
342 },
343 ))
344 }
345 }
346}
347
348impl Drop for DataSrcManager {
349 fn drop(&mut self) {
350 self.close();
351 }
352}
353
#[cfg(test)]
mod tests_of_data_src {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared, ordered record of the lifecycle events emitted by the test
    /// data sources.
    type Logger = Arc<Mutex<Vec<String>>>;

    /// Creates an empty event log.
    fn new_logger() -> Logger {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Appends one entry to the event log.
    fn log(logger: &Logger, entry: String) {
        logger.lock().unwrap().push(entry);
    }

    /// Returns a snapshot of the event log.
    fn logged(logger: &Logger) -> Vec<String> {
        logger.lock().unwrap().clone()
    }

    /// Boxes and leaks a container for `ds`, returning the type-erased
    /// pointer in the form the manager stores.
    fn leak_container<S, C>(name: &str, ds: S) -> SendSyncNonNull<DataSrcContainer>
    where
        S: DataSrc<C>,
        C: DataConn + 'static,
    {
        let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, true));
        let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
        SendSyncNonNull::new(ptr)
    }

    /// Names of the manager's unready data sources, in stored order.
    fn unready_names(manager: &DataSrcManager) -> Vec<String> {
        manager
            .vec_unready
            .iter()
            .map(|p| unsafe { p.non_null_ptr.as_ref().name.to_string() })
            .collect()
    }

    /// Connection type produced by `SyncDataSrc`; all operations are no-ops.
    struct SyncDataConn {}
    impl SyncDataConn {
        fn new() -> Self {
            Self {}
        }
    }
    impl DataConn for SyncDataConn {
        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
            Ok(())
        }
        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
        fn close(&mut self) {}
    }

    /// Connection type produced by `AsyncDataSrc`; all operations are no-ops.
    struct AsyncDataConn {}
    impl AsyncDataConn {
        fn new() -> Self {
            Self {}
        }
    }
    impl DataConn for AsyncDataConn {
        fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
            Ok(())
        }
        fn rollback(&mut self, _ag: &mut AsyncGroup) {}
        fn close(&mut self) {}
    }

    /// Data source that does its setup synchronously and logs every
    /// lifecycle event.
    struct SyncDataSrc {
        id: i8,
        logger: Logger,
        fail_to_setup: bool,
        fail_to_create_data_conn: bool,
    }
    impl SyncDataSrc {
        /// Logs its creation; `fail_to_setup` makes `setup` return an error.
        fn new(id: i8, logger: Logger, fail_to_setup: bool) -> Self {
            log(&logger, format!("SyncDataSrc::new {}", id));
            Self {
                id,
                logger,
                fail_to_setup,
                fail_to_create_data_conn: false,
            }
        }
        /// Creates a data source whose `create_data_conn` returns an error.
        /// Unlike `new`, the creation is not logged.
        fn new_for_fail_to_create_data_conn(id: i8, logger: Logger) -> Self {
            Self {
                id,
                logger,
                fail_to_setup: false,
                fail_to_create_data_conn: true,
            }
        }
    }
    impl Drop for SyncDataSrc {
        fn drop(&mut self) {
            log(&self.logger, format!("SyncDataSrc::drop {}", self.id));
        }
    }
    impl DataSrc<SyncDataConn> for SyncDataSrc {
        fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
            if self.fail_to_setup {
                log(
                    &self.logger,
                    format!("SyncDataSrc::setup {} failed", self.id),
                );
                return Err(errs::Err::new("XXX".to_string()));
            }
            log(&self.logger, format!("SyncDataSrc::setup {}", self.id));
            Ok(())
        }
        fn close(&mut self) {
            log(&self.logger, format!("SyncDataSrc::close {}", self.id));
        }
        fn create_data_conn(&mut self) -> errs::Result<Box<SyncDataConn>> {
            log(
                &self.logger,
                format!("SyncDataSrc::create_data_conn {}", self.id),
            );
            if self.fail_to_create_data_conn {
                return Err(errs::Err::new("eeee".to_string()));
            }
            Ok(Box::new(SyncDataConn::new()))
        }
    }

    /// Data source that defers its setup work to the `AsyncGroup`.
    struct AsyncDataSrc {
        id: i8,
        fail: bool,
        logger: Logger,
        wait: u64,
    }
    impl AsyncDataSrc {
        /// Logs its creation. `fail` makes the deferred setup return an
        /// error; `wait` is the delay in milliseconds before it runs.
        fn new(id: i8, logger: Logger, fail: bool, wait: u64) -> Self {
            log(&logger, format!("AsyncDataSrc::new {}", id));
            Self {
                id,
                fail,
                logger,
                wait,
            }
        }
    }
    impl Drop for AsyncDataSrc {
        fn drop(&mut self) {
            log(&self.logger, format!("AsyncDataSrc::drop {}", self.id));
        }
    }
    impl DataSrc<AsyncDataConn> for AsyncDataSrc {
        fn setup(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
            let logger = self.logger.clone();
            let fail = self.fail;
            let id = self.id;
            let wait = self.wait;
            ag.add(move || {
                std::thread::sleep(std::time::Duration::from_millis(wait));
                if fail {
                    log(
                        &logger,
                        format!("AsyncDataSrc::setup {} failed to setup", id),
                    );
                    return Err(errs::Err::new("XXX".to_string()));
                }
                log(&logger, format!("AsyncDataSrc::setup {}", id));
                Ok(())
            });
            Ok(())
        }
        fn close(&mut self) {
            log(&self.logger, format!("AsyncDataSrc::close {}", self.id));
        }
        fn create_data_conn(&mut self) -> errs::Result<Box<AsyncDataConn>> {
            log(
                &self.logger,
                format!("AsyncDataSrc::create_data_conn {}", self.id),
            );
            Ok(Box::new(AsyncDataConn::new()))
        }
    }

    #[test]
    fn test_of_new() {
        let manager = DataSrcManager::new(true);
        assert!(manager.local);
        assert_eq!(manager.vec_unready.len(), 0);
        assert_eq!(manager.vec_ready.len(), 0);

        let manager = DataSrcManager::new(false);
        assert!(!manager.local);
        assert_eq!(manager.vec_unready.len(), 0);
        assert_eq!(manager.vec_ready.len(), 0);
    }

    #[test]
    fn test_of_prepend() {
        let logger = new_logger();

        {
            let vec = vec![
                leak_container("foo", SyncDataSrc::new(1, logger.clone(), false)),
                leak_container("bar", AsyncDataSrc::new(2, logger.clone(), false, 0)),
            ];

            let mut manager = DataSrcManager::new(true);
            manager.prepend(vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 2);
            assert_eq!(manager.vec_ready.len(), 0);
            assert_eq!(unready_names(&manager), ["foo", "bar"]);

            let vec = vec![
                leak_container("baz", SyncDataSrc::new(3, logger.clone(), false)),
                leak_container("qux", AsyncDataSrc::new(4, logger.clone(), false, 0)),
            ];

            manager.prepend(vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 4);
            assert_eq!(manager.vec_ready.len(), 0);
            // The second batch goes in front of the first one.
            assert_eq!(unready_names(&manager), ["baz", "qux", "foo", "bar"]);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "AsyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "AsyncDataSrc::new 4",
                "AsyncDataSrc::drop 2",
                "SyncDataSrc::drop 1",
                "AsyncDataSrc::drop 4",
                "SyncDataSrc::drop 3",
            ],
        );
    }

    #[test]
    fn test_of_add() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 1);
            assert_eq!(manager.vec_ready.len(), 0);
            assert_eq!(unready_names(&manager), ["foo"]);

            manager.add("bar", AsyncDataSrc::new(2, logger.clone(), false, 0));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 2);
            assert_eq!(manager.vec_ready.len(), 0);
            assert_eq!(unready_names(&manager), ["foo", "bar"]);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "AsyncDataSrc::new 2",
                "AsyncDataSrc::drop 2",
                "SyncDataSrc::drop 1",
            ],
        );
    }

    /// Builds a manager with "foo" and "bar" unready and "baz" and "qux"
    /// ready, created in that order.
    fn manager_with_two_unready_and_two_ready(logger: &Logger) -> DataSrcManager {
        let mut manager = DataSrcManager::new(true);

        let ds1 = SyncDataSrc::new(1, logger.clone(), false);
        manager.vec_unready.push(leak_container("foo", ds1));

        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
        manager.vec_unready.push(leak_container("bar", ds2));

        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
        manager.vec_ready.push(leak_container("baz", ds3));

        let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
        manager.vec_ready.push(leak_container("qux", ds4));

        assert!(manager.local);
        assert_eq!(manager.vec_unready.len(), 2);
        assert_eq!(manager.vec_ready.len(), 2);

        manager
    }

    #[test]
    fn test_of_remove() {
        let logger = new_logger();

        {
            let mut manager = manager_with_two_unready_and_two_ready(&logger);

            manager.remove("baz");
            manager.remove("foo");
            manager.remove("qux");
            manager.remove("bar");
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "AsyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "AsyncDataSrc::new 4",
                "SyncDataSrc::close 3",
                "SyncDataSrc::drop 3",
                "SyncDataSrc::drop 1",
                "AsyncDataSrc::close 4",
                "AsyncDataSrc::drop 4",
                "AsyncDataSrc::drop 2",
            ],
        );
    }

    #[test]
    fn test_of_close() {
        let logger = new_logger();

        {
            let mut manager = manager_with_two_unready_and_two_ready(&logger);

            manager.close();
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "AsyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "AsyncDataSrc::new 4",
                "AsyncDataSrc::close 4",
                "AsyncDataSrc::drop 4",
                "SyncDataSrc::close 3",
                "SyncDataSrc::drop 3",
                "AsyncDataSrc::drop 2",
                "SyncDataSrc::drop 1",
            ],
        );
    }

    #[test]
    fn test_of_setup_and_ok() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
            manager.add("bar", SyncDataSrc::new(2, logger.clone(), false));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 2);
            assert_eq!(manager.vec_ready.len(), 0);

            let mut vec = Vec::new();
            manager.setup(&mut vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 0);
            assert_eq!(manager.vec_ready.len(), 2);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "SyncDataSrc::new 2",
                "SyncDataSrc::setup 1",
                "SyncDataSrc::setup 2",
                "SyncDataSrc::close 2",
                "SyncDataSrc::drop 2",
                "SyncDataSrc::close 1",
                "SyncDataSrc::drop 1",
            ],
        );
    }

    #[test]
    fn test_of_setup_but_error() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
            manager.add("bar", SyncDataSrc::new(2, logger.clone(), true));
            manager.add("bar", SyncDataSrc::new(3, logger.clone(), true));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);

            let mut vec = Vec::new();
            manager.setup(&mut vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "SyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "SyncDataSrc::setup 1",
                "SyncDataSrc::setup 2 failed",
                "SyncDataSrc::close 2",
                "SyncDataSrc::close 1",
                "SyncDataSrc::drop 3",
                "SyncDataSrc::drop 2",
                "SyncDataSrc::drop 1",
            ],
        );
    }

    #[test]
    fn test_of_setup_with_order_and_ok() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
            manager.add("bar", SyncDataSrc::new(2, logger.clone(), false));
            manager.add("baz", SyncDataSrc::new(3, logger.clone(), false));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);

            let mut vec = Vec::new();
            manager.setup_with_order(&["baz", "foo"], &mut vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 0);
            assert_eq!(manager.vec_ready.len(), 3);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "SyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "SyncDataSrc::setup 3",
                "SyncDataSrc::setup 1",
                "SyncDataSrc::setup 2",
                "SyncDataSrc::close 2",
                "SyncDataSrc::drop 2",
                "SyncDataSrc::close 1",
                "SyncDataSrc::drop 1",
                "SyncDataSrc::close 3",
                "SyncDataSrc::drop 3",
            ],
        );
    }

    #[test]
    fn test_of_setup_with_order_but_fail() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), true));
            manager.add("bar", SyncDataSrc::new(2, logger.clone(), true));
            manager.add("baz", SyncDataSrc::new(3, logger.clone(), false));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);

            let mut vec = Vec::new();
            manager.setup_with_order(&["baz", "foo"], &mut vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "SyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "SyncDataSrc::setup 3",
                "SyncDataSrc::setup 1 failed",
                "SyncDataSrc::close 1",
                "SyncDataSrc::close 3",
                "SyncDataSrc::drop 2",
                "SyncDataSrc::drop 1",
                "SyncDataSrc::drop 3",
            ],
        );
    }

    #[test]
    fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
        let logger = new_logger();

        {
            let mut manager = DataSrcManager::new(true);

            manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
            manager.add("bar", SyncDataSrc::new(2, logger.clone(), false));
            manager.add("baz", SyncDataSrc::new(3, logger.clone(), false));

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 3);
            assert_eq!(manager.vec_ready.len(), 0);

            let mut vec = Vec::new();
            manager.setup_with_order(&["baz", "foo", "baz"], &mut vec);

            assert!(manager.local);
            assert_eq!(manager.vec_unready.len(), 0);
            assert_eq!(manager.vec_ready.len(), 3);
        }

        assert_eq!(
            logged(&logger),
            [
                "SyncDataSrc::new 1",
                "SyncDataSrc::new 2",
                "SyncDataSrc::new 3",
                "SyncDataSrc::setup 3",
                "SyncDataSrc::setup 1",
                "SyncDataSrc::setup 2",
                "SyncDataSrc::close 2",
                "SyncDataSrc::drop 2",
                "SyncDataSrc::close 1",
                "SyncDataSrc::drop 1",
                "SyncDataSrc::close 3",
                "SyncDataSrc::drop 3",
            ],
        );
    }

    #[test]
    fn test_of_copy_ds_ready_to_map() {
        let logger = new_logger();
        let mut errors = Vec::new();

        let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();

        let manager = DataSrcManager::new(true);
        manager.copy_ds_ready_to_map(&mut index_map);
        assert!(index_map.is_empty());

        let mut manager = DataSrcManager::new(true);
        manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
        manager.setup(&mut errors);
        assert!(errors.is_empty());
        manager.copy_ds_ready_to_map(&mut index_map);
        assert_eq!(index_map.len(), 1);
        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));

        let mut manager = DataSrcManager::new(false);
        let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
        let ds3 = SyncDataSrc::new(3, logger.clone(), false);
        manager.add("bar", ds2);
        manager.add("baz", ds3);
        manager.setup(&mut errors);
        assert!(errors.is_empty());
        manager.copy_ds_ready_to_map(&mut index_map);
        assert_eq!(index_map.len(), 3);
        assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
        assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
        assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
    }

    #[test]
    fn test_of_create_data_conn_and_ok() {
        let logger = new_logger();
        let mut errors = Vec::new();

        let mut manager = DataSrcManager::new(true);
        manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
        manager.setup(&mut errors);
        // The data source must really be ready before a connection is
        // requested from it.
        assert!(errors.is_empty());

        let Ok(boxed) = manager.create_data_conn::<SyncDataConn>(0, "foo") else {
            panic!("creating the data connection should succeed");
        };
        assert_eq!(boxed.name.clone(), "foo".into());
    }

    #[test]
    fn test_of_create_data_conn_but_not_found() {
        let mut errors = Vec::new();

        let mut manager = DataSrcManager::new(true);
        manager.setup(&mut errors);
        assert!(errors.is_empty());

        let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") else {
            panic!("creating the data connection should fail");
        };
        match err.reason::<DataSrcError>() {
            Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
                name,
                data_conn_type,
            }) => {
                assert_eq!(*name, "foo".into());
                assert_eq!(
                    *data_conn_type,
                    "sabi::data_src::tests_of_data_src::SyncDataConn"
                );
            }
            _ => panic!(),
        }
    }

    #[test]
    fn test_of_create_data_conn_but_fail_to_cast() {
        let logger = new_logger();
        let mut errors = Vec::new();

        let mut manager = DataSrcManager::new(true);
        manager.add("foo", SyncDataSrc::new(1, logger.clone(), false));
        manager.setup(&mut errors);
        assert!(errors.is_empty());

        let Err(err) = manager.create_data_conn::<AsyncDataConn>(0, "foo") else {
            panic!("creating the data connection should fail");
        };
        match err.reason::<DataSrcError>() {
            Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
                assert_eq!(*name, "foo".into());
                assert_eq!(
                    *target_type,
                    "sabi::data_src::tests_of_data_src::AsyncDataConn"
                );
            }
            _ => panic!(),
        }
    }

    #[test]
    fn test_of_create_data_conn_but_fail_to_create() {
        let logger = new_logger();
        let mut errors = Vec::new();

        let mut manager = DataSrcManager::new(true);
        let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
        manager.add("foo", ds1);
        manager.setup(&mut errors);
        assert!(errors.is_empty());

        let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") else {
            panic!("creating the data connection should fail");
        };
        match err.reason::<DataSrcError>() {
            Ok(DataSrcError::FailToCreateDataConn {
                name,
                data_conn_type,
            }) => {
                assert_eq!(*name, "foo".into());
                assert_eq!(
                    *data_conn_type,
                    "sabi::data_src::tests_of_data_src::SyncDataConn"
                );
            }
            _ => panic!(),
        }
    }
}