canic_core/ops/
pool.rs

1//! Pool lifecycle helpers.
2//!
3//! The root canister maintains a pool of empty or decommissioned canisters
4//! that can be quickly reassigned when scaling.
5//!
6//! INVARIANTS:
7//! - Pool canisters are NOT part of topology
8//! - Pool canisters have NO parent
9//! - Root is the sole controller
10//! - Importing a canister is destructive (code + controllers wiped)
11//! - Registry metadata is informational only while in pool
12
13pub use crate::ops::storage::pool::{CanisterPoolEntry, CanisterPoolStatus, CanisterPoolView};
14
15use crate::{
16    Error, ThisError,
17    cdk::{
18        api::canister_self,
19        futures::spawn,
20        mgmt::{CanisterSettings, UpdateSettingsArgs},
21        types::Principal,
22    },
23    config::Config,
24    log::Topic,
25    ops::{
26        OPS_POOL_CHECK_INTERVAL, OPS_POOL_INIT_DELAY, OpsError,
27        config::ConfigOps,
28        ic::{
29            get_cycles,
30            mgmt::{create_canister, uninstall_code},
31            timer::{TimerId, TimerOps},
32            update_settings,
33        },
34        prelude::*,
35        storage::{pool::CanisterPoolStorageOps, topology::SubnetCanisterRegistryOps},
36    },
37    types::{Cycles, TC},
38};
39
40use candid::CandidType;
41use serde::Deserialize;
42use std::{cell::RefCell, time::Duration};
43
44//
45// ERRORS
46//
47
48#[derive(Debug, ThisError)]
49pub enum PoolOpsError {
50    #[error("pool entry missing for {pid}")]
51    PoolEntryMissing { pid: Principal },
52
53    #[error("missing module hash for pool entry {pid}")]
54    MissingModuleHash { pid: Principal },
55
56    #[error("missing type for pool entry {pid}")]
57    MissingType { pid: Principal },
58
59    #[error("pool entry {pid} is not ready")]
60    PoolEntryNotReady { pid: Principal },
61}
62
63impl From<PoolOpsError> for Error {
64    fn from(err: PoolOpsError) -> Self {
65        OpsError::from(err).into()
66    }
67}
68
69//
70// ADMIN API
71//
72
73#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
74pub enum PoolAdminCommand {
75    CreateEmpty,
76    Recycle { pid: Principal },
77    ImportImmediate { pid: Principal },
78    ImportQueued { pids: Vec<Principal> },
79}
80
81#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
82pub enum PoolAdminResponse {
83    Created {
84        pid: Principal,
85    },
86    Recycled,
87    Imported,
88    QueuedImported {
89        added: u64,
90        requeued: u64,
91        skipped: u64,
92        total: u64,
93    },
94}
95
96//
97// TIMER STATE
98//
99
100thread_local! {
101    static TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
102    static RESET_IN_PROGRESS: RefCell<bool> = const { RefCell::new(false) };
103    static RESET_RESCHEDULE: RefCell<bool> = const { RefCell::new(false) };
104    static RESET_TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
105}
106
107/// Default cycles allocated to freshly created pool canisters.
108const POOL_CANISTER_CYCLES: u128 = 5 * TC;
109
110/// Default batch size for resetting pending pool entries.
111const POOL_RESET_BATCH_SIZE: usize = 10;
112
113//
114// INTERNAL HELPERS
115//
116
117fn pool_controllers() -> Vec<Principal> {
118    let mut controllers = Config::try_get()
119        .map(|cfg| cfg.controllers.clone())
120        .unwrap_or_default();
121
122    let root = canister_self();
123    if !controllers.contains(&root) {
124        controllers.push(root);
125    }
126
127    controllers
128}
129
130async fn reset_into_pool(pid: Principal) -> Result<Cycles, Error> {
131    uninstall_code(pid).await?;
132
133    update_settings(&UpdateSettingsArgs {
134        canister_id: pid,
135        settings: CanisterSettings {
136            controllers: Some(pool_controllers()),
137            ..Default::default()
138        },
139    })
140    .await?;
141
142    get_cycles(pid).await
143}
144
145//
146// POOL OPS
147//
148
/// Namespace for pool operations: lifecycle timers, queries, admin commands
/// and the background reset worker.
pub struct PoolOps;
150
151impl PoolOps {
152    // ---------------------------------------------------------------------
153    // Lifecycle
154    // ---------------------------------------------------------------------
155
156    pub fn start() {
157        TIMER.with_borrow_mut(|slot| {
158            if slot.is_some() {
159                return;
160            }
161
162            let id = TimerOps::set(OPS_POOL_INIT_DELAY, "pool:init", async {
163                let _ = Self::check();
164
165                let interval =
166                    TimerOps::set_interval(OPS_POOL_CHECK_INTERVAL, "pool:interval", || async {
167                        let _ = Self::check();
168                    });
169
170                TIMER.with_borrow_mut(|slot| *slot = Some(interval));
171            });
172
173            *slot = Some(id);
174        });
175    }
176
177    pub fn stop() {
178        TIMER.with_borrow_mut(|slot| {
179            if let Some(id) = slot.take() {
180                TimerOps::clear(id);
181            }
182        });
183    }
184
185    // ---------------------------------------------------------------------
186    // Public API
187    // ---------------------------------------------------------------------
188
189    #[must_use]
190    pub fn check() -> u64 {
191        Self::schedule_reset_worker();
192
193        let subnet_cfg = match ConfigOps::current_subnet() {
194            Ok(cfg) => cfg,
195            Err(e) => {
196                log!(
197                    Topic::CanisterPool,
198                    Warn,
199                    "cannot read subnet config: {e:?}"
200                );
201                return 0;
202            }
203        };
204
205        let min_size: u64 = subnet_cfg.pool.minimum_size.into();
206        let ready_size = Self::ready_len();
207
208        if ready_size >= min_size {
209            return 0;
210        }
211
212        let missing = (min_size - ready_size).min(10);
213        log!(
214            Topic::CanisterPool,
215            Ok,
216            "pool low: {ready_size}/{min_size}, creating {missing}"
217        );
218
219        spawn(async move {
220            for i in 0..missing {
221                match pool_create_canister().await {
222                    Ok(_) => log!(
223                        Topic::CanisterPool,
224                        Ok,
225                        "created pool canister {}/{}",
226                        i + 1,
227                        missing
228                    ),
229                    Err(e) => log!(Topic::CanisterPool, Warn, "pool creation failed: {e:?}"),
230                }
231            }
232        });
233
234        missing
235    }
236
237    #[must_use]
238    pub fn pop_ready() -> Option<(Principal, CanisterPoolEntry)> {
239        CanisterPoolStorageOps::pop_ready()
240    }
241
242    #[must_use]
243    pub fn contains(pid: &Principal) -> bool {
244        CanisterPoolStorageOps::contains(pid)
245    }
246
247    #[must_use]
248    pub fn export() -> CanisterPoolView {
249        CanisterPoolStorageOps::export()
250    }
251
252    pub async fn admin(cmd: PoolAdminCommand) -> Result<PoolAdminResponse, Error> {
253        match cmd {
254            PoolAdminCommand::CreateEmpty => {
255                let pid = pool_create_canister().await?;
256                Ok(PoolAdminResponse::Created { pid })
257            }
258            PoolAdminCommand::Recycle { pid } => {
259                pool_recycle_canister(pid).await?;
260                Ok(PoolAdminResponse::Recycled)
261            }
262            PoolAdminCommand::ImportImmediate { pid } => {
263                pool_import_canister(pid).await?;
264                Ok(PoolAdminResponse::Imported)
265            }
266            PoolAdminCommand::ImportQueued { pids } => {
267                let (a, r, s, t) = pool_import_queued_canisters(pids)?;
268                Ok(PoolAdminResponse::QueuedImported {
269                    added: a,
270                    requeued: r,
271                    skipped: s,
272                    total: t,
273                })
274            }
275        }
276    }
277
278    // ---------------------------------------------------------------------
279    // Scheduler + worker
280    // ---------------------------------------------------------------------
281
282    fn ready_len() -> u64 {
283        CanisterPoolStorageOps::export()
284            .into_iter()
285            .filter(|(_, e)| e.status.is_ready())
286            .count() as u64
287    }
288
289    fn has_pending_reset() -> bool {
290        CanisterPoolStorageOps::export()
291            .into_iter()
292            .any(|(_, e)| e.status.is_pending_reset())
293    }
294
295    fn maybe_reschedule() {
296        let reschedule = RESET_RESCHEDULE.with_borrow_mut(|f| {
297            let v = *f;
298            *f = false;
299            v
300        });
301
302        if reschedule || Self::has_pending_reset() {
303            Self::schedule_reset_worker();
304        }
305    }
306
307    fn schedule_reset_worker() {
308        RESET_TIMER.with_borrow_mut(|slot| {
309            if slot.is_some() {
310                return;
311            }
312
313            let id = TimerOps::set(Duration::ZERO, "pool:pending", async {
314                RESET_TIMER.with_borrow_mut(|slot| *slot = None);
315                let _ = Self::run_reset_worker(POOL_RESET_BATCH_SIZE).await;
316            });
317
318            *slot = Some(id);
319        });
320    }
321
322    async fn run_reset_worker(limit: usize) -> Result<(), Error> {
323        if limit == 0 {
324            return Ok(());
325        }
326
327        let should_run = RESET_IN_PROGRESS.with_borrow_mut(|flag| {
328            if *flag {
329                RESET_RESCHEDULE.with_borrow_mut(|r| *r = true);
330                false
331            } else {
332                *flag = true;
333                true
334            }
335        });
336
337        if !should_run {
338            return Ok(());
339        }
340
341        let result = Self::run_reset_batch(limit).await;
342
343        RESET_IN_PROGRESS.with_borrow_mut(|f| *f = false);
344        Self::maybe_reschedule();
345
346        result
347    }
348
349    async fn run_reset_batch(limit: usize) -> Result<(), Error> {
350        let mut pending: Vec<_> = CanisterPoolStorageOps::export()
351            .into_iter()
352            .filter(|(_, e)| e.status.is_pending_reset())
353            .collect();
354
355        if pending.is_empty() {
356            return Ok(());
357        }
358
359        pending.sort_by_key(|(_, e)| e.created_at);
360
361        for (pid, mut entry) in pending.into_iter().take(limit) {
362            match reset_into_pool(pid).await {
363                Ok(cycles) => {
364                    entry.cycles = cycles;
365                    entry.status = CanisterPoolStatus::Ready;
366                }
367                Err(err) => {
368                    entry.status = CanisterPoolStatus::Failed {
369                        reason: err.to_string(),
370                    };
371                    log!(
372                        Topic::CanisterPool,
373                        Warn,
374                        "pool reset failed for {pid}: {err}"
375                    );
376                }
377            }
378
379            if !CanisterPoolStorageOps::update(pid, entry) {
380                log!(
381                    Topic::CanisterPool,
382                    Warn,
383                    "pool reset update missing for {pid}"
384                );
385            }
386        }
387
388        Ok(())
389    }
390}
391
392//
393// CREATE / IMPORT / RECYCLE / EXPORT
394//
395
396pub async fn pool_create_canister() -> Result<Principal, Error> {
397    OpsError::require_root()?;
398
399    let cycles = Cycles::new(POOL_CANISTER_CYCLES);
400    let pid = create_canister(pool_controllers(), cycles.clone()).await?;
401
402    CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
403    Ok(pid)
404}
405
406pub async fn pool_import_canister(pid: Principal) -> Result<(), Error> {
407    OpsError::require_root()?;
408
409    let _ = SubnetCanisterRegistryOps::remove(&pid);
410    let cycles = reset_into_pool(pid).await?;
411
412    CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
413    Ok(())
414}
415
416fn pool_import_queued_canisters(pids: Vec<Principal>) -> Result<(u64, u64, u64, u64), Error> {
417    OpsError::require_root()?;
418
419    let mut added = 0;
420    let mut requeued = 0;
421    let mut skipped = 0;
422
423    for pid in &pids {
424        if SubnetCanisterRegistryOps::get(*pid).is_some() {
425            skipped += 1;
426            continue;
427        }
428
429        if let Some(mut entry) = CanisterPoolStorageOps::get(*pid) {
430            if entry.status.is_failed() {
431                entry.status = CanisterPoolStatus::PendingReset;
432                entry.cycles = Cycles::default();
433                if CanisterPoolStorageOps::update(*pid, entry) {
434                    requeued += 1;
435                } else {
436                    skipped += 1;
437                }
438            } else {
439                skipped += 1;
440            }
441            continue;
442        }
443
444        CanisterPoolStorageOps::register(
445            *pid,
446            Cycles::default(),
447            CanisterPoolStatus::PendingReset,
448            None,
449            None,
450            None,
451        );
452        added += 1;
453    }
454
455    PoolOps::schedule_reset_worker();
456
457    Ok((added, requeued, skipped, pids.len() as u64))
458}
459
460pub async fn pool_recycle_canister(pid: Principal) -> Result<(), Error> {
461    OpsError::require_root()?;
462
463    let entry =
464        SubnetCanisterRegistryOps::get(pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
465
466    let role = Some(entry.role.clone());
467    let hash = entry.module_hash.clone();
468
469    let _ = SubnetCanisterRegistryOps::remove(&pid);
470
471    let cycles = reset_into_pool(pid).await?;
472    CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, role, None, hash);
473
474    Ok(())
475}
476
477pub async fn pool_export_canister(pid: Principal) -> Result<(CanisterRole, Vec<u8>), Error> {
478    OpsError::require_root()?;
479
480    let entry = CanisterPoolStorageOps::take(&pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
481
482    if !entry.status.is_ready() {
483        return Err(PoolOpsError::PoolEntryNotReady { pid }.into());
484    }
485
486    let role = entry.role.ok_or(PoolOpsError::MissingType { pid })?;
487    let hash = entry
488        .module_hash
489        .ok_or(PoolOpsError::MissingModuleHash { pid })?;
490
491    Ok((role, hash))
492}
493
494//
495// ORCHESTRATION HOOK
496//
497
498pub async fn recycle_via_orchestrator(pid: Principal) -> Result<(), Error> {
499    use crate::ops::orchestration::orchestrator::{CanisterLifecycleOrchestrator, LifecycleEvent};
500
501    CanisterLifecycleOrchestrator::apply(LifecycleEvent::RecycleToPool { pid })
502        .await
503        .map(|_| ())
504}