Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Property storage for the LPG model.
2//!
3//! This module provides columnar property storage optimized for
4//! efficient scanning and filtering. Includes zone map support for
5//! predicate pushdown and data skipping.
6
7use crate::index::zone_map::ZoneMapEntry;
8use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
9use grafeo_common::utils::hash::FxHashMap;
10use parking_lot::RwLock;
11use std::cmp::Ordering;
12use std::hash::Hash;
13use std::marker::PhantomData;
14
/// Comparison operators understood by zone map predicate checks.
///
/// An operator is paired with a literal value and describes the predicate
/// `property <op> value` that a column is tested against.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompareOp {
    /// `property == value`.
    Eq,
    /// `property != value`.
    Ne,
    /// `property < value`.
    Lt,
    /// `property <= value`.
    Le,
    /// `property > value`.
    Gt,
    /// `property >= value`.
    Ge,
}
31
32/// Trait for entity IDs that can be used as property storage keys.
33pub trait EntityId: Copy + Eq + Hash + 'static {}
34
35impl EntityId for NodeId {}
36impl EntityId for EdgeId {}
37
38/// Columnar property storage.
39///
40/// Properties are stored in a columnar format where each property key
41/// has its own column. This enables efficient filtering and scanning
42/// of specific properties across many entities.
43///
44/// Generic over the entity ID type (NodeId or EdgeId).
45pub struct PropertyStorage<Id: EntityId = NodeId> {
46    /// Map from property key to column.
47    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
48    _marker: PhantomData<Id>,
49}
50
51impl<Id: EntityId> PropertyStorage<Id> {
52    /// Creates a new property storage.
53    #[must_use]
54    pub fn new() -> Self {
55        Self {
56            columns: RwLock::new(FxHashMap::default()),
57            _marker: PhantomData,
58        }
59    }
60
61    /// Sets a property value for an entity.
62    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
63        let mut columns = self.columns.write();
64        columns
65            .entry(key)
66            .or_insert_with(PropertyColumn::new)
67            .set(id, value);
68    }
69
70    /// Gets a property value for an entity.
71    #[must_use]
72    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
73        let columns = self.columns.read();
74        columns.get(key).and_then(|col| col.get(id))
75    }
76
77    /// Removes a property value for an entity.
78    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
79        let mut columns = self.columns.write();
80        columns.get_mut(key).and_then(|col| col.remove(id))
81    }
82
83    /// Removes all properties for an entity.
84    pub fn remove_all(&self, id: Id) {
85        let mut columns = self.columns.write();
86        for col in columns.values_mut() {
87            col.remove(id);
88        }
89    }
90
91    /// Gets all properties for an entity.
92    #[must_use]
93    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
94        let columns = self.columns.read();
95        let mut result = FxHashMap::default();
96        for (key, col) in columns.iter() {
97            if let Some(value) = col.get(id) {
98                result.insert(key.clone(), value);
99            }
100        }
101        result
102    }
103
104    /// Returns the number of property columns.
105    #[must_use]
106    pub fn column_count(&self) -> usize {
107        self.columns.read().len()
108    }
109
110    /// Returns the keys of all columns.
111    #[must_use]
112    pub fn keys(&self) -> Vec<PropertyKey> {
113        self.columns.read().keys().cloned().collect()
114    }
115
116    /// Gets a column by key for bulk access.
117    #[must_use]
118    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
119        let columns = self.columns.read();
120        if columns.contains_key(key) {
121            Some(PropertyColumnRef {
122                _guard: columns,
123                key: key.clone(),
124                _marker: PhantomData,
125            })
126        } else {
127            None
128        }
129    }
130
131    /// Checks if a predicate on a property might match any values.
132    ///
133    /// Returns `true` if the property column might contain matching values,
134    /// `false` if it definitely doesn't. Returns `true` if the property doesn't exist.
135    #[must_use]
136    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
137        let columns = self.columns.read();
138        columns
139            .get(key)
140            .map(|col| col.might_match(op, value))
141            .unwrap_or(true) // No column = assume might match (conservative)
142    }
143
144    /// Gets the zone map for a property column.
145    #[must_use]
146    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
147        let columns = self.columns.read();
148        columns.get(key).map(|col| col.zone_map().clone())
149    }
150
151    /// Rebuilds zone maps for all columns (call after bulk removes).
152    pub fn rebuild_zone_maps(&self) {
153        let mut columns = self.columns.write();
154        for col in columns.values_mut() {
155            col.rebuild_zone_map();
156        }
157    }
158}
159
160impl<Id: EntityId> Default for PropertyStorage<Id> {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166/// A single property column with zone map tracking.
167///
168/// Stores values for a specific property key across all entities.
169/// Maintains zone map metadata (min/max/null_count) for predicate pushdown.
170pub struct PropertyColumn<Id: EntityId = NodeId> {
171    /// Sparse storage: entity ID -> value.
172    /// For dense properties, this could be optimized to a flat vector.
173    values: FxHashMap<Id, Value>,
174    /// Zone map tracking min/max/null_count for predicate pushdown.
175    zone_map: ZoneMapEntry,
176    /// Whether zone map needs rebuild (after removes).
177    zone_map_dirty: bool,
178}
179
180impl<Id: EntityId> PropertyColumn<Id> {
181    /// Creates a new empty column.
182    #[must_use]
183    pub fn new() -> Self {
184        Self {
185            values: FxHashMap::default(),
186            zone_map: ZoneMapEntry::new(),
187            zone_map_dirty: false,
188        }
189    }
190
191    /// Sets a value for an entity.
192    pub fn set(&mut self, id: Id, value: Value) {
193        // Update zone map incrementally
194        self.update_zone_map_on_insert(&value);
195        self.values.insert(id, value);
196    }
197
198    /// Updates zone map when inserting a value.
199    fn update_zone_map_on_insert(&mut self, value: &Value) {
200        self.zone_map.row_count += 1;
201
202        if matches!(value, Value::Null) {
203            self.zone_map.null_count += 1;
204            return;
205        }
206
207        // Update min
208        match &self.zone_map.min {
209            None => self.zone_map.min = Some(value.clone()),
210            Some(current) => {
211                if compare_values(value, current) == Some(Ordering::Less) {
212                    self.zone_map.min = Some(value.clone());
213                }
214            }
215        }
216
217        // Update max
218        match &self.zone_map.max {
219            None => self.zone_map.max = Some(value.clone()),
220            Some(current) => {
221                if compare_values(value, current) == Some(Ordering::Greater) {
222                    self.zone_map.max = Some(value.clone());
223                }
224            }
225        }
226    }
227
228    /// Gets a value for an entity.
229    #[must_use]
230    pub fn get(&self, id: Id) -> Option<Value> {
231        self.values.get(&id).cloned()
232    }
233
234    /// Removes a value for an entity.
235    pub fn remove(&mut self, id: Id) -> Option<Value> {
236        let removed = self.values.remove(&id);
237        if removed.is_some() {
238            // Mark zone map as dirty - would need full rebuild for accurate min/max
239            self.zone_map_dirty = true;
240        }
241        removed
242    }
243
244    /// Returns the number of values in this column.
245    #[must_use]
246    #[allow(dead_code)]
247    pub fn len(&self) -> usize {
248        self.values.len()
249    }
250
251    /// Returns true if this column is empty.
252    #[must_use]
253    #[allow(dead_code)]
254    pub fn is_empty(&self) -> bool {
255        self.values.is_empty()
256    }
257
258    /// Iterates over all (id, value) pairs.
259    #[allow(dead_code)]
260    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
261        self.values.iter().map(|(&id, v)| (id, v))
262    }
263
264    /// Returns the zone map for this column.
265    #[must_use]
266    pub fn zone_map(&self) -> &ZoneMapEntry {
267        &self.zone_map
268    }
269
270    /// Checks if a predicate might match any values in this column.
271    ///
272    /// Returns `true` if the column might contain matching values,
273    /// `false` if it definitely doesn't (allowing the column to be skipped).
274    #[must_use]
275    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
276        if self.zone_map_dirty {
277            // Conservative: can't skip if zone map is stale
278            return true;
279        }
280
281        match op {
282            CompareOp::Eq => self.zone_map.might_contain_equal(value),
283            CompareOp::Ne => {
284                // Can only skip if all values are equal to the value
285                // (which means min == max == value)
286                match (&self.zone_map.min, &self.zone_map.max) {
287                    (Some(min), Some(max)) => {
288                        !(compare_values(min, value) == Some(Ordering::Equal)
289                            && compare_values(max, value) == Some(Ordering::Equal))
290                    }
291                    _ => true,
292                }
293            }
294            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
295            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
296            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
297            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
298        }
299    }
300
301    /// Rebuilds zone map from current values.
302    pub fn rebuild_zone_map(&mut self) {
303        let mut zone_map = ZoneMapEntry::new();
304
305        for value in self.values.values() {
306            zone_map.row_count += 1;
307
308            if matches!(value, Value::Null) {
309                zone_map.null_count += 1;
310                continue;
311            }
312
313            // Update min
314            match &zone_map.min {
315                None => zone_map.min = Some(value.clone()),
316                Some(current) => {
317                    if compare_values(value, current) == Some(Ordering::Less) {
318                        zone_map.min = Some(value.clone());
319                    }
320                }
321            }
322
323            // Update max
324            match &zone_map.max {
325                None => zone_map.max = Some(value.clone()),
326                Some(current) => {
327                    if compare_values(value, current) == Some(Ordering::Greater) {
328                        zone_map.max = Some(value.clone());
329                    }
330                }
331            }
332        }
333
334        self.zone_map = zone_map;
335        self.zone_map_dirty = false;
336    }
337}
338
339/// Compares two values for ordering.
340fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
341    match (a, b) {
342        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
343        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
344        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
345        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
346        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
347        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
348        _ => None,
349    }
350}
351
352impl<Id: EntityId> Default for PropertyColumn<Id> {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358/// A reference to a property column for bulk access.
359pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
360    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
361    #[allow(dead_code)]
362    key: PropertyKey,
363    _marker: PhantomData<Id>,
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Values set for two nodes are readable back per node and per key.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();
        let (alice, bob) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let alice_name = storage.get(alice, &name);
        assert_eq!(alice_name, Some(Value::String("Alice".into())));

        let alice_age = storage.get(alice, &age);
        assert_eq!(alice_age, Some(Value::Int64(30)));

        let bob_name = storage.get(bob, &name);
        assert_eq!(bob_name, Some(Value::String("Bob".into())));

        // The second node never received an age.
        let bob_age = storage.get(bob, &age);
        assert!(bob_age.is_none());
    }

    /// Removing a property returns it and makes later reads miss.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let entity = NodeId::new(1);
        let name = PropertyKey::new("name");

        storage.set(entity, name.clone(), "Alice".into());
        assert!(storage.get(entity, &name).is_some());

        assert!(storage.remove(entity, &name).is_some());
        assert!(storage.get(entity, &name).is_none());
    }

    /// `get_all` reports every property that was set on the node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let entity = NodeId::new(1);

        storage.set(entity, PropertyKey::new("name"), "Alice".into());
        storage.set(entity, PropertyKey::new("age"), 30i64.into());
        storage.set(entity, PropertyKey::new("active"), true.into());

        let all = storage.get_all(entity);
        assert_eq!(all.len(), 3);
    }

    /// `remove_all` clears the node's value in every column.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let entity = NodeId::new(1);

        storage.set(entity, PropertyKey::new("name"), "Alice".into());
        storage.set(entity, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(entity);

        for key in [PropertyKey::new("name"), PropertyKey::new("age")] {
            assert!(storage.get(entity, &key).is_none());
        }
    }

    /// A bare column supports set, get, remove and length queries.
    #[test]
    fn test_property_column() {
        let mut column = PropertyColumn::new();
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        column.set(first, "Alice".into());
        column.set(second, "Bob".into());

        assert_eq!(column.len(), 2);
        assert!(!column.is_empty());

        assert_eq!(column.get(first), Some(Value::String("Alice".into())));

        column.remove(first);
        assert!(column.get(first).is_none());
        assert_eq!(column.len(), 1);
    }
}