Skip to main content

icydb_core/obs/snapshot/
mod.rs

1use crate::{
2    db::{
3        Db, ensure_recovered,
4        identity::EntityName,
5        index::IndexKey,
6        store::{DataKey, StorageKey},
7    },
8    error::InternalError,
9    traits::CanisterKind,
10    value::Value,
11};
12use candid::CandidType;
13use serde::{Deserialize, Serialize};
14use std::collections::BTreeMap;
15
///
/// StorageReport
/// Live storage snapshot report
///
/// Aggregate result produced by `storage_report`: store-level totals for the
/// data and index side of every registered store, a per-entity breakdown of
/// the data rows, and counters for rows/entries that could not be decoded or
/// validated.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct StorageReport {
    /// One snapshot per registered store, describing its data side.
    pub storage_data: Vec<DataStoreSnapshot>,
    /// One snapshot per registered store, describing its index side.
    pub storage_index: Vec<IndexStoreSnapshot>,
    /// Per-entity row statistics, emitted store by store.
    pub entity_storage: Vec<EntitySnapshot>,
    /// Data-store rows whose raw key could not be decoded into a `DataKey`.
    pub corrupted_keys: u64,
    /// Index entries whose raw key could not be decoded into an `IndexKey`,
    /// plus entries whose key decoded but whose value failed validation.
    pub corrupted_entries: u64,
}
29
///
/// DataStoreSnapshot
/// Store-level snapshot metrics.
///
/// Describes the data side of a single registered store.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct DataStoreSnapshot {
    /// Path under which the store is registered.
    pub path: String,
    /// Entry count as reported by the data store's `len()`.
    pub entries: u64,
    /// Byte usage as reported by the data store's `memory_bytes()`.
    pub memory_bytes: u64,
}
41
///
/// IndexStoreSnapshot
/// Index-store snapshot metrics
///
/// Describes the index side of a single registered store.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct IndexStoreSnapshot {
    /// Path under which the store is registered.
    pub path: String,
    /// Entry count as reported by the index store's `len()`. Entries whose
    /// key fails to decode are still part of this total, so it can exceed
    /// `user_entries + system_entries`.
    pub entries: u64,
    /// Entries whose decoded key does not use the system namespace.
    pub user_entries: u64,
    /// Entries whose decoded key uses the system namespace.
    pub system_entries: u64,
    /// Byte usage as reported by the index store's `memory_bytes()`.
    pub memory_bytes: u64,
}
55
///
/// EntitySnapshot
/// Per-entity storage breakdown across stores
///
/// One value is produced for each (store, entity) pair that has at least one
/// row with a decodable data key.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct EntitySnapshot {
    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
    pub store: String,

    /// Entity path (e.g., icydb_schema_tests::canister::db::Index).
    /// Empty when the entity name has no entry in the name→path map handed
    /// to `storage_report`.
    pub path: String,

    /// Number of rows for this entity in the store
    pub entries: u64,

    /// Approximate bytes used (key + value), accumulated per row through
    /// `DataKey::entry_size_bytes` with saturating addition.
    pub memory_bytes: u64,

    /// Minimum primary key for this entity (entity-local ordering)
    pub min_key: Option<Value>,

    /// Maximum primary key for this entity (entity-local ordering)
    pub max_key: Option<Value>,
}
81
///
/// EntityStats
/// Internal struct for building per-entity stats before snapshotting.
///
/// Starts out empty (`Default`) and is fed one row at a time via `update`.
///

#[derive(Default)]
struct EntityStats {
    /// Rows folded in so far (saturating counter).
    entries: u64,
    /// Running sum of per-row entry sizes (saturating counter).
    memory_bytes: u64,
    /// Smallest storage key seen so far; `None` until the first row.
    min_key: Option<StorageKey>,
    /// Largest storage key seen so far; `None` until the first row.
    max_key: Option<StorageKey>,
}
94
95impl EntityStats {
96    fn update(&mut self, dk: &DataKey, value_len: u64) {
97        self.entries = self.entries.saturating_add(1);
98        self.memory_bytes = self
99            .memory_bytes
100            .saturating_add(DataKey::entry_size_bytes(value_len));
101
102        let k = dk.storage_key();
103
104        match &mut self.min_key {
105            Some(min) if k < *min => *min = k,
106            None => self.min_key = Some(k),
107            _ => {}
108        }
109
110        match &mut self.max_key {
111            Some(max) if k > *max => *max = k,
112            None => self.max_key = Some(k),
113            _ => {}
114        }
115    }
116}
117
118/// Build storage snapshot and per-entity breakdown; enrich path names using name→path map
119pub fn storage_report<C: CanisterKind>(
120    db: &Db<C>,
121    name_to_path: &[(&'static str, &'static str)],
122) -> Result<StorageReport, InternalError> {
123    ensure_recovered(db)?;
124    // Build name→path map once, reuse across stores
125    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
126    let mut data = Vec::new();
127    let mut index = Vec::new();
128    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
129    let mut corrupted_keys = 0u64;
130    let mut corrupted_entries = 0u64;
131
132    db.with_store_registry(|reg| {
133        reg.iter().for_each(|(path, store_handle)| {
134            store_handle.with_data(|store| {
135                data.push(DataStoreSnapshot {
136                    path: path.to_string(),
137                    entries: store.len(),
138                    memory_bytes: store.memory_bytes(),
139                });
140
141                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
142                let mut by_entity: BTreeMap<EntityName, EntityStats> = BTreeMap::new();
143
144                for entry in store.iter() {
145                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
146                        corrupted_keys = corrupted_keys.saturating_add(1);
147                        continue;
148                    };
149
150                    let value_len = entry.value().len() as u64;
151
152                    by_entity
153                        .entry(*dk.entity_name())
154                        .or_default()
155                        .update(&dk, value_len);
156                }
157
158                for (entity_name, stats) in by_entity {
159                    let path_name = name_map.get(entity_name.as_str()).copied().unwrap_or("");
160                    entity_storage.push(EntitySnapshot {
161                        store: path.to_string(),
162                        path: path_name.to_string(),
163                        entries: stats.entries,
164                        memory_bytes: stats.memory_bytes,
165                        min_key: stats.min_key.map(|key| key.as_value()),
166                        max_key: stats.max_key.map(|key| key.as_value()),
167                    });
168                }
169            });
170        });
171    });
172
173    db.with_store_registry(|reg| {
174        reg.iter().for_each(|(path, store_handle)| {
175            store_handle.with_index(|store| {
176                let mut user_entries = 0u64;
177                let mut system_entries = 0u64;
178
179                for (key, value) in store.entries() {
180                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
181                        corrupted_entries = corrupted_entries.saturating_add(1);
182                        continue;
183                    };
184
185                    if decoded_key.uses_system_namespace() {
186                        system_entries = system_entries.saturating_add(1);
187                    } else {
188                        user_entries = user_entries.saturating_add(1);
189                    }
190
191                    if value.validate().is_err() {
192                        corrupted_entries = corrupted_entries.saturating_add(1);
193                    }
194                }
195
196                index.push(IndexStoreSnapshot {
197                    path: path.to_string(),
198                    entries: store.len(),
199                    user_entries,
200                    system_entries,
201                    memory_bytes: store.memory_bytes(),
202                });
203            });
204        });
205    });
206
207    Ok(StorageReport {
208        storage_data: data,
209        storage_index: index,
210        entity_storage,
211        corrupted_keys,
212        corrupted_entries,
213    })
214}
215
// Unit tests for `storage_report`, run against a single store registered
// under `STORE_PATH` whose data and index sides live in thread-local statics.
#[cfg(test)]
mod tests {
    use crate::{
        db::{
            Db,
            identity::{EntityName, IndexName},
            index::{IndexId, IndexKey, IndexKeyKind, IndexStore, RawIndexEntry, RawIndexKey},
            init_commit_store_for_tests,
            store::{DataKey, DataStore, RawDataKey, RawRow, StorageKey, StoreRegistry},
        },
        obs::snapshot::storage_report,
        test_support::test_memory,
        traits::{CanisterKind, Path, Storable},
    };
    use std::{borrow::Cow, cell::RefCell};

    // Registry path of the one store these tests register and inspect.
    const STORE_PATH: &str = "snapshot_tests::Store";

    // Marker type used only to instantiate `Db<C>` for the tests.
    struct SnapshotTestCanister;

    impl Path for SnapshotTestCanister {
        const PATH: &'static str = "snapshot_tests::Canister";
    }

    impl CanisterKind for SnapshotTestCanister {}

    thread_local! {
        // Data and index stores backing the test registry, initialised from
        // test memories 101 and 102 respectively.
        static SNAPSHOT_DATA_STORE: RefCell<DataStore> = RefCell::new(DataStore::init(test_memory(101)));
        static SNAPSHOT_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(test_memory(102)));
        // Registry holding exactly one store, registered under `STORE_PATH`.
        static SNAPSHOT_STORE_REGISTRY: StoreRegistry = {
            let mut reg = StoreRegistry::new();
            reg.register_store(STORE_PATH, &SNAPSHOT_DATA_STORE, &SNAPSHOT_INDEX_STORE)
                .expect("snapshot store registration should succeed");
            reg
        };
    }

    // Database handle under test, wired to the registry above.
    static DB: Db<SnapshotTestCanister> = Db::new(&SNAPSHOT_STORE_REGISTRY);

    // Runs `f` with the handle registered under `STORE_PATH` and returns its
    // result; panics if the store lookup fails.
    fn with_snapshot_store<R>(f: impl FnOnce(crate::db::store::StoreHandle) -> R) -> R {
        DB.with_store_registry(|reg| reg.try_get_store(STORE_PATH).map(f))
            .expect("snapshot store access should succeed")
    }

    // Initialises the commit store for tests, then clears both the data and
    // index side of the test store. Called at the start of every test.
    fn reset_snapshot_state() {
        init_commit_store_for_tests().expect("commit store init should succeed");

        with_snapshot_store(|store| {
            store.with_data_mut(DataStore::clear);
            store.with_index_mut(IndexStore::clear);
        });
    }

    // With both sides empty, the report still lists one data snapshot and one
    // index snapshot for the registered store, all counters zero.
    #[test]
    fn storage_report_lists_registered_store_snapshots() {
        reset_snapshot_state();

        let report = storage_report(&DB, &[]).expect("storage report should succeed");
        assert_eq!(report.storage_data.len(), 1);
        assert_eq!(report.storage_data[0].path, STORE_PATH);
        assert_eq!(report.storage_data[0].entries, 0);
        assert_eq!(report.storage_index.len(), 1);
        assert_eq!(report.storage_index[0].path, STORE_PATH);
        assert_eq!(report.storage_index[0].entries, 0);
        assert_eq!(report.storage_index[0].user_entries, 0);
        assert_eq!(report.storage_index[0].system_entries, 0);
        assert!(report.entity_storage.is_empty());
        assert_eq!(report.corrupted_keys, 0);
        assert_eq!(report.corrupted_entries, 0);
    }

    // One valid data row plus one index entry whose key is well-formed but
    // whose value is raw bytes expected to fail validation.
    #[test]
    fn storage_report_counts_entity_rows_and_corrupted_index_entries() {
        reset_snapshot_state();

        let data_key = DataKey::max_storable()
            .to_raw()
            .expect("max storable data key should encode");
        let row = RawRow::try_new(vec![1, 2, 3]).expect("row bytes should be valid");
        with_snapshot_store(|store| {
            store.with_data_mut(|data_store| {
                data_store.insert(data_key, row);
            });
        });

        let index_key = IndexKey::empty(IndexId::max_storable()).to_raw();
        let malformed_index_entry = RawIndexEntry::from_bytes(Cow::Owned(vec![0, 0, 0, 0]));
        with_snapshot_store(|store| {
            store.with_index_mut(|index_store| {
                index_store.insert(index_key, malformed_index_entry);
            });
        });

        // No name→path pairs are supplied, so the entity path is empty.
        let report = storage_report(&DB, &[]).expect("storage report should succeed");
        assert_eq!(report.storage_data[0].entries, 1);
        assert_eq!(report.storage_index[0].entries, 1);
        // The key decodes, so the entry is classified (as a user entry) even
        // though its value is counted as corrupted.
        assert_eq!(report.storage_index[0].user_entries, 1);
        assert_eq!(report.storage_index[0].system_entries, 0);
        assert_eq!(report.entity_storage.len(), 1);
        assert_eq!(report.entity_storage[0].path, "");
        assert_eq!(report.entity_storage[0].entries, 1);
        assert!(report.entity_storage[0].min_key.is_some());
        assert!(report.entity_storage[0].max_key.is_some());
        assert_eq!(report.corrupted_entries, 1);
        assert_eq!(report.corrupted_keys, 0);
    }

    // A data row stored under an all-zero raw key of the stored key size; the
    // assertions rely on that key failing to decode as a `DataKey`.
    #[test]
    fn storage_report_counts_corrupted_data_keys_without_entity_rollup() {
        reset_snapshot_state();

        let malformed_raw_key =
            RawDataKey::from_bytes(Cow::Owned(vec![0u8; DataKey::STORED_SIZE_USIZE]));
        let row = RawRow::try_new(vec![9, 9, 9]).expect("row bytes should be valid");
        with_snapshot_store(|store| {
            store.with_data_mut(|data_store| {
                data_store.insert(malformed_raw_key, row);
            });
        });

        // The row is included in the store-level entry count but not in any
        // per-entity rollup.
        let report = storage_report(&DB, &[]).expect("storage report should succeed");
        assert_eq!(report.storage_data[0].entries, 1);
        assert_eq!(report.corrupted_keys, 1);
        assert!(
            report.entity_storage.is_empty(),
            "rows with corrupt data keys must not contribute to per-entity stats"
        );
    }

    // An index entry stored under an all-zero raw key of the stored key size;
    // the assertions rely on that key failing to decode as an `IndexKey`.
    #[test]
    fn storage_report_counts_corrupted_index_keys_without_user_or_system_split() {
        reset_snapshot_state();

        let malformed_raw_key =
            RawIndexKey::from_bytes(Cow::Owned(vec![0u8; IndexKey::STORED_SIZE_USIZE]));
        let entry = RawIndexEntry::try_from_keys([StorageKey::max_storable()])
            .expect("entry should encode");
        with_snapshot_store(|store| {
            store.with_index_mut(|index_store| {
                index_store.insert(malformed_raw_key, entry);
            });
        });

        // The entry is part of the store total and the corrupted counter, but
        // of neither the user nor the system bucket.
        let report = storage_report(&DB, &[]).expect("storage report should succeed");
        assert_eq!(report.storage_index[0].entries, 1);
        assert_eq!(report.storage_index[0].user_entries, 0);
        assert_eq!(report.storage_index[0].system_entries, 0);
        assert_eq!(report.corrupted_entries, 1);
    }

    // Two well-formed index entries, one built with the default key kind and
    // one with `IndexKeyKind::System`, must land in different buckets.
    #[test]
    fn storage_report_splits_user_and_system_index_entries() {
        reset_snapshot_state();

        let entity = EntityName::try_from_str("snapshot_entity").expect("entity name should parse");
        let user_index = IndexName::try_from_parts(&entity, &["email"]).expect("index name");
        let system_index = IndexName::try_from_parts(&entity, &["~ri"]).expect("index name");
        let user_key = IndexKey::empty(IndexId(user_index)).to_raw();
        let system_key =
            IndexKey::empty_with_kind(IndexId(system_index), IndexKeyKind::System).to_raw();
        let entry = RawIndexEntry::try_from_keys([StorageKey::max_storable()])
            .expect("entry should encode");

        with_snapshot_store(|store| {
            store.with_index_mut(|index_store| {
                index_store.insert(user_key, entry.clone());
                index_store.insert(system_key, entry);
            });
        });

        let report = storage_report(&DB, &[]).expect("storage report should succeed");
        assert_eq!(report.storage_index[0].entries, 2);
        assert_eq!(report.storage_index[0].user_entries, 1);
        assert_eq!(report.storage_index[0].system_entries, 1);
        assert_eq!(report.corrupted_entries, 0);
    }
}
393}