use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use fsqlite_error::{FrankenError, Result};
use fsqlite_types::Region;
use fsqlite_types::cx::{self, Cx};
use tracing::debug;
use crate::quiescence::{ChildRegionQuiescence, RegionQuiescenceSnapshot};
/// Tag describing the role a region plays in the [`RegionTree`].
///
/// The code in this file only stores the kind on the node, returns it from
/// [`RegionTree::kind`], and includes it in debug logs; no lifecycle decision
/// made here depends on the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RegionKind {
// The tests in this file use `DbRoot` for the root region.
DbRoot,
// The next five kinds appear in the tests as direct children of the root.
WriteCoordinator,
SymbolStore,
Replication,
CheckpointGc,
Observability,
// Created in the tests as a child of the root region.
PerConnection,
// Created in the tests as a child of a `PerConnection` region.
PerTransaction,
}
/// Lifecycle phase of a region.
///
/// Nodes are created `Open`, become `Closing` in [`RegionTree::begin_close`]
/// and become `Closed` in [`RegionTree::complete_close`]; no code in this
/// file moves a region back to an earlier phase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionState {
// Initial state; child regions, tasks and finalizers may be registered.
Open,
// Cancellation has been requested on the region's context; the region is
// waiting to become quiescent.
Closing,
// The region was quiescent and its finalizers have been run.
Closed,
}
/// Cleanup callback stored on a region and invoked once when it is closed.
type Finalizer = Box<dyn FnOnce() + Send>;

/// Atomic counter shared between a region node and the handles registered
/// against it.
type SharedCounter = Arc<AtomicUsize>;

/// Builds a fresh, unshared counter whose value starts at zero.
fn new_counter() -> SharedCounter {
    SharedCounter::default()
}
/// Per-region bookkeeping stored in [`RegionTree`], keyed by region id.
struct RegionNode {
// Role tag supplied at creation; stored and logged only.
kind: RegionKind,
// Current lifecycle phase of this region.
state: RegionState,
// Context supplied at creation; `begin_close` calls `cancel()` on it.
cx: Cx<cx::FullCaps>,
// `None` for the root region, otherwise the id passed to `create_child`.
parent: Option<Region>,
// Child region ids in creation order.
children: Vec<Region>,
// Callbacks run, in registration order, by `complete_close`.
finalizers: Vec<Finalizer>,
// Number of live `TaskHandle`s for this region.
active_tasks: SharedCounter,
// Number of live `ObligationHandle`s for this region.
active_obligations: SharedCounter,
}
/// Guard for one task registered in a region.
///
/// Handed out by [`RegionTree::register_task`]. Dropping the handle
/// decrements the region's active-task counter, which is what lets the
/// region become quiescent.
pub struct TaskHandle {
    /// Counter shared with the region node this task belongs to.
    counter: SharedCounter,
    /// Id of the region the task was registered in.
    region: Region,
}

impl TaskHandle {
    /// Returns the id of the region this task was registered in.
    #[must_use]
    pub const fn region(&self) -> Region {
        self.region
    }
}

impl Drop for TaskHandle {
    /// Gives back this task's slot in the shared active-task counter.
    fn drop(&mut self) {
        // The previous value is not needed; only the decrement matters.
        let _previous = self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
/// Guard for one outstanding obligation registered in a region.
///
/// Handed out by [`RegionTree::register_obligation`]. The region's
/// active-obligation counter is decremented when the handle is dropped,
/// whether that happens through [`ObligationHandle::resolve`] or an
/// ordinary drop.
pub struct ObligationHandle {
    /// Counter shared with the region node this obligation belongs to.
    counter: SharedCounter,
    /// Id of the region the obligation was registered in.
    region: Region,
}

impl ObligationHandle {
    /// Returns the id of the region this obligation was registered in.
    #[must_use]
    pub const fn region(&self) -> Region {
        self.region
    }

    /// Marks the obligation as resolved by consuming the handle.
    ///
    /// All the work happens in `Drop`; this method exists so call sites can
    /// state their intent explicitly.
    pub fn resolve(self) {
        drop(self);
    }
}

impl Drop for ObligationHandle {
    /// Gives back this obligation's slot in the shared counter.
    fn drop(&mut self) {
        // The previous value is not needed; only the decrement matters.
        let _previous = self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
/// Registry of regions arranged as a tree with at most one root.
///
/// Nodes are inserted by `create_root` / `create_child`; nothing in this
/// file removes a node once inserted, so closed regions remain queryable.
pub struct RegionTree {
// Every region ever created, keyed by its id.
nodes: HashMap<Region, RegionNode>,
// Raw value used for the next allocated region id.
next_id: u32,
// Id of the root region once `create_root` has succeeded.
root: Option<Region>,
}
impl Default for RegionTree {
fn default() -> Self {
Self::new()
}
}
impl RegionTree {
/// Creates an empty tree: no nodes, no root, and id allocation starting at 0.
#[must_use]
pub fn new() -> Self {
    let nodes = HashMap::new();
    Self {
        root: None,
        next_id: 0,
        nodes,
    }
}
/// Creates the root region of the tree and returns its id.
///
/// The new region starts [`RegionState::Open`] with no children,
/// finalizers, tasks or obligations.
///
/// # Errors
///
/// Returns [`FrankenError::Internal`] if a root was already created, even
/// if that root has since been closed.
///
/// # Panics
///
/// Panics if the region id space (`u32`) is exhausted.
pub fn create_root(&mut self, kind: RegionKind, cx: Cx<cx::FullCaps>) -> Result<Region> {
    if self.root.is_some() {
        let reason = "root region already exists".to_owned();
        return Err(FrankenError::Internal(reason));
    }

    let id = self.alloc_id();
    let root_node = RegionNode {
        kind,
        state: RegionState::Open,
        cx,
        parent: None,
        children: Vec::new(),
        finalizers: Vec::new(),
        active_tasks: new_counter(),
        active_obligations: new_counter(),
    };
    self.nodes.insert(id, root_node);
    self.root = Some(id);

    debug!(region = id.get(), kind = ?kind, "region created (root)");
    Ok(id)
}
/// Creates a new `Open` region under `parent` and returns its id.
///
/// The child is appended to the parent's child list, so children are
/// reported in creation order.
///
/// # Errors
///
/// * [`FrankenError::Internal`] if `parent` is not a known region.
/// * [`FrankenError::Busy`] if `parent` is not [`RegionState::Open`].
///
/// # Panics
///
/// Panics if the region id space (`u32`) is exhausted.
pub fn create_child(
    &mut self,
    parent: Region,
    kind: RegionKind,
    cx: Cx<cx::FullCaps>,
) -> Result<Region> {
    let Some(parent_node) = self.nodes.get(&parent) else {
        return Err(FrankenError::Internal(format!(
            "parent region {} not found",
            parent.get()
        )));
    };
    // A closing or closed parent must not gain new children.
    if parent_node.state != RegionState::Open {
        return Err(FrankenError::Busy);
    }

    let id = self.alloc_id();
    let child_node = RegionNode {
        kind,
        state: RegionState::Open,
        cx,
        parent: Some(parent),
        children: Vec::new(),
        finalizers: Vec::new(),
        active_tasks: new_counter(),
        active_obligations: new_counter(),
    };
    self.nodes.insert(id, child_node);

    // Link the child into its parent's ordered child list.
    if let Some(parent_node) = self.nodes.get_mut(&parent) {
        parent_node.children.push(id);
    }

    debug!(region = id.get(), parent = parent.get(), kind = ?kind, "region created (child)");
    Ok(id)
}
#[must_use]
pub fn root(&self) -> Option<Region> {
self.root
}
/// Returns the kind recorded for `id`, or `None` if the region is unknown.
#[must_use]
pub fn kind(&self, id: Region) -> Option<RegionKind> {
    let node = self.nodes.get(&id)?;
    Some(node.kind)
}
/// Returns the lifecycle state of `id`, or `None` if the region is unknown.
#[must_use]
pub fn state(&self, id: Region) -> Option<RegionState> {
    let node = self.nodes.get(&id)?;
    Some(node.state)
}
/// Returns the parent of `id`.
///
/// The outer `Option` is `None` when the region is unknown; the inner
/// `Option` is `None` when the region is the root.
#[must_use]
pub fn parent(&self, id: Region) -> Option<Option<Region>> {
    let node = self.nodes.get(&id)?;
    Some(node.parent)
}
/// Returns the children of `id` in creation order, or `None` if the region
/// is unknown.
#[must_use]
pub fn children(&self, id: Region) -> Option<&[Region]> {
    let node = self.nodes.get(&id)?;
    Some(node.children.as_slice())
}
/// Returns a clone of the context stored for `id`, or `None` if the region
/// is unknown.
#[must_use]
pub fn cx(&self, id: Region) -> Option<Cx<cx::FullCaps>> {
    let node = self.nodes.get(&id)?;
    Some(node.cx.clone())
}
/// Returns the number of live task handles for `id`.
///
/// An unknown region reports `0`.
#[must_use]
pub fn active_tasks(&self, id: Region) -> usize {
    match self.nodes.get(&id) {
        Some(node) => node.active_tasks.load(Ordering::Acquire),
        None => 0,
    }
}
/// Returns the number of live obligation handles for `id`.
///
/// An unknown region reports `0`.
#[must_use]
pub fn active_obligations(&self, id: Region) -> usize {
    match self.nodes.get(&id) {
        Some(node) => node.active_obligations.load(Ordering::Acquire),
        None => 0,
    }
}
pub fn register_task(&self, id: Region) -> Result<TaskHandle> {
let node = self
.nodes
.get(&id)
.ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
if node.state != RegionState::Open {
return Err(FrankenError::Busy);
}
node.active_tasks.fetch_add(1, Ordering::AcqRel);
debug!(region = id.get(), "task registered");
Ok(TaskHandle {
counter: Arc::clone(&node.active_tasks),
region: id,
})
}
pub fn register_obligation(&self, id: Region) -> Result<ObligationHandle> {
let node = self
.nodes
.get(&id)
.ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
if node.state == RegionState::Closed {
return Err(FrankenError::Busy);
}
node.active_obligations.fetch_add(1, Ordering::AcqRel);
debug!(region = id.get(), "obligation registered");
Ok(ObligationHandle {
counter: Arc::clone(&node.active_obligations),
region: id,
})
}
pub fn register_finalizer(
&mut self,
id: Region,
finalizer: impl FnOnce() + Send + 'static,
) -> Result<()> {
let node = self
.nodes
.get_mut(&id)
.ok_or_else(|| FrankenError::Internal(format!("region {} not found", id.get())))?;
if node.state != RegionState::Open {
return Err(FrankenError::Busy);
}
node.finalizers.push(Box::new(finalizer));
Ok(())
}
/// Starts closing region `id` and, recursively, every child that is still open.
///
/// For a region that is not yet closed this requests cancellation on its
/// context and marks it [`RegionState::Closing`]. Calling it on a region
/// that is already `Closed` is a no-op.
///
/// # Errors
///
/// Returns [`FrankenError::Internal`] if `id` is not a known region.
pub fn begin_close(&mut self, id: Region) -> Result<()> {
    let Some(node) = self.nodes.get_mut(&id) else {
        return Err(FrankenError::Internal(format!(
            "region {} not found",
            id.get()
        )));
    };
    if matches!(node.state, RegionState::Closed) {
        return Ok(());
    }

    node.cx.cancel();
    node.state = RegionState::Closing;
    debug!(region = id.get(), kind = ?node.kind, "region closing");

    // Copy the child ids so the recursive calls below can borrow `self` mutably.
    let children = node.children.clone();
    for child in children {
        // Only open children are descended into; closing/closed ones are left alone.
        if matches!(self.state(child), Some(RegionState::Open)) {
            self.begin_close(child)?;
        }
    }
    Ok(())
}
/// Reports whether region `id` is currently quiescent.
///
/// This takes a [`RegionQuiescenceSnapshot`] and asks it; an unknown region
/// is reported as not quiescent.
#[must_use]
pub fn is_quiescent(&self, id: Region) -> bool {
    match self.quiescence_snapshot(id) {
        Some(snapshot) => snapshot.is_quiescent(),
        None => false,
    }
}
/// Captures everything that may still be blocking region `id` from closing.
///
/// The snapshot records the region's state, its live task and obligation
/// counts, and every direct child that is not yet [`RegionState::Closed`]
/// (including a child id that has no node, reported with state `None`).
/// Returns `None` if `id` is not a known region.
#[must_use]
pub fn quiescence_snapshot(&self, id: Region) -> Option<RegionQuiescenceSnapshot> {
    let node = self.nodes.get(&id)?;

    let non_closed_children = node
        .children
        .iter()
        .copied()
        .map(|child| (child, self.state(child)))
        .filter(|(_, state)| *state != Some(RegionState::Closed))
        .map(|(region, state)| ChildRegionQuiescence { region, state })
        .collect();

    Some(RegionQuiescenceSnapshot {
        region: id,
        state: node.state,
        active_tasks: node.active_tasks.load(Ordering::Acquire),
        active_obligations: node.active_obligations.load(Ordering::Acquire),
        non_closed_children,
    })
}
/// Finishes closing region `id`: runs its finalizers and marks it closed.
///
/// The region must already be [`RegionState::Closing`] and quiescent.
/// Finalizers run in registration order before the state becomes
/// [`RegionState::Closed`]. Calling this on an already closed region is a
/// no-op.
///
/// # Errors
///
/// Returns [`FrankenError::Internal`] if `id` is unknown, if the region is
/// still `Open`, or if it is not quiescent (the message then lists the
/// non-closed child count and the live task / obligation counts).
pub fn complete_close(&mut self, id: Region) -> Result<()> {
    match self.nodes.get(&id).map(|node| node.state) {
        None => {
            return Err(FrankenError::Internal(format!(
                "region {} not found",
                id.get()
            )));
        }
        Some(RegionState::Closed) => return Ok(()),
        Some(RegionState::Open) => {
            return Err(FrankenError::Internal(
                "region must be in Closing state before complete_close".to_owned(),
            ));
        }
        Some(RegionState::Closing) => {}
    }

    let Some(snapshot) = self.quiescence_snapshot(id) else {
        return Err(FrankenError::Internal(format!(
            "region {} not found",
            id.get()
        )));
    };
    if !snapshot.is_quiescent() {
        return Err(FrankenError::Internal(format!(
            "region not quiescent; children_open={} active_tasks={} active_obligations={}",
            snapshot.non_closed_children.len(),
            snapshot.active_tasks,
            snapshot.active_obligations
        )));
    }

    let Some(node) = self.nodes.get_mut(&id) else {
        return Err(FrankenError::Internal(format!(
            "region {} not found",
            id.get()
        )));
    };
    // Taking the list leaves it empty, so each finalizer runs at most once.
    for finalizer in std::mem::take(&mut node.finalizers) {
        finalizer();
    }
    node.state = RegionState::Closed;
    debug!(region = id.get(), kind = ?node.kind, "region closed");
    Ok(())
}
/// Closes region `id` and its whole subtree, blocking until it has drained.
///
/// This first calls `begin_close` on the region and then waits, children
/// first, for every region in the subtree to lose its tasks and obligations
/// before completing its close.
///
/// # Errors
///
/// Propagates any error from `begin_close` or from closing a region in the
/// subtree.
pub fn close_and_drain(&mut self, id: Region) -> Result<()> {
    self.begin_close(id)
        .and_then(|()| self.drain_subtree(id))
}
/// Drains and closes the subtree rooted at `id`, children before parents.
///
/// For each region this blocks the calling thread until its active task and
/// obligation counts are both zero, then calls `complete_close` on it.
/// The wait spins briefly for the common case where handles are released
/// almost immediately, and afterwards yields the thread's time slice on
/// every poll so a long wait does not monopolize a core or starve the
/// threads that still have to drop their handles.
///
/// # Errors
///
/// Propagates the first error returned by `complete_close` for any region
/// in the subtree (including "not found" when `id` is unknown).
fn drain_subtree(&mut self, id: Region) -> Result<()> {
    /// Number of pure spin iterations before falling back to yielding.
    const SPIN_LIMIT: u32 = 64;

    // Copy the child ids so the recursion can borrow `self` mutably.
    let children = self
        .nodes
        .get(&id)
        .map(|n| n.children.clone())
        .unwrap_or_default();
    for child in children {
        self.drain_subtree(child)?;
    }

    let mut spins: u32 = 0;
    while self.active_tasks(id) > 0 || self.active_obligations(id) > 0 {
        if spins < SPIN_LIMIT {
            spins += 1;
            std::hint::spin_loop();
        } else {
            // Let the scheduler run the threads that hold the remaining handles.
            std::thread::yield_now();
        }
    }
    self.complete_close(id)
}
/// Hands out the next unused region id and advances the allocator.
///
/// # Panics
///
/// Panics with "region id overflow" when the `u32` id space is exhausted.
fn alloc_id(&mut self) -> Region {
    let current = self.next_id;
    let id = Region::new(current);
    self.next_id = current.checked_add(1).expect("region id overflow");
    id
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use super::*;
// Tracking identifier interpolated into the assertion messages of these tests.
const BEAD_ID: &str = "bd-3go.9";
// Builds a root with five service children and checks the kind, parent and
// child-order bookkeeping reported by the tree accessors.
#[test]
fn test_region_tree_structure() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root creation");
let wc = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
let ss = tree
.create_child(root, RegionKind::SymbolStore, Cx::new())
.expect("ss");
let repl = tree
.create_child(root, RegionKind::Replication, Cx::new())
.expect("repl");
let gc = tree
.create_child(root, RegionKind::CheckpointGc, Cx::new())
.expect("gc");
let obs = tree
.create_child(root, RegionKind::Observability, Cx::new())
.expect("obs");
assert_eq!(
tree.root(),
Some(root),
"bead_id={BEAD_ID} case=root_exists"
);
assert_eq!(tree.kind(root), Some(RegionKind::DbRoot));
let children = tree.children(root).expect("root has children");
assert_eq!(
children.len(),
5,
"bead_id={BEAD_ID} case=root_has_5_service_children"
);
// Children must be reported in creation order.
assert_eq!(children, &[wc, ss, repl, gc, obs]);
assert_eq!(tree.kind(wc), Some(RegionKind::WriteCoordinator));
assert_eq!(tree.kind(ss), Some(RegionKind::SymbolStore));
assert_eq!(tree.kind(repl), Some(RegionKind::Replication));
assert_eq!(tree.kind(gc), Some(RegionKind::CheckpointGc));
assert_eq!(tree.kind(obs), Some(RegionKind::Observability));
// Every service region points back at the root as its parent.
for &child in children {
assert_eq!(
tree.parent(child),
Some(Some(root)),
"bead_id={BEAD_ID} case=child_parent_is_root region={}",
child.get()
);
}
}
// A closing region with five registered tasks becomes quiescent only after
// the last task handle is dropped, and can then be closed.
#[test]
fn test_region_quiescence_all_children_complete() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let region = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
let tasks: Vec<TaskHandle> = (0..5)
.map(|_| tree.register_task(region).expect("register task"))
.collect();
assert_eq!(
tree.active_tasks(region),
5,
"bead_id={BEAD_ID} case=5_tasks_registered"
);
tree.begin_close(region).expect("begin close");
assert_eq!(tree.state(region), Some(RegionState::Closing));
assert!(
!tree.is_quiescent(region),
"bead_id={BEAD_ID} case=not_quiescent_with_active_tasks"
);
// Drop the handles one at a time; the first four drops must not be enough.
for (i, task) in tasks.into_iter().enumerate() {
drop(task);
if i < 4 {
assert!(
!tree.is_quiescent(region),
"bead_id={BEAD_ID} case=not_quiescent_after_{}_completions",
i + 1
);
}
}
assert!(
tree.is_quiescent(region),
"bead_id={BEAD_ID} case=quiescent_after_all_tasks_complete"
);
tree.complete_close(region).expect("complete close");
assert_eq!(tree.state(region), Some(RegionState::Closed));
}
// Finalizers must not run when the region merely becomes quiescent; they run
// during `complete_close`.
#[test]
fn test_region_quiescence_finalizers_run() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let region = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
// One flag per finalizer so each can be observed independently.
let flags: Vec<Arc<AtomicBool>> =
(0..3).map(|_| Arc::new(AtomicBool::new(false))).collect();
let tasks: Vec<TaskHandle> = (0..3)
.map(|_| tree.register_task(region).expect("register task"))
.collect();
for flag in &flags {
let f = Arc::clone(flag);
tree.register_finalizer(region, move || {
f.store(true, Ordering::Release);
})
.expect("register finalizer");
}
tree.begin_close(region).expect("begin close");
drop(tasks);
// All tasks are gone, but the close has not been completed yet.
for (i, flag) in flags.iter().enumerate() {
assert!(
!flag.load(Ordering::Acquire),
"bead_id={BEAD_ID} case=finalizer_{i}_not_run_before_complete_close"
);
}
tree.complete_close(region).expect("complete close");
for (i, flag) in flags.iter().enumerate() {
assert!(
flag.load(Ordering::Acquire),
"bead_id={BEAD_ID} case=finalizer_{i}_ran_after_complete_close"
);
}
}
// Outstanding obligations block quiescence until every one of them has been
// resolved.
#[test]
fn test_region_quiescence_obligations_resolved() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let region = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
let obligations: Vec<ObligationHandle> = (0..3)
.map(|_| {
tree.register_obligation(region)
.expect("register obligation")
})
.collect();
tree.begin_close(region).expect("begin close");
assert!(
!tree.is_quiescent(region),
"bead_id={BEAD_ID} case=not_quiescent_with_pending_obligations"
);
// Resolve one at a time; the first two resolutions must not be enough.
for (i, obligation) in obligations.into_iter().enumerate() {
obligation.resolve();
if i < 2 {
assert!(
!tree.is_quiescent(region),
"bead_id={BEAD_ID} case=not_quiescent_after_{}_resolutions",
i + 1
);
}
}
assert!(
tree.is_quiescent(region),
"bead_id={BEAD_ID} case=quiescent_after_all_obligations_resolved"
);
tree.complete_close(region).expect("complete close");
}
// The snapshot of a closing region must itemize each blocker: one live task,
// one live obligation and one child that is not yet closed.
#[test]
fn test_quiescence_snapshot_reports_precise_blockers() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let parent = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("parent");
let child = tree
.create_child(parent, RegionKind::PerConnection, Cx::new())
.expect("child");
// Keep both handles alive for the whole test so they count as blockers.
let _task = tree.register_task(parent).expect("task");
let _obligation = tree.register_obligation(parent).expect("obligation");
tree.begin_close(parent).expect("begin close");
let snapshot = tree
.quiescence_snapshot(parent)
.expect("snapshot for existing region");
assert_eq!(snapshot.region, parent);
assert_eq!(snapshot.state, RegionState::Closing);
assert_eq!(snapshot.active_tasks, 1);
assert_eq!(snapshot.active_obligations, 1);
assert_eq!(snapshot.non_closed_children.len(), 1);
assert_eq!(snapshot.non_closed_children[0].region, child);
// `begin_close` on the parent also moved the open child to `Closing`.
assert_eq!(
snapshot.non_closed_children[0].state,
Some(RegionState::Closing)
);
assert_eq!(snapshot.blocker_count(), 3);
assert!(!snapshot.is_quiescent());
}
// Completing the close of a region that still has a live task must fail with
// an internal error whose message carries all three blocker counts.
#[test]
fn test_complete_close_error_includes_quiescence_counts() {
    let mut tree = RegionTree::new();
    let db_root = tree
        .create_root(RegionKind::DbRoot, Cx::new())
        .expect("root");
    let region = tree
        .create_child(db_root, RegionKind::WriteCoordinator, Cx::new())
        .expect("child");
    // Held until the end of the test so the region cannot be quiescent.
    let _task = tree.register_task(region).expect("task");
    tree.begin_close(region).expect("begin close");

    let failure = tree
        .complete_close(region)
        .expect_err("active task must block close");
    let message = match failure {
        FrankenError::Internal(message) => message,
        _ => panic!("expected internal error for non-quiescent close"),
    };

    assert!(
        message.contains("children_open=0"),
        "bead_id={BEAD_ID} case=quiescence_error_children_count message={message}"
    );
    assert!(
        message.contains("active_tasks=1"),
        "bead_id={BEAD_ID} case=quiescence_error_task_count message={message}"
    );
    assert!(
        message.contains("active_obligations=0"),
        "bead_id={BEAD_ID} case=quiescence_error_obligation_count message={message}"
    );
}
// A task cannot be registered against a region id the tree has never issued.
#[test]
fn test_no_detached_tasks() {
    let tree = RegionTree::new();
    let unknown_region = Region::new(999);
    let outcome = tree.register_task(unknown_region);
    assert!(
        outcome.is_err(),
        "bead_id={BEAD_ID} case=detached_task_rejected"
    );
}
// `complete_close` on a region that never went through `begin_close` must be
// rejected and must leave the region open.
#[test]
fn test_complete_close_requires_closing_state() {
    let mut tree = RegionTree::new();
    let db_root = tree
        .create_root(RegionKind::DbRoot, Cx::new())
        .expect("root");
    let region = tree
        .create_child(db_root, RegionKind::WriteCoordinator, Cx::new())
        .expect("child");

    let rejection = tree.complete_close(region);
    let err = rejection.expect_err("must require begin_close");
    assert!(
        matches!(err, FrankenError::Internal(_)),
        "bead_id={BEAD_ID} case=complete_close_requires_closing_state got {err:?}"
    );

    let state_after = tree.state(region);
    assert_eq!(
        state_after,
        Some(RegionState::Open),
        "bead_id={BEAD_ID} case=child_state_unchanged_when_close_rejected"
    );
}
// Closing the root propagates to its children, and the root only becomes
// quiescent (and runs its finalizer) after every child has been closed.
#[test]
fn test_database_close_awaits_quiescence() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let wc = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
let gc = tree
.create_child(root, RegionKind::CheckpointGc, Cx::new())
.expect("gc");
let wc_task = tree.register_task(wc).expect("wc task");
let gc_task = tree.register_task(gc).expect("gc task");
let finalized = Arc::new(AtomicBool::new(false));
{
let flag = Arc::clone(&finalized);
tree.register_finalizer(root, move || {
flag.store(true, Ordering::Release);
})
.expect("root finalizer");
}
tree.begin_close(root).expect("begin close root");
// The close request must have reached both children.
assert_eq!(tree.state(wc), Some(RegionState::Closing));
assert_eq!(tree.state(gc), Some(RegionState::Closing));
assert!(
!tree.is_quiescent(root),
"bead_id={BEAD_ID} case=root_not_quiescent_with_active_children"
);
drop(wc_task);
assert!(
!tree.is_quiescent(root),
"bead_id={BEAD_ID} case=root_not_quiescent_gc_still_active"
);
drop(gc_task);
assert!(tree.is_quiescent(wc));
assert!(tree.is_quiescent(gc));
// Children are closed first; only then can the root be quiescent.
tree.complete_close(wc).expect("close wc");
tree.complete_close(gc).expect("close gc");
assert!(
tree.is_quiescent(root),
"bead_id={BEAD_ID} case=root_quiescent_after_children_closed"
);
tree.complete_close(root).expect("close root");
assert!(
finalized.load(Ordering::Acquire),
"bead_id={BEAD_ID} case=root_finalizer_ran"
);
assert_eq!(tree.state(root), Some(RegionState::Closed));
}
// A per-connection region hangs directly off the root, and closing the root
// requests cancellation on the connection's context.
#[test]
fn test_per_connection_region_child_of_root() {
    let mut tree = RegionTree::new();
    let db_root = tree
        .create_root(RegionKind::DbRoot, Cx::new())
        .expect("root");
    let connection = tree
        .create_child(db_root, RegionKind::PerConnection, Cx::new())
        .expect("conn");

    let connection_parent = tree.parent(connection);
    assert_eq!(
        connection_parent,
        Some(Some(db_root)),
        "bead_id={BEAD_ID} case=connection_is_child_of_root"
    );
    assert_eq!(tree.kind(connection), Some(RegionKind::PerConnection));

    // Grab the context before closing so the cancellation can be observed on it.
    let connection_cx = tree.cx(connection).expect("conn cx");
    tree.begin_close(db_root).expect("begin close root");
    assert!(
        connection_cx.is_cancel_requested(),
        "bead_id={BEAD_ID} case=root_close_cancels_connection"
    );
    assert_eq!(tree.state(connection), Some(RegionState::Closing));

    // Child first, then the root.
    tree.complete_close(connection).expect("close conn");
    tree.complete_close(db_root).expect("close root");
}
// End-to-end shutdown of a three-level tree: root -> services and
// connections -> per-connection transactions. The tree is closed bottom-up
// and the root's finalizers must all run.
#[test]
#[allow(clippy::too_many_lines)]
fn test_e2e_structured_concurrency_shutdown() {
let mut tree = RegionTree::new();
let root = tree
.create_root(RegionKind::DbRoot, Cx::new())
.expect("root");
let wc = tree
.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc");
let ss = tree
.create_child(root, RegionKind::SymbolStore, Cx::new())
.expect("ss");
let repl = tree
.create_child(root, RegionKind::Replication, Cx::new())
.expect("repl");
let gc = tree
.create_child(root, RegionKind::CheckpointGc, Cx::new())
.expect("gc");
let obs = tree
.create_child(root, RegionKind::Observability, Cx::new())
.expect("obs");
let conns: Vec<Region> = (0..3)
.map(|_| {
tree.create_child(root, RegionKind::PerConnection, Cx::new())
.expect("conn")
})
.collect();
// One transaction region, with one task, under each connection.
let mut txn_tasks = Vec::new();
for &conn in &conns {
let txn = tree
.create_child(conn, RegionKind::PerTransaction, Cx::new())
.expect("txn");
txn_tasks.push(tree.register_task(txn).expect("txn task"));
}
let service_tasks = vec![
tree.register_task(wc).expect("wc task"),
tree.register_task(ss).expect("ss task"),
tree.register_task(repl).expect("repl task"),
tree.register_task(gc).expect("gc task"),
tree.register_task(obs).expect("obs task"),
];
let finalized_count = Arc::new(AtomicUsize::new(0));
for _ in 0..3 {
let counter = Arc::clone(&finalized_count);
tree.register_finalizer(root, move || {
counter.fetch_add(1, Ordering::AcqRel);
})
.expect("root finalizer");
}
tree.begin_close(root).expect("begin close root");
assert_eq!(tree.state(root), Some(RegionState::Closing));
for &conn in &conns {
assert_eq!(tree.state(conn), Some(RegionState::Closing));
}
assert!(
!tree.is_quiescent(root),
"bead_id={BEAD_ID} case=e2e_root_not_quiescent_initially"
);
// Release every task, then close the tree from the leaves upwards.
drop(txn_tasks);
drop(service_tasks);
for &conn in &conns {
let txn_children = tree.children(conn).expect("conn children").to_vec();
for txn in txn_children {
tree.complete_close(txn).expect("close txn");
}
}
for &conn in &conns {
tree.complete_close(conn).expect("close conn");
}
for &svc in &[wc, ss, repl, gc, obs] {
tree.complete_close(svc).expect("close svc");
}
assert!(
tree.is_quiescent(root),
"bead_id={BEAD_ID} case=e2e_root_quiescent"
);
tree.complete_close(root).expect("close root");
assert_eq!(
finalized_count.load(Ordering::Acquire),
3,
"bead_id={BEAD_ID} case=e2e_all_finalizers_ran"
);
assert_eq!(
tree.state(root),
Some(RegionState::Closed),
"bead_id={BEAD_ID} case=e2e_root_closed"
);
assert_eq!(tree.active_tasks(root), 0);
for &conn in &conns {
assert_eq!(tree.active_tasks(conn), 0);
}
}
// `close_and_drain` must block until task handles dropped on other threads
// have been released, and then leave the root closed.
#[test]
fn test_close_and_drain_threaded() {
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
let tree = Arc::new(Mutex::new(RegionTree::new()));
let root = {
let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
t.create_root(RegionKind::DbRoot, Cx::new()).expect("root")
};
let wc = {
let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
t.create_child(root, RegionKind::WriteCoordinator, Cx::new())
.expect("wc")
};
let task1 = tree
.lock()
.unwrap_or_else(|e| e.into_inner())
.register_task(wc)
.expect("t1");
let task2 = tree
.lock()
.unwrap_or_else(|e| e.into_inner())
.register_task(wc)
.expect("t2");
let completed = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&completed);
// The worker threads drop their handles without taking the tree lock, so
// they can make progress while the main thread holds it below.
let t1 = thread::spawn(move || {
thread::sleep(Duration::from_millis(20));
drop(task1);
});
let t2 = thread::spawn(move || {
thread::sleep(Duration::from_millis(30));
drop(task2);
});
{
let mut t = tree.lock().unwrap_or_else(|e| e.into_inner());
t.close_and_drain(root).expect("close_and_drain");
}
// Set only after `close_and_drain` has returned.
flag.store(true, Ordering::Release);
t1.join().expect("t1 join");
t2.join().expect("t2 join");
assert!(
completed.load(Ordering::Acquire),
"bead_id={BEAD_ID} case=threaded_close_completed"
);
assert_eq!(
tree.lock().unwrap_or_else(|e| e.into_inner()).state(root),
Some(RegionState::Closed),
"bead_id={BEAD_ID} case=threaded_root_closed"
);
}
}