canic_core/ops/model/memory/
scaling.rs

1//! Policy layer for scaling worker pools.
2//!
3//! Scaling builds on top of [`ScalingRegistry`] and the configuration entries
4//! under `[canisters.<type>.scaling]`. The helpers in this module apply policy
5//! decisions, create new workers when necessary, and surface registry
6//! snapshots for diagnostics.
7
8pub use crate::model::memory::scaling::ScalingRegistryView;
9
10use crate::{
11    Error, ThisError,
12    config::schema::ScalePool,
13    model::memory::scaling::{ScalingRegistry, WorkerEntry},
14    ops::{
15        config::ConfigOps,
16        model::memory::MemoryOpsError,
17        request::{CreateCanisterParent, create_canister_request},
18    },
19    utils::time::now_secs,
20};
21use candid::Principal;
22
23///
24/// ScalingOpsError
25/// Errors raised by scaling operations (policy / orchestration layer)
26///
27
28#[derive(Debug, ThisError)]
29pub enum ScalingOpsError {
30    #[error("scaling capability disabled for this canister")]
31    ScalingDisabled,
32
33    #[error("scaling pool '{0}' not found")]
34    PoolNotFound(String),
35
36    #[error("scaling plan rejected: {0}")]
37    PlanRejected(String),
38}
39
40impl From<ScalingOpsError> for Error {
41    fn from(err: ScalingOpsError) -> Self {
42        MemoryOpsError::ScalingOpsError(err).into()
43    }
44}
45
46///
47/// ScalingPlan
48/// Result of a dry-run evaluation for scaling decisions
49///
50
51#[derive(Clone, Debug)]
52pub struct ScalingPlan {
53    /// Whether a new worker should be spawned.
54    pub should_spawn: bool,
55
56    /// Explanation / debug string for the decision.
57    pub reason: String,
58}
59
60///
61/// ScalingRegistryOps
62///
63
64pub struct ScalingRegistryOps;
65
66impl ScalingRegistryOps {
67    /// Evaluate scaling policy for a pool without side effects.
68    #[allow(clippy::cast_possible_truncation)]
69    pub fn plan_create_worker(pool: &str) -> Result<ScalingPlan, Error> {
70        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
71        let policy = pool_cfg.policy;
72        let worker_count = ScalingRegistry::find_by_pool(pool).len() as u32;
73
74        if policy.max_workers > 0 && worker_count >= policy.max_workers {
75            return Ok(ScalingPlan {
76                should_spawn: false,
77                reason: format!(
78                    "pool '{pool}' at max_workers ({}/{})",
79                    worker_count, policy.max_workers
80                ),
81            });
82        }
83
84        if worker_count < policy.min_workers {
85            return Ok(ScalingPlan {
86                should_spawn: true,
87                reason: format!(
88                    "pool '{pool}' below min_workers (current {worker_count}, min {})",
89                    policy.min_workers
90                ),
91            });
92        }
93
94        Ok(ScalingPlan {
95            should_spawn: false,
96            reason: format!(
97                "pool '{pool}' within policy bounds (current {worker_count}, min {}, max {})",
98                policy.min_workers, policy.max_workers
99            ),
100        })
101    }
102
103    /// Look up the config for a given pool on the *current canister*.
104    fn get_scaling_pool_cfg(pool: &str) -> Result<ScalePool, Error> {
105        let cfg = ConfigOps::current_canister()?;
106        let scale_cfg = cfg.scaling.ok_or(ScalingOpsError::ScalingDisabled)?;
107
108        let pool_cfg = scale_cfg
109            .pools
110            .get(pool)
111            .ok_or_else(|| ScalingOpsError::PoolNotFound(pool.to_string()))?;
112
113        Ok(pool_cfg.clone())
114    }
115
116    /// Export a snapshot of the current registry state.
117    #[must_use]
118    pub fn export() -> ScalingRegistryView {
119        ScalingRegistry::export()
120    }
121
122    /// Create a new worker canister in the given pool and register it.
123    pub async fn create_worker(pool: &str) -> Result<Principal, Error> {
124        // 1. Evaluate policy
125        let plan = Self::plan_create_worker(pool)?;
126        if !plan.should_spawn {
127            return Err(ScalingOpsError::PlanRejected(plan.reason))?;
128        }
129
130        // 2. Look up pool config
131        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
132        let ty = pool_cfg.canister_type.clone();
133
134        // 3. Create the canister
135        let pid = create_canister_request::<()>(&ty, CreateCanisterParent::ThisCanister, None)
136            .await?
137            .new_canister_pid;
138
139        // 4. Register in memory
140        let entry = WorkerEntry {
141            pool: pool.to_string(),
142            canister_type: ty,
143            created_at_secs: now_secs(),
144            // load_bps: 0 by default (no load yet)
145        };
146
147        ScalingRegistry::insert(pid, entry);
148
149        Ok(pid)
150    }
151
152    /// Convenience: return only the decision flag for a pool.
153    pub fn should_spawn_worker(pool: &str) -> Result<bool, Error> {
154        Ok(Self::plan_create_worker(pool)?.should_spawn)
155    }
156}