1use crate::{AsyncGroup, DataConn, DataConnContainer, DataConnManager};
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::{any, mem, ptr};
10
11#[allow(clippy::enum_variant_names)]
13#[derive(Debug)]
14pub enum DataConnError {
15 FailToPreCommitDataConn {
19 errors: Vec<(Arc<str>, errs::Err)>,
21 },
22
23 FailToCommitDataConn {
27 errors: Vec<(Arc<str>, errs::Err)>,
29 },
30
31 FailToCastDataConn {
33 name: Arc<str>,
35
36 target_type: &'static str,
38 },
39}
40
41impl<C> DataConnContainer<C>
42where
43 C: DataConn + 'static,
44{
45 pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
46 Self {
47 drop_fn: drop_data_conn::<C>,
48 is_fn: is_data_conn::<C>,
49 commit_fn: commit_data_conn::<C>,
50 pre_commit_fn: pre_commit_data_conn::<C>,
51 post_commit_fn: post_commit_data_conn::<C>,
52 should_force_back_fn: should_force_back_data_conn::<C>,
53 rollback_fn: rollback_data_conn::<C>,
54 force_back_fn: force_back_data_conn::<C>,
55 close_fn: close_data_conn::<C>,
56
57 name: name.into(),
58 data_conn,
59 }
60 }
61}
62
63fn drop_data_conn<C>(ptr: *const DataConnContainer)
64where
65 C: DataConn + 'static,
66{
67 let typed_ptr = ptr as *mut DataConnContainer<C>;
68 unsafe {
69 drop(Box::from_raw(typed_ptr));
70 }
71}
72
73fn is_data_conn<C>(type_id: any::TypeId) -> bool
74where
75 C: DataConn + 'static,
76{
77 any::TypeId::of::<C>() == type_id
78}
79
80fn commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
81where
82 C: DataConn + 'static,
83{
84 let typed_ptr = ptr as *mut DataConnContainer<C>;
85 unsafe { (*typed_ptr).data_conn.commit(ag) }
86}
87
88fn pre_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup) -> errs::Result<()>
89where
90 C: DataConn + 'static,
91{
92 let typed_ptr = ptr as *mut DataConnContainer<C>;
93 unsafe { (*typed_ptr).data_conn.pre_commit(ag) }
94}
95
96fn post_commit_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
97where
98 C: DataConn + 'static,
99{
100 let typed_ptr = ptr as *mut DataConnContainer<C>;
101 unsafe {
102 (*typed_ptr).data_conn.post_commit(ag);
103 }
104}
105
106fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
107where
108 C: DataConn + 'static,
109{
110 let typed_ptr = ptr as *mut DataConnContainer<C>;
111 unsafe { (*typed_ptr).data_conn.should_force_back() }
112}
113
114fn rollback_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
115where
116 C: DataConn + 'static,
117{
118 let typed_ptr = ptr as *mut DataConnContainer<C>;
119 unsafe {
120 (*typed_ptr).data_conn.rollback(ag);
121 }
122}
123
124fn force_back_data_conn<C>(ptr: *const DataConnContainer, ag: &mut AsyncGroup)
125where
126 C: DataConn + 'static,
127{
128 let typed_ptr = ptr as *mut DataConnContainer<C>;
129 unsafe {
130 (*typed_ptr).data_conn.force_back(ag);
131 }
132}
133
134fn close_data_conn<C>(ptr: *const DataConnContainer)
135where
136 C: DataConn + 'static,
137{
138 let typed_ptr = ptr as *mut DataConnContainer<C>;
139 unsafe {
140 (*typed_ptr).data_conn.close();
141 }
142}
143
144impl DataConnManager {
145 pub(crate) fn new() -> Self {
146 Self {
147 vec: Vec::new(),
148 index_map: HashMap::new(),
149 }
150 }
151
152 pub(crate) fn with_commit_order(names: &[&str]) -> Self {
153 let mut index_map = HashMap::with_capacity(names.len());
154 for (i, nm) in names.iter().rev().enumerate() {
156 index_map.insert((*nm).into(), names.len() - 1 - i);
157 }
158
159 Self {
160 vec: vec![None; names.len()],
161 index_map,
162 }
163 }
164
165 pub(crate) fn add(&mut self, nnptr: ptr::NonNull<DataConnContainer>) {
166 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
167 if let Some(index) = self.index_map.get(&name) {
168 self.vec[*index] = Some(nnptr);
169 } else {
170 let index = self.vec.len();
171 self.vec.push(Some(nnptr));
172 self.index_map.insert(name.clone(), index);
173 }
174 }
175
176 pub(crate) fn find_by_name(
177 &self,
178 name: impl AsRef<str>,
179 ) -> Option<ptr::NonNull<DataConnContainer>> {
180 if let Some(index) = self.index_map.get(name.as_ref()) {
181 if *index < self.vec.len() {
182 if let Some(nnptr) = self.vec[*index] {
183 let ptr = nnptr.as_ptr();
184 let cont_name = unsafe { &(*ptr).name };
185 if cont_name.as_ref() == name.as_ref() {
186 return Some(nnptr);
187 }
188 }
189 }
190 }
191
192 None
193 }
194
195 pub(crate) fn to_typed_ptr<C>(
196 nnptr: &ptr::NonNull<DataConnContainer>,
197 ) -> errs::Result<*mut DataConnContainer<C>>
198 where
199 C: DataConn + 'static,
200 {
201 let ptr = nnptr.as_ptr();
202 let name = unsafe { &(*ptr).name };
203 let type_id = any::TypeId::of::<C>();
204 let is_fn = unsafe { (*ptr).is_fn };
205
206 if !is_fn(type_id) {
207 return Err(errs::Err::new(DataConnError::FailToCastDataConn {
208 name: name.clone(),
209 target_type: any::type_name::<C>(),
210 }));
211 }
212
213 let typed_ptr = ptr as *mut DataConnContainer<C>;
214 Ok(typed_ptr)
215 }
216
217 pub(crate) fn commit(&self) -> errs::Result<()> {
218 let mut errors = Vec::new();
219
220 let mut ag = AsyncGroup::new();
221 for nnptr in self.vec.iter().flatten() {
222 let ptr = nnptr.as_ptr();
223 let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
224 ag._name = unsafe { (*ptr).name.clone() };
225 if let Err(err) = pre_commit_fn(ptr, &mut ag) {
226 errors.push((ag._name.clone(), err));
227 break;
228 }
229 }
230 ag.join_and_collect_errors(&mut errors);
231
232 if !errors.is_empty() {
233 return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
234 errors,
235 }));
236 }
237
238 let mut ag = AsyncGroup::new();
239 for nnptr in self.vec.iter().flatten() {
240 let ptr = nnptr.as_ptr();
241 let commit_fn = unsafe { (*ptr).commit_fn };
242 ag._name = unsafe { (*ptr).name.clone() };
243 if let Err(err) = commit_fn(ptr, &mut ag) {
244 errors.push((ag._name.clone(), err));
245 break;
246 }
247 }
248 ag.join_and_collect_errors(&mut errors);
249
250 if !errors.is_empty() {
251 return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
252 errors,
253 }));
254 }
255
256 let mut ag = AsyncGroup::new();
257 for nnptr in self.vec.iter().flatten() {
258 let ptr = nnptr.as_ptr();
259 let post_commit_fn = unsafe { (*ptr).post_commit_fn };
260 ag._name = unsafe { (*ptr).name.clone() };
261 post_commit_fn(ptr, &mut ag);
262 }
263 ag.join_and_ignore_errors();
264
265 Ok(())
266 }
267
268 pub(crate) fn rollback(&mut self) {
269 let mut ag = AsyncGroup::new();
270 for nnptr in self.vec.iter().flatten() {
271 let ptr = nnptr.as_ptr();
272 let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
273 let force_back_fn = unsafe { (*ptr).force_back_fn };
274 let rollback_fn = unsafe { (*ptr).rollback_fn };
275 ag._name = unsafe { (*ptr).name.clone() };
276
277 if should_force_back_fn(ptr) {
278 force_back_fn(ptr, &mut ag);
279 } else {
280 rollback_fn(ptr, &mut ag);
281 }
282 }
283 ag.join_and_ignore_errors();
284 }
285
286 pub(crate) fn close(&mut self) {
287 self.index_map.clear();
288
289 let vec: Vec<Option<ptr::NonNull<DataConnContainer>>> = mem::take(&mut self.vec);
290
291 for nnptr in vec.iter().flatten() {
292 let ptr = nnptr.as_ptr();
293 let close_fn = unsafe { (*ptr).close_fn };
294 let drop_fn = unsafe { (*ptr).drop_fn };
295 close_fn(ptr);
296 drop_fn(ptr);
297 }
298 }
299}
300
301impl Drop for DataConnManager {
302 fn drop(&mut self) {
303 self.close();
304 }
305}
306
307#[cfg(test)]
308mod tests_of_data_conn {
309 use super::*;
310 use std::sync::{
311 atomic::{AtomicBool, Ordering},
312 Arc, Mutex,
313 };
314 use std::{thread, time};
315
316 #[derive(PartialEq, Copy, Clone)]
317 enum Fail {
318 Not,
319 Commit,
320 PreCommit,
321 }
322
323 struct SyncDataConn {
324 id: i8,
325 committed: bool,
326 fail: Fail,
327 logger: Arc<Mutex<Vec<String>>>,
328 }
329 impl SyncDataConn {
330 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
331 logger
332 .lock()
333 .unwrap()
334 .push(format!("SyncDataConn::new {}", id));
335 Self {
336 id,
337 committed: false,
338 fail,
339 logger,
340 }
341 }
342 }
343 impl Drop for SyncDataConn {
344 fn drop(&mut self) {
345 self.logger
346 .lock()
347 .unwrap()
348 .push(format!("SyncDataConn::drop {}", self.id));
349 }
350 }
351 impl DataConn for SyncDataConn {
352 fn commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
353 if self.fail == Fail::Commit {
354 self.logger
355 .lock()
356 .unwrap()
357 .push(format!("SyncDataConn::commit {} failed", self.id));
358 return Err(errs::Err::new("ZZZ".to_string()));
359 }
360 self.committed = true;
361 self.logger
362 .lock()
363 .unwrap()
364 .push(format!("SyncDataConn::commit {}", self.id));
365 Ok(())
366 }
367 fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
368 if self.fail == Fail::PreCommit {
369 self.logger
370 .lock()
371 .unwrap()
372 .push(format!("SyncDataConn::pre_commit {} failed", self.id));
373 return Err(errs::Err::new("zzz".to_string()));
374 }
375 self.logger
376 .lock()
377 .unwrap()
378 .push(format!("SyncDataConn::pre_commit {}", self.id));
379 Ok(())
380 }
381 fn post_commit(&mut self, _ag: &mut AsyncGroup) {
382 self.logger
383 .lock()
384 .unwrap()
385 .push(format!("SyncDataConn::post_commit {}", self.id));
386 }
387 fn should_force_back(&self) -> bool {
388 self.committed
389 }
390 fn rollback(&mut self, _ag: &mut AsyncGroup) {
391 self.logger
392 .lock()
393 .unwrap()
394 .push(format!("SyncDataConn::rollback {}", self.id));
395 }
396 fn force_back(&mut self, _ag: &mut AsyncGroup) {
397 self.logger
398 .lock()
399 .unwrap()
400 .push(format!("SyncDataConn::force_back {}", self.id));
401 }
402 fn close(&mut self) {
403 self.logger
404 .lock()
405 .unwrap()
406 .push(format!("SyncDataConn::close {}", self.id));
407 }
408 }
409
410 struct AsyncDataConn {
411 id: i8,
412 committed: Arc<AtomicBool>,
413 fail: Fail,
414 logger: Arc<Mutex<Vec<String>>>,
415 }
416 impl AsyncDataConn {
417 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
418 logger
419 .lock()
420 .unwrap()
421 .push(format!("AsyncDataConn::new {}", id));
422 Self {
423 id,
424 committed: Arc::new(AtomicBool::new(false)),
425 fail,
426 logger,
427 }
428 }
429 }
430 impl Drop for AsyncDataConn {
431 fn drop(&mut self) {
432 self.logger
433 .lock()
434 .unwrap()
435 .push(format!("AsyncDataConn::drop {}", self.id));
436 }
437 }
438 impl DataConn for AsyncDataConn {
439 fn commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
440 let fail = self.fail;
441 let logger = self.logger.clone();
442 let id = self.id;
443 let committed = self.committed.clone();
444 ag.add(move || {
445 thread::sleep(time::Duration::from_millis(100));
446 if fail == Fail::Commit {
447 logger
448 .lock()
449 .unwrap()
450 .push(format!("AsyncDataConn::commit {} failed", id));
451 return Err(errs::Err::new("YYY".to_string()));
452 }
453 committed.store(true, Ordering::Release);
454 logger
455 .lock()
456 .unwrap()
457 .push(format!("AsyncDataConn::commit {}", id));
458 Ok(())
459 });
460 Ok(())
461 }
462 fn pre_commit(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
463 let fail = self.fail;
464 let logger = self.logger.clone();
465 let id = self.id;
466 ag.add(move || {
467 thread::sleep(time::Duration::from_millis(100));
468 if fail == Fail::PreCommit {
469 logger
470 .lock()
471 .unwrap()
472 .push(format!("AsyncDataConn::pre_commit {} failed", id));
473 return Err(errs::Err::new("yyy".to_string()));
474 }
475 logger
476 .lock()
477 .unwrap()
478 .push(format!("AsyncDataConn::pre_commit {}", id));
479 Ok(())
480 });
481 Ok(())
482 }
483 fn post_commit(&mut self, ag: &mut AsyncGroup) {
484 let logger = self.logger.clone();
485 let id = self.id;
486 ag.add(move || {
487 thread::sleep(time::Duration::from_millis(100));
488 logger
489 .lock()
490 .unwrap()
491 .push(format!("AsyncDataConn::post_commit {}", id));
492 Ok(())
493 });
494 }
495 fn should_force_back(&self) -> bool {
496 self.committed.load(Ordering::Acquire)
497 }
498 fn rollback(&mut self, ag: &mut AsyncGroup) {
499 let logger = self.logger.clone();
500 let id = self.id;
501 ag.add(move || {
502 thread::sleep(time::Duration::from_millis(100));
503 logger
504 .lock()
505 .unwrap()
506 .push(format!("AsyncDataConn::rollback {}", id));
507 Ok(())
508 });
509 }
510 fn force_back(&mut self, ag: &mut AsyncGroup) {
511 let logger = self.logger.clone();
512 let id = self.id;
513 ag.add(move || {
514 thread::sleep(time::Duration::from_millis(100));
515 logger
516 .lock()
517 .unwrap()
518 .push(format!("AsyncDataConn::force_back {}", id));
519 Ok(())
520 });
521 }
522 fn close(&mut self) {
523 self.logger
524 .lock()
525 .unwrap()
526 .push(format!("AsyncDataConn::close {}", self.id));
527 }
528 }
529
530 mod tests_of_data_conn_manager {
531 use super::*;
532
533 #[test]
534 fn test_new() {
535 let manager = DataConnManager::new();
536 assert!(manager.vec.is_empty());
537 assert!(manager.index_map.is_empty());
538 }
539
540 #[test]
541 fn test_with_commit_order() {
542 let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
543 assert_eq!(manager.vec, vec![None, None, None]);
544 assert_eq!(manager.index_map.len(), 3);
545 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
546 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
547 assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
548 }
549
550 #[test]
551 fn test_new_and_add() {
552 let logger = Arc::new(Mutex::new(Vec::new()));
553
554 let mut manager = DataConnManager::new();
555 assert!(manager.vec.is_empty());
556 assert!(manager.index_map.is_empty());
557
558 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
559 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
560 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
561 manager.add(nnptr);
562 assert_eq!(manager.vec.len(), 1);
563 assert_eq!(manager.index_map.len(), 1);
564 assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
565
566 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
567 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
568 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
569 manager.add(nnptr);
570 assert_eq!(manager.vec.len(), 2);
571 assert_eq!(manager.index_map.len(), 2);
572 assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
573 assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
574 }
575
576 #[test]
577 fn test_with_commit_order_and_add() {
578 let logger = Arc::new(Mutex::new(Vec::new()));
579
580 let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
581 assert_eq!(manager.vec, vec![None, None, None]);
582 assert_eq!(manager.index_map.len(), 3);
583 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
584 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
585 assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
586
587 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
588 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
589 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
590 manager.add(nnptr);
591 assert_eq!(manager.vec.len(), 3);
592 assert_eq!(manager.index_map.len(), 3);
593 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
594
595 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
596 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
597 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
598 manager.add(nnptr);
599 assert_eq!(manager.vec.len(), 3);
600 assert_eq!(manager.index_map.len(), 3);
601 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
602 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
603
604 let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
605 let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
606 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
607 manager.add(nnptr);
608 assert_eq!(manager.vec.len(), 4);
609 assert_eq!(manager.index_map.len(), 4);
610 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
611 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
612 assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
613 }
614
615 #[test]
616 fn test_find_by_name_but_none() {
617 let manager = DataConnManager::new();
618 assert!(manager.find_by_name("foo").is_none());
619 assert!(manager.find_by_name("bar").is_none());
620 }
621
622 #[test]
623 fn test_find_by_name_and_found() {
624 let logger = Arc::new(Mutex::new(Vec::new()));
625
626 let mut manager = DataConnManager::new();
627
628 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
629 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
630 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
631 manager.add(nnptr);
632
633 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
634 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
635 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
636 manager.add(nnptr);
637
638 if let Some(nnptr) = manager.find_by_name("foo") {
639 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
640 assert_eq!(name.as_ref(), "foo");
641 } else {
642 panic!();
643 }
644
645 if let Some(nnptr) = manager.find_by_name("bar") {
646 let name = unsafe { (*nnptr.as_ptr()).name.clone() };
647 assert_eq!(name.as_ref(), "bar");
648 } else {
649 panic!();
650 }
651 }
652
653 #[test]
654 fn test_to_typed_ptr() {
655 let logger = Arc::new(Mutex::new(Vec::new()));
656
657 let mut manager = DataConnManager::new();
658
659 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
660 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
661 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
662 manager.add(nnptr);
663
664 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
665 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
666 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
667 manager.add(nnptr);
668
669 let nnptr = manager.find_by_name("foo").unwrap();
670 if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
671 assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::SyncDataConn>");
672 assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
673 } else {
674 panic!();
675 }
676
677 let nnptr = manager.find_by_name("bar").unwrap();
678 if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
679 assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::DataConnContainer<sabi::data_conn::tests_of_data_conn::AsyncDataConn>");
680 assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
681 } else {
682 panic!();
683 }
684 }
685
686 #[test]
687 fn test_to_typed_ptr_but_fail() {
688 let logger = Arc::new(Mutex::new(Vec::new()));
689
690 let mut manager = DataConnManager::new();
691
692 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
693 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
694 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
695 manager.add(nnptr);
696
697 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
698 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
699 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
700 manager.add(nnptr);
701
702 let nnptr = manager.find_by_name("foo").unwrap();
703 if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
704 match err.reason::<DataConnError>() {
705 Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
706 assert_eq!(name.as_ref(), "foo");
707 assert_eq!(
708 *target_type,
709 "sabi::data_conn::tests_of_data_conn::AsyncDataConn"
710 );
711 }
712 _ => panic!(),
713 }
714 } else {
715 panic!();
716 }
717
718 let nnptr = manager.find_by_name("bar").unwrap();
719 if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
720 match err.reason::<DataConnError>() {
721 Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
722 assert_eq!(name.as_ref(), "bar");
723 assert_eq!(
724 *target_type,
725 "sabi::data_conn::tests_of_data_conn::SyncDataConn"
726 );
727 }
728 _ => panic!(),
729 }
730 } else {
731 panic!();
732 }
733 }
734
735 #[test]
736 fn test_commit_ok() {
737 let logger = Arc::new(Mutex::new(Vec::new()));
738
739 {
740 let mut manager = DataConnManager::new();
741
742 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
743 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
744 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
745 manager.add(nnptr);
746
747 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
748 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
749 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
750 manager.add(nnptr);
751
752 assert!(manager.commit().is_ok());
753 }
754
755 assert_eq!(
756 *logger.lock().unwrap(),
757 &[
758 "SyncDataConn::new 1",
759 "AsyncDataConn::new 2",
760 "SyncDataConn::pre_commit 1",
761 "AsyncDataConn::pre_commit 2",
762 "SyncDataConn::commit 1",
763 "AsyncDataConn::commit 2",
764 "SyncDataConn::post_commit 1",
765 "AsyncDataConn::post_commit 2",
766 "SyncDataConn::close 1",
767 "SyncDataConn::drop 1",
768 "AsyncDataConn::close 2",
769 "AsyncDataConn::drop 2",
770 ]
771 );
772 }
773
774 #[test]
775 fn test_commit_with_order() {
776 let logger = Arc::new(Mutex::new(Vec::new()));
777
778 {
779 let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
780
781 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
782 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
783 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
784 manager.add(nnptr);
785
786 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
787 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
788 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
789 manager.add(nnptr);
790
791 let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
792 let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
793 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
794 manager.add(nnptr);
795
796 assert!(manager.commit().is_ok());
797 }
798
799 assert_eq!(
800 *logger.lock().unwrap(),
801 &[
802 "SyncDataConn::new 1",
803 "AsyncDataConn::new 2",
804 "SyncDataConn::new 3",
805 "SyncDataConn::pre_commit 1",
806 "SyncDataConn::pre_commit 3",
807 "AsyncDataConn::pre_commit 2", "SyncDataConn::commit 1",
809 "SyncDataConn::commit 3",
810 "AsyncDataConn::commit 2", "SyncDataConn::post_commit 1",
812 "SyncDataConn::post_commit 3",
813 "AsyncDataConn::post_commit 2", "AsyncDataConn::close 2",
815 "AsyncDataConn::drop 2",
816 "SyncDataConn::close 1",
817 "SyncDataConn::drop 1",
818 "SyncDataConn::close 3",
819 "SyncDataConn::drop 3",
820 ]
821 );
822 }
823
824 #[test]
825 fn test_commit_but_fail_first_sync_pre_commit() {
826 let logger = Arc::new(Mutex::new(Vec::new()));
827
828 {
829 let mut manager = DataConnManager::new();
830
831 let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
832 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
833 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
834 manager.add(nnptr);
835
836 let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
837 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
838 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
839 manager.add(nnptr);
840
841 if let Err(e) = manager.commit() {
842 match e.reason::<DataConnError>() {
843 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
844 assert_eq!(errors.len(), 1);
845 assert_eq!(errors[0].0, "foo".into());
846 assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
847 }
848 _ => panic!(),
849 }
850 } else {
851 panic!();
852 }
853 }
854
855 assert_eq!(
856 *logger.lock().unwrap(),
857 &[
858 "SyncDataConn::new 1",
859 "AsyncDataConn::new 2",
860 "SyncDataConn::pre_commit 1 failed",
861 "SyncDataConn::close 1",
862 "SyncDataConn::drop 1",
863 "AsyncDataConn::close 2",
864 "AsyncDataConn::drop 2",
865 ]
866 );
867 }
868
869 #[test]
870 fn test_commit_but_fail_first_async_pre_commit() {
871 let logger = Arc::new(Mutex::new(Vec::new()));
872
873 {
874 let mut manager = DataConnManager::new();
875
876 let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
877 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
878 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
879 manager.add(nnptr);
880
881 let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
882 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
883 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
884 manager.add(nnptr);
885
886 if let Err(e) = manager.commit() {
887 match e.reason::<DataConnError>() {
888 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
889 assert_eq!(errors.len(), 1);
890 assert_eq!(errors[0].0, "foo".into());
891 assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
892 }
893 _ => panic!(),
894 }
895 } else {
896 panic!();
897 }
898 }
899
900 assert_eq!(
901 *logger.lock().unwrap(),
902 &[
903 "SyncDataConn::new 1",
904 "AsyncDataConn::new 2",
905 "SyncDataConn::pre_commit 1 failed",
906 "SyncDataConn::close 1",
907 "SyncDataConn::drop 1",
908 "AsyncDataConn::close 2",
909 "AsyncDataConn::drop 2",
910 ]
911 );
912 }
913
914 #[test]
915 fn test_commit_but_fail_second_pre_commit() {
916 let logger = Arc::new(Mutex::new(Vec::new()));
917
918 {
919 let mut manager = DataConnManager::new();
920
921 let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
922 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
923 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
924 manager.add(nnptr);
925
926 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
927 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
928 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
929 manager.add(nnptr);
930
931 if let Err(e) = manager.commit() {
932 match e.reason::<DataConnError>() {
933 Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
934 assert_eq!(errors.len(), 1);
935 assert_eq!(errors[0].0, "foo".into());
936 assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
937 }
938 _ => panic!(),
939 }
940 } else {
941 panic!();
942 }
943 }
944
945 assert_eq!(
946 *logger.lock().unwrap(),
947 &[
948 "AsyncDataConn::new 1",
949 "SyncDataConn::new 2",
950 "SyncDataConn::pre_commit 2",
951 "AsyncDataConn::pre_commit 1 failed",
952 "AsyncDataConn::close 1",
953 "AsyncDataConn::drop 1",
954 "SyncDataConn::close 2",
955 "SyncDataConn::drop 2",
956 ]
957 );
958 }
959
960 #[test]
961 fn test_commit_but_fail_first_sync_commit() {
962 let logger = Arc::new(Mutex::new(Vec::new()));
963
964 {
965 let mut manager = DataConnManager::new();
966
967 let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
968 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
969 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
970 manager.add(nnptr);
971
972 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
973 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
974 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
975 manager.add(nnptr);
976
977 if let Err(e) = manager.commit() {
978 match e.reason::<DataConnError>() {
979 Ok(DataConnError::FailToCommitDataConn { errors }) => {
980 assert_eq!(errors.len(), 1);
981 assert_eq!(errors[0].0, "foo".into());
982 assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
983 }
984 _ => panic!(),
985 }
986 } else {
987 panic!();
988 }
989 }
990
991 assert_eq!(
992 *logger.lock().unwrap(),
993 &[
994 "SyncDataConn::new 1",
995 "AsyncDataConn::new 2",
996 "SyncDataConn::pre_commit 1",
997 "AsyncDataConn::pre_commit 2",
998 "SyncDataConn::commit 1 failed",
999 "SyncDataConn::close 1",
1000 "SyncDataConn::drop 1",
1001 "AsyncDataConn::close 2",
1002 "AsyncDataConn::drop 2",
1003 ]
1004 );
1005 }
1006
1007 #[test]
1008 fn test_commit_but_fail_first_async_commit() {
1009 let logger = Arc::new(Mutex::new(Vec::new()));
1010
1011 {
1012 let mut manager = DataConnManager::new();
1013
1014 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
1015 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1016 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1017 manager.add(nnptr);
1018
1019 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1020 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1021 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1022 manager.add(nnptr);
1023
1024 if let Err(e) = manager.commit() {
1025 match e.reason::<DataConnError>() {
1026 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1027 assert_eq!(errors.len(), 1);
1028 assert_eq!(errors[0].0, "foo".into());
1029 assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1030 }
1031 _ => panic!(),
1032 }
1033 } else {
1034 panic!();
1035 }
1036 }
1037
1038 assert_eq!(
1039 *logger.lock().unwrap(),
1040 &[
1041 "AsyncDataConn::new 1",
1042 "SyncDataConn::new 2",
1043 "SyncDataConn::pre_commit 2",
1044 "AsyncDataConn::pre_commit 1",
1045 "SyncDataConn::commit 2",
1046 "AsyncDataConn::commit 1 failed",
1047 "AsyncDataConn::close 1",
1048 "AsyncDataConn::drop 1",
1049 "SyncDataConn::close 2",
1050 "SyncDataConn::drop 2",
1051 ]
1052 );
1053 }
1054
1055 #[test]
1056 fn test_commit_but_fail_second_commit() {
1057 let logger = Arc::new(Mutex::new(Vec::new()));
1058
1059 {
1060 let mut manager = DataConnManager::new();
1061
1062 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1063 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1064 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1065 manager.add(nnptr);
1066
1067 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1068 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1069 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1070 manager.add(nnptr);
1071
1072 if let Err(e) = manager.commit() {
1073 match e.reason::<DataConnError>() {
1074 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1075 assert_eq!(errors.len(), 1);
1076 assert_eq!(errors[0].0, "bar".into());
1077 assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1078 }
1079 _ => panic!(),
1080 }
1081 } else {
1082 panic!();
1083 }
1084 }
1085
1086 assert_eq!(
1087 *logger.lock().unwrap(),
1088 &[
1089 "SyncDataConn::new 1",
1090 "AsyncDataConn::new 2",
1091 "SyncDataConn::pre_commit 1",
1092 "AsyncDataConn::pre_commit 2",
1093 "SyncDataConn::commit 1",
1094 "AsyncDataConn::commit 2 failed",
1095 "SyncDataConn::close 1",
1096 "SyncDataConn::drop 1",
1097 "AsyncDataConn::close 2",
1098 "AsyncDataConn::drop 2",
1099 ]
1100 );
1101 }
1102
1103 #[test]
1104 fn test_rollback_and_first_is_sync() {
1105 let logger = Arc::new(Mutex::new(Vec::new()));
1106
1107 {
1108 let mut manager = DataConnManager::new();
1109
1110 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1111 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1112 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1113 manager.add(nnptr);
1114
1115 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1116 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1117 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1118 manager.add(nnptr);
1119
1120 manager.rollback();
1121 }
1122
1123 assert_eq!(
1124 *logger.lock().unwrap(),
1125 &[
1126 "SyncDataConn::new 1",
1127 "AsyncDataConn::new 2",
1128 "SyncDataConn::rollback 1",
1129 "AsyncDataConn::rollback 2",
1130 "SyncDataConn::close 1",
1131 "SyncDataConn::drop 1",
1132 "AsyncDataConn::close 2",
1133 "AsyncDataConn::drop 2",
1134 ]
1135 );
1136 }
1137
1138 #[test]
1139 fn test_rollback_and_first_is_async() {
1140 let logger = Arc::new(Mutex::new(Vec::new()));
1141
1142 {
1143 let mut manager = DataConnManager::new();
1144
1145 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1146 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1147 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1148 manager.add(nnptr);
1149
1150 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1151 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1152 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1153 manager.add(nnptr);
1154
1155 manager.rollback();
1156 }
1157
1158 assert_eq!(
1159 *logger.lock().unwrap(),
1160 &[
1161 "AsyncDataConn::new 1",
1162 "SyncDataConn::new 2",
1163 "SyncDataConn::rollback 2",
1164 "AsyncDataConn::rollback 1",
1165 "AsyncDataConn::close 1",
1166 "AsyncDataConn::drop 1",
1167 "SyncDataConn::close 2",
1168 "SyncDataConn::drop 2",
1169 ]
1170 );
1171 }
1172
1173 #[test]
1174 fn test_force_back_and_first_is_sync() {
1175 let logger = Arc::new(Mutex::new(Vec::new()));
1176
1177 {
1178 let mut manager = DataConnManager::new();
1179
1180 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1181 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1182 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1183 manager.add(nnptr);
1184
1185 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1186 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1187 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1188 manager.add(nnptr);
1189
1190 assert!(manager.commit().is_ok());
1191 manager.rollback();
1192 }
1193
1194 assert_eq!(
1195 *logger.lock().unwrap(),
1196 &[
1197 "SyncDataConn::new 1",
1198 "AsyncDataConn::new 2",
1199 "SyncDataConn::pre_commit 1",
1200 "AsyncDataConn::pre_commit 2",
1201 "SyncDataConn::commit 1",
1202 "AsyncDataConn::commit 2",
1203 "SyncDataConn::post_commit 1",
1204 "AsyncDataConn::post_commit 2",
1205 "SyncDataConn::force_back 1",
1206 "AsyncDataConn::force_back 2",
1207 "SyncDataConn::close 1",
1208 "SyncDataConn::drop 1",
1209 "AsyncDataConn::close 2",
1210 "AsyncDataConn::drop 2",
1211 ]
1212 );
1213 }
1214
1215 #[test]
1216 fn test_force_back_and_first_is_async() {
1217 let logger = Arc::new(Mutex::new(Vec::new()));
1218
1219 {
1220 let mut manager = DataConnManager::new();
1221
1222 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1223 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1224 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1225 manager.add(nnptr);
1226
1227 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1228 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1229 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1230 manager.add(nnptr);
1231
1232 assert!(manager.commit().is_ok());
1233 manager.rollback();
1234 }
1235
1236 assert_eq!(
1237 *logger.lock().unwrap(),
1238 &[
1239 "AsyncDataConn::new 1",
1240 "SyncDataConn::new 2",
1241 "SyncDataConn::pre_commit 2",
1242 "AsyncDataConn::pre_commit 1",
1243 "SyncDataConn::commit 2",
1244 "AsyncDataConn::commit 1",
1245 "SyncDataConn::post_commit 2",
1246 "AsyncDataConn::post_commit 1",
1247 "SyncDataConn::force_back 2",
1248 "AsyncDataConn::force_back 1",
1249 "AsyncDataConn::close 1",
1250 "AsyncDataConn::drop 1",
1251 "SyncDataConn::close 2",
1252 "SyncDataConn::drop 2",
1253 ]
1254 );
1255 }
1256 }
1257}