Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
28use grafeo_common::utils::hash::FxHashMap;
29use parking_lot::RwLock;
30use std::cmp::Ordering;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::Arc;
34
/// How a property column decides whether to compress its values.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionMode {
    /// Compression is off: every value stays in the uncompressed hash map.
    /// This is the default.
    #[default]
    None,
    /// Compress automatically once the column has grown past its thresholds.
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
46
/// Minimum number of values a column must hold in total (uncompressed plus
/// compressed) before `Auto` mode attempts a compression pass.
const COMPRESSION_THRESHOLD: usize = 1_000;

/// Minimum number of uncompressed (recently written) values a column must
/// hold before `Auto` mode attempts a compression pass.
const HOT_BUFFER_SIZE: usize = 256;
52
/// Comparison operator evaluated against a column's zone map.
///
/// Each variant corresponds to a GQL comparison operator such as `=`, `<`
/// or `>=`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompareOp {
    /// `=`: equal to the operand.
    Eq,
    /// Not equal to the operand.
    Ne,
    /// `<`: strictly less than the operand.
    Lt,
    /// `<=`: less than or equal to the operand.
    Le,
    /// `>`: strictly greater than the operand.
    Gt,
    /// `>=`: greater than or equal to the operand.
    Ge,
}
71
72/// Marker trait for IDs that can key into property storage.
73///
74/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
75pub trait EntityId: Copy + Eq + Hash + 'static {}
76
77impl EntityId for NodeId {}
78impl EntityId for EdgeId {}
79
80/// Thread-safe columnar property storage.
81///
82/// Each property key ("name", "age", etc.) gets its own column. This layout
83/// is great for analytical queries that filter on specific properties -
84/// you only touch the columns you need.
85///
86/// Generic over `Id` so the same storage works for nodes and edges.
87///
88/// # Example
89///
90/// ```
91/// use grafeo_core::graph::lpg::PropertyStorage;
92/// use grafeo_common::types::{NodeId, PropertyKey};
93///
94/// let storage = PropertyStorage::new();
95/// let alice = NodeId::new(1);
96///
97/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
98/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
99///
100/// // Fetch all properties at once
101/// let props = storage.get_all(alice);
102/// assert_eq!(props.len(), 2);
103/// ```
104pub struct PropertyStorage<Id: EntityId = NodeId> {
105    /// Map from property key to column.
106    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
107    /// Default compression mode for new columns.
108    default_compression: CompressionMode,
109    _marker: PhantomData<Id>,
110}
111
112impl<Id: EntityId> PropertyStorage<Id> {
113    /// Creates a new property storage.
114    #[must_use]
115    pub fn new() -> Self {
116        Self {
117            columns: RwLock::new(FxHashMap::default()),
118            default_compression: CompressionMode::None,
119            _marker: PhantomData,
120        }
121    }
122
123    /// Creates a new property storage with compression enabled.
124    #[must_use]
125    pub fn with_compression(mode: CompressionMode) -> Self {
126        Self {
127            columns: RwLock::new(FxHashMap::default()),
128            default_compression: mode,
129            _marker: PhantomData,
130        }
131    }
132
133    /// Sets the default compression mode for new columns.
134    pub fn set_default_compression(&mut self, mode: CompressionMode) {
135        self.default_compression = mode;
136    }
137
138    /// Sets a property value for an entity.
139    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
140        let mut columns = self.columns.write();
141        let mode = self.default_compression;
142        columns
143            .entry(key)
144            .or_insert_with(|| PropertyColumn::with_compression(mode))
145            .set(id, value);
146    }
147
148    /// Enables compression for a specific column.
149    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
150        let mut columns = self.columns.write();
151        if let Some(col) = columns.get_mut(key) {
152            col.set_compression_mode(mode);
153        }
154    }
155
156    /// Compresses all columns that have compression enabled.
157    pub fn compress_all(&self) {
158        let mut columns = self.columns.write();
159        for col in columns.values_mut() {
160            if col.compression_mode() != CompressionMode::None {
161                col.compress();
162            }
163        }
164    }
165
166    /// Forces compression on all columns regardless of mode.
167    pub fn force_compress_all(&self) {
168        let mut columns = self.columns.write();
169        for col in columns.values_mut() {
170            col.force_compress();
171        }
172    }
173
174    /// Returns compression statistics for all columns.
175    #[must_use]
176    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
177        let columns = self.columns.read();
178        columns
179            .iter()
180            .map(|(key, col)| (key.clone(), col.compression_stats()))
181            .collect()
182    }
183
184    /// Returns the total memory usage of all columns.
185    #[must_use]
186    pub fn memory_usage(&self) -> usize {
187        let columns = self.columns.read();
188        columns
189            .values()
190            .map(|col| col.compression_stats().compressed_size)
191            .sum()
192    }
193
194    /// Gets a property value for an entity.
195    #[must_use]
196    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
197        let columns = self.columns.read();
198        columns.get(key).and_then(|col| col.get(id))
199    }
200
201    /// Removes a property value for an entity.
202    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
203        let mut columns = self.columns.write();
204        columns.get_mut(key).and_then(|col| col.remove(id))
205    }
206
207    /// Removes all properties for an entity.
208    pub fn remove_all(&self, id: Id) {
209        let mut columns = self.columns.write();
210        for col in columns.values_mut() {
211            col.remove(id);
212        }
213    }
214
215    /// Gets all properties for an entity.
216    #[must_use]
217    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
218        let columns = self.columns.read();
219        let mut result = FxHashMap::default();
220        for (key, col) in columns.iter() {
221            if let Some(value) = col.get(id) {
222                result.insert(key.clone(), value);
223            }
224        }
225        result
226    }
227
228    /// Returns the number of property columns.
229    #[must_use]
230    pub fn column_count(&self) -> usize {
231        self.columns.read().len()
232    }
233
234    /// Returns the keys of all columns.
235    #[must_use]
236    pub fn keys(&self) -> Vec<PropertyKey> {
237        self.columns.read().keys().cloned().collect()
238    }
239
240    /// Gets a column by key for bulk access.
241    #[must_use]
242    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
243        let columns = self.columns.read();
244        if columns.contains_key(key) {
245            Some(PropertyColumnRef {
246                _guard: columns,
247                key: key.clone(),
248                _marker: PhantomData,
249            })
250        } else {
251            None
252        }
253    }
254
255    /// Checks if a predicate might match any values (using zone maps).
256    ///
257    /// Returns `false` only when we're *certain* no values match - for example,
258    /// if you're looking for age > 100 but the max age is 80. Returns `true`
259    /// if the property doesn't exist (conservative - might match).
260    #[must_use]
261    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
262        let columns = self.columns.read();
263        columns
264            .get(key)
265            .map(|col| col.might_match(op, value))
266            .unwrap_or(true) // No column = assume might match (conservative)
267    }
268
269    /// Gets the zone map for a property column.
270    #[must_use]
271    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
272        let columns = self.columns.read();
273        columns.get(key).map(|col| col.zone_map().clone())
274    }
275
276    /// Rebuilds zone maps for all columns (call after bulk removes).
277    pub fn rebuild_zone_maps(&self) {
278        let mut columns = self.columns.write();
279        for col in columns.values_mut() {
280            col.rebuild_zone_map();
281        }
282    }
283}
284
285impl<Id: EntityId> Default for PropertyStorage<Id> {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
291/// Compressed storage for a property column.
292///
293/// Holds the compressed representation of values along with the index
294/// mapping entity IDs to positions in the compressed array.
295#[derive(Debug)]
296pub enum CompressedColumnData {
297    /// Compressed integers (Int64 values).
298    Integers {
299        /// Compressed data.
300        data: CompressedData,
301        /// Index: entity ID position -> compressed array index.
302        id_to_index: Vec<u64>,
303        /// Reverse index: compressed array index -> entity ID position.
304        index_to_id: Vec<u64>,
305    },
306    /// Dictionary-encoded strings.
307    Strings {
308        /// Dictionary encoding.
309        encoding: DictionaryEncoding,
310        /// Index: entity ID position -> dictionary index.
311        id_to_index: Vec<u64>,
312        /// Reverse index: dictionary index -> entity ID position.
313        index_to_id: Vec<u64>,
314    },
315    /// Compressed booleans.
316    Booleans {
317        /// Compressed data.
318        data: CompressedData,
319        /// Index: entity ID position -> bit index.
320        id_to_index: Vec<u64>,
321        /// Reverse index: bit index -> entity ID position.
322        index_to_id: Vec<u64>,
323    },
324}
325
326impl CompressedColumnData {
327    /// Returns the memory usage of the compressed data in bytes.
328    #[must_use]
329    pub fn memory_usage(&self) -> usize {
330        match self {
331            CompressedColumnData::Integers {
332                data,
333                id_to_index,
334                index_to_id,
335            } => {
336                data.data.len()
337                    + id_to_index.len() * std::mem::size_of::<u64>()
338                    + index_to_id.len() * std::mem::size_of::<u64>()
339            }
340            CompressedColumnData::Strings {
341                encoding,
342                id_to_index,
343                index_to_id,
344            } => {
345                encoding.codes().len() * std::mem::size_of::<u32>()
346                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
347                    + id_to_index.len() * std::mem::size_of::<u64>()
348                    + index_to_id.len() * std::mem::size_of::<u64>()
349            }
350            CompressedColumnData::Booleans {
351                data,
352                id_to_index,
353                index_to_id,
354            } => {
355                data.data.len()
356                    + id_to_index.len() * std::mem::size_of::<u64>()
357                    + index_to_id.len() * std::mem::size_of::<u64>()
358            }
359        }
360    }
361
362    /// Returns the compression ratio.
363    #[must_use]
364    #[allow(dead_code)]
365    pub fn compression_ratio(&self) -> f64 {
366        match self {
367            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
368            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
369            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
370        }
371    }
372}
373
374/// Statistics about column compression.
375#[derive(Debug, Clone, Default)]
376pub struct CompressionStats {
377    /// Size of uncompressed data in bytes.
378    pub uncompressed_size: usize,
379    /// Size of compressed data in bytes.
380    pub compressed_size: usize,
381    /// Number of values in the column.
382    pub value_count: usize,
383    /// Codec used for compression.
384    pub codec: Option<CompressionCodec>,
385}
386
387impl CompressionStats {
388    /// Returns the compression ratio (uncompressed / compressed).
389    #[must_use]
390    pub fn compression_ratio(&self) -> f64 {
391        if self.compressed_size == 0 {
392            return 1.0;
393        }
394        self.uncompressed_size as f64 / self.compressed_size as f64
395    }
396}
397
398/// A single property column (e.g., all "age" values).
399///
400/// Maintains min/max/null_count for fast predicate evaluation. When you
401/// filter on `age > 50`, we first check if any age could possibly match
402/// before scanning the actual values.
403///
404/// Columns support optional compression for large datasets. When compression
405/// is enabled, the column automatically selects the best codec based on the
406/// data type and characteristics.
407pub struct PropertyColumn<Id: EntityId = NodeId> {
408    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
409    /// Used for recent writes and when compression is disabled.
410    values: FxHashMap<Id, Value>,
411    /// Zone map tracking min/max/null_count for predicate pushdown.
412    zone_map: ZoneMapEntry,
413    /// Whether zone map needs rebuild (after removes).
414    zone_map_dirty: bool,
415    /// Compression mode for this column.
416    compression_mode: CompressionMode,
417    /// Compressed data (when compression is enabled and triggered).
418    compressed: Option<CompressedColumnData>,
419    /// Number of values before last compression.
420    compressed_count: usize,
421}
422
423impl<Id: EntityId> PropertyColumn<Id> {
424    /// Creates a new empty column.
425    #[must_use]
426    pub fn new() -> Self {
427        Self {
428            values: FxHashMap::default(),
429            zone_map: ZoneMapEntry::new(),
430            zone_map_dirty: false,
431            compression_mode: CompressionMode::None,
432            compressed: None,
433            compressed_count: 0,
434        }
435    }
436
437    /// Creates a new column with the specified compression mode.
438    #[must_use]
439    pub fn with_compression(mode: CompressionMode) -> Self {
440        Self {
441            values: FxHashMap::default(),
442            zone_map: ZoneMapEntry::new(),
443            zone_map_dirty: false,
444            compression_mode: mode,
445            compressed: None,
446            compressed_count: 0,
447        }
448    }
449
450    /// Sets the compression mode for this column.
451    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
452        self.compression_mode = mode;
453        if mode == CompressionMode::None {
454            // Decompress if switching to no compression
455            if self.compressed.is_some() {
456                self.decompress_all();
457            }
458        }
459    }
460
461    /// Returns the compression mode for this column.
462    #[must_use]
463    pub fn compression_mode(&self) -> CompressionMode {
464        self.compression_mode
465    }
466
467    /// Sets a value for an entity.
468    pub fn set(&mut self, id: Id, value: Value) {
469        // Update zone map incrementally
470        self.update_zone_map_on_insert(&value);
471        self.values.insert(id, value);
472
473        // Check if we should compress (in Auto mode)
474        if self.compression_mode == CompressionMode::Auto {
475            let total_count = self.values.len() + self.compressed_count;
476            let hot_buffer_count = self.values.len();
477
478            // Compress when hot buffer exceeds threshold and total is large enough
479            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
480                self.compress();
481            }
482        }
483    }
484
485    /// Updates zone map when inserting a value.
486    fn update_zone_map_on_insert(&mut self, value: &Value) {
487        self.zone_map.row_count += 1;
488
489        if matches!(value, Value::Null) {
490            self.zone_map.null_count += 1;
491            return;
492        }
493
494        // Update min
495        match &self.zone_map.min {
496            None => self.zone_map.min = Some(value.clone()),
497            Some(current) => {
498                if compare_values(value, current) == Some(Ordering::Less) {
499                    self.zone_map.min = Some(value.clone());
500                }
501            }
502        }
503
504        // Update max
505        match &self.zone_map.max {
506            None => self.zone_map.max = Some(value.clone()),
507            Some(current) => {
508                if compare_values(value, current) == Some(Ordering::Greater) {
509                    self.zone_map.max = Some(value.clone());
510                }
511            }
512        }
513    }
514
515    /// Gets a value for an entity.
516    ///
517    /// First checks the hot buffer (uncompressed values), then falls back
518    /// to the compressed data if present.
519    #[must_use]
520    pub fn get(&self, id: Id) -> Option<Value> {
521        // First check hot buffer
522        if let Some(value) = self.values.get(&id) {
523            return Some(value.clone());
524        }
525
526        // For now, compressed data lookup is not implemented for sparse access
527        // because the compressed format stores values by index, not by entity ID.
528        // This would require maintaining an ID -> index map in CompressedColumnData.
529        // The compressed data is primarily useful for bulk/scan operations.
530        None
531    }
532
533    /// Removes a value for an entity.
534    pub fn remove(&mut self, id: Id) -> Option<Value> {
535        let removed = self.values.remove(&id);
536        if removed.is_some() {
537            // Mark zone map as dirty - would need full rebuild for accurate min/max
538            self.zone_map_dirty = true;
539        }
540        removed
541    }
542
543    /// Returns the number of values in this column (hot + compressed).
544    #[must_use]
545    #[allow(dead_code)]
546    pub fn len(&self) -> usize {
547        self.values.len() + self.compressed_count
548    }
549
550    /// Returns true if this column is empty.
551    #[must_use]
552    #[allow(dead_code)]
553    pub fn is_empty(&self) -> bool {
554        self.values.is_empty() && self.compressed_count == 0
555    }
556
557    /// Iterates over all (id, value) pairs in the hot buffer.
558    ///
559    /// Note: This only iterates over uncompressed values. For full iteration
560    /// including compressed values, use [`iter_all`].
561    #[allow(dead_code)]
562    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
563        self.values.iter().map(|(&id, v)| (id, v))
564    }
565
566    /// Returns compression statistics for this column.
567    #[must_use]
568    pub fn compression_stats(&self) -> CompressionStats {
569        let hot_size = self.values.len() * std::mem::size_of::<Value>();
570        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
571        let codec = match &self.compressed {
572            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
573            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
574            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
575            None => None,
576        };
577
578        CompressionStats {
579            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
580            compressed_size: hot_size + compressed_size,
581            value_count: self.len(),
582            codec,
583        }
584    }
585
586    /// Returns whether the column has compressed data.
587    #[must_use]
588    #[allow(dead_code)]
589    pub fn is_compressed(&self) -> bool {
590        self.compressed.is_some()
591    }
592
593    /// Compresses the hot buffer values.
594    ///
595    /// This merges the hot buffer into the compressed data, selecting the
596    /// best codec based on the value types.
597    ///
598    /// Note: If compressed data already exists, this is a no-op to avoid
599    /// losing previously compressed values. Use `force_compress()` after
600    /// decompressing to re-compress with all values.
601    pub fn compress(&mut self) {
602        if self.values.is_empty() {
603            return;
604        }
605
606        // Don't re-compress if we already have compressed data
607        // (would need to decompress and merge first)
608        if self.compressed.is_some() {
609            return;
610        }
611
612        // Determine the dominant type
613        let (int_count, str_count, bool_count) = self.count_types();
614        let total = self.values.len();
615
616        if int_count > total / 2 {
617            self.compress_as_integers();
618        } else if str_count > total / 2 {
619            self.compress_as_strings();
620        } else if bool_count > total / 2 {
621            self.compress_as_booleans();
622        }
623        // If no dominant type, don't compress (mixed types don't compress well)
624    }
625
626    /// Counts values by type.
627    fn count_types(&self) -> (usize, usize, usize) {
628        let mut int_count = 0;
629        let mut str_count = 0;
630        let mut bool_count = 0;
631
632        for value in self.values.values() {
633            match value {
634                Value::Int64(_) => int_count += 1,
635                Value::String(_) => str_count += 1,
636                Value::Bool(_) => bool_count += 1,
637                _ => {}
638            }
639        }
640
641        (int_count, str_count, bool_count)
642    }
643
644    /// Compresses integer values.
645    #[allow(unsafe_code)]
646    fn compress_as_integers(&mut self) {
647        // Extract integer values and their IDs
648        let mut values: Vec<(u64, i64)> = Vec::new();
649        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
650
651        for (&id, value) in &self.values {
652            match value {
653                Value::Int64(v) => {
654                    // Convert Id to u64 for indexing (assumes Id can be converted)
655                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
656                    values.push((id_u64, *v));
657                }
658                _ => {
659                    non_int_values.insert(id, value.clone());
660                }
661            }
662        }
663
664        if values.len() < 8 {
665            // Not worth compressing
666            return;
667        }
668
669        // Sort by ID for better compression
670        values.sort_by_key(|(id, _)| *id);
671
672        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
673        let index_to_id: Vec<u64> = id_to_index.clone();
674        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
675
676        // Compress using the optimal codec
677        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
678
679        // Only use compression if it actually saves space
680        if compressed.compression_ratio() > 1.2 {
681            self.compressed = Some(CompressedColumnData::Integers {
682                data: compressed,
683                id_to_index,
684                index_to_id,
685            });
686            self.compressed_count = values.len();
687            self.values = non_int_values;
688        }
689    }
690
691    /// Compresses string values using dictionary encoding.
692    #[allow(unsafe_code)]
693    fn compress_as_strings(&mut self) {
694        let mut values: Vec<(u64, Arc<str>)> = Vec::new();
695        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
696
697        for (&id, value) in &self.values {
698            match value {
699                Value::String(s) => {
700                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
701                    values.push((id_u64, Arc::clone(s)));
702                }
703                _ => {
704                    non_str_values.insert(id, value.clone());
705                }
706            }
707        }
708
709        if values.len() < 8 {
710            return;
711        }
712
713        // Sort by ID
714        values.sort_by_key(|(id, _)| *id);
715
716        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
717        let index_to_id: Vec<u64> = id_to_index.clone();
718
719        // Build dictionary
720        let mut builder = DictionaryBuilder::new();
721        for (_, s) in &values {
722            builder.add(s.as_ref());
723        }
724        let encoding = builder.build();
725
726        // Only use compression if it actually saves space
727        if encoding.compression_ratio() > 1.2 {
728            self.compressed = Some(CompressedColumnData::Strings {
729                encoding,
730                id_to_index,
731                index_to_id,
732            });
733            self.compressed_count = values.len();
734            self.values = non_str_values;
735        }
736    }
737
738    /// Compresses boolean values.
739    #[allow(unsafe_code)]
740    fn compress_as_booleans(&mut self) {
741        let mut values: Vec<(u64, bool)> = Vec::new();
742        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
743
744        for (&id, value) in &self.values {
745            match value {
746                Value::Bool(b) => {
747                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
748                    values.push((id_u64, *b));
749                }
750                _ => {
751                    non_bool_values.insert(id, value.clone());
752                }
753            }
754        }
755
756        if values.len() < 8 {
757            return;
758        }
759
760        // Sort by ID
761        values.sort_by_key(|(id, _)| *id);
762
763        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
764        let index_to_id: Vec<u64> = id_to_index.clone();
765        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
766
767        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
768
769        // Booleans always compress well (8x)
770        self.compressed = Some(CompressedColumnData::Booleans {
771            data: compressed,
772            id_to_index,
773            index_to_id,
774        });
775        self.compressed_count = values.len();
776        self.values = non_bool_values;
777    }
778
779    /// Decompresses all values back to the hot buffer.
780    #[allow(unsafe_code)]
781    fn decompress_all(&mut self) {
782        let Some(compressed) = self.compressed.take() else {
783            return;
784        };
785
786        match compressed {
787            CompressedColumnData::Integers {
788                data, index_to_id, ..
789            } => {
790                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
791                    // Convert back to signed using zigzag decoding
792                    let signed: Vec<i64> = values
793                        .iter()
794                        .map(|&v| crate::storage::zigzag_decode(v))
795                        .collect();
796
797                    for (i, id_u64) in index_to_id.iter().enumerate() {
798                        if let Some(&value) = signed.get(i) {
799                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
800                            self.values.insert(id, Value::Int64(value));
801                        }
802                    }
803                }
804            }
805            CompressedColumnData::Strings {
806                encoding,
807                index_to_id,
808                ..
809            } => {
810                for (i, id_u64) in index_to_id.iter().enumerate() {
811                    if let Some(s) = encoding.get(i) {
812                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
813                        self.values.insert(id, Value::String(Arc::from(s)));
814                    }
815                }
816            }
817            CompressedColumnData::Booleans {
818                data, index_to_id, ..
819            } => {
820                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
821                    for (i, id_u64) in index_to_id.iter().enumerate() {
822                        if let Some(&value) = values.get(i) {
823                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
824                            self.values.insert(id, Value::Bool(value));
825                        }
826                    }
827                }
828            }
829        }
830
831        self.compressed_count = 0;
832    }
833
834    /// Forces compression regardless of thresholds.
835    ///
836    /// Useful for bulk loading or when you know the column is complete.
837    pub fn force_compress(&mut self) {
838        self.compress();
839    }
840
841    /// Returns the zone map for this column.
842    #[must_use]
843    pub fn zone_map(&self) -> &ZoneMapEntry {
844        &self.zone_map
845    }
846
847    /// Uses zone map to check if any values could satisfy the predicate.
848    ///
849    /// Returns `false` when we can prove no values match (so the column
850    /// can be skipped entirely). Returns `true` if values might match.
851    #[must_use]
852    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
853        if self.zone_map_dirty {
854            // Conservative: can't skip if zone map is stale
855            return true;
856        }
857
858        match op {
859            CompareOp::Eq => self.zone_map.might_contain_equal(value),
860            CompareOp::Ne => {
861                // Can only skip if all values are equal to the value
862                // (which means min == max == value)
863                match (&self.zone_map.min, &self.zone_map.max) {
864                    (Some(min), Some(max)) => {
865                        !(compare_values(min, value) == Some(Ordering::Equal)
866                            && compare_values(max, value) == Some(Ordering::Equal))
867                    }
868                    _ => true,
869                }
870            }
871            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
872            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
873            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
874            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
875        }
876    }
877
878    /// Rebuilds zone map from current values.
879    pub fn rebuild_zone_map(&mut self) {
880        let mut zone_map = ZoneMapEntry::new();
881
882        for value in self.values.values() {
883            zone_map.row_count += 1;
884
885            if matches!(value, Value::Null) {
886                zone_map.null_count += 1;
887                continue;
888            }
889
890            // Update min
891            match &zone_map.min {
892                None => zone_map.min = Some(value.clone()),
893                Some(current) => {
894                    if compare_values(value, current) == Some(Ordering::Less) {
895                        zone_map.min = Some(value.clone());
896                    }
897                }
898            }
899
900            // Update max
901            match &zone_map.max {
902                None => zone_map.max = Some(value.clone()),
903                Some(current) => {
904                    if compare_values(value, current) == Some(Ordering::Greater) {
905                        zone_map.max = Some(value.clone());
906                    }
907                }
908            }
909        }
910
911        self.zone_map = zone_map;
912        self.zone_map_dirty = false;
913    }
914}
915
916/// Compares two values for ordering.
917fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
918    match (a, b) {
919        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
920        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
921        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
922        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
923        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
924        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
925        _ => None,
926    }
927}
928
929impl<Id: EntityId> Default for PropertyColumn<Id> {
930    fn default() -> Self {
931        Self::new()
932    }
933}
934
935/// A borrowed reference to a property column for bulk reads.
936///
937/// Holds the read lock so the column can't change while you're iterating.
938pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
939    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
940    #[allow(dead_code)]
941    key: PropertyKey,
942    _marker: PhantomData<Id>,
943}
944
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alice".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Bob".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alice".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Bob".into()))
        );
        assert!(storage.get(node2, &age_key).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alice".into());
        col.set(NodeId::new(2), "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alice".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + (i as i64 % 50)),
            );
        }

        // Values should still be readable
        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add many sequential integers
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Should have triggered compression at some point
        // Total count should include both compressed and hot buffer values
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        // Values from the hot buffer should be readable
        // Note: Compressed values are not accessible via get() - see design note
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add repeated strings (good for dictionary compression)
        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(Arc::from(cat)));
        }

        // Total count should be correct
        assert_eq!(col.len(), 2000);

        // Late values should be in hot buffer and readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add booleans
        for i in 0..2000 {
            col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
        }

        // Verify total count
        assert_eq!(col.len(), 2000);

        // Late values should be readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Add fewer values than the threshold
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        // Force compression
        col.force_compress();

        // Stats should show compression was applied if beneficial
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i as i64),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(Arc::from("Alice")),
            );
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // Two columns
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    #[test]
    fn test_compare_values() {
        assert_eq!(
            compare_values(&Value::Int64(1), &Value::Int64(2)),
            Some(Ordering::Less)
        );
        assert_eq!(
            compare_values(&Value::Int64(2), &Value::Float64(1.5)),
            Some(Ordering::Greater)
        );
        assert_eq!(
            compare_values(&Value::Float64(3.0), &Value::Int64(3)),
            Some(Ordering::Equal)
        );
        assert_eq!(
            compare_values(&Value::String("a".into()), &Value::String("b".into())),
            Some(Ordering::Less)
        );
        assert_eq!(
            compare_values(&Value::Bool(true), &Value::Bool(false)),
            Some(Ordering::Greater)
        );
        // NaN and mismatched types are not comparable.
        assert_eq!(
            compare_values(&Value::Float64(f64::NAN), &Value::Float64(0.0)),
            None
        );
        assert_eq!(
            compare_values(&Value::String("1".into()), &Value::Int64(1)),
            None
        );
    }

    #[test]
    fn test_rebuild_zone_map() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        col.rebuild_zone_map();

        let zm = col.zone_map();
        assert_eq!(zm.row_count, 50);
        assert_eq!(zm.null_count, 0);
        assert_eq!(zm.min, Some(Value::Int64(0)));
        assert_eq!(zm.max, Some(Value::Int64(49)));
    }

    #[test]
    fn test_might_match_ne_and_stale_zone_map() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..10 {
            col.set(NodeId::new(i), Value::Int64(7));
        }
        col.rebuild_zone_map();

        // Every value equals 7, so `!= 7` provably matches nothing.
        assert!(!col.might_match(CompareOp::Ne, &Value::Int64(7)));
        assert!(col.might_match(CompareOp::Ne, &Value::Int64(8)));

        // A stale zone map must never be used to skip the column.
        col.zone_map_dirty = true;
        assert!(col.might_match(CompareOp::Ne, &Value::Int64(7)));
    }
}