use crate::{
InternalError, InternalErrorOrigin,
dto::cascade::StateSnapshotInput,
ops::{
cascade::CascadeOps,
ic::IcOps,
runtime::env::EnvOps,
storage::{
children::CanisterChildrenOps,
directory::{app::AppDirectoryOps, subnet::SubnetDirectoryOps},
registry::subnet::SubnetRegistryOps,
state::{app::AppStateOps, subnet::SubnetStateOps},
},
},
workflow::{
cascade::{
snapshot::{
StateSnapshot, adapter::StateSnapshotAdapter, state_snapshot_debug,
state_snapshot_is_empty,
},
warn_if_large,
},
prelude::*,
},
};
pub struct StateCascadeWorkflow;
impl StateCascadeWorkflow {
pub async fn root_cascade_state(snapshot: &StateSnapshot) -> Result<(), InternalError> {
EnvOps::require_root()?;
if state_snapshot_is_empty(snapshot) {
log!(
Topic::Sync,
Info,
"sync.state: root cascade skipped (empty snapshot)"
);
return Ok(());
}
log!(
Topic::Sync,
Info,
"sync.state: root cascade start snapshot={}",
state_snapshot_debug(snapshot)
);
let root_pid = IcOps::canister_self();
let children = SubnetRegistryOps::children(root_pid);
warn_if_large("root state cascade", children.len());
let mut failures = 0;
for (pid, _) in children {
if let Err(err) = Self::send_snapshot(pid, snapshot).await {
failures += 1;
log!(
Topic::Sync,
Warn,
"sync.state: failed to cascade to {pid}: {err}",
);
}
}
if failures > 0 {
log!(
Topic::Sync,
Warn,
"sync.state: {failures} child cascade(s) failed; continuing"
);
}
Ok(())
}
pub async fn nonroot_cascade_state(view: StateSnapshotInput) -> Result<(), InternalError> {
EnvOps::deny_root()?;
let snapshot = StateSnapshotAdapter::from_input(view);
if state_snapshot_is_empty(&snapshot) {
log!(
Topic::Sync,
Info,
"sync.state: non-root cascade skipped (empty snapshot)"
);
return Ok(());
}
log!(
Topic::Sync,
Info,
"sync.state: non-root cascade start snapshot={}",
state_snapshot_debug(&snapshot)
);
Self::apply_state(&snapshot)?;
let child_pids = CanisterChildrenOps::pids();
warn_if_large("non-root state cascade", child_pids.len());
let mut failures = 0;
for pid in child_pids {
if let Err(err) = Self::send_snapshot(pid, &snapshot).await {
failures += 1;
log!(
Topic::Sync,
Warn,
"sync.state: failed to cascade to {pid}: {err}",
);
}
}
if failures > 0 {
log!(
Topic::Sync,
Warn,
"sync.state: {failures} child cascade(s) failed; continuing"
);
}
Ok(())
}
fn apply_state(snapshot: &StateSnapshot) -> Result<(), InternalError> {
EnvOps::deny_root()?;
if let Some(app) = snapshot.app_state {
AppStateOps::import_input(app);
}
if let Some(subnet) = snapshot.subnet_state {
SubnetStateOps::import_input(subnet);
}
if let Some(dir) = &snapshot.app_directory {
AppDirectoryOps::import_args_allow_incomplete(dir.clone())?;
}
if let Some(dir) = &snapshot.subnet_directory {
SubnetDirectoryOps::import_args_allow_incomplete(dir.clone())?;
}
Ok(())
}
async fn send_snapshot(pid: Principal, snapshot: &StateSnapshot) -> Result<(), InternalError> {
let view = StateSnapshotAdapter::to_input(snapshot);
CascadeOps::send_state_snapshot(pid, &view)
.await
.map_err(|err| {
InternalError::workflow(
InternalErrorOrigin::Workflow,
format!("state cascade rejected by child {pid}: {err}"),
)
})
}
}