canic_core/ops/placement/sharding/

//! ===========================================================================
//! Sharding Policy Logic
//! ===========================================================================
//!
//! Pure, deterministic policy rules for sharding pools.
//!
//! Responsibilities:
//! - Validates whether a pool may create new shards.
//! - Determines which shard a tenant should be assigned to.
//! - Provides read-only views of sharding state.
//!
//! Currently, shard assignment uses **HRW (Highest Random Weight)** selection.
//! This ensures stable, fair, and deterministic tenant distribution.
//!
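//! Illustrative sketch (the exact scoring lives in the `hrw` module): each
//! candidate shard is scored against the tenant key, and the highest score
//! wins, so a tenant keeps resolving to the same shard for as long as the
//! candidate set is unchanged.
//!
//! ```ignore
//! // Hypothetical candidate list; `HrwSelector::select` is the call used below.
//! let winner = HrwSelector::select("tenant-a", &candidates);
//! ```
//!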
//! ===========================================================================

use super::metrics::{PoolMetrics, pool_metrics};
use super::{ShardingOpsError, ShardingRegistryDto};
use crate::{
    Error,
    cdk::types::Principal,
    config::schema::{ShardPool, ShardPoolPolicy},
    ops::{
        config::ConfigOps,
        placement::sharding::hrw::HrwSelector,
        storage::sharding::{ShardEntry, ShardingRegistryOps},
    },
};
use candid::CandidType;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};

///
/// ShardingPlan
/// Result of a dry-run shard assignment plan (including the desired slot index).
///

#[derive(CandidType, Clone, Debug, Deserialize, Serialize)]
pub struct ShardingPlan {
    pub state: ShardingPlanState,
    pub target_slot: Option<u32>,
    pub utilization_pct: u32,
    pub active_count: u32,
    pub total_capacity: u64,
    pub total_used: u64,
}

///
/// CreateBlockedReason
/// Structured reason for creation denial.
///

#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum CreateBlockedReason {
    PoolAtCapacity,
    NoFreeSlots,
    PolicyViolation(String),
}

impl std::fmt::Display for CreateBlockedReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PoolAtCapacity => write!(f, "shard cap reached"),
            Self::NoFreeSlots => write!(f, "sharding pool has no free slots"),
            Self::PolicyViolation(msg) => write!(f, "{msg}"),
        }
    }
}

///
/// ShardingPlanState
/// Outcome variants of a dry-run shard plan.
///

#[derive(CandidType, Clone, Debug, Deserialize, Serialize)]
pub enum ShardingPlanState {
    /// Tenant already has a shard assigned.
    AlreadyAssigned { pid: Principal },

    /// Tenant can be deterministically assigned to an existing shard (via HRW).
    UseExisting { pid: Principal },

    /// Policy allows creation of a new shard.
    CreateAllowed,

    /// Policy forbids creation of a new shard (e.g., capacity reached).
    CreateBlocked { reason: CreateBlockedReason },
}

///
/// ShardingPolicyOps
///

pub struct ShardingPolicyOps;

impl ShardingPolicyOps {
    // -----------------------------------------------------------------------
    // Validation
    // -----------------------------------------------------------------------

    /// Return whether a pool may create a new shard under its policy.
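    ///
    /// A minimal illustration with made-up numbers (field values are arbitrary;
    /// the types are the same `PoolMetrics` / `ShardPoolPolicy` used in the tests below):
    ///
    /// ```ignore
    /// let metrics = PoolMetrics {
    ///     active_count: 3, total_capacity: 300, total_used: 120, utilization_pct: 40,
    /// };
    /// let policy = ShardPoolPolicy { max_shards: 5, ..Default::default() };
    /// assert!(ShardingPolicyOps::can_create(&metrics, &policy)); // 3 < 5
    /// ```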
    #[must_use]
    pub(crate) const fn can_create(metrics: &PoolMetrics, policy: &ShardPoolPolicy) -> bool {
        metrics.active_count < policy.max_shards
    }

    // -----------------------------------------------------------------------
    // Configuration Access
    // -----------------------------------------------------------------------

    /// Retrieve the shard pool configuration from the current canister’s config.
    pub(crate) fn get_pool_config(pool: &str) -> Result<ShardPool, Error> {
        let cfg = ConfigOps::current_canister();
        let sharding_cfg = cfg.sharding.ok_or(ShardingOpsError::ShardingDisabled)?;
        let pool_cfg = sharding_cfg
            .pools
            .get(pool)
            .ok_or_else(|| ShardingOpsError::PoolNotFound(pool.to_string()))?
            .clone();

        Ok(pool_cfg)
    }

    // -----------------------------------------------------------------------
    // Planning
    // -----------------------------------------------------------------------

    /// Perform a dry-run plan for assigning a tenant to a shard.
    /// Never creates or mutates registry state.
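    ///
    /// A sketch of a typical call (pool and tenant names are illustrative):
    ///
    /// ```ignore
    /// let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-x")?;
    /// match plan.state {
    ///     ShardingPlanState::AlreadyAssigned { pid } | ShardingPlanState::UseExisting { pid } => {
    ///         // Route the tenant to `pid`; nothing new needs to be created.
    ///     }
    ///     ShardingPlanState::CreateAllowed => {
    ///         // The caller may create a shard, ideally at `plan.target_slot`.
    ///     }
    ///     ShardingPlanState::CreateBlocked { reason } => {
    ///         // Creation is denied; surface `reason` to the caller.
    ///     }
    /// }
    /// ```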
    pub fn plan_assign_to_pool(pool: &str, tenant: impl AsRef<str>) -> Result<ShardingPlan, Error> {
        Self::plan_internal(pool, tenant.as_ref(), None)
    }

    /// Plan a reassignment for a tenant currently on `donor_pid`, excluding that shard.
    /// Never creates or mutates registry state.
    pub fn plan_reassign_from_shard(
        pool: &str,
        tenant: impl AsRef<str>,
        donor_pid: Principal,
    ) -> Result<ShardingPlan, Error> {
        let tenant = tenant.as_ref();
        Self::plan_internal(pool, tenant, Some(donor_pid))
    }

    fn plan_internal(
        pool: &str,
        tenant: &str,
        exclude_pid: Option<Principal>,
    ) -> Result<ShardingPlan, Error> {
        let pool_cfg = Self::get_pool_config(pool)?;
        let metrics = pool_metrics(pool);
        let view = ShardingRegistryOps::export();
        let slot_plan = plan_slot_backfill(pool, &view, pool_cfg.policy.max_shards);

        if let Some(pid) = ShardingRegistryOps::tenant_shard(pool, tenant)
            && exclude_pid != Some(pid)
        {
            let slot = slot_plan.slots.get(&pid).copied();
            return Ok(Self::make_plan(
                ShardingPlanState::AlreadyAssigned { pid },
                &metrics,
                slot,
            ));
        }

        // Prefer an existing shard with spare capacity.
        let shards_with_capacity: Vec<_> = view
            .iter()
            .filter(|(pid, entry)| {
                entry.pool.as_ref() == pool && entry.has_capacity() && exclude_pid != Some(*pid)
            })
            .map(|(pid, _)| *pid)
            .collect();

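        // HRW scores each (tenant, candidate) pair and picks the highest, so the
        // choice below is deterministic for a given tenant and stays stable while
        // the candidate set does not change.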
        if let Some(target_pid) = HrwSelector::select(tenant, &shards_with_capacity) {
            let slot = slot_plan.slots.get(&target_pid).copied();
            return Ok(Self::make_plan(
                ShardingPlanState::UseExisting { pid: target_pid },
                &metrics,
                slot,
            ));
        }

        let max_slots = pool_cfg.policy.max_shards;
        let free_slots: Vec<u32> = (0..max_slots)
            .filter(|slot| !slot_plan.occupied.contains(slot))
            .collect();

        // Slot selection is independent of create eligibility; we still compute a target slot
        // so callers can distinguish "no slots exist" from "policy forbids creating one".
        let Some(target_slot) = HrwSelector::select_from_slots(pool, tenant, &free_slots) else {
            return Ok(Self::make_plan(
                ShardingPlanState::CreateBlocked {
                    reason: CreateBlockedReason::NoFreeSlots,
                },
                &metrics,
                None,
            ));
        };

        if Self::can_create(&metrics, &pool_cfg.policy) {
            Ok(Self::make_plan(
                ShardingPlanState::CreateAllowed,
                &metrics,
                Some(target_slot),
            ))
        } else {
            Ok(Self::make_plan(
                ShardingPlanState::CreateBlocked {
                    reason: CreateBlockedReason::PoolAtCapacity,
                },
                &metrics,
                Some(target_slot),
            ))
        }
    }

    // -----------------------------------------------------------------------
    // Registry Access Helpers
    // -----------------------------------------------------------------------

    /// Export a read-only view of the sharding registry.
    #[must_use]
    pub fn export() -> ShardingRegistryDto {
        ShardingRegistryOps::export()
    }

    /// Look up the shard assigned to a tenant, if any.
    #[must_use]
    pub fn lookup_tenant(pool: &str, tenant: impl AsRef<str>) -> Option<Principal> {
        let tenant = tenant.as_ref();
        ShardingRegistryOps::tenant_shard(pool, tenant)
    }

    /// Look up the shard assigned to a tenant, returning an error if none exists.
    pub fn try_lookup_tenant(pool: &str, tenant: impl AsRef<str>) -> Result<Principal, Error> {
        let tenant = tenant.as_ref();
        ShardingRegistryOps::tenant_shard(pool, tenant)
            .ok_or_else(|| ShardingOpsError::TenantNotFound(tenant.to_string()).into())
    }

    // -----------------------------------------------------------------------
    // Utilities
    // -----------------------------------------------------------------------

    /// Internal helper to construct a plan from metrics and state.
    const fn make_plan(
        state: ShardingPlanState,
        metrics: &PoolMetrics,
        slot: Option<u32>,
    ) -> ShardingPlan {
        ShardingPlan {
            state,
            target_slot: slot,
            utilization_pct: metrics.utilization_pct,
            active_count: metrics.active_count,
            total_capacity: metrics.total_capacity,
            total_used: metrics.total_used,
        }
    }
}

// -----------------------------------------------------------------------------
// Slot backfilling (pure planning)
// -----------------------------------------------------------------------------

struct SlotBackfillPlan {
    /// Effective slot mapping for shards in the pool (explicit or simulated).
    slots: BTreeMap<Principal, u32>,
    /// Slots considered occupied after deterministic backfill simulation.
    occupied: BTreeSet<u32>,
}

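/// Simulate the effective slot layout for `pool` without touching the registry.
///
/// Shards that already carry an explicit slot keep it; the remaining shards are
/// backfilled into the lowest free slots in ascending principal order. Worked
/// example (illustrative principals): with `max_slots = 3`, an explicit mapping
/// `{B: 1}`, and unslotted shards `A < C`, the simulated layout is
/// `{A: 0, B: 1, C: 2}` with slots `{0, 1, 2}` occupied.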
fn plan_slot_backfill(
    pool: &str,
    view: &[(Principal, ShardEntry)],
    max_slots: u32,
) -> SlotBackfillPlan {
    let mut entries: Vec<(Principal, ShardEntry)> = view
        .iter()
        .filter(|(_, entry)| entry.pool.as_ref() == pool)
        .map(|(pid, entry)| (*pid, entry.clone()))
        .collect();

    entries.sort_by_key(|(pid, _)| *pid);

    let mut slots = BTreeMap::<Principal, u32>::new();
    let mut occupied = BTreeSet::<u32>::new();

    for (pid, entry) in &entries {
        if entry.has_assigned_slot() {
            slots.insert(*pid, entry.slot);
            occupied.insert(entry.slot);
        }
    }

    if max_slots == 0 {
        return SlotBackfillPlan { slots, occupied };
    }

    let available: Vec<u32> = (0..max_slots)
        .filter(|slot| !occupied.contains(slot))
        .collect();

    if available.is_empty() {
        return SlotBackfillPlan { slots, occupied };
    }

    let mut idx = 0usize;
    for (pid, entry) in &entries {
        if entry.has_assigned_slot() {
            continue;
        }

        // NOTE: Backfill simulates slot assignment for existing shards only.
        // Policy enforcement happens later; this function is purely positional.
        if idx >= available.len() {
            break;
        }

        let slot = available[idx];
        idx += 1;
        slots.insert(*pid, slot);
        occupied.insert(slot);
    }

    SlotBackfillPlan { slots, occupied }
}

/// ===========================================================================
/// Tests
/// ===========================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::Config,
        ids::CanisterRole,
        ops::storage::{env::EnvOps, sharding::ShardingRegistryOps},
    };
    use candid::Principal;

    #[test]
    fn can_create_blocks_when_at_capacity() {
        let metrics = PoolMetrics {
            active_count: 10,
            total_capacity: 100,
            total_used: 80,
            utilization_pct: 80,
        };
        let policy = ShardPoolPolicy {
            max_shards: 5,
            ..Default::default()
        };
        assert!(!ShardingPolicyOps::can_create(&metrics, &policy));
    }

    #[test]
    fn plan_returns_already_assigned_if_tenant_exists() {
        let tenant = Principal::anonymous();
        let plan = ShardingPlan {
            state: ShardingPlanState::AlreadyAssigned { pid: tenant },
            target_slot: Some(0),
            utilization_pct: 50,
            active_count: 2,
            total_capacity: 100,
            total_used: 50,
        };
        assert!(matches!(
            plan.state,
            ShardingPlanState::AlreadyAssigned { .. }
        ));
    }

    fn p(id: u8) -> Principal {
        Principal::from_slice(&[id; 29])
    }

    fn init_config() {
        use crate::{
            config::Config,
            ids::{CanisterRole, SubnetRole},
        };

        let toml = r#"
            [subnets.prime.canisters.manager]
            initial_cycles = "5T"

            [subnets.prime.canisters.manager.sharding.pools.primary]
            canister_role = "shard"
            [subnets.prime.canisters.manager.sharding.pools.primary.policy]
            capacity = 1
            max_shards = 2

            [subnets.prime.canisters.shard]
            initial_cycles = "5T"
        "#;

        Config::init_from_toml(toml).unwrap();
        EnvOps::set_subnet_role(SubnetRole::PRIME);
        EnvOps::set_canister_role(CanisterRole::from("manager"));
    }

    #[test]
    fn plan_allows_creation_when_target_shard_full() {
        Config::reset_for_tests();
        init_config();
        ShardingRegistryOps::clear_for_test();

        let shard_role = CanisterRole::from("shard");
        let shard = p(1);
        ShardingRegistryOps::create(shard, "primary", 0, &shard_role, 1).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-a", shard).unwrap();

        let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-x").unwrap();

        assert!(matches!(plan.state, ShardingPlanState::CreateAllowed));
        Config::reset_for_tests();
    }

    #[test]
    fn plan_blocks_creation_when_pool_at_capacity() {
        Config::reset_for_tests();
        init_config();
        ShardingRegistryOps::clear_for_test();

        let shard_role = CanisterRole::from("shard");
        let shard_a = p(1);
        let shard_b = p(2);
        ShardingRegistryOps::create(shard_a, "primary", 0, &shard_role, 1).unwrap();
        ShardingRegistryOps::create(shard_b, "primary", 1, &shard_role, 1).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-a", shard_a).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-b", shard_b).unwrap();

        let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-y").unwrap();

        assert!(matches!(
            plan.state,
            ShardingPlanState::CreateBlocked { .. }
        ));
        Config::reset_for_tests();
    }
}