use crate::{
ids::{BuildNetwork, CanisterRole},
ops::storage::template::{TemplateChunkedOps, TemplateManifestOps},
workflow::runtime::template::WasmStorePublicationWorkflow,
};
use canic_core::api::runtime::install::ModuleSourceRuntimeApi;
use canic_core::{__control_plane_core as cp_core, log, log::Topic};
use cp_core::{
InternalError,
config::schema::SubnetConfig,
dto::{
pool::CanisterPoolStatus,
validation::{ValidationIssue, ValidationReport},
},
ops::{
config::ConfigOps,
ic::{IcOps, network::NetworkOps},
runtime::{env::EnvOps, ready::ReadyOps},
storage::{
directory::{app::AppDirectoryOps, subnet::SubnetDirectoryOps},
pool::PoolOps,
registry::subnet::SubnetRegistryOps,
},
},
workflow::{
canister_lifecycle::{CanisterLifecycleEvent, CanisterLifecycleWorkflow},
ic::{IcWorkflow, provision::ProvisionWorkflow},
pool::{PoolWorkflow, query::PoolQuery},
prelude::*,
topology::guard::TopologyGuard,
},
};
use std::collections::BTreeMap;
/// Preflight data shared by the root bootstrap paths.
///
/// Loaded once per bootstrap attempt via [`RootBootstrapContext::load`] so the
/// subsequent phases all see a consistent snapshot.
struct RootBootstrapContext {
    // Configuration for the subnet this root canister manages.
    subnet_cfg: SubnetConfig,
    // Build-time network selection; `None` when no network was baked in.
    network: Option<BuildNetwork>,
}
impl RootBootstrapContext {
    /// Snapshot the current subnet configuration and build network.
    ///
    /// # Errors
    /// Propagates the error from `ConfigOps::current_subnet` when no subnet
    /// configuration is available.
    fn load() -> Result<Self, InternalError> {
        Ok(Self {
            subnet_cfg: ConfigOps::current_subnet()?,
            network: NetworkOps::build_network(),
        })
    }
}
/// Whether an embedded module source is registered for the wasm_store role.
/// Bootstrap refuses to proceed without it (see the callers' early returns).
fn root_has_embedded_wasm_store_bootstrap() -> bool {
    let store_role = CanisterRole::WASM_STORE;
    ModuleSourceRuntimeApi::has_embedded_module_source(&store_role)
}
/// Collect the configured `auto_create` roles (wasm_store excluded) that do
/// not yet have a publishable, approved chunked release staged.
///
/// # Errors
/// Returns the first error reported by the chunked-release lookup; collection
/// short-circuits at that point, matching the original `?`-based loop.
fn root_missing_staged_release_roles(
    data: &RootBootstrapContext,
) -> Result<Vec<CanisterRole>, InternalError> {
    data.subnet_cfg
        .auto_create
        .iter()
        .filter(|role| !role.is_wasm_store())
        .filter_map(|role| {
            match TemplateChunkedOps::has_publishable_chunked_approved_for_role(role) {
                Ok(true) => None,
                Ok(false) => Some(Ok(role.clone())),
                Err(err) => Some(Err(err)),
            }
        })
        .collect()
}
/// One-shot bootstrap for the root canister's `init` path.
///
/// Phases, each of which logs and returns early on failure (leaving the
/// canister not marked ready):
/// 1. Preflight: embedded wasm_store module present, context loads, and every
///    `auto_create` role has a staged release.
/// 2. Enter the topology guard (skips if another topology change is active).
/// 3. Import the pool (waiting for queued imports), create required
///    canisters, rebuild directories from the registry, validate state.
/// 4. Mark the canister ready.
pub async fn bootstrap_init_root_canister() {
    // Without the embedded wasm_store module the store canister can never be
    // created, so fail fast before touching any other state.
    if !root_has_embedded_wasm_store_bootstrap() {
        log!(
            Topic::Init,
            Error,
            "bootstrap (root:init) embedded wasm_store bootstrap module is not registered"
        );
        return;
    }
    let data = match RootBootstrapContext::load() {
        Ok(data) => data,
        Err(err) => {
            log!(
                Topic::Init,
                Error,
                "bootstrap (root:init) bootstrap preflight failed: {err}"
            );
            return;
        }
    };
    let missing_roles = match root_missing_staged_release_roles(&data) {
        Ok(missing_roles) => missing_roles,
        Err(err) => {
            log!(
                Topic::Init,
                Error,
                "bootstrap (root:init) release-set preflight failed: {err}"
            );
            return;
        }
    };
    // Not an error: releases may simply not be staged yet. A later retrigger
    // of this bootstrap is expected to pick things up.
    if !missing_roles.is_empty() {
        log!(
            Topic::Init,
            Info,
            "bootstrap (root:init) waiting for staged release roles: {:?}",
            missing_roles
        );
        return;
    }
    // Guard held for the remainder of the function (bound to `_guard` so it
    // is not dropped immediately).
    let _guard = match TopologyGuard::try_enter() {
        Ok(g) => g,
        Err(err) => {
            log!(Topic::Init, Info, "bootstrap (root:init) skipped: {err}");
            return;
        }
    };
    log!(Topic::Init, Info, "bootstrap (root:init) start");
    // `true`: wait for queued pool imports so creation below can draw on them.
    root_import_pool_from_config(true).await;
    canic_core::perf!("bootstrap_import_pool");
    if let Err(err) = root_create_canisters().await {
        log!(Topic::Init, Error, "registry phase failed: {err}");
        return;
    }
    canic_core::perf!("bootstrap_create_canisters");
    if let Err(err) = root_rebuild_directories_from_registry() {
        log!(
            Topic::Init,
            Error,
            "directory materialization failed: {err}"
        );
        return;
    }
    canic_core::perf!("bootstrap_rebuild_directories");
    let report = root_validate_state();
    canic_core::perf!("bootstrap_validate_state");
    if !report.ok {
        log!(
            Topic::Init,
            Error,
            "bootstrap validation failed:\n{:#?}",
            report.issues
        );
        return;
    }
    log!(Topic::Init, Info, "bootstrap (root:init) complete");
    ReadyOps::mark_ready();
}
/// Bootstrap for the root canister's `post_upgrade` path.
///
/// Shares the `init` path's preflight checks (embedded wasm_store module,
/// context load, staged releases), then — unlike `init` — re-resolves the
/// subnet id, imports the pool without waiting on queued imports, and
/// reconciles the wasm store instead of creating canisters. Every failure
/// logs and returns early without marking ready.
pub async fn bootstrap_post_upgrade_root_canister() {
    if !root_has_embedded_wasm_store_bootstrap() {
        log!(
            Topic::Init,
            Error,
            "bootstrap (root:upgrade) embedded wasm_store bootstrap module is not registered"
        );
        return;
    }
    let data = match RootBootstrapContext::load() {
        Ok(data) => data,
        Err(err) => {
            log!(
                Topic::Init,
                Error,
                "bootstrap (root:upgrade) bootstrap preflight failed: {err}"
            );
            return;
        }
    };
    let missing_roles = match root_missing_staged_release_roles(&data) {
        Ok(missing_roles) => missing_roles,
        Err(err) => {
            log!(
                Topic::Init,
                Error,
                "bootstrap (root:upgrade) release-set preflight failed: {err}"
            );
            return;
        }
    };
    // Wait for operators to stage the remaining releases; a later retrigger
    // is expected to complete the upgrade bootstrap.
    if !missing_roles.is_empty() {
        log!(
            Topic::Init,
            Info,
            "bootstrap (root:upgrade) waiting for staged release roles: {:?}",
            missing_roles
        );
        return;
    }
    log!(Topic::Init, Info, "bootstrap (root:upgrade) start");
    root_set_subnet_id().await;
    // `false`: queued imports are left to the scheduler on the upgrade path.
    root_import_pool_from_config(false).await;
    if let Err(err) = root_reconcile_wasm_store().await {
        log!(Topic::Init, Error, "wasm store reconcile failed: {err}");
        return;
    }
    log!(Topic::Init, Info, "bootstrap (root:upgrade) complete");
    ReadyOps::mark_ready();
}
/// Resolve and record the subnet principal for this root canister.
///
/// On the `ic` build network a lookup miss or failure is treated as fatal —
/// we refuse to guess. On any other network (or when the network is unknown)
/// we fall back to using our own canister id as the subnet id.
pub async fn root_set_subnet_id() {
    let network = NetworkOps::build_network();
    let on_ic = network == Some(BuildNetwork::Ic);
    match IcWorkflow::try_get_current_subnet_pid().await {
        Ok(Some(subnet_pid)) => {
            EnvOps::set_subnet_pid(subnet_pid);
            return;
        }
        Ok(None) if on_ic => {
            let msg = "try_get_current_subnet_pid returned None on ic; refusing to fall back";
            log!(Topic::Topology, Error, "{msg}");
            return;
        }
        Err(err) if on_ic => {
            let msg = format!("try_get_current_subnet_pid failed on ic: {err}");
            log!(Topic::Topology, Error, "{msg}");
            return;
        }
        // Lookup unavailable off-ic: fall through to the self-id fallback.
        Ok(None) | Err(_) => {}
    }
    let fallback = IcOps::canister_self();
    EnvOps::set_subnet_pid(fallback);
    log!(
        Topic::Topology,
        Info,
        "try_get_current_subnet_pid unavailable; using self as subnet: {fallback}"
    );
}
/// Import the configured canister pool.
///
/// A missing subnet configuration is logged as a skip rather than treated as
/// an error. `wait_for_queued_imports` controls whether imports beyond the
/// initial batch are awaited inline or handed to the scheduler.
pub async fn root_import_pool_from_config(wait_for_queued_imports: bool) {
    match RootBootstrapContext::load() {
        Ok(data) => ensure_pool_imported(&data, wait_for_queued_imports).await,
        Err(err) => {
            log!(
                Topic::CanisterPool,
                Warn,
                "pool import skipped: no subnet cfg ({err})"
            );
        }
    }
}
/// Registry phase of init bootstrap.
///
/// Ensures the wasm_store canister exists, publishes the staged release set
/// to the current store, then creates every configured `auto_create` canister
/// that is still missing.
///
/// # Errors
/// Propagates the first failure from context loading, store creation,
/// publication, or canister creation.
pub async fn root_create_canisters() -> Result<(), InternalError> {
    let data = RootBootstrapContext::load()?;
    log!(
        Topic::Init,
        Info,
        "auto_create: {:?}",
        data.subnet_cfg.auto_create
    );
    // Store must exist and carry the release set before roles are created.
    ensure_required_wasm_store_canister().await?;
    canic_core::perf!("bootstrap_ensure_wasm_store");
    WasmStorePublicationWorkflow::publish_staged_release_set_to_current_store().await?;
    canic_core::perf!("bootstrap_publish_release_set");
    ensure_required_canisters(&data).await
}
/// Rebuild the app/subnet directories from the subnet registry.
///
/// The rebuild summary is deliberately discarded — callers only care about
/// success or failure.
///
/// # Errors
/// Propagates any error from the provisioning workflow.
pub fn root_rebuild_directories_from_registry() -> Result<(), InternalError> {
    let _ = ProvisionWorkflow::rebuild_directories_from_registry(None)?;
    Ok(())
}
/// Import the configured pool canisters for the active build network.
///
/// The configured import list is split into an "initial" batch (imported and
/// awaited inline) and a "queued" remainder. When `wait_for_queued_imports`
/// is true the queued batch is also imported inline; otherwise it is handed
/// to `pool_import_queued_canisters` and resolved asynchronously by the
/// scheduler. Extensive counters/pid lists are kept purely for logging.
#[expect(clippy::too_many_lines)]
async fn ensure_pool_imported(data: &RootBootstrapContext, wait_for_queued_imports: bool) {
    // Stringified `initial` config value, for the cfg log line only.
    let initial_cfg = data
        .subnet_cfg
        .pool
        .import
        .initial
        .map_or_else(|| "unset".to_string(), |v| v.to_string());
    // Pick the import list matching the build network; bail if unknown.
    let import_list = match data.network {
        Some(BuildNetwork::Local) => data.subnet_cfg.pool.import.local.clone(),
        Some(BuildNetwork::Ic) => data.subnet_cfg.pool.import.ic.clone(),
        None => {
            log!(
                Topic::CanisterPool,
                Warn,
                "pool import skipped: no build network"
            );
            return;
        }
    };
    // Size of the inline batch; defaults to the pool's minimum size when
    // `initial` is unset.
    let initial_limit = data
        .subnet_cfg
        .pool
        .import
        .initial
        .map_or(data.subnet_cfg.pool.minimum_size as usize, |count| {
            count as usize
        });
    log!(
        Topic::CanisterPool,
        Info,
        "pool import cfg: net={} min={} init={} limit={} wait={}",
        data.network.map_or("unknown", BuildNetwork::as_str),
        data.subnet_cfg.pool.minimum_size,
        initial_cfg,
        initial_limit,
        wait_for_queued_imports
    );
    if !import_list.is_empty() {
        log!(
            Topic::CanisterPool,
            Info,
            "pool import candidates={} pids={}",
            import_list.len(),
            summarize_principals(&import_list, 12)
        );
    }
    // Heads-up only: creation may outpace the scheduler's queued imports.
    if initial_limit == 0 && !data.subnet_cfg.auto_create.is_empty() {
        log!(
            Topic::CanisterPool,
            Warn,
            "pool import init=0 with auto_create; queued imports may lag creation"
        );
    }
    if import_list.is_empty() {
        log!(
            Topic::CanisterPool,
            Warn,
            "pool import skipped: empty list for net={}",
            data.network.map_or("unknown", BuildNetwork::as_str)
        );
        log_pool_stats("after-empty-import-skip", data.subnet_cfg.pool.minimum_size);
        return;
    }
    // Split into the inline batch and the queued remainder (clamped so a
    // large `initial` cannot overrun the list).
    let (initial, queued) = import_list.split_at(initial_limit.min(import_list.len()));
    let configured_initial = initial.len() as u64;
    let configured_queued = queued.len() as u64;
    // Outcome counters and pid lists, logging only.
    let mut imported = 0_u64;
    let mut immediate_skipped = 0_u64;
    let mut immediate_failed = 0_u64;
    let mut immediate_already_present = 0_u64;
    let mut queued_added = 0_u64;
    let mut queued_requeued = 0_u64;
    let mut queued_skipped = 0_u64;
    let mut queued_failed = 0_u64;
    let mut queued_already_present = 0_u64;
    let mut immediate_imported_pids = Vec::new();
    let mut immediate_skipped_pids = Vec::new();
    let mut immediate_failed_pids = Vec::new();
    let mut immediate_present_pids = Vec::new();
    let mut queued_added_pids = Vec::new();
    let mut queued_skipped_pids = Vec::new();
    let mut queued_failed_pids = Vec::new();
    let mut queued_present_pids = Vec::new();
    // Inline batch: import one by one, classifying each outcome. A canister
    // that "imports" Ok but still isn't in the pool counts as skipped.
    for pid in initial {
        if PoolOps::contains(pid) {
            immediate_already_present += 1;
            immediate_present_pids.push(*pid);
            continue;
        }
        if matches!(PoolWorkflow::pool_import_canister(*pid).await, Ok(())) {
            if PoolOps::contains(pid) {
                imported += 1;
                immediate_imported_pids.push(*pid);
            } else {
                immediate_skipped += 1;
                immediate_skipped_pids.push(*pid);
            }
        } else {
            immediate_failed += 1;
            immediate_failed_pids.push(*pid);
        }
    }
    // Drop queued candidates that are already pooled (counted as present).
    let queued_imports: Vec<Principal> = queued
        .iter()
        .copied()
        .filter(|pid| {
            if PoolOps::contains(pid) {
                queued_already_present += 1;
                queued_present_pids.push(*pid);
                false
            } else {
                true
            }
        })
        .collect();
    if !queued_imports.is_empty() {
        if wait_for_queued_imports {
            // Same inline import/classification as the initial batch.
            for pid in queued_imports {
                if matches!(PoolWorkflow::pool_import_canister(pid).await, Ok(())) {
                    if PoolOps::contains(&pid) {
                        queued_added += 1;
                        queued_added_pids.push(pid);
                    } else {
                        queued_skipped += 1;
                        queued_skipped_pids.push(pid);
                    }
                } else {
                    queued_failed += 1;
                    queued_failed_pids.push(pid);
                }
            }
        } else {
            log!(
                Topic::CanisterPool,
                Info,
                "pool import queued async count={} pids={}",
                queued_imports.len(),
                summarize_principals(&queued_imports, 12)
            );
            match PoolWorkflow::pool_import_queued_canisters(queued_imports).await {
                Ok(result) => {
                    queued_added = result.added;
                    queued_requeued = result.requeued;
                    queued_skipped = result.skipped;
                }
                Err(err) => {
                    // Whole-batch failure: everything not already present is
                    // considered failed.
                    queued_failed = configured_queued - queued_already_present;
                    log!(Topic::CanisterPool, Warn, "pool import queue failed: {err}");
                }
            }
        }
    }
    log!(
        Topic::CanisterPool,
        Info,
        "pool import now: cfg={} ok={imported} skip={immediate_skipped} fail={immediate_failed} present={immediate_already_present}",
        configured_initial
    );
    log!(
        Topic::CanisterPool,
        Info,
        "pool import now pids: ok={} skip={} fail={} present={}",
        summarize_principals(&immediate_imported_pids, 12),
        summarize_principals(&immediate_skipped_pids, 12),
        summarize_principals(&immediate_failed_pids, 12),
        summarize_principals(&immediate_present_pids, 12),
    );
    if configured_queued > 0 {
        if queued_failed > 0 {
            log!(
                Topic::CanisterPool,
                Warn,
                "pool import queued: cfg={} fail={queued_failed} present={queued_already_present}",
                configured_queued
            );
        } else {
            log!(
                Topic::CanisterPool,
                Info,
                "pool import queued: cfg={} added={queued_added} requeued={queued_requeued} skip={queued_skipped} present={queued_already_present}",
                configured_queued
            );
        }
        if wait_for_queued_imports {
            log!(
                Topic::CanisterPool,
                Info,
                "pool import queued pids: added={} skip={} fail={} present={}",
                summarize_principals(&queued_added_pids, 12),
                summarize_principals(&queued_skipped_pids, 12),
                summarize_principals(&queued_failed_pids, 12),
                summarize_principals(&queued_present_pids, 12),
            );
        } else {
            // Async path: only "present" is known here; the scheduler owns
            // the rest of the classification.
            log!(
                Topic::CanisterPool,
                Info,
                "pool import queued pids: present={} (scheduler resolves added/requeued/skip)",
                summarize_principals(&queued_present_pids, 12),
            );
        }
    }
    log_pool_stats("after-import", data.subnet_cfg.pool.minimum_size);
}
/// Create each configured `auto_create` canister that is not yet registered.
///
/// Roles already present in the registry are skipped, as are roles without an
/// approved manifest staged (logged as a warning, not an error — presumably
/// a later bootstrap pass picks them up; TODO confirm retrigger behavior).
///
/// # Errors
/// Propagates manifest lookup failures and canister-creation failures.
async fn ensure_required_canisters(data: &RootBootstrapContext) -> Result<(), InternalError> {
    for role in &data.subnet_cfg.auto_create {
        if SubnetRegistryOps::has_role(role) {
            log!(Topic::Init, Info, "auto_create: {role} present; skip");
            continue;
        }
        if !TemplateManifestOps::has_approved_for_role(role)? {
            log!(
                Topic::Init,
                Warn,
                "auto_create: skipping {role}; approved manifest not staged"
            );
            continue;
        }
        // Manifest fetched only for the log line; creation keys off the role.
        let manifest = TemplateManifestOps::approved_for_role_response(role)?;
        log!(
            Topic::Init,
            Info,
            "auto_create: creating {role} from {}@{}",
            manifest.template_id,
            manifest.version
        );
        CanisterLifecycleWorkflow::apply(CanisterLifecycleEvent::Create {
            role: role.clone(),
            parent: IcOps::canister_self(),
            extra_arg: None,
        })
        .await?;
        canic_core::perf!("bootstrap_create_role");
    }
    Ok(())
}
/// Upgrade-path wasm store reconciliation.
///
/// Ensures the store canister exists, prunes managed releases no longer in
/// config (logging how many were deprecated), then re-imports the default
/// catalog.
///
/// # Errors
/// Propagates failures from store creation, pruning, or catalog import.
async fn root_reconcile_wasm_store() -> Result<(), InternalError> {
    ensure_required_wasm_store_canister().await?;
    canic_core::perf!("bootstrap_ensure_wasm_store");
    let deprecated = WasmStorePublicationWorkflow::prune_unconfigured_managed_releases()?;
    if deprecated > 0 {
        log!(
            Topic::Init,
            Warn,
            "ws: deprecated {deprecated} stale managed release(s) no longer present in config"
        );
    }
    canic_core::perf!("bootstrap_prune_store_catalog");
    import_default_wasm_store_catalog().await
}
/// Ensure a wasm_store canister exists, creating one if none is registered.
///
/// Inventory sync runs first to detect existing store bindings; after a
/// create, sync runs again (result deliberately ignored) to register the new
/// store's inventory.
///
/// # Errors
/// Propagates the canister-creation failure.
async fn ensure_required_wasm_store_canister() -> Result<(), InternalError> {
    let role = CanisterRole::WASM_STORE;
    let existing_bindings = WasmStorePublicationWorkflow::sync_registered_wasm_store_inventory();
    if !existing_bindings.is_empty() {
        log!(Topic::Init, Info, "ws: {role} present; skip");
        return Ok(());
    }
    log!(Topic::Init, Info, "ws: create {role}");
    CanisterLifecycleWorkflow::apply(CanisterLifecycleEvent::Create {
        role,
        parent: IcOps::canister_self(),
        extra_arg: None,
    })
    .await?;
    canic_core::perf!("bootstrap_create_wasm_store");
    // Best-effort refresh; bindings are not needed past this point.
    let _ = WasmStorePublicationWorkflow::sync_registered_wasm_store_inventory();
    canic_core::perf!("bootstrap_sync_store_inventory");
    Ok(())
}
/// Import the current store's default catalog, logging on success.
///
/// # Errors
/// Propagates the catalog-import failure.
async fn import_default_wasm_store_catalog() -> Result<(), InternalError> {
    WasmStorePublicationWorkflow::import_current_store_catalog().await?;
    canic_core::perf!("bootstrap_import_store_catalog");
    log!(Topic::Init, Info, "ws: imported default catalog");
    Ok(())
}
/// Validate root bootstrap state.
///
/// Checks three invariants and reports each on the returned
/// [`ValidationReport`] (with one [`ValidationIssue`] per violation):
/// - all required env fields are set,
/// - app/subnet directory roles are unique,
/// - every directory entry agrees with the subnet registry.
pub fn root_validate_state() -> ValidationReport {
    let app_dir = AppDirectoryOps::data();
    let subnet_dir = SubnetDirectoryOps::data();
    let mut found_issues = Vec::new();

    let missing_env = EnvOps::missing_required_fields();
    let env_complete = missing_env.is_empty();
    if !env_complete {
        found_issues.push(ValidationIssue {
            code: "env_missing_fields".to_string(),
            message: format!("missing env fields: {}", missing_env.join(", ")),
        });
    }

    let roles_in_registry = SubnetRegistryOps::role_index();
    let (app_unique, app_consistent) = check_directory(
        "app_directory",
        &app_dir.entries,
        &roles_in_registry,
        &mut found_issues,
    );
    let (subnet_unique, subnet_consistent) = check_directory(
        "subnet_directory",
        &subnet_dir.entries,
        &roles_in_registry,
        &mut found_issues,
    );

    let unique_directory_roles = app_unique && subnet_unique;
    let registry_directory_consistent = app_consistent && subnet_consistent;
    ValidationReport {
        ok: env_complete && unique_directory_roles && registry_directory_consistent,
        registry_directory_consistent,
        unique_directory_roles,
        env_complete,
        issues: found_issues,
    }
}
/// Check one directory's entries for duplicate roles and for agreement with
/// the registry's role index, appending a [`ValidationIssue`] per violation.
///
/// Returns `(roles_unique, registry_consistent)`.
fn check_directory(
    label: &str,
    entries: &[(CanisterRole, Principal)],
    registry_roles: &BTreeMap<CanisterRole, Vec<Principal>>,
    issues: &mut Vec<ValidationIssue>,
) -> (bool, bool) {
    let mut roles_unique = true;
    let mut registry_consistent = true;
    let mut occurrences = BTreeMap::<CanisterRole, usize>::new();
    for (role, pid) in entries {
        // Count this sighting; any repeat is a duplicate-role issue.
        let times_seen = {
            let slot = occurrences.entry(role.clone()).or_insert(0);
            *slot += 1;
            *slot
        };
        if times_seen > 1 {
            roles_unique = false;
            issues.push(ValidationIssue {
                code: "directory_role_duplicate".to_string(),
                message: format!("{label} has duplicate role {role}"),
            });
        }
        match registry_roles.get(role) {
            None => {
                registry_consistent = false;
                issues.push(ValidationIssue {
                    code: "directory_role_missing_in_registry".to_string(),
                    message: format!("{label} role {role} not present in registry"),
                });
            }
            Some(registered) if registered.len() > 1 => {
                registry_consistent = false;
                issues.push(ValidationIssue {
                    code: "directory_role_duplicate_in_registry".to_string(),
                    message: format!(
                        "{label} role {role} has multiple registry entries ({})",
                        registered.len()
                    ),
                });
            }
            Some(registered) if registered[0] != *pid => {
                registry_consistent = false;
                issues.push(ValidationIssue {
                    code: "directory_role_pid_mismatch".to_string(),
                    message: format!(
                        "{label} role {role} points to {pid}, registry has {}",
                        registered[0]
                    ),
                });
            }
            // Exactly one registry entry and it matches: consistent.
            Some(_) => {}
        }
    }
    (roles_unique, registry_consistent)
}
/// Render at most `limit` items as `"[a, b, c]"`, collapsing overflow into a
/// `"... +N more"` suffix; an empty slice renders as `"[]"`.
///
/// Generalized from `&[Principal]` to any `ToString` item — backward
/// compatible (Principal implements `Display`, hence `ToString`) and it makes
/// this formatting helper testable with plain values.
fn summarize_principals<T: ToString>(pids: &[T], limit: usize) -> String {
    if pids.is_empty() {
        return "[]".to_string();
    }
    let shown: Vec<String> = pids.iter().take(limit).map(ToString::to_string).collect();
    // `take(limit)` guarantees shown.len() <= pids.len(); saturating_sub is
    // belt-and-braces.
    let remaining = pids.len().saturating_sub(shown.len());
    if remaining == 0 {
        format!("[{}]", shown.join(", "))
    } else {
        format!("[{} ... +{remaining} more]", shown.join(", "))
    }
}
/// Log a snapshot of pool health at a named bootstrap stage.
///
/// Tallies the pool entries by status, logs the totals, warns when the ready
/// count is below `minimum_size`, and lists pending/failed pids (truncated to
/// 12). Purely observational — no state is modified.
fn log_pool_stats(stage: &str, minimum_size: u8) {
    let snapshot = PoolQuery::pool_list();
    let mut ready = 0_usize;
    let mut pending = 0_usize;
    let mut failed = 0_usize;
    let mut pending_pids = Vec::new();
    let mut failed_pids = Vec::new();
    for entry in snapshot.entries {
        match entry.status {
            CanisterPoolStatus::Ready => {
                ready += 1;
            }
            CanisterPoolStatus::PendingReset => {
                pending += 1;
                pending_pids.push(entry.pid);
            }
            // Failure detail is ignored here; only the pid is reported.
            CanisterPoolStatus::Failed { .. } => {
                failed += 1;
                failed_pids.push(entry.pid);
            }
        }
    }
    let total = ready + pending + failed;
    log!(
        Topic::CanisterPool,
        Info,
        "pool stats ({stage}): total={total}, ready={ready}, pending_reset={pending}, failed={failed}, minimum_size={minimum_size}",
    );
    if ready < minimum_size as usize {
        log!(
            Topic::CanisterPool,
            Warn,
            "pool ready below minimum_size ({stage}): ready={ready}, minimum_size={minimum_size}",
        );
    }
    if pending > 0 {
        log!(
            Topic::CanisterPool,
            Info,
            "pool pending_reset pids: {}",
            summarize_principals(&pending_pids, 12)
        );
    }
    if failed > 0 {
        log!(
            Topic::CanisterPool,
            Warn,
            "pool failed pids: {}",
            summarize_principals(&failed_pids, 12)
        );
    }
}