canic_core/ops/storage/sharding/
registry.rs

1use crate::{
2    Error, ThisError,
3    cdk::{types::Principal, utils::time::now_secs},
4    ids::CanisterRole,
5    model::memory::sharding::{ShardEntry, ShardKey, ShardingRegistry},
6    ops::storage::StorageOpsError,
7};
8
9///
10/// ShardingRegistryOps
11///
12
13pub struct ShardingRegistryOps;
14
15///
16/// ShardingRegistryOpsError
17/// Storage-layer errors for sharding registry CRUD and consistency checks.
18///
19
20#[derive(Debug, ThisError)]
21pub enum ShardingRegistryOpsError {
22    #[error("shard not found: {0}")]
23    ShardNotFound(Principal),
24
25    #[error("invalid sharding key: {0}")]
26    InvalidKey(String),
27
28    #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
29    PoolMismatch {
30        pid: Principal,
31        expected: String,
32        actual: String,
33    },
34
35    #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
36    SlotOccupied {
37        pool: String,
38        slot: u32,
39        pid: Principal,
40    },
41}
42
43impl From<ShardingRegistryOpsError> for Error {
44    fn from(err: ShardingRegistryOpsError) -> Self {
45        StorageOpsError::from(err).into()
46    }
47}
48
49impl ShardingRegistryOps {
50    /// Create a new shard entry in the registry.
51    pub fn create(
52        pid: Principal,
53        pool: &str,
54        slot: u32,
55        canister_type: &CanisterRole,
56        capacity: u32,
57    ) -> Result<(), Error> {
58        ShardingRegistry::with_mut(|core| {
59            if slot != ShardEntry::UNASSIGNED_SLOT {
60                for (other_pid, other_entry) in core.all_entries() {
61                    if other_pid != pid && other_entry.pool == pool && other_entry.slot == slot {
62                        return Err(ShardingRegistryOpsError::SlotOccupied {
63                            pool: pool.to_string(),
64                            slot,
65                            pid: other_pid,
66                        }
67                        .into());
68                    }
69                }
70            }
71
72            let entry = ShardEntry::new(pool, slot, canister_type.clone(), capacity, now_secs());
73            core.insert_entry(pid, entry);
74
75            Ok(())
76        })
77    }
78
79    /// Fetch a shard entry by principal.
80    #[must_use]
81    pub fn get(pid: Principal) -> Option<ShardEntry> {
82        ShardingRegistry::with(|core| core.get_entry(&pid))
83    }
84
85    /// Export all shard entries.
86    #[must_use]
87    pub fn export() -> Vec<(Principal, ShardEntry)> {
88        ShardingRegistry::export()
89    }
90
91    /// Returns the shard assigned to the given tenant (if any).
92    #[must_use]
93    pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
94        ShardingRegistry::tenant_shard(pool, tenant)
95    }
96
97    /// Lookup the slot index for a given shard principal.
98    #[must_use]
99    pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
100        ShardingRegistry::slot_for_shard(pool, shard)
101    }
102
103    /// Lists all tenants currently assigned to the specified shard.
104    #[must_use]
105    pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
106        ShardingRegistry::tenants_in_shard(pool, shard)
107    }
108
109    /// Assign (or reassign) a tenant to a shard.
110    ///
111    /// Storage responsibilities:
112    /// - enforce referential integrity (target shard must exist)
113    /// - enforce pool consistency (assignment pool must match shard entry pool)
114    /// - maintain derived counters (`ShardEntry.count`)
115    pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
116        ShardingRegistry::with_mut(|core| {
117            let mut entry = core
118                .get_entry(&shard)
119                .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
120
121            if entry.pool != pool {
122                return Err(ShardingRegistryOpsError::PoolMismatch {
123                    pid: shard,
124                    expected: pool.to_string(),
125                    actual: entry.pool,
126                }
127                .into());
128            }
129
130            let key =
131                ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
132
133            // If tenant is already assigned, decrement the old shard count.
134            if let Some(current) = core.get_assignment(&key) {
135                if current == shard {
136                    return Ok(());
137                }
138
139                if let Some(mut old_entry) = core.get_entry(&current) {
140                    old_entry.count = old_entry.count.saturating_sub(1);
141                    core.insert_entry(current, old_entry);
142                }
143            }
144
145            // Overwrite the assignment and increment the new shard count.
146            core.insert_assignment(key, shard);
147            entry.count = entry.count.saturating_add(1);
148            core.insert_entry(shard, entry);
149
150            Ok(())
151        })
152    }
153
154    /// Remove a tenant assignment, if present.
155    ///
156    /// Returns the shard principal that previously held the assignment.
157    pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
158        ShardingRegistry::with_mut(|core| {
159            let key =
160                ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
161            let Some(shard) = core.remove_assignment(&key) else {
162                return Ok(None);
163            };
164
165            if let Some(mut entry) = core.get_entry(&shard) {
166                entry.count = entry.count.saturating_sub(1);
167                core.insert_entry(shard, entry);
168            }
169
170            Ok(Some(shard))
171        })
172    }
173
174    /// Update the logical slot index for a shard entry.
175    pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
176        ShardingRegistry::with_mut(|core| {
177            let mut entry = core
178                .get_entry(&pid)
179                .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
180
181            if slot != ShardEntry::UNASSIGNED_SLOT {
182                for (other_pid, other_entry) in core.all_entries() {
183                    if other_pid != pid
184                        && other_entry.pool == entry.pool
185                        && other_entry.slot == slot
186                    {
187                        return Err(ShardingRegistryOpsError::SlotOccupied {
188                            pool: entry.pool,
189                            slot,
190                            pid: other_pid,
191                        }
192                        .into());
193                    }
194                }
195            }
196
197            entry.slot = slot;
198            core.insert_entry(pid, entry);
199
200            Ok(())
201        })
202    }
203
204    #[cfg(test)]
205    pub(crate) fn clear_for_test() {
206        ShardingRegistry::clear();
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213
214    fn p(id: u8) -> Principal {
215        Principal::from_slice(&[id; 29])
216    }
217
218    #[test]
219    fn assign_and_unassign_updates_count() {
220        ShardingRegistryOps::clear_for_test();
221        let ty = CanisterRole::new("alpha");
222        let shard_pid = p(1);
223
224        ShardingRegistryOps::create(shard_pid, "poolA", 0, &ty, 2).unwrap();
225        ShardingRegistryOps::assign("poolA", "tenant1", shard_pid).unwrap();
226        let count_after = ShardingRegistryOps::get(shard_pid).unwrap().count;
227        assert_eq!(count_after, 1);
228
229        assert_eq!(
230            ShardingRegistryOps::unassign("poolA", "tenant1").unwrap(),
231            Some(shard_pid)
232        );
233        let count_final = ShardingRegistryOps::get(shard_pid).unwrap().count;
234        assert_eq!(count_final, 0);
235    }
236}