1pub use crate::ops::storage::pool::{CanisterPoolEntry, CanisterPoolStatus, CanisterPoolView};
14
15use crate::{
16 Error, ThisError,
17 cdk::{
18 api::canister_self,
19 futures::spawn,
20 mgmt::{CanisterSettings, UpdateSettingsArgs},
21 types::Principal,
22 },
23 config::Config,
24 log::Topic,
25 ops::{
26 OPS_POOL_CHECK_INTERVAL, OPS_POOL_INIT_DELAY, OpsError,
27 config::ConfigOps,
28 ic::{
29 get_cycles,
30 mgmt::{create_canister, uninstall_code},
31 timer::{TimerId, TimerOps},
32 update_settings,
33 },
34 prelude::*,
35 storage::{pool::CanisterPoolStorageOps, topology::SubnetCanisterRegistryOps},
36 },
37 types::{Cycles, TC},
38};
39
40use candid::CandidType;
41use serde::Deserialize;
42use std::{cell::RefCell, time::Duration};
43
///
/// PoolOpsError
///
/// Errors produced by the pool operations in this module. Each variant carries the
/// principal of the canister the failed operation was about.
///
#[derive(Debug, ThisError)]
pub enum PoolOpsError {
    /// A lookup for `pid` returned nothing. In this file it is raised both when the
    /// subnet registry has no entry (recycle) and when the pool has no entry (export).
    #[error("pool entry missing for {pid}")]
    PoolEntryMissing { pid: Principal },

    /// The pool entry for `pid` has no stored module hash, so it cannot be exported.
    #[error("missing module hash for pool entry {pid}")]
    MissingModuleHash { pid: Principal },

    /// The pool entry for `pid` has no stored role, so it cannot be exported.
    #[error("missing type for pool entry {pid}")]
    MissingType { pid: Principal },

    /// The pool entry for `pid` exists but its status is not `Ready`.
    #[error("pool entry {pid} is not ready")]
    PoolEntryNotReady { pid: Principal },
}
62
63impl From<PoolOpsError> for Error {
64 fn from(err: PoolOpsError) -> Self {
65 OpsError::from(err).into()
66 }
67}
68
///
/// PoolAdminCommand
///
/// Commands accepted by `PoolOps::admin`.
///
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminCommand {
    /// Create a new canister and register it in the pool (`pool_create_canister`).
    CreateEmpty,
    /// Move a canister known to the subnet registry into the pool (`pool_recycle_canister`).
    Recycle { pid: Principal },
    /// Reset `pid` right away and register it in the pool (`pool_import_canister`).
    ImportImmediate { pid: Principal },
    /// Queue `pids` as pending-reset entries for the background reset worker
    /// (`pool_import_queued_canisters`).
    ImportQueued { pids: Vec<Principal> },
}
80
///
/// PoolAdminResponse
///
/// Result payloads returned by `PoolOps::admin`, one per `PoolAdminCommand` variant.
///
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminResponse {
    /// A new pool canister was created; `pid` is its principal.
    Created {
        pid: Principal,
    },
    /// The requested canister was recycled into the pool.
    Recycled,
    /// The requested canister was imported into the pool.
    Imported,
    /// Outcome of a queued import.
    QueuedImported {
        /// Principals newly registered as pending reset.
        added: u64,
        /// Existing failed entries that were put back to pending reset.
        requeued: u64,
        /// Principals left untouched (present in the subnet registry, already in the
        /// pool in a non-failed state, or whose pool update did not apply).
        skipped: u64,
        /// Number of principals submitted in the command.
        total: u64,
    },
}
95
thread_local! {
    // Handle of the pool maintenance timer: the one-shot init timer at first, replaced by
    // the recurring interval timer once the init timer has fired.
    static TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
    // True while a reset batch is running; a second worker run bails out when it sees this.
    static RESET_IN_PROGRESS: RefCell<bool> = const { RefCell::new(false) };
    // Set by a worker run that bailed out because another was in progress; read and
    // cleared by `PoolOps::maybe_reschedule`.
    static RESET_RESCHEDULE: RefCell<bool> = const { RefCell::new(false) };
    // Handle of the zero-delay reset-worker timer while one is queued; cleared by the
    // timer body when it starts running.
    static RESET_TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
}
106
/// Cycles passed to `create_canister` for every canister created for the pool (`5 * TC`).
const POOL_CANISTER_CYCLES: u128 = 5 * TC;

/// Upper bound on the number of pending-reset entries handled by one reset-worker run.
const POOL_RESET_BATCH_SIZE: usize = 10;
112
113fn pool_controllers() -> Vec<Principal> {
118 let mut controllers = Config::try_get()
119 .map(|cfg| cfg.controllers.clone())
120 .unwrap_or_default();
121
122 let root = canister_self();
123 if !controllers.contains(&root) {
124 controllers.push(root);
125 }
126
127 controllers
128}
129
130async fn reset_into_pool(pid: Principal) -> Result<Cycles, Error> {
131 uninstall_code(pid).await?;
132
133 update_settings(&UpdateSettingsArgs {
134 canister_id: pid,
135 settings: CanisterSettings {
136 controllers: Some(pool_controllers()),
137 ..Default::default()
138 },
139 })
140 .await?;
141
142 get_cycles(pid).await
143}
144
///
/// PoolOps
///
/// Stateless namespace for pool maintenance: timer start/stop, the low-watermark check,
/// admin command dispatch and the background reset worker.
///
pub struct PoolOps;
150
151impl PoolOps {
152 pub fn start() {
157 TIMER.with_borrow_mut(|slot| {
158 if slot.is_some() {
159 return;
160 }
161
162 let id = TimerOps::set(OPS_POOL_INIT_DELAY, "pool:init", async {
163 let _ = Self::check();
164
165 let interval =
166 TimerOps::set_interval(OPS_POOL_CHECK_INTERVAL, "pool:interval", || async {
167 let _ = Self::check();
168 });
169
170 TIMER.with_borrow_mut(|slot| *slot = Some(interval));
171 });
172
173 *slot = Some(id);
174 });
175 }
176
177 pub fn stop() {
178 TIMER.with_borrow_mut(|slot| {
179 if let Some(id) = slot.take() {
180 TimerOps::clear(id);
181 }
182 });
183 }
184
185 #[must_use]
190 pub fn check() -> u64 {
191 Self::schedule_reset_worker();
192
193 let subnet_cfg = match ConfigOps::current_subnet() {
194 Ok(cfg) => cfg,
195 Err(e) => {
196 log!(
197 Topic::CanisterPool,
198 Warn,
199 "cannot read subnet config: {e:?}"
200 );
201 return 0;
202 }
203 };
204
205 let min_size: u64 = subnet_cfg.pool.minimum_size.into();
206 let ready_size = Self::ready_len();
207
208 if ready_size >= min_size {
209 return 0;
210 }
211
212 let missing = (min_size - ready_size).min(10);
213 log!(
214 Topic::CanisterPool,
215 Ok,
216 "pool low: {ready_size}/{min_size}, creating {missing}"
217 );
218
219 spawn(async move {
220 for i in 0..missing {
221 match pool_create_canister().await {
222 Ok(_) => log!(
223 Topic::CanisterPool,
224 Ok,
225 "created pool canister {}/{}",
226 i + 1,
227 missing
228 ),
229 Err(e) => log!(Topic::CanisterPool, Warn, "pool creation failed: {e:?}"),
230 }
231 }
232 });
233
234 missing
235 }
236
237 #[must_use]
238 pub fn pop_ready() -> Option<(Principal, CanisterPoolEntry)> {
239 CanisterPoolStorageOps::pop_ready()
240 }
241
242 #[must_use]
243 pub fn contains(pid: &Principal) -> bool {
244 CanisterPoolStorageOps::contains(pid)
245 }
246
247 #[must_use]
248 pub fn export() -> CanisterPoolView {
249 CanisterPoolStorageOps::export()
250 }
251
252 pub async fn admin(cmd: PoolAdminCommand) -> Result<PoolAdminResponse, Error> {
253 match cmd {
254 PoolAdminCommand::CreateEmpty => {
255 let pid = pool_create_canister().await?;
256 Ok(PoolAdminResponse::Created { pid })
257 }
258 PoolAdminCommand::Recycle { pid } => {
259 pool_recycle_canister(pid).await?;
260 Ok(PoolAdminResponse::Recycled)
261 }
262 PoolAdminCommand::ImportImmediate { pid } => {
263 pool_import_canister(pid).await?;
264 Ok(PoolAdminResponse::Imported)
265 }
266 PoolAdminCommand::ImportQueued { pids } => {
267 let (a, r, s, t) = pool_import_queued_canisters(pids)?;
268 Ok(PoolAdminResponse::QueuedImported {
269 added: a,
270 requeued: r,
271 skipped: s,
272 total: t,
273 })
274 }
275 }
276 }
277
278 fn ready_len() -> u64 {
283 CanisterPoolStorageOps::export()
284 .into_iter()
285 .filter(|(_, e)| e.status.is_ready())
286 .count() as u64
287 }
288
289 fn has_pending_reset() -> bool {
290 CanisterPoolStorageOps::export()
291 .into_iter()
292 .any(|(_, e)| e.status.is_pending_reset())
293 }
294
295 fn maybe_reschedule() {
296 let reschedule = RESET_RESCHEDULE.with_borrow_mut(|f| {
297 let v = *f;
298 *f = false;
299 v
300 });
301
302 if reschedule || Self::has_pending_reset() {
303 Self::schedule_reset_worker();
304 }
305 }
306
307 fn schedule_reset_worker() {
308 RESET_TIMER.with_borrow_mut(|slot| {
309 if slot.is_some() {
310 return;
311 }
312
313 let id = TimerOps::set(Duration::ZERO, "pool:pending", async {
314 RESET_TIMER.with_borrow_mut(|slot| *slot = None);
315 let _ = Self::run_reset_worker(POOL_RESET_BATCH_SIZE).await;
316 });
317
318 *slot = Some(id);
319 });
320 }
321
322 async fn run_reset_worker(limit: usize) -> Result<(), Error> {
323 if limit == 0 {
324 return Ok(());
325 }
326
327 let should_run = RESET_IN_PROGRESS.with_borrow_mut(|flag| {
328 if *flag {
329 RESET_RESCHEDULE.with_borrow_mut(|r| *r = true);
330 false
331 } else {
332 *flag = true;
333 true
334 }
335 });
336
337 if !should_run {
338 return Ok(());
339 }
340
341 let result = Self::run_reset_batch(limit).await;
342
343 RESET_IN_PROGRESS.with_borrow_mut(|f| *f = false);
344 Self::maybe_reschedule();
345
346 result
347 }
348
349 async fn run_reset_batch(limit: usize) -> Result<(), Error> {
350 let mut pending: Vec<_> = CanisterPoolStorageOps::export()
351 .into_iter()
352 .filter(|(_, e)| e.status.is_pending_reset())
353 .collect();
354
355 if pending.is_empty() {
356 return Ok(());
357 }
358
359 pending.sort_by_key(|(_, e)| e.created_at);
360
361 for (pid, mut entry) in pending.into_iter().take(limit) {
362 match reset_into_pool(pid).await {
363 Ok(cycles) => {
364 entry.cycles = cycles;
365 entry.status = CanisterPoolStatus::Ready;
366 }
367 Err(err) => {
368 entry.status = CanisterPoolStatus::Failed {
369 reason: err.to_string(),
370 };
371 log!(
372 Topic::CanisterPool,
373 Warn,
374 "pool reset failed for {pid}: {err}"
375 );
376 }
377 }
378
379 if !CanisterPoolStorageOps::update(pid, entry) {
380 log!(
381 Topic::CanisterPool,
382 Warn,
383 "pool reset update missing for {pid}"
384 );
385 }
386 }
387
388 Ok(())
389 }
390}
391
392pub async fn pool_create_canister() -> Result<Principal, Error> {
397 OpsError::require_root()?;
398
399 let cycles = Cycles::new(POOL_CANISTER_CYCLES);
400 let pid = create_canister(pool_controllers(), cycles.clone()).await?;
401
402 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
403 Ok(pid)
404}
405
406pub async fn pool_import_canister(pid: Principal) -> Result<(), Error> {
407 OpsError::require_root()?;
408
409 let _ = SubnetCanisterRegistryOps::remove(&pid);
410 let cycles = reset_into_pool(pid).await?;
411
412 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
413 Ok(())
414}
415
416fn pool_import_queued_canisters(pids: Vec<Principal>) -> Result<(u64, u64, u64, u64), Error> {
417 OpsError::require_root()?;
418
419 let mut added = 0;
420 let mut requeued = 0;
421 let mut skipped = 0;
422
423 for pid in &pids {
424 if SubnetCanisterRegistryOps::get(*pid).is_some() {
425 skipped += 1;
426 continue;
427 }
428
429 if let Some(mut entry) = CanisterPoolStorageOps::get(*pid) {
430 if entry.status.is_failed() {
431 entry.status = CanisterPoolStatus::PendingReset;
432 entry.cycles = Cycles::default();
433 if CanisterPoolStorageOps::update(*pid, entry) {
434 requeued += 1;
435 } else {
436 skipped += 1;
437 }
438 } else {
439 skipped += 1;
440 }
441 continue;
442 }
443
444 CanisterPoolStorageOps::register(
445 *pid,
446 Cycles::default(),
447 CanisterPoolStatus::PendingReset,
448 None,
449 None,
450 None,
451 );
452 added += 1;
453 }
454
455 PoolOps::schedule_reset_worker();
456
457 Ok((added, requeued, skipped, pids.len() as u64))
458}
459
460pub async fn pool_recycle_canister(pid: Principal) -> Result<(), Error> {
461 OpsError::require_root()?;
462
463 let entry =
464 SubnetCanisterRegistryOps::get(pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
465
466 let role = Some(entry.role.clone());
467 let hash = entry.module_hash.clone();
468
469 let _ = SubnetCanisterRegistryOps::remove(&pid);
470
471 let cycles = reset_into_pool(pid).await?;
472 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, role, None, hash);
473
474 Ok(())
475}
476
477pub async fn pool_export_canister(pid: Principal) -> Result<(CanisterRole, Vec<u8>), Error> {
478 OpsError::require_root()?;
479
480 let entry = CanisterPoolStorageOps::take(&pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
481
482 if !entry.status.is_ready() {
483 return Err(PoolOpsError::PoolEntryNotReady { pid }.into());
484 }
485
486 let role = entry.role.ok_or(PoolOpsError::MissingType { pid })?;
487 let hash = entry
488 .module_hash
489 .ok_or(PoolOpsError::MissingModuleHash { pid })?;
490
491 Ok((role, hash))
492}
493
494pub async fn recycle_via_orchestrator(pid: Principal) -> Result<(), Error> {
499 use crate::ops::orchestration::orchestrator::{CanisterLifecycleOrchestrator, LifecycleEvent};
500
501 CanisterLifecycleOrchestrator::apply(LifecycleEvent::RecycleToPool { pid })
502 .await
503 .map(|_| ())
504}