canic_core/ops/storage/
sharding.rs

1pub(crate) use crate::model::memory::sharding::ShardEntry;
2
3use crate::{
4    Error, ThisError,
5    cdk::{types::Principal, utils::time::now_secs},
6    dto::placement::ShardingRegistryView,
7    ids::CanisterRole,
8    model::memory::sharding::{ShardKey, ShardingRegistry, ShardingRegistryData},
9    ops::{adapter::placement::shard_entry_to_view, storage::StorageOpsError},
10};
11
///
/// ShardingRegistryOps
/// Namespace type for sharding registry storage operations.
/// It carries no state; every operation is an associated function.
///

pub struct ShardingRegistryOps;
17
18///
19/// ShardingRegistryOpsError
20/// Storage-layer errors for sharding registry CRUD and consistency checks.
21///
22
23#[derive(Debug, ThisError)]
24pub enum ShardingRegistryOpsError {
25    #[error("shard not found: {0}")]
26    ShardNotFound(Principal),
27
28    #[error("invalid sharding key: {0}")]
29    InvalidKey(String),
30
31    #[error("shard {pid} belongs to pool '{actual}', not '{expected}'")]
32    PoolMismatch {
33        pid: Principal,
34        expected: String,
35        actual: String,
36    },
37
38    #[error("slot {slot} in pool '{pool}' already assigned to shard {pid}")]
39    SlotOccupied {
40        pool: String,
41        slot: u32,
42        pid: Principal,
43    },
44}
45
46impl From<ShardingRegistryOpsError> for Error {
47    fn from(err: ShardingRegistryOpsError) -> Self {
48        StorageOpsError::ShardingRegistryOpsError(err).into()
49    }
50}
51
52impl ShardingRegistryOps {
53    /// Create a new shard entry in the registry.
54    pub fn create(
55        pid: Principal,
56        pool: &str,
57        slot: u32,
58        canister_role: &CanisterRole,
59        capacity: u32,
60    ) -> Result<(), Error> {
61        ShardingRegistry::with_mut(|core| {
62            if slot != ShardEntry::UNASSIGNED_SLOT {
63                for (other_pid, other_entry) in core.all_entries() {
64                    if other_pid != pid
65                        && other_entry.pool.as_ref() == pool
66                        && other_entry.slot == slot
67                    {
68                        return Err(ShardingRegistryOpsError::SlotOccupied {
69                            pool: pool.to_string(),
70                            slot,
71                            pid: other_pid,
72                        }
73                        .into());
74                    }
75                }
76            }
77
78            let entry =
79                ShardEntry::try_new(pool, slot, canister_role.clone(), capacity, now_secs())
80                    .map_err(ShardingRegistryOpsError::InvalidKey)?;
81            core.insert_entry(pid, entry);
82
83            Ok(())
84        })
85    }
86
87    /// Fetch a shard entry by principal (tests only).
88    #[cfg(test)]
89    #[must_use]
90    pub(crate) fn get(pid: Principal) -> Option<ShardEntry> {
91        ShardingRegistry::with(|core| core.get_entry(&pid))
92    }
93
94    /// Export all shard entries as a public view.
95    #[must_use]
96    pub fn export() -> ShardingRegistryData {
97        ShardingRegistry::export()
98    }
99
100    /// Export all shard entries as a public view.
101    #[must_use]
102    pub fn export_view() -> ShardingRegistryView {
103        let data = ShardingRegistry::export();
104        let view = data
105            .into_iter()
106            .map(|(pid, entry)| (pid, shard_entry_to_view(&entry)))
107            .collect();
108
109        ShardingRegistryView(view)
110    }
111
112    /// Returns the shard assigned to the given tenant (if any).
113    #[must_use]
114    pub fn tenant_shard(pool: &str, tenant: &str) -> Option<Principal> {
115        ShardingRegistry::tenant_shard(pool, tenant)
116    }
117
118    /// Lookup the slot index for a given shard principal.
119    #[must_use]
120    pub fn slot_for_shard(pool: &str, shard: Principal) -> Option<u32> {
121        ShardingRegistry::slot_for_shard(pool, shard)
122    }
123
124    /// Lists all tenants currently assigned to the specified shard.
125    #[must_use]
126    pub fn tenants_in_shard(pool: &str, shard: Principal) -> Vec<String> {
127        ShardingRegistry::tenants_in_shard(pool, shard)
128    }
129
130    /// Assign (or reassign) a tenant to a shard.
131    ///
132    /// Storage responsibilities:
133    /// - enforce referential integrity (target shard must exist)
134    /// - enforce pool consistency (assignment pool must match shard entry pool)
135    /// - maintain derived counters (`ShardEntry.count`)
136    pub fn assign(pool: &str, tenant: &str, shard: Principal) -> Result<(), Error> {
137        ShardingRegistry::with_mut(|core| {
138            let mut entry = core
139                .get_entry(&shard)
140                .ok_or(ShardingRegistryOpsError::ShardNotFound(shard))?;
141
142            if entry.pool.as_ref() != pool {
143                return Err(ShardingRegistryOpsError::PoolMismatch {
144                    pid: shard,
145                    expected: pool.to_string(),
146                    actual: entry.pool.to_string(),
147                }
148                .into());
149            }
150
151            let key =
152                ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
153
154            // If tenant is already assigned, decrement the old shard count.
155            if let Some(current) = core.get_assignment(&key) {
156                if current == shard {
157                    return Ok(());
158                }
159
160                if let Some(mut old_entry) = core.get_entry(&current) {
161                    old_entry.count = old_entry.count.saturating_sub(1);
162                    core.insert_entry(current, old_entry);
163                }
164            }
165
166            // Overwrite the assignment and increment the new shard count.
167            core.insert_assignment(key, shard);
168            entry.count = entry.count.saturating_add(1);
169            core.insert_entry(shard, entry);
170
171            Ok(())
172        })
173    }
174
175    /// Remove a tenant assignment, if present.
176    ///
177    /// Returns the shard principal that previously held the assignment.
178    pub fn unassign(pool: &str, tenant: &str) -> Result<Option<Principal>, Error> {
179        ShardingRegistry::with_mut(|core| {
180            let key =
181                ShardKey::try_new(pool, tenant).map_err(ShardingRegistryOpsError::InvalidKey)?;
182            let Some(shard) = core.remove_assignment(&key) else {
183                return Ok(None);
184            };
185
186            if let Some(mut entry) = core.get_entry(&shard) {
187                entry.count = entry.count.saturating_sub(1);
188                core.insert_entry(shard, entry);
189            }
190
191            Ok(Some(shard))
192        })
193    }
194
195    /// Update the logical slot index for a shard entry.
196    pub fn set_slot(pid: Principal, slot: u32) -> Result<(), Error> {
197        ShardingRegistry::with_mut(|core| {
198            let mut entry = core
199                .get_entry(&pid)
200                .ok_or(ShardingRegistryOpsError::ShardNotFound(pid))?;
201
202            if slot != ShardEntry::UNASSIGNED_SLOT {
203                for (other_pid, other_entry) in core.all_entries() {
204                    if other_pid != pid
205                        && other_entry.pool == entry.pool
206                        && other_entry.slot == slot
207                    {
208                        return Err(ShardingRegistryOpsError::SlotOccupied {
209                            pool: entry.pool.to_string(),
210                            slot,
211                            pid: other_pid,
212                        }
213                        .into());
214                    }
215                }
216            }
217
218            entry.slot = slot;
219            core.insert_entry(pid, entry);
220
221            Ok(())
222        })
223    }
224
225    #[cfg(test)]
226    pub(crate) fn clear_for_test() {
227        ShardingRegistry::clear();
228    }
229}
230
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a principal from 29 copies of `id`.
    fn p(id: u8) -> Principal {
        let bytes = [id; 29];
        Principal::from_slice(&bytes)
    }

    /// Assigning a tenant raises the shard's count to one, and unassigning
    /// it returns the shard and brings the count back to zero.
    #[test]
    fn assign_and_unassign_updates_count() {
        ShardingRegistryOps::clear_for_test();

        let shard = p(1);
        let count_of = |pid: Principal| ShardingRegistryOps::get(pid).unwrap().count;

        ShardingRegistryOps::create(shard, "poolA", 0, &CanisterRole::new("alpha"), 2).unwrap();

        ShardingRegistryOps::assign("poolA", "tenant1", shard).unwrap();
        assert_eq!(count_of(shard), 1);

        let released = ShardingRegistryOps::unassign("poolA", "tenant1").unwrap();
        assert_eq!(released, Some(shard));
        assert_eq!(count_of(shard), 0);
    }
}