canic-core 0.56.0

Canic — a canister orchestration and management toolkit for the Internet Computer
Documentation
mod allocation;
mod bootstrap;
pub mod query;
mod registry;

use crate::{
    InternalError, InternalErrorOrigin,
    cdk::types::Principal,
    config::schema::{ShardPool, ShardPoolPolicy},
    domain::policy::placement::sharding::{
        CreateBlockedReason, ShardingPlanState, ShardingPolicy, ShardingPolicyError, ShardingState,
        compute_pool_metrics,
    },
    dto::placement::sharding::ShardingPlanStateResponse,
    ids::CanisterRole,
    log::Topic,
    ops::{
        config::ConfigOps,
        placement::sharding::mapper::{
            ShardPartitionKeyAssignmentPolicyInputMapper, ShardPlacementPolicyInputMapper,
            ShardingPlanStateResponseMapper,
        },
        runtime::metrics::{
            recording::ShardingMetricEvent as MetricEvent,
            sharding::{
                ShardingMetricOperation as MetricOperation, ShardingMetricReason as MetricReason,
            },
        },
        storage::placement::{
            sharding::ShardingRegistryOps, sharding_lifecycle::ShardingLifecycleOps,
        },
    },
};
use std::collections::BTreeSet;
use thiserror::Error as ThisError;

/// Errors produced by the sharding workflow in this module.
///
/// Converted into [`InternalError`] through the `From` impl defined in this file.
#[derive(Debug, ThisError)]
pub enum ShardingWorkflowError {
    /// A sharding policy error. `transparent` forwards the inner error's
    /// display output unchanged, and `#[from]` provides the conversion.
    #[error(transparent)]
    Policy(#[from] ShardingPolicyError),

    /// An internal invariant did not hold; the payload is a static description
    /// that is rendered after the `invariant violation: ` prefix.
    #[error("invariant violation: {0}")]
    Invariant(&'static str),
}

/// Maps workflow errors onto the crate-wide [`InternalError`].
///
/// Invariant violations are reported with the `Workflow` origin; policy errors
/// are reported as domain errors carrying the policy error's display text.
impl From<ShardingWorkflowError> for InternalError {
    fn from(err: ShardingWorkflowError) -> Self {
        match err {
            ShardingWorkflowError::Invariant(detail) => {
                Self::invariant(InternalErrorOrigin::Workflow, detail)
            }
            ShardingWorkflowError::Policy(policy_err) => {
                let message = policy_err.to_string();
                Self::domain(InternalErrorOrigin::Domain, message)
            }
        }
    }
}

/// Namespace type for the sharding workflow; the operations visible in this
/// file are associated functions and take no `self`.
pub struct ShardingWorkflow;

impl ShardingWorkflow {
    pub async fn assign_to_pool(
        pool: &str,
        partition_key: impl AsRef<str>,
    ) -> Result<Principal, InternalError> {
        let pool_cfg = match Self::get_shard_pool_cfg(pool) {
            Ok(pool_cfg) => pool_cfg,
            Err(err) => {
                MetricEvent::started(MetricOperation::Assign);
                MetricEvent::failed(MetricOperation::Assign, &err);
                return Err(err);
            }
        };
        Self::assign_with_policy(
            &pool_cfg.canister_role,
            pool,
            partition_key.as_ref(),
            pool_cfg.policy,
            None,
        )
        .await
    }

    /// Assigns `partition_key` to a shard in `pool` under an explicit `policy`.
    ///
    /// Steps, in order:
    /// 1. Record the `Assign` operation as started.
    /// 2. Load the active shards. If there are none, delegate entirely to
    ///    `assign_bootstrap_created` and return its result.
    /// 3. Otherwise build the policy inputs (entries, metrics, assignments) from
    ///    the registry, keeping only shards in the routable active set.
    /// 4. Ask `ShardingPolicy::plan_assign` for a plan and act on its state:
    ///    reuse an existing assignment, assign to an existing shard, create a
    ///    shard and assign to it, or fail because creation is blocked.
    ///
    /// `extra_arg` is forwarded unchanged to the shard-creation helpers
    /// (presumably an init argument for the new shard — confirm with those helpers).
    ///
    /// # Errors
    ///
    /// Returns the error from bootstrap creation, from `allocate_and_admit`, or
    /// from `ShardingRegistryOps::assign`; an invariant error when the plan
    /// allows creation but carries no slot; or a `ShardCreationBlocked` policy
    /// error when the plan blocks creation. Each failure path records a failed
    /// `Assign` metric before returning.
    #[expect(clippy::too_many_lines)]
    pub async fn assign_with_policy(
        canister_role: &CanisterRole,
        pool: &str,
        partition_key: &str,
        policy: ShardPoolPolicy,
        extra_arg: Option<Vec<u8>>,
    ) -> Result<Principal, InternalError> {
        MetricEvent::started(MetricOperation::Assign);
        let active = ShardingLifecycleOps::active_shards();
        crate::perf!("load_active_shards");
        // No active shards at all: take the bootstrap path and return directly,
        // skipping the planning step below.
        if active.is_empty() {
            return match Self::assign_bootstrap_created(
                canister_role,
                pool,
                partition_key,
                &policy,
                extra_arg,
            )
            .await
            {
                Ok(pid) => {
                    MetricEvent::completed(MetricOperation::Assign, MetricReason::CreateAllowed);
                    Ok(pid)
                }
                Err(err) => {
                    MetricEvent::failed(MetricOperation::Assign, &err);
                    Err(err)
                }
            };
        }

        // Narrow the active shards down to the routable ones; only these are
        // shown to the policy.
        let active_set: BTreeSet<_> = active.into_iter().collect();
        let routable_active = Self::routable_active_set(&active_set);

        // Registry entries for this pool, restricted to routable active shards.
        let entry_views: Vec<_> = ShardingRegistryOps::entries_for_pool(pool)
            .iter()
            .filter(|(pid, _)| routable_active.contains(pid))
            .map(|(pid, entry)| {
                ShardPlacementPolicyInputMapper::record_to_policy_input(*pid, entry)
            })
            .collect();

        let metrics = compute_pool_metrics(pool, &entry_views);

        // Existing key -> shard assignments, with the same routable filter applied.
        // The raw records are bound to a local so the views can be built from them.
        let assignments_raw = ShardingRegistryOps::assignments_for_pool(pool);
        let assignment_views: Vec<_> = assignments_raw
            .iter()
            .filter(|(_, pid)| routable_active.contains(pid))
            .map(|(key, pid)| {
                ShardPartitionKeyAssignmentPolicyInputMapper::record_to_policy_input(key, *pid)
            })
            .collect();
        crate::perf!("collect_registry");

        // `policy` is cloned because it is still needed by `allocate_and_admit`
        // in the `CreateAllowed` branch.
        let state = ShardingState {
            pool,
            config: ShardPool {
                canister_role: canister_role.clone(),
                policy: policy.clone(),
            },
            metrics: &metrics,
            entries: &entry_views,
            assignments: &assignment_views,
        };

        MetricEvent::started(MetricOperation::PlanAssign);
        let plan = ShardingPolicy::plan_assign(&state, partition_key, None);
        crate::perf!("plan_assign");

        match plan.state {
            // The key already maps to a shard: nothing to write, just report it.
            ShardingPlanState::AlreadyAssigned { pid } => {
                MetricEvent::skipped(MetricOperation::PlanAssign, MetricReason::AlreadyAssigned);
                crate::perf!("already_assigned");
                // Slot is only used for the log line; fall back to a registry
                // lookup when the plan did not carry one.
                let slot = plan
                    .target_slot
                    .or_else(|| ShardingRegistryOps::slot_for_shard(pool, pid));

                crate::log!(
                    Topic::Sharding,
                    Info,
                    "📦 partition_key={partition_key} already shard={pid} pool={pool} slot={slot:?}"
                );

                MetricEvent::completed(MetricOperation::Assign, MetricReason::AlreadyAssigned);
                Ok(pid)
            }

            // An existing shard was chosen: record the assignment in the registry.
            ShardingPlanState::UseExisting { pid } => {
                MetricEvent::completed(MetricOperation::PlanAssign, MetricReason::ExistingCapacity);
                MetricEvent::started(MetricOperation::AssignKey);
                if let Err(err) = ShardingRegistryOps::assign(pool, partition_key, pid) {
                    // Both the inner `AssignKey` step and the outer `Assign`
                    // operation are marked failed.
                    MetricEvent::failed(MetricOperation::AssignKey, &err);
                    MetricEvent::failed(MetricOperation::Assign, &err);
                    return Err(err);
                }
                MetricEvent::completed(MetricOperation::AssignKey, MetricReason::ExistingCapacity);
                crate::perf!("assign_existing");

                // Slot is only used for the log line below.
                let slot = plan
                    .target_slot
                    .or_else(|| ShardingRegistryOps::slot_for_shard(pool, pid));

                crate::log!(
                    Topic::Sharding,
                    Info,
                    "📦 partition_key={partition_key} assigned shard={pid} pool={pool} slot={slot:?}"
                );

                MetricEvent::completed(MetricOperation::Assign, MetricReason::ExistingCapacity);
                Ok(pid)
            }

            // The policy allows a new shard: allocate it, then assign the key to it.
            ShardingPlanState::CreateAllowed => {
                // A creation plan without a target slot is treated as an
                // invariant violation rather than guessed around.
                let Some(slot) = plan.target_slot else {
                    MetricEvent::failed_reason(
                        MetricOperation::PlanAssign,
                        MetricReason::InvalidState,
                    );
                    MetricEvent::failed_reason(MetricOperation::Assign, MetricReason::InvalidState);
                    return Err(ShardingWorkflowError::Invariant(
                        "sharding policy allowed creation but returned no slot",
                    )
                    .into());
                };
                MetricEvent::completed(MetricOperation::PlanAssign, MetricReason::CreateAllowed);

                let pid =
                    match Self::allocate_and_admit(pool, slot, canister_role, &policy, extra_arg)
                        .await
                    {
                        Ok(pid) => pid,
                        Err(err) => {
                            MetricEvent::failed(MetricOperation::Assign, &err);
                            return Err(err);
                        }
                    };
                crate::perf!("allocate_shard");

                // NOTE(review): if this registry write fails, the shard allocated
                // above is not undone in this function.
                MetricEvent::started(MetricOperation::AssignKey);
                if let Err(err) = ShardingRegistryOps::assign(pool, partition_key, pid) {
                    MetricEvent::failed(MetricOperation::AssignKey, &err);
                    MetricEvent::failed(MetricOperation::Assign, &err);
                    return Err(err);
                }
                MetricEvent::completed(MetricOperation::AssignKey, MetricReason::CreateAllowed);
                crate::perf!("assign_created");

                crate::log!(
                    Topic::Sharding,
                    Ok,
                    "✨ partition_key={partition_key} created+assigned shard={pid} pool={pool} slot={slot}"
                );

                MetricEvent::completed(MetricOperation::Assign, MetricReason::CreateAllowed);
                Ok(pid)
            }

            // The policy refuses to create a shard: planning is recorded as
            // skipped, the overall assignment as failed, both with the same reason.
            ShardingPlanState::CreateBlocked { reason } => {
                let metric_reason = MetricReason::from_create_blocked_reason(&reason);
                MetricEvent::skipped(MetricOperation::PlanAssign, metric_reason);
                MetricEvent::failed_reason(MetricOperation::Assign, metric_reason);
                crate::perf!("create_blocked");
                Err(Self::blocked(reason, pool, partition_key))
            }
        }
    }

    /// Computes the policy's plan for assigning `partition_key` within `pool`
    /// and returns it as a response DTO.
    ///
    /// Unlike `assign_with_policy`, this function does not call the registry
    /// `assign` step or allocate a shard; it only builds the policy inputs and
    /// maps the resulting plan state.
    ///
    /// # Errors
    ///
    /// Returns the pool configuration lookup error, or the error from
    /// `ensure_bootstrap_capacity` when there are no active shards.
    pub fn plan_assign_to_pool(
        pool: &str,
        partition_key: impl AsRef<str>,
    ) -> Result<ShardingPlanStateResponse, InternalError> {
        let config = Self::get_shard_pool_cfg(pool)?;
        let key = partition_key.as_ref();

        let active_shards = ShardingLifecycleOps::active_shards();
        if active_shards.is_empty() {
            // With no active shards, bootstrap capacity is checked first; planning
            // then proceeds against empty inputs.
            Self::ensure_bootstrap_capacity(pool, key, &config.policy)?;
        }

        let active_lookup: BTreeSet<_> = active_shards.into_iter().collect();
        let routable = Self::routable_active_set(&active_lookup);

        // Only entries whose shard is routable are shown to the policy.
        let entries: Vec<_> = ShardingRegistryOps::entries_for_pool(pool)
            .iter()
            .filter_map(|(pid, entry)| {
                routable
                    .contains(pid)
                    .then(|| ShardPlacementPolicyInputMapper::record_to_policy_input(*pid, entry))
            })
            .collect();

        let pool_metrics = compute_pool_metrics(pool, &entries);

        // Kept in a local so the assignment views can be built from the records.
        let raw_assignments = ShardingRegistryOps::assignments_for_pool(pool);
        let assignments: Vec<_> = raw_assignments
            .iter()
            .filter_map(|(assigned_key, pid)| {
                routable.contains(pid).then(|| {
                    ShardPartitionKeyAssignmentPolicyInputMapper::record_to_policy_input(
                        assigned_key,
                        *pid,
                    )
                })
            })
            .collect();

        let state = ShardingState {
            pool,
            config,
            metrics: &pool_metrics,
            entries: &entries,
            assignments: &assignments,
        };

        let planned = ShardingPolicy::plan_assign(&state, key, None);
        Ok(ShardingPlanStateResponseMapper::record_to_view(
            planned.state,
        ))
    }

    fn blocked(reason: CreateBlockedReason, pool: &str, partition_key: &str) -> InternalError {
        ShardingWorkflowError::Policy(ShardingPolicyError::ShardCreationBlocked {
            reason,
            partition_key: partition_key.to_string(),
            pool: pool.to_string(),
        })
        .into()
    }

    /// Looks up the configuration of the shard pool named `pool` in the current
    /// canister's config and returns a clone of it.
    ///
    /// # Errors
    ///
    /// - Propagates the error from `ConfigOps::current_canister`.
    /// - `ShardingPolicyError::ShardingDisabled` when the canister config has no
    ///   sharding section.
    /// - `ShardingPolicyError::PoolNotFound` when `pool` is not configured; the
    ///   error lists the configured pool names sorted and comma-separated, or
    ///   `<none>` when no pools exist.
    fn get_shard_pool_cfg(pool: &str) -> Result<ShardPool, InternalError> {
        let sharding = ConfigOps::current_canister()?
            .sharding
            .ok_or(ShardingPolicyError::ShardingDisabled)?;

        sharding
            .pools
            .get(pool)
            .cloned()
            .ok_or_else(|| {
                // Built only on the miss path: the successful lookup should not
                // pay for cloning, sorting and joining every pool name.
                let available = if sharding.pools.is_empty() {
                    "<none>".to_string()
                } else {
                    let mut names: Vec<_> = sharding.pools.keys().cloned().collect();
                    names.sort_unstable();
                    names.join(", ")
                };

                ShardingPolicyError::PoolNotFound {
                    requested: pool.to_string(),
                    available,
                }
            })
            .map_err(InternalError::from)
    }
}