canic_core/ops/storage/sharding/
registry.rs1use crate::{
2 Error, ThisError,
3 cdk::{types::Principal, utils::time::now_secs},
4 ids::CanisterRole,
5 model::memory::sharding::{ShardEntry, ShardKey, ShardingRegistry},
6 ops::storage::StorageOpsError,
7};
8
/// Namespace type for sharding-registry operations.
///
/// Carries no state; every operation is an associated function that reads or
/// mutates the shared `ShardingRegistry`.
pub struct ShardingRegistryOps;
14
15#[derive(Debug, ThisError)]
21pub enum ShardingRegistryOpsError {
22 #[error("shard not found: {0}")]
23 ShardNotFound(Principal),
24
25 #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
26 PoolMismatch {
27 pid: Principal,
28 expected: String,
29 actual: String,
30 },
31
32 #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
33 SlotOccupied {
34 pool: String,
35 slot: u32,
36 pid: Principal,
37 },
38}
39
40impl From<ShardingRegistryOpsError> for Error {
41 fn from(err: ShardingRegistryOpsError) -> Self {
42 StorageOpsError::from(err).into()
43 }
44}
45
46impl ShardingRegistryOps {
47 pub fn create(
49 pid: Principal,
50 pool: &str,
51 slot: u32,
52 canister_type: &CanisterRole,
53 capacity: u32,
54 ) -> Result<(), Error> {
55 ShardingRegistry::with_mut(|core| {
56 if slot != ShardEntry::UNASSIGNED_SLOT {
57 for (other_pid, other_entry) in core.all_entries() {
58 if other_pid != pid && other_entry.pool == pool && other_entry.slot == slot {
59 return Err(ShardingRegistryOpsError::SlotOccupied {
60 pool: pool.to_string(),
61 slot,
62 pid: other_pid,
63 }
64 .into());
65 }
66 }
67 }
68
69 let entry = ShardEntry::new(pool, slot, canister_type.clone(), capacity, now_secs());
70 core.insert_entry(pid, entry);
71
72 Ok(())
73 })
74 }
75
76 #[must_use]
78 pub fn get(pid: Principal) -> Option<ShardEntry> {
79 ShardingRegistry::with(|core| core.get_entry(&pid))
80 }
81
82 #[must_use]
84 pub fn export() -> Vec<(Principal, ShardEntry)> {
85 ShardingRegistry::export()
86 }
87
88 #[must_use]
90 pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
91 ShardingRegistry::tenant_shard(pool, tenant)
92 }
93
94 #[must_use]
96 pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
97 ShardingRegistry::slot_for_shard(pool, shard)
98 }
99
100 #[must_use]
102 pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
103 ShardingRegistry::tenants_in_shard(pool, shard)
104 }
105
106 pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
113 ShardingRegistry::with_mut(|core| {
114 let mut entry = core
115 .get_entry(&shard)
116 .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
117
118 if entry.pool != pool {
119 return Err(ShardingRegistryOpsError::PoolMismatch {
120 pid: shard,
121 expected: pool.to_string(),
122 actual: entry.pool,
123 }
124 .into());
125 }
126
127 let key = ShardKey::new(pool, tenant);
128
129 if let Some(current) = core.get_assignment(&key) {
131 if current == shard {
132 return Ok(());
133 }
134
135 if let Some(mut old_entry) = core.get_entry(¤t) {
136 old_entry.count = old_entry.count.saturating_sub(1);
137 core.insert_entry(current, old_entry);
138 }
139 }
140
141 core.insert_assignment(key, shard);
143 entry.count = entry.count.saturating_add(1);
144 core.insert_entry(shard, entry);
145
146 Ok(())
147 })
148 }
149
150 pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
154 ShardingRegistry::with_mut(|core| {
155 let key = ShardKey::new(pool, tenant);
156 let Some(shard) = core.remove_assignment(&key) else {
157 return Ok(None);
158 };
159
160 if let Some(mut entry) = core.get_entry(&shard) {
161 entry.count = entry.count.saturating_sub(1);
162 core.insert_entry(shard, entry);
163 }
164
165 Ok(Some(shard))
166 })
167 }
168
169 pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
171 ShardingRegistry::with_mut(|core| {
172 let mut entry = core
173 .get_entry(&pid)
174 .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
175
176 if slot != ShardEntry::UNASSIGNED_SLOT {
177 for (other_pid, other_entry) in core.all_entries() {
178 if other_pid != pid
179 && other_entry.pool == entry.pool
180 && other_entry.slot == slot
181 {
182 return Err(ShardingRegistryOpsError::SlotOccupied {
183 pool: entry.pool,
184 slot,
185 pid: other_pid,
186 }
187 .into());
188 }
189 }
190 }
191
192 entry.slot = slot;
193 core.insert_entry(pid, entry);
194
195 Ok(())
196 })
197 }
198
199 #[cfg(test)]
200 pub(crate) fn clear_for_test() {
201 ShardingRegistry::clear();
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a test principal from 29 copies of the byte `id`.
    fn p(id: u8) -> Principal {
        let raw = [id; 29];
        Principal::from_slice(&raw)
    }

    /// Assigning a tenant bumps the shard's count; unassigning drops it back.
    #[test]
    fn assign_and_unassign_updates_count() {
        ShardingRegistryOps::clear_for_test();

        let role = CanisterRole::new("alpha");
        let shard_pid = p(1);
        // Reads the shard's current tenant count straight from the registry.
        let count_of = |pid: Principal| ShardingRegistryOps::get(pid).unwrap().count;

        ShardingRegistryOps::create(shard_pid, "poolA", 0, &role, 2).unwrap();

        ShardingRegistryOps::assign("poolA", "tenant1", shard_pid).unwrap();
        assert_eq!(count_of(shard_pid), 1);

        let removed = ShardingRegistryOps::unassign("poolA", "tenant1").unwrap();
        assert_eq!(removed, Some(shard_pid));
        assert_eq!(count_of(shard_pid), 0);
    }
}