use crate::cx::Cx;
use crate::cx::registry::RegistryHandle;
use crate::record::region::RegionState;
use crate::runtime::region_table::RegionCreateError;
use crate::runtime::state::RuntimeState;
use crate::supervision::{
ChildSpec, CompiledSupervisor, RestartPolicy, StartTieBreak, SupervisorBuilder,
SupervisorCompileError, SupervisorHandle, SupervisorSpawnError,
};
use crate::types::{Budget, CancelKind, CancelReason, RegionId, TaskId};
use std::task::{Context, Poll, Waker};
pub struct CompiledApp {
name: String,
budget: Option<Budget>,
compiled_supervisor: CompiledSupervisor,
registry: Option<RegistryHandle>,
}
impl std::fmt::Debug for CompiledApp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompiledApp")
.field("name", &self.name)
.field("budget", &self.budget)
.finish_non_exhaustive()
}
}
impl CompiledApp {
fn collect_region_tree(state: &RuntimeState, root_region: RegionId) -> Vec<(RegionId, usize)> {
let mut regions = Vec::new();
let mut pending = vec![(root_region, 0_usize)];
while let Some((region_id, depth)) = pending.pop() {
let Some(record) = state.region(region_id) else {
continue;
};
let children = record.child_ids();
regions.push((region_id, depth));
for child in children {
pending.push((child, depth + 1));
}
}
regions
}
fn force_complete_tree_tasks(state: &mut RuntimeState, root_region: RegionId) -> usize {
let startup_tasks: Vec<_> = Self::collect_region_tree(state, root_region)
.into_iter()
.flat_map(|(region_id, _)| {
state
.region(region_id)
.map(crate::record::RegionRecord::task_ids)
.unwrap_or_default()
})
.collect();
let mut completed = 0;
for task_id in startup_tasks {
if let Some(task) = state.task_mut(task_id) {
let reason = task
.cancel_reason()
.cloned()
.unwrap_or_else(CancelReason::shutdown);
let _ = task.complete(crate::types::Outcome::Cancelled(reason));
}
let _ = state.task_completed(task_id);
completed += 1;
}
completed
}
fn drive_bootstrap_task_once(state: &mut RuntimeState, task_id: TaskId) -> bool {
let task_cx = state.task(task_id).and_then(|record| record.cx.clone());
let Some(task_cx) = task_cx else {
return false;
};
let Some(mut stored) = state.remove_stored_future(task_id) else {
return false;
};
let waker = Waker::noop();
let mut poll_cx = Context::from_waker(waker);
let _guard = Cx::set_current(Some(task_cx));
match stored.poll(&mut poll_cx) {
Poll::Ready(outcome) => {
let task_outcome = outcome
.map_err(|()| crate::error::Error::new(crate::error::ErrorKind::Internal));
if let Some(task) = state.task_mut(task_id) {
let _ = task.complete(task_outcome);
}
let _ = state.task_completed(task_id);
true
}
Poll::Pending => {
state.store_spawned_task(task_id, stored);
false
}
}
}
fn cleanup_failed_start(state: &mut RuntimeState, root_region: RegionId) {
let _ = state.cancel_request(root_region, &CancelReason::shutdown(), None);
Self::force_complete_tree_tasks(state, root_region);
let mut previous_region_count = usize::MAX;
while state.region(root_region).is_some() {
let current_region_count = state.regions_len();
let mut made_progress = current_region_count != previous_region_count;
previous_region_count = current_region_count;
let mut regions = Self::collect_region_tree(state, root_region);
regions.sort_by_key(|(_, depth)| std::cmp::Reverse(*depth));
for (region_id, _) in regions {
if let Some(region) = state.region(region_id) {
region.begin_close(None);
}
state.advance_region_state(region_id);
}
let scheduled_finalizers = state.drain_ready_async_finalizers();
if !scheduled_finalizers.is_empty() {
made_progress = true;
}
for (task_id, _) in scheduled_finalizers {
made_progress |= Self::drive_bootstrap_task_once(state, task_id);
}
if Self::force_complete_tree_tasks(state, root_region) > 0 {
made_progress = true;
}
if state.regions_len() != current_region_count {
made_progress = true;
}
if !made_progress {
break;
}
}
}
fn build_app_root_cx(
state: &RuntimeState,
parent_cx: &Cx,
root_region: RegionId,
budget: Budget,
registry_override: Option<RegistryHandle>,
) -> Cx {
let task_id = TaskId::new_ephemeral();
let timer_driver = parent_cx.timer_driver();
let logical_clock = state
.logical_clock_mode()
.build_handle(timer_driver.clone());
let mut root_cx = Cx::new_with_drivers(
root_region,
task_id,
budget,
Some(parent_cx.child_observability(root_region, task_id)),
parent_cx.io_driver_handle(),
parent_cx.io_cap_handle(),
timer_driver,
Some(parent_cx.child_entropy(task_id)),
)
.with_logical_clock(logical_clock)
.with_registry_handle(registry_override.or_else(|| parent_cx.registry_handle()))
.with_remote_cap_handle(parent_cx.remote_cap_handle())
.with_blocking_pool_handle(parent_cx.blocking_pool_handle())
.with_evidence_sink(parent_cx.evidence_sink_handle())
.with_macaroon_handle(parent_cx.macaroon_handle());
if let Some(pressure) = parent_cx.pressure_handle() {
root_cx = root_cx.with_pressure(pressure);
}
root_cx.set_trace_buffer(
parent_cx
.trace_buffer()
.unwrap_or_else(|| state.trace_handle()),
);
root_cx
}
#[must_use]
#[inline]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
#[inline]
pub fn compiled_supervisor(&self) -> &CompiledSupervisor {
&self.compiled_supervisor
}
pub fn start(
self,
state: &mut RuntimeState,
cx: &Cx,
parent_region: RegionId,
) -> Result<AppHandle, AppSpawnError> {
let parent_budget = self.budget.unwrap_or(Budget::INFINITE);
let root_region = state
.create_child_region(parent_region, parent_budget)
.map_err(AppSpawnError::RegionCreate)?;
let effective_budget = state
.region(root_region)
.map_or(parent_budget, crate::record::RegionRecord::budget);
let registry_for_handle = self.registry.clone();
let app_cx = Self::build_app_root_cx(
state,
cx,
root_region,
effective_budget,
registry_for_handle.clone(),
);
let supervisor =
match self
.compiled_supervisor
.spawn(state, &app_cx, root_region, effective_budget)
{
Ok(s) => s,
Err(e) => {
Self::cleanup_failed_start(state, root_region);
return Err(AppSpawnError::SpawnFailed(e));
}
};
app_cx.trace("app_started");
Ok(AppHandle {
name: self.name,
root_region,
runtime_instance_id: state.instance_id(),
supervisor,
registry: registry_for_handle,
resolved: false,
})
}
}
pub struct AppSpec {
name: String,
budget: Option<Budget>,
supervisor: SupervisorBuilder,
registry: Option<RegistryHandle>,
}
impl std::fmt::Debug for AppSpec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppSpec")
.field("name", &self.name)
.field("budget", &self.budget)
.finish_non_exhaustive()
}
}
impl AppSpec {
pub fn new(name: impl Into<String>) -> Self {
let name = name.into();
Self {
supervisor: SupervisorBuilder::new(name.clone()),
name,
budget: None,
registry: None,
}
}
#[must_use]
pub fn with_budget(mut self, budget: Budget) -> Self {
self.budget = Some(budget);
self.supervisor = self.supervisor.with_budget(budget);
self
}
#[must_use]
pub fn with_registry(mut self, registry: RegistryHandle) -> Self {
self.registry = Some(registry);
self
}
#[must_use]
pub fn with_restart_policy(mut self, policy: RestartPolicy) -> Self {
self.supervisor = self.supervisor.with_restart_policy(policy);
self
}
#[must_use]
pub fn with_tie_break(mut self, tie_break: StartTieBreak) -> Self {
self.supervisor = self.supervisor.with_tie_break(tie_break);
self
}
#[must_use]
pub fn child(mut self, child: ChildSpec) -> Self {
self.supervisor = self.supervisor.child(child);
self
}
pub fn compile(self) -> Result<CompiledApp, AppCompileError> {
let compiled_supervisor = self
.supervisor
.compile()
.map_err(AppCompileError::SupervisorCompile)?;
Ok(CompiledApp {
name: self.name,
budget: self.budget,
compiled_supervisor,
registry: self.registry,
})
}
pub fn start(
self,
state: &mut RuntimeState,
cx: &Cx,
parent_region: RegionId,
) -> Result<AppHandle, AppStartError> {
let compiled = self.compile().map_err(AppStartError::CompileFailed)?;
compiled
.start(state, cx, parent_region)
.map_err(AppStartError::SpawnFailed)
}
}
pub struct AppHandle {
name: String,
root_region: RegionId,
runtime_instance_id: u64,
supervisor: SupervisorHandle,
registry: Option<RegistryHandle>,
resolved: bool,
}
impl std::fmt::Debug for AppHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppHandle")
.field("name", &self.name)
.field("root_region", &self.root_region)
.field("runtime_instance_id", &self.runtime_instance_id)
.field("resolved", &self.resolved)
.finish_non_exhaustive()
}
}
impl Drop for AppHandle {
fn drop(&mut self) {
if !self.resolved {
if std::thread::panicking() {
return;
}
panic!(
"APP HANDLE LEAKED: app `{}` (region {:?}) was dropped without stop() or join(). \
Call stop(), join(), or into_raw() to resolve.",
self.name, self.root_region
);
}
}
}
impl AppHandle {
fn runtime_matches(&self, state: &RuntimeState) -> bool {
state.instance_id() == self.runtime_instance_id
}
#[must_use]
#[inline]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
#[inline]
pub fn root_region(&self) -> RegionId {
self.root_region
}
#[must_use]
#[inline]
pub fn supervisor(&self) -> &SupervisorHandle {
&self.supervisor
}
#[must_use]
pub fn registry(&self) -> Option<&RegistryHandle> {
self.registry.as_ref()
}
pub fn stop(&mut self, state: &mut RuntimeState) -> Result<StoppedApp, AppStopError> {
let reason = CancelReason::new(CancelKind::Shutdown);
if !self.runtime_matches(state) {
return Err(AppStopError::WrongRuntime {
region: self.root_region,
});
}
let Some(region_record) = state.region(self.root_region) else {
if state.region_was_closed(self.root_region) {
self.resolved = true;
return Ok(StoppedApp {
name: self.name.clone(),
root_region: self.root_region,
});
}
self.resolved = true;
return Err(AppStopError::RegionNotFound(self.root_region));
};
let current_state = region_record.state();
if current_state == RegionState::Closed {
self.resolved = true;
return Ok(StoppedApp {
name: self.name.clone(),
root_region: self.root_region,
});
}
let _ = state.cancel_request(self.root_region, &reason, None);
self.resolved = true;
Ok(StoppedApp {
name: self.name.clone(),
root_region: self.root_region,
})
}
#[must_use]
pub fn is_stopped(&self, state: &RuntimeState) -> bool {
if !self.runtime_matches(state) {
return false;
}
state.region(self.root_region).map_or_else(
|| state.region_was_closed(self.root_region),
|r| r.state() == RegionState::Closed,
)
}
pub fn is_quiescent(&self, state: &RuntimeState) -> bool {
if !self.runtime_matches(state) {
return false;
}
state.region(self.root_region).map_or_else(
|| state.region_was_closed(self.root_region),
crate::record::RegionRecord::is_quiescent,
)
}
pub fn join(&mut self, state: &RuntimeState) -> Result<StoppedApp, AppStopError> {
if !self.runtime_matches(state) {
return Err(AppStopError::WrongRuntime {
region: self.root_region,
});
}
let Some(region_record) = state.region(self.root_region) else {
if state.region_was_closed(self.root_region) {
self.resolved = true;
return Ok(StoppedApp {
name: self.name.clone(),
root_region: self.root_region,
});
}
self.resolved = true;
return Err(AppStopError::RegionNotFound(self.root_region));
};
let region_state = region_record.state();
if region_state == RegionState::Closed {
self.resolved = true;
return Ok(StoppedApp {
name: self.name.clone(),
root_region: self.root_region,
});
}
Err(AppStopError::RegionNotStopped {
region: self.root_region,
state: region_state,
})
}
#[must_use]
pub fn into_raw(mut self) -> RawAppHandle {
self.resolved = true;
RawAppHandle {
name: std::mem::take(&mut self.name),
root_region: self.root_region,
}
}
}
#[derive(Debug)]
pub struct StoppedApp {
pub name: String,
pub root_region: RegionId,
}
#[derive(Debug)]
pub struct RawAppHandle {
pub name: String,
pub root_region: RegionId,
}
#[derive(Debug)]
pub enum AppCompileError {
SupervisorCompile(SupervisorCompileError),
}
impl std::fmt::Display for AppCompileError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SupervisorCompile(e) => write!(f, "app compile failed: {e}"),
}
}
}
impl std::error::Error for AppCompileError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::SupervisorCompile(e) => Some(e),
}
}
}
#[derive(Debug)]
pub enum AppSpawnError {
RegionCreate(RegionCreateError),
SpawnFailed(SupervisorSpawnError),
}
impl std::fmt::Display for AppSpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RegionCreate(e) => write!(f, "app root region create failed: {e}"),
Self::SpawnFailed(e) => write!(f, "app spawn failed: {e}"),
}
}
}
impl std::error::Error for AppSpawnError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::RegionCreate(e) => Some(e),
Self::SpawnFailed(e) => Some(e),
}
}
}
#[derive(Debug)]
pub enum AppStartError {
CompileFailed(AppCompileError),
SpawnFailed(AppSpawnError),
}
impl std::fmt::Display for AppStartError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CompileFailed(e) => write!(f, "{e}"),
Self::SpawnFailed(e) => write!(f, "{e}"),
}
}
}
impl std::error::Error for AppStartError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::CompileFailed(e) => Some(e),
Self::SpawnFailed(e) => Some(e),
}
}
}
#[derive(Debug)]
pub enum AppStopError {
WrongRuntime {
region: RegionId,
},
RegionNotFound(RegionId),
RegionNotStopped {
region: RegionId,
state: RegionState,
},
}
impl std::fmt::Display for AppStopError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::WrongRuntime { region } => {
write!(
f,
"app root region {region:?} belongs to a different runtime state"
)
}
Self::RegionNotFound(id) => write!(f, "app root region {id:?} not found"),
Self::RegionNotStopped { region, state } => {
write!(
f,
"app root region {region:?} is not stopped yet (state: {state:?})"
)
}
}
}
}
impl std::error::Error for AppStopError {}
#[cfg(test)]
mod tests {
use super::*;
use crate::remote::{NodeId, RemoteCap};
use crate::runtime::SpawnError;
use crate::runtime::state::RuntimeState;
use crate::supervision::{ChildSpec, NameRegistrationPolicy, SupervisionStrategy};
use crate::types::Budget;
use std::sync::Arc;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn make_child(name: &str) -> ChildSpec {
ChildSpec {
name: name.into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
let region = scope.region_id();
let budget = scope.budget();
state
.create_task(region, budget, async { 42_u8 })
.map(|(_, stored)| stored.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: Vec::new(),
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
}
}
fn close_app_region_and_remove_records(state: &mut RuntimeState, app_region: RegionId) {
let _ = state.cancel_request(app_region, &CancelReason::shutdown(), None);
let mut previous_region_count = usize::MAX;
while state.region(app_region).is_some() && state.regions_len() != previous_region_count {
previous_region_count = state.regions_len();
let region_ids: Vec<_> = state.regions_iter().map(|(_, region)| region.id).collect();
for region_id in region_ids {
state.advance_region_state(region_id);
}
}
}
#[test]
fn app_spec_new_creates_named_spec() {
init_test("app_spec_new_creates_named_spec");
let spec = AppSpec::new("test_app");
assert_eq!(spec.name, "test_app");
assert!(spec.budget.is_none());
crate::test_complete!("app_spec_new_creates_named_spec");
}
#[test]
fn app_spec_with_budget_sets_budget() {
init_test("app_spec_with_budget_sets_budget");
let budget = Budget::new().with_poll_quota(100);
let spec = AppSpec::new("budgeted").with_budget(budget);
assert_eq!(spec.budget, Some(budget));
crate::test_complete!("app_spec_with_budget_sets_budget");
}
#[test]
fn app_start_creates_region_and_spawns() {
init_test("app_start_creates_region_and_spawns");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("my_app").child(make_child("worker"));
let handle = spec.start(&mut state, &cx, root).expect("start ok");
assert_eq!(handle.name(), "my_app");
assert_ne!(handle.root_region(), root); assert_eq!(handle.supervisor().started.len(), 1);
assert_eq!(handle.supervisor().started[0].name, "worker");
let _raw = handle.into_raw();
crate::test_complete!("app_start_creates_region_and_spawns");
}
#[test]
fn app_start_with_multiple_children() {
init_test("app_start_with_multiple_children");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("multi")
.child(make_child("alpha"))
.child(make_child("bravo"))
.child(make_child("charlie"));
let handle = spec.start(&mut state, &cx, root).expect("start ok");
assert_eq!(handle.supervisor().started.len(), 3);
let _raw = handle.into_raw();
crate::test_complete!("app_start_with_multiple_children");
}
#[test]
fn app_stop_initiates_cancel() {
init_test("app_stop_initiates_cancel");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("stoppable").child(make_child("w"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let stopped = handle.stop(&mut state).expect("stop ok");
assert_eq!(stopped.name, "stoppable");
assert_eq!(stopped.root_region, app_region);
let region = state.region(app_region).expect("region exists");
assert!(
region.state() == RegionState::Closing || region.state() == RegionState::Closed,
"region should be closing or closed, got {:?}",
region.state()
);
crate::test_complete!("app_stop_initiates_cancel");
}
#[test]
fn app_join_on_closed_region_succeeds() {
init_test("app_join_on_closed_region_succeeds");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("empty_app");
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
if let Some(r) = state.region(app_region) {
for task in r.task_ids() {
r.remove_task(task);
}
for child in r.child_ids() {
r.remove_child(child);
}
r.begin_close(None);
r.begin_drain();
r.begin_finalize();
assert!(r.complete_close(), "should be able to close empty region");
}
assert!(
state
.region(app_region)
.is_some_and(|r| r.state() == RegionState::Closed)
);
let stopped = handle.join(&state).expect("join ok");
assert_eq!(stopped.name, "empty_app");
crate::test_complete!("app_join_on_closed_region_succeeds");
}
#[test]
fn app_join_on_open_region_preserves_handle() {
init_test("app_join_on_open_region_preserves_handle");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("still_running").child(make_child("worker"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let result = handle.join(&state);
assert!(
matches!(
result,
Err(AppStopError::RegionNotStopped { region, state })
if region == app_region && state == RegionState::Open
),
"expected RegionNotStopped(Open) for the running app region"
);
let stopped = handle
.stop(&mut state)
.expect("handle should remain usable after join miss");
assert_eq!(stopped.root_region, app_region);
crate::test_complete!("app_join_on_open_region_preserves_handle");
}
#[test]
fn app_into_raw_disarms_drop_bomb() {
init_test("app_into_raw_disarms_drop_bomb");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("raw_app");
let handle = spec.start(&mut state, &cx, root).expect("start ok");
let raw = handle.into_raw();
assert_eq!(raw.name, "raw_app");
drop(raw);
crate::test_complete!("app_into_raw_disarms_drop_bomb");
}
#[test]
#[should_panic(expected = "APP HANDLE LEAKED")]
fn app_handle_drop_without_resolve_panics() {
init_test("app_handle_drop_without_resolve_panics");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("leaky");
let handle = spec.start(&mut state, &cx, root).expect("start ok");
drop(handle); }
#[test]
fn app_start_compile_error_on_duplicate_children() {
init_test("app_start_compile_error_on_duplicate_children");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("dup")
.child(make_child("same"))
.child(make_child("same"));
let err = spec.start(&mut state, &cx, root).unwrap_err();
assert!(
matches!(
err,
AppStartError::CompileFailed(AppCompileError::SupervisorCompile(
SupervisorCompileError::DuplicateChildName(_)
))
),
"expected DuplicateChildName, got {err:?}"
);
crate::test_complete!("app_start_compile_error_on_duplicate_children");
}
#[test]
fn app_start_spawn_failure_cleans_up_allocated_region() {
init_test("app_start_spawn_failure_cleans_up_allocated_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let failing_child = ChildSpec {
name: "broken".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
_state: &mut RuntimeState,
_cx: &Cx| Err(SpawnError::RegionClosed(scope.region_id())),
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: Vec::new(),
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("broken_app").child(failing_child);
let result = spec.start(&mut state, &cx, root);
assert!(matches!(result, Err(AppStartError::SpawnFailed(_))));
assert_eq!(
state.regions_len(),
1,
"failed app start should not leak an extra region"
);
assert_eq!(
state
.region(root)
.map(crate::record::RegionRecord::child_count),
Some(0),
"parent root should not retain a leaked child region"
);
crate::test_complete!("app_start_spawn_failure_cleans_up_allocated_region");
}
#[test]
fn app_start_spawn_failure_cleans_up_started_tasks() {
init_test("app_start_spawn_failure_cleans_up_started_tasks");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let failing_child = ChildSpec {
name: "broken".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
_state: &mut RuntimeState,
_cx: &Cx| Err(SpawnError::RegionClosed(scope.region_id())),
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec!["started".into()],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("partially_started_app")
.child(make_child("started"))
.child(failing_child);
let result = spec.start(&mut state, &cx, root);
assert!(matches!(result, Err(AppStartError::SpawnFailed(_))));
assert_eq!(
state.live_task_count(),
0,
"failed app start should not leave unscheduled tasks behind"
);
assert_eq!(
state.regions_len(),
1,
"failed app start should remove the temporary app region tree"
);
assert_eq!(
state
.region(root)
.map(crate::record::RegionRecord::child_count),
Some(0),
"parent root should not retain leaked app descendants"
);
crate::test_complete!("app_start_spawn_failure_cleans_up_started_tasks");
}
#[test]
fn app_start_spawn_failure_drains_async_finalizers() {
init_test("app_start_spawn_failure_drains_async_finalizers");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let finalizer_ran = Arc::new(std::sync::atomic::AtomicBool::new(false));
let finalizer_ran_clone = Arc::clone(&finalizer_ran);
let failing_child = ChildSpec {
name: "broken".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
let registered = state.register_async_finalizer(scope.region_id(), {
let finalizer_ran = Arc::clone(&finalizer_ran_clone);
async move {
finalizer_ran.store(true, std::sync::atomic::Ordering::SeqCst);
}
});
assert!(registered, "startup region should accept async finalizer");
Err(SpawnError::RegionClosed(scope.region_id()))
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: Vec::new(),
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("broken_finalizer_app").child(failing_child);
let result = spec.start(&mut state, &cx, root);
assert!(matches!(result, Err(AppStartError::SpawnFailed(_))));
assert!(
finalizer_ran.load(std::sync::atomic::Ordering::SeqCst),
"failed app start should still drain registered async finalizers"
);
assert_eq!(
state.live_task_count(),
0,
"failed app start should not leave async finalizer tasks behind"
);
assert_eq!(
state.regions_len(),
1,
"failed app start should remove the temporary app region tree"
);
assert_eq!(
state
.region(root)
.map(crate::record::RegionRecord::child_count),
Some(0),
"parent root should not retain leaked app descendants"
);
crate::test_complete!("app_start_spawn_failure_drains_async_finalizers");
}
#[test]
fn app_is_quiescent_after_close() {
init_test("app_is_quiescent_after_close");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("quiescent_test");
let handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let raw = handle.into_raw();
if let Some(r) = state.region(app_region) {
for task in r.task_ids() {
r.remove_task(task);
}
for child in r.child_ids() {
r.remove_child(child);
}
r.begin_close(None);
r.begin_drain();
r.begin_finalize();
assert!(r.complete_close(), "should close empty region");
}
let region = state.region(app_region).expect("region exists");
assert_eq!(region.state(), RegionState::Closed);
drop(raw);
crate::test_complete!("app_is_quiescent_after_close");
}
#[test]
fn app_with_budget_propagates_to_region() {
init_test("app_with_budget_propagates_to_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let budget = Budget::new().with_poll_quota(42);
let spec = AppSpec::new("budgeted_app").with_budget(budget);
let handle = spec.start(&mut state, &cx, root).expect("start ok");
let region = state.region(handle.root_region()).expect("region exists");
assert_eq!(region.budget().poll_quota, 42);
let _raw = handle.into_raw();
crate::test_complete!("app_with_budget_propagates_to_region");
}
#[test]
fn app_compile_produces_compiled_app() {
init_test("app_compile_produces_compiled_app");
let compiled = AppSpec::new("compiled_test")
.child(make_child("a"))
.child(make_child("b"))
.compile()
.expect("compile ok");
assert_eq!(compiled.name(), "compiled_test");
crate::test_complete!("app_compile_produces_compiled_app");
}
#[test]
fn app_compile_detects_duplicate_names() {
init_test("app_compile_detects_duplicate_names");
let err = AppSpec::new("dup_compile")
.child(make_child("same"))
.child(make_child("same"))
.compile()
.unwrap_err();
assert!(
matches!(
err,
AppCompileError::SupervisorCompile(SupervisorCompileError::DuplicateChildName(_))
),
"expected DuplicateChildName, got {err:?}"
);
crate::test_complete!("app_compile_detects_duplicate_names");
}
#[test]
fn app_compiled_start_creates_region_and_spawns() {
init_test("app_compiled_start_creates_region_and_spawns");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let compiled = AppSpec::new("two_phase")
.child(make_child("w1"))
.child(make_child("w2"))
.compile()
.expect("compile ok");
let handle = compiled.start(&mut state, &cx, root).expect("start ok");
assert_eq!(handle.name(), "two_phase");
assert_eq!(handle.supervisor().started.len(), 2);
let _raw = handle.into_raw();
crate::test_complete!("app_compiled_start_creates_region_and_spawns");
}
#[test]
fn app_compile_is_deterministic() {
init_test("app_compile_is_deterministic");
let build = || {
AppSpec::new("det")
.child(make_child("c"))
.child(make_child("a"))
.child(make_child("b"))
};
let c1 = build().compile().unwrap();
let c2 = build().compile().unwrap();
assert_eq!(
c1.compiled_supervisor().start_order,
c2.compiled_supervisor().start_order,
"compile must produce identical start orders"
);
crate::test_complete!("app_compile_is_deterministic");
}
#[test]
fn app_compile_with_dependencies_is_deterministic() {
init_test("app_compile_with_dependencies_is_deterministic");
let build = || {
let mut b = make_child("b");
b.depends_on = vec!["a".into()];
let mut c = make_child("c");
c.depends_on = vec!["b".into()];
AppSpec::new("dep_det")
.child(c)
.child(make_child("a"))
.child(b)
};
let c1 = build().compile().unwrap();
let c2 = build().compile().unwrap();
assert_eq!(
c1.compiled_supervisor().start_order,
c2.compiled_supervisor().start_order
);
crate::test_complete!("app_compile_with_dependencies_is_deterministic");
}
#[test]
fn app_compile_budget_propagates() {
init_test("app_compile_budget_propagates");
let budget = Budget::new().with_poll_quota(99);
let compiled = AppSpec::new("budgeted_compile")
.with_budget(budget)
.compile()
.unwrap();
assert_eq!(compiled.budget, Some(budget));
crate::test_complete!("app_compile_budget_propagates");
}
#[test]
fn conformance_start_stop_lifecycle() {
init_test("conformance_start_stop_lifecycle");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("lifecycle").child(make_child("w1"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
assert_eq!(state.region(app_region).unwrap().state(), RegionState::Open);
let _stopped = handle.stop(&mut state).expect("stop ok");
let region_state = state.region(app_region).unwrap().state();
assert_ne!(
region_state,
RegionState::Open,
"region should no longer be open after stop"
);
crate::test_complete!("conformance_start_stop_lifecycle");
}
#[test]
fn conformance_no_ambient_authority() {
init_test("conformance_no_ambient_authority");
let spec = AppSpec::new("isolated");
assert_eq!(spec.name, "isolated");
assert!(spec.budget.is_none());
crate::test_complete!("conformance_no_ambient_authority");
}
#[test]
fn conformance_root_region_is_child_of_parent() {
init_test("conformance_root_region_is_child_of_parent");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("nested");
let handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let region_record = state.region(app_region).expect("region exists");
assert_eq!(
region_record.parent,
Some(root),
"app root region must be a child of the given parent"
);
let _raw = handle.into_raw();
crate::test_complete!("conformance_root_region_is_child_of_parent");
}
#[test]
fn conformance_stop_is_cancel_correct() {
init_test("conformance_stop_is_cancel_correct");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("cancel_correct").child(make_child("w"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let _stopped = handle.stop(&mut state).expect("stop ok");
let region = state.region(app_region).expect("region exists");
assert!(
region.cancel_reason().is_some(),
"stop must set a cancel reason on the root region"
);
crate::test_complete!("conformance_stop_is_cancel_correct");
}
#[test]
fn conformance_deterministic_child_start_order() {
init_test("conformance_deterministic_child_start_order");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let alpha = ChildSpec {
name: "alpha".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
state
.create_task(scope.region_id(), scope.budget(), async { 1_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let bravo = ChildSpec {
name: "bravo".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
state
.create_task(scope.region_id(), scope.budget(), async { 2_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec!["alpha".into()],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let charlie = ChildSpec {
name: "charlie".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
state
.create_task(scope.region_id(), scope.budget(), async { 3_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec!["bravo".into()],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("ordered")
.child(charlie) .child(alpha)
.child(bravo);
let handle = spec.start(&mut state, &cx, root).expect("start ok");
let names: Vec<&str> = handle
.supervisor()
.started
.iter()
.map(|c| c.name.as_str())
.collect();
assert_eq!(names, vec!["alpha", "bravo", "charlie"]);
let _raw = handle.into_raw();
crate::test_complete!("conformance_deterministic_child_start_order");
}
#[test]
fn conformance_compiled_app_starts_and_closes() {
init_test("conformance_compiled_app_starts_and_closes");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let compiled = AppSpec::new("quiesce")
.child(make_child("w1"))
.compile()
.expect("compile ok");
let mut handle = compiled.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let _stopped = handle.stop(&mut state).expect("stop ok");
if let Some(r) = state.region(app_region) {
for task in r.task_ids() {
r.remove_task(task);
}
for child in r.child_ids() {
r.remove_child(child);
}
if r.state() == RegionState::Closing {
r.begin_drain();
}
if r.state() == RegionState::Draining {
r.begin_finalize();
}
if r.state() == RegionState::Finalizing {
assert!(r.complete_close(), "should complete close");
}
}
assert_eq!(
state.region(app_region).unwrap().state(),
RegionState::Closed,
);
crate::test_complete!("conformance_compiled_app_starts_and_closes");
}
#[test]
fn conformance_compile_errors_are_explicit() {
init_test("conformance_compile_errors_are_explicit");
let err = AppSpec::new("errs")
.child(make_child("dup"))
.child(make_child("dup"))
.compile()
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("compile failed"),
"error should mention compile: {msg}"
);
assert!(
std::error::Error::source(&err).is_some(),
"AppCompileError must have a source"
);
crate::test_complete!("conformance_compile_errors_are_explicit");
}
#[test]
fn conformance_compile_then_start_matches_direct() {
init_test("conformance_compile_then_start_matches_direct");
let mut s1 = RuntimeState::new();
let r1 = s1.create_root_region(Budget::INFINITE);
let cx1 = Cx::new(
r1,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let mut s2 = RuntimeState::new();
let r2 = s2.create_root_region(Budget::INFINITE);
let cx2 = Cx::new(
r2,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let h1 = AppSpec::new("direct")
.child(make_child("w"))
.start(&mut s1, &cx1, r1)
.unwrap();
let compiled = AppSpec::new("compiled")
.child(make_child("w"))
.compile()
.unwrap();
let h2 = compiled.start(&mut s2, &cx2, r2).unwrap();
assert_eq!(h1.supervisor().started.len(), h2.supervisor().started.len());
assert_ne!(h1.root_region(), r1);
assert_ne!(h2.root_region(), r2);
let _raw1 = h1.into_raw();
let _raw2 = h2.into_raw();
crate::test_complete!("conformance_compile_then_start_matches_direct");
}
#[test]
fn app_with_registry_propagates_to_children() {
init_test("app_with_registry_propagates_to_children");
let registry = crate::cx::NameRegistry::new();
let handle = RegistryHandle::new(Arc::new(registry));
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
assert!(!cx.has_registry());
let registry_seen = Arc::new(std::sync::atomic::AtomicBool::new(false));
let seen_clone = Arc::clone(®istry_seen);
let child = ChildSpec {
name: "checker".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &Cx| {
seen_clone.store(cx.has_registry(), std::sync::atomic::Ordering::SeqCst);
state
.create_task(scope.region_id(), scope.budget(), async { 0_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("registry_app")
.with_registry(handle)
.child(child);
let app_handle = spec.start(&mut state, &cx, root).expect("start ok");
assert!(
registry_seen.load(std::sync::atomic::Ordering::SeqCst),
"child Cx must carry registry when app is started with one"
);
assert!(app_handle.registry().is_some());
let _raw = app_handle.into_raw();
crate::test_complete!("app_with_registry_propagates_to_children");
}
#[test]
fn app_bootstrap_cx_targets_app_root_and_preserves_capabilities() {
init_test("app_bootstrap_cx_targets_app_root_and_preserves_capabilities");
let registry = crate::cx::NameRegistry::new();
let handle = RegistryHandle::new(Arc::new(registry));
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let parent_task = crate::types::TaskId::new_for_test(77, 9);
let cx = Cx::new(root, parent_task, Budget::INFINITE)
.with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")));
let seen = Arc::new(parking_lot::Mutex::new(
None::<(RegionId, crate::types::TaskId, bool, Option<String>)>,
));
let seen_clone = Arc::clone(&seen);
let child = ChildSpec {
name: "checker".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &Cx| {
*seen_clone.lock() = Some((
cx.region_id(),
cx.task_id(),
cx.has_registry(),
cx.remote_cap_handle()
.map(|cap| cap.local_node().as_str().to_string()),
));
state
.create_task(scope.region_id(), scope.budget(), async { 0_u8 })
.map(|(_, stored)| stored.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let app_handle = AppSpec::new("bootstrap_cx_app")
.with_registry(handle)
.child(child)
.start(&mut state, &cx, root)
.expect("start ok");
let (seen_region, seen_task, saw_registry, remote_origin) = seen
.lock()
.clone()
.expect("child should observe bootstrap cx");
assert_eq!(
seen_region,
app_handle.root_region(),
"startup closures must observe the app root region, not the caller's region"
);
assert_ne!(
seen_task, parent_task,
"startup closures must not inherit the caller's task identity"
);
assert!(
saw_registry,
"app registry override must be visible during startup"
);
assert_eq!(remote_origin.as_deref(), Some("origin-test"));
let _raw = app_handle.into_raw();
crate::test_complete!("app_bootstrap_cx_targets_app_root_and_preserves_capabilities");
}
#[test]
fn app_without_registry_children_see_none() {
init_test("app_without_registry_children_see_none");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let registry_seen = Arc::new(std::sync::atomic::AtomicBool::new(false));
let seen_clone = Arc::clone(®istry_seen);
let child = ChildSpec {
name: "no_reg".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &Cx| {
seen_clone.store(cx.has_registry(), std::sync::atomic::Ordering::SeqCst);
state
.create_task(scope.region_id(), scope.budget(), async { 0_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("no_registry_app").child(child);
let app_handle = spec.start(&mut state, &cx, root).expect("start ok");
assert!(
!registry_seen.load(std::sync::atomic::Ordering::SeqCst),
"child Cx must NOT have registry when app has none"
);
assert!(app_handle.registry().is_none());
let _raw = app_handle.into_raw();
crate::test_complete!("app_without_registry_children_see_none");
}
#[test]
fn app_registry_named_service_whereis() {
init_test("app_registry_named_service_whereis");
let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
let reg_handle =
RegistryHandle::new(Arc::clone(®istry) as Arc<dyn crate::cx::RegistryCap>);
let lease_slot: Arc<parking_lot::Mutex<Option<crate::cx::NameLease>>> =
Arc::new(parking_lot::Mutex::new(None));
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let reg_clone = Arc::clone(®istry);
let lease_clone = Arc::clone(&lease_slot);
let child = ChildSpec {
name: "named_worker".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
let region = scope.region_id();
let budget = scope.budget();
let (_, stored) = state.create_task(region, budget, async { 1_u8 })?;
let task_id = stored.task_id();
let now = crate::types::Time::ZERO;
let lease = reg_clone
.lock()
.register("my_worker", task_id, region, now)
.expect("register ok");
*lease_clone.lock() = Some(lease);
Ok(task_id)
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("named_app")
.with_registry(reg_handle)
.child(child);
let app_handle = spec.start(&mut state, &cx, root).expect("start ok");
let found = registry.lock().whereis("my_worker");
assert!(found.is_some(), "named worker must be visible via whereis");
lease_slot
.lock()
.as_mut()
.expect("lease should have been set")
.release()
.expect("release ok");
let _raw = app_handle.into_raw();
crate::test_complete!("app_registry_named_service_whereis");
}
#[test]
fn app_registry_compile_preserves_handle() {
init_test("app_registry_compile_preserves_handle");
let registry = crate::cx::NameRegistry::new();
let handle = RegistryHandle::new(Arc::new(registry));
let compiled = AppSpec::new("compiled_reg")
.with_registry(handle)
.child(make_child("w"))
.compile()
.expect("compile ok");
assert!(compiled.registry.is_some());
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let app_handle = compiled.start(&mut state, &cx, root).expect("start ok");
assert!(app_handle.registry().is_some());
let _raw = app_handle.into_raw();
crate::test_complete!("app_registry_compile_preserves_handle");
}
#[test]
fn app_registry_stop_does_not_panic() {
init_test("app_registry_stop_does_not_panic");
let registry = crate::cx::NameRegistry::new();
let handle = RegistryHandle::new(Arc::new(registry));
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("stoppable_reg")
.with_registry(handle)
.child(make_child("w"));
let mut app_handle = spec.start(&mut state, &cx, root).expect("start ok");
let _stopped = app_handle.stop(&mut state).expect("stop ok");
crate::test_complete!("app_registry_stop_does_not_panic");
}
use crate::gen_server::{GenServer, Reply, SystemMsg};
use std::future::Future;
use std::pin::Pin;
struct ChatRoom {
history: Vec<String>,
max_history: usize,
}
enum ChatCall {
GetHistory,
#[allow(dead_code)]
Count,
}
enum ChatCast {
Post(String),
Clear,
}
impl GenServer for ChatRoom {
type Call = ChatCall;
type Reply = Vec<String>;
type Cast = ChatCast;
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
request: ChatCall,
reply: Reply<Vec<String>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match request {
ChatCall::GetHistory => {
let _ = reply.send(self.history.clone());
}
ChatCall::Count => {
let _ = reply.send(vec![self.history.len().to_string()]);
}
}
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
msg: ChatCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match msg {
ChatCast::Post(text) => {
self.history.push(text);
if self.history.len() > self.max_history {
self.history.remove(0);
}
}
ChatCast::Clear => {
self.history.clear();
}
}
Box::pin(async {})
}
}
impl ChatRoom {
fn new(max_history: usize) -> Self {
Self {
history: Vec::new(),
max_history,
}
}
}
#[test]
fn example_chat_room_call_and_cast() {
init_test("example_chat_room_call_and_cast");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope =
crate::cx::Scope::<crate::types::policy::FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, ChatRoom::new(100), 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle
.try_cast(ChatCast::Post("alice: hello".into()))
.unwrap();
handle
.try_cast(ChatCast::Post("bob: hi alice".into()))
.unwrap();
handle
.try_cast(ChatCast::Post("alice: how are you?".into()))
.unwrap();
let server_ref = handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, ChatCall::GetHistory).await.unwrap()
})
.unwrap();
let client_id = client_handle.task_id();
runtime.state.store_spawned_task(client_id, client_stored);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.scheduler.lock().schedule(client_id, 0);
runtime.run_until_quiescent();
let history =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
assert_eq!(history.len(), 3);
assert_eq!(history[0], "alice: hello");
assert_eq!(history[1], "bob: hi alice");
assert_eq!(history[2], "alice: how are you?");
crate::test_complete!("example_chat_room_call_and_cast");
}
#[test]
fn example_chat_room_bounded_history() {
init_test("example_chat_room_bounded_history");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope =
crate::cx::Scope::<crate::types::policy::FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, ChatRoom::new(2), 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.try_cast(ChatCast::Post("msg1".into())).unwrap();
handle.try_cast(ChatCast::Post("msg2".into())).unwrap();
handle.try_cast(ChatCast::Post("msg3".into())).unwrap();
let server_ref = handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, ChatCall::GetHistory).await.unwrap()
})
.unwrap();
let client_id = client_handle.task_id();
runtime.state.store_spawned_task(client_id, client_stored);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.scheduler.lock().schedule(client_id, 0);
runtime.run_until_quiescent();
let history =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
assert_eq!(history, vec!["msg2", "msg3"], "oldest message evicted");
crate::test_complete!("example_chat_room_bounded_history");
}
#[test]
fn example_chat_room_named_via_registry() {
init_test("example_chat_room_named_via_registry");
let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope =
crate::cx::Scope::<crate::types::policy::FailFast>::new(region, Budget::INFINITE);
let (mut named_handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry.lock(),
"lobby",
ChatRoom::new(100),
32,
crate::types::Time::ZERO,
)
.unwrap();
let task_id = named_handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
let found = registry.lock().whereis("lobby");
assert!(
found.is_some(),
"named chat room must be visible via whereis"
);
assert_eq!(found.unwrap(), task_id);
named_handle
.inner()
.try_cast(ChatCast::Post("welcome to lobby".into()))
.unwrap();
let server_ref = named_handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, ChatCall::GetHistory).await.unwrap()
})
.unwrap();
let client_id = client_handle.task_id();
runtime.state.store_spawned_task(client_id, client_stored);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.scheduler.lock().schedule(client_id, 0);
runtime.run_until_quiescent();
let history = futures_lite::future::block_on(client_handle.join(&cx)).expect("join ok");
assert_eq!(history, vec!["welcome to lobby"]);
let mut lease = named_handle.take_lease().expect("lease present");
named_handle.inner().abort();
let release_now = runtime.state.now;
let mut registry_guard = registry.lock();
registry_guard
.unregister_owned_and_grant(&lease, release_now)
.expect("manual unregister ok");
lease.abort().expect("lease abort ok");
drop(registry_guard);
let found_after = registry.lock().whereis("lobby");
assert!(
found_after.is_none(),
"name must be gone after stop-and-release"
);
crate::test_complete!("example_chat_room_named_via_registry");
}
#[test]
fn example_chat_room_supervised_app() {
init_test("example_chat_room_supervised_app");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let chat_started = Arc::new(std::sync::atomic::AtomicBool::new(false));
let started_clone = Arc::clone(&chat_started);
let chat_child = ChildSpec {
name: "lobby".into(),
start: Box::new(
move |scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &Cx| {
let (handle, stored) =
scope.spawn_gen_server::<ChatRoom>(state, cx, ChatRoom::new(100), 32)?;
started_clone.store(true, std::sync::atomic::Ordering::SeqCst);
let task_id = handle.task_id();
state.store_spawned_task(task_id, stored);
Ok(task_id)
},
),
restart: SupervisionStrategy::Stop,
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("chat_app").child(chat_child);
let app_handle = spec.start(&mut state, &cx, root).expect("start ok");
assert!(
chat_started.load(std::sync::atomic::Ordering::SeqCst),
"ChatRoom GenServer child must be started by supervisor"
);
assert_eq!(app_handle.name(), "chat_app");
assert_eq!(app_handle.supervisor().started.len(), 1);
assert_eq!(app_handle.supervisor().started[0].name, "lobby");
let _raw = app_handle.into_raw();
crate::test_complete!("example_chat_room_supervised_app");
}
#[test]
fn example_chat_app_with_dependencies() {
init_test("example_chat_app_with_dependencies");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let lobby_child = ChildSpec {
name: "lobby".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
state
.create_task(scope.region_id(), scope.budget(), async { 1_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(3, std::time::Duration::from_secs(60)),
),
shutdown_budget: Budget::INFINITE,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let announcements_child = ChildSpec {
name: "announcements".into(),
start: Box::new(
|scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
_cx: &Cx| {
state
.create_task(scope.region_id(), scope.budget(), async { 2_u8 })
.map(|(_, s)| s.task_id())
},
),
restart: crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(3, std::time::Duration::from_secs(60)),
),
shutdown_budget: Budget::INFINITE,
depends_on: vec!["lobby".into()], registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
};
let spec = AppSpec::new("chat_app")
.with_restart_policy(RestartPolicy::OneForAll)
.child(announcements_child)
.child(lobby_child);
let app_handle = spec.start(&mut state, &cx, root).expect("start ok");
let names: Vec<&str> = app_handle
.supervisor()
.started
.iter()
.map(|c| c.name.as_str())
.collect();
assert_eq!(names, vec!["lobby", "announcements"]);
let _raw = app_handle.into_raw();
crate::test_complete!("example_chat_app_with_dependencies");
}
#[test]
fn example_chat_clear_resets_history() {
init_test("example_chat_clear_resets_history");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope =
crate::cx::Scope::<crate::types::policy::FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, ChatRoom::new(100), 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.try_cast(ChatCast::Post("old msg".into())).unwrap();
handle.try_cast(ChatCast::Clear).unwrap();
handle
.try_cast(ChatCast::Post("fresh start".into()))
.unwrap();
let server_ref = handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, ChatCall::GetHistory).await.unwrap()
})
.unwrap();
let client_id = client_handle.task_id();
runtime.state.store_spawned_task(client_id, client_stored);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.scheduler.lock().schedule(client_id, 0);
runtime.run_until_quiescent();
let history = futures_lite::future::block_on(client_handle.join(&cx)).expect("join ok");
assert_eq!(history, vec!["fresh start"], "clear must reset history");
crate::test_complete!("example_chat_clear_resets_history");
}
#[test]
fn stop_region_not_found_does_not_panic() {
init_test("stop_region_not_found_does_not_panic");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("phantom").child(make_child("w"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let _ = state.regions.remove(app_region.arena_index());
let result = handle.stop(&mut state);
assert!(
matches!(result, Err(AppStopError::RegionNotFound(region)) if region == app_region),
"expected RegionNotFound for a missing root region in the originating runtime"
);
crate::test_complete!("stop_region_not_found_does_not_panic");
}
#[test]
fn join_region_not_found_does_not_panic() {
init_test("join_region_not_found_does_not_panic");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("phantom_join").child(make_child("w"));
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
let _ = state.regions.remove(app_region.arena_index());
let result = handle.join(&state);
assert!(
matches!(result, Err(AppStopError::RegionNotFound(region)) if region == app_region),
"expected RegionNotFound for a missing root region in the originating runtime"
);
crate::test_complete!("join_region_not_found_does_not_panic");
}
#[test]
fn app_join_succeeds_after_runtime_removes_closed_region() {
init_test("app_join_succeeds_after_runtime_removes_closed_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("removed_root");
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
close_app_region_and_remove_records(&mut state, app_region);
assert!(
state.region(app_region).is_none(),
"normal shutdown should remove the closed app region record"
);
assert!(handle.is_stopped(&state));
assert!(handle.is_quiescent(&state));
let stopped = handle
.join(&state)
.expect("join should succeed after removal");
assert_eq!(stopped.name, "removed_root");
assert_eq!(stopped.root_region, app_region);
crate::test_complete!("app_join_succeeds_after_runtime_removes_closed_region");
}
#[test]
fn app_stop_is_idempotent_after_runtime_removes_closed_region() {
init_test("app_stop_is_idempotent_after_runtime_removes_closed_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::new(
root,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let spec = AppSpec::new("removed_then_stop");
let mut handle = spec.start(&mut state, &cx, root).expect("start ok");
let app_region = handle.root_region();
close_app_region_and_remove_records(&mut state, app_region);
let stopped = handle
.stop(&mut state)
.expect("stop should treat an already removed closed region as stopped");
assert_eq!(stopped.name, "removed_then_stop");
assert_eq!(stopped.root_region, app_region);
crate::test_complete!("app_stop_is_idempotent_after_runtime_removes_closed_region");
}
#[test]
fn app_join_wrong_runtime_preserves_handle_even_with_tombstone_collision() {
init_test("app_join_wrong_runtime_preserves_handle_even_with_tombstone_collision");
let mut state_a = RuntimeState::new();
let root_a = state_a.create_root_region(Budget::INFINITE);
let cx_a = Cx::new(
root_a,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let mut state_b = RuntimeState::new();
let root_b = state_b.create_root_region(Budget::INFINITE);
let cx_b = Cx::new(
root_b,
crate::types::TaskId::testing_default(),
Budget::INFINITE,
);
let mut handle_a = AppSpec::new("state_a_app")
.start(&mut state_a, &cx_a, root_a)
.expect("start ok");
let mut handle_b = AppSpec::new("state_b_app")
.start(&mut state_b, &cx_b, root_b)
.expect("start ok");
assert_eq!(
handle_a.root_region(),
handle_b.root_region(),
"fresh runtimes currently allocate the same test root/app region ids"
);
close_app_region_and_remove_records(&mut state_b, handle_b.root_region());
let _ = handle_b.join(&state_b).expect("join state_b app");
let result = handle_a.join(&state_b);
assert!(
matches!(
result,
Err(AppStopError::WrongRuntime { region }) if region == handle_a.root_region()
),
"wrong-runtime joins must fail even if a colliding tombstone exists"
);
let stopped = handle_a
.stop(&mut state_a)
.expect("wrong-runtime join must preserve the original handle");
assert_eq!(stopped.name, "state_a_app");
crate::test_complete!(
"app_join_wrong_runtime_preserves_handle_even_with_tombstone_collision"
);
}
}