use crate::{
InternalError, InternalErrorOrigin,
dto::cascade::StateSnapshotInput,
ops::{
cascade::CascadeOps,
ic::IcOps,
runtime::env::EnvOps,
storage::{
children::CanisterChildrenOps,
directory::{app::AppDirectoryOps, subnet::SubnetDirectoryOps},
registry::subnet::SubnetRegistryOps,
state::{app::AppStateOps, subnet::SubnetStateOps},
},
},
workflow::{
cascade::{
snapshot::{
StateSnapshot, adapter::StateSnapshotAdapter, state_snapshot_debug,
state_snapshot_is_empty,
},
warn_if_large,
},
prelude::*,
},
};
pub struct StateCascadeWorkflow;
impl StateCascadeWorkflow {
pub async fn root_cascade_state(snapshot: &StateSnapshot) -> Result<(), InternalError> {
EnvOps::require_root()?;
if state_snapshot_is_empty(snapshot) {
log!(
Topic::Sync,
Info,
"sync.state: root cascade skipped (empty snapshot)"
);
return Ok(());
}
log!(
Topic::Sync,
Info,
"sync.state: root cascade start snapshot={}",
state_snapshot_debug(snapshot)
);
let root_pid = IcOps::canister_self();
let children = SubnetRegistryOps::children(root_pid);
warn_if_large("root state cascade", children.len());
let mut failures = 0;
for (pid, _) in children {
if let Err(err) = Self::send_snapshot(pid, snapshot).await {
failures += 1;
log!(
Topic::Sync,
Warn,
"sync.state: failed to cascade to {pid}: {err}",
);
}
}
if failures > 0 {
log!(
Topic::Sync,
Warn,
"sync.state: {failures} child cascade(s) failed; continuing"
);
}
Ok(())
}
pub async fn root_cascade_state_for_pid(
target_pid: Principal,
snapshot: &StateSnapshot,
) -> Result<(), InternalError> {
EnvOps::require_root()?;
if state_snapshot_is_empty(snapshot) {
log!(
Topic::Sync,
Info,
"sync.state: targeted root cascade skipped (empty snapshot)"
);
return Ok(());
}
log!(
Topic::Sync,
Info,
"sync.state: targeted root cascade start target={target_pid} snapshot={}",
state_snapshot_debug(snapshot)
);
let root_pid = IcOps::canister_self();
let parent_chain = SubnetRegistryOps::parent_chain(target_pid)?;
let Some(next_child) = next_child_on_path(root_pid, &parent_chain)? else {
log!(
Topic::Sync,
Warn,
"sync.state: no branch path to {target_pid}, skipping targeted cascade"
);
return Ok(());
};
Self::send_snapshot(next_child, snapshot).await
}
pub async fn nonroot_cascade_state(view: StateSnapshotInput) -> Result<(), InternalError> {
EnvOps::deny_root()?;
let snapshot = StateSnapshotAdapter::from_input(view);
if state_snapshot_is_empty(&snapshot) {
log!(
Topic::Sync,
Info,
"sync.state: non-root cascade skipped (empty snapshot)"
);
return Ok(());
}
log!(
Topic::Sync,
Info,
"sync.state: non-root cascade start snapshot={}",
state_snapshot_debug(&snapshot)
);
Self::apply_state(&snapshot)?;
let child_pids = CanisterChildrenOps::pids();
warn_if_large("non-root state cascade", child_pids.len());
let mut failures = 0;
for pid in child_pids {
if let Err(err) = Self::send_snapshot(pid, &snapshot).await {
failures += 1;
log!(
Topic::Sync,
Warn,
"sync.state: failed to cascade to {pid}: {err}",
);
}
}
if failures > 0 {
log!(
Topic::Sync,
Warn,
"sync.state: {failures} child cascade(s) failed; continuing"
);
}
Ok(())
}
fn apply_state(snapshot: &StateSnapshot) -> Result<(), InternalError> {
EnvOps::deny_root()?;
if let Some(app) = snapshot.app_state {
AppStateOps::import_input(app);
}
if let Some(subnet) = snapshot.subnet_state {
SubnetStateOps::import_input(subnet);
}
if let Some(dir) = &snapshot.app_directory {
AppDirectoryOps::import_args_allow_incomplete(dir.clone())?;
}
if let Some(dir) = &snapshot.subnet_directory {
SubnetDirectoryOps::import_args_allow_incomplete(dir.clone())?;
}
Ok(())
}
async fn send_snapshot(pid: Principal, snapshot: &StateSnapshot) -> Result<(), InternalError> {
let view = StateSnapshotAdapter::to_input(snapshot);
CascadeOps::send_state_snapshot(pid, &view)
.await
.map_err(|err| {
InternalError::workflow(
InternalErrorOrigin::Workflow,
format!("state cascade rejected by child {pid}: {err}"),
)
})
}
}
fn next_child_on_path(
self_pid: Principal,
parents: &[(Principal, crate::storage::canister::CanisterRecord)],
) -> Result<Option<Principal>, InternalError> {
let Some((first_pid, _)) = parents.first() else {
return Err(InternalError::invariant(
InternalErrorOrigin::Workflow,
"state parent chain is empty",
));
};
if *first_pid != self_pid {
return Err(InternalError::invariant(
InternalErrorOrigin::Workflow,
format!("state parent chain does not start with self pid {self_pid}"),
));
}
Ok(parents.get(1).map(|(pid, _)| *pid))
}
#[cfg(test)]
mod tests {
use super::next_child_on_path;
use crate::{cdk::types::Principal, storage::canister::CanisterRecord};
fn p(id: u8) -> Principal {
Principal::from_slice(&[id; 29])
}
fn record(parent_pid: Option<Principal>) -> CanisterRecord {
CanisterRecord {
role: crate::ids::CanisterRole::new("state_path_test"),
parent_pid,
module_hash: None,
created_at: 0,
}
}
#[test]
fn next_child_on_path_returns_first_branch_child() {
let chain = vec![
(p(1), record(None)),
(p(2), record(Some(p(1)))),
(p(3), record(Some(p(2)))),
];
assert_eq!(next_child_on_path(p(1), &chain).unwrap(), Some(p(2)));
}
#[test]
fn next_child_on_path_returns_none_for_root_target() {
let chain = vec![(p(1), record(None))];
assert_eq!(next_child_on_path(p(1), &chain).unwrap(), None);
}
}