1pub use crate::ops::storage::pool::{CanisterPoolEntry, CanisterPoolStatus, CanisterPoolView};
15
16use crate::{
17 Error, ThisError,
18 cdk::{
19 api::canister_self,
20 futures::spawn,
21 mgmt::{CanisterSettings, UpdateSettingsArgs},
22 types::Principal,
23 },
24 config::Config,
25 log::Topic,
26 ops::{
27 OPS_POOL_CHECK_INTERVAL, OPS_POOL_INIT_DELAY, OpsError,
28 config::ConfigOps,
29 ic::{
30 get_cycles,
31 mgmt::{create_canister, uninstall_code},
32 timer::{TimerId, TimerOps},
33 update_settings,
34 },
35 prelude::*,
36 storage::{pool::CanisterPoolStorageOps, topology::SubnetCanisterRegistryOps},
37 },
38 types::{Cycles, TC},
39};
40use candid::CandidType;
41use serde::Deserialize;
42use std::{cell::RefCell, time::Duration};
43
thread_local! {
    // Handle for the pool top-up timer: first the one-shot init timer, then
    // (once it fires) the recurring check-interval timer.
    static TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
    // Re-entrancy guard: true while a reset batch is executing.
    static RESET_IN_PROGRESS: RefCell<bool> = const { RefCell::new(false) };
    // Set when a reset run is requested while one is already in flight, so
    // the finishing worker knows to re-arm itself.
    static RESET_RESCHEDULE: RefCell<bool> = const { RefCell::new(false) };
    // Handle for the zero-delay one-shot timer that drives the reset worker.
    static RESET_TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
}
54
/// Cycles each freshly created pool canister is funded with.
const POOL_CANISTER_CYCLES: u128 = 5 * TC;

/// Maximum number of pending-reset entries processed per reset-worker run.
const POOL_RESET_BATCH_SIZE: usize = 10;
60
/// Errors specific to canister-pool operations.
#[derive(Debug, ThisError)]
pub enum PoolOpsError {
    /// No pool (or registry) entry exists for the given principal.
    #[error("pool entry missing for {pid}")]
    PoolEntryMissing { pid: Principal },

    /// The pool entry has no recorded module hash to export.
    #[error("missing module hash for pool entry {pid}")]
    MissingModuleHash { pid: Principal },

    /// The pool entry has no recorded role/type to export.
    #[error("missing type for pool entry {pid}")]
    MissingType { pid: Principal },

    /// The pool entry exists but is not in the `Ready` state.
    #[error("pool entry {pid} is not ready")]
    PoolEntryNotReady { pid: Principal },
}
79
80impl From<PoolOpsError> for Error {
81 fn from(err: PoolOpsError) -> Self {
82 OpsError::from(err).into()
83 }
84}
85
/// Administrative commands for managing the canister pool.
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminCommand {
    /// Create a brand-new, empty pool canister.
    CreateEmpty,
    /// Reset a registry canister and return it to the pool.
    Recycle { pid: Principal },
    /// Import an external canister and reset it into the pool right away.
    ImportImmediate { pid: Principal },
    /// Import canisters as pending-reset entries; the reset runs asynchronously.
    ImportQueued { pids: Vec<Principal> },
    /// Re-queue failed pool entries (all failed entries when `pids` is `None`).
    RequeueFailed { pids: Option<Vec<Principal>> },
}
98
/// Responses for [`PoolAdminCommand`], one variant per command.
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminResponse {
    /// A new pool canister was created.
    Created {
        pid: Principal,
    },
    /// The canister was recycled back into the pool.
    Recycled,
    /// The canister was imported and reset immediately.
    Imported,
    /// Counters from a queued-import run.
    QueuedImported {
        added: u64,
        requeued: u64,
        skipped: u64,
        total: u64,
    },
    /// Counters from a requeue-failed run.
    FailedRequeued {
        requeued: u64,
        skipped: u64,
        total: u64,
    },
}
122
123fn pool_controllers() -> Vec<Principal> {
128 let mut controllers = Config::get().controllers.clone();
129
130 let root = canister_self();
131 if !controllers.contains(&root) {
132 controllers.push(root);
133 }
134
135 controllers
136}
137
138async fn reset_into_pool(pid: Principal) -> Result<Cycles, Error> {
139 uninstall_code(pid).await?;
140
141 update_settings(&UpdateSettingsArgs {
142 canister_id: pid,
143 settings: CanisterSettings {
144 controllers: Some(pool_controllers()),
145 ..Default::default()
146 },
147 })
148 .await?;
149
150 get_cycles(pid).await
151}
152
153fn register_or_update_preserving_metadata(
154 pid: Principal,
155 cycles: Cycles,
156 status: CanisterPoolStatus,
157 role: Option<CanisterRole>,
158 parent: Option<Principal>,
159 module_hash: Option<Vec<u8>>,
160) {
161 if let Some(mut entry) = CanisterPoolStorageOps::get(pid) {
162 entry.cycles = cycles;
163 entry.status = status;
164 entry.role = role.or(entry.role);
165 entry.parent = parent.or(entry.parent);
166 entry.module_hash = module_hash.or(entry.module_hash);
167 let _ = CanisterPoolStorageOps::update(pid, entry);
168 } else {
169 CanisterPoolStorageOps::register(pid, cycles, status, role, parent, module_hash);
170 }
171}
172
/// Facade for canister-pool maintenance: the top-up timer, pool queries,
/// admin command dispatch, and the pending-reset worker.
pub struct PoolOps;

impl PoolOps {
    /// Start pool maintenance (idempotent).
    ///
    /// Arms a one-shot init timer; when it fires, it runs one check and then
    /// installs the recurring check interval, storing its id in the same
    /// `TIMER` slot so `stop()` always cancels the live timer.
    pub fn start() {
        TIMER.with_borrow_mut(|slot| {
            if slot.is_some() {
                // Already started — don't double-schedule.
                return;
            }

            let id = TimerOps::set(OPS_POOL_INIT_DELAY, "pool:init", async {
                let _ = Self::check();

                let interval =
                    TimerOps::set_interval(OPS_POOL_CHECK_INTERVAL, "pool:interval", || async {
                        let _ = Self::check();
                    });

                // Swap the (now fired) init-timer id for the interval id so
                // a later `stop()` cancels the interval.
                TIMER.with_borrow_mut(|slot| *slot = Some(interval));
            });

            *slot = Some(id);
        });
    }

    /// Cancel the maintenance timer, if any is armed.
    pub fn stop() {
        TIMER.with_borrow_mut(|slot| {
            if let Some(id) = slot.take() {
                TimerOps::clear(id);
            }
        });
    }

    /// Check pool health and top it up when below the configured minimum.
    ///
    /// Also kicks the pending-reset worker. Returns how many canister
    /// creations were scheduled (0 when the pool is full or the subnet
    /// config cannot be read).
    #[must_use]
    pub fn check() -> u64 {
        Self::schedule_reset_worker();

        let subnet_cfg = match ConfigOps::current_subnet() {
            Ok(cfg) => cfg,
            Err(e) => {
                log!(
                    Topic::CanisterPool,
                    Warn,
                    "cannot read subnet config: {e:?}"
                );
                return 0;
            }
        };

        let min_size: u64 = subnet_cfg.pool.minimum_size.into();
        let ready_size = Self::ready_len();

        if ready_size >= min_size {
            return 0;
        }

        // Cap creation bursts at 10 per check.
        // NOTE(review): magic cap — consider promoting to a named constant
        // alongside POOL_RESET_BATCH_SIZE.
        let missing = (min_size - ready_size).min(10);
        log!(
            Topic::CanisterPool,
            Ok,
            "pool low: {ready_size}/{min_size}, creating {missing}"
        );

        // Creation is async; run it in the background so `check` stays sync.
        spawn(async move {
            for i in 0..missing {
                match pool_create_canister().await {
                    Ok(_) => log!(
                        Topic::CanisterPool,
                        Ok,
                        "created pool canister {}/{}",
                        i + 1,
                        missing
                    ),
                    Err(e) => log!(Topic::CanisterPool, Warn, "pool creation failed: {e:?}"),
                }
            }
        });

        missing
    }

    /// Remove and return a ready pool entry, if one exists.
    #[must_use]
    pub fn pop_ready() -> Option<(Principal, CanisterPoolEntry)> {
        CanisterPoolStorageOps::pop_ready()
    }

    /// Whether `pid` is currently tracked by the pool.
    #[must_use]
    pub fn contains(pid: &Principal) -> bool {
        CanisterPoolStorageOps::contains(pid)
    }

    /// Snapshot of the whole pool for inspection/export.
    #[must_use]
    pub fn export() -> CanisterPoolView {
        CanisterPoolStorageOps::export()
    }

    /// Dispatch an admin command to the matching pool operation.
    pub async fn admin(cmd: PoolAdminCommand) -> Result<PoolAdminResponse, Error> {
        match cmd {
            PoolAdminCommand::CreateEmpty => {
                let pid = pool_create_canister().await?;
                Ok(PoolAdminResponse::Created { pid })
            }
            PoolAdminCommand::Recycle { pid } => {
                pool_recycle_canister(pid).await?;
                Ok(PoolAdminResponse::Recycled)
            }
            PoolAdminCommand::ImportImmediate { pid } => {
                pool_import_canister(pid).await?;
                Ok(PoolAdminResponse::Imported)
            }
            PoolAdminCommand::ImportQueued { pids } => {
                let (a, r, s, t) = pool_import_queued_canisters(pids)?;
                Ok(PoolAdminResponse::QueuedImported {
                    added: a,
                    requeued: r,
                    skipped: s,
                    total: t,
                })
            }
            PoolAdminCommand::RequeueFailed { pids } => {
                let (requeued, skipped, total) = pool_requeue_failed(pids)?;
                Ok(PoolAdminResponse::FailedRequeued {
                    requeued,
                    skipped,
                    total,
                })
            }
        }
    }

    /// Number of entries currently in the `Ready` state.
    fn ready_len() -> u64 {
        CanisterPoolStorageOps::export()
            .into_iter()
            .filter(|(_, e)| e.status.is_ready())
            .count() as u64
    }

    /// Whether any entry is still waiting for a reset.
    fn has_pending_reset() -> bool {
        CanisterPoolStorageOps::export()
            .into_iter()
            .any(|(_, e)| e.status.is_pending_reset())
    }

    /// Re-arm the reset worker when a reschedule was requested during the
    /// last batch, or when pending entries remain after it.
    fn maybe_reschedule() {
        // Take-and-clear the reschedule flag.
        let reschedule = RESET_RESCHEDULE.with_borrow_mut(|f| {
            let v = *f;
            *f = false;
            v
        });

        if reschedule || Self::has_pending_reset() {
            Self::schedule_reset_worker();
        }
    }

    /// Arm a zero-delay one-shot timer that runs one reset batch.
    /// Idempotent: a no-op while a worker timer is already armed.
    fn schedule_reset_worker() {
        RESET_TIMER.with_borrow_mut(|slot| {
            if slot.is_some() {
                return;
            }

            let id = TimerOps::set(Duration::ZERO, "pool:pending", async {
                // Clear the slot first so the worker can re-arm itself.
                RESET_TIMER.with_borrow_mut(|slot| *slot = None);
                let _ = Self::run_reset_worker(POOL_RESET_BATCH_SIZE).await;
            });

            *slot = Some(id);
        });
    }

    /// Run at most one reset batch, guarded against concurrent execution.
    ///
    /// If a batch is already in flight, records a reschedule request and
    /// returns; otherwise runs the batch, releases the guard, and checks
    /// whether another run is needed.
    async fn run_reset_worker(limit: usize) -> Result<(), Error> {
        if limit == 0 {
            return Ok(());
        }

        let should_run = RESET_IN_PROGRESS.with_borrow_mut(|flag| {
            if *flag {
                // Another batch is mid-flight: ask it to reschedule us.
                RESET_RESCHEDULE.with_borrow_mut(|r| *r = true);
                false
            } else {
                *flag = true;
                true
            }
        });

        if !should_run {
            return Ok(());
        }

        let result = Self::run_reset_batch(limit).await;

        RESET_IN_PROGRESS.with_borrow_mut(|f| *f = false);
        Self::maybe_reschedule();

        result
    }

    /// Reset up to `limit` pending entries (oldest first), marking each one
    /// `Ready` on success or `Failed` with the error text on failure.
    async fn run_reset_batch(limit: usize) -> Result<(), Error> {
        let mut pending: Vec<_> = CanisterPoolStorageOps::export()
            .into_iter()
            .filter(|(_, e)| e.status.is_pending_reset())
            .collect();

        if pending.is_empty() {
            return Ok(());
        }

        // Oldest entries get reset first.
        pending.sort_by_key(|(_, e)| e.created_at);

        for (pid, mut entry) in pending.into_iter().take(limit) {
            match reset_into_pool(pid).await {
                Ok(cycles) => {
                    entry.cycles = cycles;
                    entry.status = CanisterPoolStatus::Ready;
                }
                Err(err) => {
                    entry.status = CanisterPoolStatus::Failed {
                        reason: err.to_string(),
                    };
                    log!(
                        Topic::CanisterPool,
                        Warn,
                        "pool reset failed for {pid}: {err}"
                    );
                }
            }

            // Entry may have vanished between export and update; log and
            // continue with the rest of the batch.
            if !CanisterPoolStorageOps::update(pid, entry) {
                log!(
                    Topic::CanisterPool,
                    Warn,
                    "pool reset update missing for {pid}"
                );
            }
        }

        Ok(())
    }
}
427
428pub async fn pool_create_canister() -> Result<Principal, Error> {
433 OpsError::require_root()?;
434
435 let cycles = Cycles::new(POOL_CANISTER_CYCLES);
436 let pid = create_canister(pool_controllers(), cycles.clone()).await?;
437
438 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
439 Ok(pid)
440}
441
442pub async fn pool_import_canister(pid: Principal) -> Result<(), Error> {
443 OpsError::require_root()?;
444
445 register_or_update_preserving_metadata(
446 pid,
447 Cycles::default(),
448 CanisterPoolStatus::PendingReset,
449 None,
450 None,
451 None,
452 );
453 let _ = SubnetCanisterRegistryOps::remove(&pid);
454 match reset_into_pool(pid).await {
455 Ok(cycles) => {
456 register_or_update_preserving_metadata(
457 pid,
458 cycles,
459 CanisterPoolStatus::Ready,
460 None,
461 None,
462 None,
463 );
464 }
465 Err(err) => {
466 log!(
467 Topic::CanisterPool,
468 Warn,
469 "pool import failed for {pid}: {err}"
470 );
471 register_or_update_preserving_metadata(
472 pid,
473 Cycles::default(),
474 CanisterPoolStatus::Failed {
475 reason: err.to_string(),
476 },
477 None,
478 None,
479 None,
480 );
481 }
482 }
483 Ok(())
484}
485
/// Root-gated entry point for queued imports; see the `_inner` variant for
/// the actual logic (split out so tests can bypass the root check).
fn pool_import_queued_canisters(pids: Vec<Principal>) -> Result<(u64, u64, u64, u64), Error> {
    pool_import_queued_canisters_inner(pids, true)
}
489
490fn pool_import_queued_canisters_inner(
491 pids: Vec<Principal>,
492 enforce_root: bool,
493) -> Result<(u64, u64, u64, u64), Error> {
494 if enforce_root {
495 OpsError::require_root()?;
496 }
497
498 let mut added = 0;
499 let mut requeued = 0;
500 let mut skipped = 0;
501
502 for pid in &pids {
503 if SubnetCanisterRegistryOps::get(*pid).is_some() {
504 skipped += 1;
505 continue;
506 }
507
508 if let Some(entry) = CanisterPoolStorageOps::get(*pid) {
509 if entry.status.is_failed() {
510 register_or_update_preserving_metadata(
511 *pid,
512 Cycles::default(),
513 CanisterPoolStatus::PendingReset,
514 None,
515 None,
516 None,
517 );
518 requeued += 1;
519 } else {
520 skipped += 1;
521 }
522 continue;
523 }
524
525 register_or_update_preserving_metadata(
526 *pid,
527 Cycles::default(),
528 CanisterPoolStatus::PendingReset,
529 None,
530 None,
531 None,
532 );
533 added += 1;
534 }
535
536 maybe_schedule_reset_worker();
537
538 Ok((added, requeued, skipped, pids.len() as u64))
539}
540
/// Production hook: actually arm the pool reset worker timer.
#[cfg(not(test))]
fn maybe_schedule_reset_worker() {
    PoolOps::schedule_reset_worker();
}
545
/// Test hook: record that scheduling was requested instead of arming timers.
#[cfg(test)]
fn maybe_schedule_reset_worker() {
    RESET_SCHEDULED.with_borrow_mut(|flag| *flag = true);
}
550
#[cfg(test)]
thread_local! {
    // Test-only flag set by `maybe_schedule_reset_worker`, read-and-cleared
    // via `take_reset_scheduled`.
    static RESET_SCHEDULED: RefCell<bool> = const { RefCell::new(false) };
}
555
/// Return the test-only "reset scheduled" flag, clearing it in the process.
#[cfg(test)]
fn take_reset_scheduled() -> bool {
    // `mem::take` reads the flag and resets it to `false` in one step.
    RESET_SCHEDULED.with_borrow_mut(std::mem::take)
}
564
/// Root-gated entry point for requeueing failed entries; see the `_inner`
/// variant for the actual logic (split out so tests can bypass root checks).
fn pool_requeue_failed(pids: Option<Vec<Principal>>) -> Result<(u64, u64, u64), Error> {
    pool_requeue_failed_inner(pids, true)
}
568
569fn pool_requeue_failed_inner(
570 pids: Option<Vec<Principal>>,
571 enforce_root: bool,
572) -> Result<(u64, u64, u64), Error> {
573 if enforce_root {
574 OpsError::require_root()?;
575 }
576
577 let mut requeued = 0;
578 let mut skipped = 0;
579 let total;
580
581 if let Some(pids) = pids {
582 total = pids.len() as u64;
583 for pid in pids {
584 if let Some(entry) = CanisterPoolStorageOps::get(pid) {
585 if entry.status.is_failed() {
586 register_or_update_preserving_metadata(
587 pid,
588 Cycles::default(),
589 CanisterPoolStatus::PendingReset,
590 None,
591 None,
592 None,
593 );
594 requeued += 1;
595 } else {
596 skipped += 1;
597 }
598 } else {
599 skipped += 1;
600 }
601 }
602 } else {
603 let entries = CanisterPoolStorageOps::export();
604 total = entries.len() as u64;
605 for (pid, entry) in entries {
606 if entry.status.is_failed() {
607 register_or_update_preserving_metadata(
608 pid,
609 Cycles::default(),
610 CanisterPoolStatus::PendingReset,
611 None,
612 None,
613 None,
614 );
615 requeued += 1;
616 } else {
617 skipped += 1;
618 }
619 }
620 }
621
622 if requeued > 0 {
623 maybe_schedule_reset_worker();
624 }
625
626 Ok((requeued, skipped, total))
627}
628
629pub async fn pool_recycle_canister(pid: Principal) -> Result<(), Error> {
630 OpsError::require_root()?;
631
632 let entry =
633 SubnetCanisterRegistryOps::get(pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
634
635 let role = Some(entry.role.clone());
636 let hash = entry.module_hash.clone();
637
638 let _ = SubnetCanisterRegistryOps::remove(&pid);
639
640 let cycles = reset_into_pool(pid).await?;
641 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, role, None, hash);
642
643 Ok(())
644}
645
646pub async fn pool_export_canister(pid: Principal) -> Result<(CanisterRole, Vec<u8>), Error> {
647 OpsError::require_root()?;
648
649 let entry = CanisterPoolStorageOps::take(&pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
650
651 if !entry.status.is_ready() {
652 return Err(PoolOpsError::PoolEntryNotReady { pid }.into());
653 }
654
655 let role = entry.role.ok_or(PoolOpsError::MissingType { pid })?;
656 let hash = entry
657 .module_hash
658 .ok_or(PoolOpsError::MissingModuleHash { pid })?;
659
660 Ok((role, hash))
661}
662
663pub async fn recycle_via_orchestrator(pid: Principal) -> Result<(), Error> {
668 use crate::ops::orchestration::orchestrator::{CanisterLifecycleOrchestrator, LifecycleEvent};
669
670 CanisterLifecycleOrchestrator::apply(LifecycleEvent::RecycleToPool { pid })
671 .await
672 .map(|_| ())
673}
674
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ids::CanisterRole,
        model::memory::{CanisterEntry, pool::CanisterPool, topology::SubnetCanisterRegistry},
    };
    use candid::Principal;

    /// Deterministic test principal derived from a single byte.
    fn p(id: u8) -> Principal {
        Principal::from_slice(&[id; 29])
    }

    /// Clear pool + registry state and any leftover "scheduled" flag so
    /// tests are independent.
    fn reset_state() {
        CanisterPool::clear();
        SubnetCanisterRegistry::clear_for_tests();
        let _ = take_reset_scheduled();
    }

    // Unknown canisters are queued as pending-reset entries with zero cycles.
    #[test]
    fn import_queued_registers_pending_entries() {
        reset_state();

        let p1 = p(1);
        let p2 = p(2);

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1, p2], false).unwrap();
        assert_eq!(added, 2);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 0);
        assert_eq!(total, 2);

        let e1 = CanisterPoolStorageOps::get(p1).unwrap();
        let e2 = CanisterPoolStorageOps::get(p2).unwrap();
        assert!(e1.status.is_pending_reset());
        assert!(e2.status.is_pending_reset());
        assert_eq!(e1.cycles, Cycles::default());
        assert_eq!(e2.cycles, Cycles::default());
    }

    // Failed entries are moved back to pending-reset and the worker is armed.
    #[test]
    fn import_queued_requeues_failed_entries() {
        reset_state();

        let p1 = p(3);
        CanisterPoolStorageOps::register(
            p1,
            Cycles::new(10),
            CanisterPoolStatus::Failed {
                reason: "nope".to_string(),
            },
            None,
            None,
            None,
        );

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 1);
        assert_eq!(skipped, 0);
        assert_eq!(total, 1);
        assert!(take_reset_scheduled());

        let entry = CanisterPoolStorageOps::get(p1).unwrap();
        assert!(entry.status.is_pending_reset());
        assert_eq!(entry.cycles, Cycles::default());
    }

    // Entries already in the Ready state are left untouched (cycles included).
    #[test]
    fn import_queued_skips_ready_entries() {
        reset_state();

        let p1 = p(4);
        CanisterPoolStorageOps::register(
            p1,
            Cycles::new(42),
            CanisterPoolStatus::Ready,
            None,
            None,
            None,
        );

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 1);
        assert_eq!(total, 1);

        let entry = CanisterPoolStorageOps::get(p1).unwrap();
        assert!(entry.status.is_ready());
        assert_eq!(entry.cycles, Cycles::new(42));
    }

    // Canisters still present in the subnet registry are never pooled.
    #[test]
    fn import_queued_skips_registry_canisters() {
        reset_state();

        let pid = p(5);
        SubnetCanisterRegistry::insert_for_tests(CanisterEntry {
            pid,
            role: CanisterRole::new("alpha"),
            parent_pid: None,
            module_hash: None,
            created_at: 0,
        });

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![pid], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 1);
        assert_eq!(total, 1);
        assert!(CanisterPoolStorageOps::get(pid).is_none());
    }

    // Updating with `None` metadata keeps the previously stored role,
    // parent, and module hash while cycles/status are overwritten.
    #[test]
    fn register_or_update_preserves_metadata() {
        reset_state();

        let pid = p(6);
        let role = CanisterRole::new("alpha");
        let parent = p(9);
        let hash = vec![1, 2, 3];

        CanisterPoolStorageOps::register(
            pid,
            Cycles::new(7),
            CanisterPoolStatus::Failed {
                reason: "oops".to_string(),
            },
            Some(role.clone()),
            Some(parent),
            Some(hash.clone()),
        );

        register_or_update_preserving_metadata(
            pid,
            Cycles::default(),
            CanisterPoolStatus::PendingReset,
            None,
            None,
            None,
        );

        let entry = CanisterPoolStorageOps::get(pid).unwrap();
        assert!(entry.status.is_pending_reset());
        assert_eq!(entry.cycles, Cycles::default());
        assert_eq!(entry.role, Some(role));
        assert_eq!(entry.parent, Some(parent));
        assert_eq!(entry.module_hash, Some(hash));
    }

    // A `None` pid list scans the whole pool: failed entries are requeued,
    // ready ones skipped, and the worker is armed because something requeued.
    #[test]
    fn requeue_failed_scans_pool_and_schedules() {
        reset_state();

        let failed_pid = p(7);
        let ready_pid = p(8);

        CanisterPoolStorageOps::register(
            failed_pid,
            Cycles::new(11),
            CanisterPoolStatus::Failed {
                reason: "bad".to_string(),
            },
            None,
            None,
            None,
        );
        CanisterPoolStorageOps::register(
            ready_pid,
            Cycles::new(22),
            CanisterPoolStatus::Ready,
            None,
            None,
            None,
        );

        let (requeued, skipped, total) = pool_requeue_failed_inner(None, false).unwrap();
        assert_eq!(requeued, 1);
        assert_eq!(skipped, 1);
        assert_eq!(total, 2);
        assert!(take_reset_scheduled());

        let failed_entry = CanisterPoolStorageOps::get(failed_pid).unwrap();
        let ready_entry = CanisterPoolStorageOps::get(ready_pid).unwrap();
        assert!(failed_entry.status.is_pending_reset());
        assert_eq!(failed_entry.cycles, Cycles::default());
        assert!(ready_entry.status.is_ready());
        assert_eq!(ready_entry.cycles, Cycles::new(22));
    }
}