1use std::collections::HashMap;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_types::Region;
16use fsqlite_types::cx::{self, Cx};
17use tracing::debug;
18
/// Role tag attached to every region in a [`RegionTree`].
///
/// The tree never branches on the kind: it is stored on the node at creation,
/// returned by [`RegionTree::kind`], and included in the debug log events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RegionKind {
    /// Database root region (the tests in this file use it with `create_root`).
    DbRoot,
    /// Region for the write coordinator service.
    WriteCoordinator,
    /// Region for the symbol store service.
    SymbolStore,
    /// Region for the replication service.
    Replication,
    /// Region for the checkpoint / GC service.
    CheckpointGc,
    /// Region for the observability service.
    Observability,
    /// Region scoped to one connection.
    PerConnection,
    /// Region scoped to one transaction.
    PerTransaction,
}
41
/// Lifecycle state of a region.
///
/// Regions are created `Open`, moved to `Closing` by `RegionTree::begin_close`
/// and to `Closed` by `RegionTree::complete_close`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionState {
    /// Accepts new children, tasks, obligations and finalizers.
    Open,
    /// Close has been requested; the region is waiting to become quiescent.
    Closing,
    /// Finalizers have run and the region is finished.
    Closed,
}
52
/// Boxed one-shot callback stored on a region and invoked when the region
/// completes its close. The callback takes no arguments and must be `Send`.
type Finalizer = Box<dyn FnOnce() + Send>;
55
/// Atomic counter shared between a region node and the handles it hands out.
type SharedCounter = Arc<AtomicUsize>;

/// Allocates a fresh shared counter whose value starts at zero.
fn new_counter() -> SharedCounter {
    // `AtomicUsize::default()` is zero, so this matches an explicit `new(0)`.
    SharedCounter::default()
}
62
63struct RegionNode {
67 kind: RegionKind,
68 state: RegionState,
69 cx: Cx<cx::FullCaps>,
70 parent: Option<Region>,
71 children: Vec<Region>,
72 finalizers: Vec<Finalizer>,
73 active_tasks: SharedCounter,
74 active_obligations: SharedCounter,
75}
76
77pub struct TaskHandle {
84 counter: SharedCounter,
85 region: Region,
86}
87
88impl TaskHandle {
89 #[must_use]
91 pub const fn region(&self) -> Region {
92 self.region
93 }
94}
95
96impl Drop for TaskHandle {
97 fn drop(&mut self) {
98 self.counter.fetch_sub(1, Ordering::AcqRel);
99 }
100}
101
102pub struct ObligationHandle {
107 counter: SharedCounter,
108 region: Region,
109}
110
111impl ObligationHandle {
112 #[must_use]
114 pub const fn region(&self) -> Region {
115 self.region
116 }
117
118 pub fn resolve(self) {
122 }
124}
125
126impl Drop for ObligationHandle {
127 fn drop(&mut self) {
128 self.counter.fetch_sub(1, Ordering::AcqRel);
129 }
130}
131
132pub struct RegionTree {
140 nodes: HashMap<Region, RegionNode>,
141 next_id: u32,
142 root: Option<Region>,
143}
144
145impl Default for RegionTree {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151impl RegionTree {
152 #[must_use]
154 pub fn new() -> Self {
155 Self {
156 nodes: HashMap::new(),
157 next_id: 0,
158 root: None,
159 }
160 }
161
162 pub fn create_root(&mut self, kind: RegionKind, cx: Cx<cx::FullCaps>) -> Result<Region> {
166 if self.root.is_some() {
167 return Err(FrankenError::Internal(
168 "root region already exists".to_owned(),
169 ));
170 }
171 let id = self.alloc_id();
172 self.nodes.insert(
173 id,
174 RegionNode {
175 kind,
176 state: RegionState::Open,
177 cx,
178 parent: None,
179 children: Vec::new(),
180 finalizers: Vec::new(),
181 active_tasks: new_counter(),
182 active_obligations: new_counter(),
183 },
184 );
185 self.root = Some(id);
186 debug!(region = id.get(), kind = ?kind, "region created (root)");
187 Ok(id)
188 }
189
190 pub fn create_child(
192 &mut self,
193 parent: Region,
194 kind: RegionKind,
195 cx: Cx<cx::FullCaps>,
196 ) -> Result<Region> {
197 let parent_state = self.nodes.get(&parent).map(|n| n.state).ok_or_else(|| {
198 FrankenError::Internal(format!("parent region {} not found", parent.get()))
199 })?;
200 if parent_state != RegionState::Open {
201 return Err(FrankenError::Busy);
202 }
203 let id = self.alloc_id();
204 self.nodes.insert(
205 id,
206 RegionNode {
207 kind,
208 state: RegionState::Open,
209 cx,
210 parent: Some(parent),
211 children: Vec::new(),
212 finalizers: Vec::new(),
213 active_tasks: new_counter(),
214 active_obligations: new_counter(),
215 },
216 );
217 if let Some(parent_node) = self.nodes.get_mut(&parent) {
218 parent_node.children.push(id);
219 }
220 debug!(region = id.get(), parent = parent.get(), kind = ?kind, "region created (child)");
221 Ok(id)
222 }
223
224 #[must_use]
228 pub fn root(&self) -> Option<Region> {
229 self.root
230 }
231
232 #[must_use]
234 pub fn kind(&self, id: Region) -> Option<RegionKind> {
235 self.nodes.get(&id).map(|n| n.kind)
236 }
237
238 #[must_use]
240 pub fn state(&self, id: Region) -> Option<RegionState> {
241 self.nodes.get(&id).map(|n| n.state)
242 }
243
244 #[must_use]
246 pub fn parent(&self, id: Region) -> Option<Option<Region>> {
247 self.nodes.get(&id).map(|n| n.parent)
248 }
249
250 #[must_use]
252 pub fn children(&self, id: Region) -> Option<&[Region]> {
253 self.nodes.get(&id).map(|n| n.children.as_slice())
254 }
255
256 #[must_use]
258 pub fn cx(&self, id: Region) -> Option<Cx<cx::FullCaps>> {
259 self.nodes.get(&id).map(|n| n.cx.clone())
260 }
261
262 #[must_use]
264 pub fn active_tasks(&self, id: Region) -> usize {
265 self.nodes
266 .get(&id)
267 .map_or(0, |n| n.active_tasks.load(Ordering::Acquire))
268 }
269
270 #[must_use]
272 pub fn active_obligations(&self, id: Region) -> usize {
273 self.nodes
274 .get(&id)
275 .map_or(0, |n| n.active_obligations.load(Ordering::Acquire))
276 }
277
278 pub fn register_task(&self, id: Region) -> Result<TaskHandle> {
285 let node = self
286 .nodes
287 .get(&id)
288 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
289 if node.state != RegionState::Open {
290 return Err(FrankenError::Busy);
291 }
292 node.active_tasks.fetch_add(1, Ordering::AcqRel);
293 debug!(region = id.get(), "task registered");
294 Ok(TaskHandle {
295 counter: Arc::clone(&node.active_tasks),
296 region: id,
297 })
298 }
299
300 pub fn register_obligation(&self, id: Region) -> Result<ObligationHandle> {
306 let node = self
307 .nodes
308 .get(&id)
309 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
310 if node.state == RegionState::Closed {
311 return Err(FrankenError::Busy);
312 }
313 node.active_obligations.fetch_add(1, Ordering::AcqRel);
314 debug!(region = id.get(), "obligation registered");
315 Ok(ObligationHandle {
316 counter: Arc::clone(&node.active_obligations),
317 region: id,
318 })
319 }
320
321 pub fn register_finalizer(
323 &mut self,
324 id: Region,
325 finalizer: impl FnOnce() + Send + 'static,
326 ) -> Result<()> {
327 let node = self
328 .nodes
329 .get_mut(&id)
330 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
331 if node.state != RegionState::Open {
332 return Err(FrankenError::Busy);
333 }
334 node.finalizers.push(Box::new(finalizer));
335 Ok(())
336 }
337
338 pub fn begin_close(&mut self, id: Region) -> Result<()> {
348 let children = self
349 .nodes
350 .get(&id)
351 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?
352 .children
353 .clone();
354
355 let node = self
357 .nodes
358 .get_mut(&id)
359 .expect("region confirmed present above");
360 if node.state == RegionState::Closed {
361 return Ok(());
362 }
363 node.cx.cancel();
364 node.state = RegionState::Closing;
365 debug!(region = id.get(), kind = ?node.kind, "region closing");
366
367 for child in children {
369 if self.state(child) == Some(RegionState::Open) {
370 self.begin_close(child)?;
371 }
372 }
373 Ok(())
374 }
375
376 #[must_use]
383 pub fn is_quiescent(&self, id: Region) -> bool {
384 let Some(node) = self.nodes.get(&id) else {
385 return false;
386 };
387 let children_closed = node
388 .children
389 .iter()
390 .all(|child| self.state(*child) == Some(RegionState::Closed));
391 children_closed
392 && node.active_tasks.load(Ordering::Acquire) == 0
393 && node.active_obligations.load(Ordering::Acquire) == 0
394 }
395
396 pub fn complete_close(&mut self, id: Region) -> Result<()> {
400 let state = self
401 .nodes
402 .get(&id)
403 .map(|n| n.state)
404 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
405 if state == RegionState::Closed {
406 return Ok(());
407 }
408 if state != RegionState::Closing {
409 return Err(FrankenError::Internal(
410 "region must be in Closing state before complete_close".to_owned(),
411 ));
412 }
413 if !self.is_quiescent(id) {
414 return Err(FrankenError::Internal(
415 "region not quiescent; cannot complete close".to_owned(),
416 ));
417 }
418 let node = self
419 .nodes
420 .get_mut(&id)
421 .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
422 let finalizers = std::mem::take(&mut node.finalizers);
423 for f in finalizers {
424 f();
425 }
426 node.state = RegionState::Closed;
427 debug!(region = id.get(), kind = ?node.kind, "region closed");
428 Ok(())
429 }
430
431 pub fn close_and_drain(&mut self, id: Region) -> Result<()> {
437 self.begin_close(id)?;
438 self.drain_subtree(id)
439 }
440
441 fn drain_subtree(&mut self, id: Region) -> Result<()> {
444 let children = self
445 .nodes
446 .get(&id)
447 .map(|n| n.children.clone())
448 .unwrap_or_default();
449 for child in children {
450 self.drain_subtree(child)?;
451 }
452 while self.active_tasks(id) > 0 || self.active_obligations(id) > 0 {
453 std::hint::spin_loop();
454 }
455 self.complete_close(id)
456 }
457
458 fn alloc_id(&mut self) -> Region {
459 let id = Region::new(self.next_id);
460 self.next_id = self.next_id.checked_add(1).expect("region id overflow");
461 id
462 }
463}
464
465#[cfg(test)]
468mod tests {
469 use std::sync::Arc;
470 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
471
472 use super::*;
473
474 const BEAD_ID: &str = "bd-3go.9";
475
476 #[test]
477 fn test_region_tree_structure() {
478 let mut tree = RegionTree::new();
479 let root = tree
480 .create_root(RegionKind::DbRoot, Cx::new())
481 .expect("root creation");
482 let wc = tree
483 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
484 .expect("wc");
485 let ss = tree
486 .create_child(root, RegionKind::SymbolStore, Cx::new())
487 .expect("ss");
488 let repl = tree
489 .create_child(root, RegionKind::Replication, Cx::new())
490 .expect("repl");
491 let gc = tree
492 .create_child(root, RegionKind::CheckpointGc, Cx::new())
493 .expect("gc");
494 let obs = tree
495 .create_child(root, RegionKind::Observability, Cx::new())
496 .expect("obs");
497
498 assert_eq!(
500 tree.root(),
501 Some(root),
502 "bead_id={BEAD_ID} case=root_exists"
503 );
504 assert_eq!(tree.kind(root), Some(RegionKind::DbRoot));
505
506 let children = tree.children(root).expect("root has children");
508 assert_eq!(
509 children.len(),
510 5,
511 "bead_id={BEAD_ID} case=root_has_5_service_children"
512 );
513 assert_eq!(children, &[wc, ss, repl, gc, obs]);
514
515 assert_eq!(tree.kind(wc), Some(RegionKind::WriteCoordinator));
517 assert_eq!(tree.kind(ss), Some(RegionKind::SymbolStore));
518 assert_eq!(tree.kind(repl), Some(RegionKind::Replication));
519 assert_eq!(tree.kind(gc), Some(RegionKind::CheckpointGc));
520 assert_eq!(tree.kind(obs), Some(RegionKind::Observability));
521
522 for &child in children {
523 assert_eq!(
524 tree.parent(child),
525 Some(Some(root)),
526 "bead_id={BEAD_ID} case=child_parent_is_root region={}",
527 child.get()
528 );
529 }
530 }
531
532 #[test]
533 fn test_region_quiescence_all_children_complete() {
534 let mut tree = RegionTree::new();
535 let root = tree
536 .create_root(RegionKind::DbRoot, Cx::new())
537 .expect("root");
538 let region = tree
539 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
540 .expect("wc");
541
542 let tasks: Vec<TaskHandle> = (0..5)
544 .map(|_| tree.register_task(region).expect("register task"))
545 .collect();
546
547 assert_eq!(
548 tree.active_tasks(region),
549 5,
550 "bead_id={BEAD_ID} case=5_tasks_registered"
551 );
552
553 tree.begin_close(region).expect("begin close");
555 assert_eq!(tree.state(region), Some(RegionState::Closing));
556 assert!(
557 !tree.is_quiescent(region),
558 "bead_id={BEAD_ID} case=not_quiescent_with_active_tasks"
559 );
560
561 for (i, task) in tasks.into_iter().enumerate() {
563 drop(task);
564 if i < 4 {
565 assert!(
566 !tree.is_quiescent(region),
567 "bead_id={BEAD_ID} case=not_quiescent_after_{}_completions",
568 i + 1
569 );
570 }
571 }
572
573 assert!(
574 tree.is_quiescent(region),
575 "bead_id={BEAD_ID} case=quiescent_after_all_tasks_complete"
576 );
577 tree.complete_close(region).expect("complete close");
578 assert_eq!(tree.state(region), Some(RegionState::Closed));
579 }
580
581 #[test]
582 fn test_region_quiescence_finalizers_run() {
583 let mut tree = RegionTree::new();
584 let root = tree
585 .create_root(RegionKind::DbRoot, Cx::new())
586 .expect("root");
587 let region = tree
588 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
589 .expect("wc");
590
591 let flags: Vec<Arc<AtomicBool>> =
593 (0..3).map(|_| Arc::new(AtomicBool::new(false))).collect();
594 let tasks: Vec<TaskHandle> = (0..3)
595 .map(|_| tree.register_task(region).expect("register task"))
596 .collect();
597 for flag in &flags {
598 let f = Arc::clone(flag);
599 tree.register_finalizer(region, move || {
600 f.store(true, Ordering::Release);
601 })
602 .expect("register finalizer");
603 }
604
605 tree.begin_close(region).expect("begin close");
607 drop(tasks);
608
609 for (i, flag) in flags.iter().enumerate() {
611 assert!(
612 !flag.load(Ordering::Acquire),
613 "bead_id={BEAD_ID} case=finalizer_{i}_not_run_before_complete_close"
614 );
615 }
616
617 tree.complete_close(region).expect("complete close");
619 for (i, flag) in flags.iter().enumerate() {
620 assert!(
621 flag.load(Ordering::Acquire),
622 "bead_id={BEAD_ID} case=finalizer_{i}_ran_after_complete_close"
623 );
624 }
625 }
626
627 #[test]
628 fn test_region_quiescence_obligations_resolved() {
629 let mut tree = RegionTree::new();
630 let root = tree
631 .create_root(RegionKind::DbRoot, Cx::new())
632 .expect("root");
633 let region = tree
634 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
635 .expect("wc");
636
637 let obligations: Vec<ObligationHandle> = (0..3)
638 .map(|_| {
639 tree.register_obligation(region)
640 .expect("register obligation")
641 })
642 .collect();
643
644 tree.begin_close(region).expect("begin close");
645 assert!(
646 !tree.is_quiescent(region),
647 "bead_id={BEAD_ID} case=not_quiescent_with_pending_obligations"
648 );
649
650 for (i, obligation) in obligations.into_iter().enumerate() {
652 obligation.resolve();
653 if i < 2 {
654 assert!(
655 !tree.is_quiescent(region),
656 "bead_id={BEAD_ID} case=not_quiescent_after_{}_resolutions",
657 i + 1
658 );
659 }
660 }
661
662 assert!(
663 tree.is_quiescent(region),
664 "bead_id={BEAD_ID} case=quiescent_after_all_obligations_resolved"
665 );
666 tree.complete_close(region).expect("complete close");
667 }
668
669 #[test]
670 fn test_no_detached_tasks() {
671 let tree = RegionTree::new();
672 let result = tree.register_task(Region::new(999));
674 assert!(
675 result.is_err(),
676 "bead_id={BEAD_ID} case=detached_task_rejected"
677 );
678 }
679
680 #[test]
681 fn test_complete_close_requires_closing_state() {
682 let mut tree = RegionTree::new();
683 let root = tree
684 .create_root(RegionKind::DbRoot, Cx::new())
685 .expect("root");
686 let child = tree
687 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
688 .expect("child");
689
690 let err = tree
691 .complete_close(child)
692 .expect_err("must require begin_close");
693 assert!(
694 matches!(err, FrankenError::Internal(_)),
695 "bead_id={BEAD_ID} case=complete_close_requires_closing_state got {err:?}"
696 );
697 assert_eq!(
698 tree.state(child),
699 Some(RegionState::Open),
700 "bead_id={BEAD_ID} case=child_state_unchanged_when_close_rejected"
701 );
702 }
703
704 #[test]
705 fn test_database_close_awaits_quiescence() {
706 let mut tree = RegionTree::new();
707 let root = tree
708 .create_root(RegionKind::DbRoot, Cx::new())
709 .expect("root");
710
711 let wc = tree
713 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
714 .expect("wc");
715 let gc = tree
716 .create_child(root, RegionKind::CheckpointGc, Cx::new())
717 .expect("gc");
718
719 let wc_task = tree.register_task(wc).expect("wc task");
720 let gc_task = tree.register_task(gc).expect("gc task");
721
722 let finalized = Arc::new(AtomicBool::new(false));
723 {
724 let flag = Arc::clone(&finalized);
725 tree.register_finalizer(root, move || {
726 flag.store(true, Ordering::Release);
727 })
728 .expect("root finalizer");
729 }
730
731 tree.begin_close(root).expect("begin close root");
733 assert_eq!(tree.state(wc), Some(RegionState::Closing));
734 assert_eq!(tree.state(gc), Some(RegionState::Closing));
735
736 assert!(
738 !tree.is_quiescent(root),
739 "bead_id={BEAD_ID} case=root_not_quiescent_with_active_children"
740 );
741
742 drop(wc_task);
744 assert!(
745 !tree.is_quiescent(root),
746 "bead_id={BEAD_ID} case=root_not_quiescent_gc_still_active"
747 );
748 drop(gc_task);
749
750 assert!(tree.is_quiescent(wc));
752 assert!(tree.is_quiescent(gc));
753 tree.complete_close(wc).expect("close wc");
754 tree.complete_close(gc).expect("close gc");
755
756 assert!(
758 tree.is_quiescent(root),
759 "bead_id={BEAD_ID} case=root_quiescent_after_children_closed"
760 );
761 tree.complete_close(root).expect("close root");
762
763 assert!(
764 finalized.load(Ordering::Acquire),
765 "bead_id={BEAD_ID} case=root_finalizer_ran"
766 );
767 assert_eq!(tree.state(root), Some(RegionState::Closed));
768 }
769
770 #[test]
771 fn test_per_connection_region_child_of_root() {
772 let mut tree = RegionTree::new();
773 let root = tree
774 .create_root(RegionKind::DbRoot, Cx::new())
775 .expect("root");
776 let conn = tree
777 .create_child(root, RegionKind::PerConnection, Cx::new())
778 .expect("conn");
779
780 assert_eq!(
781 tree.parent(conn),
782 Some(Some(root)),
783 "bead_id={BEAD_ID} case=connection_is_child_of_root"
784 );
785 assert_eq!(tree.kind(conn), Some(RegionKind::PerConnection));
786
787 let conn_cx = tree.cx(conn).expect("conn cx");
789 tree.begin_close(root).expect("begin close root");
790 assert!(
791 conn_cx.is_cancel_requested(),
792 "bead_id={BEAD_ID} case=root_close_cancels_connection"
793 );
794 assert_eq!(tree.state(conn), Some(RegionState::Closing));
795
796 tree.complete_close(conn).expect("close conn");
797 tree.complete_close(root).expect("close root");
798 }
799
800 #[test]
801 #[allow(clippy::too_many_lines)]
802 fn test_e2e_structured_concurrency_shutdown() {
803 let mut tree = RegionTree::new();
804 let root = tree
805 .create_root(RegionKind::DbRoot, Cx::new())
806 .expect("root");
807
808 let wc = tree
810 .create_child(root, RegionKind::WriteCoordinator, Cx::new())
811 .expect("wc");
812 let ss = tree
813 .create_child(root, RegionKind::SymbolStore, Cx::new())
814 .expect("ss");
815 let repl = tree
816 .create_child(root, RegionKind::Replication, Cx::new())
817 .expect("repl");
818 let gc = tree
819 .create_child(root, RegionKind::CheckpointGc, Cx::new())
820 .expect("gc");
821 let obs = tree
822 .create_child(root, RegionKind::Observability, Cx::new())
823 .expect("obs");
824
825 let conns: Vec<Region> = (0..3)
827 .map(|_| {
828 tree.create_child(root, RegionKind::PerConnection, Cx::new())
829 .expect("conn")
830 })
831 .collect();
832
833 let mut txn_tasks = Vec::new();
834 for &conn in &conns {
835 let txn = tree
836 .create_child(conn, RegionKind::PerTransaction, Cx::new())
837 .expect("txn");
838 txn_tasks.push(tree.register_task(txn).expect("txn task"));
839 }
840
841 let service_tasks = vec![
843 tree.register_task(wc).expect("wc task"),
844 tree.register_task(ss).expect("ss task"),
845 tree.register_task(repl).expect("repl task"),
846 tree.register_task(gc).expect("gc task"),
847 tree.register_task(obs).expect("obs task"),
848 ];
849
850 let finalized_count = Arc::new(AtomicUsize::new(0));
852 for _ in 0..3 {
853 let counter = Arc::clone(&finalized_count);
854 tree.register_finalizer(root, move || {
855 counter.fetch_add(1, Ordering::AcqRel);
856 })
857 .expect("root finalizer");
858 }
859
860 tree.begin_close(root).expect("begin close root");
862
863 assert_eq!(tree.state(root), Some(RegionState::Closing));
865 for &conn in &conns {
866 assert_eq!(tree.state(conn), Some(RegionState::Closing));
867 }
868
869 assert!(
871 !tree.is_quiescent(root),
872 "bead_id={BEAD_ID} case=e2e_root_not_quiescent_initially"
873 );
874
875 drop(txn_tasks);
877 drop(service_tasks);
878
879 for &conn in &conns {
881 let txn_children = tree.children(conn).expect("conn children").to_vec();
882 for txn in txn_children {
883 tree.complete_close(txn).expect("close txn");
884 }
885 }
886 for &conn in &conns {
887 tree.complete_close(conn).expect("close conn");
888 }
889 for &svc in &[wc, ss, repl, gc, obs] {
890 tree.complete_close(svc).expect("close svc");
891 }
892
893 assert!(
895 tree.is_quiescent(root),
896 "bead_id={BEAD_ID} case=e2e_root_quiescent"
897 );
898 tree.complete_close(root).expect("close root");
899
900 assert_eq!(
901 finalized_count.load(Ordering::Acquire),
902 3,
903 "bead_id={BEAD_ID} case=e2e_all_finalizers_ran"
904 );
905 assert_eq!(
906 tree.state(root),
907 Some(RegionState::Closed),
908 "bead_id={BEAD_ID} case=e2e_root_closed"
909 );
910
911 assert_eq!(tree.active_tasks(root), 0);
913 for &conn in &conns {
914 assert_eq!(tree.active_tasks(conn), 0);
915 }
916 }
917
918 #[test]
919 fn test_close_and_drain_threaded() {
920 use std::sync::Mutex;
921 use std::thread;
922 use std::time::Duration;
923
924 let tree = Arc::new(Mutex::new(RegionTree::new()));
925 let root = {
926 let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
927 t.create_root(RegionKind::DbRoot, Cx::new()).expect("root")
928 };
929 let wc = {
930 let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
931 t.create_child(root, RegionKind::WriteCoordinator, Cx::new())
932 .expect("wc")
933 };
934
935 let task1 = tree
937 .lock()
938 .unwrap_or_else(|e| e.into_inner())
939 .register_task(wc)
940 .expect("t1");
941 let task2 = tree
942 .lock()
943 .unwrap_or_else(|e| e.into_inner())
944 .register_task(wc)
945 .expect("t2");
946
947 let completed = Arc::new(AtomicBool::new(false));
948 let flag = Arc::clone(&completed);
949
950 let t1 = thread::spawn(move || {
952 thread::sleep(Duration::from_millis(20));
953 drop(task1);
954 });
955 let t2 = thread::spawn(move || {
956 thread::sleep(Duration::from_millis(30));
957 drop(task2);
958 });
959
960 {
962 let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
963 t.close_and_drain(root).expect("close_and_drain");
964 }
965 flag.store(true, Ordering::Release);
966
967 t1.join().expect("t1 join");
968 t2.join().expect("t2 join");
969
970 assert!(
971 completed.load(Ordering::Acquire),
972 "bead_id={BEAD_ID} case=threaded_close_completed"
973 );
974 assert_eq!(
975 tree.lock().unwrap_or_else(|e| e.into_inner()).state(root),
976 Some(RegionState::Closed),
977 "bead_id={BEAD_ID} case=threaded_root_closed"
978 );
979 }
980}