Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
28use grafeo_common::utils::hash::FxHashMap;
29use parking_lot::RwLock;
30use std::cmp::Ordering;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::Arc;
34
/// How aggressively a property column compresses its values.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionMode {
    /// Compression is off; values stay in the sparse HashMap (the default).
    #[default]
    None,
    /// Compress automatically once the column grows past its thresholds.
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
46
/// Threshold for automatic compression (number of values).
///
/// In `PropertyColumn::set` (Auto mode) the combined hot + compressed value
/// count must reach this number before compression is attempted. The hot
/// buffer must *also* reach [`HOT_BUFFER_SIZE`]; since that constant is
/// currently the larger of the two, this threshold is never the binding
/// condition with the present values.
const COMPRESSION_THRESHOLD: usize = 1000;

/// Size of the hot buffer for recent writes (before compression).
/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
/// on recently-written data.
///
/// In Auto mode a column only attempts compression once its uncompressed map
/// holds at least this many entries.
const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operator of a predicate that is checked against a zone map.
///
/// Each variant corresponds to one GQL comparison operator (`=`, `<>`, `<`,
/// `<=`, `>`, `>=`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompareOp {
    /// `column = value`
    Eq,
    /// `column <> value`
    Ne,
    /// `column < value`
    Lt,
    /// `column <= value`
    Le,
    /// `column > value`
    Gt,
    /// `column >= value`
    Ge,
}
74
/// Marker trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
///
/// NOTE(review): the column compression routines in this file reinterpret an
/// `Id` as a `u64` with `std::mem::transmute_copy`. That is only sound when
/// the implementing type is exactly eight bytes wide and has no padding; the
/// trait bounds below do not enforce this, so confirm it before implementing
/// the trait for any new ID type.
pub trait EntityId: Copy + Eq + Hash + 'static {}

// Property storage is usable for both nodes and edges.
impl EntityId for NodeId {}
impl EntityId for EdgeId {}
82
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges.
///
/// All columns live behind a single `RwLock`: read-style methods take the
/// read lock, mutating methods take the write lock for the whole call.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alice = NodeId::new(1);
///
/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alice);
/// assert_eq!(props.len(), 2);
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    ///
    /// Only consulted when `set` creates a column that does not exist yet;
    /// existing columns keep whatever mode they already have.
    default_compression: CompressionMode,
    /// Zero-sized marker for the `Id` type parameter.
    _marker: PhantomData<Id>,
}
114
115impl<Id: EntityId> PropertyStorage<Id> {
116    /// Creates a new property storage.
117    #[must_use]
118    pub fn new() -> Self {
119        Self {
120            columns: RwLock::new(FxHashMap::default()),
121            default_compression: CompressionMode::None,
122            _marker: PhantomData,
123        }
124    }
125
126    /// Creates a new property storage with compression enabled.
127    #[must_use]
128    pub fn with_compression(mode: CompressionMode) -> Self {
129        Self {
130            columns: RwLock::new(FxHashMap::default()),
131            default_compression: mode,
132            _marker: PhantomData,
133        }
134    }
135
136    /// Sets the default compression mode for new columns.
137    pub fn set_default_compression(&mut self, mode: CompressionMode) {
138        self.default_compression = mode;
139    }
140
141    /// Sets a property value for an entity.
142    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
143        let mut columns = self.columns.write();
144        let mode = self.default_compression;
145        columns
146            .entry(key)
147            .or_insert_with(|| PropertyColumn::with_compression(mode))
148            .set(id, value);
149    }
150
151    /// Enables compression for a specific column.
152    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
153        let mut columns = self.columns.write();
154        if let Some(col) = columns.get_mut(key) {
155            col.set_compression_mode(mode);
156        }
157    }
158
159    /// Compresses all columns that have compression enabled.
160    pub fn compress_all(&self) {
161        let mut columns = self.columns.write();
162        for col in columns.values_mut() {
163            if col.compression_mode() != CompressionMode::None {
164                col.compress();
165            }
166        }
167    }
168
169    /// Forces compression on all columns regardless of mode.
170    pub fn force_compress_all(&self) {
171        let mut columns = self.columns.write();
172        for col in columns.values_mut() {
173            col.force_compress();
174        }
175    }
176
177    /// Returns compression statistics for all columns.
178    #[must_use]
179    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
180        let columns = self.columns.read();
181        columns
182            .iter()
183            .map(|(key, col)| (key.clone(), col.compression_stats()))
184            .collect()
185    }
186
187    /// Returns the total memory usage of all columns.
188    #[must_use]
189    pub fn memory_usage(&self) -> usize {
190        let columns = self.columns.read();
191        columns
192            .values()
193            .map(|col| col.compression_stats().compressed_size)
194            .sum()
195    }
196
197    /// Gets a property value for an entity.
198    #[must_use]
199    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
200        let columns = self.columns.read();
201        columns.get(key).and_then(|col| col.get(id))
202    }
203
204    /// Removes a property value for an entity.
205    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
206        let mut columns = self.columns.write();
207        columns.get_mut(key).and_then(|col| col.remove(id))
208    }
209
210    /// Removes all properties for an entity.
211    pub fn remove_all(&self, id: Id) {
212        let mut columns = self.columns.write();
213        for col in columns.values_mut() {
214            col.remove(id);
215        }
216    }
217
218    /// Gets all properties for an entity.
219    #[must_use]
220    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
221        let columns = self.columns.read();
222        let mut result = FxHashMap::default();
223        for (key, col) in columns.iter() {
224            if let Some(value) = col.get(id) {
225                result.insert(key.clone(), value);
226            }
227        }
228        result
229    }
230
231    /// Returns the number of property columns.
232    #[must_use]
233    pub fn column_count(&self) -> usize {
234        self.columns.read().len()
235    }
236
237    /// Returns the keys of all columns.
238    #[must_use]
239    pub fn keys(&self) -> Vec<PropertyKey> {
240        self.columns.read().keys().cloned().collect()
241    }
242
243    /// Gets a column by key for bulk access.
244    #[must_use]
245    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
246        let columns = self.columns.read();
247        if columns.contains_key(key) {
248            Some(PropertyColumnRef {
249                _guard: columns,
250                key: key.clone(),
251                _marker: PhantomData,
252            })
253        } else {
254            None
255        }
256    }
257
258    /// Checks if a predicate might match any values (using zone maps).
259    ///
260    /// Returns `false` only when we're *certain* no values match - for example,
261    /// if you're looking for age > 100 but the max age is 80. Returns `true`
262    /// if the property doesn't exist (conservative - might match).
263    #[must_use]
264    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
265        let columns = self.columns.read();
266        columns
267            .get(key)
268            .map(|col| col.might_match(op, value))
269            .unwrap_or(true) // No column = assume might match (conservative)
270    }
271
272    /// Gets the zone map for a property column.
273    #[must_use]
274    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
275        let columns = self.columns.read();
276        columns.get(key).map(|col| col.zone_map().clone())
277    }
278
279    /// Checks if a range predicate might match any values (using zone maps).
280    ///
281    /// Returns `false` only when we're *certain* no values match the range.
282    /// Returns `true` if the property doesn't exist (conservative - might match).
283    #[must_use]
284    pub fn might_match_range(
285        &self,
286        key: &PropertyKey,
287        min: Option<&Value>,
288        max: Option<&Value>,
289        min_inclusive: bool,
290        max_inclusive: bool,
291    ) -> bool {
292        let columns = self.columns.read();
293        columns
294            .get(key)
295            .map(|col| {
296                col.zone_map()
297                    .might_contain_range(min, max, min_inclusive, max_inclusive)
298            })
299            .unwrap_or(true) // No column = assume might match (conservative)
300    }
301
302    /// Rebuilds zone maps for all columns (call after bulk removes).
303    pub fn rebuild_zone_maps(&self) {
304        let mut columns = self.columns.write();
305        for col in columns.values_mut() {
306            col.rebuild_zone_map();
307        }
308    }
309}
310
/// `Default` is the same as [`PropertyStorage::new`]: no columns, and new
/// columns are created with compression disabled.
impl<Id: EntityId> Default for PropertyStorage<Id> {
    fn default() -> Self {
        Self::new()
    }
}
316
/// Compressed storage for a property column.
///
/// Holds the compressed representation of values along with the index
/// mapping entity IDs to positions in the compressed array.
///
/// As populated by the `compress_as_*` routines in this file, `id_to_index`
/// and `index_to_id` are identical copies of the same vector: the raw `u64`
/// form of each compressed entity ID, sorted ascending, where slot `i`
/// belongs to the `i`-th compressed value.
#[derive(Debug)]
pub enum CompressedColumnData {
    /// Compressed integers (Int64 values).
    Integers {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> compressed array index.
        id_to_index: Vec<u64>,
        /// Reverse index: compressed array index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Dictionary-encoded strings.
    Strings {
        /// Dictionary encoding.
        encoding: DictionaryEncoding,
        /// Index: entity ID position -> dictionary index.
        id_to_index: Vec<u64>,
        /// Reverse index: dictionary index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Compressed booleans.
    Booleans {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> bit index.
        id_to_index: Vec<u64>,
        /// Reverse index: bit index -> entity ID position.
        index_to_id: Vec<u64>,
    },
}
351
352impl CompressedColumnData {
353    /// Returns the memory usage of the compressed data in bytes.
354    #[must_use]
355    pub fn memory_usage(&self) -> usize {
356        match self {
357            CompressedColumnData::Integers {
358                data,
359                id_to_index,
360                index_to_id,
361            } => {
362                data.data.len()
363                    + id_to_index.len() * std::mem::size_of::<u64>()
364                    + index_to_id.len() * std::mem::size_of::<u64>()
365            }
366            CompressedColumnData::Strings {
367                encoding,
368                id_to_index,
369                index_to_id,
370            } => {
371                encoding.codes().len() * std::mem::size_of::<u32>()
372                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
373                    + id_to_index.len() * std::mem::size_of::<u64>()
374                    + index_to_id.len() * std::mem::size_of::<u64>()
375            }
376            CompressedColumnData::Booleans {
377                data,
378                id_to_index,
379                index_to_id,
380            } => {
381                data.data.len()
382                    + id_to_index.len() * std::mem::size_of::<u64>()
383                    + index_to_id.len() * std::mem::size_of::<u64>()
384            }
385        }
386    }
387
388    /// Returns the compression ratio.
389    #[must_use]
390    #[allow(dead_code)]
391    pub fn compression_ratio(&self) -> f64 {
392        match self {
393            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
394            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
395            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
396        }
397    }
398}
399
/// Statistics about column compression.
///
/// As filled in by `PropertyColumn::compression_stats`, both sizes are
/// estimates based on `size_of::<Value>()` per value and do not account for
/// heap data owned by the values themselves.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Size of uncompressed data in bytes.
    pub uncompressed_size: usize,
    /// Size of compressed data in bytes.
    ///
    /// Includes the uncompressed hot buffer, so it equals
    /// `uncompressed_size` for a column that has never been compressed.
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression.
    ///
    /// `None` when the column holds no compressed data.
    pub codec: Option<CompressionCodec>,
}
412
413impl CompressionStats {
414    /// Returns the compression ratio (uncompressed / compressed).
415    #[must_use]
416    pub fn compression_ratio(&self) -> f64 {
417        if self.compressed_size == 0 {
418            return 1.0;
419        }
420        self.uncompressed_size as f64 / self.compressed_size as f64
421    }
422}
423
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
///
/// A value lives either in the uncompressed `values` map or, after a
/// successful compression pass, in `compressed`. New writes always go to
/// `values`.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    values: FxHashMap<Id, Value>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    ///
    /// While set, `might_match` stops pruning and always answers `true`.
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    compressed: Option<CompressedColumnData>,
    /// Number of values before last compression.
    ///
    /// Set to the number of values moved into `compressed` by the last
    /// compression pass and reset to zero on decompression.
    compressed_count: usize,
}
448
449impl<Id: EntityId> PropertyColumn<Id> {
450    /// Creates a new empty column.
451    #[must_use]
452    pub fn new() -> Self {
453        Self {
454            values: FxHashMap::default(),
455            zone_map: ZoneMapEntry::new(),
456            zone_map_dirty: false,
457            compression_mode: CompressionMode::None,
458            compressed: None,
459            compressed_count: 0,
460        }
461    }
462
463    /// Creates a new column with the specified compression mode.
464    #[must_use]
465    pub fn with_compression(mode: CompressionMode) -> Self {
466        Self {
467            values: FxHashMap::default(),
468            zone_map: ZoneMapEntry::new(),
469            zone_map_dirty: false,
470            compression_mode: mode,
471            compressed: None,
472            compressed_count: 0,
473        }
474    }
475
476    /// Sets the compression mode for this column.
477    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
478        self.compression_mode = mode;
479        if mode == CompressionMode::None {
480            // Decompress if switching to no compression
481            if self.compressed.is_some() {
482                self.decompress_all();
483            }
484        }
485    }
486
    /// Returns the compression mode for this column.
    ///
    /// This is the configured mode; it does not say whether any data is
    /// actually compressed at the moment.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
492
493    /// Sets a value for an entity.
494    pub fn set(&mut self, id: Id, value: Value) {
495        // Update zone map incrementally
496        self.update_zone_map_on_insert(&value);
497        self.values.insert(id, value);
498
499        // Check if we should compress (in Auto mode)
500        if self.compression_mode == CompressionMode::Auto {
501            let total_count = self.values.len() + self.compressed_count;
502            let hot_buffer_count = self.values.len();
503
504            // Compress when hot buffer exceeds threshold and total is large enough
505            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
506                self.compress();
507            }
508        }
509    }
510
511    /// Updates zone map when inserting a value.
512    fn update_zone_map_on_insert(&mut self, value: &Value) {
513        self.zone_map.row_count += 1;
514
515        if matches!(value, Value::Null) {
516            self.zone_map.null_count += 1;
517            return;
518        }
519
520        // Update min
521        match &self.zone_map.min {
522            None => self.zone_map.min = Some(value.clone()),
523            Some(current) => {
524                if compare_values(value, current) == Some(Ordering::Less) {
525                    self.zone_map.min = Some(value.clone());
526                }
527            }
528        }
529
530        // Update max
531        match &self.zone_map.max {
532            None => self.zone_map.max = Some(value.clone()),
533            Some(current) => {
534                if compare_values(value, current) == Some(Ordering::Greater) {
535                    self.zone_map.max = Some(value.clone());
536                }
537            }
538        }
539    }
540
541    /// Gets a value for an entity.
542    ///
543    /// First checks the hot buffer (uncompressed values), then falls back
544    /// to the compressed data if present.
545    #[must_use]
546    pub fn get(&self, id: Id) -> Option<Value> {
547        // First check hot buffer
548        if let Some(value) = self.values.get(&id) {
549            return Some(value.clone());
550        }
551
552        // For now, compressed data lookup is not implemented for sparse access
553        // because the compressed format stores values by index, not by entity ID.
554        // This would require maintaining an ID -> index map in CompressedColumnData.
555        // The compressed data is primarily useful for bulk/scan operations.
556        None
557    }
558
559    /// Removes a value for an entity.
560    pub fn remove(&mut self, id: Id) -> Option<Value> {
561        let removed = self.values.remove(&id);
562        if removed.is_some() {
563            // Mark zone map as dirty - would need full rebuild for accurate min/max
564            self.zone_map_dirty = true;
565        }
566        removed
567    }
568
    /// Returns the number of values in this column (hot + compressed).
    ///
    /// This is the hot-buffer entry count plus `compressed_count`. An entity
    /// that was compressed and then written again is present in both places
    /// and is therefore counted twice.
    #[must_use]
    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.values.len() + self.compressed_count
    }
575
576    /// Returns true if this column is empty.
577    #[must_use]
578    #[allow(dead_code)]
579    pub fn is_empty(&self) -> bool {
580        self.values.is_empty() && self.compressed_count == 0
581    }
582
583    /// Iterates over all (id, value) pairs in the hot buffer.
584    ///
585    /// Note: This only iterates over uncompressed values. For full iteration
586    /// including compressed values, use [`iter_all`].
587    #[allow(dead_code)]
588    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
589        self.values.iter().map(|(&id, v)| (id, v))
590    }
591
592    /// Returns compression statistics for this column.
593    #[must_use]
594    pub fn compression_stats(&self) -> CompressionStats {
595        let hot_size = self.values.len() * std::mem::size_of::<Value>();
596        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
597        let codec = match &self.compressed {
598            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
599            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
600            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
601            None => None,
602        };
603
604        CompressionStats {
605            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
606            compressed_size: hot_size + compressed_size,
607            value_count: self.len(),
608            codec,
609        }
610    }
611
    /// Returns whether the column has compressed data.
    ///
    /// True as soon as a compression pass has stored a payload, even if
    /// further values have since accumulated in the hot buffer.
    #[must_use]
    #[allow(dead_code)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }
618
619    /// Compresses the hot buffer values.
620    ///
621    /// This merges the hot buffer into the compressed data, selecting the
622    /// best codec based on the value types.
623    ///
624    /// Note: If compressed data already exists, this is a no-op to avoid
625    /// losing previously compressed values. Use `force_compress()` after
626    /// decompressing to re-compress with all values.
627    pub fn compress(&mut self) {
628        if self.values.is_empty() {
629            return;
630        }
631
632        // Don't re-compress if we already have compressed data
633        // (would need to decompress and merge first)
634        if self.compressed.is_some() {
635            return;
636        }
637
638        // Determine the dominant type
639        let (int_count, str_count, bool_count) = self.count_types();
640        let total = self.values.len();
641
642        if int_count > total / 2 {
643            self.compress_as_integers();
644        } else if str_count > total / 2 {
645            self.compress_as_strings();
646        } else if bool_count > total / 2 {
647            self.compress_as_booleans();
648        }
649        // If no dominant type, don't compress (mixed types don't compress well)
650    }
651
652    /// Counts values by type.
653    fn count_types(&self) -> (usize, usize, usize) {
654        let mut int_count = 0;
655        let mut str_count = 0;
656        let mut bool_count = 0;
657
658        for value in self.values.values() {
659            match value {
660                Value::Int64(_) => int_count += 1,
661                Value::String(_) => str_count += 1,
662                Value::Bool(_) => bool_count += 1,
663                _ => {}
664            }
665        }
666
667        (int_count, str_count, bool_count)
668    }
669
670    /// Compresses integer values.
671    #[allow(unsafe_code)]
672    fn compress_as_integers(&mut self) {
673        // Extract integer values and their IDs
674        let mut values: Vec<(u64, i64)> = Vec::new();
675        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
676
677        for (&id, value) in &self.values {
678            match value {
679                Value::Int64(v) => {
680                    // Convert Id to u64 for indexing (assumes Id can be converted)
681                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
682                    values.push((id_u64, *v));
683                }
684                _ => {
685                    non_int_values.insert(id, value.clone());
686                }
687            }
688        }
689
690        if values.len() < 8 {
691            // Not worth compressing
692            return;
693        }
694
695        // Sort by ID for better compression
696        values.sort_by_key(|(id, _)| *id);
697
698        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
699        let index_to_id: Vec<u64> = id_to_index.clone();
700        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
701
702        // Compress using the optimal codec
703        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
704
705        // Only use compression if it actually saves space
706        if compressed.compression_ratio() > 1.2 {
707            self.compressed = Some(CompressedColumnData::Integers {
708                data: compressed,
709                id_to_index,
710                index_to_id,
711            });
712            self.compressed_count = values.len();
713            self.values = non_int_values;
714        }
715    }
716
717    /// Compresses string values using dictionary encoding.
718    #[allow(unsafe_code)]
719    fn compress_as_strings(&mut self) {
720        let mut values: Vec<(u64, Arc<str>)> = Vec::new();
721        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
722
723        for (&id, value) in &self.values {
724            match value {
725                Value::String(s) => {
726                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
727                    values.push((id_u64, Arc::clone(s)));
728                }
729                _ => {
730                    non_str_values.insert(id, value.clone());
731                }
732            }
733        }
734
735        if values.len() < 8 {
736            return;
737        }
738
739        // Sort by ID
740        values.sort_by_key(|(id, _)| *id);
741
742        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
743        let index_to_id: Vec<u64> = id_to_index.clone();
744
745        // Build dictionary
746        let mut builder = DictionaryBuilder::new();
747        for (_, s) in &values {
748            builder.add(s.as_ref());
749        }
750        let encoding = builder.build();
751
752        // Only use compression if it actually saves space
753        if encoding.compression_ratio() > 1.2 {
754            self.compressed = Some(CompressedColumnData::Strings {
755                encoding,
756                id_to_index,
757                index_to_id,
758            });
759            self.compressed_count = values.len();
760            self.values = non_str_values;
761        }
762    }
763
764    /// Compresses boolean values.
765    #[allow(unsafe_code)]
766    fn compress_as_booleans(&mut self) {
767        let mut values: Vec<(u64, bool)> = Vec::new();
768        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
769
770        for (&id, value) in &self.values {
771            match value {
772                Value::Bool(b) => {
773                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
774                    values.push((id_u64, *b));
775                }
776                _ => {
777                    non_bool_values.insert(id, value.clone());
778                }
779            }
780        }
781
782        if values.len() < 8 {
783            return;
784        }
785
786        // Sort by ID
787        values.sort_by_key(|(id, _)| *id);
788
789        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
790        let index_to_id: Vec<u64> = id_to_index.clone();
791        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
792
793        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
794
795        // Booleans always compress well (8x)
796        self.compressed = Some(CompressedColumnData::Booleans {
797            data: compressed,
798            id_to_index,
799            index_to_id,
800        });
801        self.compressed_count = values.len();
802        self.values = non_bool_values;
803    }
804
805    /// Decompresses all values back to the hot buffer.
806    #[allow(unsafe_code)]
807    fn decompress_all(&mut self) {
808        let Some(compressed) = self.compressed.take() else {
809            return;
810        };
811
812        match compressed {
813            CompressedColumnData::Integers {
814                data, index_to_id, ..
815            } => {
816                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
817                    // Convert back to signed using zigzag decoding
818                    let signed: Vec<i64> = values
819                        .iter()
820                        .map(|&v| crate::storage::zigzag_decode(v))
821                        .collect();
822
823                    for (i, id_u64) in index_to_id.iter().enumerate() {
824                        if let Some(&value) = signed.get(i) {
825                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
826                            self.values.insert(id, Value::Int64(value));
827                        }
828                    }
829                }
830            }
831            CompressedColumnData::Strings {
832                encoding,
833                index_to_id,
834                ..
835            } => {
836                for (i, id_u64) in index_to_id.iter().enumerate() {
837                    if let Some(s) = encoding.get(i) {
838                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
839                        self.values.insert(id, Value::String(Arc::from(s)));
840                    }
841                }
842            }
843            CompressedColumnData::Booleans {
844                data, index_to_id, ..
845            } => {
846                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
847                    for (i, id_u64) in index_to_id.iter().enumerate() {
848                        if let Some(&value) = values.get(i) {
849                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
850                            self.values.insert(id, Value::Bool(value));
851                        }
852                    }
853                }
854            }
855        }
856
857        self.compressed_count = 0;
858    }
859
    /// Forces compression regardless of thresholds.
    ///
    /// Useful for bulk loading or when you know the column is complete.
    ///
    /// This currently delegates straight to `compress`, so the same limits
    /// apply: it is a no-op if the hot buffer is empty or compressed data
    /// already exists. "Thresholds" refers to the size checks that `set`
    /// performs in `Auto` mode before it calls `compress`.
    pub fn force_compress(&mut self) {
        self.compress();
    }
866
    /// Returns the zone map for this column.
    ///
    /// The zone map is returned as-is, even when it has been marked stale
    /// by a removal and not yet rebuilt.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }
872
873    /// Uses zone map to check if any values could satisfy the predicate.
874    ///
875    /// Returns `false` when we can prove no values match (so the column
876    /// can be skipped entirely). Returns `true` if values might match.
877    #[must_use]
878    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
879        if self.zone_map_dirty {
880            // Conservative: can't skip if zone map is stale
881            return true;
882        }
883
884        match op {
885            CompareOp::Eq => self.zone_map.might_contain_equal(value),
886            CompareOp::Ne => {
887                // Can only skip if all values are equal to the value
888                // (which means min == max == value)
889                match (&self.zone_map.min, &self.zone_map.max) {
890                    (Some(min), Some(max)) => {
891                        !(compare_values(min, value) == Some(Ordering::Equal)
892                            && compare_values(max, value) == Some(Ordering::Equal))
893                    }
894                    _ => true,
895                }
896            }
897            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
898            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
899            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
900            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
901        }
902    }
903
904    /// Rebuilds zone map from current values.
905    pub fn rebuild_zone_map(&mut self) {
906        let mut zone_map = ZoneMapEntry::new();
907
908        for value in self.values.values() {
909            zone_map.row_count += 1;
910
911            if matches!(value, Value::Null) {
912                zone_map.null_count += 1;
913                continue;
914            }
915
916            // Update min
917            match &zone_map.min {
918                None => zone_map.min = Some(value.clone()),
919                Some(current) => {
920                    if compare_values(value, current) == Some(Ordering::Less) {
921                        zone_map.min = Some(value.clone());
922                    }
923                }
924            }
925
926            // Update max
927            match &zone_map.max {
928                None => zone_map.max = Some(value.clone()),
929                Some(current) => {
930                    if compare_values(value, current) == Some(Ordering::Greater) {
931                        zone_map.max = Some(value.clone());
932                    }
933                }
934            }
935        }
936
937        self.zone_map = zone_map;
938        self.zone_map_dirty = false;
939    }
940}
941
942/// Compares two values for ordering.
943fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
944    match (a, b) {
945        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
946        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
947        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
948        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
949        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
950        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
951        _ => None,
952    }
953}
954
955impl<Id: EntityId> Default for PropertyColumn<Id> {
956    fn default() -> Self {
957        Self::new()
958    }
959}
960
961/// A borrowed reference to a property column for bulk reads.
962///
963/// Holds the read lock so the column can't change while you're iterating.
964pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
965    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
966    #[allow(dead_code)]
967    key: PropertyKey,
968    _marker: PhantomData<Id>,
969}
970
#[cfg(test)]
mod tests {
    use super::*;

    /// Set/get round trip across two nodes and two keys, including a miss.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let (first, second) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(first, name.clone(), "Alice".into());
        storage.set(first, age.clone(), 30i64.into());
        storage.set(second, name.clone(), "Bob".into());

        let alice = Some(Value::String("Alice".into()));
        let bob = Some(Value::String("Bob".into()));

        assert_eq!(storage.get(first, &name), alice);
        assert_eq!(storage.get(first, &age), Some(Value::Int64(30)));
        assert_eq!(storage.get(second, &name), bob);
        assert!(storage.get(second, &age).is_none());
    }

    /// Removing a property returns it and makes later reads miss.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let (node, key) = (NodeId::new(1), PropertyKey::new("name"));

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        assert!(storage.remove(node, &key).is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` reports every property stored for a node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        assert_eq!(storage.get_all(node).len(), 3);
    }

    /// `remove_all` clears every property of a node.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        // Look up with freshly built keys, exactly like a caller would.
        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    /// Basic set/get/remove bookkeeping on a single column.
    #[test]
    fn test_property_column() {
        let mut column = PropertyColumn::new();
        let (alice, bob) = (NodeId::new(1), NodeId::new(2));

        column.set(alice, "Alice".into());
        column.set(bob, "Bob".into());

        assert_eq!(column.len(), 2);
        assert!(!column.is_empty());

        assert_eq!(column.get(NodeId::new(1)), Some(Value::String("Alice".into())));

        column.remove(NodeId::new(1));
        assert!(column.get(NodeId::new(1)).is_none());
        assert_eq!(column.len(), 1);
    }

    /// The constructor used determines the reported compression mode.
    #[test]
    fn test_compression_mode() {
        let plain: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    /// Values written through an auto-compressing storage stay readable.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age_key = || PropertyKey::new("age");

        for raw in 0..100u64 {
            let age = 20 + (raw as i64 % 50);
            storage.set(NodeId::new(raw), age_key(), Value::Int64(age));
        }

        // Ids 0 and 50 both map to 20 because of the modulo above.
        assert_eq!(storage.get(NodeId::new(0), &age_key()), Some(Value::Int64(20)));
        assert_eq!(storage.get(NodeId::new(50), &age_key()), Some(Value::Int64(20)));
    }

    /// Sequential integers: the total count survives any compression.
    #[test]
    fn test_compress_integer_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Many sequential integers.
        for raw in 0..2000u64 {
            column.set(NodeId::new(raw), Value::Int64(1000 + raw as i64));
        }

        // The count covers both compressed values and the hot buffer.
        assert_eq!(column.compression_stats().value_count, 2000);

        // The newest value is either readable, or the column reports itself
        // as compressed (compressed values are not served by `get`).
        let tail_readable = column.get(NodeId::new(1999)).is_some();
        assert!(tail_readable || column.is_compressed());
    }

    /// Low-cardinality strings: the total count survives any compression.
    #[test]
    fn test_compress_string_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Four labels repeated round-robin (a good fit for dictionary encoding).
        let categories = ["Person", "Company", "Product", "Location"];
        for (index, label) in categories.iter().cycle().take(2000).enumerate() {
            column.set(NodeId::new(index as u64), Value::String(Arc::from(*label)));
        }

        assert_eq!(column.len(), 2000);

        // The newest value is either readable or the column is compressed.
        let tail_readable = column.get(NodeId::new(1999)).is_some();
        assert!(tail_readable || column.is_compressed());
    }

    /// Alternating booleans: the total count survives any compression.
    #[test]
    fn test_compress_boolean_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for raw in 0..2000u64 {
            column.set(NodeId::new(raw), Value::Bool(raw % 2 == 0));
        }

        assert_eq!(column.len(), 2000);

        // The newest value is either readable or the column is compressed.
        let tail_readable = column.get(NodeId::new(1999)).is_some();
        assert!(tail_readable || column.is_compressed());
    }

    /// Forcing compression on a small column keeps the value count intact.
    #[test]
    fn test_force_compress() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        // Fewer values than the automatic threshold.
        for raw in 0..100u64 {
            column.set(NodeId::new(raw), Value::Int64(raw as i64));
        }

        column.force_compress();

        assert_eq!(column.compression_stats().value_count, 100);
    }

    /// Stats on an uncompressed column report count and a non-zero size.
    #[test]
    fn test_compression_stats() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        for raw in 0..50u64 {
            column.set(NodeId::new(raw), Value::Int64(raw as i64));
        }

        let stats = column.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    /// Storage-level stats contain one entry per property key.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for raw in 0..100u64 {
            let node = NodeId::new(raw);
            storage.set(node, PropertyKey::new("age"), Value::Int64(raw as i64));
            storage.set(node, PropertyKey::new("name"), Value::String(Arc::from("Alice")));
        }

        let per_column = storage.compression_stats();
        assert_eq!(per_column.len(), 2); // Two columns
        for expected in ["age", "name"] {
            assert!(per_column.contains_key(&PropertyKey::new(expected)));
        }
    }

    /// A populated storage reports non-zero memory usage.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for raw in 0..100u64 {
            storage.set(NodeId::new(raw), PropertyKey::new("value"), Value::Int64(raw as i64));
        }

        assert!(storage.memory_usage() > 0);
    }
}