canic_core/ops/placement/scaling.rs

//! Policy layer for scaling worker pools.
//!
//! Scaling builds on top of [`ScalingRegistry`] and the configuration entries
//! under `[canisters.<type>.scaling]`. The helpers in this module apply policy
//! decisions, create new workers when necessary, and surface registry
//! snapshots for diagnostics.
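//!
//! As a rough sketch of the configuration shape (the canonical schema lives in
//! `config::schema::ScalePool`; the `hub` / `workers` names below are
//! placeholders, and this assumes pools nest under the `scaling` table, which
//! is how this module reads them):
//!
//! ```toml
//! [canisters.hub.scaling.pools.workers]
//! canister_type = "worker"
//! policy = { min_workers = 1, max_workers = 8 }
//! ```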

pub use crate::ops::storage::scaling::ScalingRegistryView;

use crate::{
    Error, ThisError,
    cdk::utils::time::now_secs,
    config::schema::ScalePool,
    ops::{
        config::ConfigOps,
        rpc::{CreateCanisterParent, create_canister_request},
        storage::scaling::{ScalingWorkerRegistryStorageOps, WorkerEntry},
    },
};
use candid::Principal;

///
/// ScalingOpsError
/// Errors raised by scaling operations (policy / orchestration layer)
///

#[derive(Debug, ThisError)]
pub enum ScalingOpsError {
    #[error("scaling capability disabled for this canister")]
    ScalingDisabled,

    #[error("scaling pool '{0}' not found")]
    PoolNotFound(String),

    #[error("scaling plan rejected: {0}")]
    PlanRejected(String),
}

impl From<ScalingOpsError> for Error {
    fn from(err: ScalingOpsError) -> Self {
        Self::OpsError(err.to_string())
    }
}

///
/// ScalingPlan
/// Result of a dry-run evaluation for scaling decisions
///

#[derive(Clone, Debug)]
pub struct ScalingPlan {
    /// Whether a new worker should be spawned.
    pub should_spawn: bool,

    /// Explanation / debug string for the decision.
    pub reason: String,
}

///
/// ScalingRegistryOps
/// Policy and orchestration helpers built on the scaling worker registry
///

pub struct ScalingRegistryOps;

impl ScalingRegistryOps {
    /// Evaluate scaling policy for a pool without side effects.
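    ///
    /// A minimal usage sketch, assuming a pool named `"workers"` is configured
    /// for this canister (marked `ignore` since it needs a canister runtime):
    ///
    /// ```ignore
    /// let plan = ScalingRegistryOps::plan_create_worker("workers")?;
    /// if plan.should_spawn {
    ///     // follow up with create_worker(), or just record the decision
    /// } else {
    ///     // plan.reason explains why the pool was left alone
    /// }
    /// ```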
    #[allow(clippy::cast_possible_truncation)]
    pub fn plan_create_worker(pool: &str) -> Result<ScalingPlan, Error> {
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let policy = pool_cfg.policy;
        let worker_count = ScalingWorkerRegistryStorageOps::find_by_pool(pool).len() as u32;

        if policy.max_workers > 0 && worker_count >= policy.max_workers {
            return Ok(ScalingPlan {
                should_spawn: false,
                reason: format!(
                    "pool '{pool}' at max_workers ({worker_count}/{})",
                    policy.max_workers
                ),
            });
        }

        if worker_count < policy.min_workers {
            return Ok(ScalingPlan {
                should_spawn: true,
                reason: format!(
                    "pool '{pool}' below min_workers (current {worker_count}, min {})",
                    policy.min_workers
                ),
            });
        }

        Ok(ScalingPlan {
            should_spawn: false,
            reason: format!(
                "pool '{pool}' within policy bounds (current {worker_count}, min {}, max {})",
                policy.min_workers, policy.max_workers
            ),
        })
    }

    /// Look up the config for a given pool on the *current canister*.
    fn get_scaling_pool_cfg(pool: &str) -> Result<ScalePool, Error> {
        let cfg = ConfigOps::current_canister()?;
        let scale_cfg = cfg.scaling.ok_or(ScalingOpsError::ScalingDisabled)?;

        let pool_cfg = scale_cfg
            .pools
            .get(pool)
            .ok_or_else(|| ScalingOpsError::PoolNotFound(pool.to_string()))?;

        Ok(pool_cfg.clone())
    }

    /// Export a snapshot of the current registry state.
    #[must_use]
    pub fn export() -> ScalingRegistryView {
        ScalingWorkerRegistryStorageOps::export()
    }

    /// Create a new worker canister in the given pool and register it.
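    ///
    /// A rough usage sketch (the pool name is a placeholder and the snippet is
    /// `ignore`d because it only runs inside a canister):
    ///
    /// ```ignore
    /// // inside an async update call on the canister that owns the pool
    /// let new_worker: Principal = ScalingRegistryOps::create_worker("workers").await?;
    /// ```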
    pub async fn create_worker(pool: &str) -> Result<Principal, Error> {
        // 1. Evaluate policy
        let plan = Self::plan_create_worker(pool)?;
        if !plan.should_spawn {
            return Err(ScalingOpsError::PlanRejected(plan.reason).into());
        }

        // 2. Look up pool config
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let ty = pool_cfg.canister_type.clone();

        // 3. Create the canister
        let pid = create_canister_request::<()>(&ty, CreateCanisterParent::ThisCanister, None)
            .await?
            .new_canister_pid;

        // 4. Register in memory
        let entry = WorkerEntry {
            pool: pool.to_string(),
            canister_type: ty,
            created_at_secs: now_secs(),
            // load_bps: 0 by default (no load yet)
        };

        ScalingWorkerRegistryStorageOps::insert(pid, entry);

        Ok(pid)
    }

    /// Convenience: return only the decision flag for a pool.
    pub fn should_spawn_worker(pool: &str) -> Result<bool, Error> {
        Ok(Self::plan_create_worker(pool)?.should_spawn)
    }
}