canic_core/ops/placement/scaling.rs

//! Policy layer for scaling worker pools.
//!
//! Scaling builds on top of [`ScalingRegistry`] and the configuration entries
//! under `[canisters.<type>.scaling]`. The helpers in this module apply policy
//! decisions, create new workers when necessary, and surface registry
//! snapshots for diagnostics.
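//!
//! # Example
//!
//! A minimal usage sketch; the module path (inferred from this file's
//! location), the `"workers"` pool name, and the wrapper function are
//! illustrative assumptions, not guarantees of this crate's API surface:
//!
//! ```ignore
//! use canic_core::ops::placement::scaling::ScalingRegistryOps;
//!
//! // Hypothetical maintenance task that tops up a pool when policy allows it.
//! async fn ensure_pool_capacity() -> Result<(), canic_core::Error> {
//!     if ScalingRegistryOps::should_spawn_worker("workers")? {
//!         // `create_worker` spawns and registers the canister, returning its principal.
//!         let _pid = ScalingRegistryOps::create_worker("workers").await?;
//!     }
//!     Ok(())
//! }
//! ```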

pub use crate::ops::storage::scaling::ScalingRegistryView;

use crate::{
    Error, ThisError,
    cdk::utils::time::now_secs,
    config::schema::ScalePool,
    ops::{
        config::ConfigOps,
        rpc::{CreateCanisterParent, create_canister_request},
        storage::scaling::{ScalingWorkerRegistryStorageOps, WorkerEntry},
    },
};
use candid::Principal;

///
/// ScalingOpsError
/// Errors raised by scaling operations (policy / orchestration layer)
///

#[derive(Debug, ThisError)]
pub enum ScalingOpsError {
    #[error("scaling capability disabled for this canister")]
    ScalingDisabled,

    #[error("scaling pool '{0}' not found")]
    PoolNotFound(String),

    #[error("invalid scaling key: {0}")]
    InvalidKey(String),

    #[error("scaling plan rejected: {0}")]
    PlanRejected(String),
}

impl From<ScalingOpsError> for Error {
    fn from(err: ScalingOpsError) -> Self {
        Self::OpsError(err.to_string())
    }
}

///
/// ScalingPlan
/// Result of a dry-run evaluation for scaling decisions
///

#[derive(Clone, Debug)]
pub struct ScalingPlan {
    /// Whether a new worker should be spawned.
    pub should_spawn: bool,

    /// Explanation / debug string for the decision.
    pub reason: String,
}

///
/// ScalingRegistryOps
///

pub struct ScalingRegistryOps;

impl ScalingRegistryOps {
    /// Evaluate scaling policy for a pool without side effects.
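    ///
    /// Dry-run sketch (the `"workers"` pool name is a placeholder):
    ///
    /// ```ignore
    /// let plan = ScalingRegistryOps::plan_create_worker("workers")?;
    /// if plan.should_spawn {
    ///     // Policy allows growth; `plan.reason` records why.
    /// }
    /// ```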
    #[allow(clippy::cast_possible_truncation)]
    pub fn plan_create_worker(pool: &str) -> Result<ScalingPlan, Error> {
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let policy = pool_cfg.policy;
        let worker_count = ScalingWorkerRegistryStorageOps::find_by_pool(pool).len() as u32;

        if policy.max_workers > 0 && worker_count >= policy.max_workers {
            return Ok(ScalingPlan {
                should_spawn: false,
                reason: format!(
                    "pool '{pool}' at max_workers ({}/{})",
                    worker_count, policy.max_workers
                ),
            });
        }

        if worker_count < policy.min_workers {
            return Ok(ScalingPlan {
                should_spawn: true,
                reason: format!(
                    "pool '{pool}' below min_workers (current {worker_count}, min {})",
                    policy.min_workers
                ),
            });
        }

        Ok(ScalingPlan {
            should_spawn: false,
            reason: format!(
                "pool '{pool}' within policy bounds (current {worker_count}, min {}, max {})",
                policy.min_workers, policy.max_workers
            ),
        })
    }

    /// Look up the config for a given pool on the *current canister*.
    fn get_scaling_pool_cfg(pool: &str) -> Result<ScalePool, Error> {
        let cfg = ConfigOps::current_canister()?;
        let scale_cfg = cfg.scaling.ok_or(ScalingOpsError::ScalingDisabled)?;

        let pool_cfg = scale_cfg
            .pools
            .get(pool)
            .ok_or_else(|| ScalingOpsError::PoolNotFound(pool.to_string()))?;

        Ok(pool_cfg.clone())
    }

    /// Export a snapshot of the current registry state.
    #[must_use]
    pub fn export() -> ScalingRegistryView {
        ScalingWorkerRegistryStorageOps::export()
    }

    /// Create a new worker canister in the given pool and register it.
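    ///
    /// Usage sketch from an async context (pool name is illustrative); when
    /// policy rejects the plan, the call errors out via
    /// [`ScalingOpsError::PlanRejected`]:
    ///
    /// ```ignore
    /// let pid = ScalingRegistryOps::create_worker("workers").await?;
    /// // The new canister is now tracked by the scaling registry under `pid`.
    /// ```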
    pub async fn create_worker(pool: &str) -> Result<Principal, Error> {
        // 1. Evaluate policy
        let plan = Self::plan_create_worker(pool)?;
        if !plan.should_spawn {
            return Err(ScalingOpsError::PlanRejected(plan.reason))?;
        }

        // 2. Look up pool config
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let ty = pool_cfg.canister_type.clone();

        // 3. Create the canister
        let pid = create_canister_request::<()>(&ty, CreateCanisterParent::ThisCanister, None)
            .await?
            .new_canister_pid;

        // 4. Register in memory
        let entry =
            WorkerEntry::try_new(pool, ty, now_secs()).map_err(ScalingOpsError::InvalidKey)?;

        ScalingWorkerRegistryStorageOps::insert(pid, entry);

        Ok(pid)
    }

    /// Convenience: return only the decision flag for a pool.
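    ///
    /// Sketch (pool name is a placeholder):
    ///
    /// ```ignore
    /// // Equivalent to `plan_create_worker(pool)?.should_spawn`.
    /// let spawn = ScalingRegistryOps::should_spawn_worker("workers")?;
    /// ```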
    pub fn should_spawn_worker(pool: &str) -> Result<bool, Error> {
        Ok(Self::plan_create_worker(pool)?.should_spawn)
    }
}