use super::WASM_STORE_BOOTSTRAP_BINDING;
use super::call_store_result;
use super::store_pid_for_binding;
use crate::{
config,
dto::template::{
TemplateChunkInput, TemplateChunkResponse, TemplateChunkSetInfoResponse,
TemplateChunkSetPrepareInput, TemplateManifestInput, TemplateManifestResponse,
WasmStoreAdminCommand, WasmStoreAdminResponse, WasmStoreCatalogEntryResponse,
WasmStoreFinalizedStoreResponse, WasmStoreStatusResponse,
},
ids::{
CanisterRole, TemplateChunkingMode, TemplateId, TemplateManifestState, TemplateReleaseKey,
TemplateVersion, WasmStoreBinding, WasmStoreGcMode,
},
ops::storage::{
state::subnet::SubnetStateOps,
template::{TemplateChunkedOps, TemplateManifestOps},
},
schema::WasmStoreConfig,
storage::stable::state::subnet::{PublicationStoreStateRecord, WasmStoreRecord},
};
use canic_core::{__control_plane_core as cp_core, log, log::Topic};
use cp_core::{
InternalError, InternalErrorOrigin,
cdk::types::Principal,
ops::{
ic::{IcOps, mgmt::MgmtOps},
storage::registry::subnet::SubnetRegistryOps,
},
protocol,
workflow::{
canister_lifecycle::{CanisterLifecycleEvent, CanisterLifecycleWorkflow},
ic::provision::ProvisionWorkflow,
},
};
use std::collections::{BTreeMap, BTreeSet};
/// Canister role under which wasm-store canisters are registered and created.
const WASM_STORE_ROLE: CanisterRole = CanisterRole::WASM_STORE;
/// Fetches the full release catalog from the wasm store canister at
/// `store_pid` via the `CANIC_WASM_STORE_CATALOG` endpoint.
pub(super) async fn store_catalog(
    store_pid: Principal,
) -> Result<Vec<WasmStoreCatalogEntryResponse>, InternalError> {
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_CATALOG, ()).await
}
/// Asks the wasm store at `store_pid` for the chunk-set metadata of one
/// `(template_id, version)` release via `CANIC_WASM_STORE_INFO`.
pub(super) async fn store_chunk_set_info(
    store_pid: Principal,
    template_id: &TemplateId,
    version: &TemplateVersion,
) -> Result<TemplateChunkSetInfoResponse, InternalError> {
    // The endpoint takes both identifiers as owned strings.
    let args = (
        template_id.as_str().to_string(),
        version.as_str().to_string(),
    );
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_INFO, args).await
}
/// Fetches the capacity/GC status snapshot of the wasm store at `store_pid`
/// via the `CANIC_WASM_STORE_STATUS` endpoint.
pub(super) async fn store_status(
    store_pid: Principal,
) -> Result<WasmStoreStatusResponse, InternalError> {
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_STATUS, ()).await
}
/// Stages `request` as a manifest on the wasm store at `store_pid` via the
/// `CANIC_WASM_STORE_STAGE_MANIFEST` endpoint.
pub(super) async fn store_stage_manifest(
    store_pid: Principal,
    request: TemplateManifestInput,
) -> Result<(), InternalError> {
    // Single-element tuple: the endpoint expects exactly one argument.
    let args = (request,);
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_STAGE_MANIFEST, args).await
}
/// Tells the wasm store at `store_pid` to prepare for garbage collection.
pub(super) async fn store_prepare_gc(store_pid: Principal) -> Result<(), InternalError> {
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_PREPARE_GC, ()).await
}
/// Tells the wasm store at `store_pid` to begin garbage collection.
pub(super) async fn store_begin_gc(store_pid: Principal) -> Result<(), InternalError> {
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_BEGIN_GC, ()).await
}
/// Tells the wasm store at `store_pid` to complete garbage collection.
pub(super) async fn store_complete_gc(store_pid: Principal) -> Result<(), InternalError> {
    call_store_result(store_pid, protocol::CANIC_WASM_STORE_COMPLETE_GC, ()).await
}
/// Downloads all `chunk_count` payload chunks of one release from the wasm
/// store at `store_pid`, one `CANIC_WASM_STORE_CHUNK` call per chunk, in
/// index order.
///
/// Errors if a chunk index does not fit in `u32` (the wire type for chunk
/// indices) or if any store call fails.
pub(super) async fn store_chunks(
    store_pid: Principal,
    template_id: &TemplateId,
    version: &TemplateVersion,
    chunk_count: usize,
) -> Result<Vec<Vec<u8>>, InternalError> {
    let mut collected: Vec<Vec<u8>> = Vec::with_capacity(chunk_count);
    for raw_index in 0..chunk_count {
        let chunk_index: u32 = raw_index.try_into().map_err(|_| {
            InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!("template '{template_id}' exceeds supported chunk indexing bounds"),
            )
        })?;
        let args = (
            template_id.as_str().to_string(),
            version.as_str().to_string(),
            chunk_index,
        );
        let response: TemplateChunkResponse =
            call_store_result(store_pid, protocol::CANIC_WASM_STORE_CHUNK, args).await?;
        collected.push(response.bytes);
    }
    Ok(collected)
}
/// Resolves the store binding registered for `store_pid` in subnet state,
/// erroring when the principal is not a registered wasm store.
pub(super) fn store_binding_for_pid(
    store_pid: Principal,
) -> Result<WasmStoreBinding, InternalError> {
    match SubnetStateOps::wasm_store_binding_for_pid(store_pid) {
        Some(binding) => Ok(binding),
        None => Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!("wasm store {store_pid} is not registered"),
        )),
    }
}
/// Reads all `chunk_count` payload chunks of one release from local storage
/// (via `TemplateChunkedOps`), in index order.
///
/// Errors if a chunk index does not fit in `u32` or if any local chunk
/// lookup fails.
fn local_chunks(
    template_id: &TemplateId,
    version: &TemplateVersion,
    chunk_count: usize,
) -> Result<Vec<Vec<u8>>, InternalError> {
    let mut collected: Vec<Vec<u8>> = Vec::with_capacity(chunk_count);
    for raw_index in 0..chunk_count {
        let chunk_index: u32 = raw_index.try_into().map_err(|_| {
            InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!("template '{template_id}' exceeds supported chunk indexing bounds"),
            )
        })?;
        let chunk = TemplateChunkedOps::chunk_response(template_id, version, chunk_index)?;
        collected.push(chunk.bytes);
    }
    Ok(collected)
}
/// Namespace for the publication-store workflow: creating, selecting,
/// rotating, garbage-collecting, and deleting wasm store canisters.
pub struct WasmStorePublicationWorkflow;
/// In-memory view of one wasm store canister, combining its subnet-state
/// record with the status and catalog fetched from the canister itself.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PublicationStoreSnapshot {
    /// Binding the store is registered under (textual principal form).
    binding: WasmStoreBinding,
    /// Principal of the store canister.
    pid: Principal,
    /// Creation timestamp (seconds; see `IcOps::now_secs` fallback), used
    /// for oldest-first ordering.
    created_at: u64,
    /// Capacity/GC status fetched from the store.
    status: WasmStoreStatusResponse,
    /// Release catalog fetched from the store.
    releases: Vec<WasmStoreCatalogEntryResponse>,
}
/// Snapshot of the whole publication-store fleet, used to plan where each
/// release should be placed.
#[derive(Clone, Debug)]
struct PublicationStoreFleet {
    /// Store preferred for new placements (the pinned publication binding,
    /// or the oldest registered store when none is pinned).
    preferred_binding: Option<WasmStoreBinding>,
    /// Publication-state record; bindings occupying its detached/retired
    /// slots are excluded from placement.
    reserved_state: PublicationStoreStateRecord,
    /// One snapshot per registered wasm store.
    stores: Vec<PublicationStoreSnapshot>,
}
/// How a release reaches its target store.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PublicationPlacementAction {
    /// The exact release is already present; nothing to upload.
    Reuse,
    /// Upload to an existing store that has spare capacity.
    Publish,
    /// A new store had to be created to hold the release.
    Create,
}
/// Resolved placement decision for a single release.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PublicationPlacement {
    /// Target store binding.
    binding: WasmStoreBinding,
    /// Target store principal.
    pid: Principal,
    /// How the release reaches the target.
    action: PublicationPlacementAction,
}
impl PublicationStoreSnapshot {
    /// Canonical `(template_id, version)` ordering key for a catalog entry.
    fn release_key(entry: &WasmStoreCatalogEntryResponse) -> TemplateReleaseKey {
        TemplateReleaseKey::new(entry.template_id.clone(), entry.version.clone())
    }
    /// True when this store already holds a release identical to `manifest`
    /// in role, template id, version, payload hash, and payload size.
    fn has_exact_release(&self, manifest: &TemplateManifestResponse) -> bool {
        self.releases.iter().any(|entry| {
            entry.role == manifest.role
                && entry.template_id == manifest.template_id
                && entry.version == manifest.version
                && entry.payload_hash == manifest.payload_hash
                && entry.payload_size_bytes == manifest.payload_size_bytes
        })
    }
    /// Returns the catalog entry (if any) sharing `manifest`'s template id
    /// and version but differing in role, hash, or size — a conflicting
    /// release that must block publication to this store.
    fn conflicting_release(
        &self,
        manifest: &TemplateManifestResponse,
    ) -> Option<&WasmStoreCatalogEntryResponse> {
        self.releases.iter().find(|entry| {
            entry.template_id == manifest.template_id
                && entry.version == manifest.version
                && (entry.role != manifest.role
                    || entry.payload_hash != manifest.payload_hash
                    || entry.payload_size_bytes != manifest.payload_size_bytes)
        })
    }
    /// Whether `manifest` can land on this store: either the exact release
    /// is already present, or there is no conflicting release and the store
    /// has enough remaining bytes plus free template/version capacity.
    fn can_accept_release(&self, manifest: &TemplateManifestResponse) -> bool {
        if self.has_exact_release(manifest) {
            return true;
        }
        if self.conflicting_release(manifest).is_some() {
            return false;
        }
        if self.status.remaining_store_bytes < manifest.payload_size_bytes {
            return false;
        }
        // How many versions of this template the store already holds.
        // A single linear scan suffices; no need to materialize a whole map
        // (and clone every template id) for one lookup.
        let current_versions = self
            .status
            .templates
            .iter()
            .find(|template| template.template_id == manifest.template_id)
            .map_or(0, |template| template.versions);
        // A brand-new template would consume a template slot; reject when
        // the per-store template limit is already reached.
        if current_versions == 0
            && self
                .status
                .max_templates
                .is_some_and(|max_templates| self.status.template_count >= max_templates)
        {
            return false;
        }
        // Reject when this template already carries its maximum version count.
        if self
            .status
            .max_template_versions_per_template
            .is_some_and(|max_versions| current_versions >= max_versions)
        {
            return false;
        }
        true
    }
    /// Applies `manifest` to this in-memory snapshot (catalog and status
    /// counters) so later placement decisions in the same run see the new
    /// release. No-op when the exact release is already recorded.
    ///
    /// NOTE(review): only byte/count fields are updated here — the
    /// human-readable `*_size` strings are left stale; confirm callers never
    /// surface them from this snapshot.
    fn record_release(&mut self, manifest: &TemplateManifestResponse) {
        if self.has_exact_release(manifest) {
            return;
        }
        self.releases.push(WasmStoreCatalogEntryResponse {
            role: manifest.role.clone(),
            template_id: manifest.template_id.clone(),
            version: manifest.version.clone(),
            payload_hash: manifest.payload_hash.clone(),
            payload_size_bytes: manifest.payload_size_bytes,
        });
        // Keep the catalog ordered by (template_id, version).
        self.releases
            .sort_by(|left, right| Self::release_key(left).cmp(&Self::release_key(right)));
        // Mirror the byte accounting the store itself would report.
        self.status.occupied_store_bytes = self
            .status
            .occupied_store_bytes
            .saturating_add(manifest.payload_size_bytes);
        self.status.remaining_store_bytes = self
            .status
            .remaining_store_bytes
            .saturating_sub(manifest.payload_size_bytes);
        self.status.within_headroom = self
            .status
            .headroom_bytes
            .is_some_and(|threshold| self.status.remaining_store_bytes <= threshold);
        self.status.release_count = self.status.release_count.saturating_add(1);
        // Bump the per-template version count, creating the entry (and the
        // template_count) when this is the template's first version here.
        if let Some(existing) = self
            .status
            .templates
            .iter_mut()
            .find(|template| template.template_id == manifest.template_id)
        {
            existing.versions = existing.versions.saturating_add(1);
        } else {
            self.status.template_count = self.status.template_count.saturating_add(1);
            self.status
                .templates
                .push(crate::dto::template::WasmStoreTemplateStatusResponse {
                    template_id: manifest.template_id.clone(),
                    versions: 1,
                });
            self.status
                .templates
                .sort_by(|left, right| left.template_id.cmp(&right.template_id));
        }
    }
}
impl PublicationStoreFleet {
    /// Indices into `self.stores` that are eligible placement targets.
    ///
    /// Stores occupying the reserved (detached/retired) publication slots are
    /// excluded; the rest are ordered preferred-binding first, then oldest
    /// `created_at`, with binding order as a deterministic tie-break.
    fn writable_store_indices(&self) -> Vec<usize> {
        let mut indexed = self
            .stores
            .iter()
            .enumerate()
            .filter(|(_, store)| {
                !WasmStorePublicationWorkflow::binding_is_reserved_for_publication(
                    &self.reserved_state,
                    &store.binding,
                )
            })
            .collect::<Vec<_>>();
        indexed.sort_by(|(_, left), (_, right)| {
            // rank 0 = preferred binding, rank 1 = everything else.
            let left_rank = usize::from(self.preferred_binding.as_ref() != Some(&left.binding));
            let right_rank = usize::from(self.preferred_binding.as_ref() != Some(&right.binding));
            left_rank
                .cmp(&right_rank)
                .then(left.created_at.cmp(&right.created_at))
                .then(left.binding.cmp(&right.binding))
        });
        indexed.into_iter().map(|(index, _)| index).collect()
    }
    /// Picks an existing writable store for `manifest`.
    ///
    /// Returns `Err` when a scanned store holds a conflicting release (same
    /// id/version but different role/hash/size); `Ok(Some(_))` with a `Reuse`
    /// placement when a store already holds the exact release; otherwise a
    /// `Publish` placement on the first writable store with capacity, or
    /// `Ok(None)` when none fits. Note the conflict scan stops at the first
    /// exact match, so stores ordered after it are not conflict-checked.
    fn select_existing_store_for_release(
        &self,
        manifest: &TemplateManifestResponse,
    ) -> Result<Option<PublicationPlacement>, InternalError> {
        let mut exact_match = None;
        for index in self.writable_store_indices() {
            let store = &self.stores[index];
            if let Some(conflict) = store.conflicting_release(manifest) {
                return Err(InternalError::workflow(
                    InternalErrorOrigin::Workflow,
                    format!(
                        "ws conflict for {}@{} on {}: existing hash/size differ ({:?}, {})",
                        manifest.template_id,
                        manifest.version,
                        store.binding,
                        conflict.payload_hash,
                        conflict.payload_size_bytes
                    ),
                ));
            }
            if store.has_exact_release(manifest) {
                exact_match = Some(PublicationPlacement {
                    binding: store.binding.clone(),
                    pid: store.pid,
                    action: PublicationPlacementAction::Reuse,
                });
                break;
            }
        }
        if exact_match.is_some() {
            return Ok(exact_match);
        }
        // No exact copy anywhere: take the first writable store (in
        // preference order) that can still accept the release.
        for index in self.writable_store_indices() {
            let store = &self.stores[index];
            if store.can_accept_release(manifest) {
                return Ok(Some(PublicationPlacement {
                    binding: store.binding.clone(),
                    pid: store.pid,
                    action: PublicationPlacementAction::Publish,
                }));
            }
        }
        Ok(None)
    }
    /// Applies `manifest` to the snapshot of the store identified by
    /// `binding` (if present), keeping the in-memory fleet consistent with
    /// what was just published.
    fn record_placement(
        &mut self,
        binding: &WasmStoreBinding,
        manifest: &TemplateManifestResponse,
    ) {
        if let Some(store) = self
            .stores
            .iter_mut()
            .find(|store| &store.binding == binding)
        {
            store.record_release(manifest);
        }
    }
    /// Appends a snapshot for a brand-new, empty store, deriving the
    /// capacity fields of its status from `config` (zero occupancy, full
    /// remaining bytes, empty catalog).
    fn push_store(&mut self, record: WasmStoreRecord, config: WasmStoreConfig) {
        self.stores.push(PublicationStoreSnapshot {
            binding: record.binding,
            pid: record.pid,
            created_at: record.created_at,
            status: WasmStoreStatusResponse {
                gc: crate::dto::template::WasmStoreGcStatusResponse {
                    mode: record.gc.mode,
                    changed_at: record.gc.changed_at,
                    prepared_at: record.gc.prepared_at,
                    started_at: record.gc.started_at,
                    completed_at: record.gc.completed_at,
                    runs_completed: record.gc.runs_completed,
                },
                occupied_store_bytes: 0,
                occupied_store_size: "0.00 B".to_string(),
                max_store_bytes: config.max_store_bytes(),
                max_store_size: canic_core::__control_plane_core::format::byte_size(
                    config.max_store_bytes(),
                ),
                remaining_store_bytes: config.max_store_bytes(),
                remaining_store_size: canic_core::__control_plane_core::format::byte_size(
                    config.max_store_bytes(),
                ),
                headroom_bytes: config.headroom_bytes(),
                headroom_size: config.headroom_bytes().map(cp_core::format::byte_size),
                within_headroom: false,
                template_count: 0,
                max_templates: config.max_templates(),
                release_count: 0,
                max_template_versions_per_template: config.max_template_versions_per_template(),
                templates: Vec::new(),
            },
            releases: Vec::new(),
        });
    }
}
impl WasmStorePublicationWorkflow {
/// Marker substring matched against error text to detect a store's
/// capacity-exceeded failure (see `is_store_capacity_exceeded`).
const WASM_STORE_CAPACITY_EXCEEDED_MESSAGE: &str = "wasm store capacity exceeded";
/// Derives the canonical store binding for a store canister principal:
/// the binding is simply the principal's textual form.
fn binding_for_store_pid(store_pid: Principal) -> WasmStoreBinding {
    WasmStoreBinding::owned(store_pid.to_text())
}
/// Mirrors every wasm store registered in the subnet registry into subnet
/// state (upserting binding, pid, and creation time) and returns the synced
/// bindings.
pub fn sync_registered_wasm_store_inventory() -> Vec<WasmStoreBinding> {
    SubnetRegistryOps::pids_for_role(&WASM_STORE_ROLE)
        .unwrap_or_default()
        .into_iter()
        .map(|pid| {
            let binding = Self::binding_for_store_pid(pid);
            // Missing registry record => fall back to a zero timestamp.
            let created_at = SubnetRegistryOps::get(pid).map_or(0, |record| record.created_at);
            let _ = SubnetStateOps::upsert_wasm_store(binding.clone(), pid, created_at);
            binding
        })
        .collect()
}
/// Creates a fresh wasm store canister via the lifecycle workflow, records
/// it in subnet state, and returns its binding.
async fn create_publication_store() -> Result<WasmStoreBinding, InternalError> {
    let outcome = CanisterLifecycleWorkflow::apply(CanisterLifecycleEvent::Create {
        role: WASM_STORE_ROLE,
        parent: IcOps::canister_self(),
        extra_arg: None,
    })
    .await?;
    let Some(pid) = outcome.new_canister_pid else {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            "wasm store creation did not return a pid",
        ));
    };
    let binding = Self::binding_for_store_pid(pid);
    // Prefer the registry's timestamp; fall back to "now" when the record
    // has not appeared yet.
    let created_at =
        SubnetRegistryOps::get(pid).map_or_else(IcOps::now_secs, |record| record.created_at);
    let _ = SubnetStateOps::upsert_wasm_store(binding.clone(), pid, created_at);
    log!(Topic::Wasm, Ok, "ws created {} ({})", binding, pid);
    Ok(binding)
}
/// Builds a `PublicationStoreFleet` snapshot: syncs the registry into
/// subnet state, resolves the preferred binding, and fetches status plus
/// catalog from every registered store.
async fn snapshot_publication_store_fleet() -> Result<PublicationStoreFleet, InternalError> {
    Self::sync_registered_wasm_store_inventory();
    // Preferred binding: keep the pinned one if its pid still resolves;
    // clear a stale pin (falling back to the oldest store); with no pin,
    // default to the oldest registered store.
    let preferred_binding = match SubnetStateOps::publication_store_binding() {
        Some(binding) if store_pid_for_binding(&binding).is_ok() => Some(binding),
        Some(binding) => Some(Self::clear_stale_publication_binding(binding)?),
        None => Self::oldest_registered_store_binding(),
    };
    let reserved_state = SubnetStateOps::publication_store_state();
    let mut stores = Vec::new();
    // One status + one catalog call per store; any failure aborts the snapshot.
    for record in SubnetStateOps::wasm_stores() {
        let status = store_status(record.pid).await?;
        let releases = store_catalog(record.pid).await?;
        stores.push(PublicationStoreSnapshot {
            binding: record.binding,
            pid: record.pid,
            created_at: record.created_at,
            status,
            releases,
        });
    }
    Ok(PublicationStoreFleet {
        preferred_binding,
        reserved_state,
        stores,
    })
}
/// Creates a new store and splices it into the in-memory `fleet`.
///
/// The very first store (no preferred binding yet) is also activated as the
/// publication binding. Returns a `Create` placement for the new store.
async fn create_store_for_fleet(
    fleet: &mut PublicationStoreFleet,
) -> Result<PublicationPlacement, InternalError> {
    let binding = match fleet.preferred_binding.clone() {
        Some(_) => Self::create_publication_store().await?,
        None => Self::create_and_activate_first_publication_store().await?,
    };
    let store_pid = store_pid_for_binding(&binding)?;
    // The creation path upserts into subnet state, so the record must exist.
    let record = SubnetStateOps::wasm_stores()
        .into_iter()
        .find(|record| record.binding == binding)
        .ok_or_else(|| {
            InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!("new ws '{binding}' missing from subnet state"),
            )
        })?;
    // Seed the snapshot with an empty store derived from the default config.
    fleet.push_store(record, config::current_subnet_default_wasm_store());
    if fleet.preferred_binding.is_none() {
        fleet.preferred_binding = Some(binding.clone());
    }
    // Re-read the reserved state; activation above may have changed it.
    fleet.reserved_state = SubnetStateOps::publication_store_state();
    Ok(PublicationPlacement {
        binding,
        pid: store_pid,
        action: PublicationPlacementAction::Create,
    })
}
/// Dispatches a wasm-store admin command to the matching workflow step and
/// wraps the outcome in the corresponding response variant.
pub async fn handle_admin(
    cmd: WasmStoreAdminCommand,
) -> Result<WasmStoreAdminResponse, InternalError> {
    match cmd {
        WasmStoreAdminCommand::PublishCurrentReleaseToStore { store_pid } => {
            Self::publish_current_release_set_to_store(store_pid).await?;
            Ok(WasmStoreAdminResponse::PublishedCurrentReleaseToStore { store_pid })
        }
        WasmStoreAdminCommand::PublishCurrentReleaseToCurrentStore => {
            Self::publish_current_release_set_to_current_store().await?;
            Ok(WasmStoreAdminResponse::PublishedCurrentReleaseToCurrentStore)
        }
        WasmStoreAdminCommand::SetPublicationBinding { binding } => {
            Self::set_current_publication_store_binding(binding.clone())?;
            Ok(WasmStoreAdminResponse::SetPublicationBinding { binding })
        }
        WasmStoreAdminCommand::ClearPublicationBinding => {
            Self::clear_current_publication_store_binding();
            Ok(WasmStoreAdminResponse::ClearedPublicationBinding)
        }
        WasmStoreAdminCommand::RetireDetachedBinding => {
            let binding = Self::retire_detached_publication_store_binding();
            Ok(WasmStoreAdminResponse::RetiredDetachedBinding { binding })
        }
        WasmStoreAdminCommand::PrepareRetiredStoreGc => {
            let binding = Self::prepare_retired_publication_store_for_gc().await?;
            Ok(WasmStoreAdminResponse::PreparedRetiredStoreGc { binding })
        }
        WasmStoreAdminCommand::BeginRetiredStoreGc => {
            let binding = Self::begin_retired_publication_store_gc().await?;
            Ok(WasmStoreAdminResponse::BeganRetiredStoreGc { binding })
        }
        WasmStoreAdminCommand::CompleteRetiredStoreGc => {
            let binding = Self::complete_retired_publication_store_gc().await?;
            Ok(WasmStoreAdminResponse::CompletedRetiredStoreGc { binding })
        }
        WasmStoreAdminCommand::FinalizeRetiredBinding => {
            // Finalization optionally yields the (binding, pid) pair it released.
            let result = Self::finalize_retired_publication_store_binding()
                .await?
                .map(|(binding, store_pid)| WasmStoreFinalizedStoreResponse {
                    binding,
                    store_pid,
                });
            Ok(WasmStoreAdminResponse::FinalizedRetiredBinding { result })
        }
        WasmStoreAdminCommand::DeleteFinalizedStore { binding, store_pid } => {
            Self::delete_finalized_publication_store(binding.clone(), store_pid).await?;
            Ok(WasmStoreAdminResponse::DeletedFinalizedStore { binding, store_pid })
        }
    }
}
/// Formats an optional binding slot for logging, rendering `None` as `"-"`.
fn binding_slot(slot: Option<&WasmStoreBinding>) -> String {
    match slot {
        Some(binding) => binding.to_string(),
        None => "-".to_string(),
    }
}
/// True when `binding` occupies the detached or retired publication slot,
/// which makes it ineligible for new placements.
fn binding_is_reserved_for_publication(
    state: &PublicationStoreStateRecord,
    binding: &WasmStoreBinding,
) -> bool {
    let in_detached_slot = state.detached_binding.as_ref() == Some(binding);
    let in_retired_slot = state.retired_binding.as_ref() == Some(binding);
    in_detached_slot || in_retired_slot
}
/// Errors when `binding` sits in a reserved (detached/retired) slot and
/// therefore must not be selected as the publication binding.
fn ensure_binding_is_selectable_for_publication(
    state: &PublicationStoreStateRecord,
    binding: &WasmStoreBinding,
) -> Result<(), InternalError> {
    if !Self::binding_is_reserved_for_publication(state, binding) {
        return Ok(());
    }
    Err(InternalError::workflow(
        InternalErrorOrigin::Workflow,
        format!("ws binding '{binding}' is detached/retired"),
    ))
}
/// Logs a publication-state transition (generation plus the old/new
/// active/detached/retired slots), but only when the record actually
/// changed between `previous` and `current`.
fn log_publication_state_transition(
    transition_kind: &str,
    previous: &PublicationStoreStateRecord,
    current: &PublicationStoreStateRecord,
    changed_at: u64,
) {
    if previous != current {
        log!(
            Topic::Wasm,
            Info,
            "ws.transition kind={} gen={} at={} old_a={} old_d={} old_r={} new_a={} new_d={} new_r={}",
            transition_kind,
            current.generation,
            changed_at,
            Self::binding_slot(previous.active_binding.as_ref()),
            Self::binding_slot(previous.detached_binding.as_ref()),
            Self::binding_slot(previous.retired_binding.as_ref()),
            Self::binding_slot(current.active_binding.as_ref()),
            Self::binding_slot(current.detached_binding.as_ref()),
            Self::binding_slot(current.retired_binding.as_ref()),
        );
    }
}
/// Errors when both the detached and retired publication slots are already
/// occupied; checked before operations that promote/activate a binding.
fn ensure_retired_binding_slot_available_for_promotion() -> Result<(), InternalError> {
    let state = SubnetStateOps::publication_store_state();
    match (&state.detached_binding, &state.retired_binding) {
        (Some(_), Some(_)) => Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            "ws rollover blocked: retired slot occupied".to_string(),
        )),
        _ => Ok(()),
    }
}
/// Errors when the retired publication slot is already occupied; checked
/// before retiring the detached binding.
fn ensure_retired_binding_slot_available_for_retirement() -> Result<(), InternalError> {
    let state = SubnetStateOps::publication_store_state();
    if state.retired_binding.is_none() {
        return Ok(());
    }
    Err(InternalError::workflow(
        InternalErrorOrigin::Workflow,
        "ws retirement blocked: retired slot occupied".to_string(),
    ))
}
/// GC step 1: tells the retired store to prepare for garbage collection and
/// records the `Prepared` GC mode in subnet state.
///
/// Returns `Ok(None)` when no binding occupies the retired slot.
pub async fn prepare_retired_publication_store_for_gc()
-> Result<Option<WasmStoreBinding>, InternalError> {
    let state = SubnetStateOps::publication_store_state();
    let retired_binding = match state.retired_binding.clone() {
        Some(binding) => binding,
        None => return Ok(None),
    };
    let store_pid = store_pid_for_binding(&retired_binding)?;
    store_prepare_gc(store_pid).await?;
    // Best-effort state transition; the remote call already succeeded.
    let _ = SubnetStateOps::transition_wasm_store_gc(
        &retired_binding,
        WasmStoreGcMode::Prepared,
        IcOps::now_secs(),
    );
    log!(
        Topic::Wasm,
        Ok,
        "ws gc prepared {} gen={} retired_at={}",
        retired_binding,
        state.generation,
        state.retired_at
    );
    Ok(Some(retired_binding))
}
/// GC step 2: tells the retired store to begin garbage collection and
/// records the `InProgress` GC mode in subnet state.
///
/// Returns `Ok(None)` when no binding occupies the retired slot.
pub async fn begin_retired_publication_store_gc()
-> Result<Option<WasmStoreBinding>, InternalError> {
    let state = SubnetStateOps::publication_store_state();
    let retired_binding = match state.retired_binding.clone() {
        Some(binding) => binding,
        None => return Ok(None),
    };
    let store_pid = store_pid_for_binding(&retired_binding)?;
    store_begin_gc(store_pid).await?;
    // Best-effort state transition; the remote call already succeeded.
    let _ = SubnetStateOps::transition_wasm_store_gc(
        &retired_binding,
        WasmStoreGcMode::InProgress,
        IcOps::now_secs(),
    );
    log!(
        Topic::Wasm,
        Ok,
        "ws gc begin {} gen={} retired_at={}",
        retired_binding,
        state.generation,
        state.retired_at
    );
    Ok(Some(retired_binding))
}
/// GC step 3: tells the retired store to complete garbage collection and
/// records the `Complete` GC mode in subnet state.
///
/// Returns `Ok(None)` when no binding occupies the retired slot.
pub async fn complete_retired_publication_store_gc()
-> Result<Option<WasmStoreBinding>, InternalError> {
    let state = SubnetStateOps::publication_store_state();
    let retired_binding = match state.retired_binding.clone() {
        Some(binding) => binding,
        None => return Ok(None),
    };
    let store_pid = store_pid_for_binding(&retired_binding)?;
    store_complete_gc(store_pid).await?;
    // Best-effort state transition; the remote call already succeeded.
    let _ = SubnetStateOps::transition_wasm_store_gc(
        &retired_binding,
        WasmStoreGcMode::Complete,
        IcOps::now_secs(),
    );
    log!(
        Topic::Wasm,
        Ok,
        "ws gc complete {} gen={} retired_at={}",
        retired_binding,
        state.generation,
        state.retired_at
    );
    Ok(Some(retired_binding))
}
/// Finalizes the retired publication binding once its store reports GC
/// mode `Complete`, releasing the retired slot in subnet state.
///
/// Returns `Ok(None)` when no binding occupies the retired slot, and the
/// finalized `(binding, pid)` pair otherwise.
pub async fn finalize_retired_publication_store_binding()
-> Result<Option<(WasmStoreBinding, Principal)>, InternalError> {
    let state = SubnetStateOps::publication_store_state();
    let Some(retired_binding) = state.retired_binding.clone() else {
        return Ok(None);
    };
    let store_pid = store_pid_for_binding(&retired_binding)?;
    // Refuse to finalize until the store itself confirms GC completion.
    let store = store_status(store_pid).await?;
    if store.gc.mode != WasmStoreGcMode::Complete {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "retired ws '{}' not ready for finalize; gc={:?}",
                retired_binding, store.gc.mode
            ),
        ));
    }
    let changed_at = IcOps::now_secs();
    // Re-read the state after the await so the transition log reflects the
    // record as it was immediately before finalization.
    let previous = SubnetStateOps::publication_store_state();
    let finalized = SubnetStateOps::finalize_retired_publication_store_binding(changed_at)
        .map(|binding| (binding, store_pid));
    if let Some((binding, finalized_store_pid)) = finalized.as_ref() {
        let current = SubnetStateOps::publication_store_state();
        Self::log_publication_state_transition(
            "finalize_retired_binding",
            &previous,
            &current,
            changed_at,
        );
        log!(
            Topic::Wasm,
            Ok,
            "ws finalized {} ({})",
            binding,
            finalized_store_pid
        );
    }
    Ok(finalized)
}
/// Deletes a finalized store canister after three safety checks: the
/// binding is no longer referenced by any publication slot, the store
/// reports GC mode `Complete`, and the store is empty (no bytes, templates,
/// or releases). Then uninstalls/deletes the canister and drops the store
/// from subnet state.
pub async fn delete_finalized_publication_store(
    binding: WasmStoreBinding,
    store_pid: Principal,
) -> Result<(), InternalError> {
    // Guard 1: never delete a store that any publication slot still names.
    let state = SubnetStateOps::publication_store_state();
    if state.active_binding.as_ref() == Some(&binding)
        || state.detached_binding.as_ref() == Some(&binding)
        || state.retired_binding.as_ref() == Some(&binding)
    {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!("ws '{binding}' is still referenced"),
        ));
    }
    // Guard 2: the store itself must confirm GC has completed.
    let store = store_status(store_pid).await?;
    if store.gc.mode != WasmStoreGcMode::Complete {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "finalized ws '{}' not ready for delete; gc={:?}",
                binding, store.gc.mode
            ),
        ));
    }
    // Guard 3: deletion is only safe once GC left the store truly empty.
    if store.occupied_store_bytes != 0 || store.template_count != 0 || store.release_count != 0
    {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "finalized ws '{}' not empty after gc; bytes={} templates={} releases={}",
                binding, store.occupied_store_bytes, store.template_count, store.release_count
            ),
        ));
    }
    ProvisionWorkflow::uninstall_and_delete_canister(store_pid).await?;
    // Best-effort removal from subnet state; the canister is already gone.
    let _ = SubnetStateOps::remove_wasm_store(&binding);
    log!(Topic::Wasm, Ok, "ws deleted {} ({})", binding, store_pid);
    Ok(())
}
/// Moves the detached publication binding into the retired slot, logging
/// the state transition. Returns the retired binding, or `None` when the
/// retired slot is occupied (logged as a warning) or nothing was detached.
pub fn retire_detached_publication_store_binding() -> Option<WasmStoreBinding> {
    if let Err(err) = Self::ensure_retired_binding_slot_available_for_retirement() {
        log!(Topic::Wasm, Warn, "{err}");
        return None;
    }
    let changed_at = IcOps::now_secs();
    let previous = SubnetStateOps::publication_store_state();
    let retired = SubnetStateOps::retire_detached_publication_store_binding(changed_at);
    if let Some(binding) = retired.as_ref() {
        let current = SubnetStateOps::publication_store_state();
        Self::log_publication_state_transition(
            "retire_detached_binding",
            &previous,
            &current,
            changed_at,
        );
        log!(Topic::Wasm, Ok, "ws retired {}", binding);
    }
    retired
}
/// Pins `binding` as the active publication binding.
///
/// Fails when the binding does not resolve to a registered store, when the
/// rollover slots are full, or when the binding is currently
/// detached/retired. Logs the state transition when activation changed it.
pub fn set_current_publication_store_binding(
    binding: WasmStoreBinding,
) -> Result<(), InternalError> {
    // Validate the binding resolves to a registered store pid.
    let _ = store_pid_for_binding(&binding)?;
    Self::ensure_retired_binding_slot_available_for_promotion()?;
    let previous = SubnetStateOps::publication_store_state();
    Self::ensure_binding_is_selectable_for_publication(&previous, &binding)?;
    let changed_at = IcOps::now_secs();
    // `activate_...` reports whether the record actually changed.
    if SubnetStateOps::activate_publication_store_binding(binding, changed_at) {
        let current = SubnetStateOps::publication_store_state();
        Self::log_publication_state_transition(
            "pin_publication_binding",
            &previous,
            &current,
            changed_at,
        );
    }
    Ok(())
}
/// Clears the active publication binding, logging the state transition.
/// A blocked rollover (both reserved slots occupied) is logged as a
/// warning and the clear is skipped.
pub fn clear_current_publication_store_binding() {
    if let Err(err) = Self::ensure_retired_binding_slot_available_for_promotion() {
        log!(Topic::Wasm, Warn, "{err}");
        return;
    }
    let changed_at = IcOps::now_secs();
    let previous = SubnetStateOps::publication_store_state();
    // `clear_...` reports whether the record actually changed.
    if SubnetStateOps::clear_publication_store_binding(changed_at) {
        let current = SubnetStateOps::publication_store_state();
        Self::log_publication_state_transition(
            "clear_publication_binding",
            &previous,
            &current,
            changed_at,
        );
    }
}
/// Binding of the earliest-created wasm store in subnet state, if any.
fn oldest_registered_store_binding() -> Option<WasmStoreBinding> {
    let stores = SubnetStateOps::wasm_stores();
    stores
        .into_iter()
        .min_by_key(|record| record.created_at)
        .map(|record| record.binding)
}
/// Clears a pinned publication binding whose pid no longer resolves, logs
/// the transition, and falls back to the oldest registered store binding.
/// Errors when the rollover slots are full or no store remains to fall
/// back to.
fn clear_stale_publication_binding(
    binding: WasmStoreBinding,
) -> Result<WasmStoreBinding, InternalError> {
    log!(Topic::Wasm, Warn, "ws clear stale binding {}", binding);
    let changed_at = IcOps::now_secs();
    Self::ensure_retired_binding_slot_available_for_promotion()?;
    let previous = SubnetStateOps::publication_store_state();
    let _ = SubnetStateOps::clear_publication_store_binding(changed_at);
    let current = SubnetStateOps::publication_store_state();
    Self::log_publication_state_transition(
        "clear_stale_publication_binding",
        &previous,
        &current,
        changed_at,
    );
    Self::oldest_registered_store_binding().ok_or_else(|| {
        InternalError::workflow(
            InternalErrorOrigin::Workflow,
            "no registered wasm stores after clearing stale publication binding",
        )
    })
}
/// Creates the first publication store and immediately activates it as the
/// publication binding, logging the transition.
///
/// NOTE(review): the promotion-slot check runs *after* the store is
/// created, so a blocked rollover leaves the freshly created store behind
/// without activation — confirm this ordering is intended.
async fn create_and_activate_first_publication_store() -> Result<WasmStoreBinding, InternalError>
{
    let binding = Self::create_publication_store().await?;
    Self::ensure_retired_binding_slot_available_for_promotion()?;
    let changed_at = IcOps::now_secs();
    let previous = SubnetStateOps::publication_store_state();
    let _ = SubnetStateOps::activate_publication_store_binding(binding.clone(), changed_at);
    let current = SubnetStateOps::publication_store_state();
    Self::log_publication_state_transition(
        "activate_first_publication_binding",
        &previous,
        &current,
        changed_at,
    );
    Ok(binding)
}
/// Approved manifests this workflow manages: chunked templates for every
/// role except the wasm store itself.
fn managed_release_manifests() -> Vec<TemplateManifestResponse> {
    let mut managed = Vec::new();
    for manifest in TemplateManifestOps::approved_manifests_response() {
        // Stores never host their own template, and only chunked payloads
        // go through the publication flow.
        if manifest.role == WASM_STORE_ROLE {
            continue;
        }
        if manifest.chunking_mode != TemplateChunkingMode::Chunked {
            continue;
        }
        managed.push(manifest);
    }
    managed
}
/// Stores that already hold `manifest` exactly, ordered oldest-first with
/// binding order as a deterministic tie-break.
fn exact_release_candidates<'a>(
    fleet: &'a PublicationStoreFleet,
    manifest: &TemplateManifestResponse,
) -> Vec<&'a PublicationStoreSnapshot> {
    let mut candidates: Vec<&'a PublicationStoreSnapshot> = Vec::new();
    for store in &fleet.stores {
        if store.has_exact_release(manifest) {
            candidates.push(store);
        }
    }
    candidates.sort_by(|a, b| {
        a.created_at
            .cmp(&b.created_at)
            .then_with(|| a.binding.cmp(&b.binding))
    });
    candidates
}
/// Chooses which store binding the root-side manifest should point at,
/// given the stores that actually hold the release.
///
/// Preference order: the manifest's own binding, then the fleet's
/// preferred binding, then the oldest candidate. Errors when no store
/// holds the exact release at all.
fn reconciled_binding_for_manifest(
    fleet: &PublicationStoreFleet,
    manifest: &TemplateManifestResponse,
) -> Result<WasmStoreBinding, InternalError> {
    let candidates = Self::exact_release_candidates(fleet, manifest);
    if candidates.is_empty() {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "fleet import missing exact release for role '{}': expected {}@{} on {}",
                manifest.role, manifest.template_id, manifest.version, manifest.store_binding
            ),
        ));
    }
    // 1) Keep the manifest's own binding when that store holds the release.
    if candidates
        .iter()
        .any(|store| store.binding == manifest.store_binding)
    {
        return Ok(manifest.store_binding.clone());
    }
    // 2) Otherwise prefer the fleet's preferred binding when it qualifies.
    if let Some(binding) = fleet.preferred_binding.as_ref()
        && candidates.iter().any(|store| &store.binding == binding)
    {
        return Ok(binding.clone());
    }
    // 3) Fall back to the first (oldest) candidate.
    Ok(candidates[0].binding.clone())
}
/// Human-readable `template_id@version` label for log and error messages.
fn release_label(manifest: &TemplateManifestResponse) -> String {
    format!("{}@{}", manifest.template_id, manifest.version)
}
/// Principal of the store currently holding `manifest`'s payload, or
/// `None` when the manifest carries the bootstrap binding (payload is
/// available locally).
fn source_store_pid_for_manifest(
    manifest: &TemplateManifestResponse,
) -> Result<Option<Principal>, InternalError> {
    if manifest.store_binding == WASM_STORE_BOOTSTRAP_BINDING {
        return Ok(None);
    }
    store_pid_for_binding(&manifest.store_binding).map(Some)
}
/// Chunk-set metadata for `manifest`, fetched from its source store or —
/// for bootstrap-bound payloads — computed locally.
async fn source_chunk_set_info_for_manifest(
    manifest: &TemplateManifestResponse,
) -> Result<TemplateChunkSetInfoResponse, InternalError> {
    if let Some(store_pid) = Self::source_store_pid_for_manifest(manifest)? {
        return store_chunk_set_info(store_pid, &manifest.template_id, &manifest.version)
            .await;
    }
    TemplateChunkedOps::chunk_set_info_response(&manifest.template_id, &manifest.version)
}
/// All payload chunks for `manifest`, downloaded from its source store or —
/// for bootstrap-bound payloads — read from local storage.
async fn source_chunks_for_manifest(
    manifest: &TemplateManifestResponse,
    chunk_count: usize,
) -> Result<Vec<Vec<u8>>, InternalError> {
    if let Some(store_pid) = Self::source_store_pid_for_manifest(manifest)? {
        return store_chunks(
            store_pid,
            &manifest.template_id,
            &manifest.version,
            chunk_count,
        )
        .await;
    }
    local_chunks(&manifest.template_id, &manifest.version, chunk_count)
}
/// Heuristic: does `err` carry the store's capacity-exceeded marker,
/// either in its public message or in its display form?
fn is_store_capacity_exceeded(err: &InternalError) -> bool {
    let needle = Self::WASM_STORE_CAPACITY_EXCEEDED_MESSAGE;
    let in_public_message = err
        .public_error()
        .is_some_and(|public| public.message.contains(needle));
    in_public_message || err.to_string().contains(needle)
}
/// Rewrites the root-side approved manifest so it points at
/// `target_store_binding`, stamping a fresh approval time while keeping the
/// original creation time and payload identity.
fn mirror_manifest_to_root_state(
    target_store_binding: WasmStoreBinding,
    manifest: &TemplateManifestResponse,
) {
    TemplateManifestOps::replace_approved_from_input(TemplateManifestInput {
        template_id: manifest.template_id.clone(),
        role: manifest.role.clone(),
        version: manifest.version.clone(),
        payload_hash: manifest.payload_hash.clone(),
        payload_size_bytes: manifest.payload_size_bytes,
        store_binding: target_store_binding,
        chunking_mode: TemplateChunkingMode::Chunked,
        manifest_state: TemplateManifestState::Approved,
        approved_at: Some(IcOps::now_secs()),
        created_at: manifest.created_at,
    });
}
/// Resolves where `manifest` should be published: an existing store when
/// one can reuse or accept it, otherwise a newly created store.
///
/// Errors when the payload cannot fit even an empty store (checked against
/// the default store config before creating anything), or when the freshly
/// created store still cannot accept the release.
async fn resolve_managed_publication_placement(
    fleet: &mut PublicationStoreFleet,
    manifest: &TemplateManifestResponse,
) -> Result<PublicationPlacement, InternalError> {
    if let Some(placement) = fleet.select_existing_store_for_release(manifest)? {
        return Ok(placement);
    }
    // Pre-flight: don't create a store that could never hold the payload.
    let store_config = config::current_subnet_default_wasm_store();
    if manifest.payload_size_bytes > store_config.max_store_bytes() {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "release {} exceeds empty wasm store capacity: bytes {} > {}",
                Self::release_label(manifest),
                manifest.payload_size_bytes,
                store_config.max_store_bytes()
            ),
        ));
    }
    let created = Self::create_store_for_fleet(fleet).await?;
    // `create_store_for_fleet` pushed the snapshot; look it back up.
    let created_store = fleet
        .stores
        .iter()
        .find(|store| store.binding == created.binding)
        .ok_or_else(|| {
            InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!("new ws '{}' missing from fleet snapshot", created.binding),
            )
        })?;
    // Defensive re-check against the new store's (config-derived) limits.
    if !created_store.can_accept_release(manifest) {
        return Err(InternalError::workflow(
            InternalErrorOrigin::Workflow,
            format!(
                "release {} does not fit empty store {}",
                Self::release_label(manifest),
                created.binding
            ),
        ));
    }
    Ok(created)
}
/// Publishes one manifest's payload to a target store end-to-end:
/// 1. read chunk-set info and all chunks from the source (store or local);
/// 2. prepare a chunk set on the target via `CANIC_WASM_STORE_PREPARE`;
/// 3. push each chunk to the target store and, when its hash is not
///    already in the management canister's chunk store, upload it there
///    too, verifying the returned hash;
/// 4. promote the manifest on the target store.
async fn publish_manifest_to_store(
    target_store_pid: Principal,
    target_store_binding: WasmStoreBinding,
    manifest: TemplateManifestResponse,
) -> Result<(), InternalError> {
    let info = Self::source_chunk_set_info_for_manifest(&manifest).await?;
    let chunks = Self::source_chunks_for_manifest(&manifest, info.chunk_hashes.len()).await?;
    let chunk_hashes = info.chunk_hashes.clone();
    // Hashes already present in the target's mgmt chunk store; uploads for
    // these are skipped below.
    let existing_hashes = MgmtOps::stored_chunks(target_store_pid)
        .await?
        .into_iter()
        .collect::<BTreeSet<_>>();
    let _: TemplateChunkSetInfoResponse = call_store_result(
        target_store_pid,
        protocol::CANIC_WASM_STORE_PREPARE,
        (TemplateChunkSetPrepareInput {
            template_id: manifest.template_id.clone(),
            version: manifest.version.clone(),
            payload_hash: manifest.payload_hash.clone(),
            payload_size_bytes: manifest.payload_size_bytes,
            chunk_hashes: chunk_hashes.clone(),
        },),
    )
    .await?;
    canic_core::perf!("publish_prepare_store");
    for (chunk_index, bytes) in chunks.into_iter().enumerate() {
        // Chunk indices travel as u32 on the wire.
        let chunk_index = u32::try_from(chunk_index).map_err(|_| {
            InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!(
                    "template '{}' exceeds chunk index bounds",
                    manifest.template_id
                ),
            )
        })?;
        let expected_hash = chunk_hashes[chunk_index as usize].clone();
        call_store_result::<(), _>(
            target_store_pid,
            protocol::CANIC_WASM_STORE_PUBLISH_CHUNK,
            (TemplateChunkInput {
                template_id: manifest.template_id.clone(),
                version: manifest.version.clone(),
                chunk_index,
                bytes: bytes.clone(),
            },),
        )
        .await?;
        canic_core::perf!("publish_push_store_chunk");
        // Upload to the mgmt chunk store only when this hash is new there,
        // and verify the hash the upload reports back.
        if !existing_hashes.contains(&expected_hash) {
            let uploaded_hash = MgmtOps::upload_chunk(target_store_pid, bytes).await?;
            if uploaded_hash != expected_hash {
                return Err(InternalError::workflow(
                    InternalErrorOrigin::Workflow,
                    format!(
                        "template '{}' chunk {} hash mismatch for {}",
                        manifest.template_id, chunk_index, target_store_pid
                    ),
                ));
            }
        }
    }
    Self::promote_manifest_to_target_store(
        target_store_pid,
        target_store_binding.clone(),
        TemplateManifestInput {
            template_id: manifest.template_id.clone(),
            role: manifest.role.clone(),
            version: manifest.version.clone(),
            payload_hash: manifest.payload_hash.clone(),
            payload_size_bytes: manifest.payload_size_bytes,
            store_binding: manifest.store_binding,
            chunking_mode: TemplateChunkingMode::Chunked,
            manifest_state: TemplateManifestState::Approved,
            approved_at: Some(IcOps::now_secs()),
            created_at: manifest.created_at,
        },
    )
    .await?;
    canic_core::perf!("publish_promote_manifest");
    log!(
        Topic::Wasm,
        Ok,
        "tpl.publish {} -> {}@{} (store={}, chunks={})",
        manifest.role,
        manifest.template_id,
        manifest.version,
        target_store_pid,
        chunk_hashes.len()
    );
    Ok(())
}
/// Places one approved release onto the managed publication fleet.
///
/// Placement resolution (delegated to `resolve_managed_publication_placement`)
/// yields one of three actions:
/// - `Reuse`: the exact release already exists on a store; only the approval
///   is mirrored into root state.
/// - `Publish`: push the release to an existing store with room.
/// - `Create`: push the release to a store that was just created for it.
///
/// A capacity-exceeded error during `Publish` triggers a single rollover onto
/// a freshly created store; during `Create` it is fatal, since the new store
/// was already sized for this release.
async fn publish_manifest_to_managed_fleet(
    fleet: &mut PublicationStoreFleet,
    manifest: TemplateManifestResponse,
) -> Result<(), InternalError> {
    let release_label = Self::release_label(&manifest);
    let placement = Self::resolve_managed_publication_placement(fleet, &manifest).await?;
    match placement.action {
        PublicationPlacementAction::Reuse => {
            // Nothing to upload: record the approval locally and log the reuse.
            Self::mirror_manifest_to_root_state(placement.binding.clone(), &manifest);
            log!(
                Topic::Wasm,
                Info,
                "ws reuse {} on {} ({})",
                release_label,
                placement.binding,
                placement.pid
            );
        }
        PublicationPlacementAction::Publish | PublicationPlacementAction::Create => {
            let action_label = if placement.action == PublicationPlacementAction::Create {
                "create"
            } else {
                "publish"
            };
            match Self::publish_manifest_to_store(
                placement.pid,
                placement.binding.clone(),
                manifest.clone(),
            )
            .await
            {
                Ok(()) => {
                    log!(
                        Topic::Wasm,
                        Info,
                        "ws place {} mode={} binding={} pid={}",
                        release_label,
                        action_label,
                        placement.binding,
                        placement.pid
                    );
                }
                Err(err) if Self::is_store_capacity_exceeded(&err) => {
                    // A store created specifically for this release was already
                    // size-checked; a capacity error there is unrecoverable.
                    if placement.action == PublicationPlacementAction::Create {
                        return Err(err);
                    }
                    // One-shot rollover: create a fresh store and retry there.
                    let retry = Self::create_store_for_fleet(fleet).await?;
                    Self::publish_manifest_to_store(
                        retry.pid,
                        retry.binding.clone(),
                        manifest.clone(),
                    )
                    .await?;
                    log!(
                        Topic::Wasm,
                        Warn,
                        "ws rollover {} from {} to {}",
                        release_label,
                        placement.binding,
                        retry.binding
                    );
                    // Record against the retry store, not the failed placement,
                    // and return early so the tail recording below is skipped.
                    fleet.record_placement(&retry.binding, &manifest);
                    return Ok(());
                }
                Err(err) => return Err(err),
            }
        }
    }
    // Reuse and successful first-attempt publishes record the original binding.
    fleet.record_placement(&placement.binding, &manifest);
    Ok(())
}
/// Validates every staged release still bound to the bootstrap store, then
/// publishes each one onto the managed publication fleet.
///
/// The whole set is validated up front so that a bad release fails the call
/// before anything has been published.
pub async fn publish_staged_release_set_to_current_store() -> Result<(), InternalError> {
    let staged: Vec<_> = Self::managed_release_manifests()
        .into_iter()
        .filter(|m| m.store_binding == WASM_STORE_BOOTSTRAP_BINDING)
        .collect();
    // Validate all staged releases before touching the fleet.
    staged
        .iter()
        .try_for_each(TemplateChunkedOps::validate_staged_release)?;
    let mut fleet = Self::snapshot_publication_store_fleet().await?;
    for staged_manifest in staged {
        Self::publish_manifest_to_managed_fleet(&mut fleet, staged_manifest).await?;
    }
    Ok(())
}
/// Publishes every managed release to one explicit target store.
///
/// Releases already present byte-for-byte are only mirrored locally; each
/// remaining release must fit the target's remaining capacity or the call
/// fails without publishing it.
pub async fn publish_current_release_set_to_store(
    target_store_pid: Principal,
) -> Result<(), InternalError> {
    let binding = store_binding_for_pid(target_store_pid)?;
    let status = store_status(target_store_pid).await?;
    let releases = store_catalog(target_store_pid).await?;
    let mut snapshot = PublicationStoreSnapshot {
        binding: binding.clone(),
        pid: target_store_pid,
        created_at: IcOps::now_secs(),
        status,
        releases,
    };
    for manifest in Self::managed_release_manifests() {
        if snapshot.has_exact_release(&manifest) {
            // Exact release already on the target: mirror the approval only.
            Self::mirror_manifest_to_root_state(binding.clone(), &manifest);
        } else if snapshot.can_accept_release(&manifest) {
            Self::publish_manifest_to_store(target_store_pid, binding.clone(), manifest.clone())
                .await?;
            // Track the new release so later capacity checks see it.
            snapshot.record_release(&manifest);
        } else {
            return Err(InternalError::workflow(
                InternalErrorOrigin::Workflow,
                format!(
                    "target ws '{}' cannot fit {}",
                    binding,
                    Self::release_label(&manifest)
                ),
            ));
        }
    }
    Ok(())
}
/// Re-imports the approval for every managed release, rebinding each manifest
/// to the store that actually holds it per the current fleet snapshot.
pub async fn import_current_store_catalog() -> Result<(), InternalError> {
    let fleet = Self::snapshot_publication_store_fleet().await?;
    for manifest in Self::managed_release_manifests() {
        // Resolve which store in the fleet really carries this release.
        let store_binding = Self::reconciled_binding_for_manifest(&fleet, &manifest)?;
        let input = TemplateManifestInput {
            store_binding,
            template_id: manifest.template_id,
            role: manifest.role,
            version: manifest.version,
            payload_hash: manifest.payload_hash,
            payload_size_bytes: manifest.payload_size_bytes,
            chunking_mode: manifest.chunking_mode,
            manifest_state: manifest.manifest_state,
            approved_at: manifest.approved_at,
            created_at: manifest.created_at,
        };
        TemplateManifestOps::replace_approved_from_input(input);
    }
    Ok(())
}
/// Publishes every managed release onto the managed publication fleet,
/// letting fleet placement reuse, publish, or create stores as needed.
pub async fn publish_current_release_set_to_current_store() -> Result<(), InternalError> {
    let mut managed_fleet = Self::snapshot_publication_store_fleet().await?;
    for release in Self::managed_release_manifests() {
        Self::publish_manifest_to_managed_fleet(&mut managed_fleet, release).await?;
    }
    Ok(())
}
/// Stages `manifest` on the target store — rebound to `target_store_binding`
/// — and then mirrors the same approved input into the local manifest ops.
///
/// The rebound input is built once and cloned only for the remote call,
/// instead of cloning the whole manifest and the binding separately.
async fn promote_manifest_to_target_store(
    target_store_pid: Principal,
    target_store_binding: WasmStoreBinding,
    manifest: TemplateManifestInput,
) -> Result<(), InternalError> {
    // Rebind the manifest to the store it is being promoted onto.
    let promoted = TemplateManifestInput {
        store_binding: target_store_binding,
        ..manifest
    };
    store_stage_manifest(target_store_pid, promoted.clone()).await?;
    TemplateManifestOps::replace_approved_from_input(promoted);
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::{
        PublicationPlacementAction, PublicationStoreFleet, PublicationStoreSnapshot,
        WasmStorePublicationWorkflow,
    };
    use crate::{
        dto::template::{
            TemplateManifestResponse, WasmStoreCatalogEntryResponse, WasmStoreGcStatusResponse,
            WasmStoreStatusResponse, WasmStoreTemplateStatusResponse,
        },
        ids::WasmStoreBinding,
        ids::{
            CanisterRole, TemplateChunkingMode, TemplateId, TemplateManifestState, TemplateVersion,
        },
        ops::storage::state::subnet::SubnetStateOps,
        storage::stable::state::subnet::{PublicationStoreStateRecord, SubnetStateRecord},
    };
    use candid::Principal;
    /// Builds an approved, chunked manifest fixture bound to the bootstrap
    /// store; `payload_hash` is a single marker byte repeated 32 times for
    /// easy equality checks.
    fn manifest(
        role: &'static str,
        template_id: &'static str,
        version: &'static str,
        payload_hash: u8,
        payload_size_bytes: u64,
    ) -> TemplateManifestResponse {
        TemplateManifestResponse {
            template_id: TemplateId::new(template_id),
            role: CanisterRole::new(role),
            version: TemplateVersion::new(version),
            payload_hash: vec![payload_hash; 32],
            payload_size_bytes,
            store_binding: WasmStoreBinding::new("bootstrap"),
            chunking_mode: TemplateChunkingMode::Chunked,
            manifest_state: TemplateManifestState::Approved,
            approved_at: Some(10),
            created_at: 9,
        }
    }
    /// Builds a store snapshot fixture with a fixed 40 MB capacity; occupancy
    /// is derived from `remaining_store_bytes`, and the principal is derived
    /// from `pid_byte` repeated across all 29 bytes.
    fn store(
        binding: &'static str,
        pid_byte: u8,
        created_at: u64,
        remaining_store_bytes: u64,
        releases: Vec<WasmStoreCatalogEntryResponse>,
        templates: Vec<WasmStoreTemplateStatusResponse>,
    ) -> PublicationStoreSnapshot {
        PublicationStoreSnapshot {
            binding: WasmStoreBinding::new(binding),
            pid: Principal::from_slice(&[pid_byte; 29]),
            created_at,
            status: WasmStoreStatusResponse {
                gc: WasmStoreGcStatusResponse {
                    mode: crate::ids::WasmStoreGcMode::Normal,
                    changed_at: 0,
                    prepared_at: None,
                    started_at: None,
                    completed_at: None,
                    runs_completed: 0,
                },
                // Occupied + remaining always sum to the 40 MB maximum.
                occupied_store_bytes: 40_000_000_u64.saturating_sub(remaining_store_bytes),
                occupied_store_size: String::new(),
                max_store_bytes: 40_000_000,
                max_store_size: String::new(),
                remaining_store_bytes,
                remaining_store_size: String::new(),
                headroom_bytes: Some(4_000_000),
                headroom_size: None,
                within_headroom: remaining_store_bytes <= 4_000_000,
                template_count: u32::try_from(templates.len()).unwrap_or(u32::MAX),
                max_templates: None,
                release_count: u32::try_from(releases.len()).unwrap_or(u32::MAX),
                max_template_versions_per_template: None,
                templates,
            },
            releases,
        }
    }
    /// While an older retired binding is still pending cleanup, promotion
    /// must fail closed rather than overwrite it.
    #[test]
    fn promotion_is_blocked_when_it_would_overwrite_retired_binding() {
        SubnetStateOps::import(SubnetStateRecord {
            publication_store: PublicationStoreStateRecord {
                active_binding: Some(WasmStoreBinding::new("active")),
                detached_binding: Some(WasmStoreBinding::new("detached")),
                retired_binding: Some(WasmStoreBinding::new("retired")),
                generation: 3,
                changed_at: 30,
                retired_at: 20,
            },
            wasm_stores: Vec::new(),
        });
        let err =
            WasmStorePublicationWorkflow::ensure_retired_binding_slot_available_for_promotion()
                .expect_err("promotion must fail closed while retired binding is still pending");
        assert!(err.to_string().contains("rollover blocked"));
    }
    /// Same fail-closed rule for explicit retirement: the retired slot must
    /// be free before another binding can be retired into it.
    #[test]
    fn explicit_retirement_is_blocked_when_retired_binding_already_exists() {
        SubnetStateOps::import(SubnetStateRecord {
            publication_store: PublicationStoreStateRecord {
                active_binding: Some(WasmStoreBinding::new("active")),
                detached_binding: Some(WasmStoreBinding::new("detached")),
                retired_binding: Some(WasmStoreBinding::new("retired")),
                generation: 3,
                changed_at: 30,
                retired_at: 20,
            },
            wasm_stores: Vec::new(),
        });
        let err =
            WasmStorePublicationWorkflow::ensure_retired_binding_slot_available_for_retirement()
                .expect_err("retirement must fail closed while an older retired binding exists");
        assert!(err.to_string().contains("retirement blocked"));
    }
    /// The active binding is a valid publication target, while detached and
    /// retired bindings are reserved (excluded from publication).
    #[test]
    fn detached_and_retired_bindings_are_not_publication_candidates() {
        let state = PublicationStoreStateRecord {
            active_binding: Some(WasmStoreBinding::new("active")),
            detached_binding: Some(WasmStoreBinding::new("detached")),
            retired_binding: Some(WasmStoreBinding::new("retired")),
            generation: 3,
            changed_at: 30,
            retired_at: 20,
        };
        // Active: not reserved, so publication may use it.
        assert!(
            !WasmStorePublicationWorkflow::binding_is_reserved_for_publication(
                &state,
                &WasmStoreBinding::new("active"),
            )
        );
        // Detached and retired: reserved, excluded from placement.
        assert!(
            WasmStorePublicationWorkflow::binding_is_reserved_for_publication(
                &state,
                &WasmStoreBinding::new("detached"),
            )
        );
        assert!(
            WasmStorePublicationWorkflow::binding_is_reserved_for_publication(
                &state,
                &WasmStoreBinding::new("retired"),
            )
        );
    }
    /// An identical release already on a store yields a `Reuse` placement
    /// instead of publishing again or creating a new store.
    #[test]
    fn exact_release_is_reused_before_new_store_is_created() {
        let manifest = manifest("app", "embedded:app", "0.20.9", 7, 512);
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("primary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![store(
                "primary",
                1,
                10,
                20_000_000,
                // Catalog entry matches the manifest byte-for-byte (same hash).
                vec![WasmStoreCatalogEntryResponse {
                    role: manifest.role.clone(),
                    template_id: manifest.template_id.clone(),
                    version: manifest.version.clone(),
                    payload_hash: manifest.payload_hash.clone(),
                    payload_size_bytes: manifest.payload_size_bytes,
                }],
                vec![WasmStoreTemplateStatusResponse {
                    template_id: manifest.template_id.clone(),
                    versions: 1,
                }],
            )],
        };
        let placement = fleet
            .select_existing_store_for_release(&manifest)
            .expect("selection must succeed")
            .expect("exact release must be reusable");
        assert_eq!(placement.binding, WasmStoreBinding::new("primary"));
        assert_eq!(placement.action, PublicationPlacementAction::Reuse);
    }
    /// The same template/version with a DIFFERENT payload hash on a store is
    /// a conflict, not a reuse candidate.
    #[test]
    fn conflicting_duplicate_release_is_rejected() {
        let manifest = manifest("app", "embedded:app", "0.20.9", 7, 512);
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("primary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![store(
                "primary",
                1,
                10,
                20_000_000,
                // Same release key but mismatched hash (9s instead of 7s).
                vec![WasmStoreCatalogEntryResponse {
                    role: manifest.role.clone(),
                    template_id: manifest.template_id.clone(),
                    version: manifest.version.clone(),
                    payload_hash: vec![9; 32],
                    payload_size_bytes: manifest.payload_size_bytes,
                }],
                vec![WasmStoreTemplateStatusResponse {
                    template_id: manifest.template_id.clone(),
                    versions: 1,
                }],
            )],
        };
        let err = fleet
            .select_existing_store_for_release(&manifest)
            .expect_err("conflicting duplicate release must fail");
        assert!(err.to_string().contains("ws conflict"));
    }
    /// When the preferred store lacks capacity, placement falls back to
    /// another existing store before asking for a brand-new one.
    #[test]
    fn placement_uses_another_store_before_requesting_new_capacity() {
        // 8 MB release: too big for primary (2 MB free), fits secondary (16 MB).
        let manifest = manifest("app", "embedded:app", "0.20.9", 7, 8_000_000);
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("primary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![
                store("primary", 1, 10, 2_000_000, Vec::new(), Vec::new()),
                store("secondary", 2, 20, 16_000_000, Vec::new(), Vec::new()),
            ],
        };
        let placement = fleet
            .select_existing_store_for_release(&manifest)
            .expect("selection must succeed")
            .expect("a second store should be selected");
        assert_eq!(placement.binding, WasmStoreBinding::new("secondary"));
        assert_eq!(placement.action, PublicationPlacementAction::Publish);
    }
    /// An older version of the same role on a different store must not be
    /// mistaken for a conflict when reconciling the binding.
    #[test]
    fn reconcile_binding_ignores_older_role_versions_on_other_stores() {
        let manifest = manifest("app", "embedded:app", "0.20.10", 7, 512);
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("primary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![
                // Primary holds the exact release being reconciled.
                store(
                    "primary",
                    1,
                    10,
                    20_000_000,
                    vec![WasmStoreCatalogEntryResponse {
                        role: manifest.role.clone(),
                        template_id: manifest.template_id.clone(),
                        version: manifest.version.clone(),
                        payload_hash: manifest.payload_hash.clone(),
                        payload_size_bytes: manifest.payload_size_bytes,
                    }],
                    vec![WasmStoreTemplateStatusResponse {
                        template_id: manifest.template_id.clone(),
                        versions: 1,
                    }],
                ),
                // Secondary only holds an older version with a different hash.
                store(
                    "secondary",
                    2,
                    20,
                    20_000_000,
                    vec![WasmStoreCatalogEntryResponse {
                        role: manifest.role.clone(),
                        template_id: manifest.template_id.clone(),
                        version: TemplateVersion::new("0.20.9"),
                        payload_hash: vec![5; 32],
                        payload_size_bytes: manifest.payload_size_bytes,
                    }],
                    vec![WasmStoreTemplateStatusResponse {
                        template_id: manifest.template_id.clone(),
                        versions: 1,
                    }],
                ),
            ],
        };
        let binding =
            WasmStorePublicationWorkflow::reconciled_binding_for_manifest(&fleet, &manifest)
                .expect("older versions on another store must not conflict");
        assert_eq!(binding, WasmStoreBinding::new("primary"));
    }
    /// If the manifest's recorded binding no longer exists in the fleet, an
    /// exact duplicate on the preferred store is the reconciliation target.
    #[test]
    fn reconcile_binding_uses_preferred_exact_duplicate_when_current_binding_is_gone() {
        let mut manifest = manifest("app", "embedded:app", "0.20.10", 7, 512);
        // Point the manifest at a binding that is absent from the fleet.
        manifest.store_binding = WasmStoreBinding::new("missing");
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("secondary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![
                // Both stores carry the exact release; preferred wins.
                store(
                    "primary",
                    1,
                    10,
                    20_000_000,
                    vec![WasmStoreCatalogEntryResponse {
                        role: manifest.role.clone(),
                        template_id: manifest.template_id.clone(),
                        version: manifest.version.clone(),
                        payload_hash: manifest.payload_hash.clone(),
                        payload_size_bytes: manifest.payload_size_bytes,
                    }],
                    vec![WasmStoreTemplateStatusResponse {
                        template_id: manifest.template_id.clone(),
                        versions: 1,
                    }],
                ),
                store(
                    "secondary",
                    2,
                    20,
                    20_000_000,
                    vec![WasmStoreCatalogEntryResponse {
                        role: manifest.role.clone(),
                        template_id: manifest.template_id.clone(),
                        version: manifest.version.clone(),
                        payload_hash: manifest.payload_hash.clone(),
                        payload_size_bytes: manifest.payload_size_bytes,
                    }],
                    vec![WasmStoreTemplateStatusResponse {
                        template_id: manifest.template_id.clone(),
                        versions: 1,
                    }],
                ),
            ],
        };
        let binding =
            WasmStorePublicationWorkflow::reconciled_binding_for_manifest(&fleet, &manifest)
                .expect("an exact duplicate on the preferred store should be reusable");
        assert_eq!(binding, WasmStoreBinding::new("secondary"));
    }
    /// If the exact approved release (same version AND hash) exists on no
    /// store, reconciliation must fail rather than guess a binding.
    #[test]
    fn reconcile_binding_rejects_missing_exact_release() {
        let manifest = manifest("app", "embedded:app", "0.20.10", 7, 512);
        let fleet = PublicationStoreFleet {
            preferred_binding: Some(WasmStoreBinding::new("primary")),
            reserved_state: PublicationStoreStateRecord::default(),
            stores: vec![store(
                "primary",
                1,
                10,
                20_000_000,
                // Only an older version is present; the exact one is gone.
                vec![WasmStoreCatalogEntryResponse {
                    role: manifest.role.clone(),
                    template_id: manifest.template_id.clone(),
                    version: TemplateVersion::new("0.20.9"),
                    payload_hash: manifest.payload_hash.clone(),
                    payload_size_bytes: manifest.payload_size_bytes,
                }],
                vec![WasmStoreTemplateStatusResponse {
                    template_id: manifest.template_id.clone(),
                    versions: 1,
                }],
            )],
        };
        let err = WasmStorePublicationWorkflow::reconciled_binding_for_manifest(&fleet, &manifest)
            .expect_err("reconcile must fail when the exact approved release disappeared");
        assert!(err.to_string().contains("missing exact release"));
    }
}