1use super::data_src::{copy_global_data_srcs_to_map, create_data_conn_from_global_data_src_async};
6use super::{DataConn, DataConnContainer, DataConnManager, DataHub, DataSrc, DataSrcManager};
7use crate::SendSyncNonNull;
8
9use std::collections::HashMap;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::{any, ptr};
14
/// Reasons that [`DataHub`] wraps in an `errs::Err` when one of its own operations fails.
#[derive(Debug)]
pub enum DataHubError {
    /// Reported when a session begins and setting up the hub's local data sources
    /// produced at least one error.
    FailToSetupLocalDataSrcs {
        /// The collected failures, each paired with an `Arc<str>` key.
        /// NOTE(review): the tests in this file show the key to be the name the failing
        /// data source was registered under — confirm against `DataSrcManager`.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Reported by `DataHub::get_data_conn_async` when it could not hand out a
    /// connection for the requested name.
    NoDataSrcToCreateDataConn {
        /// The name that was requested.
        name: Arc<str>,

        /// `std::any::type_name` of the requested connection type.
        data_conn_type: &'static str,
    },
}
33
34impl DataHub {
35 #[allow(clippy::new_without_default)]
40 pub fn new() -> Self {
41 let mut data_src_map = HashMap::new();
42 copy_global_data_srcs_to_map(&mut data_src_map);
43
44 Self {
45 local_data_src_manager: DataSrcManager::new(true),
46 data_src_map,
47 data_conn_manager: DataConnManager::new(),
48 fixed: false,
49 }
50 }
51
52 pub fn with_commit_order(names: &[&str]) -> Self {
62 let mut data_src_map = HashMap::new();
63 copy_global_data_srcs_to_map(&mut data_src_map);
64
65 Self {
66 local_data_src_manager: DataSrcManager::new(true),
67 data_src_map,
68 data_conn_manager: DataConnManager::with_commit_order(names),
69 fixed: false,
70 }
71 }
72
73 pub fn uses<S, C>(&mut self, name: impl Into<Arc<str>>, ds: S)
89 where
90 S: DataSrc<C> + 'static,
91 C: DataConn + 'static,
92 {
93 if self.fixed {
94 return;
95 }
96 self.local_data_src_manager.add(name, ds);
97 }
98
99 pub fn disuses(&mut self, name: impl AsRef<str>) {
108 if self.fixed {
109 return;
110 }
111 self.data_src_map.remove(name.as_ref());
112 self.local_data_src_manager.remove(name);
113 }
114
115 #[inline]
116 async fn begin_async(&mut self) -> errs::Result<()> {
117 self.fixed = true;
118
119 let mut errors = Vec::new();
120
121 self.local_data_src_manager.setup_async(&mut errors).await;
122 if errors.is_empty() {
123 self.local_data_src_manager
124 .copy_ds_ready_to_map(&mut self.data_src_map);
125 Ok(())
126 } else {
127 Err(errs::Err::new(DataHubError::FailToSetupLocalDataSrcs {
128 errors,
129 }))
130 }
131 }
132
133 #[inline]
134 async fn commit_async(&mut self) -> errs::Result<()> {
135 self.data_conn_manager.commit_async().await
136 }
137
138 #[inline]
139 async fn rollback_async(&mut self) {
140 self.data_conn_manager.rollback_async().await
141 }
142
143 #[inline]
144 fn end(&mut self) {
145 self.data_conn_manager.close();
146 self.fixed = false;
147 }
148
149 #[allow(clippy::doc_overindented_list_items)]
170 pub async fn run_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
171 where
172 for<'a> F:
173 FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'a>>,
174 {
175 let mut r = self.begin_async().await;
176 if r.is_ok() {
177 r = logic_fn(self).await;
178 }
179 self.end();
180 r
181 }
182
183 #[allow(clippy::doc_overindented_list_items)]
206 pub async fn txn_async<F>(&mut self, mut logic_fn: F) -> errs::Result<()>
207 where
208 for<'a> F:
209 FnMut(&'a mut DataHub) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'a>>,
210 {
211 let mut r = self.begin_async().await;
212 if r.is_ok() {
213 r = logic_fn(self).await;
214 }
215 if r.is_ok() {
216 r = self.commit_async().await;
217 }
218 if r.is_err() {
219 self.rollback_async().await;
220 }
221 self.end();
222 r
223 }
224
225 pub async fn get_data_conn_async<C>(&mut self, name: impl AsRef<str>) -> errs::Result<&mut C>
245 where
246 C: DataConn + 'static,
247 {
248 if let Some(nnptr) = self.data_conn_manager.find_by_name(name.as_ref()) {
249 let typed_nnptr = DataConnManager::to_typed_ptr::<C>(&nnptr)?;
250 return Ok(unsafe { &mut (*typed_nnptr).data_conn });
251 }
252
253 if let Some((local, index)) = self.data_src_map.get(name.as_ref()) {
254 let boxed = if *local {
255 self.local_data_src_manager
256 .create_data_conn_async::<C>(*index, name.as_ref())
257 .await?
258 } else {
259 create_data_conn_from_global_data_src_async::<C>(*index, name.as_ref()).await?
260 };
261
262 let ptr = Box::into_raw(boxed);
263 if let Some(nnptr) = ptr::NonNull::new(ptr) {
264 let ssnnptr = SendSyncNonNull::new(nnptr);
265 self.data_conn_manager.add(ssnnptr);
266
267 let typed_ptr = ptr.cast::<DataConnContainer<C>>();
268 return Ok(unsafe { &mut (*typed_ptr).data_conn });
269 } else {
270 }
272 }
273
274 Err(errs::Err::new(DataHubError::NoDataSrcToCreateDataConn {
275 name: name.as_ref().into(),
276 data_conn_type: any::type_name::<C>(),
277 }))
278 }
279}
280
/// Wraps an async function `$f` into a closure that takes one argument, calls
/// `$f` with it and returns the resulting future pinned, boxed and marked `Send`.
///
/// This is the closure shape expected by `DataHub::run_async` / `DataHub::txn_async`.
///
/// Standard-library items are named through absolute `::std::…` paths so that the
/// expansion does not depend on what `std`, `Box` or `Send` resolve to at the call
/// site. `errs::Result` is intentionally left relative: it is resolved where the
/// macro is invoked, so the caller must have `errs` in scope.
#[macro_export]
#[doc(hidden)]
macro_rules! _logic {
    ($f:expr) => {
        |data| {
            // The explicit annotation drives the coercion to a boxed trait object.
            let fut: ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = errs::Result<()>> + ::std::marker::Send,
                >,
            > = ::std::boxed::Box::pin(async move { $f(data).await });
            fut
        }
    };
}
292
293#[cfg(test)]
294mod tests_of_data_hub {
295 use super::*;
296 use crate::tokio::AsyncGroup;
297 use std::sync::Mutex;
298
    /// Selects which step of the test doubles below is made to fail.
    #[derive(Clone, Copy, PartialEq)]
    enum Failure {
        /// No step fails.
        None,
        /// `MyDataConn::pre_commit_async` returns an error.
        FailToPreCommit,
        /// `MyDataConn::commit_async` returns an error.
        FailToCommit,
        /// `MyDataSrc::setup_async` returns an error.
        FailToSetup,
        /// `MyDataSrc::create_data_conn_async` returns an error.
        FailToCreateDataConn,
    }
307
    /// Test double for a data connection; every lifecycle call is appended to `logger`.
    struct MyDataConn {
        /// Number printed in every log entry of this connection.
        id: i8,
        /// Step that this connection is configured to fail at.
        failure: Failure,
        /// Set to `true` by a successful `commit_async`.
        committed: bool,
        /// Log shared with the test body.
        logger: Arc<Mutex<Vec<String>>>,
    }
314 impl MyDataConn {
315 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
316 logger
317 .lock()
318 .unwrap()
319 .push(format!("MyDataConn::new {}", id));
320 Self {
321 id,
322 failure,
323 committed: false,
324 logger,
325 }
326 }
327 }
328 impl Drop for MyDataConn {
329 fn drop(&mut self) {
330 self.logger
331 .lock()
332 .unwrap()
333 .push(format!("MyDataConn::drop {}", self.id));
334 }
335 }
336 impl DataConn for MyDataConn {
337 async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
338 if self.failure == Failure::FailToPreCommit {
339 self.logger
340 .lock()
341 .unwrap()
342 .push(format!("MyDataConn::pre_commit {} failed", self.id));
343 Err(errs::Err::new("pre commit error"))
344 } else {
345 self.logger
346 .lock()
347 .unwrap()
348 .push(format!("MyDataConn::pre_commit {}", self.id));
349 Ok(())
350 }
351 }
352 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
353 if self.failure == Failure::FailToCommit {
354 self.logger
355 .lock()
356 .unwrap()
357 .push(format!("MyDataConn::commit {} failed", self.id));
358 Err(errs::Err::new("commit error"))
359 } else {
360 self.logger
361 .lock()
362 .unwrap()
363 .push(format!("MyDataConn::commit {}", self.id));
364 self.committed = true;
365 Ok(())
366 }
367 }
368 async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
369 self.logger
370 .lock()
371 .unwrap()
372 .push(format!("MyDataConn::post_commit {}", self.id));
373 }
374 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
375 self.logger
376 .lock()
377 .unwrap()
378 .push(format!("MyDataConn::rollback {}", self.id));
379 }
380 async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
381 self.logger
382 .lock()
383 .unwrap()
384 .push(format!("MyDataConn::force_back {}", self.id));
385 }
386 fn close(&mut self) {
387 self.logger
388 .lock()
389 .unwrap()
390 .push(format!("MyDataConn::close {}", self.id));
391 }
392 }
393
    /// Test double for a data source; every lifecycle call is appended to `logger`.
    struct MyDataSrc {
        /// Number printed in every log entry; also given to the connections it creates.
        id: i8,
        /// Step to fail at; also passed on to the connections it creates.
        failure: Failure,
        /// Log shared with the test body.
        logger: Arc<Mutex<Vec<String>>>,
    }
399 impl MyDataSrc {
400 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, failure: Failure) -> Self {
401 logger
402 .lock()
403 .unwrap()
404 .push(format!("MyDataSrc::new {}", id));
405 Self {
406 id,
407 failure,
408 logger,
409 }
410 }
411 }
412 impl Drop for MyDataSrc {
413 fn drop(&mut self) {
414 self.logger
415 .lock()
416 .unwrap()
417 .push(format!("MyDataSrc::drop {}", self.id));
418 }
419 }
420 impl DataSrc<MyDataConn> for MyDataSrc {
421 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
422 if self.failure == Failure::FailToSetup {
423 self.logger
424 .lock()
425 .unwrap()
426 .push(format!("MyDataSrc::setup {} failed", self.id));
427 Err(errs::Err::new("setup error".to_string()))
428 } else {
429 self.logger
430 .lock()
431 .unwrap()
432 .push(format!("MyDataSrc::setup {}", self.id));
433 Ok(())
434 }
435 }
436 fn close(&mut self) {
437 self.logger
438 .lock()
439 .unwrap()
440 .push(format!("MyDataSrc::close {}", self.id));
441 }
442 async fn create_data_conn_async(&mut self) -> errs::Result<Box<MyDataConn>> {
443 if self.failure == Failure::FailToCreateDataConn {
444 self.logger
445 .lock()
446 .unwrap()
447 .push(format!("MyDataSrc::create_data_conn {} failed", self.id));
448 return Err(errs::Err::new("eeee".to_string()));
449 }
450 {
451 self.logger
452 .lock()
453 .unwrap()
454 .push(format!("MyDataSrc::create_data_conn {}", self.id));
455 }
456 let conn = MyDataConn::new(self.id, self.logger.clone(), self.failure);
457 Ok(Box::new(conn))
458 }
459 }
460
461 #[test]
462 fn test_new() {
463 let hub = DataHub::new();
464 assert!(hub.local_data_src_manager.vec_unready.is_empty());
465 assert!(hub.local_data_src_manager.vec_ready.is_empty());
466 assert!(hub.local_data_src_manager.local);
467 assert!(hub.data_src_map.is_empty());
468 assert!(hub.data_conn_manager.vec.is_empty());
469 assert!(hub.data_conn_manager.index_map.is_empty());
470 assert!(!hub.fixed);
471 }
472
473 #[test]
474 fn test_with_commit_order() {
475 let hub = DataHub::with_commit_order(&["bar", "qux", "foo"]);
476 assert!(hub.local_data_src_manager.vec_unready.is_empty());
477 assert!(hub.local_data_src_manager.vec_ready.is_empty());
478 assert!(hub.local_data_src_manager.local);
479 assert!(hub.data_src_map.is_empty());
480 assert_eq!(hub.data_conn_manager.vec.len(), 3);
481 assert_eq!(hub.data_conn_manager.index_map.len(), 3);
482 assert!(!hub.fixed);
483 }
484
485 #[tokio::test]
486 async fn test_uses_and_ok() {
487 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
488
489 let mut hub = DataHub::new();
490 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
491 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
492
493 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
494 assert!(hub.local_data_src_manager.vec_ready.is_empty());
495 assert!(hub.local_data_src_manager.local);
496 assert!(hub.data_src_map.is_empty());
497 assert_eq!(hub.data_conn_manager.vec.len(), 0);
498 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
499 assert!(!hub.fixed);
500
501 assert!(hub.begin_async().await.is_ok());
502
503 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
504 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
505 assert!(hub.local_data_src_manager.local);
506 assert_eq!(hub.data_src_map.len(), 2);
507 assert_eq!(hub.data_conn_manager.vec.len(), 0);
508 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
509 assert!(hub.fixed);
510 }
511
512 #[tokio::test]
513 async fn test_uses_but_already_fixed() {
514 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
515
516 let mut hub = DataHub::new();
517 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
518
519 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
520 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
521 assert!(hub.local_data_src_manager.local);
522 assert_eq!(hub.data_src_map.len(), 0);
523 assert_eq!(hub.data_conn_manager.vec.len(), 0);
524 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
525 assert!(!hub.fixed);
526
527 assert!(hub.begin_async().await.is_ok());
528
529 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
530 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
531 assert!(hub.local_data_src_manager.local);
532 assert_eq!(hub.data_src_map.len(), 1);
533 assert_eq!(hub.data_conn_manager.vec.len(), 0);
534 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
535 assert!(hub.fixed);
536
537 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
538
539 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
540 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
541 assert!(hub.local_data_src_manager.local);
542 assert_eq!(hub.data_src_map.len(), 1);
543 assert_eq!(hub.data_conn_manager.vec.len(), 0);
544 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
545 assert!(hub.fixed);
546 }
547
548 #[test]
549 fn test_disuses_and_ok() {
550 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
551
552 let mut hub = DataHub::new();
553 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
554 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
555
556 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
557 assert!(hub.local_data_src_manager.vec_ready.is_empty());
558 assert!(hub.local_data_src_manager.local);
559 assert!(hub.data_src_map.is_empty());
560 assert_eq!(hub.data_conn_manager.vec.len(), 0);
561 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
562 assert!(!hub.fixed);
563
564 hub.disuses("foo");
565
566 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
567 assert!(hub.local_data_src_manager.vec_ready.is_empty());
568 assert!(hub.local_data_src_manager.local);
569 assert!(hub.data_src_map.is_empty());
570 assert_eq!(hub.data_conn_manager.vec.len(), 0);
571 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
572 assert!(!hub.fixed);
573
574 hub.disuses("bar");
575
576 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
577 assert!(hub.local_data_src_manager.vec_ready.is_empty());
578 assert!(hub.local_data_src_manager.local);
579 assert!(hub.data_src_map.is_empty());
580 assert_eq!(hub.data_conn_manager.vec.len(), 0);
581 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
582 assert!(!hub.fixed);
583 }
584
585 #[tokio::test]
586 async fn test_disuses_and_fix() {
587 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
588
589 let mut hub = DataHub::new();
590 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
591 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
592
593 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
594 assert!(hub.local_data_src_manager.vec_ready.is_empty());
595 assert!(hub.local_data_src_manager.local);
596 assert!(hub.data_src_map.is_empty());
597 assert_eq!(hub.data_conn_manager.vec.len(), 0);
598 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
599 assert!(!hub.fixed);
600
601 hub.disuses("foo");
602
603 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 1);
604 assert!(hub.local_data_src_manager.vec_ready.is_empty());
605 assert!(hub.local_data_src_manager.local);
606 assert!(hub.data_src_map.is_empty());
607 assert_eq!(hub.data_conn_manager.vec.len(), 0);
608 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
609 assert!(!hub.fixed);
610
611 hub.disuses("bar");
612
613 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
614 assert!(hub.local_data_src_manager.vec_ready.is_empty());
615 assert!(hub.local_data_src_manager.local);
616 assert!(hub.data_src_map.is_empty());
617 assert_eq!(hub.data_conn_manager.vec.len(), 0);
618 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
619 assert!(!hub.fixed);
620
621 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
622 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
623
624 assert!(hub.begin_async().await.is_ok());
625
626 assert!(hub.local_data_src_manager.vec_unready.is_empty());
627 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
628 assert!(hub.local_data_src_manager.local);
629 assert_eq!(hub.data_src_map.len(), 2);
630 assert_eq!(hub.data_conn_manager.vec.len(), 0);
631 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
632 assert!(hub.fixed);
633
634 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
635
636 assert!(hub.local_data_src_manager.vec_unready.is_empty());
637 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
638 assert!(hub.local_data_src_manager.local);
639 assert_eq!(hub.data_src_map.len(), 2);
640 assert_eq!(hub.data_conn_manager.vec.len(), 0);
641 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
642 assert!(hub.fixed);
643
644 hub.disuses("bar");
645
646 assert!(hub.local_data_src_manager.vec_unready.is_empty());
647 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
648 assert!(hub.local_data_src_manager.local);
649 assert_eq!(hub.data_src_map.len(), 2);
650 assert_eq!(hub.data_conn_manager.vec.len(), 0);
651 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
652 assert!(hub.fixed);
653
654 hub.end();
655
656 assert!(hub.local_data_src_manager.vec_unready.is_empty());
657 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
658 assert!(hub.local_data_src_manager.local);
659 assert_eq!(hub.data_src_map.len(), 2);
660 assert_eq!(hub.data_conn_manager.vec.len(), 0);
661 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
662 assert!(!hub.fixed);
663
664 hub.disuses("bar");
665
666 assert!(hub.local_data_src_manager.vec_unready.is_empty());
667 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 1);
668 assert!(hub.local_data_src_manager.local);
669 assert_eq!(hub.data_src_map.len(), 1);
670 assert_eq!(hub.data_conn_manager.vec.len(), 0);
671 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
672 assert!(!hub.fixed);
673
674 hub.disuses("foo");
675
676 assert!(hub.local_data_src_manager.vec_unready.is_empty());
677 assert!(hub.local_data_src_manager.vec_ready.is_empty());
678 assert!(hub.local_data_src_manager.local);
679 assert_eq!(hub.data_src_map.len(), 0);
680 assert_eq!(hub.data_conn_manager.vec.len(), 0);
681 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
682 assert!(!hub.fixed);
683 }
684
685 #[tokio::test]
686 async fn test_begin_if_empty() {
687 let mut hub = DataHub::new();
688 assert!(hub.begin_async().await.is_ok());
689
690 assert!(hub.local_data_src_manager.vec_unready.is_empty());
691 assert!(hub.local_data_src_manager.vec_ready.is_empty());
692 assert!(hub.local_data_src_manager.local);
693 assert_eq!(hub.data_src_map.len(), 0);
694 assert_eq!(hub.data_conn_manager.vec.len(), 0);
695 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
696 assert!(hub.fixed);
697
698 hub.end();
699
700 assert!(hub.local_data_src_manager.vec_unready.is_empty());
701 assert!(hub.local_data_src_manager.vec_ready.is_empty());
702 assert!(hub.local_data_src_manager.local);
703 assert_eq!(hub.data_src_map.len(), 0);
704 assert_eq!(hub.data_conn_manager.vec.len(), 0);
705 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
706 assert!(!hub.fixed);
707 }
708
709 #[tokio::test]
710 async fn test_begin_and_ok() {
711 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
712
713 {
714 let mut hub = DataHub::new();
715
716 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
717 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
718
719 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 2);
720 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
721 assert_eq!(hub.local_data_src_manager.local, true);
722 assert_eq!(hub.data_src_map.len(), 0);
723 assert_eq!(hub.data_conn_manager.vec.len(), 0);
724 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
725 assert_eq!(hub.fixed, false);
726
727 assert_eq!(hub.begin_async().await.is_ok(), true);
728
729 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
730 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
731 assert_eq!(hub.local_data_src_manager.local, true);
732 assert_eq!(hub.data_src_map.len(), 2);
733 assert_eq!(hub.data_conn_manager.vec.len(), 0);
734 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
735 assert_eq!(hub.fixed, true);
736
737 hub.end();
738
739 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 0);
740 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 2);
741 assert_eq!(hub.local_data_src_manager.local, true);
742 assert_eq!(hub.data_src_map.len(), 2);
743 assert_eq!(hub.data_conn_manager.vec.len(), 0);
744 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
745 assert_eq!(hub.fixed, false);
746 }
747
748 assert_eq!(
749 *logger.lock().unwrap(),
750 &[
751 "MyDataSrc::new 1",
752 "MyDataSrc::new 2",
753 "MyDataSrc::setup 1",
754 "MyDataSrc::setup 2",
755 "MyDataSrc::close 2",
756 "MyDataSrc::drop 2",
757 "MyDataSrc::close 1",
758 "MyDataSrc::drop 1",
759 ]
760 );
761 }
762
763 #[tokio::test]
764 async fn test_begin_but_failed() {
765 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
766
767 {
768 let mut hub = DataHub::new();
769
770 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
771 hub.uses(
772 "bar",
773 MyDataSrc::new(2, logger.clone(), Failure::FailToSetup),
774 );
775 hub.uses("baz", MyDataSrc::new(3, logger.clone(), Failure::None));
776
777 assert_eq!(hub.local_data_src_manager.vec_unready.len(), 3);
778 assert_eq!(hub.local_data_src_manager.vec_ready.len(), 0);
779 assert_eq!(hub.local_data_src_manager.local, true);
780 assert_eq!(hub.data_src_map.len(), 0);
781 assert_eq!(hub.data_conn_manager.vec.len(), 0);
782 assert_eq!(hub.data_conn_manager.index_map.len(), 0);
783 assert_eq!(hub.fixed, false);
784
785 if let Err(err) = hub.begin_async().await {
786 match err.reason::<DataHubError>() {
787 Ok(DataHubError::FailToSetupLocalDataSrcs { errors }) => {
788 assert_eq!(errors.len(), 1);
789 assert_eq!(errors[0].0, "bar".into());
790 assert_eq!(errors[0].1.reason::<String>().unwrap(), "setup error");
791 }
792 _ => panic!(),
793 }
794 } else {
795 panic!();
796 }
797
798 hub.end();
799 }
800
801 assert_eq!(
802 *logger.lock().unwrap(),
803 &[
804 "MyDataSrc::new 1",
805 "MyDataSrc::new 2",
806 "MyDataSrc::new 3",
807 "MyDataSrc::setup 1",
808 "MyDataSrc::setup 2 failed",
809 "MyDataSrc::close 2",
810 "MyDataSrc::close 1",
811 "MyDataSrc::drop 3",
812 "MyDataSrc::drop 2",
813 "MyDataSrc::drop 1",
814 ]
815 );
816 }
817
818 #[tokio::test]
819 async fn test_run_and_ok() {
820 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
821 {
822 let mut hub = DataHub::new();
823
824 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
825 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
826
827 let logger_clone = logger.clone();
828 assert!(hub
829 .run_async(|_data| {
830 let logger_clone2 = logger_clone.clone();
831 Box::pin(async move {
832 logger_clone2
833 .lock()
834 .unwrap()
835 .push("execute logic".to_string());
836 Ok(())
837 })
838 })
839 .await
840 .is_ok());
841 }
842
843 assert_eq!(
844 *logger.lock().unwrap(),
845 &[
846 "MyDataSrc::new 1",
847 "MyDataSrc::new 2",
848 "MyDataSrc::setup 1",
849 "MyDataSrc::setup 2",
850 "execute logic",
851 "MyDataSrc::close 2",
852 "MyDataSrc::drop 2",
853 "MyDataSrc::close 1",
854 "MyDataSrc::drop 1",
855 ]
856 );
857 }
858
859 #[tokio::test]
860 async fn test_run_but_failed() {
861 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
862 {
863 let mut hub = DataHub::new();
864
865 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
866 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
867
868 let logger_clone = logger.clone();
869 if let Err(err) = hub
870 .run_async(|_data| {
871 let logger_clone2 = logger_clone.clone();
872 Box::pin(async move {
873 logger_clone2
874 .lock()
875 .unwrap()
876 .push("execute logic but fail".to_string());
877 Err(errs::Err::new("logic error".to_string()))
878 })
879 })
880 .await
881 {
882 match err.reason::<String>() {
883 Ok(s) => assert_eq!(s, "logic error"),
884 _ => panic!(),
885 }
886 } else {
887 panic!();
888 }
889 }
890
891 assert_eq!(
892 *logger.lock().unwrap(),
893 &[
894 "MyDataSrc::new 1",
895 "MyDataSrc::new 2",
896 "MyDataSrc::setup 1",
897 "MyDataSrc::setup 2",
898 "execute logic but fail",
899 "MyDataSrc::close 2",
900 "MyDataSrc::drop 2",
901 "MyDataSrc::close 1",
902 "MyDataSrc::drop 1",
903 ]
904 );
905 }
906
907 #[tokio::test]
908 async fn test_txn_and_no_data_access_and_ok() {
909 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
910 {
911 let mut hub = DataHub::new();
912
913 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
914 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
915
916 let logger_clone = logger.clone();
917 assert!(hub
918 .txn_async(|_data| {
919 let logger_clone2 = logger_clone.clone();
920 Box::pin(async move {
921 logger_clone2
922 .lock()
923 .unwrap()
924 .push("execute logic".to_string());
925 Ok(())
926 })
927 })
928 .await
929 .is_ok());
930 }
931
932 assert_eq!(
933 *logger.lock().unwrap(),
934 &[
935 "MyDataSrc::new 1",
936 "MyDataSrc::new 2",
937 "MyDataSrc::setup 1",
938 "MyDataSrc::setup 2",
939 "execute logic",
940 "MyDataSrc::close 2",
941 "MyDataSrc::drop 2",
942 "MyDataSrc::close 1",
943 "MyDataSrc::drop 1",
944 ]
945 );
946 }
947
948 #[tokio::test]
949 async fn test_txn_and_has_data_access_and_ok() {
950 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
951 {
952 let mut hub = DataHub::new();
953
954 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
955 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
956
957 let logger_clone = logger.clone();
958 hub.txn_async(move |data| {
959 let logger_clone2 = logger_clone.clone();
960 Box::pin(async move {
961 logger_clone2
962 .lock()
963 .unwrap()
964 .push("execute logic".to_string());
965 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
966 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
967 Ok(())
968 })
969 })
970 .await
971 .unwrap()
972 }
973
974 assert_eq!(
975 *logger.lock().unwrap(),
976 &[
977 "MyDataSrc::new 1",
978 "MyDataSrc::new 2",
979 "MyDataSrc::setup 1",
980 "MyDataSrc::setup 2",
981 "execute logic",
982 "MyDataSrc::create_data_conn 1",
983 "MyDataConn::new 1",
984 "MyDataSrc::create_data_conn 2",
985 "MyDataConn::new 2",
986 "MyDataConn::pre_commit 1",
987 "MyDataConn::pre_commit 2",
988 "MyDataConn::commit 1",
989 "MyDataConn::commit 2",
990 "MyDataConn::post_commit 1",
991 "MyDataConn::post_commit 2",
992 "MyDataConn::close 1",
993 "MyDataConn::drop 1",
994 "MyDataConn::close 2",
995 "MyDataConn::drop 2",
996 "MyDataSrc::close 2",
997 "MyDataSrc::drop 2",
998 "MyDataSrc::close 1",
999 "MyDataSrc::drop 1",
1000 ]
1001 );
1002 }
1003
1004 #[tokio::test]
1005 async fn test_txn_but_failed() {
1006 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1007 {
1008 let mut hub = DataHub::new();
1009
1010 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1011 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1012
1013 let logger_clone = logger.clone();
1014 if let Err(e) = hub
1015 .txn_async(move |data| {
1016 let logger_clone2 = logger_clone.clone();
1017 Box::pin(async move {
1018 logger_clone2
1019 .lock()
1020 .unwrap()
1021 .push("execute logic".to_string());
1022 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1023 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1024 Err(errs::Err::new("logic error"))
1025 })
1026 })
1027 .await
1028 {
1029 match e.reason::<&str>() {
1030 Ok(s) => assert_eq!(s, &"logic error"),
1031 _ => panic!(),
1032 }
1033 }
1034 }
1035
1036 assert_eq!(
1037 *logger.lock().unwrap(),
1038 &[
1039 "MyDataSrc::new 1",
1040 "MyDataSrc::new 2",
1041 "MyDataSrc::setup 1",
1042 "MyDataSrc::setup 2",
1043 "execute logic",
1044 "MyDataSrc::create_data_conn 1",
1045 "MyDataConn::new 1",
1046 "MyDataSrc::create_data_conn 2",
1047 "MyDataConn::new 2",
1048 "MyDataConn::rollback 1",
1049 "MyDataConn::rollback 2",
1050 "MyDataConn::close 1",
1051 "MyDataConn::drop 1",
1052 "MyDataConn::close 2",
1053 "MyDataConn::drop 2",
1054 "MyDataSrc::close 2",
1055 "MyDataSrc::drop 2",
1056 "MyDataSrc::close 1",
1057 "MyDataSrc::drop 1",
1058 ]
1059 );
1060 }
1061
1062 #[tokio::test]
1063 async fn test_txn_with_commit_order() {
1064 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1065 {
1066 let mut hub = DataHub::with_commit_order(&["bar", "foo"]);
1067
1068 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1069 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1070
1071 let logger_clone = logger.clone();
1072 hub.txn_async(move |data| {
1073 let logger_clone2 = logger_clone.clone();
1074 Box::pin(async move {
1075 logger_clone2
1076 .lock()
1077 .unwrap()
1078 .push("execute logic".to_string());
1079 let _conn1 = data.get_data_conn_async::<MyDataConn>("foo").await?;
1080 let _conn2 = data.get_data_conn_async::<MyDataConn>("bar").await?;
1081 Ok(())
1082 })
1083 })
1084 .await
1085 .unwrap();
1086 }
1087
1088 assert_eq!(
1089 *logger.lock().unwrap(),
1090 &[
1091 "MyDataSrc::new 1",
1092 "MyDataSrc::new 2",
1093 "MyDataSrc::setup 1",
1094 "MyDataSrc::setup 2",
1095 "execute logic",
1096 "MyDataSrc::create_data_conn 1",
1097 "MyDataConn::new 1",
1098 "MyDataSrc::create_data_conn 2",
1099 "MyDataConn::new 2",
1100 "MyDataConn::pre_commit 2",
1101 "MyDataConn::pre_commit 1",
1102 "MyDataConn::commit 2",
1103 "MyDataConn::commit 1",
1104 "MyDataConn::post_commit 2",
1105 "MyDataConn::post_commit 1",
1106 "MyDataConn::close 2",
1107 "MyDataConn::drop 2",
1108 "MyDataConn::close 1",
1109 "MyDataConn::drop 1",
1110 "MyDataSrc::close 2",
1111 "MyDataSrc::drop 2",
1112 "MyDataSrc::close 1",
1113 "MyDataSrc::drop 1",
1114 ]
1115 );
1116 }
1117
1118 #[tokio::test]
1119 async fn test_get_data_conn_and_failed() {
1120 let logger = Arc::new(Mutex::new(Vec::<String>::new()));
1121 {
1122 let mut hub = DataHub::new();
1123
1124 hub.uses("foo", MyDataSrc::new(1, logger.clone(), Failure::None));
1125 hub.uses("bar", MyDataSrc::new(2, logger.clone(), Failure::None));
1126
1127 let logger_clone = logger.clone();
1128 let err = hub
1129 .txn_async(move |data| {
1130 let logger_clone2 = logger_clone.clone();
1131 Box::pin(async move {
1132 logger_clone2
1133 .lock()
1134 .unwrap()
1135 .push("execute logic".to_string());
1136 let _conn1 = data.get_data_conn_async::<MyDataConn>("fxx").await?;
1137 Ok(())
1138 })
1139 })
1140 .await
1141 .unwrap_err();
1142
1143 match err.reason::<DataHubError>() {
1144 Ok(r) => match r {
1145 DataHubError::NoDataSrcToCreateDataConn {
1146 name,
1147 data_conn_type,
1148 } => {
1149 assert_eq!(name.as_ref(), "fxx");
1150 assert_eq!(
1151 data_conn_type,
1152 &"sabi::tokio::data_hub::tests_of_data_hub::MyDataConn"
1153 );
1154 }
1155 _ => panic!(),
1156 },
1157 _ => panic!(),
1158 }
1159 }
1160
1161 assert_eq!(
1162 *logger.lock().unwrap(),
1163 &[
1164 "MyDataSrc::new 1",
1165 "MyDataSrc::new 2",
1166 "MyDataSrc::setup 1",
1167 "MyDataSrc::setup 2",
1168 "execute logic",
1169 "MyDataSrc::close 2",
1170 "MyDataSrc::drop 2",
1171 "MyDataSrc::close 1",
1172 "MyDataSrc::drop 1",
1173 ]
1174 );
1175 }
1176
    /// Empty marker trait standing in for a data-access trait of application logic.
    trait Data {}
    // Lets a `DataHub` be passed wherever `impl Data` is expected.
    impl Data for DataHub {}
1179
    /// Minimal async logic function: ignores its data access object and succeeds.
    async fn process_async(_data: &mut impl Data) -> errs::Result<()> {
        Ok(())
    }
1183
1184 #[tokio::test]
1185 async fn data_hub_implements_send_trait() {
1186 let handle = tokio::spawn(async {
1187 let mut data = DataHub::new();
1188 data.run_async(_logic!(process_async)).await.unwrap();
1189 });
1190
1191 handle.await.unwrap();
1192 }
1193}