// canic_core/ops/storage/sharding.rs
pub(crate) use crate::model::memory::sharding::ShardEntry;
2
3use crate::{
4 Error, ThisError,
5 cdk::{types::Principal, utils::time::now_secs},
6 dto::placement::ShardingRegistryView,
7 ids::CanisterRole,
8 model::memory::sharding::{ShardKey, ShardingRegistry, ShardingRegistryData},
9 ops::{adapter::placement::shard_entry_to_view, storage::StorageOpsError},
10};
11
/// Stateless namespace for the sharding-registry operations.
///
/// Every operation is an associated function that reads or mutates the
/// shared `ShardingRegistry`; the type itself carries no data.
pub struct ShardingRegistryOps;
17
/// Errors produced by [`ShardingRegistryOps`].
///
/// Converted into the crate-level [`Error`] by wrapping in
/// `StorageOpsError::ShardingRegistryOpsError`.
#[derive(Debug, ThisError)]
pub enum ShardingRegistryOpsError {
    /// The registry holds no entry for the given shard principal
    /// (raised by `assign` and `set_slot`).
    #[error("shard not found: {0}")]
    ShardNotFound(Principal),

    /// `ShardEntry::try_new` or `ShardKey::try_new` rejected its input;
    /// the payload is the message returned by that constructor.
    #[error("invalid sharding key: {0}")]
    InvalidKey(String),

    /// `assign` was asked to place a tenant of pool `expected` on a shard
    /// that is registered under pool `actual`.
    #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
    PoolMismatch {
        /// Shard that was targeted.
        pid: Principal,
        /// Pool named by the caller.
        expected: String,
        /// Pool recorded on the shard's registry entry.
        actual: String,
    },

    /// `create` or `set_slot` requested a slot that another shard in the
    /// same pool already holds.
    #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
    SlotOccupied {
        /// Pool in which the collision occurred.
        pool: String,
        /// The contested slot index.
        slot: u32,
        /// Shard currently holding the slot.
        pid: Principal,
    },
}
45
46impl From<ShardingRegistryOpsError> for Error {
47 fn from(err: ShardingRegistryOpsError) -> Self {
48 StorageOpsError::ShardingRegistryOpsError(err).into()
49 }
50}
51
52impl ShardingRegistryOps {
53 pub fn create(
55 pid: Principal,
56 pool: &str,
57 slot: u32,
58 canister_role: &CanisterRole,
59 capacity: u32,
60 ) -> Result<(), Error> {
61 ShardingRegistry::with_mut(|core| {
62 if slot != ShardEntry::UNASSIGNED_SLOT {
63 for (other_pid, other_entry) in core.all_entries() {
64 if other_pid != pid
65 && other_entry.pool.as_ref() == pool
66 && other_entry.slot == slot
67 {
68 return Err(ShardingRegistryOpsError::SlotOccupied {
69 pool: pool.to_string(),
70 slot,
71 pid: other_pid,
72 }
73 .into());
74 }
75 }
76 }
77
78 let entry =
79 ShardEntry::try_new(pool, slot, canister_role.clone(), capacity, now_secs())
80 .map_err(ShardingRegistryOpsError::InvalidKey)?;
81 core.insert_entry(pid, entry);
82
83 Ok(())
84 })
85 }
86
87 #[cfg(test)]
89 #[must_use]
90 pub(crate) fn get(pid: Principal) -> Option<ShardEntry> {
91 ShardingRegistry::with(|core| core.get_entry(&pid))
92 }
93
94 #[must_use]
96 pub fn export() -> ShardingRegistryData {
97 ShardingRegistry::export()
98 }
99
100 #[must_use]
102 pub fn export_view() -> ShardingRegistryView {
103 let data = ShardingRegistry::export();
104 let view = data
105 .into_iter()
106 .map(|(pid, entry)| (pid, shard_entry_to_view(&entry)))
107 .collect();
108
109 ShardingRegistryView(view)
110 }
111
112 #[must_use]
114 pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
115 ShardingRegistry::tenant_shard(pool, tenant)
116 }
117
118 #[must_use]
120 pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
121 ShardingRegistry::slot_for_shard(pool, shard)
122 }
123
124 #[must_use]
126 pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
127 ShardingRegistry::tenants_in_shard(pool, shard)
128 }
129
130 pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
137 ShardingRegistry::with_mut(|core| {
138 let mut entry = core
139 .get_entry(&shard)
140 .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
141
142 if entry.pool.as_ref() != pool {
143 return Err(ShardingRegistryOpsError::PoolMismatch {
144 pid: shard,
145 expected: pool.to_string(),
146 actual: entry.pool.to_string(),
147 }
148 .into());
149 }
150
151 let key =
152 ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
153
154 if let Some(current) = core.get_assignment(&key) {
156 if current == shard {
157 return Ok(());
158 }
159
160 if let Some(mut old_entry) = core.get_entry(¤t) {
161 old_entry.count = old_entry.count.saturating_sub(1);
162 core.insert_entry(current, old_entry);
163 }
164 }
165
166 core.insert_assignment(key, shard);
168 entry.count = entry.count.saturating_add(1);
169 core.insert_entry(shard, entry);
170
171 Ok(())
172 })
173 }
174
175 pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
179 ShardingRegistry::with_mut(|core| {
180 let key =
181 ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
182 let Some(shard) = core.remove_assignment(&key) else {
183 return Ok(None);
184 };
185
186 if let Some(mut entry) = core.get_entry(&shard) {
187 entry.count = entry.count.saturating_sub(1);
188 core.insert_entry(shard, entry);
189 }
190
191 Ok(Some(shard))
192 })
193 }
194
195 pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
197 ShardingRegistry::with_mut(|core| {
198 let mut entry = core
199 .get_entry(&pid)
200 .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
201
202 if slot != ShardEntry::UNASSIGNED_SLOT {
203 for (other_pid, other_entry) in core.all_entries() {
204 if other_pid != pid
205 && other_entry.pool == entry.pool
206 && other_entry.slot == slot
207 {
208 return Err(ShardingRegistryOpsError::SlotOccupied {
209 pool: entry.pool.to_string(),
210 slot,
211 pid: other_pid,
212 }
213 .into());
214 }
215 }
216 }
217
218 entry.slot = slot;
219 core.insert_entry(pid, entry);
220
221 Ok(())
222 })
223 }
224
225 #[cfg(test)]
226 pub(crate) fn clear_for_test() {
227 ShardingRegistry::clear();
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a deterministic test principal: 29 bytes, all equal to `id`.
    fn p(id: u8) -> Principal {
        let raw = [id; 29];
        Principal::from_slice(&raw)
    }

    /// Assigning a tenant bumps the shard's count to 1; unassigning returns
    /// the shard and brings the count back to 0.
    #[test]
    fn assign_and_unassign_updates_count() {
        ShardingRegistryOps::clear_for_test();

        let shard_pid = p(1);
        let role = CanisterRole::new("alpha");
        ShardingRegistryOps::create(shard_pid, "poolA", 0, &role, 2).unwrap();

        // After assignment the shard tracks exactly one tenant.
        ShardingRegistryOps::assign("poolA", "tenant1", shard_pid).unwrap();
        assert_eq!(ShardingRegistryOps::get(shard_pid).unwrap().count, 1);

        // Unassigning reports the shard that held the tenant...
        let released = ShardingRegistryOps::unassign("poolA", "tenant1").unwrap();
        assert_eq!(released, Some(shard_pid));

        // ...and the shard is empty again.
        assert_eq!(ShardingRegistryOps::get(shard_pid).unwrap().count, 0);
    }
}