1use std::any;
6use std::collections::HashMap;
7use std::sync;
8
9use errs::Err;
10
11use crate::data_conn::DataConnList;
12use crate::data_src::DataSrcList;
13use crate::{AsyncGroup, DataConn, DataConnContainer, DataSrc, DataSrcContainer};
14
15static mut GLOBAL_DATA_SRC_LIST: DataSrcList = DataSrcList::new(false);
16
17#[cfg(not(test))]
18static GLOBAL_DATA_SRCS_FIXED: sync::OnceLock<()> = sync::OnceLock::new();
19#[cfg(test)]
20static GLOBAL_DATA_SRCS_FIXED: sync::atomic::AtomicBool = sync::atomic::AtomicBool::new(false);
21
22#[derive(Debug)]
24pub enum DataHubError {
25 FailToSetupGlobalDataSrcs {
28 errors: HashMap<String, Err>,
30 },
31
32 FailToSetupLocalDataSrcs {
35 errors: HashMap<String, Err>,
37 },
38
39 FailToCommitDataConn {
42 errors: HashMap<String, Err>,
44 },
45
46 FailToPreCommitDataConn {
49 errors: HashMap<String, Err>,
51 },
52
53 NoDataSrcToCreateDataConn {
56 name: String,
58
59 data_conn_type: &'static str,
61 },
62
63 FailToCreateDataConn {
65 name: String,
67
68 data_conn_type: &'static str,
70 },
71
72 FailToCastDataConn {
74 name: String,
76
77 cast_to_type: &'static str,
79 },
80}
81
82pub fn uses<S, C>(name: &str, ds: S)
96where
97 S: DataSrc<C>,
98 C: DataConn + 'static,
99{
100 #[cfg(not(test))]
101 let fixed = GLOBAL_DATA_SRCS_FIXED.get().is_some();
102 #[cfg(test)]
103 let fixed = GLOBAL_DATA_SRCS_FIXED.load(sync::atomic::Ordering::Relaxed);
104
105 if !fixed {
106 #[allow(static_mut_refs)]
107 unsafe {
108 GLOBAL_DATA_SRC_LIST.add_data_src(name.to_string(), ds);
109 }
110 }
111}
112
113pub fn setup() -> Result<AutoShutdown, Err> {
140 #[cfg(not(test))]
141 let ok = GLOBAL_DATA_SRCS_FIXED.set(()).is_ok();
142 #[cfg(test)]
143 let ok = GLOBAL_DATA_SRCS_FIXED
144 .compare_exchange(
145 false,
146 true,
147 sync::atomic::Ordering::Relaxed,
148 sync::atomic::Ordering::Relaxed,
149 )
150 .is_ok();
151
152 if ok {
153 #[allow(static_mut_refs)]
154 let err_map = unsafe { GLOBAL_DATA_SRC_LIST.setup_data_srcs() };
155 if err_map.len() > 0 {
156 #[allow(static_mut_refs)]
157 unsafe {
158 GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
159 }
160 return Err(Err::new(DataHubError::FailToSetupGlobalDataSrcs {
161 errors: err_map,
162 }));
163 }
164 }
165
166 Ok(AutoShutdown {})
167}
168
169pub async fn setup_async() -> Result<AutoShutdown, Err> {
196 #[cfg(not(test))]
197 let ok = GLOBAL_DATA_SRCS_FIXED.set(()).is_ok();
198 #[cfg(test)]
199 let ok = GLOBAL_DATA_SRCS_FIXED
200 .compare_exchange(
201 false,
202 true,
203 sync::atomic::Ordering::Relaxed,
204 sync::atomic::Ordering::Relaxed,
205 )
206 .is_ok();
207
208 if ok {
209 #[allow(static_mut_refs)]
210 let err_map = unsafe { GLOBAL_DATA_SRC_LIST.setup_data_srcs_async() }.await;
211 if err_map.len() > 0 {
212 #[allow(static_mut_refs)]
213 unsafe {
214 GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
215 }
216 return Err(Err::new(DataHubError::FailToSetupGlobalDataSrcs {
217 errors: err_map,
218 }));
219 }
220 }
221
222 Ok(AutoShutdown {})
223}
224
225pub struct AutoShutdown {}
235
236impl Drop for AutoShutdown {
237 fn drop(&mut self) {
238 #[allow(static_mut_refs)]
239 unsafe {
240 GLOBAL_DATA_SRC_LIST.close_and_drop_data_srcs();
241 }
242 }
243}
244
245pub struct DataHub {
256 local_data_src_list: DataSrcList,
257 data_src_map: HashMap<String, *mut DataSrcContainer>,
258 data_conn_list: DataConnList,
259 data_conn_map: HashMap<String, *mut DataConnContainer>,
260 fixed: bool,
261}
262
263impl DataHub {
264 pub fn new() -> Self {
270 #[cfg(not(test))]
271 let _ = GLOBAL_DATA_SRCS_FIXED.set(());
272 #[cfg(test)]
273 GLOBAL_DATA_SRCS_FIXED.store(true, sync::atomic::Ordering::Relaxed);
274
275 let mut data_src_map = HashMap::new();
276
277 #[allow(static_mut_refs)]
278 unsafe {
279 GLOBAL_DATA_SRC_LIST.copy_container_ptrs_did_setup_into(&mut data_src_map);
280 }
281
282 Self {
283 local_data_src_list: DataSrcList::new(true),
284 data_src_map,
285 data_conn_list: DataConnList::new(),
286 data_conn_map: HashMap::new(),
287 fixed: false,
288 }
289 }
290
291 pub fn uses<S, C>(&mut self, name: &str, ds: S)
305 where
306 S: DataSrc<C>,
307 C: DataConn + 'static,
308 {
309 if self.fixed {
310 return;
311 }
312
313 self.local_data_src_list.add_data_src(name.to_string(), ds);
314 }
315
316 pub fn disuses(&mut self, name: &str) {
325 if self.fixed {
326 return;
327 }
328
329 self.data_src_map
330 .retain(|nm, p| unsafe { !(*(*p)).local } || nm != name);
331 self.local_data_src_list
332 .remove_and_drop_container_ptr_did_setup_by_name(name);
333 self.local_data_src_list
334 .remove_and_drop_container_ptr_not_setup_by_name(name);
335 }
336
337 #[doc(hidden)]
338 pub fn begin(&mut self) -> Result<(), Err> {
339 self.fixed = true;
340
341 let err_map = self.local_data_src_list.setup_data_srcs();
342
343 self.local_data_src_list
344 .copy_container_ptrs_did_setup_into(&mut self.data_src_map);
345
346 if err_map.len() > 0 {
347 return Err(Err::new(DataHubError::FailToSetupLocalDataSrcs {
348 errors: err_map,
349 }));
350 }
351
352 Ok(())
353 }
354
355 #[doc(hidden)]
356 pub async fn begin_async(&mut self) -> Result<(), Err> {
357 self.fixed = true;
358
359 let err_map = self.local_data_src_list.setup_data_srcs_async().await;
360
361 self.local_data_src_list
362 .copy_container_ptrs_did_setup_into(&mut self.data_src_map);
363
364 if err_map.len() > 0 {
365 return Err(Err::new(DataHubError::FailToSetupLocalDataSrcs {
366 errors: err_map,
367 }));
368 }
369
370 Ok(())
371 }
372
373 #[doc(hidden)]
374 pub fn commit(&mut self) -> Result<(), Err> {
375 let mut err_map = HashMap::new();
376
377 let mut ag = AsyncGroup::new();
378
379 let mut ptr = self.data_conn_list.head();
380 while !ptr.is_null() {
381 let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
382 let name = unsafe { &(*ptr).name };
383 let next = unsafe { (*ptr).next };
384
385 ag.name = name;
386
387 if let Err(err) = pre_commit_fn(ptr, &mut ag) {
388 err_map.insert(name.to_string(), err);
389 break;
390 }
391
392 ptr = next;
393 }
394
395 ag.join_and_collect_errors(&mut err_map);
396
397 if !err_map.is_empty() {
398 return Err(Err::new(DataHubError::FailToPreCommitDataConn {
399 errors: err_map,
400 }));
401 }
402
403 let mut ag = AsyncGroup::new();
404
405 let mut ptr = self.data_conn_list.head();
406 while !ptr.is_null() {
407 let commit_fn = unsafe { (*ptr).commit_fn };
408 let name = unsafe { &(*ptr).name };
409 let next = unsafe { (*ptr).next };
410
411 ag.name = name;
412
413 if let Err(err) = commit_fn(ptr, &mut ag) {
414 err_map.insert(name.to_string(), err);
415 break;
416 }
417
418 ptr = next;
419 }
420
421 ag.join_and_collect_errors(&mut err_map);
422
423 if !err_map.is_empty() {
424 return Err(Err::new(DataHubError::FailToCommitDataConn {
425 errors: err_map,
426 }));
427 }
428
429 let mut ag = AsyncGroup::new();
430
431 let mut ptr = self.data_conn_list.head();
432 while !ptr.is_null() {
433 let post_commit_fn = unsafe { (*ptr).post_commit_fn };
434 let name = unsafe { &(*ptr).name };
435 let next = unsafe { (*ptr).next };
436
437 ag.name = name;
438
439 post_commit_fn(ptr, &mut ag);
440
441 ptr = next;
442 }
443
444 ag.join_and_ignore_errors();
445
446 return Ok(());
447 }
448
449 #[doc(hidden)]
450 pub async fn commit_async(&mut self) -> Result<(), Err> {
451 let mut err_map = HashMap::new();
452
453 let mut ag = AsyncGroup::new();
454
455 let mut ptr = self.data_conn_list.head();
456 while !ptr.is_null() {
457 let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
458 let name = unsafe { &(*ptr).name };
459 let next = unsafe { (*ptr).next };
460
461 ag.name = name;
462
463 if let Err(err) = pre_commit_fn(ptr, &mut ag) {
464 err_map.insert(name.to_string(), err);
465 break;
466 }
467
468 ptr = next;
469 }
470
471 ag.join_and_collect_errors_async(&mut err_map).await;
472
473 if !err_map.is_empty() {
474 return Err(Err::new(DataHubError::FailToPreCommitDataConn {
475 errors: err_map,
476 }));
477 }
478
479 let mut ag = AsyncGroup::new();
480
481 let mut ptr = self.data_conn_list.head();
482 while !ptr.is_null() {
483 let commit_fn = unsafe { (*ptr).commit_fn };
484 let name = unsafe { &(*ptr).name };
485 let next = unsafe { (*ptr).next };
486
487 ag.name = name;
488
489 if let Err(err) = commit_fn(ptr, &mut ag) {
490 err_map.insert(name.to_string(), err);
491 break;
492 }
493
494 ptr = next;
495 }
496
497 ag.join_and_collect_errors_async(&mut err_map).await;
498
499 if !err_map.is_empty() {
500 return Err(Err::new(DataHubError::FailToCommitDataConn {
501 errors: err_map,
502 }));
503 }
504
505 let mut ag = AsyncGroup::new();
506
507 let mut ptr = self.data_conn_list.head();
508 while !ptr.is_null() {
509 let post_commit_fn = unsafe { (*ptr).post_commit_fn };
510 let name = unsafe { &(*ptr).name };
511 let next = unsafe { (*ptr).next };
512
513 ag.name = name;
514
515 post_commit_fn(ptr, &mut ag);
516
517 ptr = next;
518 }
519
520 ag.join_and_ignore_errors_async().await;
521
522 return Ok(());
523 }
524
525 #[doc(hidden)]
526 pub fn rollback(&mut self) {
527 let mut ag = AsyncGroup::new();
528
529 let mut ptr = self.data_conn_list.head();
530 while !ptr.is_null() {
531 let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
532 let force_back_fn = unsafe { (*ptr).force_back_fn };
533 let rollback_fn = unsafe { (*ptr).rollback_fn };
534 let name = unsafe { &(*ptr).name };
535 let next = unsafe { (*ptr).next };
536
537 ag.name = name;
538
539 if should_force_back_fn(ptr) {
540 force_back_fn(ptr, &mut ag);
541 } else {
542 rollback_fn(ptr, &mut ag);
543 }
544
545 ptr = next;
546 }
547
548 ag.join_and_ignore_errors();
549 }
550
551 #[doc(hidden)]
552 pub async fn rollback_async(&mut self) {
553 let mut ag = AsyncGroup::new();
554
555 let mut ptr = self.data_conn_list.head();
556 while !ptr.is_null() {
557 let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
558 let force_back_fn = unsafe { (*ptr).force_back_fn };
559 let rollback_fn = unsafe { (*ptr).rollback_fn };
560 let name = unsafe { &(*ptr).name };
561 let next = unsafe { (*ptr).next };
562
563 ag.name = name;
564
565 if should_force_back_fn(ptr) {
566 force_back_fn(ptr, &mut ag);
567 } else {
568 rollback_fn(ptr, &mut ag);
569 }
570
571 ptr = next;
572 }
573
574 ag.join_and_ignore_errors_async().await;
575 }
576
577 #[doc(hidden)]
578 pub fn end(&mut self) {
579 self.data_conn_map.clear();
580 self.data_conn_list.close_and_drop_data_conns();
581 self.fixed = false;
582 }
583
584 pub fn get_data_conn<C>(&mut self, name: &str) -> Result<&mut C, Err>
605 where
606 C: DataConn + 'static,
607 {
608 match self.data_conn_map.get(name) {
609 Some(conn_ptr) => {
610 let type_id = any::TypeId::of::<C>();
611 let is_fn = unsafe { (*(*conn_ptr)).is_fn };
612 if !is_fn(type_id) {
613 return Err(Err::new(DataHubError::FailToCastDataConn {
614 name: name.to_string(),
615 cast_to_type: any::type_name::<C>(),
616 }));
617 }
618 let typed_ptr = (*conn_ptr) as *mut DataConnContainer<C>;
619 return Ok(unsafe { &mut ((*typed_ptr).data_conn) });
620 }
621 None => match self.data_src_map.get(name) {
622 Some(src_ptr) => {
623 let type_id = any::TypeId::of::<C>();
624 let is_data_conn_fn = unsafe { (*(*src_ptr)).is_data_conn_fn };
625 if !is_data_conn_fn(type_id) {
626 return Err(Err::new(DataHubError::FailToCastDataConn {
627 name: name.to_string(),
628 cast_to_type: any::type_name::<C>(),
629 }));
630 }
631
632 let create_data_conn_fn = unsafe { (*(*src_ptr)).create_data_conn_fn };
633 match create_data_conn_fn(*src_ptr) {
634 Ok(boxed) => {
635 let raw_ptr = Box::into_raw(boxed);
636 let conn_ptr = raw_ptr.cast::<DataConnContainer>();
637
638 self.data_conn_list.append_container_ptr(conn_ptr);
639 self.data_conn_map.insert(name.to_string(), conn_ptr);
640
641 let typed_ptr = raw_ptr.cast::<DataConnContainer<C>>();
642 return Ok(unsafe { &mut (*typed_ptr).data_conn });
643 }
644 Err(err) => {
645 return Err(Err::with_source(
646 DataHubError::FailToCreateDataConn {
647 name: name.to_string(),
648 data_conn_type: any::type_name::<C>(),
649 },
650 err,
651 ));
652 }
653 }
654 }
655 None => {
656 return Err(Err::new(DataHubError::NoDataSrcToCreateDataConn {
657 name: name.to_string(),
658 data_conn_type: any::type_name::<C>(),
659 }));
660 }
661 },
662 }
663 }
664}
665
666impl Drop for DataHub {
667 fn drop(&mut self) {
668 self.data_conn_map.clear();
669 self.data_conn_list.close_and_drop_data_conns();
670
671 self.data_src_map.clear();
672 self.local_data_src_list.close_and_drop_data_srcs();
673 }
674}
675
676#[macro_export]
693macro_rules! run {
694 ($logic_fn:expr, $hub:expr) => {{
695 let hub = &mut ($hub);
696 let mut r = hub.begin();
697 if r.is_ok() {
698 r = ($logic_fn)(hub);
699 }
700 hub.end();
701 r
702 }};
703}
704
705#[macro_export]
725macro_rules! txn {
726 ($logic_fn:expr, $hub:expr) => {{
727 let hub = &mut ($hub);
728 let mut r = hub.begin();
729 if r.is_ok() {
730 r = ($logic_fn)(hub);
731 }
732 if r.is_ok() {
733 r = hub.commit();
734 }
735 if r.is_err() {
736 hub.rollback();
737 }
738 hub.end();
739 r
740 }};
741}
742
743#[macro_export]
760macro_rules! run_async {
761 ($logic_fn:expr, $hub:expr) => {
762 async {
763 let hub = &mut ($hub);
764 let mut r = hub.begin_async().await;
765 if r.is_ok() {
766 r = ($logic_fn)(hub).await;
767 }
768 hub.end();
769 r
770 }
771 };
772}
773
774#[macro_export]
794macro_rules! txn_async {
795 ($logic_fn:expr, $hub:expr) => {
796 async {
797 let hub = &mut ($hub);
798 let mut r = hub.begin_async().await;
799 if r.is_ok() {
800 r = ($logic_fn)(hub).await;
801 }
802 if r.is_ok() {
803 r = hub.commit_async().await;
804 }
805 if r.is_err() {
806 hub.rollback_async().await;
807 }
808 hub.end();
809 r
810 }
811 };
812}
813
814#[cfg(test)]
815pub(crate) fn clear_global_data_srcs_fixed() {
816 GLOBAL_DATA_SRCS_FIXED.store(false, sync::atomic::Ordering::Relaxed);
817}
818
819#[cfg(test)]
820pub(crate) static TEST_SEQ: sync::LazyLock<sync::Mutex<()>> =
821 sync::LazyLock::new(|| sync::Mutex::new(()));
822
823#[cfg(test)]
824mod tests_data_hub {
825 use super::*;
826 use std::sync::{Arc, Mutex};
827 use tokio::time;
828
829 #[derive(PartialEq, Copy, Clone)]
830 enum Fail {
831 Not,
832 Setup,
833 CreateDataConn,
834 Commit,
835 PreCommit,
836 }
837
838 struct SyncDataSrc {
839 id: i8,
840 fail: Fail,
841 logger: Arc<Mutex<Vec<String>>>,
842 }
843
844 impl SyncDataSrc {
845 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
846 Self { id, fail, logger }
847 }
848 }
849
850 impl Drop for SyncDataSrc {
851 fn drop(&mut self) {
852 let mut logger = self.logger.lock().unwrap();
853 logger.push(format!("SyncDataSrc {} dropped", self.id));
854 }
855 }
856
857 impl DataSrc<SyncDataConn> for SyncDataSrc {
858 fn setup(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
859 let mut logger = self.logger.lock().unwrap();
860 if self.fail == Fail::Setup {
861 logger.push(format!("SyncDataSrc {} failed to setup", self.id));
862 return Err(Err::new("XXX".to_string()));
863 }
864 logger.push(format!("SyncDataSrc {} setupped", self.id));
865 Ok(())
866 }
867
868 fn close(&mut self) {
869 let mut logger = self.logger.lock().unwrap();
870 logger.push(format!("SyncDataSrc {} closed", self.id));
871 }
872
873 fn create_data_conn(&mut self) -> Result<Box<SyncDataConn>, Err> {
874 let mut logger = self.logger.lock().unwrap();
875 if self.fail == Fail::CreateDataConn {
876 logger.push(format!(
877 "SyncDataSrc {} failed to create a DataConn",
878 self.id
879 ));
880 return Err(Err::new("xxx".to_string()));
881 }
882 logger.push(format!("SyncDataSrc {} created DataConn", self.id));
883 let conn = SyncDataConn::new(self.id, self.logger.clone(), self.fail);
884 Ok(Box::new(conn))
885 }
886 }
887
888 struct AsyncDataSrc {
889 id: i8,
890 fail: Fail,
891 logger: Arc<Mutex<Vec<String>>>,
892 }
893
894 impl AsyncDataSrc {
895 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
896 Self { id, fail, logger }
897 }
898 }
899
900 impl Drop for AsyncDataSrc {
901 fn drop(&mut self) {
902 let mut logger = self.logger.lock().unwrap();
903 logger.push(format!("AsyncDataSrc {} dropped", self.id));
904 }
905 }
906
907 impl DataSrc<AsyncDataConn> for AsyncDataSrc {
908 fn setup(&mut self, ag: &mut AsyncGroup) -> Result<(), Err> {
909 let fail = self.fail;
910 let logger = self.logger.clone();
911 let id = self.id;
912
913 ag.add(async move {
914 let _ = time::sleep(time::Duration::from_millis(100)).await;
916
917 if fail == Fail::Setup {
918 logger
919 .lock()
920 .unwrap()
921 .push(format!("AsyncDataSrc {} failed to setup", id));
922 return Err(Err::new("YYY".to_string()));
923 }
924
925 logger
926 .lock()
927 .unwrap()
928 .push(format!("AsyncDataSrc {} setupped", id));
929 Ok(())
930 });
931 Ok(())
932 }
933
934 fn close(&mut self) {
935 let mut logger = self.logger.lock().unwrap();
936 logger.push(format!("AsyncDataSrc {} closed", self.id));
937 }
938
939 fn create_data_conn(&mut self) -> Result<Box<AsyncDataConn>, Err> {
940 let mut logger = self.logger.lock().unwrap();
941 logger.push(format!("AsyncDataSrc {} created DataConn", self.id));
942 let conn = AsyncDataConn::new(self.id, self.logger.clone(), self.fail);
943 Ok(Box::new(conn))
944 }
945 }
946
947 struct SyncDataConn {
948 id: i8,
949 committed: bool,
950 fail: Fail,
951 logger: Arc<Mutex<Vec<String>>>,
952 }
953
954 impl SyncDataConn {
955 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
956 Self {
957 id,
958 committed: false,
959 fail,
960 logger,
961 }
962 }
963 }
964
965 impl Drop for SyncDataConn {
966 fn drop(&mut self) {
967 let mut logger = self.logger.lock().unwrap();
968 logger.push(format!("SyncDataConn {} dropped", self.id));
969 }
970 }
971
972 impl DataConn for SyncDataConn {
973 fn commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
974 let mut logger = self.logger.lock().unwrap();
975 if self.fail == Fail::Commit {
976 logger.push(format!("SyncDataConn {} failed to commit", self.id));
977 return Err(Err::new("ZZZ".to_string()));
978 }
979 self.committed = true;
980 logger.push(format!("SyncDataConn {} committed", self.id));
981 Ok(())
982 }
983
984 fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
985 let mut logger = self.logger.lock().unwrap();
986 if self.fail == Fail::PreCommit {
987 logger.push(format!("SyncDataConn {} failed to pre commit", self.id));
988 return Err(Err::new("zzz".to_string()));
989 }
990 logger.push(format!("SyncDataConn {} pre committed", self.id));
991 Ok(())
992 }
993
994 fn post_commit(&mut self, _ag: &mut AsyncGroup) {
995 let mut logger = self.logger.lock().unwrap();
996 logger.push(format!("SyncDataConn {} post committed", self.id));
997 }
998
999 fn should_force_back(&self) -> bool {
1000 self.committed
1001 }
1002
1003 fn rollback(&mut self, _ag: &mut AsyncGroup) {
1004 let mut logger = self.logger.lock().unwrap();
1005 logger.push(format!("SyncDataConn {} rollbacked", self.id));
1006 }
1007
1008 fn force_back(&mut self, _ag: &mut AsyncGroup) {
1009 let mut logger = self.logger.lock().unwrap();
1010 logger.push(format!("SyncDataConn {} forced back", self.id));
1011 }
1012
1013 fn close(&mut self) {
1014 let mut logger = self.logger.lock().unwrap();
1015 logger.push(format!("SyncDataConn {} closed", self.id));
1016 }
1017 }
1018
1019 struct AsyncDataConn {
1020 id: i8,
1021 committed: bool,
1022 fail: Fail,
1023 logger: Arc<Mutex<Vec<String>>>,
1024 }
1025
1026 impl AsyncDataConn {
1027 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
1028 Self {
1029 id,
1030 committed: false,
1031 fail,
1032 logger,
1033 }
1034 }
1035 }
1036
1037 impl Drop for AsyncDataConn {
1038 fn drop(&mut self) {
1039 let mut logger = self.logger.lock().unwrap();
1040 logger.push(format!("AsyncDataConn {} dropped", self.id));
1041 }
1042 }
1043
1044 impl DataConn for AsyncDataConn {
1045 fn commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
1046 let mut logger = self.logger.lock().unwrap();
1047 if self.fail == Fail::Commit {
1048 logger.push(format!("AsyncDataConn {} failed to commit", self.id));
1049 return Err(Err::new("VVV".to_string()));
1050 }
1051 self.committed = true;
1052 logger.push(format!("AsyncDataConn {} committed", self.id));
1053 Ok(())
1054 }
1055
1056 fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
1057 let mut logger = self.logger.lock().unwrap();
1058 if self.fail == Fail::PreCommit {
1059 logger.push(format!("AsyncDataConn {} failed to pre commit", self.id));
1060 return Err(Err::new("vvv".to_string()));
1061 }
1062 logger.push(format!("AsyncDataConn {} pre committed", self.id));
1063 Ok(())
1064 }
1065
1066 fn post_commit(&mut self, _ag: &mut AsyncGroup) {
1067 let mut logger = self.logger.lock().unwrap();
1068 logger.push(format!("AsyncDataConn {} post committed", self.id));
1069 }
1070
1071 fn should_force_back(&self) -> bool {
1072 self.committed
1073 }
1074
1075 fn rollback(&mut self, _ag: &mut AsyncGroup) {
1076 let mut logger = self.logger.lock().unwrap();
1077 logger.push(format!("AsyncDataConn {} rollbacked", self.id));
1078 }
1079
1080 fn force_back(&mut self, _ag: &mut AsyncGroup) {
1081 let mut logger = self.logger.lock().unwrap();
1082 logger.push(format!("AsyncDataConn {} forced back", self.id));
1083 }
1084
1085 fn close(&mut self) {
1086 let mut logger = self.logger.lock().unwrap();
1087 logger.push(format!("AsyncDataConn {} closed", self.id));
1088 }
1089 }
1090
1091 mod tests_of_global_functions {
1092 use super::*;
1093
1094 #[test]
1095 fn test_setup_and_shutdown() {
1096 let _unused = TEST_SEQ.lock().unwrap();
1097 clear_global_data_srcs_fixed();
1098
1099 #[allow(static_mut_refs)]
1100 unsafe {
1101 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1102 assert!(ptr.is_null());
1103
1104 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1105 assert!(ptr.is_null());
1106 }
1107
1108 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1109
1110 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1111 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1112
1113 #[allow(static_mut_refs)]
1114 unsafe {
1115 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1116 assert!(!ptr.is_null());
1117 assert_eq!((*ptr).name, "foo");
1118 ptr = (*ptr).next;
1119 assert!(!ptr.is_null());
1120 assert_eq!((*ptr).name, "bar");
1121 ptr = (*ptr).next;
1122 assert!(ptr.is_null());
1123
1124 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1125 assert!(ptr.is_null());
1126 }
1127
1128 {
1129 let result = setup();
1130 assert!(result.is_ok());
1131
1132 #[allow(static_mut_refs)]
1133 unsafe {
1134 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1135 assert!(ptr.is_null());
1136
1137 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1138 assert!(!ptr.is_null());
1139 assert_eq!((*ptr).name, "foo");
1140 ptr = (*ptr).next;
1141 assert!(!ptr.is_null());
1142 assert_eq!((*ptr).name, "bar");
1143 ptr = (*ptr).next;
1144 assert!(ptr.is_null());
1145 }
1146 }
1147
1148 #[allow(static_mut_refs)]
1149 unsafe {
1150 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1151 assert!(ptr.is_null());
1152
1153 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1154 assert!(ptr.is_null());
1155 }
1156
1157 assert_eq!(
1158 *logger.lock().unwrap(),
1159 vec![
1160 "SyncDataSrc 2 setupped",
1161 "AsyncDataSrc 1 setupped",
1162 "SyncDataSrc 2 closed",
1163 "SyncDataSrc 2 dropped",
1164 "AsyncDataSrc 1 closed",
1165 "AsyncDataSrc 1 dropped",
1166 ],
1167 );
1168 }
1169
1170 #[test]
1171 fn test_shutdown_later() {
1172 let _unused = TEST_SEQ.lock().unwrap();
1173 clear_global_data_srcs_fixed();
1174
1175 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1176
1177 {
1178 #[allow(static_mut_refs)]
1179 unsafe {
1180 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1181 assert!(ptr.is_null());
1182
1183 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1184 assert!(ptr.is_null());
1185 }
1186
1187 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1188 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1189
1190 #[allow(static_mut_refs)]
1191 unsafe {
1192 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1193 assert!(!ptr.is_null());
1194 assert_eq!((*ptr).name, "foo");
1195 ptr = (*ptr).next;
1196 assert!(!ptr.is_null());
1197 assert_eq!((*ptr).name, "bar");
1198 ptr = (*ptr).next;
1199 assert!(ptr.is_null());
1200
1201 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1202 assert!(ptr.is_null());
1203 }
1204
1205 let result = setup();
1206 assert!(result.is_ok());
1207
1208 #[allow(static_mut_refs)]
1209 unsafe {
1210 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1211 assert!(ptr.is_null());
1212
1213 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1214 assert!(!ptr.is_null());
1215 assert_eq!((*ptr).name, "foo");
1216 ptr = (*ptr).next;
1217 assert!(!ptr.is_null());
1218 assert_eq!((*ptr).name, "bar");
1219 ptr = (*ptr).next;
1220 assert!(ptr.is_null());
1221 }
1222 }
1223
1224 #[allow(static_mut_refs)]
1225 unsafe {
1226 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1227 assert!(ptr.is_null());
1228
1229 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1230 assert!(ptr.is_null());
1231 }
1232
1233 assert_eq!(
1234 *logger.lock().unwrap(),
1235 vec![
1236 "SyncDataSrc 2 setupped",
1237 "AsyncDataSrc 1 setupped",
1238 "SyncDataSrc 2 closed",
1239 "SyncDataSrc 2 dropped",
1240 "AsyncDataSrc 1 closed",
1241 "AsyncDataSrc 1 dropped",
1242 ],
1243 );
1244 }
1245
1246 #[test]
1247 fn test_fail_to_setup() {
1248 let _unused = TEST_SEQ.lock().unwrap();
1249 clear_global_data_srcs_fixed();
1250
1251 #[allow(static_mut_refs)]
1252 unsafe {
1253 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1254 assert!(ptr.is_null());
1255
1256 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1257 assert!(ptr.is_null());
1258 }
1259
1260 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1261
1262 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
1263 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Setup));
1264
1265 #[allow(static_mut_refs)]
1266 unsafe {
1267 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1268 assert!(!ptr.is_null());
1269 assert_eq!((*ptr).name, "foo");
1270 ptr = (*ptr).next;
1271 assert!(!ptr.is_null());
1272 assert_eq!((*ptr).name, "bar");
1273 ptr = (*ptr).next;
1274 assert!(ptr.is_null());
1275
1276 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1277 assert!(ptr.is_null());
1278 }
1279
1280 match setup() {
1281 Ok(_) => panic!(),
1282 Err(err) => match err.reason::<DataHubError>() {
1283 Ok(r) => match r {
1284 DataHubError::FailToSetupGlobalDataSrcs { errors } => {
1285 let err = errors.get("foo").unwrap();
1286 match err.reason::<String>() {
1287 Ok(s) => assert_eq!(s, "YYY"),
1288 Err(_) => panic!(),
1289 }
1290 let err = errors.get("bar").unwrap();
1291 match err.reason::<String>() {
1292 Ok(s) => assert_eq!(s, "XXX"),
1293 Err(_) => panic!(),
1294 }
1295 }
1296 _ => panic!(),
1297 },
1298 Err(_) => panic!(),
1299 },
1300 }
1301
1302 #[allow(static_mut_refs)]
1303 unsafe {
1304 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1305 assert!(ptr.is_null());
1306
1307 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1308 assert!(ptr.is_null());
1309 }
1310
1311 assert_eq!(
1312 logger.lock().unwrap().clone(),
1313 vec![
1314 "SyncDataSrc 2 failed to setup",
1315 "AsyncDataSrc 1 failed to setup",
1316 "SyncDataSrc 2 dropped",
1317 "AsyncDataSrc 1 dropped",
1318 ]
1319 );
1320 }
1321
1322 #[test]
1323 fn test_cannot_add_global_data_src_after_setup() {
1324 let _unused = TEST_SEQ.lock().unwrap();
1325 clear_global_data_srcs_fixed();
1326
1327 #[allow(static_mut_refs)]
1328 unsafe {
1329 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1330 assert!(ptr.is_null());
1331
1332 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1333 assert!(ptr.is_null());
1334 }
1335
1336 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1337
1338 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1339
1340 #[allow(static_mut_refs)]
1341 unsafe {
1342 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1343 assert!(!ptr.is_null());
1344 assert_eq!((*ptr).name, "foo");
1345 ptr = (*ptr).next;
1346 assert!(ptr.is_null());
1347
1348 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1349 assert!(ptr.is_null());
1350 }
1351
1352 {
1353 let result = setup();
1354 assert!(result.is_ok());
1355
1356 #[allow(static_mut_refs)]
1357 unsafe {
1358 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1359 assert!(ptr.is_null());
1360
1361 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1362 assert!(!ptr.is_null());
1363 assert_eq!((*ptr).name, "foo");
1364 ptr = (*ptr).next;
1365 assert!(ptr.is_null());
1366 }
1367
1368 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1369
1370 #[allow(static_mut_refs)]
1371 unsafe {
1372 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1373 assert!(ptr.is_null());
1374
1375 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1376 assert!(!ptr.is_null());
1377 assert_eq!((*ptr).name, "foo");
1378 ptr = (*ptr).next;
1379 assert!(ptr.is_null());
1380 }
1381 }
1382
1383 assert_eq!(
1384 *logger.lock().unwrap(),
1385 vec![
1386 "AsyncDataSrc 1 setupped",
1387 "SyncDataSrc 2 dropped",
1388 "AsyncDataSrc 1 closed",
1389 "AsyncDataSrc 1 dropped",
1390 ]
1391 );
1392 }
1393
1394 #[test]
1395 fn test_do_nothing_if_executing_setup_twice() {
1396 let _unused = TEST_SEQ.lock().unwrap();
1397 clear_global_data_srcs_fixed();
1398
1399 #[allow(static_mut_refs)]
1400 unsafe {
1401 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1402 assert!(ptr.is_null());
1403
1404 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1405 assert!(ptr.is_null());
1406 }
1407
1408 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1409
1410 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1411
1412 #[allow(static_mut_refs)]
1413 unsafe {
1414 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1415 assert!(!ptr.is_null());
1416 assert_eq!((*ptr).name, "foo");
1417 ptr = (*ptr).next;
1418 assert!(ptr.is_null());
1419
1420 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1421 assert!(ptr.is_null());
1422 }
1423
1424 {
1425 let result = setup();
1426 assert!(result.is_ok());
1427
1428 #[allow(static_mut_refs)]
1429 unsafe {
1430 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1431 assert!(ptr.is_null());
1432
1433 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1434 assert!(!ptr.is_null());
1435 assert_eq!((*ptr).name, "foo");
1436 ptr = (*ptr).next;
1437 assert!(ptr.is_null());
1438 }
1439
1440 let result = setup();
1441 assert!(result.is_ok());
1442
1443 #[allow(static_mut_refs)]
1444 unsafe {
1445 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1446 assert!(ptr.is_null());
1447
1448 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1449 assert!(!ptr.is_null());
1450 assert_eq!((*ptr).name, "foo");
1451 ptr = (*ptr).next;
1452 assert!(ptr.is_null());
1453 }
1454 }
1455
1456 assert_eq!(
1457 logger.lock().unwrap().clone(),
1458 vec![
1459 "AsyncDataSrc 1 setupped",
1460 "AsyncDataSrc 1 closed",
1461 "AsyncDataSrc 1 dropped",
1462 ]
1463 );
1464 }
1465
1466 #[tokio::test]
1467 async fn async_test_setup_and_shutdown() {
1468 let _unused = TEST_SEQ.lock().unwrap();
1469 clear_global_data_srcs_fixed();
1470
1471 #[allow(static_mut_refs)]
1472 unsafe {
1473 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1474 assert!(ptr.is_null());
1475
1476 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1477 assert!(ptr.is_null());
1478 }
1479
1480 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1481
1482 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1483 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1484
1485 #[allow(static_mut_refs)]
1486 unsafe {
1487 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1488 assert!(!ptr.is_null());
1489 assert_eq!((*ptr).name, "foo");
1490 ptr = (*ptr).next;
1491 assert!(!ptr.is_null());
1492 assert_eq!((*ptr).name, "bar");
1493 ptr = (*ptr).next;
1494 assert!(ptr.is_null());
1495
1496 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1497 assert!(ptr.is_null());
1498 }
1499
1500 {
1501 let result = setup_async().await;
1502 assert!(result.is_ok());
1503
1504 #[allow(static_mut_refs)]
1505 unsafe {
1506 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1507 assert!(ptr.is_null());
1508
1509 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1510 assert!(!ptr.is_null());
1511 assert_eq!((*ptr).name, "foo");
1512 ptr = (*ptr).next;
1513 assert!(!ptr.is_null());
1514 assert_eq!((*ptr).name, "bar");
1515 ptr = (*ptr).next;
1516 assert!(ptr.is_null());
1517 }
1518 }
1519
1520 #[allow(static_mut_refs)]
1521 unsafe {
1522 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1523 assert!(ptr.is_null());
1524
1525 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1526 assert!(ptr.is_null());
1527 }
1528
1529 assert_eq!(
1530 *logger.lock().unwrap(),
1531 vec![
1532 "SyncDataSrc 2 setupped",
1533 "AsyncDataSrc 1 setupped",
1534 "SyncDataSrc 2 closed",
1535 "SyncDataSrc 2 dropped",
1536 "AsyncDataSrc 1 closed",
1537 "AsyncDataSrc 1 dropped",
1538 ],
1539 );
1540 }
1541
1542 #[tokio::test]
1543 async fn async_test_shutdown_later() {
1544 let _unused = TEST_SEQ.lock().unwrap();
1545 clear_global_data_srcs_fixed();
1546
1547 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1548
1549 {
1550 #[allow(static_mut_refs)]
1551 unsafe {
1552 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1553 assert!(ptr.is_null());
1554
1555 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1556 assert!(ptr.is_null());
1557 }
1558
1559 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1560 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1561
1562 #[allow(static_mut_refs)]
1563 unsafe {
1564 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1565 assert!(!ptr.is_null());
1566 assert_eq!((*ptr).name, "foo");
1567 ptr = (*ptr).next;
1568 assert!(!ptr.is_null());
1569 assert_eq!((*ptr).name, "bar");
1570 ptr = (*ptr).next;
1571 assert!(ptr.is_null());
1572
1573 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1574 assert!(ptr.is_null());
1575 }
1576
1577 let result = setup_async().await;
1578 assert!(result.is_ok());
1579
1580 #[allow(static_mut_refs)]
1581 unsafe {
1582 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1583 assert!(ptr.is_null());
1584
1585 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1586 assert!(!ptr.is_null());
1587 assert_eq!((*ptr).name, "foo");
1588 ptr = (*ptr).next;
1589 assert!(!ptr.is_null());
1590 assert_eq!((*ptr).name, "bar");
1591 ptr = (*ptr).next;
1592 assert!(ptr.is_null());
1593 }
1594 }
1595
1596 #[allow(static_mut_refs)]
1597 unsafe {
1598 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1599 assert!(ptr.is_null());
1600
1601 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1602 assert!(ptr.is_null());
1603 }
1604
1605 assert_eq!(
1606 *logger.lock().unwrap(),
1607 vec![
1608 "SyncDataSrc 2 setupped",
1609 "AsyncDataSrc 1 setupped",
1610 "SyncDataSrc 2 closed",
1611 "SyncDataSrc 2 dropped",
1612 "AsyncDataSrc 1 closed",
1613 "AsyncDataSrc 1 dropped",
1614 ],
1615 );
1616 }
1617
1618 #[tokio::test]
1619 async fn async_test_fail_to_setup() {
1620 let _unused = TEST_SEQ.lock().unwrap();
1621 clear_global_data_srcs_fixed();
1622
1623 #[allow(static_mut_refs)]
1624 unsafe {
1625 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1626 assert!(ptr.is_null());
1627
1628 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1629 assert!(ptr.is_null());
1630 }
1631
1632 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1633
1634 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
1635 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Setup));
1636
1637 #[allow(static_mut_refs)]
1638 unsafe {
1639 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1640 assert!(!ptr.is_null());
1641 assert_eq!((*ptr).name, "foo");
1642 ptr = (*ptr).next;
1643 assert!(!ptr.is_null());
1644 assert_eq!((*ptr).name, "bar");
1645 ptr = (*ptr).next;
1646 assert!(ptr.is_null());
1647
1648 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1649 assert!(ptr.is_null());
1650 }
1651
1652 match setup_async().await {
1653 Ok(_) => panic!(),
1654 Err(err) => match err.reason::<DataHubError>() {
1655 Ok(r) => match r {
1656 DataHubError::FailToSetupGlobalDataSrcs { errors } => {
1657 let err = errors.get("foo").unwrap();
1658 match err.reason::<String>() {
1659 Ok(s) => assert_eq!(s, "YYY"),
1660 Err(_) => panic!(),
1661 }
1662 let err = errors.get("bar").unwrap();
1663 match err.reason::<String>() {
1664 Ok(s) => assert_eq!(s, "XXX"),
1665 Err(_) => panic!(),
1666 }
1667 }
1668 _ => panic!(),
1669 },
1670 Err(_) => panic!(),
1671 },
1672 }
1673
1674 #[allow(static_mut_refs)]
1675 unsafe {
1676 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1677 assert!(ptr.is_null());
1678
1679 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1680 assert!(ptr.is_null());
1681 }
1682
1683 assert_eq!(
1684 logger.lock().unwrap().clone(),
1685 vec![
1686 "SyncDataSrc 2 failed to setup",
1687 "AsyncDataSrc 1 failed to setup",
1688 "SyncDataSrc 2 dropped",
1689 "AsyncDataSrc 1 dropped",
1690 ]
1691 );
1692 }
1693
1694 #[tokio::test]
1695 async fn async_test_cannot_add_global_data_src_after_setup() {
1696 let _unused = TEST_SEQ.lock().unwrap();
1697 clear_global_data_srcs_fixed();
1698
1699 #[allow(static_mut_refs)]
1700 unsafe {
1701 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1702 assert!(ptr.is_null());
1703
1704 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1705 assert!(ptr.is_null());
1706 }
1707
1708 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1709
1710 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1711
1712 #[allow(static_mut_refs)]
1713 unsafe {
1714 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1715 assert!(!ptr.is_null());
1716 assert_eq!((*ptr).name, "foo");
1717 ptr = (*ptr).next;
1718 assert!(ptr.is_null());
1719
1720 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1721 assert!(ptr.is_null());
1722 }
1723
1724 {
1725 let result = setup_async().await;
1726 assert!(result.is_ok());
1727
1728 #[allow(static_mut_refs)]
1729 unsafe {
1730 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1731 assert!(ptr.is_null());
1732
1733 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1734 assert!(!ptr.is_null());
1735 assert_eq!((*ptr).name, "foo");
1736 ptr = (*ptr).next;
1737 assert!(ptr.is_null());
1738 }
1739
1740 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1741
1742 #[allow(static_mut_refs)]
1743 unsafe {
1744 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1745 assert!(ptr.is_null());
1746
1747 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1748 assert!(!ptr.is_null());
1749 assert_eq!((*ptr).name, "foo");
1750 ptr = (*ptr).next;
1751 assert!(ptr.is_null());
1752 }
1753 }
1754
1755 assert_eq!(
1756 *logger.lock().unwrap(),
1757 vec![
1758 "AsyncDataSrc 1 setupped",
1759 "SyncDataSrc 2 dropped",
1760 "AsyncDataSrc 1 closed",
1761 "AsyncDataSrc 1 dropped",
1762 ]
1763 );
1764 }
1765
1766 #[tokio::test]
1767 async fn async_test_do_nothing_if_executing_setup_twice() {
1768 let _unused = TEST_SEQ.lock().unwrap();
1769 clear_global_data_srcs_fixed();
1770
1771 #[allow(static_mut_refs)]
1772 unsafe {
1773 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1774 assert!(ptr.is_null());
1775
1776 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1777 assert!(ptr.is_null());
1778 }
1779
1780 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1781
1782 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1783
1784 #[allow(static_mut_refs)]
1785 unsafe {
1786 let mut ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1787 assert!(!ptr.is_null());
1788 assert_eq!((*ptr).name, "foo");
1789 ptr = (*ptr).next;
1790 assert!(ptr.is_null());
1791
1792 let ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1793 assert!(ptr.is_null());
1794 }
1795
1796 {
1797 let result = setup_async().await;
1798 assert!(result.is_ok());
1799
1800 #[allow(static_mut_refs)]
1801 unsafe {
1802 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1803 assert!(ptr.is_null());
1804
1805 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1806 assert!(!ptr.is_null());
1807 assert_eq!((*ptr).name, "foo");
1808 ptr = (*ptr).next;
1809 assert!(ptr.is_null());
1810 }
1811
1812 let result = setup();
1813 assert!(result.is_ok());
1814
1815 #[allow(static_mut_refs)]
1816 unsafe {
1817 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1818 assert!(ptr.is_null());
1819
1820 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1821 assert!(!ptr.is_null());
1822 assert_eq!((*ptr).name, "foo");
1823 ptr = (*ptr).next;
1824 assert!(ptr.is_null());
1825 }
1826 }
1827
1828 assert_eq!(
1829 logger.lock().unwrap().clone(),
1830 vec![
1831 "AsyncDataSrc 1 setupped",
1832 "AsyncDataSrc 1 closed",
1833 "AsyncDataSrc 1 dropped",
1834 ]
1835 );
1836 }
1837 }
1838
1839 mod tests_of_data_hub_local {
1840 use super::*;
1841 use std::error::Error;
1842
1843 #[test]
1844 fn test_new_and_close_with_no_global_data_srcs() {
1845 let _unused = TEST_SEQ.lock().unwrap();
1846
1847 let hub = DataHub::new();
1848
1849 assert!(hub.local_data_src_list.not_setup_head().is_null());
1850 assert!(hub.local_data_src_list.did_setup_head().is_null());
1851 assert!(hub.data_conn_list.head().is_null());
1852 assert_eq!(hub.data_src_map.len(), 0);
1853 assert_eq!(hub.data_conn_map.len(), 0);
1854 assert_eq!(hub.fixed, false);
1855 }
1856
1857 #[test]
1858 fn test_new_and_close_with_global_data_srcs() {
1859 let _unused = TEST_SEQ.lock().unwrap();
1860 clear_global_data_srcs_fixed();
1861
1862 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1863
1864 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1865 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1866
1867 if let Ok(_auto_shutdown) = setup() {
1868 #[allow(static_mut_refs)]
1869 unsafe {
1870 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
1871 assert!(ptr.is_null());
1872
1873 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
1874 assert!(!ptr.is_null());
1875 assert_eq!((*ptr).name, "foo");
1876 ptr = (*ptr).next;
1877 assert!(!ptr.is_null());
1878 assert_eq!((*ptr).name, "bar");
1879 ptr = (*ptr).next;
1880 assert!(ptr.is_null());
1881 }
1882
1883 let hub = DataHub::new();
1884
1885 assert!(hub.local_data_src_list.not_setup_head().is_null());
1886 assert!(hub.local_data_src_list.did_setup_head().is_null());
1887 assert!(hub.data_conn_list.head().is_null());
1888 assert_eq!(hub.data_src_map.len(), 2);
1889 assert_eq!(hub.data_conn_map.len(), 0);
1890 assert_eq!(hub.fixed, false);
1891
1892 #[allow(static_mut_refs)]
1893 let mut ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
1894 assert!(!ptr.is_null());
1895 ptr = unsafe { (*ptr).next };
1896 assert!(!ptr.is_null());
1897 ptr = unsafe { (*ptr).next };
1898 assert!(ptr.is_null());
1899 #[allow(static_mut_refs)]
1900 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
1901 assert!(ptr.is_null());
1902 } else {
1903 panic!();
1904 }
1905
1906 #[allow(static_mut_refs)]
1907 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
1908 assert!(ptr.is_null());
1909 #[allow(static_mut_refs)]
1910 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
1911 assert!(ptr.is_null());
1912
1913 assert_eq!(
1914 *logger.lock().unwrap(),
1915 vec![
1916 "SyncDataSrc 2 setupped",
1917 "AsyncDataSrc 1 setupped",
1918 "SyncDataSrc 2 closed",
1919 "SyncDataSrc 2 dropped",
1920 "AsyncDataSrc 1 closed",
1921 "AsyncDataSrc 1 dropped",
1922 ]
1923 );
1924 }
1925
1926 #[test]
1927 fn test_uses_and_disuses() {
1928 let _unused = TEST_SEQ.lock().unwrap();
1929 clear_global_data_srcs_fixed();
1930
1931 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1932
1933 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
1934 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
1935
1936 if let Ok(_auto_shutdown) = setup() {
1937 let mut hub = DataHub::new();
1938
1939 assert!(hub.local_data_src_list.not_setup_head().is_null());
1940 assert!(hub.local_data_src_list.did_setup_head().is_null());
1941 assert!(hub.data_conn_list.head().is_null());
1942 assert_eq!(hub.data_src_map.len(), 2);
1943 assert_eq!(hub.data_conn_map.len(), 0);
1944 assert_eq!(hub.fixed, false);
1945
1946 hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
1947 let mut ptr = hub.local_data_src_list.not_setup_head();
1948 assert!(!ptr.is_null());
1949 ptr = unsafe { (*ptr).next };
1950 assert!(ptr.is_null());
1951 assert!(hub.local_data_src_list.did_setup_head().is_null());
1952 assert!(hub.data_conn_list.head().is_null());
1953 assert_eq!(hub.data_src_map.len(), 2);
1954 assert_eq!(hub.data_conn_map.len(), 0);
1955 assert_eq!(hub.fixed, false);
1956
1957 hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
1958 let mut ptr = hub.local_data_src_list.not_setup_head();
1959 assert!(!ptr.is_null());
1960 ptr = unsafe { (*ptr).next };
1961 assert!(!ptr.is_null());
1962 ptr = unsafe { (*ptr).next };
1963 assert!(ptr.is_null());
1964 assert!(hub.local_data_src_list.did_setup_head().is_null());
1965 assert!(hub.data_conn_list.head().is_null());
1966 assert_eq!(hub.data_src_map.len(), 2);
1967 assert_eq!(hub.data_conn_map.len(), 0);
1968 assert_eq!(hub.fixed, false);
1969
1970 hub.disuses("foo"); hub.disuses("bar"); let mut ptr = hub.local_data_src_list.not_setup_head();
1973 assert!(!ptr.is_null());
1974 ptr = unsafe { (*ptr).next };
1975 assert!(!ptr.is_null());
1976 ptr = unsafe { (*ptr).next };
1977 assert!(ptr.is_null());
1978 assert!(hub.local_data_src_list.did_setup_head().is_null());
1979 assert!(hub.data_conn_list.head().is_null());
1980 assert_eq!(hub.data_src_map.len(), 2);
1981 assert_eq!(hub.data_conn_map.len(), 0);
1982 assert_eq!(hub.fixed, false);
1983
1984 hub.disuses("baz");
1985 let mut ptr = hub.local_data_src_list.not_setup_head();
1986 assert!(!ptr.is_null());
1987 ptr = unsafe { (*ptr).next };
1988 assert!(ptr.is_null());
1989 assert!(hub.local_data_src_list.did_setup_head().is_null());
1990 assert!(hub.data_conn_list.head().is_null());
1991 assert_eq!(hub.data_src_map.len(), 2);
1992 assert_eq!(hub.data_conn_map.len(), 0);
1993 assert_eq!(hub.fixed, false);
1994
1995 hub.disuses("qux");
1996 let ptr = hub.local_data_src_list.not_setup_head();
1997 assert!(ptr.is_null());
1998 assert!(hub.local_data_src_list.did_setup_head().is_null());
1999 assert!(hub.data_conn_list.head().is_null());
2000 assert_eq!(hub.data_src_map.len(), 2);
2001 assert_eq!(hub.data_conn_map.len(), 0);
2002 assert_eq!(hub.fixed, false);
2003 } else {
2004 panic!();
2005 }
2006
2007 assert_eq!(
2008 *logger.lock().unwrap(),
2009 vec![
2010 "SyncDataSrc 2 setupped",
2011 "AsyncDataSrc 1 setupped",
2012 "SyncDataSrc 3 closed",
2013 "SyncDataSrc 3 dropped",
2014 "AsyncDataSrc 4 closed",
2015 "AsyncDataSrc 4 dropped",
2016 "SyncDataSrc 2 closed",
2017 "SyncDataSrc 2 dropped",
2018 "AsyncDataSrc 1 closed",
2019 "AsyncDataSrc 1 dropped",
2020 ]
2021 );
2022 }
2023
2024 #[test]
2025 fn test_cannot_add_and_remove_data_src_between_begin_and_end() {
2026 let _unused = TEST_SEQ.lock().unwrap();
2027 clear_global_data_srcs_fixed();
2028
2029 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2030
2031 if let Ok(_auto_shutdown) = setup() {
2032 let mut hub = DataHub::new();
2033
2034 let ptr = hub.local_data_src_list.not_setup_head();
2035 assert!(ptr.is_null());
2036 let ptr = hub.local_data_src_list.did_setup_head();
2037 assert!(ptr.is_null());
2038 assert!(hub.data_conn_list.head().is_null());
2039 assert_eq!(hub.data_src_map.len(), 0);
2040 assert_eq!(hub.data_conn_map.len(), 0);
2041 assert_eq!(hub.fixed, false);
2042
2043 hub.uses("baz", SyncDataSrc::new(1, logger.clone(), Fail::Not));
2044
2045 let mut ptr = hub.local_data_src_list.not_setup_head();
2046 assert!(!ptr.is_null());
2047 ptr = unsafe { (*ptr).next };
2048 assert!(ptr.is_null());
2049 let ptr = hub.local_data_src_list.did_setup_head();
2050 assert!(ptr.is_null());
2051 assert!(hub.data_conn_list.head().is_null());
2052 assert_eq!(hub.data_src_map.len(), 0);
2053 assert_eq!(hub.data_conn_map.len(), 0);
2054 assert_eq!(hub.fixed, false);
2055
2056 assert!(hub.begin().is_ok());
2057
2058 let ptr = hub.local_data_src_list.not_setup_head();
2059 assert!(ptr.is_null());
2060 let mut ptr = hub.local_data_src_list.did_setup_head();
2061 assert!(!ptr.is_null());
2062 ptr = unsafe { (*ptr).next };
2063 assert!(ptr.is_null());
2064 assert!(hub.data_conn_list.head().is_null());
2065 assert_eq!(hub.data_src_map.len(), 1);
2066 assert_eq!(hub.data_conn_map.len(), 0);
2067 assert_eq!(hub.fixed, true);
2068
2069 hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
2070
2071 let ptr = hub.local_data_src_list.not_setup_head();
2072 assert!(ptr.is_null());
2073 let mut ptr = hub.local_data_src_list.did_setup_head();
2074 assert!(!ptr.is_null());
2075 ptr = unsafe { (*ptr).next };
2076 assert!(ptr.is_null());
2077 assert!(hub.data_conn_list.head().is_null());
2078 assert_eq!(hub.data_src_map.len(), 1);
2079 assert_eq!(hub.data_conn_map.len(), 0);
2080 assert_eq!(hub.fixed, true);
2081
2082 hub.disuses("baz");
2083
2084 let ptr = hub.local_data_src_list.not_setup_head();
2085 assert!(ptr.is_null());
2086 let mut ptr = hub.local_data_src_list.did_setup_head();
2087 assert!(!ptr.is_null());
2088 ptr = unsafe { (*ptr).next };
2089 assert!(ptr.is_null());
2090 assert!(hub.data_conn_list.head().is_null());
2091 assert_eq!(hub.data_src_map.len(), 1);
2092 assert_eq!(hub.data_conn_map.len(), 0);
2093 assert_eq!(hub.fixed, true);
2094
2095 hub.end();
2096
2097 let ptr = hub.local_data_src_list.not_setup_head();
2098 assert!(ptr.is_null());
2099 let mut ptr = hub.local_data_src_list.did_setup_head();
2100 assert!(!ptr.is_null());
2101 ptr = unsafe { (*ptr).next };
2102 assert!(ptr.is_null());
2103 assert!(hub.data_conn_list.head().is_null());
2104 assert_eq!(hub.data_src_map.len(), 1);
2105 assert_eq!(hub.data_conn_map.len(), 0);
2106 assert_eq!(hub.fixed, false);
2107
2108 hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
2109
2110 let mut ptr = hub.local_data_src_list.not_setup_head();
2111 assert!(!ptr.is_null());
2112 ptr = unsafe { (*ptr).next };
2113 assert!(ptr.is_null());
2114 let mut ptr = hub.local_data_src_list.did_setup_head();
2115 assert!(!ptr.is_null());
2116 ptr = unsafe { (*ptr).next };
2117 assert!(ptr.is_null());
2118 assert!(hub.data_conn_list.head().is_null());
2119 assert_eq!(hub.data_src_map.len(), 1);
2120 assert_eq!(hub.data_conn_map.len(), 0);
2121 assert_eq!(hub.fixed, false);
2122
2123 hub.disuses("baz");
2124
2125 let mut ptr = hub.local_data_src_list.not_setup_head();
2126 assert!(!ptr.is_null());
2127 ptr = unsafe { (*ptr).next };
2128 assert!(ptr.is_null());
2129 let ptr = hub.local_data_src_list.did_setup_head();
2130 assert!(ptr.is_null());
2131 assert!(hub.data_conn_list.head().is_null());
2132 assert_eq!(hub.data_src_map.len(), 0);
2133 assert_eq!(hub.data_conn_map.len(), 0);
2134 assert_eq!(hub.fixed, false);
2135 }
2136 }
2137
2138 #[test]
2139 fn test_begin_and_end() {
2140 let _unused = TEST_SEQ.lock().unwrap();
2141 clear_global_data_srcs_fixed();
2142
2143 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2144
2145 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2146 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2147
2148 if let Ok(_auto_shutdown) = setup() {
2149 let mut hub = DataHub::new();
2150
2151 hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
2152 hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
2153
2154 let mut ptr = hub.local_data_src_list.not_setup_head();
2155 assert!(!ptr.is_null());
2156 ptr = unsafe { (*ptr).next };
2157 assert!(!ptr.is_null());
2158 ptr = unsafe { (*ptr).next };
2159 assert!(ptr.is_null());
2160 assert!(hub.local_data_src_list.did_setup_head().is_null());
2161 assert!(hub.data_conn_list.head().is_null());
2162 assert_eq!(hub.data_src_map.len(), 2);
2163 assert_eq!(hub.data_conn_map.len(), 0);
2164 assert_eq!(hub.fixed, false);
2165
2166 assert!(hub.begin().is_ok());
2167
2168 assert!(hub.local_data_src_list.not_setup_head().is_null());
2169 let mut ptr = hub.local_data_src_list.did_setup_head();
2170 assert!(!ptr.is_null());
2171 ptr = unsafe { (*ptr).next };
2172 assert!(!ptr.is_null());
2173 ptr = unsafe { (*ptr).next };
2174 assert!(ptr.is_null());
2175 assert!(hub.data_conn_list.head().is_null());
2176 assert_eq!(hub.data_src_map.len(), 4);
2177 assert_eq!(hub.data_conn_map.len(), 0);
2178 assert_eq!(hub.fixed, true);
2179
2180 hub.end();
2181
2182 assert!(hub.local_data_src_list.not_setup_head().is_null());
2183 let mut ptr = hub.local_data_src_list.did_setup_head();
2184 assert!(!ptr.is_null());
2185 ptr = unsafe { (*ptr).next };
2186 assert!(!ptr.is_null());
2187 ptr = unsafe { (*ptr).next };
2188 assert!(ptr.is_null());
2189 assert!(hub.data_conn_list.head().is_null());
2190 assert_eq!(hub.data_src_map.len(), 4);
2191 assert_eq!(hub.data_conn_map.len(), 0);
2192 assert_eq!(hub.fixed, false);
2193 }
2194
2195 assert_eq!(
2196 *logger.lock().unwrap(),
2197 vec![
2198 "SyncDataSrc 2 setupped",
2199 "AsyncDataSrc 1 setupped",
2200 "SyncDataSrc 3 setupped",
2201 "AsyncDataSrc 4 setupped",
2202 "AsyncDataSrc 4 closed",
2203 "AsyncDataSrc 4 dropped",
2204 "SyncDataSrc 3 closed",
2205 "SyncDataSrc 3 dropped",
2206 "SyncDataSrc 2 closed",
2207 "SyncDataSrc 2 dropped",
2208 "AsyncDataSrc 1 closed",
2209 "AsyncDataSrc 1 dropped",
2210 ]
2211 );
2212 }
2213
2214 #[test]
2215 fn test_begin_and_end_but_fail_sync() {
2216 let _unused = TEST_SEQ.lock().unwrap();
2217 clear_global_data_srcs_fixed();
2218
2219 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2220
2221 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2222 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2223
2224 if let Ok(_auto_shutdown) = setup() {
2225 let mut hub = DataHub::new();
2226 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2227 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Setup));
2228
2229 if let Err(err) = hub.begin() {
2230 match err.reason::<DataHubError>() {
2231 Ok(r) => match r {
2232 DataHubError::FailToSetupLocalDataSrcs { errors } => {
2233 assert_eq!(errors.len(), 1);
2234 if let Some(err) = errors.get("qux") {
2235 match err.reason::<String>() {
2236 Ok(s) => assert_eq!(s, "XXX"),
2237 Err(_) => panic!(),
2238 }
2239 } else {
2240 panic!();
2241 }
2242 }
2243 _ => panic!(),
2244 },
2245 Err(_) => panic!(),
2246 }
2247 } else {
2248 panic!();
2249 }
2250
2251 let mut ptr = hub.local_data_src_list.not_setup_head();
2252 assert!(!ptr.is_null());
2253 ptr = unsafe { (*ptr).next };
2254 assert!(ptr.is_null());
2255 let mut ptr = hub.local_data_src_list.did_setup_head();
2256 assert!(!ptr.is_null());
2257 ptr = unsafe { (*ptr).next };
2258 assert!(ptr.is_null());
2259 assert!(hub.data_conn_list.head().is_null());
2260 assert_eq!(hub.data_src_map.len(), 3);
2261 assert_eq!(hub.data_conn_map.len(), 0);
2262 assert_eq!(hub.fixed, true);
2263
2264 hub.end();
2265
2266 let mut ptr = hub.local_data_src_list.not_setup_head();
2267 assert!(!ptr.is_null());
2268 ptr = unsafe { (*ptr).next };
2269 assert!(ptr.is_null());
2270 let mut ptr = hub.local_data_src_list.did_setup_head();
2271 assert!(!ptr.is_null());
2272 ptr = unsafe { (*ptr).next };
2273 assert!(ptr.is_null());
2274 assert!(hub.data_conn_list.head().is_null());
2275 assert_eq!(hub.data_src_map.len(), 3);
2276 assert_eq!(hub.data_conn_map.len(), 0);
2277 assert_eq!(hub.fixed, false);
2278 } else {
2279 panic!();
2280 }
2281 }
2282
2283 #[test]
2284 fn test_begin_and_end_but_fail_async() {
2285 let _unused = TEST_SEQ.lock().unwrap();
2286 clear_global_data_srcs_fixed();
2287
2288 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2289
2290 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2291 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2292
2293 if let Ok(_auto_shutdown) = setup() {
2294 let mut hub = DataHub::new();
2295 hub.uses("baz", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
2296 hub.uses("qux", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2297
2298 if let Err(err) = hub.begin() {
2299 match err.reason::<DataHubError>() {
2300 Ok(r) => match r {
2301 DataHubError::FailToSetupLocalDataSrcs { errors } => {
2302 assert_eq!(errors.len(), 1);
2303 if let Some(err) = errors.get("baz") {
2304 match err.reason::<String>() {
2305 Ok(s) => assert_eq!(s, "YYY"),
2306 Err(_) => panic!(),
2307 }
2308 } else {
2309 panic!();
2310 }
2311 }
2312 _ => panic!(),
2313 },
2314 Err(_) => panic!(),
2315 }
2316 } else {
2317 panic!();
2318 }
2319
2320 let mut ptr = hub.local_data_src_list.not_setup_head();
2321 assert!(!ptr.is_null());
2322 ptr = unsafe { (*ptr).next };
2323 assert!(ptr.is_null());
2324 let mut ptr = hub.local_data_src_list.did_setup_head();
2325 assert!(!ptr.is_null());
2326 ptr = unsafe { (*ptr).next };
2327 assert!(ptr.is_null());
2328 assert!(hub.data_conn_list.head().is_null());
2329 assert_eq!(hub.data_src_map.len(), 3);
2330 assert_eq!(hub.data_conn_map.len(), 0);
2331 assert_eq!(hub.fixed, true);
2332
2333 hub.end();
2334
2335 let mut ptr = hub.local_data_src_list.not_setup_head();
2336 assert!(!ptr.is_null());
2337 ptr = unsafe { (*ptr).next };
2338 assert!(ptr.is_null());
2339 let mut ptr = hub.local_data_src_list.did_setup_head();
2340 assert!(!ptr.is_null());
2341 ptr = unsafe { (*ptr).next };
2342 assert!(ptr.is_null());
2343 assert!(hub.data_conn_list.head().is_null());
2344 assert_eq!(hub.data_src_map.len(), 3);
2345 assert_eq!(hub.data_conn_map.len(), 0);
2346 assert_eq!(hub.fixed, false);
2347 } else {
2348 panic!();
2349 }
2350 }
2351
2352 #[test]
2353 fn test_commit_and_post_commit() {
2354 let _unused = TEST_SEQ.lock().unwrap();
2355 clear_global_data_srcs_fixed();
2356
2357 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2358
2359 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2360 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2361
2362 if let Ok(_auto_shutdown) = setup() {
2363 let mut hub = DataHub::new();
2364 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2365 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2366
2367 if let Ok(_) = hub.begin() {
2368 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2369 assert_eq!(
2370 any::type_name_of_val(conn1),
2371 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2372 );
2373 } else {
2374 panic!();
2375 }
2376 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2377 assert_eq!(
2378 any::type_name_of_val(conn2),
2379 "sabi::data_hub::tests_data_hub::SyncDataConn"
2380 );
2381 } else {
2382 panic!();
2383 }
2384 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2385 assert_eq!(
2386 any::type_name_of_val(conn3),
2387 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2388 );
2389 } else {
2390 panic!();
2391 }
2392 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2393 assert_eq!(
2394 any::type_name_of_val(conn4),
2395 "sabi::data_hub::tests_data_hub::SyncDataConn"
2396 );
2397 } else {
2398 panic!();
2399 }
2400
2401 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2402 assert_eq!(
2403 any::type_name_of_val(conn1),
2404 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2405 );
2406 } else {
2407 panic!();
2408 }
2409 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2410 assert_eq!(
2411 any::type_name_of_val(conn2),
2412 "sabi::data_hub::tests_data_hub::SyncDataConn"
2413 );
2414 } else {
2415 panic!();
2416 }
2417 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2418 assert_eq!(
2419 any::type_name_of_val(conn3),
2420 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2421 );
2422 } else {
2423 panic!();
2424 }
2425 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2426 assert_eq!(
2427 any::type_name_of_val(conn4),
2428 "sabi::data_hub::tests_data_hub::SyncDataConn"
2429 );
2430 } else {
2431 panic!();
2432 }
2433
2434 assert!(hub.commit().is_ok());
2435 hub.end();
2436 } else {
2437 panic!();
2438 }
2439 } else {
2440 panic!();
2441 }
2442
2443 assert_eq!(
2444 *logger.lock().unwrap(),
2445 vec![
2446 "SyncDataSrc 2 setupped",
2447 "AsyncDataSrc 1 setupped",
2448 "SyncDataSrc 4 setupped",
2449 "AsyncDataSrc 3 setupped",
2450 "AsyncDataSrc 1 created DataConn",
2451 "SyncDataSrc 2 created DataConn",
2452 "AsyncDataSrc 3 created DataConn",
2453 "SyncDataSrc 4 created DataConn",
2454 "AsyncDataConn 1 pre committed",
2455 "SyncDataConn 2 pre committed",
2456 "AsyncDataConn 3 pre committed",
2457 "SyncDataConn 4 pre committed",
2458 "AsyncDataConn 1 committed",
2459 "SyncDataConn 2 committed",
2460 "AsyncDataConn 3 committed",
2461 "SyncDataConn 4 committed",
2462 "AsyncDataConn 1 post committed",
2463 "SyncDataConn 2 post committed",
2464 "AsyncDataConn 3 post committed",
2465 "SyncDataConn 4 post committed",
2466 "SyncDataConn 4 closed",
2467 "SyncDataConn 4 dropped",
2468 "AsyncDataConn 3 closed",
2469 "AsyncDataConn 3 dropped",
2470 "SyncDataConn 2 closed",
2471 "SyncDataConn 2 dropped",
2472 "AsyncDataConn 1 closed",
2473 "AsyncDataConn 1 dropped",
2474 "SyncDataSrc 4 closed",
2475 "SyncDataSrc 4 dropped",
2476 "AsyncDataSrc 3 closed",
2477 "AsyncDataSrc 3 dropped",
2478 "SyncDataSrc 2 closed",
2479 "SyncDataSrc 2 dropped",
2480 "AsyncDataSrc 1 closed",
2481 "AsyncDataSrc 1 dropped",
2482 ],
2483 );
2484 }
2485
2486 #[test]
2487 fn test_fail_to_cast_new_data_conn() {
2488 let _unused = TEST_SEQ.lock().unwrap();
2489 clear_global_data_srcs_fixed();
2490
2491 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2492
2493 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2494
2495 if let Ok(_auto_shutdown) = setup() {
2496 let mut hub = DataHub::new();
2497 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2498
2499 if let Ok(_) = hub.begin() {
2500 if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
2501 match err.reason::<DataHubError>() {
2502 Ok(r) => match r {
2503 DataHubError::FailToCastDataConn { name, cast_to_type } => {
2504 assert_eq!(name, "foo");
2505 assert_eq!(
2506 *cast_to_type,
2507 "sabi::data_hub::tests_data_hub::SyncDataConn"
2508 );
2509 }
2510 _ => panic!(),
2511 },
2512 Err(_) => panic!(),
2513 }
2514 } else {
2515 panic!();
2516 }
2517
2518 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
2519 match err.reason::<DataHubError>() {
2520 Ok(r) => match r {
2521 DataHubError::FailToCastDataConn { name, cast_to_type } => {
2522 assert_eq!(name, "bar");
2523 assert_eq!(
2524 *cast_to_type,
2525 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2526 );
2527 }
2528 _ => panic!(),
2529 },
2530 Err(_) => panic!(),
2531 }
2532 } else {
2533 panic!();
2534 }
2535 } else {
2536 panic!();
2537 }
2538 } else {
2539 panic!();
2540 }
2541
2542 assert_eq!(
2543 *logger.lock().unwrap(),
2544 vec![
2545 "AsyncDataSrc 1 setupped",
2546 "SyncDataSrc 2 setupped",
2547 "SyncDataSrc 2 closed",
2548 "SyncDataSrc 2 dropped",
2549 "AsyncDataSrc 1 closed",
2550 "AsyncDataSrc 1 dropped",
2551 ],
2552 );
2553 }
2554
2555 #[test]
2556 fn test_fail_to_cast_reused_data_conn() {
2557 let _unused = TEST_SEQ.lock().unwrap();
2558 clear_global_data_srcs_fixed();
2559
2560 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2561
2562 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2563
2564 if let Ok(_auto_shutdown) = setup() {
2565 let mut hub = DataHub::new();
2566 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2567
2568 if let Ok(_) = hub.begin() {
2569 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2570 assert_eq!(
2571 any::type_name_of_val(conn1),
2572 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2573 );
2574 } else {
2575 panic!();
2576 }
2577 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2578 assert_eq!(
2579 any::type_name_of_val(conn2),
2580 "sabi::data_hub::tests_data_hub::SyncDataConn"
2581 );
2582 } else {
2583 panic!();
2584 }
2585
2586 if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
2587 match err.reason::<DataHubError>() {
2588 Ok(r) => match r {
2589 DataHubError::FailToCastDataConn { name, cast_to_type } => {
2590 assert_eq!(name, "foo");
2591 assert_eq!(
2592 *cast_to_type,
2593 "sabi::data_hub::tests_data_hub::SyncDataConn"
2594 );
2595 }
2596 _ => panic!(),
2597 },
2598 Err(_) => panic!(),
2599 }
2600 } else {
2601 panic!();
2602 }
2603
2604 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
2605 match err.reason::<DataHubError>() {
2606 Ok(r) => match r {
2607 DataHubError::FailToCastDataConn { name, cast_to_type } => {
2608 assert_eq!(name, "bar");
2609 assert_eq!(
2610 *cast_to_type,
2611 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2612 );
2613 }
2614 _ => panic!(),
2615 },
2616 Err(_) => panic!(),
2617 }
2618 } else {
2619 panic!();
2620 }
2621 } else {
2622 panic!();
2623 }
2624 } else {
2625 panic!();
2626 }
2627
2628 assert_eq!(
2629 *logger.lock().unwrap(),
2630 vec![
2631 "AsyncDataSrc 1 setupped",
2632 "SyncDataSrc 2 setupped",
2633 "AsyncDataSrc 1 created DataConn",
2634 "SyncDataSrc 2 created DataConn",
2635 "SyncDataConn 2 closed",
2636 "SyncDataConn 2 dropped",
2637 "AsyncDataConn 1 closed",
2638 "AsyncDataConn 1 dropped",
2639 "SyncDataSrc 2 closed",
2640 "SyncDataSrc 2 dropped",
2641 "AsyncDataSrc 1 closed",
2642 "AsyncDataSrc 1 dropped",
2643 ],
2644 );
2645 }
2646
2647 #[test]
2648 fn test_fail_to_create_data_conn() {
2649 let _unused = TEST_SEQ.lock().unwrap();
2650 clear_global_data_srcs_fixed();
2651
2652 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2653
2654 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2655
2656 if let Ok(_auto_shutdown) = setup() {
2657 let mut hub = DataHub::new();
2658 hub.uses(
2659 "bar",
2660 SyncDataSrc::new(2, logger.clone(), Fail::CreateDataConn),
2661 );
2662
2663 if let Ok(_) = hub.begin() {
2664 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2665 assert_eq!(
2666 any::type_name_of_val(conn1),
2667 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2668 );
2669 } else {
2670 panic!();
2671 }
2672
2673 if let Err(err) = hub.get_data_conn::<SyncDataConn>("bar") {
2674 match err.reason::<DataHubError>() {
2675 Ok(r) => match r {
2676 DataHubError::FailToCreateDataConn {
2677 name,
2678 data_conn_type,
2679 } => {
2680 assert_eq!(name, "bar");
2681 assert_eq!(
2682 *data_conn_type,
2683 "sabi::data_hub::tests_data_hub::SyncDataConn"
2684 );
2685 }
2686 _ => panic!(),
2687 },
2688 Err(_) => panic!(),
2689 }
2690 match err.source() {
2691 Some(e) => {
2692 assert_eq!(
2693 e.downcast_ref::<errs::Err>()
2694 .unwrap()
2695 .reason::<String>()
2696 .unwrap(),
2697 "xxx"
2698 );
2699 }
2700 None => panic!(),
2701 }
2702 } else {
2703 panic!();
2704 }
2705 } else {
2706 panic!();
2707 }
2708 } else {
2709 panic!();
2710 }
2711
2712 assert_eq!(
2713 *logger.lock().unwrap(),
2714 vec![
2715 "AsyncDataSrc 1 setupped",
2716 "SyncDataSrc 2 setupped",
2717 "AsyncDataSrc 1 created DataConn",
2718 "SyncDataSrc 2 failed to create a DataConn",
2719 "AsyncDataConn 1 closed",
2720 "AsyncDataConn 1 dropped",
2721 "SyncDataSrc 2 closed",
2722 "SyncDataSrc 2 dropped",
2723 "AsyncDataSrc 1 closed",
2724 "AsyncDataSrc 1 dropped",
2725 ],
2726 );
2727 }
2728
2729 #[test]
2730 fn test_fail_to_create_data_conn_because_of_no_data_src() {
2731 let _unused = TEST_SEQ.lock().unwrap();
2732 clear_global_data_srcs_fixed();
2733
2734 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2735
2736 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2737
2738 if let Ok(_auto_shutdown) = setup() {
2739 let mut hub = DataHub::new();
2740 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2741
2742 if let Ok(_) = hub.begin() {
2743 if let Err(err) = hub.get_data_conn::<SyncDataConn>("baz") {
2744 match err.reason::<DataHubError>() {
2745 Ok(r) => match r {
2746 DataHubError::NoDataSrcToCreateDataConn {
2747 name,
2748 data_conn_type,
2749 } => {
2750 assert_eq!(name, "baz");
2751 assert_eq!(
2752 *data_conn_type,
2753 "sabi::data_hub::tests_data_hub::SyncDataConn"
2754 );
2755 }
2756 _ => panic!(),
2757 },
2758 Err(_) => panic!(),
2759 }
2760 } else {
2761 panic!();
2762 }
2763
2764 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("qux") {
2765 match err.reason::<DataHubError>() {
2766 Ok(r) => match r {
2767 DataHubError::NoDataSrcToCreateDataConn {
2768 name,
2769 data_conn_type,
2770 } => {
2771 assert_eq!(name, "qux");
2772 assert_eq!(
2773 *data_conn_type,
2774 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2775 );
2776 }
2777 _ => panic!(),
2778 },
2779 Err(_) => panic!(),
2780 }
2781 } else {
2782 panic!();
2783 }
2784 } else {
2785 panic!();
2786 }
2787 } else {
2788 panic!();
2789 }
2790
2791 assert_eq!(
2792 *logger.lock().unwrap(),
2793 vec![
2794 "AsyncDataSrc 1 setupped",
2795 "SyncDataSrc 2 setupped",
2796 "SyncDataSrc 2 closed",
2797 "SyncDataSrc 2 dropped",
2798 "AsyncDataSrc 1 closed",
2799 "AsyncDataSrc 1 dropped",
2800 ],
2801 );
2802 }
2803
2804 #[test]
2805 fn test_commit_when_no_data_conn() {
2806 let _unused = TEST_SEQ.lock().unwrap();
2807 clear_global_data_srcs_fixed();
2808
2809 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2810
2811 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2812 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2813
2814 if let Ok(_auto_shutdown) = setup() {
2815 let mut hub = DataHub::new();
2816 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2817 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2818
2819 if let Ok(_) = hub.begin() {
2820 assert!(hub.commit().is_ok());
2821 hub.end();
2822 } else {
2823 panic!();
2824 }
2825 } else {
2826 panic!();
2827 }
2828
2829 assert_eq!(
2830 *logger.lock().unwrap(),
2831 vec![
2832 "SyncDataSrc 2 setupped",
2833 "AsyncDataSrc 1 setupped",
2834 "SyncDataSrc 4 setupped",
2835 "AsyncDataSrc 3 setupped",
2836 "SyncDataSrc 4 closed",
2837 "SyncDataSrc 4 dropped",
2838 "AsyncDataSrc 3 closed",
2839 "AsyncDataSrc 3 dropped",
2840 "SyncDataSrc 2 closed",
2841 "SyncDataSrc 2 dropped",
2842 "AsyncDataSrc 1 closed",
2843 "AsyncDataSrc 1 dropped",
2844 ],
2845 );
2846 }
2847
2848 #[test]
2849 fn test_commit_but_fail_global_sync() {
2850 let _unused = TEST_SEQ.lock().unwrap();
2851 clear_global_data_srcs_fixed();
2852
2853 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2854
2855 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
2856 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Commit));
2857
2858 if let Ok(_auto_shutdown) = setup() {
2859 let mut hub = DataHub::new();
2860 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2861 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2862
2863 if let Ok(_) = hub.begin() {
2864 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2865 assert_eq!(
2866 any::type_name_of_val(conn1),
2867 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2868 );
2869 } else {
2870 panic!();
2871 }
2872 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2873 assert_eq!(
2874 any::type_name_of_val(conn2),
2875 "sabi::data_hub::tests_data_hub::SyncDataConn"
2876 );
2877 } else {
2878 panic!();
2879 }
2880 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2881 assert_eq!(
2882 any::type_name_of_val(conn3),
2883 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2884 );
2885 } else {
2886 panic!();
2887 }
2888 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
2889 assert_eq!(
2890 any::type_name_of_val(conn4),
2891 "sabi::data_hub::tests_data_hub::SyncDataConn"
2892 );
2893 } else {
2894 panic!();
2895 }
2896
2897 match hub.commit() {
2898 Ok(_) => panic!(),
2899 Err(err) => match err.reason::<DataHubError>() {
2900 Ok(r) => match r {
2901 DataHubError::FailToCommitDataConn { errors } => {
2902 assert_eq!(errors.len(), 1);
2903 if let Some(e) = errors.get("bar") {
2904 if let Ok(s) = e.reason::<String>() {
2905 assert_eq!(s, "ZZZ");
2906 } else {
2907 panic!();
2908 }
2909 } else {
2910 panic!();
2911 }
2912 }
2913 _ => panic!(),
2914 },
2915 Err(_) => panic!(),
2916 },
2917 }
2918
2919 hub.end();
2920 } else {
2921 panic!();
2922 }
2923 } else {
2924 panic!();
2925 }
2926
2927 assert_eq!(
2928 *logger.lock().unwrap(),
2929 vec![
2930 "SyncDataSrc 2 setupped",
2931 "AsyncDataSrc 1 setupped",
2932 "SyncDataSrc 4 setupped",
2933 "AsyncDataSrc 3 setupped",
2934 "AsyncDataSrc 1 created DataConn",
2935 "SyncDataSrc 2 created DataConn",
2936 "AsyncDataSrc 3 created DataConn",
2937 "SyncDataSrc 4 created DataConn",
2938 "AsyncDataConn 1 pre committed",
2939 "SyncDataConn 2 pre committed",
2940 "AsyncDataConn 3 pre committed",
2941 "SyncDataConn 4 pre committed",
2942 "AsyncDataConn 1 committed",
2943 "SyncDataConn 2 failed to commit",
2944 "SyncDataConn 4 closed",
2945 "SyncDataConn 4 dropped",
2946 "AsyncDataConn 3 closed",
2947 "AsyncDataConn 3 dropped",
2948 "SyncDataConn 2 closed",
2949 "SyncDataConn 2 dropped",
2950 "AsyncDataConn 1 closed",
2951 "AsyncDataConn 1 dropped",
2952 "SyncDataSrc 4 closed",
2953 "SyncDataSrc 4 dropped",
2954 "AsyncDataSrc 3 closed",
2955 "AsyncDataSrc 3 dropped",
2956 "SyncDataSrc 2 closed",
2957 "SyncDataSrc 2 dropped",
2958 "AsyncDataSrc 1 closed",
2959 "AsyncDataSrc 1 dropped",
2960 ],
2961 );
2962 }
2963
2964 #[test]
2965 fn test_commit_but_fail_global_async() {
2966 let _unused = TEST_SEQ.lock().unwrap();
2967 clear_global_data_srcs_fixed();
2968
2969 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
2970
2971 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Commit));
2972 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
2973
2974 if let Ok(_auto_shutdown) = setup() {
2975 let mut hub = DataHub::new();
2976 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
2977 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
2978
2979 if let Ok(_) = hub.begin() {
2980 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
2981 assert_eq!(
2982 any::type_name_of_val(conn1),
2983 "sabi::data_hub::tests_data_hub::AsyncDataConn"
2984 );
2985 } else {
2986 panic!();
2987 }
2988 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
2989 assert_eq!(
2990 any::type_name_of_val(conn2),
2991 "sabi::data_hub::tests_data_hub::SyncDataConn"
2992 );
2993 } else {
2994 panic!();
2995 }
2996 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
2997 assert_eq!(
2998 any::type_name_of_val(conn3),
2999 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3000 );
3001 } else {
3002 panic!();
3003 }
3004 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3005 assert_eq!(
3006 any::type_name_of_val(conn4),
3007 "sabi::data_hub::tests_data_hub::SyncDataConn"
3008 );
3009 } else {
3010 panic!();
3011 }
3012
3013 match hub.commit() {
3014 Ok(_) => panic!(),
3015 Err(err) => match err.reason::<DataHubError>() {
3016 Ok(r) => match r {
3017 DataHubError::FailToCommitDataConn { errors } => {
3018 assert_eq!(errors.len(), 1);
3019 if let Some(e) = errors.get("foo") {
3020 if let Ok(s) = e.reason::<String>() {
3021 assert_eq!(s, "VVV");
3022 } else {
3023 panic!();
3024 }
3025 } else {
3026 panic!();
3027 }
3028 }
3029 _ => panic!(),
3030 },
3031 Err(_) => panic!(),
3032 },
3033 }
3034
3035 hub.end();
3036 } else {
3037 panic!();
3038 }
3039 } else {
3040 panic!();
3041 }
3042
3043 assert_eq!(
3044 *logger.lock().unwrap(),
3045 vec![
3046 "SyncDataSrc 2 setupped",
3047 "AsyncDataSrc 1 setupped",
3048 "SyncDataSrc 4 setupped",
3049 "AsyncDataSrc 3 setupped",
3050 "AsyncDataSrc 1 created DataConn",
3051 "SyncDataSrc 2 created DataConn",
3052 "AsyncDataSrc 3 created DataConn",
3053 "SyncDataSrc 4 created DataConn",
3054 "AsyncDataConn 1 pre committed",
3055 "SyncDataConn 2 pre committed",
3056 "AsyncDataConn 3 pre committed",
3057 "SyncDataConn 4 pre committed",
3058 "AsyncDataConn 1 failed to commit",
3059 "SyncDataConn 4 closed",
3060 "SyncDataConn 4 dropped",
3061 "AsyncDataConn 3 closed",
3062 "AsyncDataConn 3 dropped",
3063 "SyncDataConn 2 closed",
3064 "SyncDataConn 2 dropped",
3065 "AsyncDataConn 1 closed",
3066 "AsyncDataConn 1 dropped",
3067 "SyncDataSrc 4 closed",
3068 "SyncDataSrc 4 dropped",
3069 "AsyncDataSrc 3 closed",
3070 "AsyncDataSrc 3 dropped",
3071 "SyncDataSrc 2 closed",
3072 "SyncDataSrc 2 dropped",
3073 "AsyncDataSrc 1 closed",
3074 "AsyncDataSrc 1 dropped",
3075 ],
3076 );
3077 }
3078
3079 #[test]
3080 fn test_commit_but_fail_local_sync() {
3081 let _unused = TEST_SEQ.lock().unwrap();
3082 clear_global_data_srcs_fixed();
3083
3084 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3085
3086 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3087 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3088
3089 if let Ok(_auto_shutdown) = setup() {
3090 let mut hub = DataHub::new();
3091 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3092 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Commit));
3093
3094 if let Ok(_) = hub.begin() {
3095 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3096 assert_eq!(
3097 any::type_name_of_val(conn1),
3098 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3099 );
3100 } else {
3101 panic!();
3102 }
3103 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3104 assert_eq!(
3105 any::type_name_of_val(conn2),
3106 "sabi::data_hub::tests_data_hub::SyncDataConn"
3107 );
3108 } else {
3109 panic!();
3110 }
3111 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3112 assert_eq!(
3113 any::type_name_of_val(conn3),
3114 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3115 );
3116 } else {
3117 panic!();
3118 }
3119 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3120 assert_eq!(
3121 any::type_name_of_val(conn4),
3122 "sabi::data_hub::tests_data_hub::SyncDataConn"
3123 );
3124 } else {
3125 panic!();
3126 }
3127
3128 match hub.commit() {
3129 Ok(_) => panic!(),
3130 Err(err) => match err.reason::<DataHubError>() {
3131 Ok(r) => match r {
3132 DataHubError::FailToCommitDataConn { errors } => {
3133 assert_eq!(errors.len(), 1);
3134 if let Some(e) = errors.get("qux") {
3135 if let Ok(s) = e.reason::<String>() {
3136 assert_eq!(s, "ZZZ");
3137 } else {
3138 panic!();
3139 }
3140 } else {
3141 panic!();
3142 }
3143 }
3144 _ => panic!(),
3145 },
3146 Err(_) => panic!(),
3147 },
3148 }
3149
3150 hub.end();
3151 } else {
3152 panic!();
3153 }
3154 } else {
3155 panic!();
3156 }
3157
3158 assert_eq!(
3159 *logger.lock().unwrap(),
3160 vec![
3161 "SyncDataSrc 2 setupped",
3162 "AsyncDataSrc 1 setupped",
3163 "SyncDataSrc 4 setupped",
3164 "AsyncDataSrc 3 setupped",
3165 "AsyncDataSrc 1 created DataConn",
3166 "SyncDataSrc 2 created DataConn",
3167 "AsyncDataSrc 3 created DataConn",
3168 "SyncDataSrc 4 created DataConn",
3169 "AsyncDataConn 1 pre committed",
3170 "SyncDataConn 2 pre committed",
3171 "AsyncDataConn 3 pre committed",
3172 "SyncDataConn 4 pre committed",
3173 "AsyncDataConn 1 committed",
3174 "SyncDataConn 2 committed",
3175 "AsyncDataConn 3 committed",
3176 "SyncDataConn 4 failed to commit",
3177 "SyncDataConn 4 closed",
3178 "SyncDataConn 4 dropped",
3179 "AsyncDataConn 3 closed",
3180 "AsyncDataConn 3 dropped",
3181 "SyncDataConn 2 closed",
3182 "SyncDataConn 2 dropped",
3183 "AsyncDataConn 1 closed",
3184 "AsyncDataConn 1 dropped",
3185 "SyncDataSrc 4 closed",
3186 "SyncDataSrc 4 dropped",
3187 "AsyncDataSrc 3 closed",
3188 "AsyncDataSrc 3 dropped",
3189 "SyncDataSrc 2 closed",
3190 "SyncDataSrc 2 dropped",
3191 "AsyncDataSrc 1 closed",
3192 "AsyncDataSrc 1 dropped",
3193 ],
3194 );
3195 }
3196
3197 #[test]
3198 fn test_commit_but_fail_local_async() {
3199 let _unused = TEST_SEQ.lock().unwrap();
3200 clear_global_data_srcs_fixed();
3201
3202 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3203
3204 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3205 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3206
3207 if let Ok(_auto_shutdown) = setup() {
3208 let mut hub = DataHub::new();
3209 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Commit));
3210 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3211
3212 if let Ok(_) = hub.begin() {
3213 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3214 assert_eq!(
3215 any::type_name_of_val(conn1),
3216 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3217 );
3218 } else {
3219 panic!();
3220 }
3221 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3222 assert_eq!(
3223 any::type_name_of_val(conn2),
3224 "sabi::data_hub::tests_data_hub::SyncDataConn"
3225 );
3226 } else {
3227 panic!();
3228 }
3229 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3230 assert_eq!(
3231 any::type_name_of_val(conn3),
3232 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3233 );
3234 } else {
3235 panic!();
3236 }
3237 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3238 assert_eq!(
3239 any::type_name_of_val(conn4),
3240 "sabi::data_hub::tests_data_hub::SyncDataConn"
3241 );
3242 } else {
3243 panic!();
3244 }
3245
3246 match hub.commit() {
3247 Ok(_) => panic!(),
3248 Err(err) => match err.reason::<DataHubError>() {
3249 Ok(r) => match r {
3250 DataHubError::FailToCommitDataConn { errors } => {
3251 assert_eq!(errors.len(), 1);
3252 if let Some(e) = errors.get("baz") {
3253 if let Ok(s) = e.reason::<String>() {
3254 assert_eq!(s, "VVV");
3255 } else {
3256 panic!();
3257 }
3258 } else {
3259 panic!();
3260 }
3261 }
3262 _ => panic!(),
3263 },
3264 Err(_) => panic!(),
3265 },
3266 }
3267
3268 hub.end();
3269 } else {
3270 panic!();
3271 }
3272 } else {
3273 panic!();
3274 }
3275
3276 assert_eq!(
3277 *logger.lock().unwrap(),
3278 vec![
3279 "SyncDataSrc 2 setupped",
3280 "AsyncDataSrc 1 setupped",
3281 "SyncDataSrc 4 setupped",
3282 "AsyncDataSrc 3 setupped",
3283 "AsyncDataSrc 1 created DataConn",
3284 "SyncDataSrc 2 created DataConn",
3285 "AsyncDataSrc 3 created DataConn",
3286 "SyncDataSrc 4 created DataConn",
3287 "AsyncDataConn 1 pre committed",
3288 "SyncDataConn 2 pre committed",
3289 "AsyncDataConn 3 pre committed",
3290 "SyncDataConn 4 pre committed",
3291 "AsyncDataConn 1 committed",
3292 "SyncDataConn 2 committed",
3293 "AsyncDataConn 3 failed to commit",
3294 "SyncDataConn 4 closed",
3295 "SyncDataConn 4 dropped",
3296 "AsyncDataConn 3 closed",
3297 "AsyncDataConn 3 dropped",
3298 "SyncDataConn 2 closed",
3299 "SyncDataConn 2 dropped",
3300 "AsyncDataConn 1 closed",
3301 "AsyncDataConn 1 dropped",
3302 "SyncDataSrc 4 closed",
3303 "SyncDataSrc 4 dropped",
3304 "AsyncDataSrc 3 closed",
3305 "AsyncDataSrc 3 dropped",
3306 "SyncDataSrc 2 closed",
3307 "SyncDataSrc 2 dropped",
3308 "AsyncDataSrc 1 closed",
3309 "AsyncDataSrc 1 dropped",
3310 ],
3311 );
3312 }
3313
3314 #[test]
3315 fn test_pre_commit_but_fail_global_sync() {
3316 let _unused = TEST_SEQ.lock().unwrap();
3317 clear_global_data_srcs_fixed();
3318
3319 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3320
3321 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3322 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::PreCommit));
3323
3324 if let Ok(_auto_shutdown) = setup() {
3325 let mut hub = DataHub::new();
3326 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3327 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3328
3329 if let Ok(_) = hub.begin() {
3330 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3331 assert_eq!(
3332 any::type_name_of_val(conn1),
3333 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3334 );
3335 } else {
3336 panic!();
3337 }
3338 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3339 assert_eq!(
3340 any::type_name_of_val(conn2),
3341 "sabi::data_hub::tests_data_hub::SyncDataConn"
3342 );
3343 } else {
3344 panic!();
3345 }
3346 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3347 assert_eq!(
3348 any::type_name_of_val(conn3),
3349 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3350 );
3351 } else {
3352 panic!();
3353 }
3354 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3355 assert_eq!(
3356 any::type_name_of_val(conn4),
3357 "sabi::data_hub::tests_data_hub::SyncDataConn"
3358 );
3359 } else {
3360 panic!();
3361 }
3362
3363 match hub.commit() {
3364 Ok(_) => panic!(),
3365 Err(err) => match err.reason::<DataHubError>() {
3366 Ok(r) => match r {
3367 DataHubError::FailToPreCommitDataConn { errors } => {
3368 assert_eq!(errors.len(), 1);
3369 if let Some(e) = errors.get("bar") {
3370 if let Ok(s) = e.reason::<String>() {
3371 assert_eq!(s, "zzz");
3372 } else {
3373 panic!();
3374 }
3375 } else {
3376 panic!();
3377 }
3378 }
3379 _ => panic!(),
3380 },
3381 Err(_) => panic!(),
3382 },
3383 }
3384
3385 hub.end();
3386 } else {
3387 panic!();
3388 }
3389 } else {
3390 panic!();
3391 }
3392
3393 assert_eq!(
3394 *logger.lock().unwrap(),
3395 vec![
3396 "SyncDataSrc 2 setupped",
3397 "AsyncDataSrc 1 setupped",
3398 "SyncDataSrc 4 setupped",
3399 "AsyncDataSrc 3 setupped",
3400 "AsyncDataSrc 1 created DataConn",
3401 "SyncDataSrc 2 created DataConn",
3402 "AsyncDataSrc 3 created DataConn",
3403 "SyncDataSrc 4 created DataConn",
3404 "AsyncDataConn 1 pre committed",
3405 "SyncDataConn 2 failed to pre commit",
3406 "SyncDataConn 4 closed",
3407 "SyncDataConn 4 dropped",
3408 "AsyncDataConn 3 closed",
3409 "AsyncDataConn 3 dropped",
3410 "SyncDataConn 2 closed",
3411 "SyncDataConn 2 dropped",
3412 "AsyncDataConn 1 closed",
3413 "AsyncDataConn 1 dropped",
3414 "SyncDataSrc 4 closed",
3415 "SyncDataSrc 4 dropped",
3416 "AsyncDataSrc 3 closed",
3417 "AsyncDataSrc 3 dropped",
3418 "SyncDataSrc 2 closed",
3419 "SyncDataSrc 2 dropped",
3420 "AsyncDataSrc 1 closed",
3421 "AsyncDataSrc 1 dropped",
3422 ],
3423 );
3424 }
3425
3426 #[test]
3427 fn test_pre_commit_but_fail_global_async() {
3428 let _unused = TEST_SEQ.lock().unwrap();
3429 clear_global_data_srcs_fixed();
3430
3431 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3432
3433 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::PreCommit));
3434 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3435
3436 if let Ok(_auto_shutdown) = setup() {
3437 let mut hub = DataHub::new();
3438 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3439 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3440
3441 if let Ok(_) = hub.begin() {
3442 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3443 assert_eq!(
3444 any::type_name_of_val(conn1),
3445 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3446 );
3447 } else {
3448 panic!();
3449 }
3450 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3451 assert_eq!(
3452 any::type_name_of_val(conn2),
3453 "sabi::data_hub::tests_data_hub::SyncDataConn"
3454 );
3455 } else {
3456 panic!();
3457 }
3458 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3459 assert_eq!(
3460 any::type_name_of_val(conn3),
3461 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3462 );
3463 } else {
3464 panic!();
3465 }
3466 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3467 assert_eq!(
3468 any::type_name_of_val(conn4),
3469 "sabi::data_hub::tests_data_hub::SyncDataConn"
3470 );
3471 } else {
3472 panic!();
3473 }
3474
3475 match hub.commit() {
3476 Ok(_) => panic!(),
3477 Err(err) => match err.reason::<DataHubError>() {
3478 Ok(r) => match r {
3479 DataHubError::FailToPreCommitDataConn { errors } => {
3480 assert_eq!(errors.len(), 1);
3481 if let Some(e) = errors.get("foo") {
3482 if let Ok(s) = e.reason::<String>() {
3483 assert_eq!(s, "vvv");
3484 } else {
3485 panic!();
3486 }
3487 } else {
3488 panic!();
3489 }
3490 }
3491 _ => panic!(),
3492 },
3493 Err(_) => panic!(),
3494 },
3495 }
3496
3497 hub.end();
3498 } else {
3499 panic!();
3500 }
3501 } else {
3502 panic!();
3503 }
3504
3505 assert_eq!(
3506 *logger.lock().unwrap(),
3507 vec![
3508 "SyncDataSrc 2 setupped",
3509 "AsyncDataSrc 1 setupped",
3510 "SyncDataSrc 4 setupped",
3511 "AsyncDataSrc 3 setupped",
3512 "AsyncDataSrc 1 created DataConn",
3513 "SyncDataSrc 2 created DataConn",
3514 "AsyncDataSrc 3 created DataConn",
3515 "SyncDataSrc 4 created DataConn",
3516 "AsyncDataConn 1 failed to pre commit",
3517 "SyncDataConn 4 closed",
3518 "SyncDataConn 4 dropped",
3519 "AsyncDataConn 3 closed",
3520 "AsyncDataConn 3 dropped",
3521 "SyncDataConn 2 closed",
3522 "SyncDataConn 2 dropped",
3523 "AsyncDataConn 1 closed",
3524 "AsyncDataConn 1 dropped",
3525 "SyncDataSrc 4 closed",
3526 "SyncDataSrc 4 dropped",
3527 "AsyncDataSrc 3 closed",
3528 "AsyncDataSrc 3 dropped",
3529 "SyncDataSrc 2 closed",
3530 "SyncDataSrc 2 dropped",
3531 "AsyncDataSrc 1 closed",
3532 "AsyncDataSrc 1 dropped",
3533 ],
3534 );
3535 }
3536
3537 #[test]
3538 fn test_pre_commit_but_fail_local_sync() {
3539 let _unused = TEST_SEQ.lock().unwrap();
3540 clear_global_data_srcs_fixed();
3541
3542 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3543
3544 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3545 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3546
3547 if let Ok(_auto_shutdown) = setup() {
3548 let mut hub = DataHub::new();
3549 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3550 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::PreCommit));
3551
3552 if let Ok(_) = hub.begin() {
3553 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3554 assert_eq!(
3555 any::type_name_of_val(conn1),
3556 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3557 );
3558 } else {
3559 panic!();
3560 }
3561 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3562 assert_eq!(
3563 any::type_name_of_val(conn2),
3564 "sabi::data_hub::tests_data_hub::SyncDataConn"
3565 );
3566 } else {
3567 panic!();
3568 }
3569 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3570 assert_eq!(
3571 any::type_name_of_val(conn3),
3572 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3573 );
3574 } else {
3575 panic!();
3576 }
3577 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3578 assert_eq!(
3579 any::type_name_of_val(conn4),
3580 "sabi::data_hub::tests_data_hub::SyncDataConn"
3581 );
3582 } else {
3583 panic!();
3584 }
3585
3586 match hub.commit() {
3587 Ok(_) => panic!(),
3588 Err(err) => match err.reason::<DataHubError>() {
3589 Ok(r) => match r {
3590 DataHubError::FailToPreCommitDataConn { errors } => {
3591 assert_eq!(errors.len(), 1);
3592 if let Some(e) = errors.get("qux") {
3593 if let Ok(s) = e.reason::<String>() {
3594 assert_eq!(s, "zzz");
3595 } else {
3596 panic!();
3597 }
3598 } else {
3599 panic!();
3600 }
3601 }
3602 _ => panic!(),
3603 },
3604 Err(_) => panic!(),
3605 },
3606 }
3607
3608 hub.end();
3609 } else {
3610 panic!();
3611 }
3612 } else {
3613 panic!();
3614 }
3615
3616 assert_eq!(
3617 *logger.lock().unwrap(),
3618 vec![
3619 "SyncDataSrc 2 setupped",
3620 "AsyncDataSrc 1 setupped",
3621 "SyncDataSrc 4 setupped",
3622 "AsyncDataSrc 3 setupped",
3623 "AsyncDataSrc 1 created DataConn",
3624 "SyncDataSrc 2 created DataConn",
3625 "AsyncDataSrc 3 created DataConn",
3626 "SyncDataSrc 4 created DataConn",
3627 "AsyncDataConn 1 pre committed",
3628 "SyncDataConn 2 pre committed",
3629 "AsyncDataConn 3 pre committed",
3630 "SyncDataConn 4 failed to pre commit",
3631 "SyncDataConn 4 closed",
3632 "SyncDataConn 4 dropped",
3633 "AsyncDataConn 3 closed",
3634 "AsyncDataConn 3 dropped",
3635 "SyncDataConn 2 closed",
3636 "SyncDataConn 2 dropped",
3637 "AsyncDataConn 1 closed",
3638 "AsyncDataConn 1 dropped",
3639 "SyncDataSrc 4 closed",
3640 "SyncDataSrc 4 dropped",
3641 "AsyncDataSrc 3 closed",
3642 "AsyncDataSrc 3 dropped",
3643 "SyncDataSrc 2 closed",
3644 "SyncDataSrc 2 dropped",
3645 "AsyncDataSrc 1 closed",
3646 "AsyncDataSrc 1 dropped",
3647 ],
3648 );
3649 }
3650
3651 #[test]
3652 fn test_pre_commit_but_fail_local_async() {
3653 let _unused = TEST_SEQ.lock().unwrap();
3654 clear_global_data_srcs_fixed();
3655
3656 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3657
3658 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3659 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3660
3661 if let Ok(_auto_shutdown) = setup() {
3662 let mut hub = DataHub::new();
3663 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::PreCommit));
3664 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3665
3666 if let Ok(_) = hub.begin() {
3667 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3668 assert_eq!(
3669 any::type_name_of_val(conn1),
3670 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3671 );
3672 } else {
3673 panic!();
3674 }
3675 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3676 assert_eq!(
3677 any::type_name_of_val(conn2),
3678 "sabi::data_hub::tests_data_hub::SyncDataConn"
3679 );
3680 } else {
3681 panic!();
3682 }
3683 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3684 assert_eq!(
3685 any::type_name_of_val(conn3),
3686 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3687 );
3688 } else {
3689 panic!();
3690 }
3691 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3692 assert_eq!(
3693 any::type_name_of_val(conn4),
3694 "sabi::data_hub::tests_data_hub::SyncDataConn"
3695 );
3696 } else {
3697 panic!();
3698 }
3699
3700 match hub.commit() {
3701 Ok(_) => panic!(),
3702 Err(err) => match err.reason::<DataHubError>() {
3703 Ok(r) => match r {
3704 DataHubError::FailToPreCommitDataConn { errors } => {
3705 assert_eq!(errors.len(), 1);
3706 if let Some(e) = errors.get("baz") {
3707 if let Ok(s) = e.reason::<String>() {
3708 assert_eq!(s, "vvv");
3709 } else {
3710 panic!();
3711 }
3712 } else {
3713 panic!();
3714 }
3715 }
3716 _ => panic!(),
3717 },
3718 Err(_) => panic!(),
3719 },
3720 }
3721
3722 hub.end();
3723 } else {
3724 panic!();
3725 }
3726 } else {
3727 panic!();
3728 }
3729
3730 assert_eq!(
3731 *logger.lock().unwrap(),
3732 vec![
3733 "SyncDataSrc 2 setupped",
3734 "AsyncDataSrc 1 setupped",
3735 "SyncDataSrc 4 setupped",
3736 "AsyncDataSrc 3 setupped",
3737 "AsyncDataSrc 1 created DataConn",
3738 "SyncDataSrc 2 created DataConn",
3739 "AsyncDataSrc 3 created DataConn",
3740 "SyncDataSrc 4 created DataConn",
3741 "AsyncDataConn 1 pre committed",
3742 "SyncDataConn 2 pre committed",
3743 "AsyncDataConn 3 failed to pre commit",
3744 "SyncDataConn 4 closed",
3745 "SyncDataConn 4 dropped",
3746 "AsyncDataConn 3 closed",
3747 "AsyncDataConn 3 dropped",
3748 "SyncDataConn 2 closed",
3749 "SyncDataConn 2 dropped",
3750 "AsyncDataConn 1 closed",
3751 "AsyncDataConn 1 dropped",
3752 "SyncDataSrc 4 closed",
3753 "SyncDataSrc 4 dropped",
3754 "AsyncDataSrc 3 closed",
3755 "AsyncDataSrc 3 dropped",
3756 "SyncDataSrc 2 closed",
3757 "SyncDataSrc 2 dropped",
3758 "AsyncDataSrc 1 closed",
3759 "AsyncDataSrc 1 dropped",
3760 ],
3761 );
3762 }
3763
3764 #[test]
3765 fn test_rollback() {
3766 let _unused = TEST_SEQ.lock().unwrap();
3767 clear_global_data_srcs_fixed();
3768
3769 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3770
3771 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3772 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3773
3774 if let Ok(_auto_shutdown) = setup() {
3775 let mut hub = DataHub::new();
3776 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3777 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3778
3779 if let Ok(_) = hub.begin() {
3780 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3781 assert_eq!(
3782 any::type_name_of_val(conn1),
3783 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3784 );
3785 } else {
3786 panic!();
3787 }
3788 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3789 assert_eq!(
3790 any::type_name_of_val(conn2),
3791 "sabi::data_hub::tests_data_hub::SyncDataConn"
3792 );
3793 } else {
3794 panic!();
3795 }
3796 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3797 assert_eq!(
3798 any::type_name_of_val(conn3),
3799 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3800 );
3801 } else {
3802 panic!();
3803 }
3804 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3805 assert_eq!(
3806 any::type_name_of_val(conn4),
3807 "sabi::data_hub::tests_data_hub::SyncDataConn"
3808 );
3809 } else {
3810 panic!();
3811 }
3812
3813 hub.rollback();
3814 hub.end();
3815 } else {
3816 panic!();
3817 }
3818 } else {
3819 panic!();
3820 }
3821
3822 assert_eq!(
3823 *logger.lock().unwrap(),
3824 vec![
3825 "SyncDataSrc 2 setupped",
3826 "AsyncDataSrc 1 setupped",
3827 "SyncDataSrc 4 setupped",
3828 "AsyncDataSrc 3 setupped",
3829 "AsyncDataSrc 1 created DataConn",
3830 "SyncDataSrc 2 created DataConn",
3831 "AsyncDataSrc 3 created DataConn",
3832 "SyncDataSrc 4 created DataConn",
3833 "AsyncDataConn 1 rollbacked",
3834 "SyncDataConn 2 rollbacked",
3835 "AsyncDataConn 3 rollbacked",
3836 "SyncDataConn 4 rollbacked",
3837 "SyncDataConn 4 closed",
3838 "SyncDataConn 4 dropped",
3839 "AsyncDataConn 3 closed",
3840 "AsyncDataConn 3 dropped",
3841 "SyncDataConn 2 closed",
3842 "SyncDataConn 2 dropped",
3843 "AsyncDataConn 1 closed",
3844 "AsyncDataConn 1 dropped",
3845 "SyncDataSrc 4 closed",
3846 "SyncDataSrc 4 dropped",
3847 "AsyncDataSrc 3 closed",
3848 "AsyncDataSrc 3 dropped",
3849 "SyncDataSrc 2 closed",
3850 "SyncDataSrc 2 dropped",
3851 "AsyncDataSrc 1 closed",
3852 "AsyncDataSrc 1 dropped",
3853 ],
3854 );
3855 }
3856
3857 #[test]
3858 fn test_force_back() {
3859 let _unused = TEST_SEQ.lock().unwrap();
3860 clear_global_data_srcs_fixed();
3861
3862 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3863
3864 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3865 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3866
3867 if let Ok(_auto_shutdown) = setup() {
3868 let mut hub = DataHub::new();
3869 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
3870 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
3871
3872 if let Ok(_) = hub.begin() {
3873 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
3874 assert_eq!(
3875 any::type_name_of_val(conn1),
3876 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3877 );
3878 } else {
3879 panic!();
3880 }
3881 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
3882 assert_eq!(
3883 any::type_name_of_val(conn2),
3884 "sabi::data_hub::tests_data_hub::SyncDataConn"
3885 );
3886 } else {
3887 panic!();
3888 }
3889 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
3890 assert_eq!(
3891 any::type_name_of_val(conn3),
3892 "sabi::data_hub::tests_data_hub::AsyncDataConn"
3893 );
3894 } else {
3895 panic!();
3896 }
3897 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
3898 assert_eq!(
3899 any::type_name_of_val(conn4),
3900 "sabi::data_hub::tests_data_hub::SyncDataConn"
3901 );
3902 } else {
3903 panic!();
3904 }
3905
3906 assert!(hub.commit().is_ok());
3907 hub.rollback();
3908 hub.end();
3909 } else {
3910 panic!();
3911 }
3912 } else {
3913 panic!();
3914 }
3915
3916 assert_eq!(
3917 *logger.lock().unwrap(),
3918 vec![
3919 "SyncDataSrc 2 setupped",
3920 "AsyncDataSrc 1 setupped",
3921 "SyncDataSrc 4 setupped",
3922 "AsyncDataSrc 3 setupped",
3923 "AsyncDataSrc 1 created DataConn",
3924 "SyncDataSrc 2 created DataConn",
3925 "AsyncDataSrc 3 created DataConn",
3926 "SyncDataSrc 4 created DataConn",
3927 "AsyncDataConn 1 pre committed",
3928 "SyncDataConn 2 pre committed",
3929 "AsyncDataConn 3 pre committed",
3930 "SyncDataConn 4 pre committed",
3931 "AsyncDataConn 1 committed",
3932 "SyncDataConn 2 committed",
3933 "AsyncDataConn 3 committed",
3934 "SyncDataConn 4 committed",
3935 "AsyncDataConn 1 post committed",
3936 "SyncDataConn 2 post committed",
3937 "AsyncDataConn 3 post committed",
3938 "SyncDataConn 4 post committed",
3939 "AsyncDataConn 1 forced back",
3940 "SyncDataConn 2 forced back",
3941 "AsyncDataConn 3 forced back",
3942 "SyncDataConn 4 forced back",
3943 "SyncDataConn 4 closed",
3944 "SyncDataConn 4 dropped",
3945 "AsyncDataConn 3 closed",
3946 "AsyncDataConn 3 dropped",
3947 "SyncDataConn 2 closed",
3948 "SyncDataConn 2 dropped",
3949 "AsyncDataConn 1 closed",
3950 "AsyncDataConn 1 dropped",
3951 "SyncDataSrc 4 closed",
3952 "SyncDataSrc 4 dropped",
3953 "AsyncDataSrc 3 closed",
3954 "AsyncDataSrc 3 dropped",
3955 "SyncDataSrc 2 closed",
3956 "SyncDataSrc 2 dropped",
3957 "AsyncDataSrc 1 closed",
3958 "AsyncDataSrc 1 dropped",
3959 ],
3960 );
3961 }
3962
3963 #[tokio::test]
3964 async fn async_test_new_and_close_with_global_data_srcs() {
3965 let _unused = TEST_SEQ.lock().unwrap();
3966 clear_global_data_srcs_fixed();
3967
3968 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
3969
3970 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
3971 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
3972
3973 if let Ok(_auto_shutdown) = setup_async().await {
3974 #[allow(static_mut_refs)]
3975 unsafe {
3976 let ptr = GLOBAL_DATA_SRC_LIST.not_setup_head();
3977 assert!(ptr.is_null());
3978
3979 let mut ptr = GLOBAL_DATA_SRC_LIST.did_setup_head();
3980 assert!(!ptr.is_null());
3981 assert_eq!((*ptr).name, "foo");
3982 ptr = (*ptr).next;
3983 assert!(!ptr.is_null());
3984 assert_eq!((*ptr).name, "bar");
3985 ptr = (*ptr).next;
3986 assert!(ptr.is_null());
3987 }
3988
3989 let hub = DataHub::new();
3990
3991 assert!(hub.local_data_src_list.not_setup_head().is_null());
3992 assert!(hub.local_data_src_list.did_setup_head().is_null());
3993 assert!(hub.data_conn_list.head().is_null());
3994 assert_eq!(hub.data_src_map.len(), 2);
3995 assert_eq!(hub.data_conn_map.len(), 0);
3996 assert_eq!(hub.fixed, false);
3997
3998 #[allow(static_mut_refs)]
3999 let mut ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
4000 assert!(!ptr.is_null());
4001 ptr = unsafe { (*ptr).next };
4002 assert!(!ptr.is_null());
4003 ptr = unsafe { (*ptr).next };
4004 assert!(ptr.is_null());
4005 #[allow(static_mut_refs)]
4006 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
4007 assert!(ptr.is_null());
4008 } else {
4009 panic!();
4010 }
4011
4012 #[allow(static_mut_refs)]
4013 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.did_setup_head() };
4014 assert!(ptr.is_null());
4015 #[allow(static_mut_refs)]
4016 let ptr = unsafe { GLOBAL_DATA_SRC_LIST.not_setup_head() };
4017 assert!(ptr.is_null());
4018
4019 assert_eq!(
4020 *logger.lock().unwrap(),
4021 vec![
4022 "SyncDataSrc 2 setupped",
4023 "AsyncDataSrc 1 setupped",
4024 "SyncDataSrc 2 closed",
4025 "SyncDataSrc 2 dropped",
4026 "AsyncDataSrc 1 closed",
4027 "AsyncDataSrc 1 dropped",
4028 ]
4029 );
4030 }
4031
4032 #[tokio::test]
4033 async fn async_test_uses_and_disuses() {
4034 let _unused = TEST_SEQ.lock().unwrap();
4035 clear_global_data_srcs_fixed();
4036
4037 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4038
4039 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4040 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4041
4042 if let Ok(_auto_shutdown) = setup_async().await {
4043 let mut hub = DataHub::new();
4044
4045 assert!(hub.local_data_src_list.not_setup_head().is_null());
4046 assert!(hub.local_data_src_list.did_setup_head().is_null());
4047 assert!(hub.data_conn_list.head().is_null());
4048 assert_eq!(hub.data_src_map.len(), 2);
4049 assert_eq!(hub.data_conn_map.len(), 0);
4050 assert_eq!(hub.fixed, false);
4051
4052 hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
4053 let mut ptr = hub.local_data_src_list.not_setup_head();
4054 assert!(!ptr.is_null());
4055 ptr = unsafe { (*ptr).next };
4056 assert!(ptr.is_null());
4057 assert!(hub.local_data_src_list.did_setup_head().is_null());
4058 assert!(hub.data_conn_list.head().is_null());
4059 assert_eq!(hub.data_src_map.len(), 2);
4060 assert_eq!(hub.data_conn_map.len(), 0);
4061 assert_eq!(hub.fixed, false);
4062
4063 hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
4064 let mut ptr = hub.local_data_src_list.not_setup_head();
4065 assert!(!ptr.is_null());
4066 ptr = unsafe { (*ptr).next };
4067 assert!(!ptr.is_null());
4068 ptr = unsafe { (*ptr).next };
4069 assert!(ptr.is_null());
4070 assert!(hub.local_data_src_list.did_setup_head().is_null());
4071 assert!(hub.data_conn_list.head().is_null());
4072 assert_eq!(hub.data_src_map.len(), 2);
4073 assert_eq!(hub.data_conn_map.len(), 0);
4074 assert_eq!(hub.fixed, false);
4075
4076 hub.disuses("foo"); hub.disuses("bar"); let mut ptr = hub.local_data_src_list.not_setup_head();
4079 assert!(!ptr.is_null());
4080 ptr = unsafe { (*ptr).next };
4081 assert!(!ptr.is_null());
4082 ptr = unsafe { (*ptr).next };
4083 assert!(ptr.is_null());
4084 assert!(hub.local_data_src_list.did_setup_head().is_null());
4085 assert!(hub.data_conn_list.head().is_null());
4086 assert_eq!(hub.data_src_map.len(), 2);
4087 assert_eq!(hub.data_conn_map.len(), 0);
4088 assert_eq!(hub.fixed, false);
4089
4090 hub.disuses("baz");
4091 let mut ptr = hub.local_data_src_list.not_setup_head();
4092 assert!(!ptr.is_null());
4093 ptr = unsafe { (*ptr).next };
4094 assert!(ptr.is_null());
4095 assert!(hub.local_data_src_list.did_setup_head().is_null());
4096 assert!(hub.data_conn_list.head().is_null());
4097 assert_eq!(hub.data_src_map.len(), 2);
4098 assert_eq!(hub.data_conn_map.len(), 0);
4099 assert_eq!(hub.fixed, false);
4100
4101 hub.disuses("qux");
4102 let ptr = hub.local_data_src_list.not_setup_head();
4103 assert!(ptr.is_null());
4104 assert!(hub.local_data_src_list.did_setup_head().is_null());
4105 assert!(hub.data_conn_list.head().is_null());
4106 assert_eq!(hub.data_src_map.len(), 2);
4107 assert_eq!(hub.data_conn_map.len(), 0);
4108 assert_eq!(hub.fixed, false);
4109 } else {
4110 panic!();
4111 }
4112
4113 assert_eq!(
4114 *logger.lock().unwrap(),
4115 vec![
4116 "SyncDataSrc 2 setupped",
4117 "AsyncDataSrc 1 setupped",
4118 "SyncDataSrc 3 closed",
4119 "SyncDataSrc 3 dropped",
4120 "AsyncDataSrc 4 closed",
4121 "AsyncDataSrc 4 dropped",
4122 "SyncDataSrc 2 closed",
4123 "SyncDataSrc 2 dropped",
4124 "AsyncDataSrc 1 closed",
4125 "AsyncDataSrc 1 dropped",
4126 ]
4127 );
4128 }
4129
4130 #[tokio::test]
4131 async fn async_test_cannot_add_and_remove_data_src_between_begin_and_end() {
4132 let _unused = TEST_SEQ.lock().unwrap();
4133 clear_global_data_srcs_fixed();
4134
4135 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4136
4137 if let Ok(_auto_shutdown) = setup_async().await {
4138 let mut hub = DataHub::new();
4139
4140 let ptr = hub.local_data_src_list.not_setup_head();
4141 assert!(ptr.is_null());
4142 let ptr = hub.local_data_src_list.did_setup_head();
4143 assert!(ptr.is_null());
4144 assert!(hub.data_conn_list.head().is_null());
4145 assert_eq!(hub.data_src_map.len(), 0);
4146 assert_eq!(hub.data_conn_map.len(), 0);
4147 assert_eq!(hub.fixed, false);
4148
4149 hub.uses("baz", SyncDataSrc::new(1, logger.clone(), Fail::Not));
4150
4151 let mut ptr = hub.local_data_src_list.not_setup_head();
4152 assert!(!ptr.is_null());
4153 ptr = unsafe { (*ptr).next };
4154 assert!(ptr.is_null());
4155 let ptr = hub.local_data_src_list.did_setup_head();
4156 assert!(ptr.is_null());
4157 assert!(hub.data_conn_list.head().is_null());
4158 assert_eq!(hub.data_src_map.len(), 0);
4159 assert_eq!(hub.data_conn_map.len(), 0);
4160 assert_eq!(hub.fixed, false);
4161
4162 assert!(hub.begin_async().await.is_ok());
4163
4164 let ptr = hub.local_data_src_list.not_setup_head();
4165 assert!(ptr.is_null());
4166 let mut ptr = hub.local_data_src_list.did_setup_head();
4167 assert!(!ptr.is_null());
4168 ptr = unsafe { (*ptr).next };
4169 assert!(ptr.is_null());
4170 assert!(hub.data_conn_list.head().is_null());
4171 assert_eq!(hub.data_src_map.len(), 1);
4172 assert_eq!(hub.data_conn_map.len(), 0);
4173 assert_eq!(hub.fixed, true);
4174
4175 hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
4176
4177 let ptr = hub.local_data_src_list.not_setup_head();
4178 assert!(ptr.is_null());
4179 let mut ptr = hub.local_data_src_list.did_setup_head();
4180 assert!(!ptr.is_null());
4181 ptr = unsafe { (*ptr).next };
4182 assert!(ptr.is_null());
4183 assert!(hub.data_conn_list.head().is_null());
4184 assert_eq!(hub.data_src_map.len(), 1);
4185 assert_eq!(hub.data_conn_map.len(), 0);
4186 assert_eq!(hub.fixed, true);
4187
4188 hub.disuses("baz");
4189
4190 let ptr = hub.local_data_src_list.not_setup_head();
4191 assert!(ptr.is_null());
4192 let mut ptr = hub.local_data_src_list.did_setup_head();
4193 assert!(!ptr.is_null());
4194 ptr = unsafe { (*ptr).next };
4195 assert!(ptr.is_null());
4196 assert!(hub.data_conn_list.head().is_null());
4197 assert_eq!(hub.data_src_map.len(), 1);
4198 assert_eq!(hub.data_conn_map.len(), 0);
4199 assert_eq!(hub.fixed, true);
4200
4201 hub.end();
4202
4203 let ptr = hub.local_data_src_list.not_setup_head();
4204 assert!(ptr.is_null());
4205 let mut ptr = hub.local_data_src_list.did_setup_head();
4206 assert!(!ptr.is_null());
4207 ptr = unsafe { (*ptr).next };
4208 assert!(ptr.is_null());
4209 assert!(hub.data_conn_list.head().is_null());
4210 assert_eq!(hub.data_src_map.len(), 1);
4211 assert_eq!(hub.data_conn_map.len(), 0);
4212 assert_eq!(hub.fixed, false);
4213
4214 hub.uses("foo", AsyncDataSrc::new(2, logger.clone(), Fail::Not));
4215
4216 let mut ptr = hub.local_data_src_list.not_setup_head();
4217 assert!(!ptr.is_null());
4218 ptr = unsafe { (*ptr).next };
4219 assert!(ptr.is_null());
4220 let mut ptr = hub.local_data_src_list.did_setup_head();
4221 assert!(!ptr.is_null());
4222 ptr = unsafe { (*ptr).next };
4223 assert!(ptr.is_null());
4224 assert!(hub.data_conn_list.head().is_null());
4225 assert_eq!(hub.data_src_map.len(), 1);
4226 assert_eq!(hub.data_conn_map.len(), 0);
4227 assert_eq!(hub.fixed, false);
4228
4229 hub.disuses("baz");
4230
4231 let mut ptr = hub.local_data_src_list.not_setup_head();
4232 assert!(!ptr.is_null());
4233 ptr = unsafe { (*ptr).next };
4234 assert!(ptr.is_null());
4235 let ptr = hub.local_data_src_list.did_setup_head();
4236 assert!(ptr.is_null());
4237 assert!(hub.data_conn_list.head().is_null());
4238 assert_eq!(hub.data_src_map.len(), 0);
4239 assert_eq!(hub.data_conn_map.len(), 0);
4240 assert_eq!(hub.fixed, false);
4241 }
4242 }
4243
4244 #[tokio::test]
4245 async fn async_test_begin_and_end() {
4246 let _unused = TEST_SEQ.lock().unwrap();
4247 clear_global_data_srcs_fixed();
4248
4249 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4250
4251 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4252 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4253
4254 if let Ok(_auto_shutdown) = setup_async().await {
4255 let mut hub = DataHub::new();
4256
4257 hub.uses("baz", SyncDataSrc::new(3, logger.clone(), Fail::Not));
4258 hub.uses("qux", AsyncDataSrc::new(4, logger.clone(), Fail::Not));
4259
4260 let mut ptr = hub.local_data_src_list.not_setup_head();
4261 assert!(!ptr.is_null());
4262 ptr = unsafe { (*ptr).next };
4263 assert!(!ptr.is_null());
4264 ptr = unsafe { (*ptr).next };
4265 assert!(ptr.is_null());
4266 assert!(hub.local_data_src_list.did_setup_head().is_null());
4267 assert!(hub.data_conn_list.head().is_null());
4268 assert_eq!(hub.data_src_map.len(), 2);
4269 assert_eq!(hub.data_conn_map.len(), 0);
4270 assert_eq!(hub.fixed, false);
4271
4272 assert!(hub.begin_async().await.is_ok());
4273
4274 assert!(hub.local_data_src_list.not_setup_head().is_null());
4275 let mut ptr = hub.local_data_src_list.did_setup_head();
4276 assert!(!ptr.is_null());
4277 ptr = unsafe { (*ptr).next };
4278 assert!(!ptr.is_null());
4279 ptr = unsafe { (*ptr).next };
4280 assert!(ptr.is_null());
4281 assert!(hub.data_conn_list.head().is_null());
4282 assert_eq!(hub.data_src_map.len(), 4);
4283 assert_eq!(hub.data_conn_map.len(), 0);
4284 assert_eq!(hub.fixed, true);
4285
4286 hub.end();
4287
4288 assert!(hub.local_data_src_list.not_setup_head().is_null());
4289 let mut ptr = hub.local_data_src_list.did_setup_head();
4290 assert!(!ptr.is_null());
4291 ptr = unsafe { (*ptr).next };
4292 assert!(!ptr.is_null());
4293 ptr = unsafe { (*ptr).next };
4294 assert!(ptr.is_null());
4295 assert!(hub.data_conn_list.head().is_null());
4296 assert_eq!(hub.data_src_map.len(), 4);
4297 assert_eq!(hub.data_conn_map.len(), 0);
4298 assert_eq!(hub.fixed, false);
4299 }
4300
4301 assert_eq!(
4302 *logger.lock().unwrap(),
4303 vec![
4304 "SyncDataSrc 2 setupped",
4305 "AsyncDataSrc 1 setupped",
4306 "SyncDataSrc 3 setupped",
4307 "AsyncDataSrc 4 setupped",
4308 "AsyncDataSrc 4 closed",
4309 "AsyncDataSrc 4 dropped",
4310 "SyncDataSrc 3 closed",
4311 "SyncDataSrc 3 dropped",
4312 "SyncDataSrc 2 closed",
4313 "SyncDataSrc 2 dropped",
4314 "AsyncDataSrc 1 closed",
4315 "AsyncDataSrc 1 dropped",
4316 ]
4317 );
4318 }
4319
4320 #[tokio::test]
4321 async fn async_test_begin_and_end_but_fail_sync() {
4322 let _unused = TEST_SEQ.lock().unwrap();
4323 clear_global_data_srcs_fixed();
4324
4325 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4326
4327 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4328 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4329
4330 if let Ok(_auto_shutdown) = setup_async().await {
4331 let mut hub = DataHub::new();
4332 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4333 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Setup));
4334
4335 if let Err(err) = hub.begin_async().await {
4336 match err.reason::<DataHubError>() {
4337 Ok(r) => match r {
4338 DataHubError::FailToSetupLocalDataSrcs { errors } => {
4339 assert_eq!(errors.len(), 1);
4340 if let Some(err) = errors.get("qux") {
4341 match err.reason::<String>() {
4342 Ok(s) => assert_eq!(s, "XXX"),
4343 Err(_) => panic!(),
4344 }
4345 } else {
4346 panic!();
4347 }
4348 }
4349 _ => panic!(),
4350 },
4351 Err(_) => panic!(),
4352 }
4353 } else {
4354 panic!();
4355 }
4356
4357 let mut ptr = hub.local_data_src_list.not_setup_head();
4358 assert!(!ptr.is_null());
4359 ptr = unsafe { (*ptr).next };
4360 assert!(ptr.is_null());
4361 let mut ptr = hub.local_data_src_list.did_setup_head();
4362 assert!(!ptr.is_null());
4363 ptr = unsafe { (*ptr).next };
4364 assert!(ptr.is_null());
4365 assert!(hub.data_conn_list.head().is_null());
4366 assert_eq!(hub.data_src_map.len(), 3);
4367 assert_eq!(hub.data_conn_map.len(), 0);
4368 assert_eq!(hub.fixed, true);
4369
4370 hub.end();
4371
4372 let mut ptr = hub.local_data_src_list.not_setup_head();
4373 assert!(!ptr.is_null());
4374 ptr = unsafe { (*ptr).next };
4375 assert!(ptr.is_null());
4376 let mut ptr = hub.local_data_src_list.did_setup_head();
4377 assert!(!ptr.is_null());
4378 ptr = unsafe { (*ptr).next };
4379 assert!(ptr.is_null());
4380 assert!(hub.data_conn_list.head().is_null());
4381 assert_eq!(hub.data_src_map.len(), 3);
4382 assert_eq!(hub.data_conn_map.len(), 0);
4383 assert_eq!(hub.fixed, false);
4384 } else {
4385 panic!();
4386 }
4387 }
4388
4389 #[tokio::test]
4390 async fn async_test_begin_and_end_but_fail_async() {
4391 let _unused = TEST_SEQ.lock().unwrap();
4392 clear_global_data_srcs_fixed();
4393
4394 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4395
4396 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4397 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4398
4399 if let Ok(_auto_shutdown) = setup_async().await {
4400 let mut hub = DataHub::new();
4401 hub.uses("baz", AsyncDataSrc::new(1, logger.clone(), Fail::Setup));
4402 hub.uses("qux", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4403
4404 if let Err(err) = hub.begin_async().await {
4405 match err.reason::<DataHubError>() {
4406 Ok(r) => match r {
4407 DataHubError::FailToSetupLocalDataSrcs { errors } => {
4408 assert_eq!(errors.len(), 1);
4409 if let Some(err) = errors.get("baz") {
4410 match err.reason::<String>() {
4411 Ok(s) => assert_eq!(s, "YYY"),
4412 Err(_) => panic!(),
4413 }
4414 } else {
4415 panic!();
4416 }
4417 }
4418 _ => panic!(),
4419 },
4420 Err(_) => panic!(),
4421 }
4422 } else {
4423 panic!();
4424 }
4425
4426 let mut ptr = hub.local_data_src_list.not_setup_head();
4427 assert!(!ptr.is_null());
4428 ptr = unsafe { (*ptr).next };
4429 assert!(ptr.is_null());
4430 let mut ptr = hub.local_data_src_list.did_setup_head();
4431 assert!(!ptr.is_null());
4432 ptr = unsafe { (*ptr).next };
4433 assert!(ptr.is_null());
4434 assert!(hub.data_conn_list.head().is_null());
4435 assert_eq!(hub.data_src_map.len(), 3);
4436 assert_eq!(hub.data_conn_map.len(), 0);
4437 assert_eq!(hub.fixed, true);
4438
4439 hub.end();
4440
4441 let mut ptr = hub.local_data_src_list.not_setup_head();
4442 assert!(!ptr.is_null());
4443 ptr = unsafe { (*ptr).next };
4444 assert!(ptr.is_null());
4445 let mut ptr = hub.local_data_src_list.did_setup_head();
4446 assert!(!ptr.is_null());
4447 ptr = unsafe { (*ptr).next };
4448 assert!(ptr.is_null());
4449 assert!(hub.data_conn_list.head().is_null());
4450 assert_eq!(hub.data_src_map.len(), 3);
4451 assert_eq!(hub.data_conn_map.len(), 0);
4452 assert_eq!(hub.fixed, false);
4453 } else {
4454 panic!();
4455 }
4456 }
4457
4458 #[tokio::test]
4459 async fn async_test_commit_and_post_commit() {
4460 let _unused = TEST_SEQ.lock().unwrap();
4461 clear_global_data_srcs_fixed();
4462
4463 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4464
4465 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4466 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4467
4468 if let Ok(_auto_shutdown) = setup_async().await {
4469 let mut hub = DataHub::new();
4470 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4471 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4472
4473 if let Ok(_) = hub.begin_async().await {
4474 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4475 assert_eq!(
4476 any::type_name_of_val(conn1),
4477 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4478 );
4479 } else {
4480 panic!();
4481 }
4482 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4483 assert_eq!(
4484 any::type_name_of_val(conn2),
4485 "sabi::data_hub::tests_data_hub::SyncDataConn"
4486 );
4487 } else {
4488 panic!();
4489 }
4490 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4491 assert_eq!(
4492 any::type_name_of_val(conn3),
4493 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4494 );
4495 } else {
4496 panic!();
4497 }
4498 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4499 assert_eq!(
4500 any::type_name_of_val(conn4),
4501 "sabi::data_hub::tests_data_hub::SyncDataConn"
4502 );
4503 } else {
4504 panic!();
4505 }
4506
4507 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4508 assert_eq!(
4509 any::type_name_of_val(conn1),
4510 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4511 );
4512 } else {
4513 panic!();
4514 }
4515 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4516 assert_eq!(
4517 any::type_name_of_val(conn2),
4518 "sabi::data_hub::tests_data_hub::SyncDataConn"
4519 );
4520 } else {
4521 panic!();
4522 }
4523 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4524 assert_eq!(
4525 any::type_name_of_val(conn3),
4526 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4527 );
4528 } else {
4529 panic!();
4530 }
4531 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4532 assert_eq!(
4533 any::type_name_of_val(conn4),
4534 "sabi::data_hub::tests_data_hub::SyncDataConn"
4535 );
4536 } else {
4537 panic!();
4538 }
4539
4540 assert!(hub.commit_async().await.is_ok());
4541 hub.end();
4542 } else {
4543 panic!();
4544 }
4545 } else {
4546 panic!();
4547 }
4548
4549 assert_eq!(
4550 *logger.lock().unwrap(),
4551 vec![
4552 "SyncDataSrc 2 setupped",
4553 "AsyncDataSrc 1 setupped",
4554 "SyncDataSrc 4 setupped",
4555 "AsyncDataSrc 3 setupped",
4556 "AsyncDataSrc 1 created DataConn",
4557 "SyncDataSrc 2 created DataConn",
4558 "AsyncDataSrc 3 created DataConn",
4559 "SyncDataSrc 4 created DataConn",
4560 "AsyncDataConn 1 pre committed",
4561 "SyncDataConn 2 pre committed",
4562 "AsyncDataConn 3 pre committed",
4563 "SyncDataConn 4 pre committed",
4564 "AsyncDataConn 1 committed",
4565 "SyncDataConn 2 committed",
4566 "AsyncDataConn 3 committed",
4567 "SyncDataConn 4 committed",
4568 "AsyncDataConn 1 post committed",
4569 "SyncDataConn 2 post committed",
4570 "AsyncDataConn 3 post committed",
4571 "SyncDataConn 4 post committed",
4572 "SyncDataConn 4 closed",
4573 "SyncDataConn 4 dropped",
4574 "AsyncDataConn 3 closed",
4575 "AsyncDataConn 3 dropped",
4576 "SyncDataConn 2 closed",
4577 "SyncDataConn 2 dropped",
4578 "AsyncDataConn 1 closed",
4579 "AsyncDataConn 1 dropped",
4580 "SyncDataSrc 4 closed",
4581 "SyncDataSrc 4 dropped",
4582 "AsyncDataSrc 3 closed",
4583 "AsyncDataSrc 3 dropped",
4584 "SyncDataSrc 2 closed",
4585 "SyncDataSrc 2 dropped",
4586 "AsyncDataSrc 1 closed",
4587 "AsyncDataSrc 1 dropped",
4588 ],
4589 );
4590 }
4591
4592 #[tokio::test]
4593 async fn async_test_fail_to_cast_new_data_conn() {
4594 let _unused = TEST_SEQ.lock().unwrap();
4595 clear_global_data_srcs_fixed();
4596
4597 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4598
4599 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4600
4601 if let Ok(_auto_shutdown) = setup_async().await {
4602 let mut hub = DataHub::new();
4603 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4604
4605 if let Ok(_) = hub.begin_async().await {
4606 if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
4607 match err.reason::<DataHubError>() {
4608 Ok(r) => match r {
4609 DataHubError::FailToCastDataConn { name, cast_to_type } => {
4610 assert_eq!(name, "foo");
4611 assert_eq!(
4612 *cast_to_type,
4613 "sabi::data_hub::tests_data_hub::SyncDataConn"
4614 );
4615 }
4616 _ => panic!(),
4617 },
4618 Err(_) => panic!(),
4619 }
4620 } else {
4621 panic!();
4622 }
4623
4624 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
4625 match err.reason::<DataHubError>() {
4626 Ok(r) => match r {
4627 DataHubError::FailToCastDataConn { name, cast_to_type } => {
4628 assert_eq!(name, "bar");
4629 assert_eq!(
4630 *cast_to_type,
4631 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4632 );
4633 }
4634 _ => panic!(),
4635 },
4636 Err(_) => panic!(),
4637 }
4638 } else {
4639 panic!();
4640 }
4641 } else {
4642 panic!();
4643 }
4644 } else {
4645 panic!();
4646 }
4647
4648 assert_eq!(
4649 *logger.lock().unwrap(),
4650 vec![
4651 "AsyncDataSrc 1 setupped",
4652 "SyncDataSrc 2 setupped",
4653 "SyncDataSrc 2 closed",
4654 "SyncDataSrc 2 dropped",
4655 "AsyncDataSrc 1 closed",
4656 "AsyncDataSrc 1 dropped",
4657 ],
4658 );
4659 }
4660
4661 #[tokio::test]
4662 async fn async_test_fail_to_cast_reused_data_conn() {
4663 let _unused = TEST_SEQ.lock().unwrap();
4664 clear_global_data_srcs_fixed();
4665
4666 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4667
4668 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4669
4670 if let Ok(_auto_shutdown) = setup_async().await {
4671 let mut hub = DataHub::new();
4672 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4673
4674 if let Ok(_) = hub.begin_async().await {
4675 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4676 assert_eq!(
4677 any::type_name_of_val(conn1),
4678 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4679 );
4680 } else {
4681 panic!();
4682 }
4683 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4684 assert_eq!(
4685 any::type_name_of_val(conn2),
4686 "sabi::data_hub::tests_data_hub::SyncDataConn"
4687 );
4688 } else {
4689 panic!();
4690 }
4691
4692 if let Err(err) = hub.get_data_conn::<SyncDataConn>("foo") {
4693 match err.reason::<DataHubError>() {
4694 Ok(r) => match r {
4695 DataHubError::FailToCastDataConn { name, cast_to_type } => {
4696 assert_eq!(name, "foo");
4697 assert_eq!(
4698 *cast_to_type,
4699 "sabi::data_hub::tests_data_hub::SyncDataConn"
4700 );
4701 }
4702 _ => panic!(),
4703 },
4704 Err(_) => panic!(),
4705 }
4706 } else {
4707 panic!();
4708 }
4709
4710 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("bar") {
4711 match err.reason::<DataHubError>() {
4712 Ok(r) => match r {
4713 DataHubError::FailToCastDataConn { name, cast_to_type } => {
4714 assert_eq!(name, "bar");
4715 assert_eq!(
4716 *cast_to_type,
4717 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4718 );
4719 }
4720 _ => panic!(),
4721 },
4722 Err(_) => panic!(),
4723 }
4724 } else {
4725 panic!();
4726 }
4727 } else {
4728 panic!();
4729 }
4730 } else {
4731 panic!();
4732 }
4733
4734 assert_eq!(
4735 *logger.lock().unwrap(),
4736 vec![
4737 "AsyncDataSrc 1 setupped",
4738 "SyncDataSrc 2 setupped",
4739 "AsyncDataSrc 1 created DataConn",
4740 "SyncDataSrc 2 created DataConn",
4741 "SyncDataConn 2 closed",
4742 "SyncDataConn 2 dropped",
4743 "AsyncDataConn 1 closed",
4744 "AsyncDataConn 1 dropped",
4745 "SyncDataSrc 2 closed",
4746 "SyncDataSrc 2 dropped",
4747 "AsyncDataSrc 1 closed",
4748 "AsyncDataSrc 1 dropped",
4749 ],
4750 );
4751 }
4752
4753 #[tokio::test]
4754 async fn async_test_fail_to_create_data_conn() {
4755 let _unused = TEST_SEQ.lock().unwrap();
4756 clear_global_data_srcs_fixed();
4757
4758 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4759
4760 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4761
4762 if let Ok(_auto_shutdown) = setup_async().await {
4763 let mut hub = DataHub::new();
4764 hub.uses(
4765 "bar",
4766 SyncDataSrc::new(2, logger.clone(), Fail::CreateDataConn),
4767 );
4768
4769 if let Ok(_) = hub.begin_async().await {
4770 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4771 assert_eq!(
4772 any::type_name_of_val(conn1),
4773 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4774 );
4775 } else {
4776 panic!();
4777 }
4778
4779 if let Err(err) = hub.get_data_conn::<SyncDataConn>("bar") {
4780 match err.reason::<DataHubError>() {
4781 Ok(r) => match r {
4782 DataHubError::FailToCreateDataConn {
4783 name,
4784 data_conn_type,
4785 } => {
4786 assert_eq!(name, "bar");
4787 assert_eq!(
4788 *data_conn_type,
4789 "sabi::data_hub::tests_data_hub::SyncDataConn"
4790 );
4791 }
4792 _ => panic!(),
4793 },
4794 Err(_) => panic!(),
4795 }
4796 match err.source() {
4797 Some(e) => {
4798 assert_eq!(
4799 e.downcast_ref::<errs::Err>()
4800 .unwrap()
4801 .reason::<String>()
4802 .unwrap(),
4803 "xxx"
4804 );
4805 }
4806 None => panic!(),
4807 }
4808 } else {
4809 panic!();
4810 }
4811 } else {
4812 panic!();
4813 }
4814 } else {
4815 panic!();
4816 }
4817
4818 assert_eq!(
4819 *logger.lock().unwrap(),
4820 vec![
4821 "AsyncDataSrc 1 setupped",
4822 "SyncDataSrc 2 setupped",
4823 "AsyncDataSrc 1 created DataConn",
4824 "SyncDataSrc 2 failed to create a DataConn",
4825 "AsyncDataConn 1 closed",
4826 "AsyncDataConn 1 dropped",
4827 "SyncDataSrc 2 closed",
4828 "SyncDataSrc 2 dropped",
4829 "AsyncDataSrc 1 closed",
4830 "AsyncDataSrc 1 dropped",
4831 ],
4832 );
4833 }
4834
4835 #[tokio::test]
4836 async fn async_test_fail_to_create_data_conn_because_of_no_data_src() {
4837 let _unused = TEST_SEQ.lock().unwrap();
4838 clear_global_data_srcs_fixed();
4839
4840 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4841
4842 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4843
4844 if let Ok(_auto_shutdown) = setup_async().await {
4845 let mut hub = DataHub::new();
4846 hub.uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4847
4848 if let Ok(_) = hub.begin_async().await {
4849 if let Err(err) = hub.get_data_conn::<SyncDataConn>("baz") {
4850 match err.reason::<DataHubError>() {
4851 Ok(r) => match r {
4852 DataHubError::NoDataSrcToCreateDataConn {
4853 name,
4854 data_conn_type,
4855 } => {
4856 assert_eq!(name, "baz");
4857 assert_eq!(
4858 *data_conn_type,
4859 "sabi::data_hub::tests_data_hub::SyncDataConn"
4860 );
4861 }
4862 _ => panic!(),
4863 },
4864 Err(_) => panic!(),
4865 }
4866 } else {
4867 panic!();
4868 }
4869
4870 if let Err(err) = hub.get_data_conn::<AsyncDataConn>("qux") {
4871 match err.reason::<DataHubError>() {
4872 Ok(r) => match r {
4873 DataHubError::NoDataSrcToCreateDataConn {
4874 name,
4875 data_conn_type,
4876 } => {
4877 assert_eq!(name, "qux");
4878 assert_eq!(
4879 *data_conn_type,
4880 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4881 );
4882 }
4883 _ => panic!(),
4884 },
4885 Err(_) => panic!(),
4886 }
4887 } else {
4888 panic!();
4889 }
4890 } else {
4891 panic!();
4892 }
4893 } else {
4894 panic!();
4895 }
4896
4897 assert_eq!(
4898 *logger.lock().unwrap(),
4899 vec![
4900 "AsyncDataSrc 1 setupped",
4901 "SyncDataSrc 2 setupped",
4902 "SyncDataSrc 2 closed",
4903 "SyncDataSrc 2 dropped",
4904 "AsyncDataSrc 1 closed",
4905 "AsyncDataSrc 1 dropped",
4906 ],
4907 );
4908 }
4909
4910 #[tokio::test]
4911 async fn async_test_commit_and_post_commit_when_no_data_conn() {
4912 let _unused = TEST_SEQ.lock().unwrap();
4913 clear_global_data_srcs_fixed();
4914
4915 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4916
4917 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4918 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
4919
4920 if let Ok(_auto_shutdown) = setup_async().await {
4921 let mut hub = DataHub::new();
4922 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4923 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4924
4925 if let Ok(_) = hub.begin_async().await {
4926 assert!(hub.commit_async().await.is_ok());
4927 hub.end();
4928 } else {
4929 panic!();
4930 }
4931 } else {
4932 panic!();
4933 }
4934
4935 assert_eq!(
4936 *logger.lock().unwrap(),
4937 vec![
4938 "SyncDataSrc 2 setupped",
4939 "AsyncDataSrc 1 setupped",
4940 "SyncDataSrc 4 setupped",
4941 "AsyncDataSrc 3 setupped",
4942 "SyncDataSrc 4 closed",
4943 "SyncDataSrc 4 dropped",
4944 "AsyncDataSrc 3 closed",
4945 "AsyncDataSrc 3 dropped",
4946 "SyncDataSrc 2 closed",
4947 "SyncDataSrc 2 dropped",
4948 "AsyncDataSrc 1 closed",
4949 "AsyncDataSrc 1 dropped",
4950 ],
4951 );
4952 }
4953
4954 #[tokio::test]
4955 async fn async_test_commit_and_but_fail_global_sync() {
4956 let _unused = TEST_SEQ.lock().unwrap();
4957 clear_global_data_srcs_fixed();
4958
4959 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
4960
4961 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
4962 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Commit));
4963
4964 if let Ok(_auto_shutdown) = setup_async().await {
4965 let mut hub = DataHub::new();
4966 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
4967 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
4968
4969 if let Ok(_) = hub.begin_async().await {
4970 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
4971 assert_eq!(
4972 any::type_name_of_val(conn1),
4973 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4974 );
4975 } else {
4976 panic!();
4977 }
4978 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
4979 assert_eq!(
4980 any::type_name_of_val(conn2),
4981 "sabi::data_hub::tests_data_hub::SyncDataConn"
4982 );
4983 } else {
4984 panic!();
4985 }
4986 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
4987 assert_eq!(
4988 any::type_name_of_val(conn3),
4989 "sabi::data_hub::tests_data_hub::AsyncDataConn"
4990 );
4991 } else {
4992 panic!();
4993 }
4994 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
4995 assert_eq!(
4996 any::type_name_of_val(conn4),
4997 "sabi::data_hub::tests_data_hub::SyncDataConn"
4998 );
4999 } else {
5000 panic!();
5001 }
5002
5003 match hub.commit_async().await {
5004 Ok(_) => panic!(),
5005 Err(err) => match err.reason::<DataHubError>() {
5006 Ok(r) => match r {
5007 DataHubError::FailToCommitDataConn { errors } => {
5008 assert_eq!(errors.len(), 1);
5009 if let Some(e) = errors.get("bar") {
5010 if let Ok(s) = e.reason::<String>() {
5011 assert_eq!(s, "ZZZ");
5012 } else {
5013 panic!();
5014 }
5015 } else {
5016 panic!();
5017 }
5018 }
5019 _ => panic!(),
5020 },
5021 Err(_) => panic!(),
5022 },
5023 }
5024
5025 hub.end();
5026 } else {
5027 panic!();
5028 }
5029 } else {
5030 panic!();
5031 }
5032
5033 assert_eq!(
5034 *logger.lock().unwrap(),
5035 vec![
5036 "SyncDataSrc 2 setupped",
5037 "AsyncDataSrc 1 setupped",
5038 "SyncDataSrc 4 setupped",
5039 "AsyncDataSrc 3 setupped",
5040 "AsyncDataSrc 1 created DataConn",
5041 "SyncDataSrc 2 created DataConn",
5042 "AsyncDataSrc 3 created DataConn",
5043 "SyncDataSrc 4 created DataConn",
5044 "AsyncDataConn 1 pre committed",
5045 "SyncDataConn 2 pre committed",
5046 "AsyncDataConn 3 pre committed",
5047 "SyncDataConn 4 pre committed",
5048 "AsyncDataConn 1 committed",
5049 "SyncDataConn 2 failed to commit",
5050 "SyncDataConn 4 closed",
5051 "SyncDataConn 4 dropped",
5052 "AsyncDataConn 3 closed",
5053 "AsyncDataConn 3 dropped",
5054 "SyncDataConn 2 closed",
5055 "SyncDataConn 2 dropped",
5056 "AsyncDataConn 1 closed",
5057 "AsyncDataConn 1 dropped",
5058 "SyncDataSrc 4 closed",
5059 "SyncDataSrc 4 dropped",
5060 "AsyncDataSrc 3 closed",
5061 "AsyncDataSrc 3 dropped",
5062 "SyncDataSrc 2 closed",
5063 "SyncDataSrc 2 dropped",
5064 "AsyncDataSrc 1 closed",
5065 "AsyncDataSrc 1 dropped",
5066 ],
5067 );
5068 }
5069
5070 #[tokio::test]
5071 async fn async_test_commit_but_fail_global_async() {
5072 let _unused = TEST_SEQ.lock().unwrap();
5073 clear_global_data_srcs_fixed();
5074
5075 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5076
5077 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Commit));
5078 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5079
5080 if let Ok(_auto_shutdown) = setup_async().await {
5081 let mut hub = DataHub::new();
5082 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5083 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5084
5085 if let Ok(_) = hub.begin_async().await {
5086 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5087 assert_eq!(
5088 any::type_name_of_val(conn1),
5089 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5090 );
5091 } else {
5092 panic!();
5093 }
5094 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5095 assert_eq!(
5096 any::type_name_of_val(conn2),
5097 "sabi::data_hub::tests_data_hub::SyncDataConn"
5098 );
5099 } else {
5100 panic!();
5101 }
5102 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5103 assert_eq!(
5104 any::type_name_of_val(conn3),
5105 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5106 );
5107 } else {
5108 panic!();
5109 }
5110 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5111 assert_eq!(
5112 any::type_name_of_val(conn4),
5113 "sabi::data_hub::tests_data_hub::SyncDataConn"
5114 );
5115 } else {
5116 panic!();
5117 }
5118
5119 match hub.commit_async().await {
5120 Ok(_) => panic!(),
5121 Err(err) => match err.reason::<DataHubError>() {
5122 Ok(r) => match r {
5123 DataHubError::FailToCommitDataConn { errors } => {
5124 assert_eq!(errors.len(), 1);
5125 if let Some(e) = errors.get("foo") {
5126 if let Ok(s) = e.reason::<String>() {
5127 assert_eq!(s, "VVV");
5128 } else {
5129 panic!();
5130 }
5131 } else {
5132 panic!();
5133 }
5134 }
5135 _ => panic!(),
5136 },
5137 Err(_) => panic!(),
5138 },
5139 }
5140
5141 hub.end();
5142 } else {
5143 panic!();
5144 }
5145 } else {
5146 panic!();
5147 }
5148
5149 assert_eq!(
5150 *logger.lock().unwrap(),
5151 vec![
5152 "SyncDataSrc 2 setupped",
5153 "AsyncDataSrc 1 setupped",
5154 "SyncDataSrc 4 setupped",
5155 "AsyncDataSrc 3 setupped",
5156 "AsyncDataSrc 1 created DataConn",
5157 "SyncDataSrc 2 created DataConn",
5158 "AsyncDataSrc 3 created DataConn",
5159 "SyncDataSrc 4 created DataConn",
5160 "AsyncDataConn 1 pre committed",
5161 "SyncDataConn 2 pre committed",
5162 "AsyncDataConn 3 pre committed",
5163 "SyncDataConn 4 pre committed",
5164 "AsyncDataConn 1 failed to commit",
5165 "SyncDataConn 4 closed",
5166 "SyncDataConn 4 dropped",
5167 "AsyncDataConn 3 closed",
5168 "AsyncDataConn 3 dropped",
5169 "SyncDataConn 2 closed",
5170 "SyncDataConn 2 dropped",
5171 "AsyncDataConn 1 closed",
5172 "AsyncDataConn 1 dropped",
5173 "SyncDataSrc 4 closed",
5174 "SyncDataSrc 4 dropped",
5175 "AsyncDataSrc 3 closed",
5176 "AsyncDataSrc 3 dropped",
5177 "SyncDataSrc 2 closed",
5178 "SyncDataSrc 2 dropped",
5179 "AsyncDataSrc 1 closed",
5180 "AsyncDataSrc 1 dropped",
5181 ],
5182 );
5183 }
5184
5185 #[tokio::test]
5186 async fn async_test_commit_but_fail_local_sync() {
5187 let _unused = TEST_SEQ.lock().unwrap();
5188 clear_global_data_srcs_fixed();
5189
5190 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5191
5192 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5193 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5194
5195 if let Ok(_auto_shutdown) = setup_async().await {
5196 let mut hub = DataHub::new();
5197 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5198 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Commit));
5199
5200 if let Ok(_) = hub.begin_async().await {
5201 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5202 assert_eq!(
5203 any::type_name_of_val(conn1),
5204 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5205 );
5206 } else {
5207 panic!();
5208 }
5209 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5210 assert_eq!(
5211 any::type_name_of_val(conn2),
5212 "sabi::data_hub::tests_data_hub::SyncDataConn"
5213 );
5214 } else {
5215 panic!();
5216 }
5217 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5218 assert_eq!(
5219 any::type_name_of_val(conn3),
5220 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5221 );
5222 } else {
5223 panic!();
5224 }
5225 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5226 assert_eq!(
5227 any::type_name_of_val(conn4),
5228 "sabi::data_hub::tests_data_hub::SyncDataConn"
5229 );
5230 } else {
5231 panic!();
5232 }
5233
5234 match hub.commit_async().await {
5235 Ok(_) => panic!(),
5236 Err(err) => match err.reason::<DataHubError>() {
5237 Ok(r) => match r {
5238 DataHubError::FailToCommitDataConn { errors } => {
5239 assert_eq!(errors.len(), 1);
5240 if let Some(e) = errors.get("qux") {
5241 if let Ok(s) = e.reason::<String>() {
5242 assert_eq!(s, "ZZZ");
5243 } else {
5244 panic!();
5245 }
5246 } else {
5247 panic!();
5248 }
5249 }
5250 _ => panic!(),
5251 },
5252 Err(_) => panic!(),
5253 },
5254 }
5255
5256 hub.end();
5257 } else {
5258 panic!();
5259 }
5260 } else {
5261 panic!();
5262 }
5263
5264 assert_eq!(
5265 *logger.lock().unwrap(),
5266 vec![
5267 "SyncDataSrc 2 setupped",
5268 "AsyncDataSrc 1 setupped",
5269 "SyncDataSrc 4 setupped",
5270 "AsyncDataSrc 3 setupped",
5271 "AsyncDataSrc 1 created DataConn",
5272 "SyncDataSrc 2 created DataConn",
5273 "AsyncDataSrc 3 created DataConn",
5274 "SyncDataSrc 4 created DataConn",
5275 "AsyncDataConn 1 pre committed",
5276 "SyncDataConn 2 pre committed",
5277 "AsyncDataConn 3 pre committed",
5278 "SyncDataConn 4 pre committed",
5279 "AsyncDataConn 1 committed",
5280 "SyncDataConn 2 committed",
5281 "AsyncDataConn 3 committed",
5282 "SyncDataConn 4 failed to commit",
5283 "SyncDataConn 4 closed",
5284 "SyncDataConn 4 dropped",
5285 "AsyncDataConn 3 closed",
5286 "AsyncDataConn 3 dropped",
5287 "SyncDataConn 2 closed",
5288 "SyncDataConn 2 dropped",
5289 "AsyncDataConn 1 closed",
5290 "AsyncDataConn 1 dropped",
5291 "SyncDataSrc 4 closed",
5292 "SyncDataSrc 4 dropped",
5293 "AsyncDataSrc 3 closed",
5294 "AsyncDataSrc 3 dropped",
5295 "SyncDataSrc 2 closed",
5296 "SyncDataSrc 2 dropped",
5297 "AsyncDataSrc 1 closed",
5298 "AsyncDataSrc 1 dropped",
5299 ],
5300 );
5301 }
5302
5303 #[tokio::test]
5304 async fn async_test_commit_but_fail_local_async() {
5305 let _unused = TEST_SEQ.lock().unwrap();
5306 clear_global_data_srcs_fixed();
5307
5308 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5309
5310 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5311 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5312
5313 if let Ok(_auto_shutdown) = setup_async().await {
5314 let mut hub = DataHub::new();
5315 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Commit));
5316 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5317
5318 if let Ok(_) = hub.begin_async().await {
5319 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5320 assert_eq!(
5321 any::type_name_of_val(conn1),
5322 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5323 );
5324 } else {
5325 panic!();
5326 }
5327 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5328 assert_eq!(
5329 any::type_name_of_val(conn2),
5330 "sabi::data_hub::tests_data_hub::SyncDataConn"
5331 );
5332 } else {
5333 panic!();
5334 }
5335 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5336 assert_eq!(
5337 any::type_name_of_val(conn3),
5338 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5339 );
5340 } else {
5341 panic!();
5342 }
5343 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5344 assert_eq!(
5345 any::type_name_of_val(conn4),
5346 "sabi::data_hub::tests_data_hub::SyncDataConn"
5347 );
5348 } else {
5349 panic!();
5350 }
5351
5352 match hub.commit_async().await {
5353 Ok(_) => panic!(),
5354 Err(err) => match err.reason::<DataHubError>() {
5355 Ok(r) => match r {
5356 DataHubError::FailToCommitDataConn { errors } => {
5357 assert_eq!(errors.len(), 1);
5358 if let Some(e) = errors.get("baz") {
5359 if let Ok(s) = e.reason::<String>() {
5360 assert_eq!(s, "VVV");
5361 } else {
5362 panic!();
5363 }
5364 } else {
5365 panic!();
5366 }
5367 }
5368 _ => panic!(),
5369 },
5370 Err(_) => panic!(),
5371 },
5372 }
5373
5374 hub.end();
5375 } else {
5376 panic!();
5377 }
5378 } else {
5379 panic!();
5380 }
5381
5382 assert_eq!(
5383 *logger.lock().unwrap(),
5384 vec![
5385 "SyncDataSrc 2 setupped",
5386 "AsyncDataSrc 1 setupped",
5387 "SyncDataSrc 4 setupped",
5388 "AsyncDataSrc 3 setupped",
5389 "AsyncDataSrc 1 created DataConn",
5390 "SyncDataSrc 2 created DataConn",
5391 "AsyncDataSrc 3 created DataConn",
5392 "SyncDataSrc 4 created DataConn",
5393 "AsyncDataConn 1 pre committed",
5394 "SyncDataConn 2 pre committed",
5395 "AsyncDataConn 3 pre committed",
5396 "SyncDataConn 4 pre committed",
5397 "AsyncDataConn 1 committed",
5398 "SyncDataConn 2 committed",
5399 "AsyncDataConn 3 failed to commit",
5400 "SyncDataConn 4 closed",
5401 "SyncDataConn 4 dropped",
5402 "AsyncDataConn 3 closed",
5403 "AsyncDataConn 3 dropped",
5404 "SyncDataConn 2 closed",
5405 "SyncDataConn 2 dropped",
5406 "AsyncDataConn 1 closed",
5407 "AsyncDataConn 1 dropped",
5408 "SyncDataSrc 4 closed",
5409 "SyncDataSrc 4 dropped",
5410 "AsyncDataSrc 3 closed",
5411 "AsyncDataSrc 3 dropped",
5412 "SyncDataSrc 2 closed",
5413 "SyncDataSrc 2 dropped",
5414 "AsyncDataSrc 1 closed",
5415 "AsyncDataSrc 1 dropped",
5416 ],
5417 );
5418 }
5419
5420 #[tokio::test]
5421 async fn async_test_pre_commit_but_fail_global_sync() {
5422 let _unused = TEST_SEQ.lock().unwrap();
5423 clear_global_data_srcs_fixed();
5424
5425 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5426
5427 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5428 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::PreCommit));
5429
5430 if let Ok(_auto_shutdown) = setup_async().await {
5431 let mut hub = DataHub::new();
5432 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5433 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5434
5435 if let Ok(_) = hub.begin_async().await {
5436 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5437 assert_eq!(
5438 any::type_name_of_val(conn1),
5439 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5440 );
5441 } else {
5442 panic!();
5443 }
5444 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5445 assert_eq!(
5446 any::type_name_of_val(conn2),
5447 "sabi::data_hub::tests_data_hub::SyncDataConn"
5448 );
5449 } else {
5450 panic!();
5451 }
5452 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5453 assert_eq!(
5454 any::type_name_of_val(conn3),
5455 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5456 );
5457 } else {
5458 panic!();
5459 }
5460 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5461 assert_eq!(
5462 any::type_name_of_val(conn4),
5463 "sabi::data_hub::tests_data_hub::SyncDataConn"
5464 );
5465 } else {
5466 panic!();
5467 }
5468
5469 match hub.commit_async().await {
5470 Ok(_) => panic!(),
5471 Err(err) => match err.reason::<DataHubError>() {
5472 Ok(r) => match r {
5473 DataHubError::FailToPreCommitDataConn { errors } => {
5474 assert_eq!(errors.len(), 1);
5475 if let Some(e) = errors.get("bar") {
5476 if let Ok(s) = e.reason::<String>() {
5477 assert_eq!(s, "zzz");
5478 } else {
5479 panic!();
5480 }
5481 } else {
5482 panic!();
5483 }
5484 }
5485 _ => panic!(),
5486 },
5487 Err(_) => panic!(),
5488 },
5489 }
5490
5491 hub.end();
5492 } else {
5493 panic!();
5494 }
5495 } else {
5496 panic!();
5497 }
5498
5499 assert_eq!(
5500 *logger.lock().unwrap(),
5501 vec![
5502 "SyncDataSrc 2 setupped",
5503 "AsyncDataSrc 1 setupped",
5504 "SyncDataSrc 4 setupped",
5505 "AsyncDataSrc 3 setupped",
5506 "AsyncDataSrc 1 created DataConn",
5507 "SyncDataSrc 2 created DataConn",
5508 "AsyncDataSrc 3 created DataConn",
5509 "SyncDataSrc 4 created DataConn",
5510 "AsyncDataConn 1 pre committed",
5511 "SyncDataConn 2 failed to pre commit",
5512 "SyncDataConn 4 closed",
5513 "SyncDataConn 4 dropped",
5514 "AsyncDataConn 3 closed",
5515 "AsyncDataConn 3 dropped",
5516 "SyncDataConn 2 closed",
5517 "SyncDataConn 2 dropped",
5518 "AsyncDataConn 1 closed",
5519 "AsyncDataConn 1 dropped",
5520 "SyncDataSrc 4 closed",
5521 "SyncDataSrc 4 dropped",
5522 "AsyncDataSrc 3 closed",
5523 "AsyncDataSrc 3 dropped",
5524 "SyncDataSrc 2 closed",
5525 "SyncDataSrc 2 dropped",
5526 "AsyncDataSrc 1 closed",
5527 "AsyncDataSrc 1 dropped",
5528 ],
5529 );
5530 }
5531
5532 #[tokio::test]
5533 async fn async_test_pre_commit_but_fail_global_async() {
5534 let _unused = TEST_SEQ.lock().unwrap();
5535 clear_global_data_srcs_fixed();
5536
5537 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5538
5539 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::PreCommit));
5540 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5541
5542 if let Ok(_auto_shutdown) = setup_async().await {
5543 let mut hub = DataHub::new();
5544 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5545 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5546
5547 if let Ok(_) = hub.begin_async().await {
5548 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5549 assert_eq!(
5550 any::type_name_of_val(conn1),
5551 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5552 );
5553 } else {
5554 panic!();
5555 }
5556 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5557 assert_eq!(
5558 any::type_name_of_val(conn2),
5559 "sabi::data_hub::tests_data_hub::SyncDataConn"
5560 );
5561 } else {
5562 panic!();
5563 }
5564 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5565 assert_eq!(
5566 any::type_name_of_val(conn3),
5567 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5568 );
5569 } else {
5570 panic!();
5571 }
5572 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5573 assert_eq!(
5574 any::type_name_of_val(conn4),
5575 "sabi::data_hub::tests_data_hub::SyncDataConn"
5576 );
5577 } else {
5578 panic!();
5579 }
5580
5581 match hub.commit_async().await {
5582 Ok(_) => panic!(),
5583 Err(err) => match err.reason::<DataHubError>() {
5584 Ok(r) => match r {
5585 DataHubError::FailToPreCommitDataConn { errors } => {
5586 assert_eq!(errors.len(), 1);
5587 if let Some(e) = errors.get("foo") {
5588 if let Ok(s) = e.reason::<String>() {
5589 assert_eq!(s, "vvv");
5590 } else {
5591 panic!();
5592 }
5593 } else {
5594 panic!();
5595 }
5596 }
5597 _ => panic!(),
5598 },
5599 Err(_) => panic!(),
5600 },
5601 }
5602
5603 hub.end();
5604 } else {
5605 panic!();
5606 }
5607 } else {
5608 panic!();
5609 }
5610
5611 assert_eq!(
5612 *logger.lock().unwrap(),
5613 vec![
5614 "SyncDataSrc 2 setupped",
5615 "AsyncDataSrc 1 setupped",
5616 "SyncDataSrc 4 setupped",
5617 "AsyncDataSrc 3 setupped",
5618 "AsyncDataSrc 1 created DataConn",
5619 "SyncDataSrc 2 created DataConn",
5620 "AsyncDataSrc 3 created DataConn",
5621 "SyncDataSrc 4 created DataConn",
5622 "AsyncDataConn 1 failed to pre commit",
5623 "SyncDataConn 4 closed",
5624 "SyncDataConn 4 dropped",
5625 "AsyncDataConn 3 closed",
5626 "AsyncDataConn 3 dropped",
5627 "SyncDataConn 2 closed",
5628 "SyncDataConn 2 dropped",
5629 "AsyncDataConn 1 closed",
5630 "AsyncDataConn 1 dropped",
5631 "SyncDataSrc 4 closed",
5632 "SyncDataSrc 4 dropped",
5633 "AsyncDataSrc 3 closed",
5634 "AsyncDataSrc 3 dropped",
5635 "SyncDataSrc 2 closed",
5636 "SyncDataSrc 2 dropped",
5637 "AsyncDataSrc 1 closed",
5638 "AsyncDataSrc 1 dropped",
5639 ],
5640 );
5641 }
5642
5643 #[tokio::test]
5644 async fn async_test_pre_commit_but_fail_local_sync() {
5645 let _unused = TEST_SEQ.lock().unwrap();
5646 clear_global_data_srcs_fixed();
5647
5648 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5649
5650 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5651 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5652
5653 if let Ok(_auto_shutdown) = setup_async().await {
5654 let mut hub = DataHub::new();
5655 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5656 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::PreCommit));
5657
5658 if let Ok(_) = hub.begin_async().await {
5659 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5660 assert_eq!(
5661 any::type_name_of_val(conn1),
5662 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5663 );
5664 } else {
5665 panic!();
5666 }
5667 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5668 assert_eq!(
5669 any::type_name_of_val(conn2),
5670 "sabi::data_hub::tests_data_hub::SyncDataConn"
5671 );
5672 } else {
5673 panic!();
5674 }
5675 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5676 assert_eq!(
5677 any::type_name_of_val(conn3),
5678 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5679 );
5680 } else {
5681 panic!();
5682 }
5683 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5684 assert_eq!(
5685 any::type_name_of_val(conn4),
5686 "sabi::data_hub::tests_data_hub::SyncDataConn"
5687 );
5688 } else {
5689 panic!();
5690 }
5691
5692 match hub.commit_async().await {
5693 Ok(_) => panic!(),
5694 Err(err) => match err.reason::<DataHubError>() {
5695 Ok(r) => match r {
5696 DataHubError::FailToPreCommitDataConn { errors } => {
5697 assert_eq!(errors.len(), 1);
5698 if let Some(e) = errors.get("qux") {
5699 if let Ok(s) = e.reason::<String>() {
5700 assert_eq!(s, "zzz");
5701 } else {
5702 panic!();
5703 }
5704 } else {
5705 panic!();
5706 }
5707 }
5708 _ => panic!(),
5709 },
5710 Err(_) => panic!(),
5711 },
5712 }
5713
5714 hub.end();
5715 } else {
5716 panic!();
5717 }
5718 } else {
5719 panic!();
5720 }
5721
5722 assert_eq!(
5723 *logger.lock().unwrap(),
5724 vec![
5725 "SyncDataSrc 2 setupped",
5726 "AsyncDataSrc 1 setupped",
5727 "SyncDataSrc 4 setupped",
5728 "AsyncDataSrc 3 setupped",
5729 "AsyncDataSrc 1 created DataConn",
5730 "SyncDataSrc 2 created DataConn",
5731 "AsyncDataSrc 3 created DataConn",
5732 "SyncDataSrc 4 created DataConn",
5733 "AsyncDataConn 1 pre committed",
5734 "SyncDataConn 2 pre committed",
5735 "AsyncDataConn 3 pre committed",
5736 "SyncDataConn 4 failed to pre commit",
5737 "SyncDataConn 4 closed",
5738 "SyncDataConn 4 dropped",
5739 "AsyncDataConn 3 closed",
5740 "AsyncDataConn 3 dropped",
5741 "SyncDataConn 2 closed",
5742 "SyncDataConn 2 dropped",
5743 "AsyncDataConn 1 closed",
5744 "AsyncDataConn 1 dropped",
5745 "SyncDataSrc 4 closed",
5746 "SyncDataSrc 4 dropped",
5747 "AsyncDataSrc 3 closed",
5748 "AsyncDataSrc 3 dropped",
5749 "SyncDataSrc 2 closed",
5750 "SyncDataSrc 2 dropped",
5751 "AsyncDataSrc 1 closed",
5752 "AsyncDataSrc 1 dropped",
5753 ],
5754 );
5755 }
5756
5757 #[tokio::test]
5758 async fn async_test_pre_commit_but_fail_local_async() {
5759 let _unused = TEST_SEQ.lock().unwrap();
5760 clear_global_data_srcs_fixed();
5761
5762 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5763
5764 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5765 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5766
5767 if let Ok(_auto_shutdown) = setup_async().await {
5768 let mut hub = DataHub::new();
5769 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::PreCommit));
5770 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5771
5772 if let Ok(_) = hub.begin_async().await {
5773 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5774 assert_eq!(
5775 any::type_name_of_val(conn1),
5776 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5777 );
5778 } else {
5779 panic!();
5780 }
5781 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5782 assert_eq!(
5783 any::type_name_of_val(conn2),
5784 "sabi::data_hub::tests_data_hub::SyncDataConn"
5785 );
5786 } else {
5787 panic!();
5788 }
5789 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5790 assert_eq!(
5791 any::type_name_of_val(conn3),
5792 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5793 );
5794 } else {
5795 panic!();
5796 }
5797 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5798 assert_eq!(
5799 any::type_name_of_val(conn4),
5800 "sabi::data_hub::tests_data_hub::SyncDataConn"
5801 );
5802 } else {
5803 panic!();
5804 }
5805
5806 match hub.commit_async().await {
5807 Ok(_) => panic!(),
5808 Err(err) => match err.reason::<DataHubError>() {
5809 Ok(r) => match r {
5810 DataHubError::FailToPreCommitDataConn { errors } => {
5811 assert_eq!(errors.len(), 1);
5812 if let Some(e) = errors.get("baz") {
5813 if let Ok(s) = e.reason::<String>() {
5814 assert_eq!(s, "vvv");
5815 } else {
5816 panic!();
5817 }
5818 } else {
5819 panic!();
5820 }
5821 }
5822 _ => panic!(),
5823 },
5824 Err(_) => panic!(),
5825 },
5826 }
5827
5828 hub.end();
5829 } else {
5830 panic!();
5831 }
5832 } else {
5833 panic!();
5834 }
5835
5836 assert_eq!(
5837 *logger.lock().unwrap(),
5838 vec![
5839 "SyncDataSrc 2 setupped",
5840 "AsyncDataSrc 1 setupped",
5841 "SyncDataSrc 4 setupped",
5842 "AsyncDataSrc 3 setupped",
5843 "AsyncDataSrc 1 created DataConn",
5844 "SyncDataSrc 2 created DataConn",
5845 "AsyncDataSrc 3 created DataConn",
5846 "SyncDataSrc 4 created DataConn",
5847 "AsyncDataConn 1 pre committed",
5848 "SyncDataConn 2 pre committed",
5849 "AsyncDataConn 3 failed to pre commit",
5850 "SyncDataConn 4 closed",
5851 "SyncDataConn 4 dropped",
5852 "AsyncDataConn 3 closed",
5853 "AsyncDataConn 3 dropped",
5854 "SyncDataConn 2 closed",
5855 "SyncDataConn 2 dropped",
5856 "AsyncDataConn 1 closed",
5857 "AsyncDataConn 1 dropped",
5858 "SyncDataSrc 4 closed",
5859 "SyncDataSrc 4 dropped",
5860 "AsyncDataSrc 3 closed",
5861 "AsyncDataSrc 3 dropped",
5862 "SyncDataSrc 2 closed",
5863 "SyncDataSrc 2 dropped",
5864 "AsyncDataSrc 1 closed",
5865 "AsyncDataSrc 1 dropped",
5866 ],
5867 );
5868 }
5869
5870 #[tokio::test]
5871 async fn async_test_rollback() {
5872 let _unused = TEST_SEQ.lock().unwrap();
5873 clear_global_data_srcs_fixed();
5874
5875 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5876
5877 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5878 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5879
5880 if let Ok(_auto_shutdown) = setup_async().await {
5881 let mut hub = DataHub::new();
5882 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5883 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5884
5885 if let Ok(_) = hub.begin_async().await {
5886 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5887 assert_eq!(
5888 any::type_name_of_val(conn1),
5889 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5890 );
5891 } else {
5892 panic!();
5893 }
5894 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5895 assert_eq!(
5896 any::type_name_of_val(conn2),
5897 "sabi::data_hub::tests_data_hub::SyncDataConn"
5898 );
5899 } else {
5900 panic!();
5901 }
5902 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5903 assert_eq!(
5904 any::type_name_of_val(conn3),
5905 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5906 );
5907 } else {
5908 panic!();
5909 }
5910 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
5911 assert_eq!(
5912 any::type_name_of_val(conn4),
5913 "sabi::data_hub::tests_data_hub::SyncDataConn"
5914 );
5915 } else {
5916 panic!();
5917 }
5918
5919 hub.rollback_async().await;
5920 hub.end();
5921 } else {
5922 panic!();
5923 }
5924 } else {
5925 panic!();
5926 }
5927
5928 assert_eq!(
5929 *logger.lock().unwrap(),
5930 vec![
5931 "SyncDataSrc 2 setupped",
5932 "AsyncDataSrc 1 setupped",
5933 "SyncDataSrc 4 setupped",
5934 "AsyncDataSrc 3 setupped",
5935 "AsyncDataSrc 1 created DataConn",
5936 "SyncDataSrc 2 created DataConn",
5937 "AsyncDataSrc 3 created DataConn",
5938 "SyncDataSrc 4 created DataConn",
5939 "AsyncDataConn 1 rollbacked",
5940 "SyncDataConn 2 rollbacked",
5941 "AsyncDataConn 3 rollbacked",
5942 "SyncDataConn 4 rollbacked",
5943 "SyncDataConn 4 closed",
5944 "SyncDataConn 4 dropped",
5945 "AsyncDataConn 3 closed",
5946 "AsyncDataConn 3 dropped",
5947 "SyncDataConn 2 closed",
5948 "SyncDataConn 2 dropped",
5949 "AsyncDataConn 1 closed",
5950 "AsyncDataConn 1 dropped",
5951 "SyncDataSrc 4 closed",
5952 "SyncDataSrc 4 dropped",
5953 "AsyncDataSrc 3 closed",
5954 "AsyncDataSrc 3 dropped",
5955 "SyncDataSrc 2 closed",
5956 "SyncDataSrc 2 dropped",
5957 "AsyncDataSrc 1 closed",
5958 "AsyncDataSrc 1 dropped",
5959 ],
5960 );
5961 }
5962
5963 #[tokio::test]
5964 async fn async_test_force_back() {
5965 let _unused = TEST_SEQ.lock().unwrap();
5966 clear_global_data_srcs_fixed();
5967
5968 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
5969
5970 uses("foo", AsyncDataSrc::new(1, logger.clone(), Fail::Not));
5971 uses("bar", SyncDataSrc::new(2, logger.clone(), Fail::Not));
5972
5973 if let Ok(_auto_shutdown) = setup_async().await {
5974 let mut hub = DataHub::new();
5975 hub.uses("baz", AsyncDataSrc::new(3, logger.clone(), Fail::Not));
5976 hub.uses("qux", SyncDataSrc::new(4, logger.clone(), Fail::Not));
5977
5978 if let Ok(_) = hub.begin_async().await {
5979 if let Ok(conn1) = hub.get_data_conn::<AsyncDataConn>("foo") {
5980 assert_eq!(
5981 any::type_name_of_val(conn1),
5982 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5983 );
5984 } else {
5985 panic!();
5986 }
5987 if let Ok(conn2) = hub.get_data_conn::<SyncDataConn>("bar") {
5988 assert_eq!(
5989 any::type_name_of_val(conn2),
5990 "sabi::data_hub::tests_data_hub::SyncDataConn"
5991 );
5992 } else {
5993 panic!();
5994 }
5995 if let Ok(conn3) = hub.get_data_conn::<AsyncDataConn>("baz") {
5996 assert_eq!(
5997 any::type_name_of_val(conn3),
5998 "sabi::data_hub::tests_data_hub::AsyncDataConn"
5999 );
6000 } else {
6001 panic!();
6002 }
6003 if let Ok(conn4) = hub.get_data_conn::<SyncDataConn>("qux") {
6004 assert_eq!(
6005 any::type_name_of_val(conn4),
6006 "sabi::data_hub::tests_data_hub::SyncDataConn"
6007 );
6008 } else {
6009 panic!();
6010 }
6011
6012 assert!(hub.commit_async().await.is_ok());
6013 hub.rollback_async().await;
6014 hub.end();
6015 } else {
6016 panic!();
6017 }
6018 } else {
6019 panic!();
6020 }
6021
6022 assert_eq!(
6023 *logger.lock().unwrap(),
6024 vec![
6025 "SyncDataSrc 2 setupped",
6026 "AsyncDataSrc 1 setupped",
6027 "SyncDataSrc 4 setupped",
6028 "AsyncDataSrc 3 setupped",
6029 "AsyncDataSrc 1 created DataConn",
6030 "SyncDataSrc 2 created DataConn",
6031 "AsyncDataSrc 3 created DataConn",
6032 "SyncDataSrc 4 created DataConn",
6033 "AsyncDataConn 1 pre committed",
6034 "SyncDataConn 2 pre committed",
6035 "AsyncDataConn 3 pre committed",
6036 "SyncDataConn 4 pre committed",
6037 "AsyncDataConn 1 committed",
6038 "SyncDataConn 2 committed",
6039 "AsyncDataConn 3 committed",
6040 "SyncDataConn 4 committed",
6041 "AsyncDataConn 1 post committed",
6042 "SyncDataConn 2 post committed",
6043 "AsyncDataConn 3 post committed",
6044 "SyncDataConn 4 post committed",
6045 "AsyncDataConn 1 forced back",
6046 "SyncDataConn 2 forced back",
6047 "AsyncDataConn 3 forced back",
6048 "SyncDataConn 4 forced back",
6049 "SyncDataConn 4 closed",
6050 "SyncDataConn 4 dropped",
6051 "AsyncDataConn 3 closed",
6052 "AsyncDataConn 3 dropped",
6053 "SyncDataConn 2 closed",
6054 "SyncDataConn 2 dropped",
6055 "AsyncDataConn 1 closed",
6056 "AsyncDataConn 1 dropped",
6057 "SyncDataSrc 4 closed",
6058 "SyncDataSrc 4 dropped",
6059 "AsyncDataSrc 3 closed",
6060 "AsyncDataSrc 3 dropped",
6061 "SyncDataSrc 2 closed",
6062 "SyncDataSrc 2 dropped",
6063 "AsyncDataSrc 1 closed",
6064 "AsyncDataSrc 1 dropped",
6065 ],
6066 );
6067 }
6068 }
6069}