1use super::{AsyncGroup, DataConn, DataConnContainer, DataConnManager, SendSyncNonNull};
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::{any, mem};
12
/// Error reasons reported by the `DataConn` management code in this module.
// The variants deliberately share the `FailTo...DataConn` naming pattern, which is what
// `clippy::enum_variant_names` would otherwise flag.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum DataConnError {
    /// Produced by `DataConnManager::commit_async` when its pre-commit phase collected at
    /// least one error; the commit phase is not started in that case.
    FailToPreCommitDataConn {
        /// The collected failures, each paired with the name recorded for the connection
        /// that failed.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Produced by `DataConnManager::commit_async` when its commit phase collected at least
    /// one error; the post-commit phase is not started in that case.
    FailToCommitDataConn {
        /// The collected failures, each paired with the name recorded for the connection
        /// that failed.
        errors: Vec<(Arc<str>, errs::Err)>,
    },

    /// Produced by `DataConnManager::to_typed_ptr` when the stored container does not hold
    /// the requested connection type.
    FailToCastDataConn {
        /// Name of the container whose cast was refused.
        name: Arc<str>,
        /// Name of the requested target type, as given by `std::any::type_name`.
        target_type: &'static str,
    },
}
37
38impl<C> DataConnContainer<C>
39where
40 C: DataConn + 'static,
41{
42 pub(crate) fn new(name: impl Into<Arc<str>>, data_conn: Box<C>) -> Self {
43 Self {
44 drop_fn: drop_data_conn::<C>,
45 is_fn: is_data_conn::<C>,
46 commit_fn: commit_data_conn_async::<C>,
47 pre_commit_fn: pre_commit_data_conn_async::<C>,
48 post_commit_fn: post_commit_data_conn_async::<C>,
49 should_force_back_fn: should_force_back_data_conn::<C>,
50 rollback_fn: rollback_data_conn_async::<C>,
51 force_back_fn: force_back_data_conn_async::<C>,
52 close_fn: close_data_conn::<C>,
53
54 name: name.into(),
55 data_conn,
56 }
57 }
58}
59
60fn drop_data_conn<C>(ptr: *const DataConnContainer)
61where
62 C: DataConn + 'static,
63{
64 unsafe {
65 drop(Box::from_raw(ptr as *mut DataConnContainer<C>));
66 }
67}
68
69fn is_data_conn<C>(type_id: any::TypeId) -> bool
70where
71 C: DataConn + 'static,
72{
73 any::TypeId::of::<C>() == type_id
74}
75
76fn commit_data_conn_async<C>(
77 ptr: *const DataConnContainer,
78 ag: &mut AsyncGroup,
79) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
80where
81 C: DataConn + 'static,
82{
83 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
84 Box::pin(container.data_conn.commit_async(ag))
85}
86
87fn pre_commit_data_conn_async<C>(
88 ptr: *const DataConnContainer,
89 ag: &mut AsyncGroup,
90) -> Pin<Box<dyn Future<Output = errs::Result<()>> + '_>>
91where
92 C: DataConn + 'static,
93{
94 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
95 Box::pin(container.data_conn.pre_commit_async(ag))
96}
97
98fn post_commit_data_conn_async<C>(
99 ptr: *const DataConnContainer,
100 ag: &mut AsyncGroup,
101) -> Pin<Box<dyn Future<Output = ()> + '_>>
102where
103 C: DataConn + 'static,
104{
105 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
106 Box::pin(container.data_conn.post_commit_async(ag))
107}
108
109fn should_force_back_data_conn<C>(ptr: *const DataConnContainer) -> bool
110where
111 C: DataConn + 'static,
112{
113 let container = unsafe { &*(ptr as *const DataConnContainer<C>) };
114 container.data_conn.should_force_back()
115}
116
117fn rollback_data_conn_async<C>(
118 ptr: *const DataConnContainer,
119 ag: &mut AsyncGroup,
120) -> Pin<Box<dyn Future<Output = ()> + '_>>
121where
122 C: DataConn + 'static,
123{
124 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
125 Box::pin(container.data_conn.rollback_async(ag))
126}
127
128fn force_back_data_conn_async<C>(
129 ptr: *const DataConnContainer,
130 ag: &mut AsyncGroup,
131) -> Pin<Box<dyn Future<Output = ()> + '_>>
132where
133 C: DataConn + 'static,
134{
135 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
136 Box::pin(container.data_conn.force_back_async(ag))
137}
138
139fn close_data_conn<C>(ptr: *const DataConnContainer)
140where
141 C: DataConn + 'static,
142{
143 let container = unsafe { &mut *(ptr as *mut DataConnContainer<C>) };
144 container.data_conn.close();
145}
146
147impl DataConnManager {
148 pub(crate) fn new() -> Self {
149 Self {
150 vec: Vec::new(),
151 index_map: HashMap::new(),
152 }
153 }
154
155 pub(crate) fn with_commit_order(names: &[&str]) -> Self {
156 let mut index_map = HashMap::with_capacity(names.len());
157 for (i, nm) in names.iter().rev().enumerate() {
159 index_map.insert((*nm).into(), names.len() - 1 - i);
160 }
161
162 Self {
163 vec: vec![None; names.len()],
164 index_map,
165 }
166 }
167
168 pub(crate) fn add(&mut self, ssnnptr: SendSyncNonNull<DataConnContainer>) {
169 let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
170 if let Some(index) = self.index_map.get(&name) {
171 self.vec[*index] = Some(ssnnptr);
172 } else {
173 let index = self.vec.len();
174 self.vec.push(Some(ssnnptr));
175 self.index_map.insert(name.clone(), index);
176 }
177 }
178
179 pub(crate) fn find_by_name(
180 &self,
181 name: impl AsRef<str>,
182 ) -> Option<SendSyncNonNull<DataConnContainer>> {
183 if let Some(index) = self.index_map.get(name.as_ref()) {
184 if *index < self.vec.len() {
185 if let Some(ssnnptr) = &self.vec[*index] {
186 let ptr = ssnnptr.non_null_ptr.as_ptr();
187 let cont_name = unsafe { &(*ptr).name };
188 if cont_name.as_ref() == name.as_ref() {
189 return Some(ssnnptr.clone());
190 }
191 }
192 }
193 }
194
195 None
196 }
197
198 pub(crate) fn to_typed_ptr<C>(
199 ssnnptr: &SendSyncNonNull<DataConnContainer>,
200 ) -> errs::Result<*mut DataConnContainer<C>>
201 where
202 C: DataConn + 'static,
203 {
204 let ptr = ssnnptr.non_null_ptr.as_ptr();
205 let name = unsafe { &(*ptr).name };
206 let type_id = any::TypeId::of::<C>();
207 let is_fn = unsafe { (*ptr).is_fn };
208
209 if !is_fn(type_id) {
210 return Err(errs::Err::new(DataConnError::FailToCastDataConn {
211 name: name.clone(),
212 target_type: any::type_name::<C>(),
213 }));
214 }
215
216 let typed_ptr = ptr as *mut DataConnContainer<C>;
217 Ok(typed_ptr)
218 }
219
220 pub(crate) async fn commit_async(&self) -> errs::Result<()> {
221 let mut errors = Vec::new();
222
223 let mut ag = AsyncGroup::new();
224 for ssnnptr in self.vec.iter().flatten() {
225 let ptr = ssnnptr.non_null_ptr.as_ptr();
226 let pre_commit_fn = unsafe { (*ptr).pre_commit_fn };
227 ag._name = unsafe { (*ptr).name.clone() };
228 if let Err(err) = pre_commit_fn(ptr, &mut ag).await {
229 errors.push((ag._name.clone(), err));
230 break;
231 }
232 }
233 ag.join_and_collect_errors_async(&mut errors).await;
234
235 if !errors.is_empty() {
236 return Err(errs::Err::new(DataConnError::FailToPreCommitDataConn {
237 errors,
238 }));
239 }
240
241 let mut ag = AsyncGroup::new();
242 for ssnnptr in self.vec.iter().flatten() {
243 let ptr = ssnnptr.non_null_ptr.as_ptr();
244 let commit_fn = unsafe { (*ptr).commit_fn };
245 ag._name = unsafe { (*ptr).name.clone() };
246 if let Err(err) = commit_fn(ptr, &mut ag).await {
247 errors.push((ag._name.clone(), err));
248 break;
249 }
250 }
251 ag.join_and_collect_errors_async(&mut errors).await;
252
253 if !errors.is_empty() {
254 return Err(errs::Err::new(DataConnError::FailToCommitDataConn {
255 errors,
256 }));
257 }
258
259 let mut ag = AsyncGroup::new();
260 for ssnnptr in self.vec.iter().flatten() {
261 let ptr = ssnnptr.non_null_ptr.as_ptr();
262 let post_commit_fn = unsafe { (*ptr).post_commit_fn };
263 ag._name = unsafe { (*ptr).name.clone() };
264 post_commit_fn(ptr, &mut ag).await;
265 }
266 ag.join_and_ignore_errors_async().await;
267
268 Ok(())
269 }
270
271 pub(crate) async fn rollback_async(&mut self) {
272 let mut ag = AsyncGroup::new();
273 for ssnnptr in self.vec.iter().flatten() {
274 let ptr = ssnnptr.non_null_ptr.as_ptr();
275 let should_force_back_fn = unsafe { (*ptr).should_force_back_fn };
276 let force_back_fn = unsafe { (*ptr).force_back_fn };
277 let rollback_fn = unsafe { (*ptr).rollback_fn };
278 ag._name = unsafe { (*ptr).name.clone() };
279
280 if should_force_back_fn(ptr) {
281 force_back_fn(ptr, &mut ag).await;
282 } else {
283 rollback_fn(ptr, &mut ag).await;
284 }
285 }
286 ag.join_and_ignore_errors_async().await;
287 }
288
289 pub(crate) fn close(&mut self) {
290 self.index_map.clear();
291
292 let vec: Vec<Option<SendSyncNonNull<DataConnContainer>>> = mem::take(&mut self.vec);
293
294 for ssnnptr in vec.iter().flatten() {
295 let ptr = ssnnptr.non_null_ptr.as_ptr();
296 let close_fn = unsafe { (*ptr).close_fn };
297 let drop_fn = unsafe { (*ptr).drop_fn };
298 close_fn(ptr);
299 drop_fn(ptr);
300 }
301 }
302}
303
304impl Drop for DataConnManager {
305 fn drop(&mut self) {
306 self.close();
307 }
308}
309
310#[cfg(test)]
311mod tests_of_data_conn {
312 use super::*;
313 use std::ptr;
314 use std::sync::{
315 atomic::{AtomicBool, Ordering},
316 Arc, Mutex,
317 };
318 use tokio::time;
319
    /// Selects which operation of a test connection is made to fail.
    #[derive(PartialEq, Copy, Clone)]
    enum Fail {
        /// Nothing fails.
        Not,
        /// The commit operation fails.
        Commit,
        /// The pre-commit operation fails.
        PreCommit,
    }
326
    /// Test connection that performs every operation directly inside the called method and
    /// never hands work to the `AsyncGroup`.
    struct SyncDataConn {
        // Identifier included in every log entry.
        id: i8,
        // Set once commit succeeded; read by `should_force_back`.
        committed: AtomicBool,
        // Which operation, if any, must fail.
        fail: Fail,
        // Shared log the tests inspect afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
333 impl SyncDataConn {
334 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
335 logger
336 .lock()
337 .unwrap()
338 .push(format!("SyncDataConn::new {}", id));
339 Self {
340 id,
341 committed: AtomicBool::new(false),
342 fail,
343 logger,
344 }
345 }
346 }
347 impl Drop for SyncDataConn {
348 fn drop(&mut self) {
349 self.logger
350 .lock()
351 .unwrap()
352 .push(format!("SyncDataConn::drop {}", self.id));
353 }
354 }
355 impl DataConn for SyncDataConn {
356 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
357 let fail = self.fail;
358 let id = self.id;
359 let logger = self.logger.clone();
360 let committed = &self.committed;
361
362 if fail == Fail::Commit {
363 logger
364 .lock()
365 .unwrap()
366 .push(format!("SyncDataConn::commit {} failed", id));
367 return Err(errs::Err::new("ZZZ".to_string()));
368 }
369 committed.store(true, Ordering::Release);
370 logger
371 .lock()
372 .unwrap()
373 .push(format!("SyncDataConn::commit {}", id));
374 Ok(())
375 }
376 async fn pre_commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
377 let fail = self.fail;
378 let id = self.id;
379 let logger = self.logger.clone();
380
381 if fail == Fail::PreCommit {
382 logger
383 .lock()
384 .unwrap()
385 .push(format!("SyncDataConn::pre_commit {} failed", id));
386 return Err(errs::Err::new("zzz".to_string()));
387 }
388 logger
389 .lock()
390 .unwrap()
391 .push(format!("SyncDataConn::pre_commit {}", id));
392 Ok(())
393 }
394 async fn post_commit_async(&mut self, _ag: &mut AsyncGroup) {
395 let id = self.id;
396 let logger = self.logger.clone();
397
398 logger
399 .lock()
400 .unwrap()
401 .push(format!("SyncDataConn::post_commit {}", id));
402 }
403 fn should_force_back(&self) -> bool {
404 self.committed.load(Ordering::Acquire)
405 }
406 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {
407 let id = self.id;
408 let logger = self.logger.clone();
409
410 logger
411 .lock()
412 .unwrap()
413 .push(format!("SyncDataConn::rollback {}", id));
414 }
415 async fn force_back_async(&mut self, _ag: &mut AsyncGroup) {
416 let id = self.id;
417 let logger = self.logger.clone();
418
419 logger
420 .lock()
421 .unwrap()
422 .push(format!("SyncDataConn::force_back {}", id));
423 }
424 fn close(&mut self) {
425 self.logger
426 .lock()
427 .unwrap()
428 .push(format!("SyncDataConn::close {}", self.id));
429 }
430 }
431
    /// Test connection that hands every operation to the `AsyncGroup` as a task which sleeps
    /// for 100 ms before logging (and possibly failing).
    struct AsyncDataConn {
        // Identifier included in every log entry.
        id: i8,
        // Set by the commit task on success; shared so the task can own a handle to it.
        committed: Arc<AtomicBool>,
        // Which operation, if any, must fail.
        fail: Fail,
        // Shared log the tests inspect afterwards.
        logger: Arc<Mutex<Vec<String>>>,
    }
438 impl AsyncDataConn {
439 fn new(id: i8, logger: Arc<Mutex<Vec<String>>>, fail: Fail) -> Self {
440 logger
441 .lock()
442 .unwrap()
443 .push(format!("AsyncDataConn::new {}", id));
444 Self {
445 id,
446 committed: Arc::new(AtomicBool::new(false)),
447 fail,
448 logger,
449 }
450 }
451 }
452 impl Drop for AsyncDataConn {
453 fn drop(&mut self) {
454 self.logger
455 .lock()
456 .unwrap()
457 .push(format!("AsyncDataConn::drop {}", self.id));
458 }
459 }
460 impl DataConn for AsyncDataConn {
461 async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
462 let fail = self.fail;
463 let id = self.id;
464 let logger = self.logger.clone();
465 let committed = self.committed.clone();
466
467 ag.add(async move {
468 time::sleep(time::Duration::from_millis(100)).await;
469 if fail == Fail::Commit {
470 logger
471 .lock()
472 .unwrap()
473 .push(format!("AsyncDataConn::commit {} failed", id));
474 return Err(errs::Err::new("YYY".to_string()));
475 }
476 committed.store(true, Ordering::Release);
477 logger
478 .lock()
479 .unwrap()
480 .push(format!("AsyncDataConn::commit {}", id));
481 Ok(())
482 });
483 Ok(())
484 }
485 async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
486 let fail = self.fail;
487 let id = self.id;
488 let logger = self.logger.clone();
489
490 ag.add(async move {
491 time::sleep(time::Duration::from_millis(100)).await;
492 if fail == Fail::PreCommit {
493 logger
494 .lock()
495 .unwrap()
496 .push(format!("AsyncDataConn::pre_commit {} failed", id));
497 return Err(errs::Err::new("yyy".to_string()));
498 }
499 logger
500 .lock()
501 .unwrap()
502 .push(format!("AsyncDataConn::pre_commit {}", id));
503 Ok(())
504 });
505 Ok(())
506 }
507 async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {
508 let logger = self.logger.clone();
509 let id = self.id;
510
511 ag.add(async move {
512 time::sleep(time::Duration::from_millis(100)).await;
513 logger
514 .lock()
515 .unwrap()
516 .push(format!("AsyncDataConn::post_commit {}", id));
517 Ok(())
518 });
519 }
520 fn should_force_back(&self) -> bool {
521 self.committed.load(Ordering::Acquire)
522 }
523 async fn rollback_async(&mut self, ag: &mut AsyncGroup) {
524 let logger = self.logger.clone();
525 let id = self.id;
526
527 ag.add(async move {
528 time::sleep(time::Duration::from_millis(100)).await;
529 logger
530 .lock()
531 .unwrap()
532 .push(format!("AsyncDataConn::rollback {}", id));
533 Ok(())
534 });
535 }
536 async fn force_back_async(&mut self, ag: &mut AsyncGroup) {
537 let logger = self.logger.clone();
538 let id = self.id;
539
540 ag.add(async move {
541 time::sleep(time::Duration::from_millis(100)).await;
542 logger
543 .lock()
544 .unwrap()
545 .push(format!("AsyncDataConn::force_back {}", id));
546 Ok(())
547 });
548 }
549 fn close(&mut self) {
550 self.logger
551 .lock()
552 .unwrap()
553 .push(format!("AsyncDataConn::close {}", self.id));
554 }
555 }
556
557 mod tests_of_data_conn_manager {
558 use super::*;
559
560 #[tokio::test]
561 async fn test_new() {
562 let manager = DataConnManager::new();
563 assert!(manager.vec.is_empty());
564 }
565
566 #[tokio::test]
567 async fn test_with_commit_order() {
568 let manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
569 assert_eq!(manager.vec.len(), 3);
570 assert!(manager.vec[0].is_none());
571 assert!(manager.vec[1].is_none());
572 assert!(manager.vec[2].is_none());
573 assert_eq!(manager.index_map.len(), 3);
574 assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
575 assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
576 assert_eq!(*manager.index_map.get("baz").unwrap(), 1);
577 }
578
        // Without a predefined commit order, every added container is appended and its name
        // is mapped to the index it was pushed at.
        #[test]
        fn test_new_and_add() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            let mut manager = DataConnManager::new();
            assert!(manager.vec.is_empty());
            assert!(manager.index_map.is_empty());

            // Leak the typed box and erase the type parameter; the manager frees the
            // container through its `drop_fn` when it is closed (here: on drop).
            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);
            assert_eq!(manager.vec.len(), 1);
            assert_eq!(manager.index_map.len(), 1);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);

            // The second container goes to the next index; the first mapping is untouched.
            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);
            assert_eq!(manager.vec.len(), 2);
            assert_eq!(manager.index_map.len(), 2);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 0);
            assert_eq!(*manager.index_map.get("bar").unwrap(), 1);
        }
606
        // With a predefined commit order, containers whose names are listed fill their
        // reserved slots, while an unlisted name is appended after the reserved ones.
        #[test]
        fn test_with_commit_order_and_add() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);
            assert_eq!(manager.vec.len(), 3);
            assert!(manager.vec[0].is_none());
            assert!(manager.vec[1].is_none());
            assert!(manager.vec[2].is_none());
            assert_eq!(manager.index_map.len(), 3);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
            assert_eq!(*manager.index_map.get("baz").unwrap(), 1);

            // "foo" is listed: it fills its reserved slot, so the sizes do not change.
            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);
            assert_eq!(manager.vec.len(), 3);
            assert_eq!(manager.index_map.len(), 3);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);

            // "bar" is listed as well.
            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);
            assert_eq!(manager.vec.len(), 3);
            assert_eq!(manager.index_map.len(), 3);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);

            // "qux" is not listed: it is appended behind the reserved slots.
            let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);
            assert_eq!(manager.vec.len(), 4);
            assert_eq!(manager.index_map.len(), 4);
            assert_eq!(*manager.index_map.get("foo").unwrap(), 2);
            assert_eq!(*manager.index_map.get("bar").unwrap(), 0);
            assert_eq!(*manager.index_map.get("qux").unwrap(), 3);
        }
651
652 #[test]
653 fn test_find_by_name_but_none() {
654 let manager = DataConnManager::new();
655 assert!(manager.find_by_name("foo").is_none());
656 assert!(manager.find_by_name("bar").is_none());
657 }
658
        // Containers that were added can be found again under their own names.
        #[test]
        fn test_find_by_name_and_found() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            let mut manager = DataConnManager::new();

            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            // The returned pointer must refer to the container carrying the queried name.
            if let Some(ssnnptr) = manager.find_by_name("foo") {
                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
                assert_eq!(name.as_ref(), "foo");
            } else {
                panic!();
            }

            if let Some(ssnnptr) = manager.find_by_name("bar") {
                let name = unsafe { (*ssnnptr.non_null_ptr.as_ptr()).name.clone() };
                assert_eq!(name.as_ref(), "bar");
            } else {
                panic!();
            }
        }
691
        // Casting a container pointer to the connection type it really holds succeeds.
        #[test]
        fn test_to_typed_ptr() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            let mut manager = DataConnManager::new();

            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            // NOTE(review): the expected strings below hard-code the crate and module path
            // as rendered by `type_name_of_val`; they need updating if this module moves.
            let nnptr = manager.find_by_name("foo").unwrap();
            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<SyncDataConn>(&nnptr) {
                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn>");
                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "foo".into());
            } else {
                panic!();
            }

            let nnptr = manager.find_by_name("bar").unwrap();
            if let Ok(typed_nnptr) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&nnptr) {
                assert_eq!(any::type_name_of_val(&typed_nnptr), "*mut sabi::tokio::DataConnContainer<sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn>");
                assert_eq!(unsafe { (*typed_nnptr).name.clone() }, "bar".into());
            } else {
                panic!();
            }
        }
726
        // Casting a container pointer to a connection type it does not hold is refused with
        // `FailToCastDataConn`, which reports the container name and the requested type.
        #[test]
        fn test_to_typed_ptr_but_fail() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            let mut manager = DataConnManager::new();

            let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
            let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
            let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
            let ssnnptr = SendSyncNonNull::new(nnptr);
            manager.add(ssnnptr);

            // "foo" holds a `SyncDataConn`, so asking for `AsyncDataConn` must fail.
            let ssnnptr = manager.find_by_name("foo").unwrap();
            if let Err(err) = DataConnManager::to_typed_ptr::<AsyncDataConn>(&ssnnptr) {
                match err.reason::<DataConnError>() {
                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
                        assert_eq!(name.as_ref(), "foo");
                        assert_eq!(
                            *target_type,
                            "sabi::tokio::data_conn::tests_of_data_conn::AsyncDataConn"
                        );
                    }
                    _ => panic!(),
                }
            } else {
                panic!();
            }

            // "bar" holds an `AsyncDataConn`, so asking for `SyncDataConn` must fail.
            let ssnnptr = manager.find_by_name("bar").unwrap();
            if let Err(err) = DataConnManager::to_typed_ptr::<SyncDataConn>(&ssnnptr) {
                match err.reason::<DataConnError>() {
                    Ok(DataConnError::FailToCastDataConn { name, target_type }) => {
                        assert_eq!(name.as_ref(), "bar");
                        assert_eq!(
                            *target_type,
                            "sabi::tokio::data_conn::tests_of_data_conn::SyncDataConn"
                        );
                    }
                    _ => panic!(),
                }
            } else {
                panic!();
            }
        }
777
        // A commit in which nothing fails runs all three phases and, once the manager is
        // dropped, closes and drops each connection in slot order.
        #[tokio::test]
        async fn test_commit_ok() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            // Inner scope: the manager is dropped at its end, which closes the connections
            // before the log is inspected.
            {
                let mut manager = DataConnManager::new();

                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                assert!(manager.commit_async().await.is_ok());
            }

            // Each phase is completed (group joined) before the next one starts.
            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "SyncDataConn::new 1",
                    "AsyncDataConn::new 2",
                    "SyncDataConn::pre_commit 1",
                    "AsyncDataConn::pre_commit 2",
                    "SyncDataConn::commit 1",
                    "AsyncDataConn::commit 2",
                    "SyncDataConn::post_commit 1",
                    "AsyncDataConn::post_commit 2",
                    "SyncDataConn::close 1",
                    "SyncDataConn::drop 1",
                    "AsyncDataConn::close 2",
                    "AsyncDataConn::drop 2",
                ]
            );
        }
818
        // With a commit order, connections are visited by slot: "bar" (async, id 2) first,
        // then "foo" (sync, id 1), then the unlisted "qux" (sync, id 3).
        #[tokio::test]
        async fn test_commit_with_order() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::with_commit_order(&["bar", "baz", "foo"]);

                let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = SyncDataConn::new(3, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("qux", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                assert!(manager.commit_async().await.is_ok());
            }

            // The async connection is visited first in every phase, but its task sleeps
            // before logging, so its entry shows up after the sync entries of that phase.
            // Close/drop happen synchronously and therefore reflect the slot order directly.
            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "SyncDataConn::new 1",
                    "AsyncDataConn::new 2",
                    "SyncDataConn::new 3",
                    "SyncDataConn::pre_commit 1",
                    "SyncDataConn::pre_commit 3",
                    "AsyncDataConn::pre_commit 2",
                    "SyncDataConn::commit 1",
                    "SyncDataConn::commit 3",
                    "AsyncDataConn::commit 2",
                    "SyncDataConn::post_commit 1",
                    "SyncDataConn::post_commit 3",
                    "AsyncDataConn::post_commit 2",
                    "AsyncDataConn::close 2",
                    "AsyncDataConn::drop 2",
                    "SyncDataConn::close 1",
                    "SyncDataConn::drop 1",
                    "SyncDataConn::close 3",
                    "SyncDataConn::drop 3",
                ]
            );
        }
871
        // The first connection is synchronous and fails in pre-commit: the loop stops right
        // there, so the second connection's pre-commit is never started.
        #[tokio::test]
        async fn test_commit_but_fail_first_sync_pre_commit() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::new();

                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
                let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                // Only the synchronous failure of "foo" is reported.
                if let Err(e) = manager.commit_async().await {
                    match e.reason::<DataConnError>() {
                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
                            assert_eq!(errors.len(), 1);
                            assert_eq!(errors[0].0, "foo".into());
                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
                        }
                        _ => panic!(),
                    }
                } else {
                    panic!();
                }
            }

            // No commit or post-commit entries: those phases were never entered.
            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "SyncDataConn::new 1",
                    "AsyncDataConn::new 2",
                    "SyncDataConn::pre_commit 1 failed",
                    "SyncDataConn::close 1",
                    "SyncDataConn::drop 1",
                    "AsyncDataConn::close 2",
                    "AsyncDataConn::drop 2",
                ]
            );
        }
918
        // NOTE(review): despite its name, this test registers the *synchronous* connection
        // first and the asynchronous one second, and asserts the same outcome as
        // `test_commit_but_fail_first_sync_pre_commit`. An async-first failing pre-commit is
        // therefore not what is exercised here - confirm whether the setup was meant to be
        // different.
        #[tokio::test]
        async fn test_commit_but_fail_first_async_pre_commit() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::new();

                let conn = SyncDataConn::new(1, logger.clone(), Fail::PreCommit);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = AsyncDataConn::new(2, logger.clone(), Fail::PreCommit);
                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                // The synchronous failure of "foo" stops the pre-commit loop immediately.
                if let Err(e) = manager.commit_async().await {
                    match e.reason::<DataConnError>() {
                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
                            assert_eq!(errors.len(), 1);
                            assert_eq!(errors[0].0, "foo".into());
                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "zzz");
                        }
                        _ => panic!(),
                    }
                } else {
                    panic!();
                }
            }

            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "SyncDataConn::new 1",
                    "AsyncDataConn::new 2",
                    "SyncDataConn::pre_commit 1 failed",
                    "SyncDataConn::close 1",
                    "SyncDataConn::drop 1",
                    "AsyncDataConn::close 2",
                    "AsyncDataConn::drop 2",
                ]
            );
        }
965
        // The asynchronous connection comes first and its pre-commit task fails. Because the
        // task only fails after its sleep, the synchronous second connection has already
        // pre-committed by then.
        #[tokio::test]
        async fn test_commit_but_fail_second_pre_commit() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::new();

                let conn = AsyncDataConn::new(1, logger.clone(), Fail::PreCommit);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                // The error is collected when the group is joined and is attributed to "foo".
                if let Err(e) = manager.commit_async().await {
                    match e.reason::<DataConnError>() {
                        Ok(DataConnError::FailToPreCommitDataConn { errors }) => {
                            assert_eq!(errors.len(), 1);
                            assert_eq!(errors[0].0, "foo".into());
                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "yyy");
                        }
                        _ => panic!(),
                    }
                } else {
                    panic!();
                }
            }

            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "AsyncDataConn::new 1",
                    "SyncDataConn::new 2",
                    "SyncDataConn::pre_commit 2",
                    "AsyncDataConn::pre_commit 1 failed",
                    "AsyncDataConn::close 1",
                    "AsyncDataConn::drop 1",
                    "SyncDataConn::close 2",
                    "SyncDataConn::drop 2",
                ]
            );
        }
1013
        // Pre-commit succeeds for both connections; the synchronous first connection then
        // fails in commit, which stops the commit loop before the second one is visited.
        #[tokio::test]
        async fn test_commit_but_fail_first_sync_commit() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::new();

                let conn = SyncDataConn::new(1, logger.clone(), Fail::Commit);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                // Only the synchronous failure of "foo" is reported.
                if let Err(e) = manager.commit_async().await {
                    match e.reason::<DataConnError>() {
                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
                            assert_eq!(errors.len(), 1);
                            assert_eq!(errors[0].0, "foo".into());
                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "ZZZ");
                        }
                        _ => panic!(),
                    }
                } else {
                    panic!();
                }
            }

            // No commit entry for the async connection and no post-commit entries at all.
            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "SyncDataConn::new 1",
                    "AsyncDataConn::new 2",
                    "SyncDataConn::pre_commit 1",
                    "AsyncDataConn::pre_commit 2",
                    "SyncDataConn::commit 1 failed",
                    "SyncDataConn::close 1",
                    "SyncDataConn::drop 1",
                    "AsyncDataConn::close 2",
                    "AsyncDataConn::drop 2",
                ]
            );
        }
1062
        // The asynchronous first connection fails in its commit task. The synchronous second
        // connection has already committed by the time the task fails after its sleep.
        #[tokio::test]
        async fn test_commit_but_fail_first_async_commit() {
            let logger = Arc::new(Mutex::new(Vec::new()));

            {
                let mut manager = DataConnManager::new();

                let conn = AsyncDataConn::new(1, logger.clone(), Fail::Commit);
                let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
                let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
                let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
                let ssnnptr = SendSyncNonNull::new(nnptr);
                manager.add(ssnnptr);

                // The error is collected when the group is joined and is attributed to "foo".
                if let Err(e) = manager.commit_async().await {
                    match e.reason::<DataConnError>() {
                        Ok(DataConnError::FailToCommitDataConn { errors }) => {
                            assert_eq!(errors.len(), 1);
                            assert_eq!(errors[0].0, "foo".into());
                            assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
                        }
                        _ => panic!(),
                    }
                } else {
                    panic!();
                }
            }

            // No post-commit entries: that phase is skipped after a failed commit.
            assert_eq!(
                *logger.lock().unwrap(),
                &[
                    "AsyncDataConn::new 1",
                    "SyncDataConn::new 2",
                    "SyncDataConn::pre_commit 2",
                    "AsyncDataConn::pre_commit 1",
                    "SyncDataConn::commit 2",
                    "AsyncDataConn::commit 1 failed",
                    "AsyncDataConn::close 1",
                    "AsyncDataConn::drop 1",
                    "SyncDataConn::close 2",
                    "SyncDataConn::drop 2",
                ]
            );
        }
1112
1113 #[tokio::test]
1114 async fn test_commit_but_fail_second_commit() {
1115 let logger = Arc::new(Mutex::new(Vec::new()));
1116
1117 {
1118 let mut manager = DataConnManager::new();
1119
1120 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1121 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1122 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1123 let ssnnptr = SendSyncNonNull::new(nnptr);
1124 manager.add(ssnnptr);
1125
1126 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Commit);
1127 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1128 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1129 let ssnnptr = SendSyncNonNull::new(nnptr);
1130 manager.add(ssnnptr);
1131
1132 if let Err(e) = manager.commit_async().await {
1133 match e.reason::<DataConnError>() {
1134 Ok(DataConnError::FailToCommitDataConn { errors }) => {
1135 assert_eq!(errors.len(), 1);
1136 assert_eq!(errors[0].0, "bar".into());
1137 assert_eq!(errors[0].1.reason::<String>().unwrap(), "YYY");
1138 }
1139 _ => panic!(),
1140 }
1141 } else {
1142 panic!();
1143 }
1144 }
1145
1146 assert_eq!(
1147 *logger.lock().unwrap(),
1148 &[
1149 "SyncDataConn::new 1",
1150 "AsyncDataConn::new 2",
1151 "SyncDataConn::pre_commit 1",
1152 "AsyncDataConn::pre_commit 2",
1153 "SyncDataConn::commit 1",
1154 "AsyncDataConn::commit 2 failed",
1155 "SyncDataConn::close 1",
1156 "SyncDataConn::drop 1",
1157 "AsyncDataConn::close 2",
1158 "AsyncDataConn::drop 2",
1159 ]
1160 );
1161 }
1162
1163 #[tokio::test]
1164 async fn test_rollback_and_first_is_sync() {
1165 let logger = Arc::new(Mutex::new(Vec::new()));
1166
1167 {
1168 let mut manager = DataConnManager::new();
1169
1170 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1171 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1172 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1173 let ssnnptr = SendSyncNonNull::new(nnptr);
1174 manager.add(ssnnptr);
1175
1176 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1177 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1178 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1179 let ssnnptr = SendSyncNonNull::new(nnptr);
1180 manager.add(ssnnptr);
1181
1182 manager.rollback_async().await;
1183 }
1184
1185 assert_eq!(
1186 *logger.lock().unwrap(),
1187 &[
1188 "SyncDataConn::new 1",
1189 "AsyncDataConn::new 2",
1190 "SyncDataConn::rollback 1",
1191 "AsyncDataConn::rollback 2",
1192 "SyncDataConn::close 1",
1193 "SyncDataConn::drop 1",
1194 "AsyncDataConn::close 2",
1195 "AsyncDataConn::drop 2",
1196 ]
1197 );
1198 }
1199
1200 #[tokio::test]
1201 async fn test_rollback_and_first_is_async() {
1202 let logger = Arc::new(Mutex::new(Vec::new()));
1203
1204 {
1205 let mut manager = DataConnManager::new();
1206
1207 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1208 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1209 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1210 let ssnnptr = SendSyncNonNull::new(nnptr);
1211 manager.add(ssnnptr);
1212
1213 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1214 let boxed = Box::new(DataConnContainer::new("bar", Box::new(conn)));
1215 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1216 let ssnnptr = SendSyncNonNull::new(nnptr);
1217 manager.add(ssnnptr);
1218
1219 manager.rollback_async().await;
1220 }
1221
1222 assert_eq!(
1223 *logger.lock().unwrap(),
1224 &[
1225 "AsyncDataConn::new 1",
1226 "SyncDataConn::new 2",
1227 "SyncDataConn::rollback 2",
1228 "AsyncDataConn::rollback 1",
1229 "AsyncDataConn::close 1",
1230 "AsyncDataConn::drop 1",
1231 "SyncDataConn::close 2",
1232 "SyncDataConn::drop 2",
1233 ]
1234 );
1235 }
1236
1237 #[tokio::test]
1238 async fn test_force_back_and_first_is_sync() {
1239 let logger = Arc::new(Mutex::new(Vec::new()));
1240
1241 {
1242 let mut manager = DataConnManager::new();
1243
1244 let conn = SyncDataConn::new(1, logger.clone(), Fail::Not);
1245 let boxed = Box::new(DataConnContainer::new("foo", Box::new(conn)));
1246 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1247 let ssnnptr = SendSyncNonNull::new(nnptr);
1248 manager.add(ssnnptr);
1249
1250 let conn = AsyncDataConn::new(2, logger.clone(), Fail::Not);
1251 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1252 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1253 let ssnnptr = SendSyncNonNull::new(nnptr);
1254 manager.add(ssnnptr);
1255
1256 assert!(manager.commit_async().await.is_ok());
1257 manager.rollback_async().await;
1258 }
1259
1260 assert_eq!(
1261 *logger.lock().unwrap(),
1262 &[
1263 "SyncDataConn::new 1",
1264 "AsyncDataConn::new 2",
1265 "SyncDataConn::pre_commit 1",
1266 "AsyncDataConn::pre_commit 2",
1267 "SyncDataConn::commit 1",
1268 "AsyncDataConn::commit 2",
1269 "SyncDataConn::post_commit 1",
1270 "AsyncDataConn::post_commit 2",
1271 "SyncDataConn::force_back 1",
1272 "AsyncDataConn::force_back 2",
1273 "SyncDataConn::close 1",
1274 "SyncDataConn::drop 1",
1275 "AsyncDataConn::close 2",
1276 "AsyncDataConn::drop 2",
1277 ]
1278 );
1279 }
1280
1281 #[tokio::test]
1282 async fn test_force_back_and_first_is_async() {
1283 let logger = Arc::new(Mutex::new(Vec::new()));
1284
1285 {
1286 let mut manager = DataConnManager::new();
1287
1288 let conn = AsyncDataConn::new(1, logger.clone(), Fail::Not);
1289 let boxed = Box::new(DataConnContainer::new("foo".to_string(), Box::new(conn)));
1290 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1291 let ssnnptr = SendSyncNonNull::new(nnptr);
1292 manager.add(ssnnptr);
1293
1294 let conn = SyncDataConn::new(2, logger.clone(), Fail::Not);
1295 let boxed = Box::new(DataConnContainer::new("bar".to_string(), Box::new(conn)));
1296 let nnptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataConnContainer>();
1297 let ssnnptr = SendSyncNonNull::new(nnptr);
1298 manager.add(ssnnptr);
1299
1300 assert!(manager.commit_async().await.is_ok());
1301 manager.rollback_async().await;
1302 }
1303
1304 assert_eq!(
1305 *logger.lock().unwrap(),
1306 &[
1307 "AsyncDataConn::new 1",
1308 "SyncDataConn::new 2",
1309 "SyncDataConn::pre_commit 2",
1310 "AsyncDataConn::pre_commit 1",
1311 "SyncDataConn::commit 2",
1312 "AsyncDataConn::commit 1",
1313 "SyncDataConn::post_commit 2",
1314 "AsyncDataConn::post_commit 1",
1315 "SyncDataConn::force_back 2",
1316 "AsyncDataConn::force_back 1",
1317 "AsyncDataConn::close 1",
1318 "AsyncDataConn::drop 1",
1319 "SyncDataConn::close 2",
1320 "SyncDataConn::drop 2",
1321 ]
1322 );
1323 }
1324 }
1325}