1use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::{any, ptr};
13
14#[derive(Debug)]
16pub enum DataHubError {
17 FailToSetupLocalDataSrcs {
19 errors: Vec<(Arc<str>, errs::Err)>,
21 },
22
23 NoDataSrcToCreateDataConn {
26 name: Arc<str>,
28 data_conn_type: &'static str,
30 },
31}
32
33impl DataHub {
34 #[allow(clippy::new_without_default)]
39 pub fn new() -> Self {
40 let mut data_src_map = HashMap::new();
41 copy_global_data_srcs_to_map(&mut data_src_map);
42
43 Self {
44 local_data_src_manager: DataSrcManager::new(true),
45 data_src_map,
46 data_conn_manager: DataConnManager::new(),
47 fixed: false,
48 }
49 }
50
51 pub fn with_commit_order(names: &[&str]) -> Self {
61 let mut data_src_map = HashMap::new();
62 copy_global_data_srcs_to_map(&mut data_src_map);
63
64 Self {
65 local_data_src_manager: DataSrcManager::new(true),
66 data_src_map,
67 data_conn_manager: DataConnManager::with_commit_order(names),
68 fixed: false,
69 }
70 }
71
72 pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
87 where
88 S: DataSrc<C> + 'static,
89 C: DataConn + 'static,
90 {
91 if self.fixed {
92 return;
93 }
94 self.local_data_src_manager.add(name, ds);
95 }
96
97 pub fn disuses(&mut self, name: impl AsRef<str>) {
106 if self.fixed {
107 return;
108 }
109 self.data_src_map.remove(name.as_ref());
110 self.local_data_src_manager.remove(name);
111 }
112
113 #[inline]
114 async fn begin_async(&mut self) -> errs::Result<()> {
115 self.fixed = true;
116
117 let mut errors = Vec::new();
118
119 self.local_data_src_manager.setup_async(&mut errors).await;
120 if errors.is_empty() {
121 self.local_data_src_manager
122 .copy_ds_ready_to_map(&mut self.data_src_map);
123 Ok(())
124 } else {
125 Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
126 errors,
127 }))
128 }
129 }
130
131 #[inline]
132 async fn commit_async(&mut self) -> errs::Result<()> {
133 self.data_conn_manager.commit_async().await
134 }
135
136 #[inline]
137 async fn rollback_async(&mut self) {
138 self.data_conn_manager.rollback_async().await
139 }
140
141 #[inline]
142 fn end(&mut self) {
143 self.data_conn_manager.close();
144 self.fixed = false;
145 }
146
147 #[allow(clippy::doc_overindented_list_items)]
167 pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
168 where
169 for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
170 {
171 let mut r = self.begin_async().await;
172 if r.is_ok() {
173 r = logic_fn(self).await;
174 }
175 self.end();
176 r
177 }
178
179 #[allow(clippy::doc_overindented_list_items)]
201 pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
202 where
203 for<'a> F: FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'a>>,
204 {
205 let mut r = self.begin_async().await;
206 if r.is_ok() {
207 r = logic_fn(self).await;
208 }
209 if r.is_ok() {
210 r = self.commit_async().await;
211 }
212 if r.is_err() {
213 self.rollback_async().await;
214 }
215 self.end();
216 r
217 }
218
219 pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
239 where
240 C: DataConn + 'static,
241 {
242 if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
243 let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
244 return Ok(unsafe { &mut (*typed_nnptr).data_conn });
245 }
246
247 if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
248 let boxed = if *local {
249 self.local_data_src_manager
250 .create_data_conn_async::<C>(*index, name.as_ref())
251 .await?
252 } else {
253 create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
254 };
255
256 let ptr = Box::into_raw(boxed);
257 if let Some(nnptr) = ptr::NonNull::new(ptr) {
258 self.data_conn_manager
259 .add(nnptr.cast::<DataConnContainer>());
260
261 let typed_ptr = ptr.cast::<DataConnContainer<C>>();
262 return Ok(unsafe { &mut (*typed_ptr).data_conn });
263 } else {
264 }
266 }
267
268 Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
269 name: name.as_ref().into(),
270 data_conn_type: any::type_name::<C>(),
271 }))
272 }
273}
274
275#[macro_export]
276#[doc(hidden)]
277macro_rules! _logic {
278 ($f:expr) => {
279 |data| Box::pin($f(data))
280 };
281}
282
283#[cfg(test)]
284mod tests_of_data_hub {
285 use super::*;
286 use crate::tokio::AsyncGroup;
287 use std::sync::Mutex;
288
289 #[derive(Clone, Copy, PartialEq)]
290 enum Failure {
291 None,
292 FailToPreCommit,
293 FailToCommit,
294 FailToSetup,
295 FailToCreateDataConn,
296 }
297
298 struct MyDataConn {
299 id: i8,
300 failure: Failure,
301 committed: bool,
302 logger: Arc<Mutex<Vec<String>>>,
303 }
304 impl MyDataConn {
305 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
306 logger
307 .lock()
308 .unwrap()
309 .push(format!("MyDataConn::new {}", id));
310 Self {
311 id,
312 failure,
313 committed: false,
314 logger,
315 }
316 }
317 }
318 impl Drop for MyDataConn {
319 fn drop(&mut self) {
320 self.logger
321 .lock()
322 .unwrap()
323 .push(format!("MyDataConn::drop {}", self.id));
324 }
325 }
326 impl DataConn for MyDataConn {
327 async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
328 if self.failure == Failure::FailToPreCommit {
329 self.logger
330 .lock()
331 .unwrap()
332 .push(format!("MyDataConn::pre_commit {} failed", self.id));
333 Err(errs::Err::new("pre commit error"))
334 } else {
335 self.logger
336 .lock()
337 .unwrap()
338 .push(format!("MyDataConn::pre_commit {}", self.id));
339 Ok(())
340 }
341 }
342 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
343 if self.failure == Failure::FailToCommit {
344 self.logger
345 .lock()
346 .unwrap()
347 .push(format!("MyDataConn::commit {} failed", self.id));
348 Err(errs::Err::new("commit error"))
349 } else {
350 self.logger
351 .lock()
352 .unwrap()
353 .push(format!("MyDataConn::commit {}", self.id));
354 self.committed = true;
355 Ok(())
356 }
357 }
358 async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
359 self.logger
360 .lock()
361 .unwrap()
362 .push(format!("MyDataConn::post_commit {}", self.id));
363 }
364 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
365 self.logger
366 .lock()
367 .unwrap()
368 .push(format!("MyDataConn::rollback {}", self.id));
369 }
370 async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
371 self.logger
372 .lock()
373 .unwrap()
374 .push(format!("MyDataConn::force_back {}", self.id));
375 }
376 fn close(&mut self) {
377 self.logger
378 .lock()
379 .unwrap()
380 .push(format!("MyDataConn::close {}", self.id));
381 }
382 }
383
384 struct MyDataSrc {
385 id: i8,
386 failure: Failure,
387 logger: Arc<Mutex<Vec<String>>>,
388 }
389 impl MyDataSrc {
390 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
391 logger
392 .lock()
393 .unwrap()
394 .push(format!("MyDataSrc::new {}", id));
395 Self {
396 id,
397 failure,
398 logger,
399 }
400 }
401 }
402 impl Drop for MyDataSrc {
403 fn drop(&mut self) {
404 self.logger
405 .lock()
406 .unwrap()
407 .push(format!("MyDataSrc::drop {}", self.id));
408 }
409 }
410 impl DataSrc<MyDataConn> for MyDataSrc {
411 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
412 if self.failure == Failure::FailToSetup {
413 self.logger
414 .lock()
415 .unwrap()
416 .push(format!("MyDataSrc::setup {} failed", self.id));
417 Err(errs::Err::new("setup error".to_string()))
418 } else {
419 self.logger
420 .lock()
421 .unwrap()
422 .push(format!("MyDataSrc::setup {}", self.id));
423 Ok(())
424 }
425 }
426 fn close(&mut self) {
427 self.logger
428 .lock()
429 .unwrap()
430 .push(format!("MyDataSrc::close {}", self.id));
431 }
432 async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
433 if self.failure == Failure::FailToCreateDataConn {
434 self.logger
435 .lock()
436 .unwrap()
437 .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
438 return Err(errs::Err::new("eeee".to_string()));
439 }
440 {
441 self.logger
442 .lock()
443 .unwrap()
444 .push(format!("MyDataSrc::create_data_conn {}", self.id));
445 }
446 let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
447 Ok(Box::new(conn))
448 }
449 }
450
451 #[test]
452 fn test_new() {
453 let hub = DataHub::new();
454 assert!(hub.local_data_src_manager.vec_unready.is_empty());
455 assert!(hub.local_data_src_manager.vec_ready.is_empty());
456 assert!(hub.local_data_src_manager.local);
457 assert!(hub.data_src_map.is_empty());
458 assert!(hub.data_conn_manager.vec.is_empty());
459 assert!(hub.data_conn_manager.index_map.is_empty());
460 assert!(!hub.fixed);
461 }
462
463 #[test]
464 fn test_with_commit_order() {
465 let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
466 assert!(hub.local_data_src_manager.vec_unready.is_empty());
467 assert!(hub.local_data_src_manager.vec_ready.is_empty());
468 assert!(hub.local_data_src_manager.local);
469 assert!(hub.data_src_map.is_empty());
470 assert_eq!(hub.data_conn_manager.vec.len(), 3);
471 assert_eq!(hub.data_conn_manager.index_map.len(), 3);
472 assert!(!hub.fixed);
473 }
474
475 #[tokio::test]
476 async fn test_uses_and_ok() {
477 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
478
479 let mut hub = DataHub::new();
480 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
481 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
482
483 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
484 assert!(hub.local_data_src_manager.vec_ready.is_empty());
485 assert!(hub.local_data_src_manager.local);
486 assert!(hub.data_src_map.is_empty());
487 assert_eq!(hub.data_conn_manager.vec.len(), 0);
488 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
489 assert!(!hub.fixed);
490
491 assert!(hub.begin_async().await.is_ok());
492
493 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
494 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
495 assert!(hub.local_data_src_manager.local);
496 assert_eq!(hub.data_src_map.len(), 2);
497 assert_eq!(hub.data_conn_manager.vec.len(), 0);
498 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
499 assert!(hub.fixed);
500 }
501
502 #[tokio::test]
503 async fn test_uses_but_already_fixed() {
504 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
505
506 let mut hub = DataHub::new();
507 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
508
509 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
510 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
511 assert!(hub.local_data_src_manager.local);
512 assert_eq!(hub.data_src_map.len(), 0);
513 assert_eq!(hub.data_conn_manager.vec.len(), 0);
514 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
515 assert!(!hub.fixed);
516
517 assert!(hub.begin_async().await.is_ok());
518
519 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
520 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
521 assert!(hub.local_data_src_manager.local);
522 assert_eq!(hub.data_src_map.len(), 1);
523 assert_eq!(hub.data_conn_manager.vec.len(), 0);
524 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
525 assert!(hub.fixed);
526
527 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
528
529 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
530 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
531 assert!(hub.local_data_src_manager.local);
532 assert_eq!(hub.data_src_map.len(), 1);
533 assert_eq!(hub.data_conn_manager.vec.len(), 0);
534 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
535 assert!(hub.fixed);
536 }
537
538 #[test]
539 fn test_disuses_and_ok() {
540 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
541
542 let mut hub = DataHub::new();
543 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
544 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
545
546 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
547 assert!(hub.local_data_src_manager.vec_ready.is_empty());
548 assert!(hub.local_data_src_manager.local);
549 assert!(hub.data_src_map.is_empty());
550 assert_eq!(hub.data_conn_manager.vec.len(), 0);
551 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
552 assert!(!hub.fixed);
553
554 hub.disuses("foo");
555
556 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
557 assert!(hub.local_data_src_manager.vec_ready.is_empty());
558 assert!(hub.local_data_src_manager.local);
559 assert!(hub.data_src_map.is_empty());
560 assert_eq!(hub.data_conn_manager.vec.len(), 0);
561 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
562 assert!(!hub.fixed);
563
564 hub.disuses("bar");
565
566 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
567 assert!(hub.local_data_src_manager.vec_ready.is_empty());
568 assert!(hub.local_data_src_manager.local);
569 assert!(hub.data_src_map.is_empty());
570 assert_eq!(hub.data_conn_manager.vec.len(), 0);
571 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
572 assert!(!hub.fixed);
573 }
574
575 #[tokio::test]
576 async fn test_disuses_and_fix() {
577 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
578
579 let mut hub = DataHub::new();
580 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
581 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
582
583 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
584 assert!(hub.local_data_src_manager.vec_ready.is_empty());
585 assert!(hub.local_data_src_manager.local);
586 assert!(hub.data_src_map.is_empty());
587 assert_eq!(hub.data_conn_manager.vec.len(), 0);
588 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
589 assert!(!hub.fixed);
590
591 hub.disuses("foo");
592
593 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
594 assert!(hub.local_data_src_manager.vec_ready.is_empty());
595 assert!(hub.local_data_src_manager.local);
596 assert!(hub.data_src_map.is_empty());
597 assert_eq!(hub.data_conn_manager.vec.len(), 0);
598 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
599 assert!(!hub.fixed);
600
601 hub.disuses("bar");
602
603 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
604 assert!(hub.local_data_src_manager.vec_ready.is_empty());
605 assert!(hub.local_data_src_manager.local);
606 assert!(hub.data_src_map.is_empty());
607 assert_eq!(hub.data_conn_manager.vec.len(), 0);
608 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
609 assert!(!hub.fixed);
610
611 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
612 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
613
614 assert!(hub.begin_async().await.is_ok());
615
616 assert!(hub.local_data_src_manager.vec_unready.is_empty());
617 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
618 assert!(hub.local_data_src_manager.local);
619 assert_eq!(hub.data_src_map.len(), 2);
620 assert_eq!(hub.data_conn_manager.vec.len(), 0);
621 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
622 assert!(hub.fixed);
623
624 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
625
626 assert!(hub.local_data_src_manager.vec_unready.is_empty());
627 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
628 assert!(hub.local_data_src_manager.local);
629 assert_eq!(hub.data_src_map.len(), 2);
630 assert_eq!(hub.data_conn_manager.vec.len(), 0);
631 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
632 assert!(hub.fixed);
633
634 hub.disuses("bar");
635
636 assert!(hub.local_data_src_manager.vec_unready.is_empty());
637 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
638 assert!(hub.local_data_src_manager.local);
639 assert_eq!(hub.data_src_map.len(), 2);
640 assert_eq!(hub.data_conn_manager.vec.len(), 0);
641 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
642 assert!(hub.fixed);
643
644 hub.end();
645
646 assert!(hub.local_data_src_manager.vec_unready.is_empty());
647 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
648 assert!(hub.local_data_src_manager.local);
649 assert_eq!(hub.data_src_map.len(), 2);
650 assert_eq!(hub.data_conn_manager.vec.len(), 0);
651 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
652 assert!(!hub.fixed);
653
654 hub.disuses("bar");
655
656 assert!(hub.local_data_src_manager.vec_unready.is_empty());
657 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
658 assert!(hub.local_data_src_manager.local);
659 assert_eq!(hub.data_src_map.len(), 1);
660 assert_eq!(hub.data_conn_manager.vec.len(), 0);
661 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
662 assert!(!hub.fixed);
663
664 hub.disuses("foo");
665
666 assert!(hub.local_data_src_manager.vec_unready.is_empty());
667 assert!(hub.local_data_src_manager.vec_ready.is_empty());
668 assert!(hub.local_data_src_manager.local);
669 assert_eq!(hub.data_src_map.len(), 0);
670 assert_eq!(hub.data_conn_manager.vec.len(), 0);
671 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
672 assert!(!hub.fixed);
673 }
674
675 #[tokio::test]
676 async fn test_begin_if_empty() {
677 let mut hub = DataHub::new();
678 assert!(hub.begin_async().await.is_ok());
679
680 assert!(hub.local_data_src_manager.vec_unready.is_empty());
681 assert!(hub.local_data_src_manager.vec_ready.is_empty());
682 assert!(hub.local_data_src_manager.local);
683 assert_eq!(hub.data_src_map.len(), 0);
684 assert_eq!(hub.data_conn_manager.vec.len(), 0);
685 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
686 assert!(hub.fixed);
687
688 hub.end();
689
690 assert!(hub.local_data_src_manager.vec_unready.is_empty());
691 assert!(hub.local_data_src_manager.vec_ready.is_empty());
692 assert!(hub.local_data_src_manager.local);
693 assert_eq!(hub.data_src_map.len(), 0);
694 assert_eq!(hub.data_conn_manager.vec.len(), 0);
695 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
696 assert!(!hub.fixed);
697 }
698
699 #[tokio::test]
700 async fn test_begin_and_ok() {
701 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
702
703 {
704 let mut hub = DataHub::new();
705
706 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
707 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
708
709 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
710 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
711 assert_eq!(hub.local_data_src_manager.local, true);
712 assert_eq!(hub.data_src_map.len(), 0);
713 assert_eq!(hub.data_conn_manager.vec.len(), 0);
714 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
715 assert_eq!(hub.fixed, false);
716
717 assert_eq!(hub.begin_async().await.is_ok(), true);
718
719 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
720 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
721 assert_eq!(hub.local_data_src_manager.local, true);
722 assert_eq!(hub.data_src_map.len(), 2);
723 assert_eq!(hub.data_conn_manager.vec.len(), 0);
724 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
725 assert_eq!(hub.fixed, true);
726
727 hub.end();
728
729 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
730 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
731 assert_eq!(hub.local_data_src_manager.local, true);
732 assert_eq!(hub.data_src_map.len(), 2);
733 assert_eq!(hub.data_conn_manager.vec.len(), 0);
734 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
735 assert_eq!(hub.fixed, false);
736 }
737
738 assert_eq!(
739 *logger.lock().unwrap(),
740 &[
741 "MyDataSrc::new 1",
742 "MyDataSrc::new 2",
743 "MyDataSrc::setup 1",
744 "MyDataSrc::setup 2",
745 "MyDataSrc::close 2",
746 "MyDataSrc::drop 2",
747 "MyDataSrc::close 1",
748 "MyDataSrc::drop 1",
749 ]
750 );
751 }
752
753 #[tokio::test]
754 async fn test_begin_but_failed() {
755 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
756
757 {
758 let mut hub = DataHub::new();
759
760 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
761 hub.uses(
762 "bar",
763 MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
764 );
765 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
766
767 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
768 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
769 assert_eq!(hub.local_data_src_manager.local, true);
770 assert_eq!(hub.data_src_map.len(), 0);
771 assert_eq!(hub.data_conn_manager.vec.len(), 0);
772 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
773 assert_eq!(hub.fixed, false);
774
775 if let Err(err) = hub.begin_async().await {
776 match err.reason::<DataHubError>() {
777 Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
778 assert_eq!(errors.len(), 1);
779 assert_eq!(errors[0].0, "bar".into());
780 assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
781 }
782 _ => panic!(),
783 }
784 } else {
785 panic!();
786 }
787
788 hub.end();
789 }
790
791 assert_eq!(
792 *logger.lock().unwrap(),
793 &[
794 "MyDataSrc::new 1",
795 "MyDataSrc::new 2",
796 "MyDataSrc::new 3",
797 "MyDataSrc::setup 1",
798 "MyDataSrc::setup 2 failed",
799 "MyDataSrc::close 2",
800 "MyDataSrc::close 1",
801 "MyDataSrc::drop 3",
802 "MyDataSrc::drop 2",
803 "MyDataSrc::drop 1",
804 ]
805 );
806 }
807
808 #[tokio::test]
809 async fn test_run_and_ok() {
810 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
811 {
812 let mut hub = DataHub::new();
813
814 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
815 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
816
817 let logger_clone = logger.clone();
818 assert!(hub
819 .run_async(|_data| {
820 let logger_clone2 = logger_clone.clone();
821 Box::pin(async move {
822 logger_clone2
823 .lock()
824 .unwrap()
825 .push("execute logic".to_string());
826 Ok(())
827 })
828 })
829 .await
830 .is_ok());
831 }
832
833 assert_eq!(
834 *logger.lock().unwrap(),
835 &[
836 "MyDataSrc::new 1",
837 "MyDataSrc::new 2",
838 "MyDataSrc::setup 1",
839 "MyDataSrc::setup 2",
840 "execute logic",
841 "MyDataSrc::close 2",
842 "MyDataSrc::drop 2",
843 "MyDataSrc::close 1",
844 "MyDataSrc::drop 1",
845 ]
846 );
847 }
848
849 #[tokio::test]
850 async fn test_run_but_failed() {
851 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
852 {
853 let mut hub = DataHub::new();
854
855 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
856 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
857
858 let logger_clone = logger.clone();
859 if let Err(err) = hub
860 .run_async(|_data| {
861 let logger_clone2 = logger_clone.clone();
862 Box::pin(async move {
863 logger_clone2
864 .lock()
865 .unwrap()
866 .push("execute logic but fail".to_string());
867 Err(errs::Err::new("logic error".to_string()))
868 })
869 })
870 .await
871 {
872 match err.reason::<String>() {
873 Ok(s) => assert_eq!(s, "logic error"),
874 _ => panic!(),
875 }
876 } else {
877 panic!();
878 }
879 }
880
881 assert_eq!(
882 *logger.lock().unwrap(),
883 &[
884 "MyDataSrc::new 1",
885 "MyDataSrc::new 2",
886 "MyDataSrc::setup 1",
887 "MyDataSrc::setup 2",
888 "execute logic but fail",
889 "MyDataSrc::close 2",
890 "MyDataSrc::drop 2",
891 "MyDataSrc::close 1",
892 "MyDataSrc::drop 1",
893 ]
894 );
895 }
896
897 #[tokio::test]
898 async fn test_txn_and_no_data_access_and_ok() {
899 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
900 {
901 let mut hub = DataHub::new();
902
903 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
904 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
905
906 let logger_clone = logger.clone();
907 assert!(hub
908 .txn_async(|_data| {
909 let logger_clone2 = logger_clone.clone();
910 Box::pin(async move {
911 logger_clone2
912 .lock()
913 .unwrap()
914 .push("execute logic".to_string());
915 Ok(())
916 })
917 })
918 .await
919 .is_ok());
920 }
921
922 assert_eq!(
923 *logger.lock().unwrap(),
924 &[
925 "MyDataSrc::new 1",
926 "MyDataSrc::new 2",
927 "MyDataSrc::setup 1",
928 "MyDataSrc::setup 2",
929 "execute logic",
930 "MyDataSrc::close 2",
931 "MyDataSrc::drop 2",
932 "MyDataSrc::close 1",
933 "MyDataSrc::drop 1",
934 ]
935 );
936 }
937
938 #[tokio::test]
939 async fn test_txn_and_has_data_access_and_ok() {
940 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
941 {
942 let mut hub = DataHub::new();
943
944 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
945 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
946
947 let logger_clone = logger.clone();
948 hub.txn_async(move |data| {
949 let logger_clone2 = logger_clone.clone();
950 Box::pin(async move {
951 logger_clone2
952 .lock()
953 .unwrap()
954 .push("execute logic".to_string());
955 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
956 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
957 Ok(())
958 })
959 })
960 .await
961 .unwrap()
962 }
963
964 assert_eq!(
965 *logger.lock().unwrap(),
966 &[
967 "MyDataSrc::new 1",
968 "MyDataSrc::new 2",
969 "MyDataSrc::setup 1",
970 "MyDataSrc::setup 2",
971 "execute logic",
972 "MyDataSrc::create_data_conn 1",
973 "MyDataConn::new 1",
974 "MyDataSrc::create_data_conn 2",
975 "MyDataConn::new 2",
976 "MyDataConn::pre_commit 1",
977 "MyDataConn::pre_commit 2",
978 "MyDataConn::commit 1",
979 "MyDataConn::commit 2",
980 "MyDataConn::post_commit 1",
981 "MyDataConn::post_commit 2",
982 "MyDataConn::close 1",
983 "MyDataConn::drop 1",
984 "MyDataConn::close 2",
985 "MyDataConn::drop 2",
986 "MyDataSrc::close 2",
987 "MyDataSrc::drop 2",
988 "MyDataSrc::close 1",
989 "MyDataSrc::drop 1",
990 ]
991 );
992 }
993
994 #[tokio::test]
995 async fn test_txn_but_failed() {
996 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
997 {
998 let mut hub = DataHub::new();
999
1000 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1001 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1002
1003 let logger_clone = logger.clone();
1004 if let Err(e) = hub
1005 .txn_async(move |data| {
1006 let logger_clone2 = logger_clone.clone();
1007 Box::pin(async move {
1008 logger_clone2
1009 .lock()
1010 .unwrap()
1011 .push("execute logic".to_string());
1012 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1013 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1014 Err(errs::Err::new("logic error"))
1015 })
1016 })
1017 .await
1018 {
1019 match e.reason::<&str>() {
1020 Ok(s) => assert_eq!(s, &"logic error"),
1021 _ => panic!(),
1022 }
1023 }
1024 }
1025
1026 assert_eq!(
1027 *logger.lock().unwrap(),
1028 &[
1029 "MyDataSrc::new 1",
1030 "MyDataSrc::new 2",
1031 "MyDataSrc::setup 1",
1032 "MyDataSrc::setup 2",
1033 "execute logic",
1034 "MyDataSrc::create_data_conn 1",
1035 "MyDataConn::new 1",
1036 "MyDataSrc::create_data_conn 2",
1037 "MyDataConn::new 2",
1038 "MyDataConn::rollback 1",
1039 "MyDataConn::rollback 2",
1040 "MyDataConn::close 1",
1041 "MyDataConn::drop 1",
1042 "MyDataConn::close 2",
1043 "MyDataConn::drop 2",
1044 "MyDataSrc::close 2",
1045 "MyDataSrc::drop 2",
1046 "MyDataSrc::close 1",
1047 "MyDataSrc::drop 1",
1048 ]
1049 );
1050 }
1051
1052 #[tokio::test]
1053 async fn test_txn_with_commit_order() {
1054 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1055 {
1056 let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1057
1058 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1059 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1060
1061 let logger_clone = logger.clone();
1062 hub.txn_async(move |data| {
1063 let logger_clone2 = logger_clone.clone();
1064 Box::pin(async move {
1065 logger_clone2
1066 .lock()
1067 .unwrap()
1068 .push("execute logic".to_string());
1069 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1070 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1071 Ok(())
1072 })
1073 })
1074 .await
1075 .unwrap();
1076 }
1077
1078 assert_eq!(
1079 *logger.lock().unwrap(),
1080 &[
1081 "MyDataSrc::new 1",
1082 "MyDataSrc::new 2",
1083 "MyDataSrc::setup 1",
1084 "MyDataSrc::setup 2",
1085 "execute logic",
1086 "MyDataSrc::create_data_conn 1",
1087 "MyDataConn::new 1",
1088 "MyDataSrc::create_data_conn 2",
1089 "MyDataConn::new 2",
1090 "MyDataConn::pre_commit 2",
1091 "MyDataConn::pre_commit 1",
1092 "MyDataConn::commit 2",
1093 "MyDataConn::commit 1",
1094 "MyDataConn::post_commit 2",
1095 "MyDataConn::post_commit 1",
1096 "MyDataConn::close 2",
1097 "MyDataConn::drop 2",
1098 "MyDataConn::close 1",
1099 "MyDataConn::drop 1",
1100 "MyDataSrc::close 2",
1101 "MyDataSrc::drop 2",
1102 "MyDataSrc::close 1",
1103 "MyDataSrc::drop 1",
1104 ]
1105 );
1106 }
1107
1108 #[tokio::test]
1109 async fn test_get_data_conn_and_failed() {
1110 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1111 {
1112 let mut hub = DataHub::new();
1113
1114 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1115 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1116
1117 let logger_clone = logger.clone();
1118 let err = hub
1119 .txn_async(move |data| {
1120 let logger_clone2 = logger_clone.clone();
1121 Box::pin(async move {
1122 logger_clone2
1123 .lock()
1124 .unwrap()
1125 .push("execute logic".to_string());
1126 let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1127 Ok(())
1128 })
1129 })
1130 .await
1131 .unwrap_err();
1132
1133 match err.reason::<DataHubError>() {
1134 Ok(r) => match r {
1135 DataHubError::NoDataSrcToCreateDataConn {
1136 name,
1137 data_conn_type,
1138 } => {
1139 assert_eq!(name.as_ref(), "fxx");
1140 assert_eq!(
1141 data_conn_type,
1142 &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1143 );
1144 }
1145 _ => panic!(),
1146 },
1147 _ => panic!(),
1148 }
1149 }
1150
1151 assert_eq!(
1152 *logger.lock().unwrap(),
1153 &[
1154 "MyDataSrc::new 1",
1155 "MyDataSrc::new 2",
1156 "MyDataSrc::setup 1",
1157 "MyDataSrc::setup 2",
1158 "execute logic",
1159 "MyDataSrc::close 2",
1160 "MyDataSrc::drop 2",
1161 "MyDataSrc::close 1",
1162 "MyDataSrc::drop 1",
1163 ]
1164 );
1165 }
1166}