icydb_core/db/store/
index.rs

1use crate::{
2    Error, IndexSpec, Key, MAX_INDEX_FIELDS, Value,
3    db::{
4        executor::ExecutorError,
5        store::{DataKey, StoreRegistry},
6    },
7    obs::metrics,
8    traits::EntityKind,
9};
10use candid::CandidType;
11use canic::{
12    cdk::structures::{BTreeMap, DefaultMemoryImpl, memory::VirtualMemory},
13    impl_storable_bounded, impl_storable_unbounded,
14    utils::hash::hash_u64,
15};
16use derive_more::{Deref, DerefMut, Display};
17use serde::{Deserialize, Serialize};
18use std::{
19    collections::HashSet,
20    fmt::{self, Display},
21};
22
23///
24/// IndexStoreRegistry
25///
26
27#[derive(Deref, DerefMut)]
28pub struct IndexStoreRegistry(StoreRegistry<IndexStore>);
29
30impl IndexStoreRegistry {
31    #[must_use]
32    #[allow(clippy::new_without_default)]
33    pub fn new() -> Self {
34        Self(StoreRegistry::new())
35    }
36}
37
///
/// IndexStore
///
/// Newtype around a `BTreeMap` from `IndexKey` to `IndexEntry`, backed by
/// `VirtualMemory<DefaultMemoryImpl>`. The derived `Deref`/`DerefMut` make the
/// map's own API (`get`, `insert`, `remove`, `range`, `iter`) callable on the store.
///

#[derive(Deref, DerefMut)]
pub struct IndexStore(BTreeMap<IndexKey, IndexEntry, VirtualMemory<DefaultMemoryImpl>>);
44
45impl IndexStore {
46    #[must_use]
47    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
48        Self(BTreeMap::init(memory))
49    }
50
51    /// Inserts the given entity into the index defined by `I`.
52    /// - If `I::UNIQUE`, insertion will fail if a conflicting entry already exists.
53    /// - If the entity is missing required fields for this index, insertion is skipped.
54    pub fn insert_index_entry<E: EntityKind>(
55        &mut self,
56        entity: &E,
57        index: &IndexSpec,
58    ) -> Result<(), Error> {
59        // Skip if index key can't be built (e.g. optional fields missing)
60        let Some(index_key) = IndexKey::new(entity, index) else {
61            return Ok(());
62        };
63        let key = entity.key();
64
65        if let Some(mut existing) = self.get(&index_key) {
66            if index.unique {
67                // Already present → skip redundant write
68                if existing.contains(&key) {
69                    return Ok(());
70                }
71
72                // Any different existing key violates UNIQUE
73                if !existing.is_empty() {
74                    metrics::with_state_mut(|m| metrics::record_unique_violation_for::<E>(m));
75
76                    return Err(ExecutorError::index_violation(E::PATH, index.fields))?;
77                }
78
79                self.insert(index_key.clone(), IndexEntry::new(index.fields, key));
80            } else {
81                existing.insert_key(key); // <-- add to the set
82                self.insert(index_key.clone(), existing);
83            }
84        } else {
85            self.insert(index_key, IndexEntry::new(index.fields, key));
86        }
87        metrics::with_state_mut(|m| {
88            m.ops.index_inserts += 1;
89            let entry = m.entities.entry(E::PATH.to_string()).or_default();
90            entry.index_inserts = entry.index_inserts.saturating_add(1);
91        });
92
93        Ok(())
94    }
95
96    // remove_index_entry
97    pub fn remove_index_entry<E: EntityKind>(&mut self, entity: &E, index: &IndexSpec) {
98        // Skip if index key can't be built (e.g. optional fields missing)
99        let Some(index_key) = IndexKey::new(entity, index) else {
100            return;
101        };
102
103        if let Some(mut existing) = self.get(&index_key) {
104            existing.remove_key(&entity.key()); // remove from the set
105
106            if existing.is_empty() {
107                self.remove(&index_key);
108            } else {
109                // Move the updated entry back without cloning
110                self.insert(index_key, existing);
111            }
112            metrics::with_state_mut(|m| {
113                m.ops.index_removes += 1;
114                let entry = m.entities.entry(E::PATH.to_string()).or_default();
115                entry.index_removes = entry.index_removes.saturating_add(1);
116            });
117        }
118    }
119
120    #[must_use]
121    pub fn resolve_data_values<E: EntityKind>(
122        &self,
123        index: &IndexSpec,
124        prefix: &[Value],
125    ) -> Vec<DataKey> {
126        let mut out = Vec::new();
127
128        for (_, entry) in self.iter_with_hashed_prefix::<E>(index, prefix) {
129            out.extend(entry.keys.iter().map(|&k| DataKey::new::<E>(k)));
130        }
131
132        out
133    }
134
135    pub fn memory_bytes(&self) -> u64 {
136        self.iter()
137            .map(|entry| u64::from(IndexKey::STORABLE_MAX_SIZE) + entry.value().len() as u64)
138            .sum()
139    }
140
141    /// Internal: iterate entries for this index whose `hashed_values` start with the hashed `prefix`.
142    /// Uses a bounded range for efficient scanning.
143    fn iter_with_hashed_prefix<E: EntityKind>(
144        &self,
145        index: &IndexSpec,
146        prefix: &[Value],
147    ) -> impl Iterator<Item = (IndexKey, IndexEntry)> {
148        let index_id = IndexId::new::<E>(index);
149        let hashed_prefix_opt = Self::index_fingerprints(prefix); // Option<Vec<[u8;16]>>
150
151        // Compute start..end bounds. If the prefix isn't indexable, construct an empty range
152        // (same iterator type) by using identical start==end under the same index_id.
153        let (start_key, end_key) = if let Some(hp) = hashed_prefix_opt {
154            IndexKey::bounds_for_prefix(index_id, hp)
155        } else {
156            (
157                IndexKey {
158                    index_id,
159                    hashed_values: Vec::new(),
160                },
161                IndexKey {
162                    index_id,
163                    hashed_values: Vec::new(),
164                },
165            )
166        };
167
168        self.range(start_key..end_key)
169            .map(|entry| (entry.key().clone(), entry.value()))
170    }
171
172    fn index_fingerprints(values: &[Value]) -> Option<Vec<[u8; 16]>> {
173        // collects to Option<Vec<_>>: None if any element was non-indexable
174        values.iter().map(Value::to_index_fingerprint).collect()
175    }
176}
177
178///
179/// IndexId
180///
181
182#[derive(
183    Clone,
184    Debug,
185    Display,
186    Copy,
187    PartialEq,
188    Eq,
189    Hash,
190    PartialOrd,
191    Ord,
192    CandidType,
193    Serialize,
194    Deserialize,
195)]
196pub struct IndexId(u64);
197
198impl IndexId {
199    #[must_use]
200    pub fn new<E: EntityKind>(index: &IndexSpec) -> Self {
201        Self::from_path_and_fields(E::PATH, index.fields)
202    }
203
204    fn from_path_and_fields(path: &str, fields: &[&str]) -> Self {
205        let cap = path.len() + fields.iter().map(|f| f.len() + 1).sum::<usize>();
206        let mut buffer = Vec::with_capacity(cap);
207
208        // much more efficient than format
209        buffer.extend_from_slice(path.as_bytes());
210        for field in fields {
211            buffer.extend_from_slice(field.as_bytes());
212            buffer.extend_from_slice(b"|");
213        }
214
215        Self(hash_u64(&buffer))
216    }
217
218    #[must_use]
219    pub fn max_storable() -> Self {
220        Self::from_path_and_fields(
221            "path::to::long::entity::name::Entity",
222            &[
223                "long_field_one",
224                "long_field_two",
225                "long_field_three",
226                "long_field_four",
227            ],
228        )
229    }
230}
231
232///
233/// IndexKey
234///
235
236#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
237pub struct IndexKey {
238    pub index_id: IndexId,
239    pub hashed_values: Vec<[u8; 16]>,
240}
241
242impl IndexKey {
243    // Sized with headroom for worst‑case hashed key payload
244    pub const STORABLE_MAX_SIZE: u32 = 180;
245
246    #[must_use]
247    pub fn new<E: EntityKind>(entity: &E, index: &IndexSpec) -> Option<Self> {
248        let mut hashed_values = Vec::<[u8; 16]>::with_capacity(index.fields.len());
249
250        // get each value and convert to key
251        for field in index.fields {
252            let value = entity.get_value(field)?;
253            let fp = value.to_index_fingerprint()?; // bail if any component is non-indexable
254
255            hashed_values.push(fp);
256        }
257
258        Some(Self {
259            index_id: IndexId::new::<E>(index),
260            hashed_values,
261        })
262    }
263
264    // max_storable
265    #[must_use]
266    pub fn max_storable() -> Self {
267        Self {
268            index_id: IndexId::max_storable(),
269            hashed_values: (0..MAX_INDEX_FIELDS).map(|_| [u8::MAX; 16]).collect(),
270        }
271    }
272
273    /// Compute the bounded start..end keys for a given hashed prefix under an index id.
274    /// End is exclusive and created by appending a single 0xFF..0xFF block to the prefix.
275    #[must_use]
276    pub fn bounds_for_prefix(index_id: IndexId, mut prefix: Vec<[u8; 16]>) -> (Self, Self) {
277        let start = Self {
278            index_id,
279            hashed_values: prefix.clone(),
280        };
281        prefix.push([0xFF; 16]);
282        let end = Self {
283            index_id,
284            hashed_values: prefix,
285        };
286        (start, end)
287    }
288}
289
290impl Display for IndexKey {
291    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
292        write!(
293            f,
294            "id: {}, values: {}",
295            self.index_id,
296            self.hashed_values.len()
297        )
298    }
299}
300
// Implements `Storable` for `IndexKey`, using `STORABLE_MAX_SIZE` as the size bound.
// NOTE(review): the trailing `false` is presumably the macro's fixed-size flag —
// confirm against the definition of `impl_storable_bounded!`.
impl_storable_bounded!(IndexKey, IndexKey::STORABLE_MAX_SIZE, false);
302
303///
304/// IndexEntry
305///
306
307#[derive(CandidType, Clone, Debug, Deserialize, Serialize)]
308pub struct IndexEntry {
309    fields: Vec<String>,
310    keys: HashSet<Key>,
311}
312
313impl IndexEntry {
314    #[must_use]
315    pub fn new(fields: &[&str], key: Key) -> Self {
316        let mut key_set = HashSet::with_capacity(1);
317        key_set.insert(key);
318
319        Self {
320            fields: fields.iter().map(ToString::to_string).collect(),
321            keys: key_set,
322        }
323    }
324
325    pub fn insert_key(&mut self, key: Key) {
326        let _ = self.keys.insert(key);
327    }
328
329    /// Removes the key from the set.
330    pub fn remove_key(&mut self, key: &Key) {
331        let _ = self.keys.remove(key);
332    }
333
334    /// Checks if the set contains the key.
335    #[must_use]
336    pub fn contains(&self, key: &Key) -> bool {
337        self.keys.contains(key)
338    }
339
340    /// Returns `true` if the set is empty.
341    #[must_use]
342    pub fn is_empty(&self) -> bool {
343        self.keys.is_empty()
344    }
345
346    /// Returns number of keys in the entry.
347    #[must_use]
348    pub fn len(&self) -> usize {
349        self.keys.len()
350    }
351
352    /// Returns keys as a sorted `Vec<Key>` (useful for serialization/debug).
353    #[must_use]
354    pub fn to_sorted_vec(&self) -> Vec<Key> {
355        let mut keys: Vec<_> = self.keys.iter().copied().collect();
356        keys.sort_unstable(); // uses Ord, fast
357        keys
358    }
359}
360
// Implements `Storable` for `IndexEntry`; going by the macro name, without a size
// bound, which fits an entry whose key set can grow.
impl_storable_unbounded!(IndexEntry);
362
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::*;

    /// The largest key we expect to store must serialize within the declared bound.
    #[test]
    fn index_key_max_size_is_bounded() {
        let limit = IndexKey::STORABLE_MAX_SIZE;
        let size = Storable::to_bytes(&IndexKey::max_storable()).len();

        assert!(
            size <= limit as usize,
            "serialized IndexKey too large: got {size} bytes (limit {})",
            limit
        );
    }

    /// Serializing and deserializing an entry preserves its fields and keys.
    #[test]
    fn index_entry_round_trip() {
        let entry = IndexEntry::new(&["a", "b"], Key::from(1u64));
        let restored = IndexEntry::from_bytes(Storable::to_bytes(&entry));

        assert_eq!(entry.fields, restored.fields);
        assert_eq!(entry.to_sorted_vec(), restored.to_sorted_vec());
    }
}