Skip to main content

grafeo_core/graph/lpg/
property.rs

//! Columnar property storage for nodes and edges.
//!
//! Properties are stored column-wise (all "name" values together, all "age"
//! values together) rather than row-wise. This makes filtering fast: to find
//! all nodes where age > 30, we only scan the age column.
//!
//! Each column also maintains a zone map (min/max/null_count) enabling the
//! query optimizer to skip columns entirely when a predicate can't match.
//!
//! ## Compression
//!
//! Columns can be compressed to save memory. When compression is enabled,
//! the column automatically selects the best codec based on the data type:
//!
//! | Data type | Codec | Typical savings |
//! |-----------|-------|-----------------|
//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
//! | Int64 (small) | BitPacked | 2-16x |
//! | Int64 (repeated) | RunLength | 2-100x |
//! | String (low cardinality) | Dictionary | 2-50x |
//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// Controls whether, and how eagerly, a property column is compressed.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CompressionMode {
    /// Compression is off; values always stay in the sparse hash map.
    ///
    /// This is the default mode.
    #[default]
    None,
    /// Compress automatically once it is beneficial (after a size threshold).
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
46
/// Minimum number of values a column must hold before automatic compression
/// is considered.
const COMPRESSION_THRESHOLD: usize = 1_000;

/// Capacity of the hot buffer that holds recent, not-yet-compressed writes.
///
/// A larger buffer (4096 entries) keeps more recent data uncompressed so it
/// can be read faster. The trade-off is roughly 64KB of memory overhead per
/// column in exchange for 1.5-2x faster point lookups on recently written data.
const HOT_BUFFER_SIZE: usize = 4_096;
55
/// Comparison operator evaluated against a column's zone map.
///
/// Each variant corresponds directly to a GQL comparison operator such as
/// `=`, `<`, or `>=`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompareOp {
    /// `=`: equal to the value.
    Eq,
    /// `<>`: not equal to the value.
    Ne,
    /// `<`: strictly less than the value.
    Lt,
    /// `<=`: less than or equal to the value.
    Le,
    /// `>`: strictly greater than the value.
    Gt,
    /// `>=`: greater than or equal to the value.
    Ge,
}
74
/// Marker trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`], so properties can be stored on
/// both nodes and edges. The trait declares no methods; its supertraits
/// (`Copy + Eq + Hash + 'static`) are what allow an ID to be passed by value
/// and used as a hash-map key by the columns in this module.
pub trait EntityId: Copy + Eq + Hash + 'static {}

// Both graph ID types may key property columns.
impl EntityId for NodeId {}
impl EntityId for EdgeId {}
82
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges. All
/// columns live behind a single read-write lock, so every method takes `&self`
/// except the one that changes the default compression mode.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alice = NodeId::new(1);
///
/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alice);
/// assert_eq!(props.len(), 2);
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    default_compression: CompressionMode,
    /// Zero-sized marker tying the storage to its `Id` type.
    _marker: PhantomData<Id>,
}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117    /// Creates a new property storage.
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            columns: RwLock::new(FxHashMap::default()),
122            default_compression: CompressionMode::None,
123            _marker: PhantomData,
124        }
125    }
126
127    /// Creates a new property storage with compression enabled.
128    #[must_use]
129    pub fn with_compression(mode: CompressionMode) -> Self {
130        Self {
131            columns: RwLock::new(FxHashMap::default()),
132            default_compression: mode,
133            _marker: PhantomData,
134        }
135    }
136
137    /// Sets the default compression mode for new columns.
138    pub fn set_default_compression(&mut self, mode: CompressionMode) {
139        self.default_compression = mode;
140    }
141
142    /// Sets a property value for an entity.
143    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144        let mut columns = self.columns.write();
145        let mode = self.default_compression;
146        columns
147            .entry(key)
148            .or_insert_with(|| PropertyColumn::with_compression(mode))
149            .set(id, value);
150    }
151
152    /// Enables compression for a specific column.
153    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154        let mut columns = self.columns.write();
155        if let Some(col) = columns.get_mut(key) {
156            col.set_compression_mode(mode);
157        }
158    }
159
160    /// Compresses all columns that have compression enabled.
161    pub fn compress_all(&self) {
162        let mut columns = self.columns.write();
163        for col in columns.values_mut() {
164            if col.compression_mode() != CompressionMode::None {
165                col.compress();
166            }
167        }
168    }
169
170    /// Forces compression on all columns regardless of mode.
171    pub fn force_compress_all(&self) {
172        let mut columns = self.columns.write();
173        for col in columns.values_mut() {
174            col.force_compress();
175        }
176    }
177
178    /// Returns compression statistics for all columns.
179    #[must_use]
180    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181        let columns = self.columns.read();
182        columns
183            .iter()
184            .map(|(key, col)| (key.clone(), col.compression_stats()))
185            .collect()
186    }
187
188    /// Returns the total memory usage of all columns.
189    #[must_use]
190    pub fn memory_usage(&self) -> usize {
191        let columns = self.columns.read();
192        columns
193            .values()
194            .map(|col| col.compression_stats().compressed_size)
195            .sum()
196    }
197
198    /// Gets a property value for an entity.
199    #[must_use]
200    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201        let columns = self.columns.read();
202        columns.get(key).and_then(|col| col.get(id))
203    }
204
205    /// Removes a property value for an entity.
206    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207        let mut columns = self.columns.write();
208        columns.get_mut(key).and_then(|col| col.remove(id))
209    }
210
211    /// Removes all properties for an entity.
212    pub fn remove_all(&self, id: Id) {
213        let mut columns = self.columns.write();
214        for col in columns.values_mut() {
215            col.remove(id);
216        }
217    }
218
219    /// Gets all properties for an entity.
220    #[must_use]
221    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222        let columns = self.columns.read();
223        let mut result = FxHashMap::default();
224        for (key, col) in columns.iter() {
225            if let Some(value) = col.get(id) {
226                result.insert(key.clone(), value);
227            }
228        }
229        result
230    }
231
232    /// Gets property values for multiple entities in a single lock acquisition.
233    ///
234    /// More efficient than calling [`Self::get`] in a loop because it acquires
235    /// the read lock only once.
236    ///
237    /// # Example
238    ///
239    /// ```
240    /// use grafeo_core::graph::lpg::PropertyStorage;
241    /// use grafeo_common::types::{PropertyKey, Value};
242    /// use grafeo_common::NodeId;
243    ///
244    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
245    /// let key = PropertyKey::new("age");
246    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
247    /// let values = storage.get_batch(&ids, &key);
248    /// // values[i] is the property value for ids[i], or None if not set
249    /// ```
250    #[must_use]
251    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252        let columns = self.columns.read();
253        match columns.get(key) {
254            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255            None => vec![None; ids.len()],
256        }
257    }
258
259    /// Gets all properties for multiple entities efficiently.
260    ///
261    /// More efficient than calling [`Self::get_all`] in a loop because it
262    /// acquires the read lock only once.
263    ///
264    /// # Example
265    ///
266    /// ```
267    /// use grafeo_core::graph::lpg::PropertyStorage;
268    /// use grafeo_common::types::{PropertyKey, Value};
269    /// use grafeo_common::NodeId;
270    ///
271    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
272    /// let ids = vec![NodeId(1), NodeId(2)];
273    /// let all_props = storage.get_all_batch(&ids);
274    /// // all_props[i] is a HashMap of all properties for ids[i]
275    /// ```
276    #[must_use]
277    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278        let columns = self.columns.read();
279        let column_count = columns.len();
280
281        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
282        let mut results = Vec::with_capacity(ids.len());
283
284        for &id in ids {
285            // Pre-allocate HashMap with expected column count
286            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
287            for (key, col) in columns.iter() {
288                if let Some(value) = col.get(id) {
289                    result.insert(key.clone(), value);
290                }
291            }
292            results.push(result);
293        }
294
295        results
296    }
297
298    /// Gets selected properties for multiple entities efficiently (projection pushdown).
299    ///
300    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
301    /// of properties - it only iterates the requested columns instead of all columns.
302    ///
303    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
304    /// compared to O(N × C) for `get_all_batch` where C = total column count.
305    ///
306    /// # Example
307    ///
308    /// ```
309    /// use grafeo_core::graph::lpg::PropertyStorage;
310    /// use grafeo_common::types::{PropertyKey, Value};
311    /// use grafeo_common::NodeId;
312    ///
313    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
314    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
315    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
316    ///
317    /// // Only fetches "name" and "age" columns, ignoring other properties
318    /// let props = storage.get_selective_batch(&ids, &keys);
319    /// ```
320    #[must_use]
321    pub fn get_selective_batch(
322        &self,
323        ids: &[Id],
324        keys: &[PropertyKey],
325    ) -> Vec<FxHashMap<PropertyKey, Value>> {
326        if keys.is_empty() {
327            // No properties requested - return empty maps
328            return vec![FxHashMap::default(); ids.len()];
329        }
330
331        let columns = self.columns.read();
332
333        // Pre-collect only the columns we need (avoids re-lookup per id)
334        let requested_columns: Vec<_> = keys
335            .iter()
336            .filter_map(|key| columns.get(key).map(|col| (key, col)))
337            .collect();
338
339        // Pre-allocate result with exact capacity
340        let mut results = Vec::with_capacity(ids.len());
341
342        for &id in ids {
343            let mut result =
344                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
345            // Only iterate requested columns, not all columns
346            for (key, col) in &requested_columns {
347                if let Some(value) = col.get(id) {
348                    result.insert((*key).clone(), value);
349                }
350            }
351            results.push(result);
352        }
353
354        results
355    }
356
357    /// Returns the number of property columns.
358    #[must_use]
359    pub fn column_count(&self) -> usize {
360        self.columns.read().len()
361    }
362
363    /// Returns the keys of all columns.
364    #[must_use]
365    pub fn keys(&self) -> Vec<PropertyKey> {
366        self.columns.read().keys().cloned().collect()
367    }
368
369    /// Gets a column by key for bulk access.
370    #[must_use]
371    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
372        let columns = self.columns.read();
373        if columns.contains_key(key) {
374            Some(PropertyColumnRef {
375                _guard: columns,
376                key: key.clone(),
377                _marker: PhantomData,
378            })
379        } else {
380            None
381        }
382    }
383
384    /// Checks if a predicate might match any values (using zone maps).
385    ///
386    /// Returns `false` only when we're *certain* no values match - for example,
387    /// if you're looking for age > 100 but the max age is 80. Returns `true`
388    /// if the property doesn't exist (conservative - might match).
389    #[must_use]
390    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
391        let columns = self.columns.read();
392        columns
393            .get(key)
394            .map(|col| col.might_match(op, value))
395            .unwrap_or(true) // No column = assume might match (conservative)
396    }
397
398    /// Gets the zone map for a property column.
399    #[must_use]
400    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
401        let columns = self.columns.read();
402        columns.get(key).map(|col| col.zone_map().clone())
403    }
404
405    /// Checks if a range predicate might match any values (using zone maps).
406    ///
407    /// Returns `false` only when we're *certain* no values match the range.
408    /// Returns `true` if the property doesn't exist (conservative - might match).
409    #[must_use]
410    pub fn might_match_range(
411        &self,
412        key: &PropertyKey,
413        min: Option<&Value>,
414        max: Option<&Value>,
415        min_inclusive: bool,
416        max_inclusive: bool,
417    ) -> bool {
418        let columns = self.columns.read();
419        columns
420            .get(key)
421            .map(|col| {
422                col.zone_map()
423                    .might_contain_range(min, max, min_inclusive, max_inclusive)
424            })
425            .unwrap_or(true) // No column = assume might match (conservative)
426    }
427
428    /// Rebuilds zone maps for all columns (call after bulk removes).
429    pub fn rebuild_zone_maps(&self) {
430        let mut columns = self.columns.write();
431        for col in columns.values_mut() {
432            col.rebuild_zone_map();
433        }
434    }
435}
436
437impl<Id: EntityId> Default for PropertyStorage<Id> {
438    fn default() -> Self {
439        Self::new()
440    }
441}
442
443/// Compressed storage for a property column.
444///
445/// Holds the compressed representation of values along with the index
446/// mapping entity IDs to positions in the compressed array.
447#[derive(Debug)]
448pub enum CompressedColumnData {
449    /// Compressed integers (Int64 values).
450    Integers {
451        /// Compressed data.
452        data: CompressedData,
453        /// Index: entity ID position -> compressed array index.
454        id_to_index: Vec<u64>,
455        /// Reverse index: compressed array index -> entity ID position.
456        index_to_id: Vec<u64>,
457    },
458    /// Dictionary-encoded strings.
459    Strings {
460        /// Dictionary encoding.
461        encoding: DictionaryEncoding,
462        /// Index: entity ID position -> dictionary index.
463        id_to_index: Vec<u64>,
464        /// Reverse index: dictionary index -> entity ID position.
465        index_to_id: Vec<u64>,
466    },
467    /// Compressed booleans.
468    Booleans {
469        /// Compressed data.
470        data: CompressedData,
471        /// Index: entity ID position -> bit index.
472        id_to_index: Vec<u64>,
473        /// Reverse index: bit index -> entity ID position.
474        index_to_id: Vec<u64>,
475    },
476}
477
478impl CompressedColumnData {
479    /// Returns the memory usage of the compressed data in bytes.
480    #[must_use]
481    pub fn memory_usage(&self) -> usize {
482        match self {
483            CompressedColumnData::Integers {
484                data,
485                id_to_index,
486                index_to_id,
487            } => {
488                data.data.len()
489                    + id_to_index.len() * std::mem::size_of::<u64>()
490                    + index_to_id.len() * std::mem::size_of::<u64>()
491            }
492            CompressedColumnData::Strings {
493                encoding,
494                id_to_index,
495                index_to_id,
496            } => {
497                encoding.codes().len() * std::mem::size_of::<u32>()
498                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
499                    + id_to_index.len() * std::mem::size_of::<u64>()
500                    + index_to_id.len() * std::mem::size_of::<u64>()
501            }
502            CompressedColumnData::Booleans {
503                data,
504                id_to_index,
505                index_to_id,
506            } => {
507                data.data.len()
508                    + id_to_index.len() * std::mem::size_of::<u64>()
509                    + index_to_id.len() * std::mem::size_of::<u64>()
510            }
511        }
512    }
513
514    /// Returns the compression ratio.
515    #[must_use]
516    #[allow(dead_code)]
517    pub fn compression_ratio(&self) -> f64 {
518        match self {
519            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
520            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
521            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
522        }
523    }
524}
525
526/// Statistics about column compression.
527#[derive(Debug, Clone, Default)]
528pub struct CompressionStats {
529    /// Size of uncompressed data in bytes.
530    pub uncompressed_size: usize,
531    /// Size of compressed data in bytes.
532    pub compressed_size: usize,
533    /// Number of values in the column.
534    pub value_count: usize,
535    /// Codec used for compression.
536    pub codec: Option<CompressionCodec>,
537}
538
539impl CompressionStats {
540    /// Returns the compression ratio (uncompressed / compressed).
541    #[must_use]
542    pub fn compression_ratio(&self) -> f64 {
543        if self.compressed_size == 0 {
544            return 1.0;
545        }
546        self.uncompressed_size as f64 / self.compressed_size as f64
547    }
548}
549
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
///
/// A column therefore has two parts: the hot buffer (`values`), which every
/// write goes into, and optional compressed data (`compressed`), which a
/// compression pass fills by moving values out of the hot buffer.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    values: FxHashMap<Id, Value>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    compressed: Option<CompressedColumnData>,
    /// Number of values held in `compressed`; set when a compression pass
    /// succeeds and added to the hot-buffer size to obtain the column length.
    compressed_count: usize,
}
574
575impl<Id: EntityId> PropertyColumn<Id> {
576    /// Creates a new empty column.
577    #[must_use]
578    pub fn new() -> Self {
579        Self {
580            values: FxHashMap::default(),
581            zone_map: ZoneMapEntry::new(),
582            zone_map_dirty: false,
583            compression_mode: CompressionMode::None,
584            compressed: None,
585            compressed_count: 0,
586        }
587    }
588
589    /// Creates a new column with the specified compression mode.
590    #[must_use]
591    pub fn with_compression(mode: CompressionMode) -> Self {
592        Self {
593            values: FxHashMap::default(),
594            zone_map: ZoneMapEntry::new(),
595            zone_map_dirty: false,
596            compression_mode: mode,
597            compressed: None,
598            compressed_count: 0,
599        }
600    }
601
602    /// Sets the compression mode for this column.
603    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
604        self.compression_mode = mode;
605        if mode == CompressionMode::None {
606            // Decompress if switching to no compression
607            if self.compressed.is_some() {
608                self.decompress_all();
609            }
610        }
611    }
612
    /// Returns the compression mode currently configured for this column.
    ///
    /// This is the configured mode only; it does not say whether any data is
    /// actually held in compressed form.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
618
619    /// Sets a value for an entity.
620    pub fn set(&mut self, id: Id, value: Value) {
621        // Update zone map incrementally
622        self.update_zone_map_on_insert(&value);
623        self.values.insert(id, value);
624
625        // Check if we should compress (in Auto mode)
626        if self.compression_mode == CompressionMode::Auto {
627            let total_count = self.values.len() + self.compressed_count;
628            let hot_buffer_count = self.values.len();
629
630            // Compress when hot buffer exceeds threshold and total is large enough
631            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
632                self.compress();
633            }
634        }
635    }
636
637    /// Updates zone map when inserting a value.
638    fn update_zone_map_on_insert(&mut self, value: &Value) {
639        self.zone_map.row_count += 1;
640
641        if matches!(value, Value::Null) {
642            self.zone_map.null_count += 1;
643            return;
644        }
645
646        // Update min
647        match &self.zone_map.min {
648            None => self.zone_map.min = Some(value.clone()),
649            Some(current) => {
650                if compare_values(value, current) == Some(Ordering::Less) {
651                    self.zone_map.min = Some(value.clone());
652                }
653            }
654        }
655
656        // Update max
657        match &self.zone_map.max {
658            None => self.zone_map.max = Some(value.clone()),
659            Some(current) => {
660                if compare_values(value, current) == Some(Ordering::Greater) {
661                    self.zone_map.max = Some(value.clone());
662                }
663            }
664        }
665    }
666
667    /// Gets a value for an entity.
668    ///
669    /// First checks the hot buffer (uncompressed values), then falls back
670    /// to the compressed data if present.
671    #[must_use]
672    pub fn get(&self, id: Id) -> Option<Value> {
673        // First check hot buffer
674        if let Some(value) = self.values.get(&id) {
675            return Some(value.clone());
676        }
677
678        // For now, compressed data lookup is not implemented for sparse access
679        // because the compressed format stores values by index, not by entity ID.
680        // This would require maintaining an ID -> index map in CompressedColumnData.
681        // The compressed data is primarily useful for bulk/scan operations.
682        None
683    }
684
685    /// Removes a value for an entity.
686    pub fn remove(&mut self, id: Id) -> Option<Value> {
687        let removed = self.values.remove(&id);
688        if removed.is_some() {
689            // Mark zone map as dirty - would need full rebuild for accurate min/max
690            self.zone_map_dirty = true;
691        }
692        removed
693    }
694
695    /// Returns the number of values in this column (hot + compressed).
696    #[must_use]
697    #[allow(dead_code)]
698    pub fn len(&self) -> usize {
699        self.values.len() + self.compressed_count
700    }
701
702    /// Returns true if this column is empty.
703    #[must_use]
704    #[allow(dead_code)]
705    pub fn is_empty(&self) -> bool {
706        self.values.is_empty() && self.compressed_count == 0
707    }
708
709    /// Iterates over all (id, value) pairs in the hot buffer.
710    ///
711    /// Note: This only iterates over uncompressed values. For full iteration
712    /// including compressed values, use [`iter_all`].
713    #[allow(dead_code)]
714    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
715        self.values.iter().map(|(&id, v)| (id, v))
716    }
717
718    /// Returns compression statistics for this column.
719    #[must_use]
720    pub fn compression_stats(&self) -> CompressionStats {
721        let hot_size = self.values.len() * std::mem::size_of::<Value>();
722        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
723        let codec = match &self.compressed {
724            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
725            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
726            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
727            None => None,
728        };
729
730        CompressionStats {
731            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
732            compressed_size: hot_size + compressed_size,
733            value_count: self.len(),
734            codec,
735        }
736    }
737
    /// Returns whether the column currently holds any compressed data.
    ///
    /// This reflects the presence of compressed data, not the configured
    /// compression mode.
    #[must_use]
    #[allow(dead_code)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }
744
745    /// Compresses the hot buffer values.
746    ///
747    /// This merges the hot buffer into the compressed data, selecting the
748    /// best codec based on the value types.
749    ///
750    /// Note: If compressed data already exists, this is a no-op to avoid
751    /// losing previously compressed values. Use `force_compress()` after
752    /// decompressing to re-compress with all values.
753    pub fn compress(&mut self) {
754        if self.values.is_empty() {
755            return;
756        }
757
758        // Don't re-compress if we already have compressed data
759        // (would need to decompress and merge first)
760        if self.compressed.is_some() {
761            return;
762        }
763
764        // Determine the dominant type
765        let (int_count, str_count, bool_count) = self.count_types();
766        let total = self.values.len();
767
768        if int_count > total / 2 {
769            self.compress_as_integers();
770        } else if str_count > total / 2 {
771            self.compress_as_strings();
772        } else if bool_count > total / 2 {
773            self.compress_as_booleans();
774        }
775        // If no dominant type, don't compress (mixed types don't compress well)
776    }
777
778    /// Counts values by type.
779    fn count_types(&self) -> (usize, usize, usize) {
780        let mut int_count = 0;
781        let mut str_count = 0;
782        let mut bool_count = 0;
783
784        for value in self.values.values() {
785            match value {
786                Value::Int64(_) => int_count += 1,
787                Value::String(_) => str_count += 1,
788                Value::Bool(_) => bool_count += 1,
789                _ => {}
790            }
791        }
792
793        (int_count, str_count, bool_count)
794    }
795
796    /// Compresses integer values.
797    #[allow(unsafe_code)]
798    fn compress_as_integers(&mut self) {
799        // Extract integer values and their IDs
800        let mut values: Vec<(u64, i64)> = Vec::new();
801        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
802
803        for (&id, value) in &self.values {
804            match value {
805                Value::Int64(v) => {
806                    // Convert Id to u64 for indexing (assumes Id can be converted)
807                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
808                    values.push((id_u64, *v));
809                }
810                _ => {
811                    non_int_values.insert(id, value.clone());
812                }
813            }
814        }
815
816        if values.len() < 8 {
817            // Not worth compressing
818            return;
819        }
820
821        // Sort by ID for better compression
822        values.sort_by_key(|(id, _)| *id);
823
824        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
825        let index_to_id: Vec<u64> = id_to_index.clone();
826        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
827
828        // Compress using the optimal codec
829        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
830
831        // Only use compression if it actually saves space
832        if compressed.compression_ratio() > 1.2 {
833            self.compressed = Some(CompressedColumnData::Integers {
834                data: compressed,
835                id_to_index,
836                index_to_id,
837            });
838            self.compressed_count = values.len();
839            self.values = non_int_values;
840        }
841    }
842
843    /// Compresses string values using dictionary encoding.
844    #[allow(unsafe_code)]
845    fn compress_as_strings(&mut self) {
846        let mut values: Vec<(u64, ArcStr)> = Vec::new();
847        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
848
849        for (&id, value) in &self.values {
850            match value {
851                Value::String(s) => {
852                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
853                    values.push((id_u64, s.clone()));
854                }
855                _ => {
856                    non_str_values.insert(id, value.clone());
857                }
858            }
859        }
860
861        if values.len() < 8 {
862            return;
863        }
864
865        // Sort by ID
866        values.sort_by_key(|(id, _)| *id);
867
868        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
869        let index_to_id: Vec<u64> = id_to_index.clone();
870
871        // Build dictionary
872        let mut builder = DictionaryBuilder::new();
873        for (_, s) in &values {
874            builder.add(s.as_ref());
875        }
876        let encoding = builder.build();
877
878        // Only use compression if it actually saves space
879        if encoding.compression_ratio() > 1.2 {
880            self.compressed = Some(CompressedColumnData::Strings {
881                encoding,
882                id_to_index,
883                index_to_id,
884            });
885            self.compressed_count = values.len();
886            self.values = non_str_values;
887        }
888    }
889
890    /// Compresses boolean values.
891    #[allow(unsafe_code)]
892    fn compress_as_booleans(&mut self) {
893        let mut values: Vec<(u64, bool)> = Vec::new();
894        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
895
896        for (&id, value) in &self.values {
897            match value {
898                Value::Bool(b) => {
899                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
900                    values.push((id_u64, *b));
901                }
902                _ => {
903                    non_bool_values.insert(id, value.clone());
904                }
905            }
906        }
907
908        if values.len() < 8 {
909            return;
910        }
911
912        // Sort by ID
913        values.sort_by_key(|(id, _)| *id);
914
915        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
916        let index_to_id: Vec<u64> = id_to_index.clone();
917        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
918
919        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
920
921        // Booleans always compress well (8x)
922        self.compressed = Some(CompressedColumnData::Booleans {
923            data: compressed,
924            id_to_index,
925            index_to_id,
926        });
927        self.compressed_count = values.len();
928        self.values = non_bool_values;
929    }
930
931    /// Decompresses all values back to the hot buffer.
932    #[allow(unsafe_code)]
933    fn decompress_all(&mut self) {
934        let Some(compressed) = self.compressed.take() else {
935            return;
936        };
937
938        match compressed {
939            CompressedColumnData::Integers {
940                data, index_to_id, ..
941            } => {
942                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
943                    // Convert back to signed using zigzag decoding
944                    let signed: Vec<i64> = values
945                        .iter()
946                        .map(|&v| crate::storage::zigzag_decode(v))
947                        .collect();
948
949                    for (i, id_u64) in index_to_id.iter().enumerate() {
950                        if let Some(&value) = signed.get(i) {
951                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
952                            self.values.insert(id, Value::Int64(value));
953                        }
954                    }
955                }
956            }
957            CompressedColumnData::Strings {
958                encoding,
959                index_to_id,
960                ..
961            } => {
962                for (i, id_u64) in index_to_id.iter().enumerate() {
963                    if let Some(s) = encoding.get(i) {
964                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
965                        self.values.insert(id, Value::String(ArcStr::from(s)));
966                    }
967                }
968            }
969            CompressedColumnData::Booleans {
970                data, index_to_id, ..
971            } => {
972                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
973                    for (i, id_u64) in index_to_id.iter().enumerate() {
974                        if let Some(&value) = values.get(i) {
975                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
976                            self.values.insert(id, Value::Bool(value));
977                        }
978                    }
979                }
980            }
981        }
982
983        self.compressed_count = 0;
984    }
985
986    /// Forces compression regardless of thresholds.
987    ///
988    /// Useful for bulk loading or when you know the column is complete.
989    pub fn force_compress(&mut self) {
990        self.compress();
991    }
992
993    /// Returns the zone map for this column.
994    #[must_use]
995    pub fn zone_map(&self) -> &ZoneMapEntry {
996        &self.zone_map
997    }
998
999    /// Uses zone map to check if any values could satisfy the predicate.
1000    ///
1001    /// Returns `false` when we can prove no values match (so the column
1002    /// can be skipped entirely). Returns `true` if values might match.
1003    #[must_use]
1004    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1005        if self.zone_map_dirty {
1006            // Conservative: can't skip if zone map is stale
1007            return true;
1008        }
1009
1010        match op {
1011            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1012            CompareOp::Ne => {
1013                // Can only skip if all values are equal to the value
1014                // (which means min == max == value)
1015                match (&self.zone_map.min, &self.zone_map.max) {
1016                    (Some(min), Some(max)) => {
1017                        !(compare_values(min, value) == Some(Ordering::Equal)
1018                            && compare_values(max, value) == Some(Ordering::Equal))
1019                    }
1020                    _ => true,
1021                }
1022            }
1023            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1024            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1025            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1026            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1027        }
1028    }
1029
1030    /// Rebuilds zone map from current values.
1031    pub fn rebuild_zone_map(&mut self) {
1032        let mut zone_map = ZoneMapEntry::new();
1033
1034        for value in self.values.values() {
1035            zone_map.row_count += 1;
1036
1037            if matches!(value, Value::Null) {
1038                zone_map.null_count += 1;
1039                continue;
1040            }
1041
1042            // Update min
1043            match &zone_map.min {
1044                None => zone_map.min = Some(value.clone()),
1045                Some(current) => {
1046                    if compare_values(value, current) == Some(Ordering::Less) {
1047                        zone_map.min = Some(value.clone());
1048                    }
1049                }
1050            }
1051
1052            // Update max
1053            match &zone_map.max {
1054                None => zone_map.max = Some(value.clone()),
1055                Some(current) => {
1056                    if compare_values(value, current) == Some(Ordering::Greater) {
1057                        zone_map.max = Some(value.clone());
1058                    }
1059                }
1060            }
1061        }
1062
1063        self.zone_map = zone_map;
1064        self.zone_map_dirty = false;
1065    }
1066}
1067
1068/// Compares two values for ordering.
1069fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1070    match (a, b) {
1071        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1072        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1073        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1074        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1075        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1076        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1077        _ => None,
1078    }
1079}
1080
1081impl<Id: EntityId> Default for PropertyColumn<Id> {
1082    fn default() -> Self {
1083        Self::new()
1084    }
1085}
1086
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating:
/// the struct owns a `parking_lot` read guard over the map from property key
/// to column, and that guard lives for as long as this value does.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard over the key-to-column map; kept only to hold the lock.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Presumably the key of the column this reference stands for.
    // NOTE(review): no reader of this field is visible here, which is
    // likely why `dead_code` is allowed; confirm before removing it.
    #[allow(dead_code)]
    key: PropertyKey,
    /// Zero-sized marker naming the `Id` type parameter.
    _marker: PhantomData<Id>,
}
1096
#[cfg(test)]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    /// Values written per (node, key) are read back; an unset pair is `None`.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let alice = NodeId::new(1);
        let bob = NodeId::new(2);
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let expected_alice = Some(Value::String("Alice".into()));
        let expected_bob = Some(Value::String("Bob".into()));

        assert_eq!(storage.get(alice, &name), expected_alice);
        assert_eq!(storage.get(alice, &age), Some(Value::Int64(30)));
        assert_eq!(storage.get(bob, &name), expected_bob);
        // Bob was never given an age.
        assert!(storage.get(bob, &age).is_none());
    }

    /// Removing a property returns something and makes later reads `None`.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        assert!(storage.remove(node, &key).is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` reports every property set on a node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        assert_eq!(storage.get_all(node).len(), 3);
    }

    /// `remove_all` clears every property of a node.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(node, name.clone(), "Alice".into());
        storage.set(node, age.clone(), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &name).is_none());
        assert!(storage.get(node, &age).is_none());
    }

    /// Basic set / get / remove / len behaviour of a single column.
    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        col.set(first, "Alice".into());
        col.set(second, "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());
        assert_eq!(col.get(first), Some(Value::String("Alice".into())));

        col.remove(first);
        assert!(col.get(first).is_none());
        assert_eq!(col.len(), 1);
    }

    /// `new` reports `CompressionMode::None`; `with_compression` reports the
    /// mode it was given.
    #[test]
    fn test_compression_mode() {
        let plain: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    /// Values stay readable through a storage created in `Auto` mode.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");

        for raw_id in 0..100u64 {
            let value = Value::Int64(20 + (raw_id as i64 % 50));
            storage.set(NodeId::new(raw_id), age.clone(), value);
        }

        // IDs 0 and 50 both map to 20 because of the `% 50`.
        assert_eq!(storage.get(NodeId::new(0), &age), Some(Value::Int64(20)));
        assert_eq!(storage.get(NodeId::new(50), &age), Some(Value::Int64(20)));
    }

    /// 2000 sequential integers in `Auto` mode are all counted in the stats.
    #[test]
    fn test_compress_integer_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        for raw_id in 0..2000u64 {
            col.set(NodeId::new(raw_id), Value::Int64(1000 + raw_id as i64));
        }

        // The total must cover compressed values plus the hot buffer.
        assert_eq!(col.compression_stats().value_count, 2000);

        // Original author's note: compressed values are not reachable through
        // `get()`, so the newest value is either readable or the column
        // reports itself as compressed.
        assert!(col.get(NodeId::new(1999)).is_some() || col.is_compressed());
    }

    /// 2000 strings drawn from four categories in `Auto` mode keep the
    /// column length at 2000.
    #[test]
    fn test_compress_string_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        // Few distinct values repeated many times suit a dictionary.
        let categories = ["Person", "Company", "Product", "Location"];
        for (i, category) in categories.iter().cycle().take(2000).enumerate() {
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(*category)));
        }

        assert_eq!(col.len(), 2000);

        // The newest value is either readable or the column is compressed.
        assert!(col.get(NodeId::new(1999)).is_some() || col.is_compressed());
    }

    /// 2000 alternating booleans in `Auto` mode keep the column length at 2000.
    #[test]
    fn test_compress_boolean_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        for raw_id in 0..2000u64 {
            col.set(NodeId::new(raw_id), Value::Bool(raw_id % 2 == 0));
        }

        assert_eq!(col.len(), 2000);

        // The newest value is either readable or the column is compressed.
        assert!(col.get(NodeId::new(1999)).is_some() || col.is_compressed());
    }

    /// `force_compress` on a small column keeps the value count intact.
    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Far fewer values than the automatic threshold.
        for raw_id in 0..100u64 {
            col.set(NodeId::new(raw_id), Value::Int64(raw_id as i64));
        }

        col.force_compress();

        // Whether or not compression was judged beneficial, nothing is lost.
        assert_eq!(col.compression_stats().value_count, 100);
    }

    /// Stats of an uncompressed column report its count and a non-zero size.
    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for raw_id in 0..50u64 {
            col.set(NodeId::new(raw_id), Value::Int64(raw_id as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    /// Storage-level stats contain one entry per property key.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");
        let name = PropertyKey::new("name");

        for raw_id in 0..100u64 {
            let node = NodeId::new(raw_id);
            storage.set(node, age.clone(), Value::Int64(raw_id as i64));
            storage.set(node, name.clone(), Value::String(ArcStr::from("Alice")));
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // one entry per column
        assert!(stats.contains_key(&age));
        assert!(stats.contains_key(&name));
    }

    /// A populated storage reports non-zero memory usage.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();
        let key = PropertyKey::new("value");

        for raw_id in 0..100u64 {
            storage.set(NodeId::new(raw_id), key.clone(), Value::Int64(raw_id as i64));
        }

        assert!(storage.memory_usage() > 0);
    }

    /// `get_batch` answers positionally, with `None` for nodes lacking the key.
    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let age = PropertyKey::new("age");

        let with_25 = NodeId::new(1);
        let with_30 = NodeId::new(2);
        let without_age = NodeId::new(3);

        storage.set(with_25, age.clone(), 25i64.into());
        storage.set(with_30, age.clone(), 30i64.into());

        let ids = vec![with_25, with_30, without_age];
        let values = storage.get_batch(&ids, &age);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    /// `get_batch` on an unknown key yields `None` for every requested ID.
    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let missing = PropertyKey::new("nonexistent");

        let ids = vec![NodeId::new(1), NodeId::new(2)];
        let values = storage.get_batch(&ids, &missing);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    /// `get_batch` with no IDs returns an empty result.
    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        assert!(storage.get_batch(&[], &key).is_empty());
    }

    /// `get_all_batch` returns one property map per requested ID, in order.
    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let name = PropertyKey::new("name");

        let alice = NodeId::new(1);
        let bob = NodeId::new(2);
        let nobody = NodeId::new(3); // never receives a property

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, PropertyKey::new("age"), 25i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let ids = vec![alice, bob, nobody];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // name and age
        assert_eq!(all_props[1].len(), 1); // name only
        assert_eq!(all_props[2].len(), 0); // no properties

        assert_eq!(
            all_props[0].get(&name),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(all_props[1].get(&name), Some(&Value::String("Bob".into())));
    }

    /// `get_all_batch` with no IDs returns an empty result.
    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_all_batch(&[]).is_empty());
    }
}