Skip to main content

fsqlite_core/
region.rs

1//! Structured concurrency region tree (§4.11, bd-3go.9).
2//!
3//! Every background worker, coordinator, and long-lived service runs as a
4//! region-owned task. The region tree enforces INV-REGION-QUIESCENCE: no
5//! region closes until all children complete, all finalizers run, and all
6//! obligations resolve.
7//!
8//! Close protocol: cancel → drain children → run finalizers → mark closed.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_types::Region;
16use fsqlite_types::cx::{self, Cx};
17use tracing::debug;
18
19// ── Types ──────────────────────────────────────────────────────────────
20
/// Normative region kinds from §4.11.
///
/// The kind is a label stored on each node and reported by
/// `RegionTree::kind`. This module does not itself check which kinds may be
/// nested under which; the parent/child notes on the variants below describe
/// the intended layout.
///
/// The type is `Copy` and implements `Eq + Hash`, so it can be passed by
/// value and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RegionKind {
    /// Top-level database root region.
    DbRoot,
    /// Write coordinator service region (native marker sequencer + compat WAL).
    WriteCoordinator,
    /// Symbol store service region (local symbol logs + tiered storage fetch).
    SymbolStore,
    /// Replication service region (stream symbols; anti-entropy; membership).
    Replication,
    /// Checkpoint/GC service region (checkpointer, compactor, GC horizon).
    CheckpointGc,
    /// Observability service region (deadline monitor, task inspector, metrics).
    Observability,
    /// Per-connection region (child of root).
    PerConnection,
    /// Per-transaction region (child of connection).
    PerTransaction,
}
41
/// State machine for region lifecycle.
///
/// Regions are created `Open`. `RegionTree::begin_close` moves a region to
/// `Closing`, and `RegionTree::complete_close` moves it from `Closing` to
/// `Closed`. Nothing in this module moves a region back to an earlier state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionState {
    /// Region is open and accepting work.
    Open,
    /// Region is closing (cancellation requested, draining).
    Closing,
    /// Region is fully closed (quiescent, finalizers run).
    Closed,
}
52
/// A finalizer callback to run during region close.
///
/// Boxed so callbacks of different closure types can share one `Vec`;
/// `FnOnce` because each callback is invoked exactly once, and `Send` so the
/// stored callback may be moved to another thread.
type Finalizer = Box<dyn FnOnce() + Send>;

/// Shared atomic counter for tracking active tasks or obligations.
///
/// The region node and every handle issued for it hold clones of the same
/// `Arc`, so a handle can decrement the count without access to the tree.
type SharedCounter = Arc<AtomicUsize>;
58
59fn new_counter() -> SharedCounter {
60    Arc::new(AtomicUsize::new(0))
61}
62
63// ── RegionNode ─────────────────────────────────────────────────────────
64
/// A node in the structured concurrency region tree.
///
/// Nodes live in `RegionTree::nodes`, keyed by their `Region` id; parent and
/// child links are stored as ids rather than references.
struct RegionNode {
    // Label supplied at creation; reported by `RegionTree::kind`.
    kind: RegionKind,
    // Lifecycle state; `Open` at creation.
    state: RegionState,
    // Context supplied at creation; `begin_close` calls `cancel()` on it.
    cx: Cx<cx::FullCaps>,
    // `None` for the root, `Some(parent_id)` for every child.
    parent: Option<Region>,
    // Child ids in creation order (appended by `create_child`).
    children: Vec<Region>,
    // Callbacks run, in registration order, by `complete_close`.
    finalizers: Vec<Finalizer>,
    // Count of live `TaskHandle`s issued for this region.
    active_tasks: SharedCounter,
    // Count of live `ObligationHandle`s issued for this region.
    active_obligations: SharedCounter,
}
76
77// ── RAII handles ───────────────────────────────────────────────────────
78
79/// RAII handle for a task registered in a region.
80///
81/// When dropped, the task count for the owning region is decremented.
82/// This ensures tasks cannot leak without being accounted for.
83pub struct TaskHandle {
84    counter: SharedCounter,
85    region: Region,
86}
87
88impl TaskHandle {
89    /// The region this task belongs to.
90    #[must_use]
91    pub const fn region(&self) -> Region {
92        self.region
93    }
94}
95
96impl Drop for TaskHandle {
97    fn drop(&mut self) {
98        self.counter.fetch_sub(1, Ordering::AcqRel);
99    }
100}
101
102/// RAII handle for an obligation registered in a region.
103///
104/// When dropped (resolved), the obligation count is decremented.
105/// Obligations model the two-phase lifecycle: Reserved → Committed/Aborted.
106pub struct ObligationHandle {
107    counter: SharedCounter,
108    region: Region,
109}
110
111impl ObligationHandle {
112    /// The region this obligation belongs to.
113    #[must_use]
114    pub const fn region(&self) -> Region {
115        self.region
116    }
117
118    /// Explicitly resolve the obligation (commit or abort).
119    ///
120    /// Equivalent to dropping the handle; provided for clarity at call sites.
121    pub fn resolve(self) {
122        // Ownership transfer triggers Drop, which decrements the counter.
123    }
124}
125
126impl Drop for ObligationHandle {
127    fn drop(&mut self) {
128        self.counter.fetch_sub(1, Ordering::AcqRel);
129    }
130}
131
132// ── RegionTree ─────────────────────────────────────────────────────────
133
/// Tree of regions enforcing structured concurrency (§4.11).
///
/// Every task/actor must be region-owned. The tree enforces
/// INV-REGION-QUIESCENCE: no region closes until all children
/// complete, all finalizers run, and all obligations resolve.
///
/// Structural changes (creating regions, registering finalizers, closing)
/// take `&mut self`; registering tasks/obligations and all queries take
/// `&self`. Nodes are never removed by the methods in this file, so a closed
/// region remains queryable.
pub struct RegionTree {
    // All regions ever created, keyed by id.
    nodes: HashMap<Region, RegionNode>,
    // Raw value of the next id `alloc_id` will hand out; starts at 0.
    next_id: u32,
    // Id of the root region once `create_root` has succeeded.
    root: Option<Region>,
}
144
145impl Default for RegionTree {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl RegionTree {
152    /// Create an empty region tree.
153    #[must_use]
154    pub fn new() -> Self {
155        Self {
156            nodes: HashMap::new(),
157            next_id: 0,
158            root: None,
159        }
160    }
161
162    /// Create the root region.
163    ///
164    /// Only one root region may exist. Returns an error if a root already exists.
165    pub fn create_root(&mut self, kind: RegionKind, cx: Cx<cx::FullCaps>) -> Result<Region> {
166        if self.root.is_some() {
167            return Err(FrankenError::Internal(
168                "root region already exists".to_owned(),
169            ));
170        }
171        let id = self.alloc_id();
172        self.nodes.insert(
173            id,
174            RegionNode {
175                kind,
176                state: RegionState::Open,
177                cx,
178                parent: None,
179                children: Vec::new(),
180                finalizers: Vec::new(),
181                active_tasks: new_counter(),
182                active_obligations: new_counter(),
183            },
184        );
185        self.root = Some(id);
186        debug!(region = id.get(), kind = ?kind, "region created (root)");
187        Ok(id)
188    }
189
190    /// Create a child region under the given parent.
191    pub fn create_child(
192        &mut self,
193        parent: Region,
194        kind: RegionKind,
195        cx: Cx<cx::FullCaps>,
196    ) -> Result<Region> {
197        let parent_state = self.nodes.get(&parent).map(|n| n.state).ok_or_else(|| {
198            FrankenError::Internal(format!("parent region {} not found", parent.get()))
199        })?;
200        if parent_state != RegionState::Open {
201            return Err(FrankenError::Busy);
202        }
203        let id = self.alloc_id();
204        self.nodes.insert(
205            id,
206            RegionNode {
207                kind,
208                state: RegionState::Open,
209                cx,
210                parent: Some(parent),
211                children: Vec::new(),
212                finalizers: Vec::new(),
213                active_tasks: new_counter(),
214                active_obligations: new_counter(),
215            },
216        );
217        if let Some(parent_node) = self.nodes.get_mut(&parent) {
218            parent_node.children.push(id);
219        }
220        debug!(region = id.get(), parent = parent.get(), kind = ?kind, "region created (child)");
221        Ok(id)
222    }
223
224    // ── Accessors ──────────────────────────────────────────────────────
225
226    /// Root region, if created.
227    #[must_use]
228    pub fn root(&self) -> Option<Region> {
229        self.root
230    }
231
232    /// Query the kind of a region.
233    #[must_use]
234    pub fn kind(&self, id: Region) -> Option<RegionKind> {
235        self.nodes.get(&id).map(|n| n.kind)
236    }
237
238    /// Query the state of a region.
239    #[must_use]
240    pub fn state(&self, id: Region) -> Option<RegionState> {
241        self.nodes.get(&id).map(|n| n.state)
242    }
243
244    /// Query the parent of a region.
245    #[must_use]
246    pub fn parent(&self, id: Region) -> Option<Option<Region>> {
247        self.nodes.get(&id).map(|n| n.parent)
248    }
249
250    /// List children of a region.
251    #[must_use]
252    pub fn children(&self, id: Region) -> Option<&[Region]> {
253        self.nodes.get(&id).map(|n| n.children.as_slice())
254    }
255
256    /// Get a clone of the region's `Cx` for cancellation inspection.
257    #[must_use]
258    pub fn cx(&self, id: Region) -> Option<Cx<cx::FullCaps>> {
259        self.nodes.get(&id).map(|n| n.cx.clone())
260    }
261
262    /// Active task count for a region.
263    #[must_use]
264    pub fn active_tasks(&self, id: Region) -> usize {
265        self.nodes
266            .get(&id)
267            .map_or(0, |n| n.active_tasks.load(Ordering::Acquire))
268    }
269
270    /// Active obligation count for a region.
271    #[must_use]
272    pub fn active_obligations(&self, id: Region) -> usize {
273        self.nodes
274            .get(&id)
275            .map_or(0, |n| n.active_obligations.load(Ordering::Acquire))
276    }
277
278    // ── Task / obligation / finalizer registration ─────────────────────
279
280    /// Register a task in a region, returning an RAII handle.
281    ///
282    /// The task count is incremented; when the handle is dropped, it decrements.
283    /// Returns `Err(Busy)` if the region is not [`RegionState::Open`].
284    pub fn register_task(&self, id: Region) -> Result<TaskHandle> {
285        let node = self
286            .nodes
287            .get(&id)
288            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
289        if node.state != RegionState::Open {
290            return Err(FrankenError::Busy);
291        }
292        node.active_tasks.fetch_add(1, Ordering::AcqRel);
293        debug!(region = id.get(), "task registered");
294        Ok(TaskHandle {
295            counter: Arc::clone(&node.active_tasks),
296            region: id,
297        })
298    }
299
300    /// Register an obligation in a region, returning an RAII handle.
301    ///
302    /// Obligations can be registered while the region is Open or Closing
303    /// (to allow in-flight work to create follow-up obligations during drain).
304    /// Returns `Err(Busy)` if the region is [`RegionState::Closed`].
305    pub fn register_obligation(&self, id: Region) -> Result<ObligationHandle> {
306        let node = self
307            .nodes
308            .get(&id)
309            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
310        if node.state == RegionState::Closed {
311            return Err(FrankenError::Busy);
312        }
313        node.active_obligations.fetch_add(1, Ordering::AcqRel);
314        debug!(region = id.get(), "obligation registered");
315        Ok(ObligationHandle {
316            counter: Arc::clone(&node.active_obligations),
317            region: id,
318        })
319    }
320
321    /// Register a finalizer callback to run during region close.
322    pub fn register_finalizer(
323        &mut self,
324        id: Region,
325        finalizer: impl FnOnce() + Send + 'static,
326    ) -> Result<()> {
327        let node = self
328            .nodes
329            .get_mut(&id)
330            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
331        if node.state != RegionState::Open {
332            return Err(FrankenError::Busy);
333        }
334        node.finalizers.push(Box::new(finalizer));
335        Ok(())
336    }
337
338    // ── Close protocol ─────────────────────────────────────────────────
339
340    /// Begin closing a region: cancel its `Cx` and set state to `Closing`.
341    ///
342    /// Recursively begins close on all descendant regions (parent-first
343    /// cancellation propagation per INV-CANCEL-PROPAGATES).
344    ///
345    /// Does NOT wait for quiescence. Use [`is_quiescent`](Self::is_quiescent)
346    /// to poll, then [`complete_close`](Self::complete_close) to finalize.
347    pub fn begin_close(&mut self, id: Region) -> Result<()> {
348        let children = self
349            .nodes
350            .get(&id)
351            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?
352            .children
353            .clone();
354
355        // Cancel this region's Cx first (parent-first propagation).
356        let node = self
357            .nodes
358            .get_mut(&id)
359            .expect("region confirmed present above");
360        if node.state == RegionState::Closed {
361            return Ok(());
362        }
363        node.cx.cancel();
364        node.state = RegionState::Closing;
365        debug!(region = id.get(), kind = ?node.kind, "region closing");
366
367        // Then recursively close children.
368        for child in children {
369            if self.state(child) == Some(RegionState::Open) {
370                self.begin_close(child)?;
371            }
372        }
373        Ok(())
374    }
375
376    /// Check whether a region has reached quiescence.
377    ///
378    /// A region is quiescent when:
379    /// - all child regions are [`RegionState::Closed`],
380    /// - active task count is zero,
381    /// - active obligation count is zero.
382    #[must_use]
383    pub fn is_quiescent(&self, id: Region) -> bool {
384        let Some(node) = self.nodes.get(&id) else {
385            return false;
386        };
387        let children_closed = node
388            .children
389            .iter()
390            .all(|child| self.state(*child) == Some(RegionState::Closed));
391        children_closed
392            && node.active_tasks.load(Ordering::Acquire) == 0
393            && node.active_obligations.load(Ordering::Acquire) == 0
394    }
395
396    /// Complete region close: run finalizers and mark as [`RegionState::Closed`].
397    ///
398    /// Returns an error if the region is not quiescent.
399    pub fn complete_close(&mut self, id: Region) -> Result<()> {
400        let state = self
401            .nodes
402            .get(&id)
403            .map(|n| n.state)
404            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
405        if state == RegionState::Closed {
406            return Ok(());
407        }
408        if state != RegionState::Closing {
409            return Err(FrankenError::Internal(
410                "region must be in Closing state before complete_close".to_owned(),
411            ));
412        }
413        if !self.is_quiescent(id) {
414            return Err(FrankenError::Internal(
415                "region not quiescent; cannot complete close".to_owned(),
416            ));
417        }
418        let node = self
419            .nodes
420            .get_mut(&id)
421            .ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
422        let finalizers = std::mem::take(&mut node.finalizers);
423        for f in finalizers {
424            f();
425        }
426        node.state = RegionState::Closed;
427        debug!(region = id.get(), kind = ?node.kind, "region closed");
428        Ok(())
429    }
430
431    /// Close a region and spin-wait until quiescent, then finalize.
432    ///
433    /// This is the full close protocol: cancel → drain → finalize.
434    /// Blocks the caller until INV-REGION-QUIESCENCE is satisfied.
435    /// Children are drained bottom-up before the parent.
436    pub fn close_and_drain(&mut self, id: Region) -> Result<()> {
437        self.begin_close(id)?;
438        self.drain_subtree(id)
439    }
440
441    /// Recursively drain a subtree bottom-up: wait for each region's tasks
442    /// and obligations to complete, then run finalizers and mark closed.
443    fn drain_subtree(&mut self, id: Region) -> Result<()> {
444        let children = self
445            .nodes
446            .get(&id)
447            .map(|n| n.children.clone())
448            .unwrap_or_default();
449        for child in children {
450            self.drain_subtree(child)?;
451        }
452        while self.active_tasks(id) > 0 || self.active_obligations(id) > 0 {
453            std::hint::spin_loop();
454        }
455        self.complete_close(id)
456    }
457
458    fn alloc_id(&mut self) -> Region {
459        let id = Region::new(self.next_id);
460        self.next_id = self.next_id.checked_add(1).expect("region id overflow");
461        id
462    }
463}
464
465// ── Tests ──────────────────────────────────────────────────────────────
466
467#[cfg(test)]
468mod tests {
469    use std::sync::Arc;
470    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
471
472    use super::*;
473
    // Tracking id interpolated into assertion messages so failures can be
    // traced back to the work item named in the module header.
    const BEAD_ID: &str = "bd-3go.9";
475
    /// A root with five service children reports the expected kinds, child
    /// order (creation order), and parent links.
    #[test]
    fn test_region_tree_structure() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root creation");
        let wc = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");
        let ss = tree
            .create_child(root, RegionKind::SymbolStore, Cx::new())
            .expect("ss");
        let repl = tree
            .create_child(root, RegionKind::Replication, Cx::new())
            .expect("repl");
        let gc = tree
            .create_child(root, RegionKind::CheckpointGc, Cx::new())
            .expect("gc");
        let obs = tree
            .create_child(root, RegionKind::Observability, Cx::new())
            .expect("obs");

        // Verify root.
        assert_eq!(
            tree.root(),
            Some(root),
            "bead_id={BEAD_ID} case=root_exists"
        );
        assert_eq!(tree.kind(root), Some(RegionKind::DbRoot));

        // Verify children of root.
        let children = tree.children(root).expect("root has children");
        assert_eq!(
            children.len(),
            5,
            "bead_id={BEAD_ID} case=root_has_5_service_children"
        );
        assert_eq!(children, &[wc, ss, repl, gc, obs]);

        // Verify each child's kind and parent.
        assert_eq!(tree.kind(wc), Some(RegionKind::WriteCoordinator));
        assert_eq!(tree.kind(ss), Some(RegionKind::SymbolStore));
        assert_eq!(tree.kind(repl), Some(RegionKind::Replication));
        assert_eq!(tree.kind(gc), Some(RegionKind::CheckpointGc));
        assert_eq!(tree.kind(obs), Some(RegionKind::Observability));

        for &child in children {
            assert_eq!(
                tree.parent(child),
                Some(Some(root)),
                "bead_id={BEAD_ID} case=child_parent_is_root region={}",
                child.get()
            );
        }
    }
531
    /// A closing region only becomes quiescent once the last of its task
    /// handles has been dropped.
    #[test]
    fn test_region_quiescence_all_children_complete() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");
        let region = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");

        // Register 5 tasks.
        let tasks: Vec<TaskHandle> = (0..5)
            .map(|_| tree.register_task(region).expect("register task"))
            .collect();

        assert_eq!(
            tree.active_tasks(region),
            5,
            "bead_id={BEAD_ID} case=5_tasks_registered"
        );

        // Begin close.
        tree.begin_close(region).expect("begin close");
        assert_eq!(tree.state(region), Some(RegionState::Closing));
        assert!(
            !tree.is_quiescent(region),
            "bead_id={BEAD_ID} case=not_quiescent_with_active_tasks"
        );

        // Complete tasks one by one; quiescence only after all 5.
        for (i, task) in tasks.into_iter().enumerate() {
            drop(task);
            // Indices 0..=3 leave at least one task outstanding.
            if i < 4 {
                assert!(
                    !tree.is_quiescent(region),
                    "bead_id={BEAD_ID} case=not_quiescent_after_{}_completions",
                    i + 1
                );
            }
        }

        assert!(
            tree.is_quiescent(region),
            "bead_id={BEAD_ID} case=quiescent_after_all_tasks_complete"
        );
        tree.complete_close(region).expect("complete close");
        assert_eq!(tree.state(region), Some(RegionState::Closed));
    }
580
    /// Finalizers do not run at `begin_close` or when tasks finish; they run
    /// only inside `complete_close`.
    #[test]
    fn test_region_quiescence_finalizers_run() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");
        let region = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");

        // Register 3 tasks with corresponding finalizers.
        let flags: Vec<Arc<AtomicBool>> =
            (0..3).map(|_| Arc::new(AtomicBool::new(false))).collect();
        let tasks: Vec<TaskHandle> = (0..3)
            .map(|_| tree.register_task(region).expect("register task"))
            .collect();
        for flag in &flags {
            let f = Arc::clone(flag);
            tree.register_finalizer(region, move || {
                f.store(true, Ordering::Release);
            })
            .expect("register finalizer");
        }

        // Begin close and complete tasks.
        tree.begin_close(region).expect("begin close");
        drop(tasks);

        // Finalizers have NOT run yet.
        for (i, flag) in flags.iter().enumerate() {
            assert!(
                !flag.load(Ordering::Acquire),
                "bead_id={BEAD_ID} case=finalizer_{i}_not_run_before_complete_close"
            );
        }

        // complete_close runs finalizers.
        tree.complete_close(region).expect("complete close");
        for (i, flag) in flags.iter().enumerate() {
            assert!(
                flag.load(Ordering::Acquire),
                "bead_id={BEAD_ID} case=finalizer_{i}_ran_after_complete_close"
            );
        }
    }
626
    /// Pending obligations block quiescence until every one of them has been
    /// resolved.
    #[test]
    fn test_region_quiescence_obligations_resolved() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");
        let region = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");

        let obligations: Vec<ObligationHandle> = (0..3)
            .map(|_| {
                tree.register_obligation(region)
                    .expect("register obligation")
            })
            .collect();

        tree.begin_close(region).expect("begin close");
        assert!(
            !tree.is_quiescent(region),
            "bead_id={BEAD_ID} case=not_quiescent_with_pending_obligations"
        );

        // Resolve obligations one by one.
        for (i, obligation) in obligations.into_iter().enumerate() {
            obligation.resolve();
            // Indices 0..=1 leave at least one obligation outstanding.
            if i < 2 {
                assert!(
                    !tree.is_quiescent(region),
                    "bead_id={BEAD_ID} case=not_quiescent_after_{}_resolutions",
                    i + 1
                );
            }
        }

        assert!(
            tree.is_quiescent(region),
            "bead_id={BEAD_ID} case=quiescent_after_all_obligations_resolved"
        );
        tree.complete_close(region).expect("complete close");
    }
668
    /// Registering a task against an id that was never created is rejected.
    #[test]
    fn test_no_detached_tasks() {
        let tree = RegionTree::new();
        // No regions exist — spawning a task without a valid region must fail.
        let result = tree.register_task(Region::new(999));
        assert!(
            result.is_err(),
            "bead_id={BEAD_ID} case=detached_task_rejected"
        );
    }
679
    /// `complete_close` on a region that is still `Open` fails with an
    /// `Internal` error and leaves the state untouched.
    #[test]
    fn test_complete_close_requires_closing_state() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");
        let child = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("child");

        let err = tree
            .complete_close(child)
            .expect_err("must require begin_close");
        assert!(
            matches!(err, FrankenError::Internal(_)),
            "bead_id={BEAD_ID} case=complete_close_requires_closing_state got {err:?}"
        );
        assert_eq!(
            tree.state(child),
            Some(RegionState::Open),
            "bead_id={BEAD_ID} case=child_state_unchanged_when_close_rejected"
        );
    }
703
    /// The root cannot become quiescent (and its finalizer cannot run) until
    /// every child region has drained and been closed.
    #[test]
    fn test_database_close_awaits_quiescence() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");

        // Create service regions with active workers.
        let wc = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");
        let gc = tree
            .create_child(root, RegionKind::CheckpointGc, Cx::new())
            .expect("gc");

        let wc_task = tree.register_task(wc).expect("wc task");
        let gc_task = tree.register_task(gc).expect("gc task");

        let finalized = Arc::new(AtomicBool::new(false));
        {
            let flag = Arc::clone(&finalized);
            tree.register_finalizer(root, move || {
                flag.store(true, Ordering::Release);
            })
            .expect("root finalizer");
        }

        // Begin close of root (cascades to children).
        tree.begin_close(root).expect("begin close root");
        assert_eq!(tree.state(wc), Some(RegionState::Closing));
        assert_eq!(tree.state(gc), Some(RegionState::Closing));

        // Root is not quiescent yet (active child tasks).
        assert!(
            !tree.is_quiescent(root),
            "bead_id={BEAD_ID} case=root_not_quiescent_with_active_children"
        );

        // Complete child tasks.
        drop(wc_task);
        assert!(
            !tree.is_quiescent(root),
            "bead_id={BEAD_ID} case=root_not_quiescent_gc_still_active"
        );
        drop(gc_task);

        // Children are quiescent but not yet closed.
        assert!(tree.is_quiescent(wc));
        assert!(tree.is_quiescent(gc));
        tree.complete_close(wc).expect("close wc");
        tree.complete_close(gc).expect("close gc");

        // Now root is quiescent.
        assert!(
            tree.is_quiescent(root),
            "bead_id={BEAD_ID} case=root_quiescent_after_children_closed"
        );
        tree.complete_close(root).expect("close root");

        assert!(
            finalized.load(Ordering::Acquire),
            "bead_id={BEAD_ID} case=root_finalizer_ran"
        );
        assert_eq!(tree.state(root), Some(RegionState::Closed));
    }
769
    /// A per-connection region hangs off the root, and beginning close on the
    /// root cancels the connection's `Cx` and moves it to `Closing`.
    #[test]
    fn test_per_connection_region_child_of_root() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");
        let conn = tree
            .create_child(root, RegionKind::PerConnection, Cx::new())
            .expect("conn");

        assert_eq!(
            tree.parent(conn),
            Some(Some(root)),
            "bead_id={BEAD_ID} case=connection_is_child_of_root"
        );
        assert_eq!(tree.kind(conn), Some(RegionKind::PerConnection));

        // Closing root cascades cancellation to connection region.
        let conn_cx = tree.cx(conn).expect("conn cx");
        tree.begin_close(root).expect("begin close root");
        assert!(
            conn_cx.is_cancel_requested(),
            "bead_id={BEAD_ID} case=root_close_cancels_connection"
        );
        assert_eq!(tree.state(conn), Some(RegionState::Closing));

        tree.complete_close(conn).expect("close conn");
        tree.complete_close(root).expect("close root");
    }
799
    /// End-to-end shutdown of a three-level tree (root → services and
    /// connections → transactions): cascade the close from the root, release
    /// all tasks, then finalize bottom-up and check the root's finalizers ran.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn test_e2e_structured_concurrency_shutdown() {
        let mut tree = RegionTree::new();
        let root = tree
            .create_root(RegionKind::DbRoot, Cx::new())
            .expect("root");

        // Create normative service regions.
        let wc = tree
            .create_child(root, RegionKind::WriteCoordinator, Cx::new())
            .expect("wc");
        let ss = tree
            .create_child(root, RegionKind::SymbolStore, Cx::new())
            .expect("ss");
        let repl = tree
            .create_child(root, RegionKind::Replication, Cx::new())
            .expect("repl");
        let gc = tree
            .create_child(root, RegionKind::CheckpointGc, Cx::new())
            .expect("gc");
        let obs = tree
            .create_child(root, RegionKind::Observability, Cx::new())
            .expect("obs");

        // Create 3 connection regions with transaction children.
        let conns: Vec<Region> = (0..3)
            .map(|_| {
                tree.create_child(root, RegionKind::PerConnection, Cx::new())
                    .expect("conn")
            })
            .collect();

        let mut txn_tasks = Vec::new();
        for &conn in &conns {
            let txn = tree
                .create_child(conn, RegionKind::PerTransaction, Cx::new())
                .expect("txn");
            txn_tasks.push(tree.register_task(txn).expect("txn task"));
        }

        // Background workers in service regions.
        let service_tasks = vec![
            tree.register_task(wc).expect("wc task"),
            tree.register_task(ss).expect("ss task"),
            tree.register_task(repl).expect("repl task"),
            tree.register_task(gc).expect("gc task"),
            tree.register_task(obs).expect("obs task"),
        ];

        // Finalizers on root.
        let finalized_count = Arc::new(AtomicUsize::new(0));
        for _ in 0..3 {
            let counter = Arc::clone(&finalized_count);
            tree.register_finalizer(root, move || {
                counter.fetch_add(1, Ordering::AcqRel);
            })
            .expect("root finalizer");
        }

        // Begin close of root (should cascade to all descendants).
        tree.begin_close(root).expect("begin close root");

        // All regions should be Closing.
        assert_eq!(tree.state(root), Some(RegionState::Closing));
        for &conn in &conns {
            assert_eq!(tree.state(conn), Some(RegionState::Closing));
        }

        // Nothing is quiescent yet.
        assert!(
            !tree.is_quiescent(root),
            "bead_id={BEAD_ID} case=e2e_root_not_quiescent_initially"
        );

        // Complete all tasks.
        drop(txn_tasks);
        drop(service_tasks);

        // Close bottom-up: transactions → connections → services → root.
        for &conn in &conns {
            // Copy the ids out so `tree` can be borrowed mutably below.
            let txn_children = tree.children(conn).expect("conn children").to_vec();
            for txn in txn_children {
                tree.complete_close(txn).expect("close txn");
            }
        }
        for &conn in &conns {
            tree.complete_close(conn).expect("close conn");
        }
        for &svc in &[wc, ss, repl, gc, obs] {
            tree.complete_close(svc).expect("close svc");
        }

        // Finally close root.
        assert!(
            tree.is_quiescent(root),
            "bead_id={BEAD_ID} case=e2e_root_quiescent"
        );
        tree.complete_close(root).expect("close root");

        assert_eq!(
            finalized_count.load(Ordering::Acquire),
            3,
            "bead_id={BEAD_ID} case=e2e_all_finalizers_ran"
        );
        assert_eq!(
            tree.state(root),
            Some(RegionState::Closed),
            "bead_id={BEAD_ID} case=e2e_root_closed"
        );

        // Verify zero orphan tasks.
        assert_eq!(tree.active_tasks(root), 0);
        for &conn in &conns {
            assert_eq!(tree.active_tasks(conn), 0);
        }
    }
917
918    #[test]
919    fn test_close_and_drain_threaded() {
920        use std::sync::Mutex;
921        use std::thread;
922        use std::time::Duration;
923
924        let tree = Arc::new(Mutex::new(RegionTree::new()));
925        let root = {
926            let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
927            t.create_root(RegionKind::DbRoot, Cx::new()).expect("root")
928        };
929        let wc = {
930            let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
931            t.create_child(root, RegionKind::WriteCoordinator, Cx::new())
932                .expect("wc")
933        };
934
935        // Register tasks before spawning threads.
936        let task1 = tree
937            .lock()
938            .unwrap_or_else(|e| e.into_inner())
939            .register_task(wc)
940            .expect("t1");
941        let task2 = tree
942            .lock()
943            .unwrap_or_else(|e| e.into_inner())
944            .register_task(wc)
945            .expect("t2");
946
947        let completed = Arc::new(AtomicBool::new(false));
948        let flag = Arc::clone(&completed);
949
950        // Spawn threads that hold tasks and complete after brief work.
951        let t1 = thread::spawn(move || {
952            thread::sleep(Duration::from_millis(20));
953            drop(task1);
954        });
955        let t2 = thread::spawn(move || {
956            thread::sleep(Duration::from_millis(30));
957            drop(task2);
958        });
959
960        // close_and_drain blocks until all tasks complete.
961        {
962            let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
963            t.close_and_drain(root).expect("close_and_drain");
964        }
965        flag.store(true, Ordering::Release);
966
967        t1.join().expect("t1 join");
968        t2.join().expect("t2 join");
969
970        assert!(
971            completed.load(Ordering::Acquire),
972            "bead_id={BEAD_ID} case=threaded_close_completed"
973        );
974        assert_eq!(
975            tree.lock().unwrap_or_else(|e| e.into_inner()).state(root),
976            Some(RegionState::Closed),
977            "bead_id={BEAD_ID} case=threaded_root_closed"
978        );
979    }
980}