use super::metrics::{PoolMetrics, pool_metrics};
use super::{ShardingOpsError, ShardingRegistryDto};
use crate::{
    Error,
    cdk::types::Principal,
    config::schema::{ShardPool, ShardPoolPolicy},
    ops::{
        config::ConfigOps,
        placement::sharding::hrw::HrwSelector,
        storage::sharding::{ShardEntry, ShardingRegistryOps},
    },
};
use candid::CandidType;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};

/// The outcome of planning a tenant placement: the decision state plus the
/// pool metrics snapshot the decision was based on.
#[derive(CandidType, Clone, Debug, Deserialize, Serialize)]
pub struct ShardingPlan {
    pub state: ShardingPlanState,
    pub target_slot: Option<u32>,
    pub utilization_pct: u32,
    pub active_count: u32,
    pub total_capacity: u64,
    pub total_used: u64,
}

/// Why creating a new shard for a pool is not allowed.
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum CreateBlockedReason {
    PoolAtCapacity,
    NoFreeSlots,
    PolicyViolation(String),
}

impl std::fmt::Display for CreateBlockedReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PoolAtCapacity => write!(f, "shard cap reached"),
            Self::NoFreeSlots => write!(f, "sharding pool has no free slots"),
            Self::PolicyViolation(msg) => write!(f, "{msg}"),
        }
    }
}

/// The placement decision for a tenant.
#[derive(CandidType, Clone, Debug, Deserialize, Serialize)]
pub enum ShardingPlanState {
    /// The tenant is already assigned to this shard.
    AlreadyAssigned { pid: Principal },

    /// An existing shard has spare capacity; place the tenant on it.
    UseExisting { pid: Principal },

    /// No existing shard can take the tenant; creating a new shard is allowed.
    CreateAllowed,

    /// No existing shard can take the tenant; creating a new shard is blocked.
    CreateBlocked { reason: CreateBlockedReason },
}

/// Planning and policy checks for shard pools: decides whether a tenant should
/// reuse an existing shard or whether a new shard may be created.
pub struct ShardingPolicyOps;

impl ShardingPolicyOps {
    /// Returns `true` if the pool policy still permits creating another shard.
    #[must_use]
    pub(crate) const fn can_create(metrics: &PoolMetrics, policy: &ShardPoolPolicy) -> bool {
        metrics.active_count < policy.max_shards
    }

    /// Looks up the configuration of `pool` on the current canister.
    ///
    /// # Errors
    /// Fails if sharding is not configured for this canister or if the pool
    /// does not exist.
    pub(crate) fn get_pool_config(pool: &str) -> Result<ShardPool, Error> {
        let cfg = ConfigOps::current_canister();
        let sharding_cfg = cfg.sharding.ok_or(ShardingOpsError::ShardingDisabled)?;
        let pool_cfg = sharding_cfg
            .pools
            .get(pool)
            .ok_or_else(|| ShardingOpsError::PoolNotFound(pool.to_string()))?
            .clone();

        Ok(pool_cfg)
    }

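    /// Plans where `tenant` should be placed within `pool`; callers act on the
    /// returned [`ShardingPlan`].
    ///
    /// # Errors
    /// Fails if sharding is not configured for this canister or if the pool
    /// does not exist.
    ///
    /// # Example
    /// A sketch of how a caller might act on the plan; the pool and tenant
    /// names are illustrative.
    /// ```ignore
    /// let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-1")?;
    /// match plan.state {
    ///     ShardingPlanState::AlreadyAssigned { pid } | ShardingPlanState::UseExisting { pid } => {
    ///         // Route the tenant to `pid`.
    ///     }
    ///     ShardingPlanState::CreateAllowed => {
    ///         // Create a new shard, preferably in `plan.target_slot`.
    ///     }
    ///     ShardingPlanState::CreateBlocked { reason } => {
    ///         // Surface `reason` to the caller.
    ///     }
    /// }
    /// ```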
    pub fn plan_assign_to_pool(pool: &str, tenant: impl AsRef<str>) -> Result<ShardingPlan, Error> {
        Self::plan_internal(pool, tenant.as_ref(), None)
    }

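    /// Plans a new placement for `tenant` in `pool`, excluding `donor_pid`
    /// (the shard the tenant is being moved away from) from consideration.
    ///
    /// # Errors
    /// Fails if sharding is not configured for this canister or if the pool
    /// does not exist.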
    pub fn plan_reassign_from_shard(
        pool: &str,
        tenant: impl AsRef<str>,
        donor_pid: Principal,
    ) -> Result<ShardingPlan, Error> {
        let tenant = tenant.as_ref();
        Self::plan_internal(pool, tenant, Some(donor_pid))
    }

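    /// Shared planning logic. Decisions are made in order:
    /// 1. tenant already assigned to a non-excluded shard -> `AlreadyAssigned`;
    /// 2. HRW selection across existing shards with spare capacity -> `UseExisting`;
    /// 3. HRW selection across the pool's free slots -> `CreateAllowed` when the
    ///    policy permits another shard, otherwise `CreateBlocked` (no free slots
    ///    or pool at its shard cap).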
    fn plan_internal(
        pool: &str,
        tenant: &str,
        exclude_pid: Option<Principal>,
    ) -> Result<ShardingPlan, Error> {
        let pool_cfg = Self::get_pool_config(pool)?;
        let metrics = pool_metrics(pool);
        let view = ShardingRegistryOps::export();
        let slot_plan = plan_slot_backfill(pool, &view, pool_cfg.policy.max_shards);

        if let Some(pid) = ShardingRegistryOps::tenant_shard(pool, tenant)
            && exclude_pid != Some(pid)
        {
            let slot = slot_plan.slots.get(&pid).copied();
            return Ok(Self::make_plan(
                ShardingPlanState::AlreadyAssigned { pid },
                &metrics,
                slot,
            ));
        }

        let shards_with_capacity: Vec<_> = view
            .iter()
            .filter(|(pid, entry)| {
                entry.pool.as_ref() == pool && entry.has_capacity() && exclude_pid != Some(*pid)
            })
            .map(|(pid, _)| *pid)
            .collect();

        if let Some(target_pid) = HrwSelector::select(tenant, &shards_with_capacity) {
            let slot = slot_plan.slots.get(&target_pid).copied();
            return Ok(Self::make_plan(
                ShardingPlanState::UseExisting { pid: target_pid },
                &metrics,
                slot,
            ));
        }

        let max_slots = pool_cfg.policy.max_shards;
        let free_slots: Vec<u32> = (0..max_slots)
            .filter(|slot| !slot_plan.occupied.contains(slot))
            .collect();

        let Some(target_slot) = HrwSelector::select_from_slots(pool, tenant, &free_slots) else {
            return Ok(Self::make_plan(
                ShardingPlanState::CreateBlocked {
                    reason: CreateBlockedReason::NoFreeSlots,
                },
                &metrics,
                None,
            ));
        };

        if Self::can_create(&metrics, &pool_cfg.policy) {
            Ok(Self::make_plan(
                ShardingPlanState::CreateAllowed,
                &metrics,
                Some(target_slot),
            ))
        } else {
            Ok(Self::make_plan(
                ShardingPlanState::CreateBlocked {
                    reason: CreateBlockedReason::PoolAtCapacity,
                },
                &metrics,
                Some(target_slot),
            ))
        }
    }

    /// Returns a snapshot of the full sharding registry.
    #[must_use]
    pub fn export() -> ShardingRegistryDto {
        ShardingRegistryOps::export()
    }

    /// Returns the shard the tenant is assigned to in `pool`, if any.
    #[must_use]
    pub fn lookup_tenant(pool: &str, tenant: impl AsRef<str>) -> Option<Principal> {
        let tenant = tenant.as_ref();
        ShardingRegistryOps::tenant_shard(pool, tenant)
    }

    /// Like [`Self::lookup_tenant`], but treats a missing assignment as an error.
    ///
    /// # Errors
    /// Fails if the tenant has no shard assignment in `pool`.
    pub fn try_lookup_tenant(pool: &str, tenant: impl AsRef<str>) -> Result<Principal, Error> {
        let tenant = tenant.as_ref();
        ShardingRegistryOps::tenant_shard(pool, tenant)
            .ok_or_else(|| ShardingOpsError::TenantNotFound(tenant.to_string()).into())
    }

    /// Builds a [`ShardingPlan`] from a decision state, the pool metrics
    /// snapshot, and an optional target slot.
    const fn make_plan(
        state: ShardingPlanState,
        metrics: &PoolMetrics,
        slot: Option<u32>,
    ) -> ShardingPlan {
        ShardingPlan {
            state,
            target_slot: slot,
            utilization_pct: metrics.utilization_pct,
            active_count: metrics.active_count,
            total_capacity: metrics.total_capacity,
            total_used: metrics.total_used,
        }
    }
}

/// Deterministic slot assignments computed for a pool's shards.
struct SlotBackfillPlan {
    /// Slot assigned (or planned) for each shard principal.
    slots: BTreeMap<Principal, u32>,
    /// All slots that are occupied or planned to be occupied.
    occupied: BTreeSet<u32>,
}

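/// Plans slot assignments for every shard in `pool`: shards that already have
/// an assigned slot keep it, and shards without one are backfilled with the
/// lowest free slots in ascending principal order, up to `max_slots`.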
fn plan_slot_backfill(
    pool: &str,
    view: &[(Principal, ShardEntry)],
    max_slots: u32,
) -> SlotBackfillPlan {
    let mut entries: Vec<(Principal, ShardEntry)> = view
        .iter()
        .filter(|(_, entry)| entry.pool.as_ref() == pool)
        .map(|(pid, entry)| (*pid, entry.clone()))
        .collect();

    entries.sort_by_key(|(pid, _)| *pid);

    let mut slots = BTreeMap::<Principal, u32>::new();
    let mut occupied = BTreeSet::<u32>::new();

    for (pid, entry) in &entries {
        if entry.has_assigned_slot() {
            slots.insert(*pid, entry.slot);
            occupied.insert(entry.slot);
        }
    }

    if max_slots == 0 {
        return SlotBackfillPlan { slots, occupied };
    }

    let available: Vec<u32> = (0..max_slots)
        .filter(|slot| !occupied.contains(slot))
        .collect();

    if available.is_empty() {
        return SlotBackfillPlan { slots, occupied };
    }

    let mut idx = 0usize;
    for (pid, entry) in &entries {
        if entry.has_assigned_slot() {
            continue;
        }

        if idx >= available.len() {
            break;
        }

        let slot = available[idx];
        idx += 1;
        slots.insert(*pid, slot);
        occupied.insert(slot);
    }

    SlotBackfillPlan { slots, occupied }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::Config,
        ids::{CanisterRole, SubnetRole},
        ops::storage::{env::EnvOps, sharding::ShardingRegistryOps},
    };
    use candid::Principal;

    #[test]
    fn can_create_blocks_when_at_capacity() {
        let metrics = PoolMetrics {
            active_count: 10,
            total_capacity: 100,
            total_used: 80,
            utilization_pct: 80,
        };
        let policy = ShardPoolPolicy {
            max_shards: 5,
            ..Default::default()
        };
        assert!(!ShardingPolicyOps::can_create(&metrics, &policy));
    }

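    // Complements the test above with the allow path; a sketch assuming the
    // same PoolMetrics/ShardPoolPolicy field layout used there.
    #[test]
    fn can_create_allows_when_below_capacity() {
        let metrics = PoolMetrics {
            active_count: 2,
            total_capacity: 100,
            total_used: 20,
            utilization_pct: 20,
        };
        let policy = ShardPoolPolicy {
            max_shards: 5,
            ..Default::default()
        };
        assert!(ShardingPolicyOps::can_create(&metrics, &policy));
    }
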
    #[test]
    fn plan_returns_already_assigned_if_tenant_exists() {
        let pid = Principal::anonymous();
        let plan = ShardingPlan {
            state: ShardingPlanState::AlreadyAssigned { pid },
            target_slot: Some(0),
            utilization_pct: 50,
            active_count: 2,
            total_capacity: 100,
            total_used: 50,
        };
        assert!(matches!(
            plan.state,
            ShardingPlanState::AlreadyAssigned { .. }
        ));
    }

    fn p(id: u8) -> Principal {
        Principal::from_slice(&[id; 29])
    }

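    // Test fixture: a "manager" canister with a single "primary" pool backed by
    // the "shard" role, a per-shard capacity of 1, and at most 2 shards.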
    fn init_config() {
        let toml = r#"
        [subnets.prime.canisters.manager]
        initial_cycles = "5T"

        [subnets.prime.canisters.manager.sharding.pools.primary]
        canister_role = "shard"
        [subnets.prime.canisters.manager.sharding.pools.primary.policy]
        capacity = 1
        max_shards = 2

        [subnets.prime.canisters.shard]
        initial_cycles = "5T"
        "#;

        Config::init_from_toml(toml).unwrap();
        EnvOps::set_subnet_role(SubnetRole::PRIME);
        EnvOps::set_canister_role(CanisterRole::from("manager"));
    }

    #[test]
    fn plan_allows_creation_when_target_shard_full() {
        Config::reset_for_tests();
        init_config();
        ShardingRegistryOps::clear_for_test();

        let shard_role = CanisterRole::from("shard");
        let shard = p(1);
        ShardingRegistryOps::create(shard, "primary", 0, &shard_role, 1).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-a", shard).unwrap();

        let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-x").unwrap();

        assert!(matches!(plan.state, ShardingPlanState::CreateAllowed));
        Config::reset_for_tests();
    }

    #[test]
    fn plan_blocks_creation_when_pool_at_capacity() {
        Config::reset_for_tests();
        init_config();
        ShardingRegistryOps::clear_for_test();

        let shard_role = CanisterRole::from("shard");
        let shard_a = p(1);
        let shard_b = p(2);
        ShardingRegistryOps::create(shard_a, "primary", 0, &shard_role, 1).unwrap();
        ShardingRegistryOps::create(shard_b, "primary", 1, &shard_role, 1).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-a", shard_a).unwrap();
        ShardingRegistryOps::assign("primary", "tenant-b", shard_b).unwrap();

        let plan = ShardingPolicyOps::plan_assign_to_pool("primary", "tenant-y").unwrap();

        assert!(matches!(
            plan.state,
            ShardingPlanState::CreateBlocked { .. }
        ));
        Config::reset_for_tests();
    }
}