1use crate::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src};
6use crate::{DataConn, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8#[allow(unused)] use crate::DataAcc;
10
11use crate::DataConnContainer;
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::{any, ptr};
16
17#[derive(Debug)]
19pub enum DataHubError {
20 FailToSetupLocalDataSrcs {
23 errors: Vec<(Arc<str>, errs::Err)>,
25 },
26
27 NoDataSrcToCreateDataConn {
30 name: Arc<str>,
32 data_conn_type: &'static str,
34 },
35}
36
37impl DataHub {
38 #[allow(clippy::new_without_default)]
43 pub fn new() -> Self {
44 let mut data_src_map = HashMap::new();
45 copy_global_data_srcs_to_map(&mut data_src_map);
46
47 Self {
48 local_data_src_manager: DataSrcManager::new(true),
49 data_src_map,
50 data_conn_manager: DataConnManager::new(),
51 fixed: false,
52 }
53 }
54
55 pub fn with_commit_order(names: &[&str]) -> Self {
69 let mut data_src_map = HashMap::new();
70 copy_global_data_srcs_to_map(&mut data_src_map);
71
72 Self {
73 local_data_src_manager: DataSrcManager::new(true),
74 data_src_map,
75 data_conn_manager: DataConnManager::with_commit_order(names),
76 fixed: false,
77 }
78 }
79
80 #[allow(rustdoc::broken_intra_doc_links)]
81 pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
95 where
96 S: DataSrc<C>,
97 C: DataConn + 'static,
98 {
99 if self.fixed {
100 return;
101 }
102 self.local_data_src_manager.add(name, ds);
103 }
104
105 pub fn disuses(&mut self, name: impl AsRef<str>) {
114 if self.fixed {
115 return;
116 }
117 self.data_src_map.remove(name.as_ref());
118 self.local_data_src_manager.remove(name);
119 }
120
121 #[inline]
122 fn begin(&mut self) -> errs::Result<()> {
123 self.fixed = true;
124
125 let mut errors = Vec::new();
126
127 self.local_data_src_manager.setup(&mut errors);
128 if errors.is_empty() {
129 self.local_data_src_manager
130 .copy_ds_ready_to_map(&mut self.data_src_map);
131 Ok(())
132 } else {
133 Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
134 errors,
135 }))
136 }
137 }
138
139 #[inline]
140 fn commit(&mut self) -> errs::Result<()> {
141 self.data_conn_manager.commit()
142 }
143
144 #[inline]
145 fn rollback(&mut self) {
146 self.data_conn_manager.rollback()
147 }
148
149 #[inline]
150 fn end(&mut self) {
151 self.data_conn_manager.close();
152 self.fixed = false;
153 }
154
155 pub fn run<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
171 where
172 F: FnMut(&mut DataHub) -> errs::Result<()>,
173 {
174 let mut r = self.begin();
175 if r.is_ok() {
176 r = logic_fn(self);
177 }
178 self.end();
179 r
180 }
181
182 pub fn txn<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
201 where
202 F: FnMut(&mut DataHub) -> errs::Result<()>,
203 {
204 let mut r = self.begin();
205 if r.is_ok() {
206 r = logic_fn(self);
207 }
208 if r.is_ok() {
209 r = self.commit();
210 }
211 if r.is_err() {
212 self.rollback();
213 }
214 self.end();
215 r
216 }
217
218 pub fn get_data_conn<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
239 where
240 C: DataConn + 'static,
241 {
242 if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
243 let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
244 return Ok(unsafe { &mut (*typed_nnptr).data_conn });
245 }
246
247 if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
248 let boxed = if *local {
249 self.local_data_src_manager
250 .create_data_conn::<C>(*index, name.as_ref())?
251 } else {
252 create_data_conn_from_global_data_src::<C>(*index, name.as_ref())?
253 };
254
255 let ptr = Box::into_raw(boxed);
256 if let Some(nnptr) = ptr::NonNull::new(ptr) {
257 self.data_conn_manager
258 .add(nnptr.cast::<DataConnContainer>());
259
260 let typed_ptr = ptr.cast::<DataConnContainer<C>>();
261 return Ok(unsafe { &mut (*typed_ptr).data_conn });
262 } else {
263 }
265 }
266
267 Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
268 name: name.as_ref().into(),
269 data_conn_type: any::type_name::<C>(),
270 }))
271 }
272}
273
274#[cfg(test)]
275mod tests_of_data_hub {
276 use super::*;
277 use crate::AsyncGroup;
278 use std::sync::Mutex;
279
280 #[derive(Clone, Copy, PartialEq)]
281 enum Failure {
282 None,
283 FailToPreCommit,
284 FailToCommit,
285 FailToSetup,
286 FailToCreateDataConn,
287 }
288
289 struct MyDataConn {
290 id: i8,
291 failure: Failure,
292 committed: bool,
293 logger: Arc<Mutex<Vec<String>>>,
294 }
295 impl MyDataConn {
296 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
297 logger
298 .lock()
299 .unwrap()
300 .push(format!("MyDataConn::new {}", id));
301 Self {
302 id,
303 failure,
304 committed: false,
305 logger,
306 }
307 }
308 }
309 impl Drop for MyDataConn {
310 fn drop(&mut self) {
311 self.logger
312 .lock()
313 .unwrap()
314 .push(format!("MyDataConn::drop {}", self.id));
315 }
316 }
317 impl DataConn for MyDataConn {
318 fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
319 if self.failure == Failure::FailToPreCommit {
320 self.logger
321 .lock()
322 .unwrap()
323 .push(format!("MyDataConn::pre_commit {} failed", self.id));
324 Err(errs::Err::new("pre commit error"))
325 } else {
326 self.logger
327 .lock()
328 .unwrap()
329 .push(format!("MyDataConn::pre_commit {}", self.id));
330 Ok(())
331 }
332 }
333 fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
334 if self.failure == Failure::FailToCommit {
335 self.logger
336 .lock()
337 .unwrap()
338 .push(format!("MyDataConn::commit {} failed", self.id));
339 Err(errs::Err::new("commit error"))
340 } else {
341 self.logger
342 .lock()
343 .unwrap()
344 .push(format!("MyDataConn::commit {}", self.id));
345 self.committed = true;
346 Ok(())
347 }
348 }
349 fn post_commit(&mut self, _ag: &mut AsyncGroup) {
350 self.logger
351 .lock()
352 .unwrap()
353 .push(format!("MyDataConn::post_commit {}", self.id));
354 }
355 fn rollback(&mut self, _ag: &mut AsyncGroup) {
356 self.logger
357 .lock()
358 .unwrap()
359 .push(format!("MyDataConn::rollback {}", self.id));
360 }
361 fn force_back(&mut self, _ag: &mut AsyncGroup) {
362 self.logger
363 .lock()
364 .unwrap()
365 .push(format!("MyDataConn::force_back {}", self.id));
366 }
367 fn close(&mut self) {
368 self.logger
369 .lock()
370 .unwrap()
371 .push(format!("MyDataConn::close {}", self.id));
372 }
373 }
374
375 struct MyDataSrc {
376 id: i8,
377 failure: Failure,
378 logger: Arc<Mutex<Vec<String>>>,
379 }
380 impl MyDataSrc {
381 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
382 logger
383 .lock()
384 .unwrap()
385 .push(format!("MyDataSrc::new {}", id));
386 Self {
387 id,
388 failure,
389 logger,
390 }
391 }
392 }
393 impl Drop for MyDataSrc {
394 fn drop(&mut self) {
395 self.logger
396 .lock()
397 .unwrap()
398 .push(format!("MyDataSrc::drop {}", self.id));
399 }
400 }
401 impl DataSrc<MyDataConn> for MyDataSrc {
402 fn setup(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
403 if self.failure == Failure::FailToSetup {
404 self.logger
405 .lock()
406 .unwrap()
407 .push(format!("MyDataSrc::setup {} failed", self.id));
408 Err(errs::Err::new("setup error".to_string()))
409 } else {
410 self.logger
411 .lock()
412 .unwrap()
413 .push(format!("MyDataSrc::setup {}", self.id));
414 Ok(())
415 }
416 }
417 fn close(&mut self) {
418 self.logger
419 .lock()
420 .unwrap()
421 .push(format!("MyDataSrc::close {}", self.id));
422 }
423 fn create_data_conn(&mut self) -> errs::Result<Box<MyDataConn>> {
424 if self.failure == Failure::FailToCreateDataConn {
425 self.logger
426 .lock()
427 .unwrap()
428 .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
429 return Err(errs::Err::new("eeee".to_string()));
430 }
431 {
432 self.logger
433 .lock()
434 .unwrap()
435 .push(format!("MyDataSrc::create_data_conn {}", self.id));
436 }
437 let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
438 Ok(Box::new(conn))
439 }
440 }
441
442 #[test]
443 fn test_new() {
444 let hub = DataHub::new();
445 assert!(hub.local_data_src_manager.vec_unready.is_empty());
446 assert!(hub.local_data_src_manager.vec_ready.is_empty());
447 assert!(hub.local_data_src_manager.local);
448 assert!(hub.data_src_map.is_empty());
449 assert!(hub.data_conn_manager.vec.is_empty());
450 assert!(hub.data_conn_manager.index_map.is_empty());
451 assert!(!hub.fixed);
452 }
453
454 #[test]
455 fn test_with_commit_order() {
456 let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
457 assert!(hub.local_data_src_manager.vec_unready.is_empty());
458 assert!(hub.local_data_src_manager.vec_ready.is_empty());
459 assert!(hub.local_data_src_manager.local);
460 assert!(hub.data_src_map.is_empty());
461 assert_eq!(hub.data_conn_manager.vec.len(), 3);
462 assert_eq!(hub.data_conn_manager.index_map.len(), 3);
463 assert!(!hub.fixed);
464 }
465
466 #[test]
467 fn test_uses_and_ok() {
468 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
469
470 let mut hub = DataHub::new();
471 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
472 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
473
474 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
475 assert!(hub.local_data_src_manager.vec_ready.is_empty());
476 assert!(hub.local_data_src_manager.local);
477 assert!(hub.data_src_map.is_empty());
478 assert_eq!(hub.data_conn_manager.vec.len(), 0);
479 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
480 assert!(!hub.fixed);
481
482 assert!(hub.begin().is_ok());
483
484 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
485 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
486 assert!(hub.local_data_src_manager.local);
487 assert_eq!(hub.data_src_map.len(), 2);
488 assert_eq!(hub.data_conn_manager.vec.len(), 0);
489 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
490 assert!(hub.fixed);
491 }
492
493 #[test]
494 fn test_uses_but_already_fixed() {
495 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
496
497 let mut hub = DataHub::new();
498 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
499
500 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
501 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
502 assert!(hub.local_data_src_manager.local);
503 assert_eq!(hub.data_src_map.len(), 0);
504 assert_eq!(hub.data_conn_manager.vec.len(), 0);
505 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
506 assert!(!hub.fixed);
507
508 assert!(hub.begin().is_ok());
509
510 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
511 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
512 assert!(hub.local_data_src_manager.local);
513 assert_eq!(hub.data_src_map.len(), 1);
514 assert_eq!(hub.data_conn_manager.vec.len(), 0);
515 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
516 assert!(hub.fixed);
517
518 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
519
520 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
521 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
522 assert!(hub.local_data_src_manager.local);
523 assert_eq!(hub.data_src_map.len(), 1);
524 assert_eq!(hub.data_conn_manager.vec.len(), 0);
525 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
526 assert!(hub.fixed);
527 }
528
529 #[test]
530 fn test_disuses_and_ok() {
531 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
532
533 let mut hub = DataHub::new();
534 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
535 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
536
537 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
538 assert!(hub.local_data_src_manager.vec_ready.is_empty());
539 assert!(hub.local_data_src_manager.local);
540 assert!(hub.data_src_map.is_empty());
541 assert_eq!(hub.data_conn_manager.vec.len(), 0);
542 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
543 assert!(!hub.fixed);
544
545 hub.disuses("foo");
546
547 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
548 assert!(hub.local_data_src_manager.vec_ready.is_empty());
549 assert!(hub.local_data_src_manager.local);
550 assert!(hub.data_src_map.is_empty());
551 assert_eq!(hub.data_conn_manager.vec.len(), 0);
552 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
553 assert!(!hub.fixed);
554
555 hub.disuses("bar");
556
557 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
558 assert!(hub.local_data_src_manager.vec_ready.is_empty());
559 assert!(hub.local_data_src_manager.local);
560 assert!(hub.data_src_map.is_empty());
561 assert_eq!(hub.data_conn_manager.vec.len(), 0);
562 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
563 assert!(!hub.fixed);
564 }
565
566 #[test]
567 fn test_disuses_and_fix() {
568 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
569
570 let mut hub = DataHub::new();
571 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
572 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
573
574 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
575 assert!(hub.local_data_src_manager.vec_ready.is_empty());
576 assert!(hub.local_data_src_manager.local);
577 assert!(hub.data_src_map.is_empty());
578 assert_eq!(hub.data_conn_manager.vec.len(), 0);
579 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
580 assert!(!hub.fixed);
581
582 hub.disuses("foo");
583
584 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
585 assert!(hub.local_data_src_manager.vec_ready.is_empty());
586 assert!(hub.local_data_src_manager.local);
587 assert!(hub.data_src_map.is_empty());
588 assert_eq!(hub.data_conn_manager.vec.len(), 0);
589 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
590 assert!(!hub.fixed);
591
592 hub.disuses("bar");
593
594 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
595 assert!(hub.local_data_src_manager.vec_ready.is_empty());
596 assert!(hub.local_data_src_manager.local);
597 assert!(hub.data_src_map.is_empty());
598 assert_eq!(hub.data_conn_manager.vec.len(), 0);
599 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
600 assert!(!hub.fixed);
601
602 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
603 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
604
605 assert!(hub.begin().is_ok());
606
607 assert!(hub.local_data_src_manager.vec_unready.is_empty());
608 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
609 assert!(hub.local_data_src_manager.local);
610 assert_eq!(hub.data_src_map.len(), 2);
611 assert_eq!(hub.data_conn_manager.vec.len(), 0);
612 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
613 assert!(hub.fixed);
614
615 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
616
617 assert!(hub.local_data_src_manager.vec_unready.is_empty());
618 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
619 assert!(hub.local_data_src_manager.local);
620 assert_eq!(hub.data_src_map.len(), 2);
621 assert_eq!(hub.data_conn_manager.vec.len(), 0);
622 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
623 assert!(hub.fixed);
624
625 hub.disuses("bar");
626
627 assert!(hub.local_data_src_manager.vec_unready.is_empty());
628 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
629 assert!(hub.local_data_src_manager.local);
630 assert_eq!(hub.data_src_map.len(), 2);
631 assert_eq!(hub.data_conn_manager.vec.len(), 0);
632 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
633 assert!(hub.fixed);
634
635 hub.end();
636
637 assert!(hub.local_data_src_manager.vec_unready.is_empty());
638 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
639 assert!(hub.local_data_src_manager.local);
640 assert_eq!(hub.data_src_map.len(), 2);
641 assert_eq!(hub.data_conn_manager.vec.len(), 0);
642 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
643 assert!(!hub.fixed);
644
645 hub.disuses("bar");
646
647 assert!(hub.local_data_src_manager.vec_unready.is_empty());
648 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
649 assert!(hub.local_data_src_manager.local);
650 assert_eq!(hub.data_src_map.len(), 1);
651 assert_eq!(hub.data_conn_manager.vec.len(), 0);
652 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
653 assert!(!hub.fixed);
654
655 hub.disuses("foo");
656
657 assert!(hub.local_data_src_manager.vec_unready.is_empty());
658 assert!(hub.local_data_src_manager.vec_ready.is_empty());
659 assert!(hub.local_data_src_manager.local);
660 assert_eq!(hub.data_src_map.len(), 0);
661 assert_eq!(hub.data_conn_manager.vec.len(), 0);
662 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
663 assert!(!hub.fixed);
664 }
665
666 #[test]
667 fn test_begin_if_empty() {
668 let mut hub = DataHub::new();
669 assert!(hub.begin().is_ok());
670
671 assert!(hub.local_data_src_manager.vec_unready.is_empty());
672 assert!(hub.local_data_src_manager.vec_ready.is_empty());
673 assert!(hub.local_data_src_manager.local);
674 assert_eq!(hub.data_src_map.len(), 0);
675 assert_eq!(hub.data_conn_manager.vec.len(), 0);
676 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
677 assert!(hub.fixed);
678
679 hub.end();
680
681 assert!(hub.local_data_src_manager.vec_unready.is_empty());
682 assert!(hub.local_data_src_manager.vec_ready.is_empty());
683 assert!(hub.local_data_src_manager.local);
684 assert_eq!(hub.data_src_map.len(), 0);
685 assert_eq!(hub.data_conn_manager.vec.len(), 0);
686 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
687 assert!(!hub.fixed);
688 }
689
690 #[test]
691 fn test_begin_and_ok() {
692 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
693
694 {
695 let mut hub = DataHub::new();
696
697 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
698 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
699
700 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
701 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
702 assert_eq!(hub.local_data_src_manager.local, true);
703 assert_eq!(hub.data_src_map.len(), 0);
704 assert_eq!(hub.data_conn_manager.vec.len(), 0);
705 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
706 assert_eq!(hub.fixed, false);
707
708 assert_eq!(hub.begin().is_ok(), true);
709
710 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
711 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
712 assert_eq!(hub.local_data_src_manager.local, true);
713 assert_eq!(hub.data_src_map.len(), 2);
714 assert_eq!(hub.data_conn_manager.vec.len(), 0);
715 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
716 assert_eq!(hub.fixed, true);
717
718 hub.end();
719
720 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
721 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
722 assert_eq!(hub.local_data_src_manager.local, true);
723 assert_eq!(hub.data_src_map.len(), 2);
724 assert_eq!(hub.data_conn_manager.vec.len(), 0);
725 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
726 assert_eq!(hub.fixed, false);
727 }
728
729 assert_eq!(
730 *logger.lock().unwrap(),
731 &[
732 "MyDataSrc::new 1",
733 "MyDataSrc::new 2",
734 "MyDataSrc::setup 1",
735 "MyDataSrc::setup 2",
736 "MyDataSrc::close 2",
737 "MyDataSrc::drop 2",
738 "MyDataSrc::close 1",
739 "MyDataSrc::drop 1",
740 ]
741 );
742 }
743
744 #[test]
745 fn test_begin_but_failed() {
746 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
747
748 {
749 let mut hub = DataHub::new();
750
751 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
752 hub.uses(
753 "bar",
754 MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
755 );
756 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
757
758 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
759 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
760 assert_eq!(hub.local_data_src_manager.local, true);
761 assert_eq!(hub.data_src_map.len(), 0);
762 assert_eq!(hub.data_conn_manager.vec.len(), 0);
763 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
764 assert_eq!(hub.fixed, false);
765
766 if let Err(err) = hub.begin() {
767 match err.reason::<DataHubError>() {
768 Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
769 assert_eq!(errors.len(), 1);
770 assert_eq!(errors[0].0, "bar".into());
771 assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
772 }
773 _ => panic!(),
774 }
775 } else {
776 panic!();
777 }
778
779 hub.end();
780 }
781
782 assert_eq!(
783 *logger.lock().unwrap(),
784 &[
785 "MyDataSrc::new 1",
786 "MyDataSrc::new 2",
787 "MyDataSrc::new 3",
788 "MyDataSrc::setup 1",
789 "MyDataSrc::setup 2 failed",
790 "MyDataSrc::close 2",
791 "MyDataSrc::close 1",
792 "MyDataSrc::drop 3",
793 "MyDataSrc::drop 2",
794 "MyDataSrc::drop 1",
795 ]
796 );
797 }
798
799 #[test]
800 fn test_run_and_ok() {
801 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
802 {
803 let mut hub = DataHub::new();
804
805 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
806 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
807
808 let logger_clone = logger.clone();
809 assert!(hub
810 .run(move |_data| {
811 logger_clone
812 .lock()
813 .unwrap()
814 .push("execute logic".to_string());
815 Ok(())
816 })
817 .is_ok());
818 }
819
820 assert_eq!(
821 *logger.lock().unwrap(),
822 &[
823 "MyDataSrc::new 1",
824 "MyDataSrc::new 2",
825 "MyDataSrc::setup 1",
826 "MyDataSrc::setup 2",
827 "execute logic",
828 "MyDataSrc::close 2",
829 "MyDataSrc::drop 2",
830 "MyDataSrc::close 1",
831 "MyDataSrc::drop 1",
832 ]
833 );
834 }
835
836 #[test]
837 fn test_run_but_failed() {
838 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
839 {
840 let mut hub = DataHub::new();
841
842 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
843 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
844
845 let logger_clone = logger.clone();
846 if let Err(err) = hub.run(move |_data| {
847 logger_clone
848 .lock()
849 .unwrap()
850 .push("execute logic but fail".to_string());
851 Err(errs::Err::new("logic error".to_string()))
852 }) {
853 match err.reason::<String>() {
854 Ok(s) => assert_eq!(s, "logic error"),
855 _ => panic!(),
856 }
857 } else {
858 panic!();
859 }
860 }
861
862 assert_eq!(
863 *logger.lock().unwrap(),
864 &[
865 "MyDataSrc::new 1",
866 "MyDataSrc::new 2",
867 "MyDataSrc::setup 1",
868 "MyDataSrc::setup 2",
869 "execute logic but fail",
870 "MyDataSrc::close 2",
871 "MyDataSrc::drop 2",
872 "MyDataSrc::close 1",
873 "MyDataSrc::drop 1",
874 ]
875 );
876 }
877
878 #[test]
879 fn test_txn_and_no_data_access_and_ok() {
880 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
881 {
882 let mut hub = DataHub::new();
883
884 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
885 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
886
887 let logger_clone = logger.clone();
888 assert!(hub
889 .txn(move |_data| {
890 logger_clone
891 .lock()
892 .unwrap()
893 .push("execute logic".to_string());
894 Ok(())
895 })
896 .is_ok());
897 }
898
899 assert_eq!(
900 *logger.lock().unwrap(),
901 &[
902 "MyDataSrc::new 1",
903 "MyDataSrc::new 2",
904 "MyDataSrc::setup 1",
905 "MyDataSrc::setup 2",
906 "execute logic",
907 "MyDataSrc::close 2",
908 "MyDataSrc::drop 2",
909 "MyDataSrc::close 1",
910 "MyDataSrc::drop 1",
911 ]
912 );
913 }
914
915 #[test]
916 fn test_txn_and_has_data_access_and_ok() {
917 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
918 {
919 let mut hub = DataHub::new();
920
921 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
922 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
923
924 let logger_clone = logger.clone();
925 hub.txn(move |data| {
926 logger_clone
927 .lock()
928 .unwrap()
929 .push("execute logic".to_string());
930 let _conn1 = data.get_data_conn::<MyDataConn>("foo")?;
931 let _conn2 = data.get_data_conn::<MyDataConn>("bar")?;
932 Ok(())
933 })
934 .unwrap();
935 }
936
937 assert_eq!(
938 *logger.lock().unwrap(),
939 &[
940 "MyDataSrc::new 1",
941 "MyDataSrc::new 2",
942 "MyDataSrc::setup 1",
943 "MyDataSrc::setup 2",
944 "execute logic",
945 "MyDataSrc::create_data_conn 1",
946 "MyDataConn::new 1",
947 "MyDataSrc::create_data_conn 2",
948 "MyDataConn::new 2",
949 "MyDataConn::pre_commit 1",
950 "MyDataConn::pre_commit 2",
951 "MyDataConn::commit 1",
952 "MyDataConn::commit 2",
953 "MyDataConn::post_commit 1",
954 "MyDataConn::post_commit 2",
955 "MyDataConn::close 1",
956 "MyDataConn::drop 1",
957 "MyDataConn::close 2",
958 "MyDataConn::drop 2",
959 "MyDataSrc::close 2",
960 "MyDataSrc::drop 2",
961 "MyDataSrc::close 1",
962 "MyDataSrc::drop 1",
963 ]
964 );
965 }
966
967 #[test]
968 fn test_txn_but_failed() {
969 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
970 {
971 let mut hub = DataHub::new();
972
973 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
974 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
975
976 let logger_clone = logger.clone();
977 if let Err(e) = hub.txn(move |data| {
978 logger_clone
979 .lock()
980 .unwrap()
981 .push("execute logic".to_string());
982 let _conn1 = data.get_data_conn::<MyDataConn>("foo")?;
983 let _conn2 = data.get_data_conn::<MyDataConn>("bar")?;
984 Err(errs::Err::new("logic error"))
985 }) {
986 match e.reason::<&str>() {
987 Ok(s) => assert_eq!(s, &"logic error"),
988 _ => panic!(),
989 }
990 }
991 }
992
993 assert_eq!(
994 *logger.lock().unwrap(),
995 &[
996 "MyDataSrc::new 1",
997 "MyDataSrc::new 2",
998 "MyDataSrc::setup 1",
999 "MyDataSrc::setup 2",
1000 "execute logic",
1001 "MyDataSrc::create_data_conn 1",
1002 "MyDataConn::new 1",
1003 "MyDataSrc::create_data_conn 2",
1004 "MyDataConn::new 2",
1005 "MyDataConn::rollback 1",
1006 "MyDataConn::rollback 2",
1007 "MyDataConn::close 1",
1008 "MyDataConn::drop 1",
1009 "MyDataConn::close 2",
1010 "MyDataConn::drop 2",
1011 "MyDataSrc::close 2",
1012 "MyDataSrc::drop 2",
1013 "MyDataSrc::close 1",
1014 "MyDataSrc::drop 1",
1015 ]
1016 );
1017 }
1018
1019 #[test]
1020 fn test_txn_with_commit_order() {
1021 let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1022 {
1023 let mut _hub = DataHub::new();
1024 }
1025 }
1026
1027 #[test]
1028 fn test_get_data_conn_and_ok() {
1029 let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1030 {
1031 let mut _hub = DataHub::new();
1032 }
1033 }
1034
1035 #[test]
1036 fn test_get_data_conn_and_failed() {
1037 let _logger = Arc::new(Mutex::new(Vec::<String>::new()));
1038 {
1039 let mut _hub = DataHub::new();
1040 }
1041 }
1042}