canic_core/ops/storage/sharding/
registry.rs1use crate::{
2 Error, ThisError,
3 cdk::{types::Principal, utils::time::now_secs},
4 ids::CanisterRole,
5 model::memory::sharding::{ShardEntry, ShardKey, ShardingRegistry},
6 ops::storage::StorageOpsError,
7};
8
9pub struct ShardingRegistryOps;
14
15#[derive(Debug, ThisError)]
21pub enum ShardingRegistryOpsError {
22 #[error("shard not found: {0}")]
23 ShardNotFound(Principal),
24
25 #[error("invalid sharding key: {0}")]
26 InvalidKey(String),
27
28 #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
29 PoolMismatch {
30 pid: Principal,
31 expected: String,
32 actual: String,
33 },
34
35 #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
36 SlotOccupied {
37 pool: String,
38 slot: u32,
39 pid: Principal,
40 },
41}
42
43impl From<ShardingRegistryOpsError> for Error {
44 fn from(err: ShardingRegistryOpsError) -> Self {
45 StorageOpsError::from(err).into()
46 }
47}
48
49impl ShardingRegistryOps {
50 pub fn create(
52 pid: Principal,
53 pool: &str,
54 slot: u32,
55 canister_type: &CanisterRole,
56 capacity: u32,
57 ) -> Result<(), Error> {
58 ShardingRegistry::with_mut(|core| {
59 if slot != ShardEntry::UNASSIGNED_SLOT {
60 for (other_pid, other_entry) in core.all_entries() {
61 if other_pid != pid && other_entry.pool == pool && other_entry.slot == slot {
62 return Err(ShardingRegistryOpsError::SlotOccupied {
63 pool: pool.to_string(),
64 slot,
65 pid: other_pid,
66 }
67 .into());
68 }
69 }
70 }
71
72 let entry = ShardEntry::new(pool, slot, canister_type.clone(), capacity, now_secs());
73 core.insert_entry(pid, entry);
74
75 Ok(())
76 })
77 }
78
79 #[must_use]
81 pub fn get(pid: Principal) -> Option<ShardEntry> {
82 ShardingRegistry::with(|core| core.get_entry(&pid))
83 }
84
85 #[must_use]
87 pub fn export() -> Vec<(Principal, ShardEntry)> {
88 ShardingRegistry::export()
89 }
90
91 #[must_use]
93 pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
94 ShardingRegistry::tenant_shard(pool, tenant)
95 }
96
97 #[must_use]
99 pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
100 ShardingRegistry::slot_for_shard(pool, shard)
101 }
102
103 #[must_use]
105 pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
106 ShardingRegistry::tenants_in_shard(pool, shard)
107 }
108
109 pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
116 ShardingRegistry::with_mut(|core| {
117 let mut entry = core
118 .get_entry(&shard)
119 .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
120
121 if entry.pool != pool {
122 return Err(ShardingRegistryOpsError::PoolMismatch {
123 pid: shard,
124 expected: pool.to_string(),
125 actual: entry.pool,
126 }
127 .into());
128 }
129
130 let key =
131 ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
132
133 if let Some(current) = core.get_assignment(&key) {
135 if current == shard {
136 return Ok(());
137 }
138
139 if let Some(mut old_entry) = core.get_entry(¤t) {
140 old_entry.count = old_entry.count.saturating_sub(1);
141 core.insert_entry(current, old_entry);
142 }
143 }
144
145 core.insert_assignment(key, shard);
147 entry.count = entry.count.saturating_add(1);
148 core.insert_entry(shard, entry);
149
150 Ok(())
151 })
152 }
153
154 pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
158 ShardingRegistry::with_mut(|core| {
159 let key =
160 ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
161 let Some(shard) = core.remove_assignment(&key) else {
162 return Ok(None);
163 };
164
165 if let Some(mut entry) = core.get_entry(&shard) {
166 entry.count = entry.count.saturating_sub(1);
167 core.insert_entry(shard, entry);
168 }
169
170 Ok(Some(shard))
171 })
172 }
173
174 pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
176 ShardingRegistry::with_mut(|core| {
177 let mut entry = core
178 .get_entry(&pid)
179 .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
180
181 if slot != ShardEntry::UNASSIGNED_SLOT {
182 for (other_pid, other_entry) in core.all_entries() {
183 if other_pid != pid
184 && other_entry.pool == entry.pool
185 && other_entry.slot == slot
186 {
187 return Err(ShardingRegistryOpsError::SlotOccupied {
188 pool: entry.pool,
189 slot,
190 pid: other_pid,
191 }
192 .into());
193 }
194 }
195 }
196
197 entry.slot = slot;
198 core.insert_entry(pid, entry);
199
200 Ok(())
201 })
202 }
203
204 #[cfg(test)]
205 pub(crate) fn clear_for_test() {
206 ShardingRegistry::clear();
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 fn p(id: u8) -> Principal {
215 Principal::from_slice(&[id; 29])
216 }
217
218 #[test]
219 fn assign_and_unassign_updates_count() {
220 ShardingRegistryOps::clear_for_test();
221 let ty = CanisterRole::new("alpha");
222 let shard_pid = p(1);
223
224 ShardingRegistryOps::create(shard_pid, "poolA", 0, &ty, 2).unwrap();
225 ShardingRegistryOps::assign("poolA", "tenant1", shard_pid).unwrap();
226 let count_after = ShardingRegistryOps::get(shard_pid).unwrap().count;
227 assert_eq!(count_after, 1);
228
229 assert_eq!(
230 ShardingRegistryOps::unassign("poolA", "tenant1").unwrap(),
231 Some(shard_pid)
232 );
233 let count_final = ShardingRegistryOps::get(shard_pid).unwrap().count;
234 assert_eq!(count_final, 0);
235 }
236}