1pub use crate::ops::storage::pool::{CanisterPoolEntry, CanisterPoolStatus, CanisterPoolView};
15
16use crate::{
17 Error, ThisError,
18 cdk::{
19 api::canister_self,
20 futures::spawn,
21 mgmt::{CanisterSettings, UpdateSettingsArgs},
22 types::Principal,
23 },
24 config::Config,
25 log::Topic,
26 ops::{
27 OPS_POOL_CHECK_INTERVAL, OPS_POOL_INIT_DELAY, OpsError,
28 config::ConfigOps,
29 ic::{
30 get_cycles,
31 mgmt::{create_canister, uninstall_code},
32 timer::{TimerId, TimerOps},
33 update_settings,
34 },
35 prelude::*,
36 storage::{pool::CanisterPoolStorageOps, topology::SubnetCanisterRegistryOps},
37 },
38 types::{Cycles, TC},
39};
40use candid::CandidType;
41use serde::Deserialize;
42use std::{cell::RefCell, time::Duration};
43
thread_local! {
    // Handle of the currently pending maintenance timer: first the one-shot
    // init timer, then (once it fires) the recurring interval timer.
    static TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
    // True while a reset batch is executing; guards against overlapping runs.
    static RESET_IN_PROGRESS: RefCell<bool> = const { RefCell::new(false) };
    // Set when a reset run was requested while one was already in progress,
    // so the finishing worker schedules a follow-up run.
    static RESET_RESCHEDULE: RefCell<bool> = const { RefCell::new(false) };
    // Handle of the pending one-shot reset-worker timer, if scheduled.
    static RESET_TIMER: RefCell<Option<TimerId>> = const { RefCell::new(None) };
}
54
/// Cycle balance each freshly created pool canister is funded with.
const POOL_CANISTER_CYCLES: u128 = 5 * TC;

/// Maximum number of pending-reset entries processed per reset-worker run.
const POOL_RESET_BATCH_SIZE: usize = 10;
60
/// Errors raised by canister-pool operations.
#[derive(Debug, ThisError)]
pub enum PoolOpsError {
    /// No pool (or registry) entry exists for the given principal.
    #[error("pool entry missing for {pid}")]
    PoolEntryMissing { pid: Principal },

    /// The pool entry exists but carries no stored module hash.
    #[error("missing module hash for pool entry {pid}")]
    MissingModuleHash { pid: Principal },

    /// The pool entry exists but carries no stored role.
    #[error("missing type for pool entry {pid}")]
    MissingType { pid: Principal },

    /// The pool entry is not in the `Ready` state.
    #[error("pool entry {pid} is not ready")]
    PoolEntryNotReady { pid: Principal },
}
79
80impl From<PoolOpsError> for Error {
81 fn from(err: PoolOpsError) -> Self {
82 OpsError::from(err).into()
83 }
84}
85
/// Admin commands accepted by [`PoolOps::admin`].
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminCommand {
    /// Create a brand-new, empty pool canister.
    CreateEmpty,
    /// Move a registry canister back into the pool.
    Recycle { pid: Principal },
    /// Import a canister into the pool and reset it right away.
    ImportImmediate { pid: Principal },
    /// Enqueue canisters for the reset worker to import.
    ImportQueued { pids: Vec<Principal> },
    /// Requeue failed entries; `None` scans the whole pool.
    RequeueFailed { pids: Option<Vec<Principal>> },
}
98
/// Responses returned by [`PoolOps::admin`], one variant per command.
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub enum PoolAdminResponse {
    /// A new pool canister was created.
    Created {
        pid: Principal,
    },
    /// The canister was recycled into the pool.
    Recycled,
    /// The canister was imported and reset immediately.
    Imported,
    /// Outcome counts of a queued import.
    QueuedImported {
        added: u64,
        requeued: u64,
        skipped: u64,
        total: u64,
    },
    /// Outcome counts of a failed-entry requeue.
    FailedRequeued {
        requeued: u64,
        skipped: u64,
        total: u64,
    },
}
122
123fn pool_controllers() -> Vec<Principal> {
128 let mut controllers = Config::try_get()
129 .map(|cfg| cfg.controllers.clone())
130 .unwrap_or_default();
131
132 let root = canister_self();
133 if !controllers.contains(&root) {
134 controllers.push(root);
135 }
136
137 controllers
138}
139
/// Wipes a canister so it can rejoin the pool: uninstalls its code, then
/// rewrites its controllers to the pool controller set, and finally
/// returns its current cycle balance.
///
/// # Errors
/// Propagates any failure from the management-canister calls.
async fn reset_into_pool(pid: Principal) -> Result<Cycles, Error> {
    uninstall_code(pid).await?;

    update_settings(&UpdateSettingsArgs {
        canister_id: pid,
        settings: CanisterSettings {
            controllers: Some(pool_controllers()),
            // Leave all other settings (memory, compute, freezing) untouched.
            ..Default::default()
        },
    })
    .await?;

    get_cycles(pid).await
}
154
155fn register_or_update_preserving_metadata(
156 pid: Principal,
157 cycles: Cycles,
158 status: CanisterPoolStatus,
159 role: Option<CanisterRole>,
160 parent: Option<Principal>,
161 module_hash: Option<Vec<u8>>,
162) {
163 if let Some(mut entry) = CanisterPoolStorageOps::get(pid) {
164 entry.cycles = cycles;
165 entry.status = status;
166 entry.role = role.or(entry.role);
167 entry.parent = parent.or(entry.parent);
168 entry.module_hash = module_hash.or(entry.module_hash);
169 let _ = CanisterPoolStorageOps::update(pid, entry);
170 } else {
171 CanisterPoolStorageOps::register(pid, cycles, status, role, parent, module_hash);
172 }
173}
174
/// Namespace for canister-pool maintenance: timers, top-up checks,
/// the reset worker, and admin command dispatch.
pub struct PoolOps;
180
181impl PoolOps {
182 pub fn start() {
187 TIMER.with_borrow_mut(|slot| {
188 if slot.is_some() {
189 return;
190 }
191
192 let id = TimerOps::set(OPS_POOL_INIT_DELAY, "pool:init", async {
193 let _ = Self::check();
194
195 let interval =
196 TimerOps::set_interval(OPS_POOL_CHECK_INTERVAL, "pool:interval", || async {
197 let _ = Self::check();
198 });
199
200 TIMER.with_borrow_mut(|slot| *slot = Some(interval));
201 });
202
203 *slot = Some(id);
204 });
205 }
206
207 pub fn stop() {
208 TIMER.with_borrow_mut(|slot| {
209 if let Some(id) = slot.take() {
210 TimerOps::clear(id);
211 }
212 });
213 }
214
215 #[must_use]
220 pub fn check() -> u64 {
221 Self::schedule_reset_worker();
222
223 let subnet_cfg = match ConfigOps::current_subnet() {
224 Ok(cfg) => cfg,
225 Err(e) => {
226 log!(
227 Topic::CanisterPool,
228 Warn,
229 "cannot read subnet config: {e:?}"
230 );
231 return 0;
232 }
233 };
234
235 let min_size: u64 = subnet_cfg.pool.minimum_size.into();
236 let ready_size = Self::ready_len();
237
238 if ready_size >= min_size {
239 return 0;
240 }
241
242 let missing = (min_size - ready_size).min(10);
243 log!(
244 Topic::CanisterPool,
245 Ok,
246 "pool low: {ready_size}/{min_size}, creating {missing}"
247 );
248
249 spawn(async move {
250 for i in 0..missing {
251 match pool_create_canister().await {
252 Ok(_) => log!(
253 Topic::CanisterPool,
254 Ok,
255 "created pool canister {}/{}",
256 i + 1,
257 missing
258 ),
259 Err(e) => log!(Topic::CanisterPool, Warn, "pool creation failed: {e:?}"),
260 }
261 }
262 });
263
264 missing
265 }
266
267 #[must_use]
268 pub fn pop_ready() -> Option<(Principal, CanisterPoolEntry)> {
269 CanisterPoolStorageOps::pop_ready()
270 }
271
272 #[must_use]
273 pub fn contains(pid: &Principal) -> bool {
274 CanisterPoolStorageOps::contains(pid)
275 }
276
277 #[must_use]
278 pub fn export() -> CanisterPoolView {
279 CanisterPoolStorageOps::export()
280 }
281
282 pub async fn admin(cmd: PoolAdminCommand) -> Result<PoolAdminResponse, Error> {
283 match cmd {
284 PoolAdminCommand::CreateEmpty => {
285 let pid = pool_create_canister().await?;
286 Ok(PoolAdminResponse::Created { pid })
287 }
288 PoolAdminCommand::Recycle { pid } => {
289 pool_recycle_canister(pid).await?;
290 Ok(PoolAdminResponse::Recycled)
291 }
292 PoolAdminCommand::ImportImmediate { pid } => {
293 pool_import_canister(pid).await?;
294 Ok(PoolAdminResponse::Imported)
295 }
296 PoolAdminCommand::ImportQueued { pids } => {
297 let (a, r, s, t) = pool_import_queued_canisters(pids)?;
298 Ok(PoolAdminResponse::QueuedImported {
299 added: a,
300 requeued: r,
301 skipped: s,
302 total: t,
303 })
304 }
305 PoolAdminCommand::RequeueFailed { pids } => {
306 let (requeued, skipped, total) = pool_requeue_failed(pids)?;
307 Ok(PoolAdminResponse::FailedRequeued {
308 requeued,
309 skipped,
310 total,
311 })
312 }
313 }
314 }
315
316 fn ready_len() -> u64 {
321 CanisterPoolStorageOps::export()
322 .into_iter()
323 .filter(|(_, e)| e.status.is_ready())
324 .count() as u64
325 }
326
327 fn has_pending_reset() -> bool {
328 CanisterPoolStorageOps::export()
329 .into_iter()
330 .any(|(_, e)| e.status.is_pending_reset())
331 }
332
333 fn maybe_reschedule() {
334 let reschedule = RESET_RESCHEDULE.with_borrow_mut(|f| {
335 let v = *f;
336 *f = false;
337 v
338 });
339
340 if reschedule || Self::has_pending_reset() {
341 Self::schedule_reset_worker();
342 }
343 }
344
345 fn schedule_reset_worker() {
346 RESET_TIMER.with_borrow_mut(|slot| {
347 if slot.is_some() {
348 return;
349 }
350
351 let id = TimerOps::set(Duration::ZERO, "pool:pending", async {
352 RESET_TIMER.with_borrow_mut(|slot| *slot = None);
353 let _ = Self::run_reset_worker(POOL_RESET_BATCH_SIZE).await;
354 });
355
356 *slot = Some(id);
357 });
358 }
359
360 async fn run_reset_worker(limit: usize) -> Result<(), Error> {
361 if limit == 0 {
362 return Ok(());
363 }
364
365 let should_run = RESET_IN_PROGRESS.with_borrow_mut(|flag| {
366 if *flag {
367 RESET_RESCHEDULE.with_borrow_mut(|r| *r = true);
368 false
369 } else {
370 *flag = true;
371 true
372 }
373 });
374
375 if !should_run {
376 return Ok(());
377 }
378
379 let result = Self::run_reset_batch(limit).await;
380
381 RESET_IN_PROGRESS.with_borrow_mut(|f| *f = false);
382 Self::maybe_reschedule();
383
384 result
385 }
386
387 async fn run_reset_batch(limit: usize) -> Result<(), Error> {
388 let mut pending: Vec<_> = CanisterPoolStorageOps::export()
389 .into_iter()
390 .filter(|(_, e)| e.status.is_pending_reset())
391 .collect();
392
393 if pending.is_empty() {
394 return Ok(());
395 }
396
397 pending.sort_by_key(|(_, e)| e.created_at);
398
399 for (pid, mut entry) in pending.into_iter().take(limit) {
400 match reset_into_pool(pid).await {
401 Ok(cycles) => {
402 entry.cycles = cycles;
403 entry.status = CanisterPoolStatus::Ready;
404 }
405 Err(err) => {
406 entry.status = CanisterPoolStatus::Failed {
407 reason: err.to_string(),
408 };
409 log!(
410 Topic::CanisterPool,
411 Warn,
412 "pool reset failed for {pid}: {err}"
413 );
414 }
415 }
416
417 if !CanisterPoolStorageOps::update(pid, entry) {
418 log!(
419 Topic::CanisterPool,
420 Warn,
421 "pool reset update missing for {pid}"
422 );
423 }
424 }
425
426 Ok(())
427 }
428}
429
430pub async fn pool_create_canister() -> Result<Principal, Error> {
435 OpsError::require_root()?;
436
437 let cycles = Cycles::new(POOL_CANISTER_CYCLES);
438 let pid = create_canister(pool_controllers(), cycles.clone()).await?;
439
440 CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, None, None, None);
441 Ok(pid)
442}
443
444pub async fn pool_import_canister(pid: Principal) -> Result<(), Error> {
445 OpsError::require_root()?;
446
447 register_or_update_preserving_metadata(
448 pid,
449 Cycles::default(),
450 CanisterPoolStatus::PendingReset,
451 None,
452 None,
453 None,
454 );
455 let _ = SubnetCanisterRegistryOps::remove(&pid);
456 match reset_into_pool(pid).await {
457 Ok(cycles) => {
458 register_or_update_preserving_metadata(
459 pid,
460 cycles,
461 CanisterPoolStatus::Ready,
462 None,
463 None,
464 None,
465 );
466 }
467 Err(err) => {
468 log!(
469 Topic::CanisterPool,
470 Warn,
471 "pool import failed for {pid}: {err}"
472 );
473 register_or_update_preserving_metadata(
474 pid,
475 Cycles::default(),
476 CanisterPoolStatus::Failed {
477 reason: err.to_string(),
478 },
479 None,
480 None,
481 None,
482 );
483 }
484 }
485 Ok(())
486}
487
/// Root-enforcing wrapper around [`pool_import_queued_canisters_inner`].
/// Returns `(added, requeued, skipped, total)` counts.
fn pool_import_queued_canisters(pids: Vec<Principal>) -> Result<(u64, u64, u64, u64), Error> {
    pool_import_queued_canisters_inner(pids, true)
}
491
492fn pool_import_queued_canisters_inner(
493 pids: Vec<Principal>,
494 enforce_root: bool,
495) -> Result<(u64, u64, u64, u64), Error> {
496 if enforce_root {
497 OpsError::require_root()?;
498 }
499
500 let mut added = 0;
501 let mut requeued = 0;
502 let mut skipped = 0;
503
504 for pid in &pids {
505 if SubnetCanisterRegistryOps::get(*pid).is_some() {
506 skipped += 1;
507 continue;
508 }
509
510 if let Some(entry) = CanisterPoolStorageOps::get(*pid) {
511 if entry.status.is_failed() {
512 register_or_update_preserving_metadata(
513 *pid,
514 Cycles::default(),
515 CanisterPoolStatus::PendingReset,
516 None,
517 None,
518 None,
519 );
520 requeued += 1;
521 } else {
522 skipped += 1;
523 }
524 continue;
525 }
526
527 register_or_update_preserving_metadata(
528 *pid,
529 Cycles::default(),
530 CanisterPoolStatus::PendingReset,
531 None,
532 None,
533 None,
534 );
535 added += 1;
536 }
537
538 maybe_schedule_reset_worker();
539
540 Ok((added, requeued, skipped, pids.len() as u64))
541}
542
/// Production hook: actually schedules the reset worker timer.
#[cfg(not(test))]
fn maybe_schedule_reset_worker() {
    PoolOps::schedule_reset_worker();
}
547
/// Test hook: records that scheduling was requested instead of setting a
/// real timer (no timer runtime exists under `cargo test`).
#[cfg(test)]
fn maybe_schedule_reset_worker() {
    RESET_SCHEDULED.with_borrow_mut(|flag| *flag = true);
}
552
// Test-only flag set by the test variant of `maybe_schedule_reset_worker`.
#[cfg(test)]
thread_local! {
    static RESET_SCHEDULED: RefCell<bool> = const { RefCell::new(false) };
}
557
/// Returns whether a reset-worker schedule was requested since the last
/// call, clearing the flag in the same step.
#[cfg(test)]
fn take_reset_scheduled() -> bool {
    RESET_SCHEDULED.with_borrow_mut(|flag| std::mem::replace(flag, false))
}
566
/// Root-enforcing wrapper around [`pool_requeue_failed_inner`].
/// Returns `(requeued, skipped, total)` counts.
fn pool_requeue_failed(pids: Option<Vec<Principal>>) -> Result<(u64, u64, u64), Error> {
    pool_requeue_failed_inner(pids, true)
}
570
571fn pool_requeue_failed_inner(
572 pids: Option<Vec<Principal>>,
573 enforce_root: bool,
574) -> Result<(u64, u64, u64), Error> {
575 if enforce_root {
576 OpsError::require_root()?;
577 }
578
579 let mut requeued = 0;
580 let mut skipped = 0;
581 let total;
582
583 if let Some(pids) = pids {
584 total = pids.len() as u64;
585 for pid in pids {
586 if let Some(entry) = CanisterPoolStorageOps::get(pid) {
587 if entry.status.is_failed() {
588 register_or_update_preserving_metadata(
589 pid,
590 Cycles::default(),
591 CanisterPoolStatus::PendingReset,
592 None,
593 None,
594 None,
595 );
596 requeued += 1;
597 } else {
598 skipped += 1;
599 }
600 } else {
601 skipped += 1;
602 }
603 }
604 } else {
605 let entries = CanisterPoolStorageOps::export();
606 total = entries.len() as u64;
607 for (pid, entry) in entries {
608 if entry.status.is_failed() {
609 register_or_update_preserving_metadata(
610 pid,
611 Cycles::default(),
612 CanisterPoolStatus::PendingReset,
613 None,
614 None,
615 None,
616 );
617 requeued += 1;
618 } else {
619 skipped += 1;
620 }
621 }
622 }
623
624 if requeued > 0 {
625 maybe_schedule_reset_worker();
626 }
627
628 Ok((requeued, skipped, total))
629}
630
/// Moves a registry canister back into the pool. Root-only.
///
/// Captures the registry entry's role and module hash, removes the canister
/// from the subnet registry, resets it, and registers it in the pool as
/// `Ready` with that metadata preserved.
///
/// # Errors
/// Fails when the caller is not root, the canister is not in the registry,
/// or the reset fails.
pub async fn pool_recycle_canister(pid: Principal) -> Result<(), Error> {
    OpsError::require_root()?;

    let entry =
        SubnetCanisterRegistryOps::get(pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;

    // Preserve identity metadata so the pooled canister can be reused for
    // the same role without a reinstall decision later.
    let role = Some(entry.role.clone());
    let hash = entry.module_hash.clone();

    let _ = SubnetCanisterRegistryOps::remove(&pid);

    // NOTE(review): if `reset_into_pool` fails here, the canister has
    // already left the registry but never enters the pool — confirm whether
    // a recovery path (e.g. RequeueFailed) is expected to cover this.
    let cycles = reset_into_pool(pid).await?;
    CanisterPoolStorageOps::register(pid, cycles, CanisterPoolStatus::Ready, role, None, hash);

    Ok(())
}
647
648pub async fn pool_export_canister(pid: Principal) -> Result<(CanisterRole, Vec<u8>), Error> {
649 OpsError::require_root()?;
650
651 let entry = CanisterPoolStorageOps::take(&pid).ok_or(PoolOpsError::PoolEntryMissing { pid })?;
652
653 if !entry.status.is_ready() {
654 return Err(PoolOpsError::PoolEntryNotReady { pid }.into());
655 }
656
657 let role = entry.role.ok_or(PoolOpsError::MissingType { pid })?;
658 let hash = entry
659 .module_hash
660 .ok_or(PoolOpsError::MissingModuleHash { pid })?;
661
662 Ok((role, hash))
663}
664
665pub async fn recycle_via_orchestrator(pid: Principal) -> Result<(), Error> {
670 use crate::ops::orchestration::orchestrator::{CanisterLifecycleOrchestrator, LifecycleEvent};
671
672 CanisterLifecycleOrchestrator::apply(LifecycleEvent::RecycleToPool { pid })
673 .await
674 .map(|_| ())
675}
676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ids::CanisterRole,
        model::memory::{CanisterEntry, pool::CanisterPool, topology::SubnetCanisterRegistry},
    };
    use candid::Principal;

    // Builds a deterministic test principal from a single byte.
    fn p(id: u8) -> Principal {
        Principal::from_slice(&[id; 29])
    }

    // Clears pool, registry, and the test-only scheduling flag so each
    // test starts from a clean slate.
    fn reset_state() {
        CanisterPool::clear();
        SubnetCanisterRegistry::clear_for_tests();
        let _ = take_reset_scheduled();
    }

    // Unknown pids are added to the pool as PendingReset with zero cycles.
    #[test]
    fn import_queued_registers_pending_entries() {
        reset_state();

        let p1 = p(1);
        let p2 = p(2);

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1, p2], false).unwrap();
        assert_eq!(added, 2);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 0);
        assert_eq!(total, 2);

        let e1 = CanisterPoolStorageOps::get(p1).unwrap();
        let e2 = CanisterPoolStorageOps::get(p2).unwrap();
        assert!(e1.status.is_pending_reset());
        assert!(e2.status.is_pending_reset());
        assert_eq!(e1.cycles, Cycles::default());
        assert_eq!(e2.cycles, Cycles::default());
    }

    // A Failed pool entry is requeued as PendingReset (cycles zeroed) and
    // the reset worker gets scheduled.
    #[test]
    fn import_queued_requeues_failed_entries() {
        reset_state();

        let p1 = p(3);
        CanisterPoolStorageOps::register(
            p1,
            Cycles::new(10),
            CanisterPoolStatus::Failed {
                reason: "nope".to_string(),
            },
            None,
            None,
            None,
        );

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 1);
        assert_eq!(skipped, 0);
        assert_eq!(total, 1);
        assert!(take_reset_scheduled());

        let entry = CanisterPoolStorageOps::get(p1).unwrap();
        assert!(entry.status.is_pending_reset());
        assert_eq!(entry.cycles, Cycles::default());
    }

    // Ready entries are left untouched (status and cycles preserved).
    #[test]
    fn import_queued_skips_ready_entries() {
        reset_state();

        let p1 = p(4);
        CanisterPoolStorageOps::register(
            p1,
            Cycles::new(42),
            CanisterPoolStatus::Ready,
            None,
            None,
            None,
        );

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![p1], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 1);
        assert_eq!(total, 1);

        let entry = CanisterPoolStorageOps::get(p1).unwrap();
        assert!(entry.status.is_ready());
        assert_eq!(entry.cycles, Cycles::new(42));
    }

    // Canisters still in the subnet registry are never imported.
    #[test]
    fn import_queued_skips_registry_canisters() {
        reset_state();

        let pid = p(5);
        SubnetCanisterRegistry::insert_for_tests(CanisterEntry {
            pid,
            role: CanisterRole::new("alpha"),
            parent_pid: None,
            module_hash: None,
            created_at: 0,
        });

        let (added, requeued, skipped, total) =
            pool_import_queued_canisters_inner(vec![pid], false).unwrap();
        assert_eq!(added, 0);
        assert_eq!(requeued, 0);
        assert_eq!(skipped, 1);
        assert_eq!(total, 1);
        assert!(CanisterPoolStorageOps::get(pid).is_none());
    }

    // Passing None for role/parent/module_hash keeps the stored metadata
    // while cycles and status are overwritten.
    #[test]
    fn register_or_update_preserves_metadata() {
        reset_state();

        let pid = p(6);
        let role = CanisterRole::new("alpha");
        let parent = p(9);
        let hash = vec![1, 2, 3];

        CanisterPoolStorageOps::register(
            pid,
            Cycles::new(7),
            CanisterPoolStatus::Failed {
                reason: "oops".to_string(),
            },
            Some(role.clone()),
            Some(parent),
            Some(hash.clone()),
        );

        register_or_update_preserving_metadata(
            pid,
            Cycles::default(),
            CanisterPoolStatus::PendingReset,
            None,
            None,
            None,
        );

        let entry = CanisterPoolStorageOps::get(pid).unwrap();
        assert!(entry.status.is_pending_reset());
        assert_eq!(entry.cycles, Cycles::default());
        assert_eq!(entry.role, Some(role));
        assert_eq!(entry.parent, Some(parent));
        assert_eq!(entry.module_hash, Some(hash));
    }

    // A whole-pool requeue (pids = None) touches only Failed entries and
    // schedules the reset worker.
    #[test]
    fn requeue_failed_scans_pool_and_schedules() {
        reset_state();

        let failed_pid = p(7);
        let ready_pid = p(8);

        CanisterPoolStorageOps::register(
            failed_pid,
            Cycles::new(11),
            CanisterPoolStatus::Failed {
                reason: "bad".to_string(),
            },
            None,
            None,
            None,
        );
        CanisterPoolStorageOps::register(
            ready_pid,
            Cycles::new(22),
            CanisterPoolStatus::Ready,
            None,
            None,
            None,
        );

        let (requeued, skipped, total) = pool_requeue_failed_inner(None, false).unwrap();
        assert_eq!(requeued, 1);
        assert_eq!(skipped, 1);
        assert_eq!(total, 2);
        assert!(take_reset_scheduled());

        let failed_entry = CanisterPoolStorageOps::get(failed_pid).unwrap();
        let ready_entry = CanisterPoolStorageOps::get(ready_pid).unwrap();
        assert!(failed_entry.status.is_pending_reset());
        assert_eq!(failed_entry.cycles, Cycles::default());
        assert!(ready_entry.status.is_ready());
        assert_eq!(ready_entry.cycles, Cycles::new(22));
    }
}
875}