Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
//! Columns can be compressed to save memory. When compression is enabled,
//! the column picks a codec based on its majority value type (Int64, String
//! or Bool); columns of any other type, or with no strict-majority type, stay
//! uncompressed:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
28use grafeo_common::utils::hash::FxHashMap;
29use parking_lot::RwLock;
30use std::cmp::Ordering;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::Arc;
34
/// How a property column decides whether to compress its values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionMode {
    /// Compression is off: every value lives in the sparse hash map.
    /// This is the default mode.
    None,
    /// Compress automatically once the column has grown past its thresholds.
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}

impl Default for CompressionMode {
    fn default() -> Self {
        Self::None
    }
}
46
47/// Threshold for automatic compression (number of values).
48const COMPRESSION_THRESHOLD: usize = 1000;
49
50/// Size of the hot buffer for recent writes (before compression).
51/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
52/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
53/// on recently-written data.
54const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operator checked against a column's zone map.
///
/// Each variant corresponds to one of the GQL comparison operators
/// (`=`, `<`, `>=`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Equality (`=`).
    Eq,
    /// Inequality.
    Ne,
    /// Strictly less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Le,
    /// Strictly greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Ge,
}
74
75/// Marker trait for IDs that can key into property storage.
76///
77/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
78pub trait EntityId: Copy + Eq + Hash + 'static {}
79
80impl EntityId for NodeId {}
81impl EntityId for EdgeId {}
82
83/// Thread-safe columnar property storage.
84///
85/// Each property key ("name", "age", etc.) gets its own column. This layout
86/// is great for analytical queries that filter on specific properties -
87/// you only touch the columns you need.
88///
89/// Generic over `Id` so the same storage works for nodes and edges.
90///
91/// # Example
92///
93/// ```
94/// use grafeo_core::graph::lpg::PropertyStorage;
95/// use grafeo_common::types::{NodeId, PropertyKey};
96///
97/// let storage = PropertyStorage::new();
98/// let alice = NodeId::new(1);
99///
100/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
101/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
102///
103/// // Fetch all properties at once
104/// let props = storage.get_all(alice);
105/// assert_eq!(props.len(), 2);
106/// ```
107pub struct PropertyStorage<Id: EntityId = NodeId> {
108    /// Map from property key to column.
109    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
110    /// Default compression mode for new columns.
111    default_compression: CompressionMode,
112    _marker: PhantomData<Id>,
113}
114
115impl<Id: EntityId> PropertyStorage<Id> {
116    /// Creates a new property storage.
117    #[must_use]
118    pub fn new() -> Self {
119        Self {
120            columns: RwLock::new(FxHashMap::default()),
121            default_compression: CompressionMode::None,
122            _marker: PhantomData,
123        }
124    }
125
126    /// Creates a new property storage with compression enabled.
127    #[must_use]
128    pub fn with_compression(mode: CompressionMode) -> Self {
129        Self {
130            columns: RwLock::new(FxHashMap::default()),
131            default_compression: mode,
132            _marker: PhantomData,
133        }
134    }
135
136    /// Sets the default compression mode for new columns.
137    pub fn set_default_compression(&mut self, mode: CompressionMode) {
138        self.default_compression = mode;
139    }
140
141    /// Sets a property value for an entity.
142    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
143        let mut columns = self.columns.write();
144        let mode = self.default_compression;
145        columns
146            .entry(key)
147            .or_insert_with(|| PropertyColumn::with_compression(mode))
148            .set(id, value);
149    }
150
151    /// Enables compression for a specific column.
152    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
153        let mut columns = self.columns.write();
154        if let Some(col) = columns.get_mut(key) {
155            col.set_compression_mode(mode);
156        }
157    }
158
159    /// Compresses all columns that have compression enabled.
160    pub fn compress_all(&self) {
161        let mut columns = self.columns.write();
162        for col in columns.values_mut() {
163            if col.compression_mode() != CompressionMode::None {
164                col.compress();
165            }
166        }
167    }
168
169    /// Forces compression on all columns regardless of mode.
170    pub fn force_compress_all(&self) {
171        let mut columns = self.columns.write();
172        for col in columns.values_mut() {
173            col.force_compress();
174        }
175    }
176
177    /// Returns compression statistics for all columns.
178    #[must_use]
179    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
180        let columns = self.columns.read();
181        columns
182            .iter()
183            .map(|(key, col)| (key.clone(), col.compression_stats()))
184            .collect()
185    }
186
187    /// Returns the total memory usage of all columns.
188    #[must_use]
189    pub fn memory_usage(&self) -> usize {
190        let columns = self.columns.read();
191        columns
192            .values()
193            .map(|col| col.compression_stats().compressed_size)
194            .sum()
195    }
196
197    /// Gets a property value for an entity.
198    #[must_use]
199    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
200        let columns = self.columns.read();
201        columns.get(key).and_then(|col| col.get(id))
202    }
203
204    /// Removes a property value for an entity.
205    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
206        let mut columns = self.columns.write();
207        columns.get_mut(key).and_then(|col| col.remove(id))
208    }
209
210    /// Removes all properties for an entity.
211    pub fn remove_all(&self, id: Id) {
212        let mut columns = self.columns.write();
213        for col in columns.values_mut() {
214            col.remove(id);
215        }
216    }
217
218    /// Gets all properties for an entity.
219    #[must_use]
220    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
221        let columns = self.columns.read();
222        let mut result = FxHashMap::default();
223        for (key, col) in columns.iter() {
224            if let Some(value) = col.get(id) {
225                result.insert(key.clone(), value);
226            }
227        }
228        result
229    }
230
231    /// Gets property values for multiple entities in a single lock acquisition.
232    ///
233    /// More efficient than calling [`Self::get`] in a loop because it acquires
234    /// the read lock only once.
235    ///
236    /// # Example
237    ///
238    /// ```
239    /// use grafeo_core::graph::lpg::PropertyStorage;
240    /// use grafeo_common::types::{PropertyKey, Value};
241    /// use grafeo_common::NodeId;
242    ///
243    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
244    /// let key = PropertyKey::new("age");
245    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
246    /// let values = storage.get_batch(&ids, &key);
247    /// // values[i] is the property value for ids[i], or None if not set
248    /// ```
249    #[must_use]
250    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
251        let columns = self.columns.read();
252        match columns.get(key) {
253            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
254            None => vec![None; ids.len()],
255        }
256    }
257
258    /// Gets all properties for multiple entities efficiently.
259    ///
260    /// More efficient than calling [`Self::get_all`] in a loop because it
261    /// acquires the read lock only once.
262    ///
263    /// # Example
264    ///
265    /// ```
266    /// use grafeo_core::graph::lpg::PropertyStorage;
267    /// use grafeo_common::types::{PropertyKey, Value};
268    /// use grafeo_common::NodeId;
269    ///
270    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
271    /// let ids = vec![NodeId(1), NodeId(2)];
272    /// let all_props = storage.get_all_batch(&ids);
273    /// // all_props[i] is a HashMap of all properties for ids[i]
274    /// ```
275    #[must_use]
276    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
277        let columns = self.columns.read();
278        ids.iter()
279            .map(|&id| {
280                let mut result = FxHashMap::default();
281                for (key, col) in columns.iter() {
282                    if let Some(value) = col.get(id) {
283                        result.insert(key.clone(), value);
284                    }
285                }
286                result
287            })
288            .collect()
289    }
290
291    /// Returns the number of property columns.
292    #[must_use]
293    pub fn column_count(&self) -> usize {
294        self.columns.read().len()
295    }
296
297    /// Returns the keys of all columns.
298    #[must_use]
299    pub fn keys(&self) -> Vec<PropertyKey> {
300        self.columns.read().keys().cloned().collect()
301    }
302
303    /// Gets a column by key for bulk access.
304    #[must_use]
305    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
306        let columns = self.columns.read();
307        if columns.contains_key(key) {
308            Some(PropertyColumnRef {
309                _guard: columns,
310                key: key.clone(),
311                _marker: PhantomData,
312            })
313        } else {
314            None
315        }
316    }
317
318    /// Checks if a predicate might match any values (using zone maps).
319    ///
320    /// Returns `false` only when we're *certain* no values match - for example,
321    /// if you're looking for age > 100 but the max age is 80. Returns `true`
322    /// if the property doesn't exist (conservative - might match).
323    #[must_use]
324    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
325        let columns = self.columns.read();
326        columns
327            .get(key)
328            .map(|col| col.might_match(op, value))
329            .unwrap_or(true) // No column = assume might match (conservative)
330    }
331
332    /// Gets the zone map for a property column.
333    #[must_use]
334    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
335        let columns = self.columns.read();
336        columns.get(key).map(|col| col.zone_map().clone())
337    }
338
339    /// Checks if a range predicate might match any values (using zone maps).
340    ///
341    /// Returns `false` only when we're *certain* no values match the range.
342    /// Returns `true` if the property doesn't exist (conservative - might match).
343    #[must_use]
344    pub fn might_match_range(
345        &self,
346        key: &PropertyKey,
347        min: Option<&Value>,
348        max: Option<&Value>,
349        min_inclusive: bool,
350        max_inclusive: bool,
351    ) -> bool {
352        let columns = self.columns.read();
353        columns
354            .get(key)
355            .map(|col| {
356                col.zone_map()
357                    .might_contain_range(min, max, min_inclusive, max_inclusive)
358            })
359            .unwrap_or(true) // No column = assume might match (conservative)
360    }
361
362    /// Rebuilds zone maps for all columns (call after bulk removes).
363    pub fn rebuild_zone_maps(&self) {
364        let mut columns = self.columns.write();
365        for col in columns.values_mut() {
366            col.rebuild_zone_map();
367        }
368    }
369}
370
371impl<Id: EntityId> Default for PropertyStorage<Id> {
372    fn default() -> Self {
373        Self::new()
374    }
375}
376
377/// Compressed storage for a property column.
378///
379/// Holds the compressed representation of values along with the index
380/// mapping entity IDs to positions in the compressed array.
381#[derive(Debug)]
382pub enum CompressedColumnData {
383    /// Compressed integers (Int64 values).
384    Integers {
385        /// Compressed data.
386        data: CompressedData,
387        /// Index: entity ID position -> compressed array index.
388        id_to_index: Vec<u64>,
389        /// Reverse index: compressed array index -> entity ID position.
390        index_to_id: Vec<u64>,
391    },
392    /// Dictionary-encoded strings.
393    Strings {
394        /// Dictionary encoding.
395        encoding: DictionaryEncoding,
396        /// Index: entity ID position -> dictionary index.
397        id_to_index: Vec<u64>,
398        /// Reverse index: dictionary index -> entity ID position.
399        index_to_id: Vec<u64>,
400    },
401    /// Compressed booleans.
402    Booleans {
403        /// Compressed data.
404        data: CompressedData,
405        /// Index: entity ID position -> bit index.
406        id_to_index: Vec<u64>,
407        /// Reverse index: bit index -> entity ID position.
408        index_to_id: Vec<u64>,
409    },
410}
411
412impl CompressedColumnData {
413    /// Returns the memory usage of the compressed data in bytes.
414    #[must_use]
415    pub fn memory_usage(&self) -> usize {
416        match self {
417            CompressedColumnData::Integers {
418                data,
419                id_to_index,
420                index_to_id,
421            } => {
422                data.data.len()
423                    + id_to_index.len() * std::mem::size_of::<u64>()
424                    + index_to_id.len() * std::mem::size_of::<u64>()
425            }
426            CompressedColumnData::Strings {
427                encoding,
428                id_to_index,
429                index_to_id,
430            } => {
431                encoding.codes().len() * std::mem::size_of::<u32>()
432                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
433                    + id_to_index.len() * std::mem::size_of::<u64>()
434                    + index_to_id.len() * std::mem::size_of::<u64>()
435            }
436            CompressedColumnData::Booleans {
437                data,
438                id_to_index,
439                index_to_id,
440            } => {
441                data.data.len()
442                    + id_to_index.len() * std::mem::size_of::<u64>()
443                    + index_to_id.len() * std::mem::size_of::<u64>()
444            }
445        }
446    }
447
448    /// Returns the compression ratio.
449    #[must_use]
450    #[allow(dead_code)]
451    pub fn compression_ratio(&self) -> f64 {
452        match self {
453            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
454            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
455            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
456        }
457    }
458}
459
460/// Statistics about column compression.
461#[derive(Debug, Clone, Default)]
462pub struct CompressionStats {
463    /// Size of uncompressed data in bytes.
464    pub uncompressed_size: usize,
465    /// Size of compressed data in bytes.
466    pub compressed_size: usize,
467    /// Number of values in the column.
468    pub value_count: usize,
469    /// Codec used for compression.
470    pub codec: Option<CompressionCodec>,
471}
472
473impl CompressionStats {
474    /// Returns the compression ratio (uncompressed / compressed).
475    #[must_use]
476    pub fn compression_ratio(&self) -> f64 {
477        if self.compressed_size == 0 {
478            return 1.0;
479        }
480        self.uncompressed_size as f64 / self.compressed_size as f64
481    }
482}
483
484/// A single property column (e.g., all "age" values).
485///
486/// Maintains min/max/null_count for fast predicate evaluation. When you
487/// filter on `age > 50`, we first check if any age could possibly match
488/// before scanning the actual values.
489///
490/// Columns support optional compression for large datasets. When compression
491/// is enabled, the column automatically selects the best codec based on the
492/// data type and characteristics.
493pub struct PropertyColumn<Id: EntityId = NodeId> {
494    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
495    /// Used for recent writes and when compression is disabled.
496    values: FxHashMap<Id, Value>,
497    /// Zone map tracking min/max/null_count for predicate pushdown.
498    zone_map: ZoneMapEntry,
499    /// Whether zone map needs rebuild (after removes).
500    zone_map_dirty: bool,
501    /// Compression mode for this column.
502    compression_mode: CompressionMode,
503    /// Compressed data (when compression is enabled and triggered).
504    compressed: Option<CompressedColumnData>,
505    /// Number of values before last compression.
506    compressed_count: usize,
507}
508
509impl<Id: EntityId> PropertyColumn<Id> {
510    /// Creates a new empty column.
511    #[must_use]
512    pub fn new() -> Self {
513        Self {
514            values: FxHashMap::default(),
515            zone_map: ZoneMapEntry::new(),
516            zone_map_dirty: false,
517            compression_mode: CompressionMode::None,
518            compressed: None,
519            compressed_count: 0,
520        }
521    }
522
523    /// Creates a new column with the specified compression mode.
524    #[must_use]
525    pub fn with_compression(mode: CompressionMode) -> Self {
526        Self {
527            values: FxHashMap::default(),
528            zone_map: ZoneMapEntry::new(),
529            zone_map_dirty: false,
530            compression_mode: mode,
531            compressed: None,
532            compressed_count: 0,
533        }
534    }
535
536    /// Sets the compression mode for this column.
537    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
538        self.compression_mode = mode;
539        if mode == CompressionMode::None {
540            // Decompress if switching to no compression
541            if self.compressed.is_some() {
542                self.decompress_all();
543            }
544        }
545    }
546
547    /// Returns the compression mode for this column.
548    #[must_use]
549    pub fn compression_mode(&self) -> CompressionMode {
550        self.compression_mode
551    }
552
553    /// Sets a value for an entity.
554    pub fn set(&mut self, id: Id, value: Value) {
555        // Update zone map incrementally
556        self.update_zone_map_on_insert(&value);
557        self.values.insert(id, value);
558
559        // Check if we should compress (in Auto mode)
560        if self.compression_mode == CompressionMode::Auto {
561            let total_count = self.values.len() + self.compressed_count;
562            let hot_buffer_count = self.values.len();
563
564            // Compress when hot buffer exceeds threshold and total is large enough
565            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
566                self.compress();
567            }
568        }
569    }
570
571    /// Updates zone map when inserting a value.
572    fn update_zone_map_on_insert(&mut self, value: &Value) {
573        self.zone_map.row_count += 1;
574
575        if matches!(value, Value::Null) {
576            self.zone_map.null_count += 1;
577            return;
578        }
579
580        // Update min
581        match &self.zone_map.min {
582            None => self.zone_map.min = Some(value.clone()),
583            Some(current) => {
584                if compare_values(value, current) == Some(Ordering::Less) {
585                    self.zone_map.min = Some(value.clone());
586                }
587            }
588        }
589
590        // Update max
591        match &self.zone_map.max {
592            None => self.zone_map.max = Some(value.clone()),
593            Some(current) => {
594                if compare_values(value, current) == Some(Ordering::Greater) {
595                    self.zone_map.max = Some(value.clone());
596                }
597            }
598        }
599    }
600
601    /// Gets a value for an entity.
602    ///
603    /// First checks the hot buffer (uncompressed values), then falls back
604    /// to the compressed data if present.
605    #[must_use]
606    pub fn get(&self, id: Id) -> Option<Value> {
607        // First check hot buffer
608        if let Some(value) = self.values.get(&id) {
609            return Some(value.clone());
610        }
611
612        // For now, compressed data lookup is not implemented for sparse access
613        // because the compressed format stores values by index, not by entity ID.
614        // This would require maintaining an ID -> index map in CompressedColumnData.
615        // The compressed data is primarily useful for bulk/scan operations.
616        None
617    }
618
619    /// Removes a value for an entity.
620    pub fn remove(&mut self, id: Id) -> Option<Value> {
621        let removed = self.values.remove(&id);
622        if removed.is_some() {
623            // Mark zone map as dirty - would need full rebuild for accurate min/max
624            self.zone_map_dirty = true;
625        }
626        removed
627    }
628
629    /// Returns the number of values in this column (hot + compressed).
630    #[must_use]
631    #[allow(dead_code)]
632    pub fn len(&self) -> usize {
633        self.values.len() + self.compressed_count
634    }
635
636    /// Returns true if this column is empty.
637    #[must_use]
638    #[allow(dead_code)]
639    pub fn is_empty(&self) -> bool {
640        self.values.is_empty() && self.compressed_count == 0
641    }
642
643    /// Iterates over all (id, value) pairs in the hot buffer.
644    ///
645    /// Note: This only iterates over uncompressed values. For full iteration
646    /// including compressed values, use [`iter_all`].
647    #[allow(dead_code)]
648    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
649        self.values.iter().map(|(&id, v)| (id, v))
650    }
651
652    /// Returns compression statistics for this column.
653    #[must_use]
654    pub fn compression_stats(&self) -> CompressionStats {
655        let hot_size = self.values.len() * std::mem::size_of::<Value>();
656        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
657        let codec = match &self.compressed {
658            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
659            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
660            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
661            None => None,
662        };
663
664        CompressionStats {
665            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
666            compressed_size: hot_size + compressed_size,
667            value_count: self.len(),
668            codec,
669        }
670    }
671
672    /// Returns whether the column has compressed data.
673    #[must_use]
674    #[allow(dead_code)]
675    pub fn is_compressed(&self) -> bool {
676        self.compressed.is_some()
677    }
678
679    /// Compresses the hot buffer values.
680    ///
681    /// This merges the hot buffer into the compressed data, selecting the
682    /// best codec based on the value types.
683    ///
684    /// Note: If compressed data already exists, this is a no-op to avoid
685    /// losing previously compressed values. Use `force_compress()` after
686    /// decompressing to re-compress with all values.
687    pub fn compress(&mut self) {
688        if self.values.is_empty() {
689            return;
690        }
691
692        // Don't re-compress if we already have compressed data
693        // (would need to decompress and merge first)
694        if self.compressed.is_some() {
695            return;
696        }
697
698        // Determine the dominant type
699        let (int_count, str_count, bool_count) = self.count_types();
700        let total = self.values.len();
701
702        if int_count > total / 2 {
703            self.compress_as_integers();
704        } else if str_count > total / 2 {
705            self.compress_as_strings();
706        } else if bool_count > total / 2 {
707            self.compress_as_booleans();
708        }
709        // If no dominant type, don't compress (mixed types don't compress well)
710    }
711
712    /// Counts values by type.
713    fn count_types(&self) -> (usize, usize, usize) {
714        let mut int_count = 0;
715        let mut str_count = 0;
716        let mut bool_count = 0;
717
718        for value in self.values.values() {
719            match value {
720                Value::Int64(_) => int_count += 1,
721                Value::String(_) => str_count += 1,
722                Value::Bool(_) => bool_count += 1,
723                _ => {}
724            }
725        }
726
727        (int_count, str_count, bool_count)
728    }
729
730    /// Compresses integer values.
731    #[allow(unsafe_code)]
732    fn compress_as_integers(&mut self) {
733        // Extract integer values and their IDs
734        let mut values: Vec<(u64, i64)> = Vec::new();
735        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
736
737        for (&id, value) in &self.values {
738            match value {
739                Value::Int64(v) => {
740                    // Convert Id to u64 for indexing (assumes Id can be converted)
741                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
742                    values.push((id_u64, *v));
743                }
744                _ => {
745                    non_int_values.insert(id, value.clone());
746                }
747            }
748        }
749
750        if values.len() < 8 {
751            // Not worth compressing
752            return;
753        }
754
755        // Sort by ID for better compression
756        values.sort_by_key(|(id, _)| *id);
757
758        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
759        let index_to_id: Vec<u64> = id_to_index.clone();
760        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
761
762        // Compress using the optimal codec
763        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
764
765        // Only use compression if it actually saves space
766        if compressed.compression_ratio() > 1.2 {
767            self.compressed = Some(CompressedColumnData::Integers {
768                data: compressed,
769                id_to_index,
770                index_to_id,
771            });
772            self.compressed_count = values.len();
773            self.values = non_int_values;
774        }
775    }
776
777    /// Compresses string values using dictionary encoding.
778    #[allow(unsafe_code)]
779    fn compress_as_strings(&mut self) {
780        let mut values: Vec<(u64, Arc<str>)> = Vec::new();
781        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
782
783        for (&id, value) in &self.values {
784            match value {
785                Value::String(s) => {
786                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
787                    values.push((id_u64, Arc::clone(s)));
788                }
789                _ => {
790                    non_str_values.insert(id, value.clone());
791                }
792            }
793        }
794
795        if values.len() < 8 {
796            return;
797        }
798
799        // Sort by ID
800        values.sort_by_key(|(id, _)| *id);
801
802        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
803        let index_to_id: Vec<u64> = id_to_index.clone();
804
805        // Build dictionary
806        let mut builder = DictionaryBuilder::new();
807        for (_, s) in &values {
808            builder.add(s.as_ref());
809        }
810        let encoding = builder.build();
811
812        // Only use compression if it actually saves space
813        if encoding.compression_ratio() > 1.2 {
814            self.compressed = Some(CompressedColumnData::Strings {
815                encoding,
816                id_to_index,
817                index_to_id,
818            });
819            self.compressed_count = values.len();
820            self.values = non_str_values;
821        }
822    }
823
824    /// Compresses boolean values.
825    #[allow(unsafe_code)]
826    fn compress_as_booleans(&mut self) {
827        let mut values: Vec<(u64, bool)> = Vec::new();
828        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
829
830        for (&id, value) in &self.values {
831            match value {
832                Value::Bool(b) => {
833                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
834                    values.push((id_u64, *b));
835                }
836                _ => {
837                    non_bool_values.insert(id, value.clone());
838                }
839            }
840        }
841
842        if values.len() < 8 {
843            return;
844        }
845
846        // Sort by ID
847        values.sort_by_key(|(id, _)| *id);
848
849        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
850        let index_to_id: Vec<u64> = id_to_index.clone();
851        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
852
853        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
854
855        // Booleans always compress well (8x)
856        self.compressed = Some(CompressedColumnData::Booleans {
857            data: compressed,
858            id_to_index,
859            index_to_id,
860        });
861        self.compressed_count = values.len();
862        self.values = non_bool_values;
863    }
864
865    /// Decompresses all values back to the hot buffer.
866    #[allow(unsafe_code)]
867    fn decompress_all(&mut self) {
868        let Some(compressed) = self.compressed.take() else {
869            return;
870        };
871
872        match compressed {
873            CompressedColumnData::Integers {
874                data, index_to_id, ..
875            } => {
876                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
877                    // Convert back to signed using zigzag decoding
878                    let signed: Vec<i64> = values
879                        .iter()
880                        .map(|&v| crate::storage::zigzag_decode(v))
881                        .collect();
882
883                    for (i, id_u64) in index_to_id.iter().enumerate() {
884                        if let Some(&value) = signed.get(i) {
885                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
886                            self.values.insert(id, Value::Int64(value));
887                        }
888                    }
889                }
890            }
891            CompressedColumnData::Strings {
892                encoding,
893                index_to_id,
894                ..
895            } => {
896                for (i, id_u64) in index_to_id.iter().enumerate() {
897                    if let Some(s) = encoding.get(i) {
898                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
899                        self.values.insert(id, Value::String(Arc::from(s)));
900                    }
901                }
902            }
903            CompressedColumnData::Booleans {
904                data, index_to_id, ..
905            } => {
906                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
907                    for (i, id_u64) in index_to_id.iter().enumerate() {
908                        if let Some(&value) = values.get(i) {
909                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
910                            self.values.insert(id, Value::Bool(value));
911                        }
912                    }
913                }
914            }
915        }
916
917        self.compressed_count = 0;
918    }
919
920    /// Forces compression regardless of thresholds.
921    ///
922    /// Useful for bulk loading or when you know the column is complete.
923    pub fn force_compress(&mut self) {
924        self.compress();
925    }
926
927    /// Returns the zone map for this column.
928    #[must_use]
929    pub fn zone_map(&self) -> &ZoneMapEntry {
930        &self.zone_map
931    }
932
933    /// Uses zone map to check if any values could satisfy the predicate.
934    ///
935    /// Returns `false` when we can prove no values match (so the column
936    /// can be skipped entirely). Returns `true` if values might match.
937    #[must_use]
938    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
939        if self.zone_map_dirty {
940            // Conservative: can't skip if zone map is stale
941            return true;
942        }
943
944        match op {
945            CompareOp::Eq => self.zone_map.might_contain_equal(value),
946            CompareOp::Ne => {
947                // Can only skip if all values are equal to the value
948                // (which means min == max == value)
949                match (&self.zone_map.min, &self.zone_map.max) {
950                    (Some(min), Some(max)) => {
951                        !(compare_values(min, value) == Some(Ordering::Equal)
952                            && compare_values(max, value) == Some(Ordering::Equal))
953                    }
954                    _ => true,
955                }
956            }
957            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
958            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
959            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
960            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
961        }
962    }
963
964    /// Rebuilds zone map from current values.
965    pub fn rebuild_zone_map(&mut self) {
966        let mut zone_map = ZoneMapEntry::new();
967
968        for value in self.values.values() {
969            zone_map.row_count += 1;
970
971            if matches!(value, Value::Null) {
972                zone_map.null_count += 1;
973                continue;
974            }
975
976            // Update min
977            match &zone_map.min {
978                None => zone_map.min = Some(value.clone()),
979                Some(current) => {
980                    if compare_values(value, current) == Some(Ordering::Less) {
981                        zone_map.min = Some(value.clone());
982                    }
983                }
984            }
985
986            // Update max
987            match &zone_map.max {
988                None => zone_map.max = Some(value.clone()),
989                Some(current) => {
990                    if compare_values(value, current) == Some(Ordering::Greater) {
991                        zone_map.max = Some(value.clone());
992                    }
993                }
994            }
995        }
996
997        self.zone_map = zone_map;
998        self.zone_map_dirty = false;
999    }
1000}
1001
1002/// Compares two values for ordering.
1003fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1004    match (a, b) {
1005        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1006        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1007        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1008        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1009        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1010        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1011        _ => None,
1012    }
1013}
1014
1015impl<Id: EntityId> Default for PropertyColumn<Id> {
1016    fn default() -> Self {
1017        Self::new()
1018    }
1019}
1020
1021/// A borrowed reference to a property column for bulk reads.
1022///
1023/// Holds the read lock so the column can't change while you're iterating.
1024pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1025    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1026    #[allow(dead_code)]
1027    key: PropertyKey,
1028    _marker: PhantomData<Id>,
1029}
1030
// Unit tests for `PropertyStorage` and `PropertyColumn`: basic CRUD, batch reads,
// compression modes, and compression/memory statistics.
#[cfg(test)]
mod tests {
    use super::*;

    /// Set/get round-trip across two nodes and two keys; a key never set on a
    /// node reads back as `None`.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alice".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Bob".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alice".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Bob".into()))
        );
        assert!(storage.get(node2, &age_key).is_none());
    }

    /// `remove` returns the old value and the property is gone afterwards.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` returns one entry per property set on the node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    /// `remove_all` clears every property of the node.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    /// Direct column API: set, get, len/is_empty, and remove.
    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alice".into());
        col.set(NodeId::new(2), "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alice".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    /// `new()` defaults to `CompressionMode::None`; `with_compression` stores the
    /// requested mode.
    #[test]
    fn test_compression_mode() {
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    /// Reads under `CompressionMode::Auto`. With 100 values this stays below
    /// COMPRESSION_THRESHOLD (1000), so it presumably exercises the uncompressed
    /// path only.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + (i as i64 % 50)),
            );
        }

        // Values should still be readable
        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    /// 2000 sequential integers (above COMPRESSION_THRESHOLD) under Auto mode; the
    /// total value count must survive compression.
    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add many sequential integers
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Should have triggered compression at some point
        // Total count should include both compressed and hot buffer values
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        // Values from the hot buffer should be readable
        // Note: Compressed values are not accessible via get() - see design note
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// Low-cardinality strings (4 distinct) under Auto mode; `len()` must count
    /// all 2000 values.
    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add repeated strings (good for dictionary compression)
        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(Arc::from(cat)));
        }

        // Total count should be correct
        assert_eq!(col.len(), 2000);

        // Late values should be in hot buffer and readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// Alternating booleans under Auto mode; `len()` must count all 2000 values.
    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add booleans
        for i in 0..2000 {
            col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
        }

        // Verify total count
        assert_eq!(col.len(), 2000);

        // Late values should be readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// `force_compress` on a below-threshold column keeps the value count intact.
    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Add fewer values than the threshold
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        // Force compression
        col.force_compress();

        // Stats should show compression was applied if beneficial
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    /// Stats on an uncompressed column report the value count and a non-zero
    /// uncompressed size.
    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    /// Storage-level stats are keyed per property column.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i as i64),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(Arc::from("Alice")),
            );
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // Two columns
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    /// A populated storage reports non-zero memory usage.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    /// `get_batch` returns one slot per requested id, in order, with `None` for
    /// ids lacking the property.
    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);
        let age_key = PropertyKey::new("age");

        storage.set(node1, age_key.clone(), 25i64.into());
        storage.set(node2, age_key.clone(), 30i64.into());
        // node3 has no age property

        let ids = vec![node1, node2, node3];
        let values = storage.get_batch(&ids, &age_key);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    /// `get_batch` on a key with no column yields all `None`s, one per id.
    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let missing_key = PropertyKey::new("nonexistent");

        let ids = vec![node1, node2];
        let values = storage.get_batch(&ids, &missing_key);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    /// `get_batch` with no ids returns an empty result.
    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        let values = storage.get_batch(&[], &key);
        assert!(values.is_empty());
    }

    /// `get_all_batch` returns one property map per id, in order; ids with no
    /// properties get an empty map.
    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);

        storage.set(node1, PropertyKey::new("name"), "Alice".into());
        storage.set(node1, PropertyKey::new("age"), 25i64.into());
        storage.set(node2, PropertyKey::new("name"), "Bob".into());
        // node3 has no properties

        let ids = vec![node1, node2, node3];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // name and age
        assert_eq!(all_props[1].len(), 1); // name only
        assert_eq!(all_props[2].len(), 0); // no properties

        assert_eq!(
            all_props[0].get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            all_props[1].get(&PropertyKey::new("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    /// `get_all_batch` with no ids returns an empty result.
    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let all_props = storage.get_all_batch(&[]);
        assert!(all_props.is_empty());
    }
}