canic_core/ops/placement/scaling.rs

pub use crate::ops::storage::scaling::ScalingRegistryView;

use crate::{
    Error, ThisError,
    cdk::utils::time::now_secs,
    config::schema::ScalePool,
    ops::{
        config::ConfigOps,
        rpc::{CreateCanisterParent, create_canister_request},
        storage::scaling::{ScalingWorkerRegistryStorageOps, WorkerEntry},
    },
};
use candid::Principal;

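/// Errors produced by the scaling placement operations in this module.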
#[derive(Debug, ThisError)]
pub enum ScalingOpsError {
    #[error("scaling capability disabled for this canister")]
    ScalingDisabled,

    #[error("scaling pool '{0}' not found")]
    PoolNotFound(String),

    #[error("invalid scaling key: {0}")]
    InvalidKey(String),

    #[error("scaling plan rejected: {0}")]
    PlanRejected(String),
}

impl From<ScalingOpsError> for Error {
    fn from(err: ScalingOpsError) -> Self {
        Self::OpsError(err.to_string())
    }
}

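/// Outcome of evaluating a pool's scaling policy against its current worker count.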
#[derive(Clone, Debug)]
pub struct ScalingPlan {
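    /// Whether a new worker should be spawned for the pool.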
    pub should_spawn: bool,

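    /// Human-readable explanation of the decision.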
    pub reason: String,
}

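/// Operations for planning, creating, and exporting scaling workers.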
pub struct ScalingRegistryOps;

impl ScalingRegistryOps {
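    /// Evaluates the pool's scaling policy and reports whether a new worker
    /// should be spawned, along with the reason for the decision.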
    #[allow(clippy::cast_possible_truncation)]
    pub fn plan_create_worker(pool: &str) -> Result<ScalingPlan, Error> {
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let policy = pool_cfg.policy;
        let worker_count = ScalingWorkerRegistryStorageOps::find_by_pool(pool).len() as u32;

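        // A max_workers of 0 leaves the pool uncapped; only enforce the
        // ceiling when a positive limit is configured.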
        if policy.max_workers > 0 && worker_count >= policy.max_workers {
            return Ok(ScalingPlan {
                should_spawn: false,
                reason: format!(
                    "pool '{pool}' at max_workers ({}/{})",
                    worker_count, policy.max_workers
                ),
            });
        }

        if worker_count < policy.min_workers {
            return Ok(ScalingPlan {
                should_spawn: true,
                reason: format!(
                    "pool '{pool}' below min_workers (current {worker_count}, min {})",
                    policy.min_workers
                ),
            });
        }

        Ok(ScalingPlan {
            should_spawn: false,
            reason: format!(
                "pool '{pool}' within policy bounds (current {worker_count}, min {}, max {})",
                policy.min_workers, policy.max_workers
            ),
        })
    }

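    /// Looks up the pool's configuration, failing if scaling is disabled for
    /// this canister or the pool is not declared in the scaling config.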
    fn get_scaling_pool_cfg(pool: &str) -> Result<ScalePool, Error> {
        let cfg = ConfigOps::current_canister();
        let scale_cfg = cfg.scaling.ok_or(ScalingOpsError::ScalingDisabled)?;

        let pool_cfg = scale_cfg
            .pools
            .get(pool)
            .ok_or_else(|| ScalingOpsError::PoolNotFound(pool.to_string()))?;

        Ok(pool_cfg.clone())
    }

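    /// Exports a view of the scaling worker registry.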
    #[must_use]
    pub fn export() -> ScalingRegistryView {
        ScalingWorkerRegistryStorageOps::export()
    }

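    /// Plans and, if the policy allows it, creates a new worker canister for
    /// `pool` and registers it in the scaling worker registry.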
    pub async fn create_worker(pool: &str) -> Result<Principal, Error> {
        let plan = Self::plan_create_worker(pool)?;
        if !plan.should_spawn {
            return Err(ScalingOpsError::PlanRejected(plan.reason))?;
        }

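        // The worker's role comes from the pool configuration; the new
        // canister is created as a child of this canister.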
        let pool_cfg = Self::get_scaling_pool_cfg(pool)?;
        let role = pool_cfg.canister_role.clone();

        let pid = create_canister_request::<()>(&role, CreateCanisterParent::ThisCanister, None)
            .await?
            .new_canister_pid;

        let entry =
            WorkerEntry::try_new(pool, role, now_secs()).map_err(ScalingOpsError::InvalidKey)?;

        ScalingWorkerRegistryStorageOps::insert(pid, entry);

        Ok(pid)
    }

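    /// Convenience wrapper around [`Self::plan_create_worker`] that returns
    /// only the `should_spawn` flag.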
    pub fn should_spawn_worker(pool: &str) -> Result<bool, Error> {
        Ok(Self::plan_create_worker(pool)?.should_spawn)
    }
}
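
// Illustrative usage (a sketch, not part of this module): a timer or other
// task in the pool's parent canister might drive scaling like this, assuming
// a pool named "workers" is declared in the canister's scaling config:
//
//     if ScalingRegistryOps::should_spawn_worker("workers")? {
//         let worker_pid = ScalingRegistryOps::create_worker("workers").await?;
//     }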