canic_core/ops/placement/
scaling.rs1pub use crate::ops::storage::scaling::ScalingRegistryView;
9
10use crate::{
11 Error, ThisError,
12 cdk::utils::time::now_secs,
13 config::schema::ScalePool,
14 ops::{
15 config::ConfigOps,
16 rpc::{CreateCanisterParent, create_canister_request},
17 storage::scaling::{ScalingWorkerRegistryStorageOps, WorkerEntry},
18 },
19};
20use candid::Principal;
21
22#[derive(Debug, ThisError)]
28pub enum ScalingOpsError {
29 #[error("scaling capability disabled for this canister")]
30 ScalingDisabled,
31
32 #[error("scaling pool '{0}' not found")]
33 PoolNotFound(String),
34
35 #[error("scaling plan rejected: {0}")]
36 PlanRejected(String),
37}
38
39impl From<ScalingOpsError> for Error {
40 fn from(err: ScalingOpsError) -> Self {
41 Self::OpsError(err.to_string())
42 }
43}
44
45#[derive(Clone, Debug)]
51pub struct ScalingPlan {
52 pub should_spawn: bool,
54
55 pub reason: String,
57}
58
59pub struct ScalingRegistryOps;
64
65impl ScalingRegistryOps {
66 #[allow(clippy::cast_possible_truncation)]
68 pub fn plan_create_worker(pool: &str) -> Result<ScalingPlan, Error> {
69 let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
70 let policy = pool_cfg.policy;
71 let worker_count = ScalingWorkerRegistryStorageOps::find_by_pool(pool).len() as u32;
72
73 if policy.max_workers > 0 && worker_count >= policy.max_workers {
74 return Ok(ScalingPlan {
75 should_spawn: false,
76 reason: format!(
77 "pool '{pool}' at max_workers ({}/{})",
78 worker_count, policy.max_workers
79 ),
80 });
81 }
82
83 if worker_count < policy.min_workers {
84 return Ok(ScalingPlan {
85 should_spawn: true,
86 reason: format!(
87 "pool '{pool}' below min_workers (current {worker_count}, min {})",
88 policy.min_workers
89 ),
90 });
91 }
92
93 Ok(ScalingPlan {
94 should_spawn: false,
95 reason: format!(
96 "pool '{pool}' within policy bounds (current {worker_count}, min {}, max {})",
97 policy.min_workers, policy.max_workers
98 ),
99 })
100 }
101
102 fn get_scaling_pool_cfg(pool: &str) -> Result<ScalePool, Error> {
104 let cfg = ConfigOps::current_canister()?;
105 let scale_cfg = cfg.scaling.ok_or(ScalingOpsError::ScalingDisabled)?;
106
107 let pool_cfg = scale_cfg
108 .pools
109 .get(pool)
110 .ok_or_else(|| ScalingOpsError::PoolNotFound(pool.to_string()))?;
111
112 Ok(pool_cfg.clone())
113 }
114
115 #[must_use]
117 pub fn export() -> ScalingRegistryView {
118 ScalingWorkerRegistryStorageOps::export()
119 }
120
121 pub async fn create_worker(pool: &str) -> Result<Principal, Error> {
123 let plan = Self::plan_create_worker(pool)?;
125 if !plan.should_spawn {
126 return Err(ScalingOpsError::PlanRejected(plan.reason))?;
127 }
128
129 let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
131 let ty = pool_cfg.canister_type.clone();
132
133 let pid = create_canister_request::<()>(&ty, CreateCanisterParent::ThisCanister, None)
135 .await?
136 .new_canister_pid;
137
138 let entry = WorkerEntry {
140 pool: pool.to_string(),
141 canister_type: ty,
142 created_at_secs: now_secs(),
143 };
145
146 ScalingWorkerRegistryStorageOps::insert(pid, entry);
147
148 Ok(pid)
149 }
150
151 pub fn should_spawn_worker(pool: &str) -> Result<bool, Error> {
153 Ok(Self::plan_create_worker(pool)?.should_spawn)
154 }
155}