canic_core/ops/storage/sharding/
registry.rs

1use crate::{
2    Error, ThisError,
3    cdk::{types::Principal, utils::time::now_secs},
4    ids::CanisterRole,
5    model::memory::sharding::{ShardEntry, ShardKey, ShardingRegistry},
6    ops::storage::StorageOpsError,
7};
8
9///
10/// ShardingRegistryOps
11///
12
13pub struct ShardingRegistryOps;
14
15///
16/// ShardingRegistryOpsError
17/// Storage-layer errors for sharding registry CRUD and consistency checks.
18///
19
20#[derive(Debug, ThisError)]
21pub enum ShardingRegistryOpsError {
22    #[error("shard not found: {0}")]
23    ShardNotFound(Principal),
24
25    #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
26    PoolMismatch {
27        pid: Principal,
28        expected: String,
29        actual: String,
30    },
31
32    #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
33    SlotOccupied {
34        pool: String,
35        slot: u32,
36        pid: Principal,
37    },
38}
39
40impl From<ShardingRegistryOpsError> for Error {
41    fn from(err: ShardingRegistryOpsError) -> Self {
42        StorageOpsError::from(err).into()
43    }
44}
45
46impl ShardingRegistryOps {
47    /// Create a new shard entry in the registry.
48    pub fn create(
49        pid: Principal,
50        pool: &str,
51        slot: u32,
52        canister_type: &CanisterRole,
53        capacity: u32,
54    ) -> Result<(), Error> {
55        ShardingRegistry::with_mut(|core| {
56            if slot != ShardEntry::UNASSIGNED_SLOT {
57                for (other_pid, other_entry) in core.all_entries() {
58                    if other_pid != pid && other_entry.pool == pool && other_entry.slot == slot {
59                        return Err(ShardingRegistryOpsError::SlotOccupied {
60                            pool: pool.to_string(),
61                            slot,
62                            pid: other_pid,
63                        }
64                        .into());
65                    }
66                }
67            }
68
69            let entry = ShardEntry::new(pool, slot, canister_type.clone(), capacity, now_secs());
70            core.insert_entry(pid, entry);
71
72            Ok(())
73        })
74    }
75
76    /// Fetch a shard entry by principal.
77    #[must_use]
78    pub fn get(pid: Principal) -> Option<ShardEntry> {
79        ShardingRegistry::with(|core| core.get_entry(&pid))
80    }
81
82    /// Export all shard entries.
83    #[must_use]
84    pub fn export() -> Vec<(Principal, ShardEntry)> {
85        ShardingRegistry::export()
86    }
87
88    /// Returns the shard assigned to the given tenant (if any).
89    #[must_use]
90    pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
91        ShardingRegistry::tenant_shard(pool, tenant)
92    }
93
94    /// Lookup the slot index for a given shard principal.
95    #[must_use]
96    pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
97        ShardingRegistry::slot_for_shard(pool, shard)
98    }
99
100    /// Lists all tenants currently assigned to the specified shard.
101    #[must_use]
102    pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
103        ShardingRegistry::tenants_in_shard(pool, shard)
104    }
105
106    /// Assign (or reassign) a tenant to a shard.
107    ///
108    /// Storage responsibilities:
109    /// - enforce referential integrity (target shard must exist)
110    /// - enforce pool consistency (assignment pool must match shard entry pool)
111    /// - maintain derived counters (`ShardEntry.count`)
112    pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
113        ShardingRegistry::with_mut(|core| {
114            let mut entry = core
115                .get_entry(&shard)
116                .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
117
118            if entry.pool != pool {
119                return Err(ShardingRegistryOpsError::PoolMismatch {
120                    pid: shard,
121                    expected: pool.to_string(),
122                    actual: entry.pool,
123                }
124                .into());
125            }
126
127            let key = ShardKey::new(pool, tenant);
128
129            // If tenant is already assigned, decrement the old shard count.
130            if let Some(current) = core.get_assignment(&key) {
131                if current == shard {
132                    return Ok(());
133                }
134
135                if let Some(mut old_entry) = core.get_entry(&current) {
136                    old_entry.count = old_entry.count.saturating_sub(1);
137                    core.insert_entry(current, old_entry);
138                }
139            }
140
141            // Overwrite the assignment and increment the new shard count.
142            core.insert_assignment(key, shard);
143            entry.count = entry.count.saturating_add(1);
144            core.insert_entry(shard, entry);
145
146            Ok(())
147        })
148    }
149
150    /// Remove a tenant assignment, if present.
151    ///
152    /// Returns the shard principal that previously held the assignment.
153    pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
154        ShardingRegistry::with_mut(|core| {
155            let key = ShardKey::new(pool, tenant);
156            let Some(shard) = core.remove_assignment(&key) else {
157                return Ok(None);
158            };
159
160            if let Some(mut entry) = core.get_entry(&shard) {
161                entry.count = entry.count.saturating_sub(1);
162                core.insert_entry(shard, entry);
163            }
164
165            Ok(Some(shard))
166        })
167    }
168
169    /// Update the logical slot index for a shard entry.
170    pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
171        ShardingRegistry::with_mut(|core| {
172            let mut entry = core
173                .get_entry(&pid)
174                .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
175
176            if slot != ShardEntry::UNASSIGNED_SLOT {
177                for (other_pid, other_entry) in core.all_entries() {
178                    if other_pid != pid
179                        && other_entry.pool == entry.pool
180                        && other_entry.slot == slot
181                    {
182                        return Err(ShardingRegistryOpsError::SlotOccupied {
183                            pool: entry.pool,
184                            slot,
185                            pid: other_pid,
186                        }
187                        .into());
188                    }
189                }
190            }
191
192            entry.slot = slot;
193            core.insert_entry(pid, entry);
194
195            Ok(())
196        })
197    }
198
199    #[cfg(test)]
200    pub(crate) fn clear_for_test() {
201        ShardingRegistry::clear();
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    fn p(id: u8) -> Principal {
210        Principal::from_slice(&[id; 29])
211    }
212
213    #[test]
214    fn assign_and_unassign_updates_count() {
215        ShardingRegistryOps::clear_for_test();
216        let ty = CanisterRole::new("alpha");
217        let shard_pid = p(1);
218
219        ShardingRegistryOps::create(shard_pid, "poolA", 0, &ty, 2).unwrap();
220        ShardingRegistryOps::assign("poolA", "tenant1", shard_pid).unwrap();
221        let count_after = ShardingRegistryOps::get(shard_pid).unwrap().count;
222        assert_eq!(count_after, 1);
223
224        assert_eq!(
225            ShardingRegistryOps::unassign("poolA", "tenant1").unwrap(),
226            Some(shard_pid)
227        );
228        let count_final = ShardingRegistryOps::get(shard_pid).unwrap().count;
229        assert_eq!(count_final, 0);
230    }
231}