1use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::{any, ptr};
13
/// Error reasons produced by the `DataHub` methods in this module.
#[derive(Debug)]
pub enum DataHubError {
    /// Produced by `begin_async` when setting up the hub-local data sources
    /// reported at least one error.
    FailToSetupLocalDataSrcs {
        /// The collected setup failures, each paired with a name (the tests in
        /// this file observe the registered data source name here).
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Produced by `get_data_conn_async` when no data connection could be
    /// obtained or created for the requested name.
    NoDataSrcToCreateDataConn {
        /// The name that was requested.
        name: Arc<str>,
        /// The requested connection type, as reported by `std::any::type_name`.
        data_conn_type: &'static str,
    },
}
32
33impl DataHub {
34 #[allow(clippy::new_without_default)]
39 pub fn new() -> Self {
40 let mut data_src_map = HashMap::new();
41 copy_global_data_srcs_to_map(&mut data_src_map);
42
43 Self {
44 local_data_src_manager: DataSrcManager::new(true),
45 data_src_map,
46 data_conn_manager: DataConnManager::new(),
47 fixed: false,
48 }
49 }
50
51 pub fn with_commit_order(names: &[&str]) -> Self {
61 let mut data_src_map = HashMap::new();
62 copy_global_data_srcs_to_map(&mut data_src_map);
63
64 Self {
65 local_data_src_manager: DataSrcManager::new(true),
66 data_src_map,
67 data_conn_manager: DataConnManager::with_commit_order(names),
68 fixed: false,
69 }
70 }
71
72 pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
87 where
88 S: DataSrc<C> + 'static,
89 C: DataConn + 'static,
90 {
91 if self.fixed {
92 return;
93 }
94 self.local_data_src_manager.add(name, ds);
95 }
96
97 pub fn disuses(&mut self, name: impl AsRef<str>) {
106 if self.fixed {
107 return;
108 }
109 self.data_src_map.remove(name.as_ref());
110 self.local_data_src_manager.remove(name);
111 }
112
113 #[inline]
114 async fn begin_async(&mut self) -> errs::Result<()> {
115 self.fixed = true;
116
117 let mut errors = Vec::new();
118
119 self.local_data_src_manager.setup_async(&mut errors).await;
120 if errors.is_empty() {
121 self.local_data_src_manager
122 .copy_ds_ready_to_map(&mut self.data_src_map);
123 Ok(())
124 } else {
125 Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
126 errors,
127 }))
128 }
129 }
130
    /// Commits through the hub's data connection manager and returns its result.
    #[inline]
    async fn commit_async(&mut self) -> errs::Result<()> {
        self.data_conn_manager.commit_async().await
    }
135
    /// Asks the hub's data connection manager to roll back.
    #[inline]
    async fn rollback_async(&mut self) {
        self.data_conn_manager.rollback_async().await
    }
140
    /// Closes the data connection manager and clears the `fixed` flag.
    #[inline]
    fn end(&mut self) {
        self.data_conn_manager.close();
        // `uses` and `disuses` are no-ops while `fixed` is set; clearing it
        // lets them take effect again.
        self.fixed = false;
    }
146
147 #[allow(clippy::doc_overindented_list_items)]
167 pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
168 where
169 for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
170 {
171 let mut r = self.begin_async().await;
172 if r.is_ok() {
173 r = logic_fn(self).await;
174 }
175 self.end();
176 r
177 }
178
179 #[allow(clippy::doc_overindented_list_items)]
201 pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
202 where
203 for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
204 {
205 let mut r = self.begin_async().await;
206 if r.is_ok() {
207 r = logic_fn(self).await;
208 }
209 if r.is_ok() {
210 r = self.commit_async().await;
211 }
212 if r.is_err() {
213 self.rollback_async().await;
214 }
215 self.end();
216 r
217 }
218
    /// Returns a mutable reference to the data connection of type `C`
    /// associated with `name`, creating it first when the hub holds none yet.
    ///
    /// Lookup order:
    /// 1. a connection the data connection manager already holds under `name`;
    /// 2. otherwise the data source recorded for `name` in `data_src_map`
    ///    (hub-local or global, according to the stored flag) is asked to
    ///    create one, and the new connection is added to the manager.
    ///
    /// # Errors
    ///
    /// * Any error from `DataConnManager::to_typed_ptr` or from the
    ///   connection-creating call, propagated with `?`.
    /// * [`DataHubError::NoDataSrcToCreateDataConn`] when `name` has no entry
    ///   in `data_src_map`.
    pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
    where
        C: DataConn + 'static,
    {
        // Reuse a connection that was already created under this name.
        if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
            let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
            // NOTE(review): soundness relies on `to_typed_ptr` only succeeding for a
            // live container whose connection type is `C` — its checks are not
            // visible here; confirm.
            return Ok(unsafe { &mut (*typed_nnptr).data_conn });
        }

        if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
            // `local` selects between this hub's own data sources and the global ones.
            let boxed = if *local {
                self.local_data_src_manager
                    .create_data_conn_async::<C>(*index, name.as_ref())
                    .await?
            } else {
                create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
            };

            // The allocation is handed over as a raw pointer; presumably the
            // manager releases it when it is closed — TODO confirm.
            let ptr = Box::into_raw(boxed);
            if let Some(nnptr) = ptr::NonNull::new(ptr) {
                self.data_conn_manager
                    .add(nnptr.cast::<DataConnContainer>());

                let typed_ptr = ptr.cast::<DataConnContainer<C>>();
                // NOTE(review): `ptr` was just produced by `Box::into_raw`; the
                // returned borrow is assumed to stay valid for as long as the
                // manager keeps the container alive — confirm.
                return Ok(unsafe { &mut (*typed_ptr).data_conn });
            } else {
                // `Box::into_raw` yields a non-null pointer, so this branch is
                // not expected to run; it falls through to the error below.
            }
        }

        Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
            name: name.as_ref().into(),
            data_conn_type: any::type_name::<C>(),
        }))
    }
273}
274
/// Expands to a closure `|data| Box::pin($f(data))`.
///
/// Intended for passing an async function as the `logic_fn` argument of
/// `DataHub::run_async` / `DataHub::txn_async`, which expect a closure
/// returning a pinned, boxed future.
#[macro_export]
macro_rules! logic {
    ($f:expr) => {
        |data| Box::pin($f(data))
    };
}
300
301#[cfg(test)]
302mod tests_of_data_hub {
303 use super::*;
304 use crate::tokio::AsyncGroup;
305 use std::sync::Mutex;
306
    /// Selects the step at which the mock data source / data connection fails.
    #[derive(Clone, Copy, PartialEq)]
    enum Failure {
        /// No injected failure.
        None,
        /// `MyDataConn::pre_commit_async` returns an error.
        FailToPreCommit,
        /// `MyDataConn::commit_async` returns an error.
        FailToCommit,
        /// `MyDataSrc::setup_async` returns an error.
        FailToSetup,
        /// `MyDataSrc::create_data_conn_async` returns an error.
        FailToCreateDataConn,
    }
315
    /// Mock data connection that records every lifecycle call in a shared log.
    struct MyDataConn {
        // Identifier echoed in every log entry.
        id: i8,
        // Which step, if any, should fail.
        failure: Failure,
        // Set to true by a successful `commit_async`.
        committed: bool,
        // Shared log the tests inspect afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
322 impl MyDataConn {
323 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
324 logger
325 .lock()
326 .unwrap()
327 .push(format!("MyDataConn::new {}", id));
328 Self {
329 id,
330 failure,
331 committed: false,
332 logger,
333 }
334 }
335 }
336 impl Drop for MyDataConn {
337 fn drop(&mut self) {
338 self.logger
339 .lock()
340 .unwrap()
341 .push(format!("MyDataConn::drop {}", self.id));
342 }
343 }
344 impl DataConn for MyDataConn {
345 async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
346 if self.failure == Failure::FailToPreCommit {
347 self.logger
348 .lock()
349 .unwrap()
350 .push(format!("MyDataConn::pre_commit {} failed", self.id));
351 Err(errs::Err::new("pre commit error"))
352 } else {
353 self.logger
354 .lock()
355 .unwrap()
356 .push(format!("MyDataConn::pre_commit {}", self.id));
357 Ok(())
358 }
359 }
360 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
361 if self.failure == Failure::FailToCommit {
362 self.logger
363 .lock()
364 .unwrap()
365 .push(format!("MyDataConn::commit {} failed", self.id));
366 Err(errs::Err::new("commit error"))
367 } else {
368 self.logger
369 .lock()
370 .unwrap()
371 .push(format!("MyDataConn::commit {}", self.id));
372 self.committed = true;
373 Ok(())
374 }
375 }
376 async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
377 self.logger
378 .lock()
379 .unwrap()
380 .push(format!("MyDataConn::post_commit {}", self.id));
381 }
382 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
383 self.logger
384 .lock()
385 .unwrap()
386 .push(format!("MyDataConn::rollback {}", self.id));
387 }
388 async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
389 self.logger
390 .lock()
391 .unwrap()
392 .push(format!("MyDataConn::force_back {}", self.id));
393 }
394 fn close(&mut self) {
395 self.logger
396 .lock()
397 .unwrap()
398 .push(format!("MyDataConn::close {}", self.id));
399 }
400 }
401
    /// Mock data source that records every lifecycle call in a shared log.
    struct MyDataSrc {
        // Identifier echoed in every log entry and passed on to created connections.
        id: i8,
        // Which step, if any, should fail; also passed on to created connections.
        failure: Failure,
        // Shared log the tests inspect afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
407 impl MyDataSrc {
408 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
409 logger
410 .lock()
411 .unwrap()
412 .push(format!("MyDataSrc::new {}", id));
413 Self {
414 id,
415 failure,
416 logger,
417 }
418 }
419 }
420 impl Drop for MyDataSrc {
421 fn drop(&mut self) {
422 self.logger
423 .lock()
424 .unwrap()
425 .push(format!("MyDataSrc::drop {}", self.id));
426 }
427 }
428 impl DataSrc<MyDataConn> for MyDataSrc {
429 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
430 if self.failure == Failure::FailToSetup {
431 self.logger
432 .lock()
433 .unwrap()
434 .push(format!("MyDataSrc::setup {} failed", self.id));
435 Err(errs::Err::new("setup error".to_string()))
436 } else {
437 self.logger
438 .lock()
439 .unwrap()
440 .push(format!("MyDataSrc::setup {}", self.id));
441 Ok(())
442 }
443 }
444 fn close(&mut self) {
445 self.logger
446 .lock()
447 .unwrap()
448 .push(format!("MyDataSrc::close {}", self.id));
449 }
450 async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
451 if self.failure == Failure::FailToCreateDataConn {
452 self.logger
453 .lock()
454 .unwrap()
455 .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
456 return Err(errs::Err::new("eeee".to_string()));
457 }
458 {
459 self.logger
460 .lock()
461 .unwrap()
462 .push(format!("MyDataSrc::create_data_conn {}", self.id));
463 }
464 let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
465 Ok(Box::new(conn))
466 }
467 }
468
469 #[test]
470 fn test_new() {
471 let hub = DataHub::new();
472 assert!(hub.local_data_src_manager.vec_unready.is_empty());
473 assert!(hub.local_data_src_manager.vec_ready.is_empty());
474 assert!(hub.local_data_src_manager.local);
475 assert!(hub.data_src_map.is_empty());
476 assert!(hub.data_conn_manager.vec.is_empty());
477 assert!(hub.data_conn_manager.index_map.is_empty());
478 assert!(!hub.fixed);
479 }
480
481 #[test]
482 fn test_with_commit_order() {
483 let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
484 assert!(hub.local_data_src_manager.vec_unready.is_empty());
485 assert!(hub.local_data_src_manager.vec_ready.is_empty());
486 assert!(hub.local_data_src_manager.local);
487 assert!(hub.data_src_map.is_empty());
488 assert_eq!(hub.data_conn_manager.vec.len(), 3);
489 assert_eq!(hub.data_conn_manager.index_map.len(), 3);
490 assert!(!hub.fixed);
491 }
492
493 #[tokio::test]
494 async fn test_uses_and_ok() {
495 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
496
497 let mut hub = DataHub::new();
498 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
499 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
500
501 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
502 assert!(hub.local_data_src_manager.vec_ready.is_empty());
503 assert!(hub.local_data_src_manager.local);
504 assert!(hub.data_src_map.is_empty());
505 assert_eq!(hub.data_conn_manager.vec.len(), 0);
506 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
507 assert!(!hub.fixed);
508
509 assert!(hub.begin_async().await.is_ok());
510
511 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
512 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
513 assert!(hub.local_data_src_manager.local);
514 assert_eq!(hub.data_src_map.len(), 2);
515 assert_eq!(hub.data_conn_manager.vec.len(), 0);
516 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
517 assert!(hub.fixed);
518 }
519
520 #[tokio::test]
521 async fn test_uses_but_already_fixed() {
522 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
523
524 let mut hub = DataHub::new();
525 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
526
527 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
528 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
529 assert!(hub.local_data_src_manager.local);
530 assert_eq!(hub.data_src_map.len(), 0);
531 assert_eq!(hub.data_conn_manager.vec.len(), 0);
532 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
533 assert!(!hub.fixed);
534
535 assert!(hub.begin_async().await.is_ok());
536
537 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
538 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
539 assert!(hub.local_data_src_manager.local);
540 assert_eq!(hub.data_src_map.len(), 1);
541 assert_eq!(hub.data_conn_manager.vec.len(), 0);
542 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
543 assert!(hub.fixed);
544
545 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
546
547 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
548 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
549 assert!(hub.local_data_src_manager.local);
550 assert_eq!(hub.data_src_map.len(), 1);
551 assert_eq!(hub.data_conn_manager.vec.len(), 0);
552 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
553 assert!(hub.fixed);
554 }
555
556 #[test]
557 fn test_disuses_and_ok() {
558 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
559
560 let mut hub = DataHub::new();
561 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
562 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
563
564 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
565 assert!(hub.local_data_src_manager.vec_ready.is_empty());
566 assert!(hub.local_data_src_manager.local);
567 assert!(hub.data_src_map.is_empty());
568 assert_eq!(hub.data_conn_manager.vec.len(), 0);
569 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
570 assert!(!hub.fixed);
571
572 hub.disuses("foo");
573
574 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
575 assert!(hub.local_data_src_manager.vec_ready.is_empty());
576 assert!(hub.local_data_src_manager.local);
577 assert!(hub.data_src_map.is_empty());
578 assert_eq!(hub.data_conn_manager.vec.len(), 0);
579 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
580 assert!(!hub.fixed);
581
582 hub.disuses("bar");
583
584 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
585 assert!(hub.local_data_src_manager.vec_ready.is_empty());
586 assert!(hub.local_data_src_manager.local);
587 assert!(hub.data_src_map.is_empty());
588 assert_eq!(hub.data_conn_manager.vec.len(), 0);
589 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
590 assert!(!hub.fixed);
591 }
592
593 #[tokio::test]
594 async fn test_disuses_and_fix() {
595 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
596
597 let mut hub = DataHub::new();
598 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
599 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
600
601 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
602 assert!(hub.local_data_src_manager.vec_ready.is_empty());
603 assert!(hub.local_data_src_manager.local);
604 assert!(hub.data_src_map.is_empty());
605 assert_eq!(hub.data_conn_manager.vec.len(), 0);
606 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
607 assert!(!hub.fixed);
608
609 hub.disuses("foo");
610
611 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
612 assert!(hub.local_data_src_manager.vec_ready.is_empty());
613 assert!(hub.local_data_src_manager.local);
614 assert!(hub.data_src_map.is_empty());
615 assert_eq!(hub.data_conn_manager.vec.len(), 0);
616 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
617 assert!(!hub.fixed);
618
619 hub.disuses("bar");
620
621 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
622 assert!(hub.local_data_src_manager.vec_ready.is_empty());
623 assert!(hub.local_data_src_manager.local);
624 assert!(hub.data_src_map.is_empty());
625 assert_eq!(hub.data_conn_manager.vec.len(), 0);
626 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
627 assert!(!hub.fixed);
628
629 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
630 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
631
632 assert!(hub.begin_async().await.is_ok());
633
634 assert!(hub.local_data_src_manager.vec_unready.is_empty());
635 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
636 assert!(hub.local_data_src_manager.local);
637 assert_eq!(hub.data_src_map.len(), 2);
638 assert_eq!(hub.data_conn_manager.vec.len(), 0);
639 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
640 assert!(hub.fixed);
641
642 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
643
644 assert!(hub.local_data_src_manager.vec_unready.is_empty());
645 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
646 assert!(hub.local_data_src_manager.local);
647 assert_eq!(hub.data_src_map.len(), 2);
648 assert_eq!(hub.data_conn_manager.vec.len(), 0);
649 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
650 assert!(hub.fixed);
651
652 hub.disuses("bar");
653
654 assert!(hub.local_data_src_manager.vec_unready.is_empty());
655 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
656 assert!(hub.local_data_src_manager.local);
657 assert_eq!(hub.data_src_map.len(), 2);
658 assert_eq!(hub.data_conn_manager.vec.len(), 0);
659 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
660 assert!(hub.fixed);
661
662 hub.end();
663
664 assert!(hub.local_data_src_manager.vec_unready.is_empty());
665 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
666 assert!(hub.local_data_src_manager.local);
667 assert_eq!(hub.data_src_map.len(), 2);
668 assert_eq!(hub.data_conn_manager.vec.len(), 0);
669 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
670 assert!(!hub.fixed);
671
672 hub.disuses("bar");
673
674 assert!(hub.local_data_src_manager.vec_unready.is_empty());
675 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
676 assert!(hub.local_data_src_manager.local);
677 assert_eq!(hub.data_src_map.len(), 1);
678 assert_eq!(hub.data_conn_manager.vec.len(), 0);
679 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
680 assert!(!hub.fixed);
681
682 hub.disuses("foo");
683
684 assert!(hub.local_data_src_manager.vec_unready.is_empty());
685 assert!(hub.local_data_src_manager.vec_ready.is_empty());
686 assert!(hub.local_data_src_manager.local);
687 assert_eq!(hub.data_src_map.len(), 0);
688 assert_eq!(hub.data_conn_manager.vec.len(), 0);
689 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
690 assert!(!hub.fixed);
691 }
692
693 #[tokio::test]
694 async fn test_begin_if_empty() {
695 let mut hub = DataHub::new();
696 assert!(hub.begin_async().await.is_ok());
697
698 assert!(hub.local_data_src_manager.vec_unready.is_empty());
699 assert!(hub.local_data_src_manager.vec_ready.is_empty());
700 assert!(hub.local_data_src_manager.local);
701 assert_eq!(hub.data_src_map.len(), 0);
702 assert_eq!(hub.data_conn_manager.vec.len(), 0);
703 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
704 assert!(hub.fixed);
705
706 hub.end();
707
708 assert!(hub.local_data_src_manager.vec_unready.is_empty());
709 assert!(hub.local_data_src_manager.vec_ready.is_empty());
710 assert!(hub.local_data_src_manager.local);
711 assert_eq!(hub.data_src_map.len(), 0);
712 assert_eq!(hub.data_conn_manager.vec.len(), 0);
713 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
714 assert!(!hub.fixed);
715 }
716
717 #[tokio::test]
718 async fn test_begin_and_ok() {
719 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
720
721 {
722 let mut hub = DataHub::new();
723
724 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
725 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
726
727 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
728 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
729 assert_eq!(hub.local_data_src_manager.local, true);
730 assert_eq!(hub.data_src_map.len(), 0);
731 assert_eq!(hub.data_conn_manager.vec.len(), 0);
732 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
733 assert_eq!(hub.fixed, false);
734
735 assert_eq!(hub.begin_async().await.is_ok(), true);
736
737 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
738 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
739 assert_eq!(hub.local_data_src_manager.local, true);
740 assert_eq!(hub.data_src_map.len(), 2);
741 assert_eq!(hub.data_conn_manager.vec.len(), 0);
742 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
743 assert_eq!(hub.fixed, true);
744
745 hub.end();
746
747 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
748 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
749 assert_eq!(hub.local_data_src_manager.local, true);
750 assert_eq!(hub.data_src_map.len(), 2);
751 assert_eq!(hub.data_conn_manager.vec.len(), 0);
752 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
753 assert_eq!(hub.fixed, false);
754 }
755
756 assert_eq!(
757 *logger.lock().unwrap(),
758 &[
759 "MyDataSrc::new 1",
760 "MyDataSrc::new 2",
761 "MyDataSrc::setup 1",
762 "MyDataSrc::setup 2",
763 "MyDataSrc::close 2",
764 "MyDataSrc::drop 2",
765 "MyDataSrc::close 1",
766 "MyDataSrc::drop 1",
767 ]
768 );
769 }
770
771 #[tokio::test]
772 async fn test_begin_but_failed() {
773 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
774
775 {
776 let mut hub = DataHub::new();
777
778 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
779 hub.uses(
780 "bar",
781 MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
782 );
783 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
784
785 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
786 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
787 assert_eq!(hub.local_data_src_manager.local, true);
788 assert_eq!(hub.data_src_map.len(), 0);
789 assert_eq!(hub.data_conn_manager.vec.len(), 0);
790 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
791 assert_eq!(hub.fixed, false);
792
793 if let Err(err) = hub.begin_async().await {
794 match err.reason::<DataHubError>() {
795 Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
796 assert_eq!(errors.len(), 1);
797 assert_eq!(errors[0].0, "bar".into());
798 assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
799 }
800 _ => panic!(),
801 }
802 } else {
803 panic!();
804 }
805
806 hub.end();
807 }
808
809 assert_eq!(
810 *logger.lock().unwrap(),
811 &[
812 "MyDataSrc::new 1",
813 "MyDataSrc::new 2",
814 "MyDataSrc::new 3",
815 "MyDataSrc::setup 1",
816 "MyDataSrc::setup 2 failed",
817 "MyDataSrc::close 2",
818 "MyDataSrc::close 1",
819 "MyDataSrc::drop 3",
820 "MyDataSrc::drop 2",
821 "MyDataSrc::drop 1",
822 ]
823 );
824 }
825
826 #[tokio::test]
827 async fn test_run_and_ok() {
828 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
829 {
830 let mut hub = DataHub::new();
831
832 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
833 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
834
835 let logger_clone = logger.clone();
836 assert!(hub
837 .run_async(|_data| {
838 let logger_clone2 = logger_clone.clone();
839 Box::pin(async move {
840 logger_clone2
841 .lock()
842 .unwrap()
843 .push("execute logic".to_string());
844 Ok(())
845 })
846 })
847 .await
848 .is_ok());
849 }
850
851 assert_eq!(
852 *logger.lock().unwrap(),
853 &[
854 "MyDataSrc::new 1",
855 "MyDataSrc::new 2",
856 "MyDataSrc::setup 1",
857 "MyDataSrc::setup 2",
858 "execute logic",
859 "MyDataSrc::close 2",
860 "MyDataSrc::drop 2",
861 "MyDataSrc::close 1",
862 "MyDataSrc::drop 1",
863 ]
864 );
865 }
866
867 #[tokio::test]
868 async fn test_run_but_failed() {
869 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
870 {
871 let mut hub = DataHub::new();
872
873 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
874 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
875
876 let logger_clone = logger.clone();
877 if let Err(err) = hub
878 .run_async(|_data| {
879 let logger_clone2 = logger_clone.clone();
880 Box::pin(async move {
881 logger_clone2
882 .lock()
883 .unwrap()
884 .push("execute logic but fail".to_string());
885 Err(errs::Err::new("logic error".to_string()))
886 })
887 })
888 .await
889 {
890 match err.reason::<String>() {
891 Ok(s) => assert_eq!(s, "logic error"),
892 _ => panic!(),
893 }
894 } else {
895 panic!();
896 }
897 }
898
899 assert_eq!(
900 *logger.lock().unwrap(),
901 &[
902 "MyDataSrc::new 1",
903 "MyDataSrc::new 2",
904 "MyDataSrc::setup 1",
905 "MyDataSrc::setup 2",
906 "execute logic but fail",
907 "MyDataSrc::close 2",
908 "MyDataSrc::drop 2",
909 "MyDataSrc::close 1",
910 "MyDataSrc::drop 1",
911 ]
912 );
913 }
914
915 #[tokio::test]
916 async fn test_txn_and_no_data_access_and_ok() {
917 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
918 {
919 let mut hub = DataHub::new();
920
921 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
922 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
923
924 let logger_clone = logger.clone();
925 assert!(hub
926 .txn_async(|_data| {
927 let logger_clone2 = logger_clone.clone();
928 Box::pin(async move {
929 logger_clone2
930 .lock()
931 .unwrap()
932 .push("execute logic".to_string());
933 Ok(())
934 })
935 })
936 .await
937 .is_ok());
938 }
939
940 assert_eq!(
941 *logger.lock().unwrap(),
942 &[
943 "MyDataSrc::new 1",
944 "MyDataSrc::new 2",
945 "MyDataSrc::setup 1",
946 "MyDataSrc::setup 2",
947 "execute logic",
948 "MyDataSrc::close 2",
949 "MyDataSrc::drop 2",
950 "MyDataSrc::close 1",
951 "MyDataSrc::drop 1",
952 ]
953 );
954 }
955
956 #[tokio::test]
957 async fn test_txn_and_has_data_access_and_ok() {
958 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
959 {
960 let mut hub = DataHub::new();
961
962 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
963 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
964
965 let logger_clone = logger.clone();
966 hub.txn_async(move |data| {
967 let logger_clone2 = logger_clone.clone();
968 Box::pin(async move {
969 logger_clone2
970 .lock()
971 .unwrap()
972 .push("execute logic".to_string());
973 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
974 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
975 Ok(())
976 })
977 })
978 .await
979 .unwrap()
980 }
981
982 assert_eq!(
983 *logger.lock().unwrap(),
984 &[
985 "MyDataSrc::new 1",
986 "MyDataSrc::new 2",
987 "MyDataSrc::setup 1",
988 "MyDataSrc::setup 2",
989 "execute logic",
990 "MyDataSrc::create_data_conn 1",
991 "MyDataConn::new 1",
992 "MyDataSrc::create_data_conn 2",
993 "MyDataConn::new 2",
994 "MyDataConn::pre_commit 1",
995 "MyDataConn::pre_commit 2",
996 "MyDataConn::commit 1",
997 "MyDataConn::commit 2",
998 "MyDataConn::post_commit 1",
999 "MyDataConn::post_commit 2",
1000 "MyDataConn::close 1",
1001 "MyDataConn::drop 1",
1002 "MyDataConn::close 2",
1003 "MyDataConn::drop 2",
1004 "MyDataSrc::close 2",
1005 "MyDataSrc::drop 2",
1006 "MyDataSrc::close 1",
1007 "MyDataSrc::drop 1",
1008 ]
1009 );
1010 }
1011
1012 #[tokio::test]
1013 async fn test_txn_but_failed() {
1014 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1015 {
1016 let mut hub = DataHub::new();
1017
1018 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1019 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1020
1021 let logger_clone = logger.clone();
1022 if let Err(e) = hub
1023 .txn_async(move |data| {
1024 let logger_clone2 = logger_clone.clone();
1025 Box::pin(async move {
1026 logger_clone2
1027 .lock()
1028 .unwrap()
1029 .push("execute logic".to_string());
1030 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1031 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1032 Err(errs::Err::new("logic error"))
1033 })
1034 })
1035 .await
1036 {
1037 match e.reason::<&str>() {
1038 Ok(s) => assert_eq!(s, &"logic error"),
1039 _ => panic!(),
1040 }
1041 }
1042 }
1043
1044 assert_eq!(
1045 *logger.lock().unwrap(),
1046 &[
1047 "MyDataSrc::new 1",
1048 "MyDataSrc::new 2",
1049 "MyDataSrc::setup 1",
1050 "MyDataSrc::setup 2",
1051 "execute logic",
1052 "MyDataSrc::create_data_conn 1",
1053 "MyDataConn::new 1",
1054 "MyDataSrc::create_data_conn 2",
1055 "MyDataConn::new 2",
1056 "MyDataConn::rollback 1",
1057 "MyDataConn::rollback 2",
1058 "MyDataConn::close 1",
1059 "MyDataConn::drop 1",
1060 "MyDataConn::close 2",
1061 "MyDataConn::drop 2",
1062 "MyDataSrc::close 2",
1063 "MyDataSrc::drop 2",
1064 "MyDataSrc::close 1",
1065 "MyDataSrc::drop 1",
1066 ]
1067 );
1068 }
1069
1070 #[tokio::test]
1071 async fn test_txn_with_commit_order() {
1072 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1073 {
1074 let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1075
1076 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1077 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1078
1079 let logger_clone = logger.clone();
1080 hub.txn_async(move |data| {
1081 let logger_clone2 = logger_clone.clone();
1082 Box::pin(async move {
1083 logger_clone2
1084 .lock()
1085 .unwrap()
1086 .push("execute logic".to_string());
1087 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1088 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1089 Ok(())
1090 })
1091 })
1092 .await
1093 .unwrap();
1094 }
1095
1096 assert_eq!(
1097 *logger.lock().unwrap(),
1098 &[
1099 "MyDataSrc::new 1",
1100 "MyDataSrc::new 2",
1101 "MyDataSrc::setup 1",
1102 "MyDataSrc::setup 2",
1103 "execute logic",
1104 "MyDataSrc::create_data_conn 1",
1105 "MyDataConn::new 1",
1106 "MyDataSrc::create_data_conn 2",
1107 "MyDataConn::new 2",
1108 "MyDataConn::pre_commit 2",
1109 "MyDataConn::pre_commit 1",
1110 "MyDataConn::commit 2",
1111 "MyDataConn::commit 1",
1112 "MyDataConn::post_commit 2",
1113 "MyDataConn::post_commit 1",
1114 "MyDataConn::close 2",
1115 "MyDataConn::drop 2",
1116 "MyDataConn::close 1",
1117 "MyDataConn::drop 1",
1118 "MyDataSrc::close 2",
1119 "MyDataSrc::drop 2",
1120 "MyDataSrc::close 1",
1121 "MyDataSrc::drop 1",
1122 ]
1123 );
1124 }
1125
1126 #[tokio::test]
1127 async fn test_get_data_conn_and_failed() {
1128 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1129 {
1130 let mut hub = DataHub::new();
1131
1132 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1133 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1134
1135 let logger_clone = logger.clone();
1136 let err = hub
1137 .txn_async(move |data| {
1138 let logger_clone2 = logger_clone.clone();
1139 Box::pin(async move {
1140 logger_clone2
1141 .lock()
1142 .unwrap()
1143 .push("execute logic".to_string());
1144 let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1145 Ok(())
1146 })
1147 })
1148 .await
1149 .unwrap_err();
1150
1151 match err.reason::<DataHubError>() {
1152 Ok(r) => match r {
1153 DataHubError::NoDataSrcToCreateDataConn {
1154 name,
1155 data_conn_type,
1156 } => {
1157 assert_eq!(name.as_ref(), "fxx");
1158 assert_eq!(
1159 data_conn_type,
1160 &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1161 );
1162 }
1163 _ => panic!(),
1164 },
1165 _ => panic!(),
1166 }
1167 }
1168
1169 assert_eq!(
1170 *logger.lock().unwrap(),
1171 &[
1172 "MyDataSrc::new 1",
1173 "MyDataSrc::new 2",
1174 "MyDataSrc::setup 1",
1175 "MyDataSrc::setup 2",
1176 "execute logic",
1177 "MyDataSrc::close 2",
1178 "MyDataSrc::drop 2",
1179 "MyDataSrc::close 1",
1180 "MyDataSrc::drop 1",
1181 ]
1182 );
1183 }
1184}