Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
28use grafeo_common::utils::hash::FxHashMap;
29use parking_lot::RwLock;
30use std::cmp::Ordering;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::Arc;
34
/// Compression mode for property columns.
///
/// The mode is stored per column. `PropertyStorage` hands its default mode to
/// every column it creates, and the mode can later be changed per column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Never compress - always use sparse HashMap (default).
    ///
    /// Switching a column to this mode decompresses any data it had already
    /// compressed.
    #[default]
    None,
    /// Automatically compress when beneficial (after threshold).
    ///
    /// A column in this mode tries to compress itself from `set` once its hot
    /// buffer holds at least `HOT_BUFFER_SIZE` values and the column holds at
    /// least `COMPRESSION_THRESHOLD` values in total.
    Auto,
    /// Eagerly compress on every flush.
    ///
    /// NOTE(review): in this file `Eager` is only ever compared against
    /// `None`; no flush hook is visible here - confirm where flushes happen.
    Eager,
}
46
/// Threshold for automatic compression (number of values).
///
/// In `CompressionMode::Auto` a column is only compressed once its total
/// value count (hot buffer plus already-compressed values) reaches this.
const COMPRESSION_THRESHOLD: usize = 1000;

/// Size of the hot buffer for recent writes (before compression).
/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
/// on recently-written data.
///
/// In `CompressionMode::Auto` the hot buffer must hold at least this many
/// values before a write triggers compression.
const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operators used for zone map predicate checks.
///
/// These map directly to GQL comparison operators like `=`, `<`, `>=`.
/// Each variant reads as "column value `<op>` the supplied value".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Equal to value.
    Eq,
    /// Not equal to value.
    Ne,
    /// Less than value.
    Lt,
    /// Less than or equal to value.
    Le,
    /// Greater than value.
    Gt,
    /// Greater than or equal to value.
    Ge,
}
74
/// Marker trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
///
/// NOTE(review): the compression paths of `PropertyColumn` reinterpret an
/// `Id` as a `u64` with `std::mem::transmute_copy`, so any implementor is
/// implicitly expected to be exactly 8 bytes wide. The trait bounds do not
/// enforce this - confirm before implementing it for other types.
pub trait EntityId: Copy + Eq + Hash + 'static {}

// The two ID kinds the graph stores properties on.
impl EntityId for NodeId {}
impl EntityId for EdgeId {}
82
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges.
///
/// All columns live behind a single `RwLock`: reads take the shared lock,
/// anything that mutates a column takes the exclusive lock.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alice = NodeId::new(1);
///
/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alice);
/// assert_eq!(props.len(), 2);
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    /// Columns that already exist keep whatever mode they were created with.
    default_compression: CompressionMode,
    /// Zero-sized marker for the `Id` type parameter.
    _marker: PhantomData<Id>,
}
114
115impl<Id: EntityId> PropertyStorage<Id> {
116    /// Creates a new property storage.
117    #[must_use]
118    pub fn new() -> Self {
119        Self {
120            columns: RwLock::new(FxHashMap::default()),
121            default_compression: CompressionMode::None,
122            _marker: PhantomData,
123        }
124    }
125
126    /// Creates a new property storage with compression enabled.
127    #[must_use]
128    pub fn with_compression(mode: CompressionMode) -> Self {
129        Self {
130            columns: RwLock::new(FxHashMap::default()),
131            default_compression: mode,
132            _marker: PhantomData,
133        }
134    }
135
    /// Sets the default compression mode for new columns.
    ///
    /// Only columns created after this call pick up `mode`; existing columns
    /// are not touched (use `enable_compression` for those).
    pub fn set_default_compression(&mut self, mode: CompressionMode) {
        self.default_compression = mode;
    }
140
141    /// Sets a property value for an entity.
142    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
143        let mut columns = self.columns.write();
144        let mode = self.default_compression;
145        columns
146            .entry(key)
147            .or_insert_with(|| PropertyColumn::with_compression(mode))
148            .set(id, value);
149    }
150
151    /// Enables compression for a specific column.
152    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
153        let mut columns = self.columns.write();
154        if let Some(col) = columns.get_mut(key) {
155            col.set_compression_mode(mode);
156        }
157    }
158
159    /// Compresses all columns that have compression enabled.
160    pub fn compress_all(&self) {
161        let mut columns = self.columns.write();
162        for col in columns.values_mut() {
163            if col.compression_mode() != CompressionMode::None {
164                col.compress();
165            }
166        }
167    }
168
169    /// Forces compression on all columns regardless of mode.
170    pub fn force_compress_all(&self) {
171        let mut columns = self.columns.write();
172        for col in columns.values_mut() {
173            col.force_compress();
174        }
175    }
176
177    /// Returns compression statistics for all columns.
178    #[must_use]
179    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
180        let columns = self.columns.read();
181        columns
182            .iter()
183            .map(|(key, col)| (key.clone(), col.compression_stats()))
184            .collect()
185    }
186
187    /// Returns the total memory usage of all columns.
188    #[must_use]
189    pub fn memory_usage(&self) -> usize {
190        let columns = self.columns.read();
191        columns
192            .values()
193            .map(|col| col.compression_stats().compressed_size)
194            .sum()
195    }
196
197    /// Gets a property value for an entity.
198    #[must_use]
199    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
200        let columns = self.columns.read();
201        columns.get(key).and_then(|col| col.get(id))
202    }
203
204    /// Removes a property value for an entity.
205    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
206        let mut columns = self.columns.write();
207        columns.get_mut(key).and_then(|col| col.remove(id))
208    }
209
210    /// Removes all properties for an entity.
211    pub fn remove_all(&self, id: Id) {
212        let mut columns = self.columns.write();
213        for col in columns.values_mut() {
214            col.remove(id);
215        }
216    }
217
218    /// Gets all properties for an entity.
219    #[must_use]
220    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
221        let columns = self.columns.read();
222        let mut result = FxHashMap::default();
223        for (key, col) in columns.iter() {
224            if let Some(value) = col.get(id) {
225                result.insert(key.clone(), value);
226            }
227        }
228        result
229    }
230
231    /// Returns the number of property columns.
232    #[must_use]
233    pub fn column_count(&self) -> usize {
234        self.columns.read().len()
235    }
236
237    /// Returns the keys of all columns.
238    #[must_use]
239    pub fn keys(&self) -> Vec<PropertyKey> {
240        self.columns.read().keys().cloned().collect()
241    }
242
243    /// Gets a column by key for bulk access.
244    #[must_use]
245    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
246        let columns = self.columns.read();
247        if columns.contains_key(key) {
248            Some(PropertyColumnRef {
249                _guard: columns,
250                key: key.clone(),
251                _marker: PhantomData,
252            })
253        } else {
254            None
255        }
256    }
257
258    /// Checks if a predicate might match any values (using zone maps).
259    ///
260    /// Returns `false` only when we're *certain* no values match - for example,
261    /// if you're looking for age > 100 but the max age is 80. Returns `true`
262    /// if the property doesn't exist (conservative - might match).
263    #[must_use]
264    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
265        let columns = self.columns.read();
266        columns
267            .get(key)
268            .map(|col| col.might_match(op, value))
269            .unwrap_or(true) // No column = assume might match (conservative)
270    }
271
272    /// Gets the zone map for a property column.
273    #[must_use]
274    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
275        let columns = self.columns.read();
276        columns.get(key).map(|col| col.zone_map().clone())
277    }
278
279    /// Rebuilds zone maps for all columns (call after bulk removes).
280    pub fn rebuild_zone_maps(&self) {
281        let mut columns = self.columns.write();
282        for col in columns.values_mut() {
283            col.rebuild_zone_map();
284        }
285    }
286}
287
288impl<Id: EntityId> Default for PropertyStorage<Id> {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
/// Compressed storage for a property column.
///
/// Holds the compressed representation of values along with the entity IDs
/// those values belong to.
///
/// As populated by `PropertyColumn`, values are ordered by ascending entity
/// ID (reinterpreted as `u64`). `index_to_id[i]` is the ID that owns the
/// `i`-th compressed value, and `id_to_index` is an identical copy of that
/// sorted ID list, not a direct ID -> position table.
#[derive(Debug)]
pub enum CompressedColumnData {
    /// Compressed integers (Int64 values).
    Integers {
        /// Compressed data.
        data: CompressedData,
        /// Sorted entity IDs (as `u64`), one per compressed value.
        id_to_index: Vec<u64>,
        /// Compressed array index -> entity ID (as `u64`).
        index_to_id: Vec<u64>,
    },
    /// Dictionary-encoded strings.
    Strings {
        /// Dictionary encoding.
        encoding: DictionaryEncoding,
        /// Sorted entity IDs (as `u64`), one per encoded string.
        id_to_index: Vec<u64>,
        /// Encoded string position -> entity ID (as `u64`).
        index_to_id: Vec<u64>,
    },
    /// Compressed booleans.
    Booleans {
        /// Compressed data.
        data: CompressedData,
        /// Sorted entity IDs (as `u64`), one per stored boolean.
        id_to_index: Vec<u64>,
        /// Bit index -> entity ID (as `u64`).
        index_to_id: Vec<u64>,
    },
}
328
329impl CompressedColumnData {
330    /// Returns the memory usage of the compressed data in bytes.
331    #[must_use]
332    pub fn memory_usage(&self) -> usize {
333        match self {
334            CompressedColumnData::Integers {
335                data,
336                id_to_index,
337                index_to_id,
338            } => {
339                data.data.len()
340                    + id_to_index.len() * std::mem::size_of::<u64>()
341                    + index_to_id.len() * std::mem::size_of::<u64>()
342            }
343            CompressedColumnData::Strings {
344                encoding,
345                id_to_index,
346                index_to_id,
347            } => {
348                encoding.codes().len() * std::mem::size_of::<u32>()
349                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
350                    + id_to_index.len() * std::mem::size_of::<u64>()
351                    + index_to_id.len() * std::mem::size_of::<u64>()
352            }
353            CompressedColumnData::Booleans {
354                data,
355                id_to_index,
356                index_to_id,
357            } => {
358                data.data.len()
359                    + id_to_index.len() * std::mem::size_of::<u64>()
360                    + index_to_id.len() * std::mem::size_of::<u64>()
361            }
362        }
363    }
364
365    /// Returns the compression ratio.
366    #[must_use]
367    #[allow(dead_code)]
368    pub fn compression_ratio(&self) -> f64 {
369        match self {
370            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
371            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
372            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
373        }
374    }
375}
376
/// Statistics about column compression.
///
/// Sizes are estimates in bytes. The default value is all zeros with no
/// codec.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Size of uncompressed data in bytes.
    pub uncompressed_size: usize,
    /// Size of compressed data in bytes.
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression.
    /// `None` when the column holds no compressed data.
    pub codec: Option<CompressionCodec>,
}
389
390impl CompressionStats {
391    /// Returns the compression ratio (uncompressed / compressed).
392    #[must_use]
393    pub fn compression_ratio(&self) -> f64 {
394        if self.compressed_size == 0 {
395            return 1.0;
396        }
397        self.uncompressed_size as f64 / self.compressed_size as f64
398    }
399}
400
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
///
/// A column has two tiers: an uncompressed "hot" map that every write goes
/// to, and an optional compressed block that a compression pass moves the
/// dominant-type values into.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    values: FxHashMap<Id, Value>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    /// While set, `might_match` answers `true` for every predicate.
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    compressed: Option<CompressedColumnData>,
    /// Number of values before last compression.
    /// Set to the number of values moved into `compressed`; zero when there
    /// is no compressed data.
    compressed_count: usize,
}
425
426impl<Id: EntityId> PropertyColumn<Id> {
427    /// Creates a new empty column.
428    #[must_use]
429    pub fn new() -> Self {
430        Self {
431            values: FxHashMap::default(),
432            zone_map: ZoneMapEntry::new(),
433            zone_map_dirty: false,
434            compression_mode: CompressionMode::None,
435            compressed: None,
436            compressed_count: 0,
437        }
438    }
439
440    /// Creates a new column with the specified compression mode.
441    #[must_use]
442    pub fn with_compression(mode: CompressionMode) -> Self {
443        Self {
444            values: FxHashMap::default(),
445            zone_map: ZoneMapEntry::new(),
446            zone_map_dirty: false,
447            compression_mode: mode,
448            compressed: None,
449            compressed_count: 0,
450        }
451    }
452
453    /// Sets the compression mode for this column.
454    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
455        self.compression_mode = mode;
456        if mode == CompressionMode::None {
457            // Decompress if switching to no compression
458            if self.compressed.is_some() {
459                self.decompress_all();
460            }
461        }
462    }
463
    /// Returns the compression mode for this column.
    ///
    /// This is the configured mode; it does not say whether any data is
    /// currently compressed.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
469
470    /// Sets a value for an entity.
471    pub fn set(&mut self, id: Id, value: Value) {
472        // Update zone map incrementally
473        self.update_zone_map_on_insert(&value);
474        self.values.insert(id, value);
475
476        // Check if we should compress (in Auto mode)
477        if self.compression_mode == CompressionMode::Auto {
478            let total_count = self.values.len() + self.compressed_count;
479            let hot_buffer_count = self.values.len();
480
481            // Compress when hot buffer exceeds threshold and total is large enough
482            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
483                self.compress();
484            }
485        }
486    }
487
488    /// Updates zone map when inserting a value.
489    fn update_zone_map_on_insert(&mut self, value: &Value) {
490        self.zone_map.row_count += 1;
491
492        if matches!(value, Value::Null) {
493            self.zone_map.null_count += 1;
494            return;
495        }
496
497        // Update min
498        match &self.zone_map.min {
499            None => self.zone_map.min = Some(value.clone()),
500            Some(current) => {
501                if compare_values(value, current) == Some(Ordering::Less) {
502                    self.zone_map.min = Some(value.clone());
503                }
504            }
505        }
506
507        // Update max
508        match &self.zone_map.max {
509            None => self.zone_map.max = Some(value.clone()),
510            Some(current) => {
511                if compare_values(value, current) == Some(Ordering::Greater) {
512                    self.zone_map.max = Some(value.clone());
513                }
514            }
515        }
516    }
517
518    /// Gets a value for an entity.
519    ///
520    /// First checks the hot buffer (uncompressed values), then falls back
521    /// to the compressed data if present.
522    #[must_use]
523    pub fn get(&self, id: Id) -> Option<Value> {
524        // First check hot buffer
525        if let Some(value) = self.values.get(&id) {
526            return Some(value.clone());
527        }
528
529        // For now, compressed data lookup is not implemented for sparse access
530        // because the compressed format stores values by index, not by entity ID.
531        // This would require maintaining an ID -> index map in CompressedColumnData.
532        // The compressed data is primarily useful for bulk/scan operations.
533        None
534    }
535
536    /// Removes a value for an entity.
537    pub fn remove(&mut self, id: Id) -> Option<Value> {
538        let removed = self.values.remove(&id);
539        if removed.is_some() {
540            // Mark zone map as dirty - would need full rebuild for accurate min/max
541            self.zone_map_dirty = true;
542        }
543        removed
544    }
545
546    /// Returns the number of values in this column (hot + compressed).
547    #[must_use]
548    #[allow(dead_code)]
549    pub fn len(&self) -> usize {
550        self.values.len() + self.compressed_count
551    }
552
553    /// Returns true if this column is empty.
554    #[must_use]
555    #[allow(dead_code)]
556    pub fn is_empty(&self) -> bool {
557        self.values.is_empty() && self.compressed_count == 0
558    }
559
560    /// Iterates over all (id, value) pairs in the hot buffer.
561    ///
562    /// Note: This only iterates over uncompressed values. For full iteration
563    /// including compressed values, use [`iter_all`].
564    #[allow(dead_code)]
565    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
566        self.values.iter().map(|(&id, v)| (id, v))
567    }
568
569    /// Returns compression statistics for this column.
570    #[must_use]
571    pub fn compression_stats(&self) -> CompressionStats {
572        let hot_size = self.values.len() * std::mem::size_of::<Value>();
573        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
574        let codec = match &self.compressed {
575            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
576            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
577            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
578            None => None,
579        };
580
581        CompressionStats {
582            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
583            compressed_size: hot_size + compressed_size,
584            value_count: self.len(),
585            codec,
586        }
587    }
588
    /// Returns whether the column has compressed data.
    ///
    /// The hot buffer may still hold further uncompressed values.
    #[must_use]
    #[allow(dead_code)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }
595
596    /// Compresses the hot buffer values.
597    ///
598    /// This merges the hot buffer into the compressed data, selecting the
599    /// best codec based on the value types.
600    ///
601    /// Note: If compressed data already exists, this is a no-op to avoid
602    /// losing previously compressed values. Use `force_compress()` after
603    /// decompressing to re-compress with all values.
604    pub fn compress(&mut self) {
605        if self.values.is_empty() {
606            return;
607        }
608
609        // Don't re-compress if we already have compressed data
610        // (would need to decompress and merge first)
611        if self.compressed.is_some() {
612            return;
613        }
614
615        // Determine the dominant type
616        let (int_count, str_count, bool_count) = self.count_types();
617        let total = self.values.len();
618
619        if int_count > total / 2 {
620            self.compress_as_integers();
621        } else if str_count > total / 2 {
622            self.compress_as_strings();
623        } else if bool_count > total / 2 {
624            self.compress_as_booleans();
625        }
626        // If no dominant type, don't compress (mixed types don't compress well)
627    }
628
629    /// Counts values by type.
630    fn count_types(&self) -> (usize, usize, usize) {
631        let mut int_count = 0;
632        let mut str_count = 0;
633        let mut bool_count = 0;
634
635        for value in self.values.values() {
636            match value {
637                Value::Int64(_) => int_count += 1,
638                Value::String(_) => str_count += 1,
639                Value::Bool(_) => bool_count += 1,
640                _ => {}
641            }
642        }
643
644        (int_count, str_count, bool_count)
645    }
646
647    /// Compresses integer values.
648    #[allow(unsafe_code)]
649    fn compress_as_integers(&mut self) {
650        // Extract integer values and their IDs
651        let mut values: Vec<(u64, i64)> = Vec::new();
652        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
653
654        for (&id, value) in &self.values {
655            match value {
656                Value::Int64(v) => {
657                    // Convert Id to u64 for indexing (assumes Id can be converted)
658                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
659                    values.push((id_u64, *v));
660                }
661                _ => {
662                    non_int_values.insert(id, value.clone());
663                }
664            }
665        }
666
667        if values.len() < 8 {
668            // Not worth compressing
669            return;
670        }
671
672        // Sort by ID for better compression
673        values.sort_by_key(|(id, _)| *id);
674
675        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
676        let index_to_id: Vec<u64> = id_to_index.clone();
677        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
678
679        // Compress using the optimal codec
680        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
681
682        // Only use compression if it actually saves space
683        if compressed.compression_ratio() > 1.2 {
684            self.compressed = Some(CompressedColumnData::Integers {
685                data: compressed,
686                id_to_index,
687                index_to_id,
688            });
689            self.compressed_count = values.len();
690            self.values = non_int_values;
691        }
692    }
693
694    /// Compresses string values using dictionary encoding.
695    #[allow(unsafe_code)]
696    fn compress_as_strings(&mut self) {
697        let mut values: Vec<(u64, Arc<str>)> = Vec::new();
698        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
699
700        for (&id, value) in &self.values {
701            match value {
702                Value::String(s) => {
703                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
704                    values.push((id_u64, Arc::clone(s)));
705                }
706                _ => {
707                    non_str_values.insert(id, value.clone());
708                }
709            }
710        }
711
712        if values.len() < 8 {
713            return;
714        }
715
716        // Sort by ID
717        values.sort_by_key(|(id, _)| *id);
718
719        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
720        let index_to_id: Vec<u64> = id_to_index.clone();
721
722        // Build dictionary
723        let mut builder = DictionaryBuilder::new();
724        for (_, s) in &values {
725            builder.add(s.as_ref());
726        }
727        let encoding = builder.build();
728
729        // Only use compression if it actually saves space
730        if encoding.compression_ratio() > 1.2 {
731            self.compressed = Some(CompressedColumnData::Strings {
732                encoding,
733                id_to_index,
734                index_to_id,
735            });
736            self.compressed_count = values.len();
737            self.values = non_str_values;
738        }
739    }
740
741    /// Compresses boolean values.
742    #[allow(unsafe_code)]
743    fn compress_as_booleans(&mut self) {
744        let mut values: Vec<(u64, bool)> = Vec::new();
745        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
746
747        for (&id, value) in &self.values {
748            match value {
749                Value::Bool(b) => {
750                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
751                    values.push((id_u64, *b));
752                }
753                _ => {
754                    non_bool_values.insert(id, value.clone());
755                }
756            }
757        }
758
759        if values.len() < 8 {
760            return;
761        }
762
763        // Sort by ID
764        values.sort_by_key(|(id, _)| *id);
765
766        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
767        let index_to_id: Vec<u64> = id_to_index.clone();
768        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
769
770        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
771
772        // Booleans always compress well (8x)
773        self.compressed = Some(CompressedColumnData::Booleans {
774            data: compressed,
775            id_to_index,
776            index_to_id,
777        });
778        self.compressed_count = values.len();
779        self.values = non_bool_values;
780    }
781
782    /// Decompresses all values back to the hot buffer.
783    #[allow(unsafe_code)]
784    fn decompress_all(&mut self) {
785        let Some(compressed) = self.compressed.take() else {
786            return;
787        };
788
789        match compressed {
790            CompressedColumnData::Integers {
791                data, index_to_id, ..
792            } => {
793                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
794                    // Convert back to signed using zigzag decoding
795                    let signed: Vec<i64> = values
796                        .iter()
797                        .map(|&v| crate::storage::zigzag_decode(v))
798                        .collect();
799
800                    for (i, id_u64) in index_to_id.iter().enumerate() {
801                        if let Some(&value) = signed.get(i) {
802                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
803                            self.values.insert(id, Value::Int64(value));
804                        }
805                    }
806                }
807            }
808            CompressedColumnData::Strings {
809                encoding,
810                index_to_id,
811                ..
812            } => {
813                for (i, id_u64) in index_to_id.iter().enumerate() {
814                    if let Some(s) = encoding.get(i) {
815                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
816                        self.values.insert(id, Value::String(Arc::from(s)));
817                    }
818                }
819            }
820            CompressedColumnData::Booleans {
821                data, index_to_id, ..
822            } => {
823                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
824                    for (i, id_u64) in index_to_id.iter().enumerate() {
825                        if let Some(&value) = values.get(i) {
826                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
827                            self.values.insert(id, Value::Bool(value));
828                        }
829                    }
830                }
831            }
832        }
833
834        self.compressed_count = 0;
835    }
836
837    /// Forces compression regardless of thresholds.
838    ///
839    /// Useful for bulk loading or when you know the column is complete.
840    pub fn force_compress(&mut self) {
841        self.compress();
842    }
843
    /// Returns the zone map for this column.
    ///
    /// The map is maintained incrementally on writes. After removals it may
    /// be wider than the data until `rebuild_zone_map` is called.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }
849
850    /// Uses zone map to check if any values could satisfy the predicate.
851    ///
852    /// Returns `false` when we can prove no values match (so the column
853    /// can be skipped entirely). Returns `true` if values might match.
854    #[must_use]
855    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
856        if self.zone_map_dirty {
857            // Conservative: can't skip if zone map is stale
858            return true;
859        }
860
861        match op {
862            CompareOp::Eq => self.zone_map.might_contain_equal(value),
863            CompareOp::Ne => {
864                // Can only skip if all values are equal to the value
865                // (which means min == max == value)
866                match (&self.zone_map.min, &self.zone_map.max) {
867                    (Some(min), Some(max)) => {
868                        !(compare_values(min, value) == Some(Ordering::Equal)
869                            && compare_values(max, value) == Some(Ordering::Equal))
870                    }
871                    _ => true,
872                }
873            }
874            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
875            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
876            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
877            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
878        }
879    }
880
881    /// Rebuilds zone map from current values.
882    pub fn rebuild_zone_map(&mut self) {
883        let mut zone_map = ZoneMapEntry::new();
884
885        for value in self.values.values() {
886            zone_map.row_count += 1;
887
888            if matches!(value, Value::Null) {
889                zone_map.null_count += 1;
890                continue;
891            }
892
893            // Update min
894            match &zone_map.min {
895                None => zone_map.min = Some(value.clone()),
896                Some(current) => {
897                    if compare_values(value, current) == Some(Ordering::Less) {
898                        zone_map.min = Some(value.clone());
899                    }
900                }
901            }
902
903            // Update max
904            match &zone_map.max {
905                None => zone_map.max = Some(value.clone()),
906                Some(current) => {
907                    if compare_values(value, current) == Some(Ordering::Greater) {
908                        zone_map.max = Some(value.clone());
909                    }
910                }
911            }
912        }
913
914        self.zone_map = zone_map;
915        self.zone_map_dirty = false;
916    }
917}
918
919/// Compares two values for ordering.
920fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
921    match (a, b) {
922        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
923        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
924        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
925        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
926        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
927        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
928        _ => None,
929    }
930}
931
932impl<Id: EntityId> Default for PropertyColumn<Id> {
933    fn default() -> Self {
934        Self::new()
935    }
936}
937
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
/// The lock is released when this value is dropped.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard over the key -> column map; kept alive purely for its lock.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Property key this reference was created for.
    // Not read in the code visible here, hence the lint allowance.
    #[allow(dead_code)]
    key: PropertyKey,
    /// Carries the `Id` type parameter without storing a value of that type.
    _marker: PhantomData<Id>,
}
947
#[cfg(test)]
mod tests {
    use super::*;

    /// Values written for different nodes and keys read back unchanged, and
    /// a (node, key) pair that was never written reads as absent.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let (alice, bob) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let alice_name = storage.get(alice, &name);
        assert_eq!(alice_name, Some(Value::String("Alice".into())));

        let alice_age = storage.get(alice, &age);
        assert_eq!(alice_age, Some(Value::Int64(30)));

        let bob_name = storage.get(bob, &name);
        assert_eq!(bob_name, Some(Value::String("Bob".into())));

        assert!(storage.get(bob, &age).is_none());
    }

    /// Removing a property hands back the old value and makes later reads miss.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let (node, key) = (NodeId::new(1), PropertyKey::new("name"));

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        assert!(storage.remove(node, &key).is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` reports every property stored for a node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        let entries: Vec<(&str, Value)> = vec![
            ("name", "Alice".into()),
            ("age", 30i64.into()),
            ("active", true.into()),
        ];
        for (label, value) in entries {
            storage.set(node, PropertyKey::new(label), value);
        }

        assert_eq!(storage.get_all(node).len(), 3);
    }

    /// `remove_all` drops every property of the node.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(node, name.clone(), "Alice".into());
        storage.set(node, age.clone(), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &name).is_none());
        assert!(storage.get(node, &age).is_none());
    }

    /// Basic set / get / remove / len behaviour of a single column.
    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();
        let (first, second) = (NodeId::new(1), NodeId::new(2));

        col.set(first, "Alice".into());
        col.set(second, "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(first), Some(Value::String("Alice".into())));

        col.remove(first);
        assert!(col.get(first).is_none());
        assert_eq!(col.len(), 1);
    }

    /// A column remembers the compression mode it was constructed with.
    #[test]
    fn test_compression_mode() {
        let plain = PropertyColumn::<NodeId>::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    /// Storage built with `Auto` compression still serves reads.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");

        for id in 0..100u64 {
            storage.set(
                NodeId::new(id),
                age.clone(),
                Value::Int64(20 + (id as i64 % 50)),
            );
        }

        // Ids 0 and 50 both map to 20 under the `% 50` pattern above.
        for &probe in &[0u64, 50] {
            assert_eq!(storage.get(NodeId::new(probe), &age), Some(Value::Int64(20)));
        }
    }

    /// Sequential integers in an `Auto` column: the reported value count
    /// covers compressed and hot-buffer entries alike.
    #[test]
    fn test_compress_integer_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        for offset in 0..2000u64 {
            col.set(NodeId::new(offset), Value::Int64(1000 + offset as i64));
        }

        // Compression may have kicked in along the way; the total must
        // still account for every value written.
        assert_eq!(col.compression_stats().value_count, 2000);

        // Hot-buffer values are readable. Compressed values are not served
        // by get() (see design note), so accept either outcome.
        let tail = col.get(NodeId::new(1999));
        assert!(tail.is_some() || col.is_compressed());
    }

    /// Low-cardinality strings (a good fit for dictionary encoding).
    #[test]
    fn test_compress_string_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        // Four categories repeated round-robin over 2000 nodes.
        let categories = ["Person", "Company", "Product", "Location"];
        for (i, cat) in categories.iter().cycle().take(2000).enumerate() {
            col.set(NodeId::new(i as u64), Value::String(Arc::from(*cat)));
        }

        // Every value must be counted.
        assert_eq!(col.len(), 2000);

        // The most recent write should sit in the hot buffer and be readable.
        let tail = col.get(NodeId::new(1999));
        assert!(tail.is_some() || col.is_compressed());
    }

    /// Alternating booleans in an `Auto` column.
    #[test]
    fn test_compress_boolean_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        for id in 0..2000u64 {
            col.set(NodeId::new(id), Value::Bool(id % 2 == 0));
        }

        // Every value must be counted.
        assert_eq!(col.len(), 2000);

        // The most recent write should be readable.
        let tail = col.get(NodeId::new(1999));
        assert!(tail.is_some() || col.is_compressed());
    }

    /// `force_compress` on a column that is below the automatic threshold.
    #[test]
    fn test_force_compress() {
        let mut col = PropertyColumn::<NodeId>::new();

        // Deliberately fewer values than the automatic threshold.
        for id in 0..100u64 {
            col.set(NodeId::new(id), Value::Int64(id as i64));
        }

        col.force_compress();

        // Whether or not compression was judged beneficial, no value may
        // go missing from the stats.
        assert_eq!(col.compression_stats().value_count, 100);
    }

    /// Stats of an uncompressed column report its values and a non-zero size.
    #[test]
    fn test_compression_stats() {
        let mut col = PropertyColumn::<NodeId>::new();

        for id in 0..50u64 {
            col.set(NodeId::new(id), Value::Int64(id as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    /// Storage-level stats contain one entry per property key.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");
        let name = PropertyKey::new("name");

        for id in 0..100u64 {
            let node = NodeId::new(id);
            storage.set(node, age.clone(), Value::Int64(id as i64));
            storage.set(node, name.clone(), Value::String(Arc::from("Alice")));
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // Two columns
        assert!(stats.contains_key(&age));
        assert!(stats.contains_key(&name));
    }

    /// Non-empty storage reports a non-zero memory footprint.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();
        let key = PropertyKey::new("value");

        for id in 0..100u64 {
            storage.set(NodeId::new(id), key.clone(), Value::Int64(id as i64));
        }

        assert!(storage.memory_usage() > 0);
    }
}