1mod global_setup;
6
7pub(crate) use global_setup::{
8 copy_global_data_srcs_to_map, create_data_conn_from_global_data_src,
9};
10pub use global_setup::{create_static_data_src_container, setup, setup_with_order, uses};
11
12use crate::{
13 AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
14 SendSyncNonNull,
15};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::{any, mem, ptr};
20
21#[derive(Debug)]
23pub enum DataSrcError {
24 FailToSetupGlobalDataSrcs {
27 errors: Vec<(Arc<str>, errs::Err)>,
29 },
30
31 DuringSetupGlobalDataSrcs,
33
34 AlreadySetupGlobalDataSrcs,
36
37 FailToCastDataConn {
39 name: Arc<str>,
41 target_type: &'static str,
43 },
44
45 FailToCreateDataConn {
47 name: Arc<str>,
49 data_conn_type: &'static str,
51 },
52
53 NotFoundDataSrcToCreateDataConn {
56 name: Arc<str>,
58 data_conn_type: &'static str,
60 },
61}
62
63impl<S, C> DataSrcContainer<S, C>
64where
65 S: DataSrc<C>,
66 C: DataConn + 'static,
67{
68 pub(crate) fn new(name: impl Into<Arc<str>>, data_src: S, local: bool) -> Self {
69 Self {
70 drop_fn: drop_data_src::<S, C>,
71 setup_fn: setup_data_src::<S, C>,
72 close_fn: close_data_src::<S, C>,
73 create_data_conn_fn: create_data_conn::<S, C>,
74 is_data_conn_fn: is_data_conn::<C>,
75
76 local,
77 name: name.into(),
78 data_src,
79 }
80 }
81}
82
83fn drop_data_src<S, C>(ptr: *const DataSrcContainer)
84where
85 S: DataSrc<C>,
86 C: DataConn + 'static,
87{
88 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
89 drop(unsafe { Box::from_raw(typed_ptr) });
90}
91
92fn setup_data_src<S, C>(ptr: *const DataSrcContainer, ag: &mut AsyncGroup) -> errs::Result<()>
93where
94 S: DataSrc<C>,
95 C: DataConn + 'static,
96{
97 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
98 unsafe { (*typed_ptr).data_src.setup(ag) }
99}
100
101fn close_data_src<S, C>(ptr: *const DataSrcContainer)
102where
103 S: DataSrc<C>,
104 C: DataConn + 'static,
105{
106 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
107 unsafe { (*typed_ptr).data_src.close() };
108}
109
110fn create_data_conn<S, C>(ptr: *const DataSrcContainer) -> errs::Result<Box<DataConnContainer<C>>>
111where
112 S: DataSrc<C>,
113 C: DataConn + 'static,
114{
115 let typed_ptr = ptr as *mut DataSrcContainer<S, C>;
116 let conn: Box<C> = unsafe { (*typed_ptr).data_src.create_data_conn() }?;
117 let name = unsafe { &(*typed_ptr).name };
118 Ok(Box::new(DataConnContainer::<C>::new(
119 name.to_string(),
120 conn,
121 )))
122}
123
124fn is_data_conn<C>(type_id: any::TypeId) -> bool
125where
126 C: DataConn + 'static,
127{
128 any::TypeId::of::<C>() == type_id
129}
130
131impl DataSrcManager {
132 pub(crate) const fn new(local: bool) -> Self {
133 Self {
134 vec_unready: Vec::new(),
135 vec_ready: Vec::new(),
136 local,
137 }
138 }
139
140 pub(crate) fn prepend(&mut self, vec: Vec<SendSyncNonNull<DataSrcContainer>>) {
141 self.vec_unready.splice(0..0, vec);
142 }
143
144 pub(crate) fn add<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
145 where
146 S: DataSrc<C>,
147 C: DataConn + 'static,
148 {
149 let boxed = Box::new(DataSrcContainer::<S, C>::new(name, ds, self.local));
150 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
151 self.vec_unready.push(SendSyncNonNull::new(ptr));
152 }
153
154 pub(crate) fn remove(&mut self, name: impl AsRef<str>) {
155 let extracted_vec: Vec<_> = self
156 .vec_ready
157 .extract_if(.., |ssnnptr| {
158 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
159 })
160 .collect();
161
162 for ssnnptr in extracted_vec.iter().rev() {
163 let ptr = ssnnptr.non_null_ptr.as_ptr();
164 let close_fn = unsafe { (*ptr).close_fn };
165 let drop_fn = unsafe { (*ptr).drop_fn };
166 close_fn(ptr);
167 drop_fn(ptr);
168 }
169
170 let extracted_vec: Vec<_> = self
171 .vec_unready
172 .extract_if(.., |ssnnptr| {
173 unsafe { &(*ssnnptr.non_null_ptr.as_ptr()).name }.as_ref() == name.as_ref()
174 })
175 .collect();
176
177 for ssnnptr in extracted_vec.iter().rev() {
178 let ptr = ssnnptr.non_null_ptr.as_ptr();
179 let drop_fn = unsafe { (*ptr).drop_fn };
180 drop_fn(ptr);
181 }
182 }
183
184 pub(crate) fn close(&mut self) {
185 let vec = mem::take(&mut self.vec_ready);
186 for ssnnptr in vec.into_iter().rev() {
187 let ptr = ssnnptr.non_null_ptr.as_ptr();
188 let close_fn = unsafe { (*ptr).close_fn };
189 let drop_fn = unsafe { (*ptr).drop_fn };
190 close_fn(ptr);
191 drop_fn(ptr);
192 }
193 let vec = mem::take(&mut self.vec_unready);
194 for ssnnptr in vec.into_iter().rev() {
195 let ptr = ssnnptr.non_null_ptr.as_ptr();
196 let drop_fn = unsafe { (*ptr).drop_fn };
197 drop_fn(ptr);
198 }
199 }
200
201 pub(crate) fn setup(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
202 if self.vec_unready.is_empty() {
203 return;
204 }
205
206 let mut n_done = 0;
207 let mut ag = AsyncGroup::new();
208 for ssnnptr in self.vec_unready.iter() {
209 n_done += 1;
210 let ptr = ssnnptr.non_null_ptr.as_ptr();
211 let setup_fn = unsafe { (*ptr).setup_fn };
212 ag._name = unsafe { (*ptr).name.clone() };
213 if let Err(err) = setup_fn(ptr, &mut ag) {
214 errors.push((ag._name.clone(), err));
215 break;
216 }
217 }
218 ag.join_and_collect_errors(errors);
219
220 if errors.is_empty() {
221 self.vec_ready.append(&mut self.vec_unready);
222 } else {
223 for ssnnptr in self.vec_unready[0..n_done].iter().rev() {
224 let ptr = ssnnptr.non_null_ptr.as_ptr();
225 let close_fn = unsafe { (*ptr).close_fn };
226 close_fn(ptr);
227 }
228 }
229 }
230
231 pub(crate) fn setup_with_order(
232 &mut self,
233 names: &[&str],
234 errors: &mut Vec<(Arc<str>, errs::Err)>,
235 ) {
236 if self.vec_unready.is_empty() {
237 return;
238 }
239
240 let mut index_map: HashMap<&str, usize> = HashMap::with_capacity(names.len());
241 for (i, nm) in names.iter().rev().enumerate() {
243 index_map.insert(*nm, names.len() - 1 - i);
244 }
245
246 let vec_unready = mem::take(&mut self.vec_unready);
247
248 let mut ordered_vec: Vec<Option<SendSyncNonNull<DataSrcContainer>>> =
249 vec![None; index_map.len()];
250 for ssnnptr in vec_unready.into_iter() {
251 let ptr = ssnnptr.non_null_ptr.as_ptr();
252 let name = unsafe { (*ptr).name.clone() };
253 if let Some(index) = index_map.remove(name.as_ref()) {
254 ordered_vec[index] = Some(ssnnptr);
255 } else {
256 ordered_vec.push(Some(ssnnptr));
257 }
258 }
259
260 let mut n_done = 0;
261 let mut ag = AsyncGroup::new();
262 for ssnnptr_opt in ordered_vec.iter() {
263 n_done += 1;
264 if let Some(ssnnptr) = ssnnptr_opt {
265 let ptr = ssnnptr.non_null_ptr.as_ptr();
266 let setup_fn = unsafe { (*ptr).setup_fn };
267 ag._name = unsafe { (*ptr).name.clone() };
268 if let Err(err) = setup_fn(ptr, &mut ag) {
269 errors.push((ag._name.clone(), err));
270 break;
271 }
272 }
273 }
274 ag.join_and_collect_errors(errors);
275
276 if errors.is_empty() {
277 for ssnnptr in ordered_vec.into_iter().flatten() {
278 self.vec_ready.push(ssnnptr);
279 }
280 } else {
281 for ssnnptr in ordered_vec[0..n_done].iter().flatten().rev() {
282 let ptr = ssnnptr.non_null_ptr.as_ptr();
283 let close_fn = unsafe { (*ptr).close_fn };
284 close_fn(ptr);
285 }
286 for ssnnptr in ordered_vec.into_iter().flatten() {
287 self.vec_unready.push(ssnnptr);
288 }
289 }
290 }
291
292 pub(crate) fn copy_ds_ready_to_map(&self, index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
293 for (i, ssnnptr) in self.vec_ready.iter().enumerate() {
294 let ptr = ssnnptr.non_null_ptr.as_ptr();
295 let name = unsafe { (*ptr).name.clone() };
296 index_map.insert(name, (self.local, i));
297 }
298 }
299
300 pub(crate) fn create_data_conn<C>(
301 &self,
302 index: usize,
303 name: impl AsRef<str>,
304 ) -> errs::Result<Box<DataConnContainer>>
305 where
306 C: DataConn + 'static,
307 {
308 if let Some(ssnnptr) = self.vec_ready.get(index) {
309 let ptr = ssnnptr.non_null_ptr.as_ptr();
310 let type_id = any::TypeId::of::<C>();
311 let is_fn = unsafe { (*ptr).is_data_conn_fn };
312 let create_data_conn_fn = unsafe { (*ptr).create_data_conn_fn };
313 if !is_fn(type_id) {
314 Err(errs::Err::new(DataSrcError::FailToCastDataConn {
315 name: name.as_ref().into(),
316 target_type: any::type_name::<C>(),
317 }))
318 } else {
319 match create_data_conn_fn(ptr) {
320 Ok(boxed) => Ok(boxed),
321 Err(err) => Err(errs::Err::with_source(
322 DataSrcError::FailToCreateDataConn {
323 name: name.as_ref().into(),
324 data_conn_type: any::type_name::<C>(),
325 },
326 err,
327 )),
328 }
329 }
330 } else {
331 Err(errs::Err::new(
332 DataSrcError::NotFoundDataSrcToCreateDataConn {
333 name: name.as_ref().into(),
334 data_conn_type: any::type_name::<C>(),
335 },
336 ))
337 }
338 }
339}
340
341impl Drop for DataSrcManager {
342 fn drop(&mut self) {
343 self.close();
344 }
345}
346
347#[cfg(test)]
348mod tests_of_data_src {
349 use super::*;
350 use std::sync::{Arc, Mutex};
351
352 struct SyncDataConn {}
353 impl SyncDataConn {
354 fn new() -> Self {
355 Self {}
356 }
357 }
358 impl DataConn for SyncDataConn {
359 fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
360 Ok(())
361 }
362 fn rollback(&mut self, _ag: &mut AsyncGroup) {}
363 fn close(&mut self) {}
364 }
365
366 struct AsyncDataConn {}
367 impl AsyncDataConn {
368 fn new() -> Self {
369 Self {}
370 }
371 }
372 impl DataConn for AsyncDataConn {
373 fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
374 Ok(())
375 }
376 fn rollback(&mut self, _ag: &mut AsyncGroup) {}
377 fn close(&mut self) {}
378 }
379
380 struct SyncDataSrc {
381 id: i8,
382 logger: Arc<Mutex<Vec<String>>>,
383 fail_to_setup: bool,
384 fail_to_create_data_conn: bool,
385 }
386 impl SyncDataSrc {
387 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail_to_setup: bool) -> Self {
388 logger
389 .lock()
390 .unwrap()
391 .push(format!("SyncDataSrc::new {}", id));
392 Self {
393 id,
394 logger: logger,
395 fail_to_setup,
396 fail_to_create_data_conn: false,
397 }
398 }
399 fn new_for_fail_to_create_data_conn(id: i8, logger: Arc<Mutex<Vec<String>>>) -> Self {
400 Self {
401 id,
402 logger: logger,
403 fail_to_setup: false,
404 fail_to_create_data_conn: true,
405 }
406 }
407 }
408 impl Drop for SyncDataSrc {
409 fn drop(&mut self) {
410 self.logger
411 .lock()
412 .unwrap()
413 .push(format!("SyncDataSrc::drop {}", self.id));
414 }
415 }
416 impl DataSrc<SyncDataConn> for SyncDataSrc {
417 fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
418 if self.fail_to_setup {
419 self.logger
420 .lock()
421 .unwrap()
422 .push(format!("SyncDataSrc::setup {} failed", self.id));
423 return Err(errs::Err::new("XXX".to_string()));
424 }
425 self.logger
426 .lock()
427 .unwrap()
428 .push(format!("SyncDataSrc::setup {}", self.id));
429 Ok(())
430 }
431 fn close(&mut self) {
432 self.logger
433 .lock()
434 .unwrap()
435 .push(format!("SyncDataSrc::close {}", self.id));
436 }
437 fn create_data_conn(&mut self) -> errs::Result<Box<SyncDataConn>> {
438 {
439 self.logger
440 .lock()
441 .unwrap()
442 .push(format!("SyncDataSrc::create_data_conn {}", self.id));
443 }
444 if self.fail_to_create_data_conn {
445 return Err(errs::Err::new("eeee".to_string()));
446 }
447 let conn = SyncDataConn::new();
448 Ok(Box::new(conn))
449 }
450 }
451
452 struct AsyncDataSrc {
453 id: i8,
454 fail: bool,
455 logger: Arc<Mutex<Vec<String>>>,
456 wait: u64,
457 }
458 impl AsyncDataSrc {
459 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: bool, wait: u64) -> Self {
460 logger
461 .lock()
462 .unwrap()
463 .push(format!("AsyncDataSrc::new {}", id));
464 Self {
465 id,
466 fail,
467 logger,
468 wait,
469 }
470 }
471 }
472 impl Drop for AsyncDataSrc {
473 fn drop(&mut self) {
474 self.logger
475 .lock()
476 .unwrap()
477 .push(format!("AsyncDataSrc::drop {}", self.id));
478 }
479 }
480 impl DataSrc<AsyncDataConn> for AsyncDataSrc {
481 fn setup(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
482 let logger = self.logger.clone();
483 let fail = self.fail;
484 let id = self.id;
485 let wait = self.wait;
486 ag.add(move || {
487 std::thread::sleep(std::time::Duration::from_millis(wait));
488 let mut logger = logger.lock().unwrap();
489 if fail {
490 logger.push(format!("AsyncDataSrc::setup {} failed to setup", id));
491 return Err(errs::Err::new("XXX".to_string()));
492 }
493 logger.push(format!("AsyncDataSrc::setup {}", id));
494 Ok(())
495 });
496 Ok(())
497 }
498 fn close(&mut self) {
499 self.logger
500 .lock()
501 .unwrap()
502 .push(format!("AsyncDataSrc::close {}", self.id));
503 }
504 fn create_data_conn(&mut self) -> errs::Result<Box<AsyncDataConn>> {
505 {
506 self.logger
507 .lock()
508 .unwrap()
509 .push(format!("AsyncDataSrc::create_data_conn {}", self.id));
510 }
511 let conn = AsyncDataConn::new();
512 Ok(Box::new(conn))
513 }
514 }
515
516 #[test]
517 fn test_of_new() {
518 let manager = DataSrcManager::new(true);
519 assert!(manager.local);
520 assert_eq!(manager.vec_unready.len(), 0);
521 assert_eq!(manager.vec_ready.len(), 0);
522
523 let manager = DataSrcManager::new(false);
524 assert!(!manager.local);
525 assert_eq!(manager.vec_unready.len(), 0);
526 assert_eq!(manager.vec_ready.len(), 0);
527 }
528
529 #[test]
530 fn test_of_prepend() {
531 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
532
533 {
534 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
535
536 let ds = SyncDataSrc::new(1, logger.clone(), false);
537 let boxed = Box::new(DataSrcContainer::new("foo", ds, true));
538 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
539 vec.push(SendSyncNonNull::new(ptr));
540
541 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
542 let boxed = Box::new(DataSrcContainer::new("bar", ds, true));
543 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
544 vec.push(SendSyncNonNull::new(ptr));
545
546 let mut manager = DataSrcManager::new(true);
547 manager.prepend(vec);
548
549 assert!(manager.local);
550 assert_eq!(manager.vec_unready.len(), 2);
551 assert_eq!(manager.vec_ready.len(), 0);
552
553 assert_eq!(
554 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
555 "foo".into()
556 );
557 assert_eq!(
558 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
559 "bar".into()
560 );
561
562 let mut vec = Vec::<SendSyncNonNull<DataSrcContainer>>::new();
563
564 let ds = SyncDataSrc::new(3, logger.clone(), false);
565 let boxed = Box::new(DataSrcContainer::new("baz", ds, true));
566 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
567 vec.push(SendSyncNonNull::new(ptr));
568
569 let ds = AsyncDataSrc::new(4, logger.clone(), false, 0);
570 let boxed = Box::new(DataSrcContainer::new("qux", ds, true));
571 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
572 vec.push(SendSyncNonNull::new(ptr));
573
574 manager.prepend(vec);
575
576 assert!(manager.local);
577 assert_eq!(manager.vec_unready.len(), 4);
578 assert_eq!(manager.vec_ready.len(), 0);
579
580 assert_eq!(
581 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
582 "baz".into()
583 );
584 assert_eq!(
585 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
586 "qux".into()
587 );
588 assert_eq!(
589 unsafe { manager.vec_unready[2].non_null_ptr.as_ref().name.clone() },
590 "foo".into()
591 );
592 assert_eq!(
593 unsafe { manager.vec_unready[3].non_null_ptr.as_ref().name.clone() },
594 "bar".into()
595 );
596 }
597
598 assert_eq!(
599 *logger.lock().unwrap(),
600 vec![
601 "SyncDataSrc::new 1",
602 "AsyncDataSrc::new 2",
603 "SyncDataSrc::new 3",
604 "AsyncDataSrc::new 4",
605 "AsyncDataSrc::drop 2",
606 "SyncDataSrc::drop 1",
607 "AsyncDataSrc::drop 4",
608 "SyncDataSrc::drop 3",
609 ],
610 );
611 }
612
613 #[test]
614 fn test_of_add() {
615 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
616
617 {
618 let mut manager = DataSrcManager::new(true);
619
620 let ds = SyncDataSrc::new(1, logger.clone(), false);
621 manager.add("foo", ds);
622
623 assert!(manager.local);
624 assert_eq!(manager.vec_unready.len(), 1);
625 assert_eq!(manager.vec_ready.len(), 0);
626
627 assert_eq!(
628 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
629 "foo".into()
630 );
631
632 let ds = AsyncDataSrc::new(2, logger.clone(), false, 0);
633 manager.add("bar", ds);
634
635 assert!(manager.local);
636 assert_eq!(manager.vec_unready.len(), 2);
637 assert_eq!(manager.vec_ready.len(), 0);
638
639 assert_eq!(
640 unsafe { manager.vec_unready[0].non_null_ptr.as_ref().name.clone() },
641 "foo".into()
642 );
643 assert_eq!(
644 unsafe { manager.vec_unready[1].non_null_ptr.as_ref().name.clone() },
645 "bar".into()
646 );
647 }
648
649 assert_eq!(
650 *logger.lock().unwrap(),
651 vec![
652 "SyncDataSrc::new 1",
653 "AsyncDataSrc::new 2",
654 "AsyncDataSrc::drop 2",
655 "SyncDataSrc::drop 1",
656 ],
657 );
658 }
659
660 #[test]
661 fn test_of_remove() {
662 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
663
664 {
665 let mut manager = DataSrcManager::new(true);
666
667 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
668 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
669 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
670 manager.vec_unready.push(SendSyncNonNull::new(ptr));
671
672 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
673 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
674 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
675 manager.vec_unready.push(SendSyncNonNull::new(ptr));
676
677 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
678 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
679 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
680 manager.vec_ready.push(SendSyncNonNull::new(ptr));
681
682 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
683 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
684 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
685 manager.vec_ready.push(SendSyncNonNull::new(ptr));
686
687 assert!(manager.local);
688 assert_eq!(manager.vec_unready.len(), 2);
689 assert_eq!(manager.vec_ready.len(), 2);
690
691 manager.remove("baz");
692 manager.remove("foo");
693 manager.remove("qux");
694 manager.remove("bar");
695 }
696
697 assert_eq!(
698 *logger.lock().unwrap(),
699 vec![
700 "SyncDataSrc::new 1",
701 "AsyncDataSrc::new 2",
702 "SyncDataSrc::new 3",
703 "AsyncDataSrc::new 4",
704 "SyncDataSrc::close 3",
705 "SyncDataSrc::drop 3",
706 "SyncDataSrc::drop 1",
707 "AsyncDataSrc::close 4",
708 "AsyncDataSrc::drop 4",
709 "AsyncDataSrc::drop 2",
710 ],
711 );
712 }
713
714 #[test]
715 fn test_of_close() {
716 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
717
718 {
719 let mut manager = DataSrcManager::new(true);
720
721 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
722 let boxed = Box::new(DataSrcContainer::new("foo", ds1, true));
723 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
724 manager.vec_unready.push(SendSyncNonNull::new(ptr));
725
726 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
727 let boxed = Box::new(DataSrcContainer::new("bar", ds2, true));
728 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
729 manager.vec_unready.push(SendSyncNonNull::new(ptr));
730
731 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
732 let boxed = Box::new(DataSrcContainer::new("baz", ds3, true));
733 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
734 manager.vec_ready.push(SendSyncNonNull::new(ptr));
735
736 let ds4 = AsyncDataSrc::new(4, logger.clone(), false, 0);
737 let boxed = Box::new(DataSrcContainer::new("qux", ds4, true));
738 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
739 manager.vec_ready.push(SendSyncNonNull::new(ptr));
740
741 assert!(manager.local);
742 assert_eq!(manager.vec_unready.len(), 2);
743 assert_eq!(manager.vec_ready.len(), 2);
744
745 manager.close();
746 }
747
748 assert_eq!(
749 *logger.lock().unwrap(),
750 vec![
751 "SyncDataSrc::new 1",
752 "AsyncDataSrc::new 2",
753 "SyncDataSrc::new 3",
754 "AsyncDataSrc::new 4",
755 "AsyncDataSrc::close 4",
756 "AsyncDataSrc::drop 4",
757 "SyncDataSrc::close 3",
758 "SyncDataSrc::drop 3",
759 "AsyncDataSrc::drop 2",
760 "SyncDataSrc::drop 1",
761 ],
762 );
763 }
764
765 #[test]
766 fn test_of_setup_and_ok() {
767 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
768
769 {
770 let mut manager = DataSrcManager::new(true);
771
772 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
773 manager.add("foo", ds1);
774
775 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
776 manager.add("bar", ds2);
777
778 assert!(manager.local);
779 assert_eq!(manager.vec_unready.len(), 2);
780 assert_eq!(manager.vec_ready.len(), 0);
781
782 let mut vec = Vec::new();
783 manager.setup(&mut vec);
784
785 assert!(manager.local);
786 assert_eq!(manager.vec_unready.len(), 0);
787 assert_eq!(manager.vec_ready.len(), 2);
788 }
789
790 assert_eq!(
791 *logger.lock().unwrap(),
792 vec![
793 "SyncDataSrc::new 1",
794 "SyncDataSrc::new 2",
795 "SyncDataSrc::setup 1",
796 "SyncDataSrc::setup 2",
797 "SyncDataSrc::close 2",
798 "SyncDataSrc::drop 2",
799 "SyncDataSrc::close 1",
800 "SyncDataSrc::drop 1",
801 ],
802 );
803 }
804
805 #[test]
806 fn test_of_setup_but_error() {
807 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
808
809 {
810 let mut manager = DataSrcManager::new(true);
811
812 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
813 manager.add("foo", ds1);
814
815 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
816 manager.add("bar", ds2);
817
818 let ds3 = SyncDataSrc::new(3, logger.clone(), true);
819 manager.add("bar", ds3);
820
821 assert!(manager.local);
822 assert_eq!(manager.vec_unready.len(), 3);
823 assert_eq!(manager.vec_ready.len(), 0);
824
825 let mut vec = Vec::new();
826 manager.setup(&mut vec);
827
828 assert!(manager.local);
829 assert_eq!(manager.vec_unready.len(), 3);
830 assert_eq!(manager.vec_ready.len(), 0);
831 }
832
833 assert_eq!(
834 *logger.lock().unwrap(),
835 vec![
836 "SyncDataSrc::new 1",
837 "SyncDataSrc::new 2",
838 "SyncDataSrc::new 3",
839 "SyncDataSrc::setup 1",
840 "SyncDataSrc::setup 2 failed",
841 "SyncDataSrc::close 2",
842 "SyncDataSrc::close 1",
843 "SyncDataSrc::drop 3",
844 "SyncDataSrc::drop 2",
845 "SyncDataSrc::drop 1",
846 ],
847 );
848 }
849
850 #[test]
851 fn test_of_setup_with_order_and_ok() {
852 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
853
854 {
855 let mut manager = DataSrcManager::new(true);
856
857 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
858 manager.add("foo", ds1);
859
860 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
861 manager.add("bar", ds2);
862
863 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
864 manager.add("baz", ds3);
865
866 assert!(manager.local);
867 assert_eq!(manager.vec_unready.len(), 3);
868 assert_eq!(manager.vec_ready.len(), 0);
869
870 let mut vec = Vec::new();
871 manager.setup_with_order(&["baz", "foo"], &mut vec);
872
873 assert!(manager.local);
874 assert_eq!(manager.vec_unready.len(), 0);
875 assert_eq!(manager.vec_ready.len(), 3);
876 }
877
878 assert_eq!(
879 *logger.lock().unwrap(),
880 vec![
881 "SyncDataSrc::new 1",
882 "SyncDataSrc::new 2",
883 "SyncDataSrc::new 3",
884 "SyncDataSrc::setup 3",
885 "SyncDataSrc::setup 1",
886 "SyncDataSrc::setup 2",
887 "SyncDataSrc::close 2",
888 "SyncDataSrc::drop 2",
889 "SyncDataSrc::close 1",
890 "SyncDataSrc::drop 1",
891 "SyncDataSrc::close 3",
892 "SyncDataSrc::drop 3",
893 ],
894 );
895 }
896
897 #[test]
898 fn test_of_setup_with_order_but_fail() {
899 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
900
901 {
902 let mut manager = DataSrcManager::new(true);
903
904 let ds1 = SyncDataSrc::new(1, logger.clone(), true);
905 manager.add("foo", ds1);
906
907 let ds2 = SyncDataSrc::new(2, logger.clone(), true);
908 manager.add("bar", ds2);
909
910 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
911 manager.add("baz", ds3);
912
913 assert!(manager.local);
914 assert_eq!(manager.vec_unready.len(), 3);
915 assert_eq!(manager.vec_ready.len(), 0);
916
917 let mut vec = Vec::new();
918 manager.setup_with_order(&["baz", "foo"], &mut vec);
919
920 assert!(manager.local);
921 assert_eq!(manager.vec_unready.len(), 3);
922 assert_eq!(manager.vec_ready.len(), 0);
923 }
924
925 assert_eq!(
926 *logger.lock().unwrap(),
927 vec![
928 "SyncDataSrc::new 1",
929 "SyncDataSrc::new 2",
930 "SyncDataSrc::new 3",
931 "SyncDataSrc::setup 3",
932 "SyncDataSrc::setup 1 failed",
933 "SyncDataSrc::close 1",
934 "SyncDataSrc::close 3",
935 "SyncDataSrc::drop 2",
936 "SyncDataSrc::drop 1",
937 "SyncDataSrc::drop 3",
938 ],
939 );
940 }
941
942 #[test]
943 fn test_of_setup_with_order_containing_duplicated_name_and_ok() {
944 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
945
946 {
947 let mut manager = DataSrcManager::new(true);
948
949 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
950 manager.add("foo", ds1);
951
952 let ds2 = SyncDataSrc::new(2, logger.clone(), false);
953 manager.add("bar", ds2);
954
955 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
956 manager.add("baz", ds3);
957
958 assert!(manager.local);
959 assert_eq!(manager.vec_unready.len(), 3);
960 assert_eq!(manager.vec_ready.len(), 0);
961
962 let mut vec = Vec::new();
963 manager.setup_with_order(&["baz", "foo", "baz"], &mut vec);
964
965 assert!(manager.local);
966 assert_eq!(manager.vec_unready.len(), 0);
967 assert_eq!(manager.vec_ready.len(), 3);
968 }
969
970 assert_eq!(
971 *logger.lock().unwrap(),
972 vec![
973 "SyncDataSrc::new 1",
974 "SyncDataSrc::new 2",
975 "SyncDataSrc::new 3",
976 "SyncDataSrc::setup 3",
977 "SyncDataSrc::setup 1",
978 "SyncDataSrc::setup 2",
979 "SyncDataSrc::close 2",
980 "SyncDataSrc::drop 2",
981 "SyncDataSrc::close 1",
982 "SyncDataSrc::drop 1",
983 "SyncDataSrc::close 3",
984 "SyncDataSrc::drop 3",
985 ],
986 );
987 }
988
989 #[test]
990 fn test_of_copy_ds_ready_to_map() {
991 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
992 let mut errors = Vec::new();
993
994 let mut index_map = HashMap::<Arc<str>, (bool, usize)>::new();
995
996 let manager = DataSrcManager::new(true);
997 manager.copy_ds_ready_to_map(&mut index_map);
998 assert!(index_map.is_empty());
999
1000 let mut manager = DataSrcManager::new(true);
1001 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1002 manager.add("foo", ds1);
1003 manager.setup(&mut errors);
1004 assert!(errors.is_empty());
1005 manager.copy_ds_ready_to_map(&mut index_map);
1006 assert_eq!(index_map.len(), 1);
1007 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1008
1009 let mut manager = DataSrcManager::new(false);
1010 let ds2 = AsyncDataSrc::new(2, logger.clone(), false, 0);
1011 let ds3 = SyncDataSrc::new(3, logger.clone(), false);
1012 manager.add("bar", ds2);
1013 manager.add("baz", ds3);
1014 manager.setup(&mut errors);
1015 assert!(errors.is_empty());
1016 manager.copy_ds_ready_to_map(&mut index_map);
1017 assert_eq!(index_map.len(), 3);
1018 assert_eq!(index_map.get("foo").unwrap(), &(true, 0));
1019 assert_eq!(index_map.get("bar").unwrap(), &(false, 0));
1020 assert_eq!(index_map.get("baz").unwrap(), &(false, 1));
1021 }
1022
1023 #[test]
1024 fn test_of_create_data_conn_and_ok() {
1025 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1026 let mut errors = Vec::new();
1027
1028 let mut manager = DataSrcManager::new(true);
1029 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1030 manager.add("foo", ds1);
1031 manager.setup(&mut errors);
1032
1033 if let Ok(boxed) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1034 assert_eq!(boxed.name.clone(), "foo".into());
1035 } else {
1036 panic!();
1037 }
1038 }
1039
1040 #[test]
1041 fn test_of_create_data_conn_but_not_found() {
1042 let mut errors = Vec::new();
1043
1044 let mut manager = DataSrcManager::new(true);
1045 manager.setup(&mut errors);
1046
1047 if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1048 match err.reason::<DataSrcError>() {
1049 Ok(DataSrcError::NotFoundDataSrcToCreateDataConn {
1050 name,
1051 data_conn_type,
1052 }) => {
1053 assert_eq!(*name, "foo".into());
1054 assert_eq!(
1055 *data_conn_type,
1056 "sabi::data_src::tests_of_data_src::SyncDataConn"
1057 );
1058 }
1059 _ => panic!(),
1060 }
1061 } else {
1062 panic!();
1063 }
1064 }
1065
1066 #[test]
1067 fn test_of_create_data_conn_but_fail_to_cast() {
1068 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1069 let mut errors = Vec::new();
1070
1071 let mut manager = DataSrcManager::new(true);
1072 let ds1 = SyncDataSrc::new(1, logger.clone(), false);
1073 manager.add("foo", ds1);
1074 manager.setup(&mut errors);
1075
1076 if let Err(err) = manager.create_data_conn::<AsyncDataConn>(0, "foo") {
1077 match err.reason::<DataSrcError>() {
1078 Ok(DataSrcError::FailToCastDataConn { name, target_type }) => {
1079 assert_eq!(*name, "foo".into());
1080 assert_eq!(
1081 *target_type,
1082 "sabi::data_src::tests_of_data_src::AsyncDataConn"
1083 );
1084 }
1085 _ => panic!(),
1086 }
1087 } else {
1088 panic!();
1089 }
1090 }
1091
1092 #[test]
1093 fn test_of_create_data_conn_but_fail_to_create() {
1094 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1095 let mut errors = Vec::new();
1096
1097 let mut manager = DataSrcManager::new(true);
1098 let ds1 = SyncDataSrc::new_for_fail_to_create_data_conn(1, logger.clone());
1099 manager.add("foo", ds1);
1100 manager.setup(&mut errors);
1101
1102 if let Err(err) = manager.create_data_conn::<SyncDataConn>(0, "foo") {
1103 match err.reason::<DataSrcError>() {
1104 Ok(DataSrcError::FailToCreateDataConn {
1105 name,
1106 data_conn_type,
1107 }) => {
1108 assert_eq!(*name, "foo".into());
1109 assert_eq!(
1110 *data_conn_type,
1111 "sabi::data_src::tests_of_data_src::SyncDataConn"
1112 );
1113 }
1114 _ => panic!(),
1115 }
1116 } else {
1117 panic!();
1118 }
1119 }
1120}