Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// How a property column decides whether to compress its values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Compression is off; values live only in the sparse hash map. This is the default.
    #[default]
    None,
    /// Compression kicks in on its own once the column is large enough to benefit.
    Auto,
    /// Compression is attempted eagerly, on every flush.
    Eager,
}
46
/// Minimum total number of values (hot buffer plus compressed) a column must
/// hold before [`CompressionMode::Auto`] tries to compress it.
const COMPRESSION_THRESHOLD: usize = 1000;

/// Number of uncompressed values the hot buffer may collect before
/// [`CompressionMode::Auto`] compresses it.
///
/// Keeping recent writes uncompressed keeps point lookups on fresh data cheap.
/// The cost is roughly `HOT_BUFFER_SIZE * size_of::<Value>()` bytes per column,
/// plus hash-map overhead. Because this value exceeds `COMPRESSION_THRESHOLD`,
/// the hot-buffer size is the limit that effectively gates auto-compression.
const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operator checked against a column's zone map.
///
/// Each variant corresponds to a GQL comparison (`=`, `<>`, `<`, `<=`, `>`, `>=`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// `=`: the stored value equals the operand.
    Eq,
    /// `<>`: the stored value differs from the operand.
    Ne,
    /// `<`: the stored value is strictly below the operand.
    Lt,
    /// `<=`: the stored value is below or equal to the operand.
    Le,
    /// `>`: the stored value is strictly above the operand.
    Gt,
    /// `>=`: the stored value is above or equal to the operand.
    Ge,
}
74
/// An identifier type that can serve as the row key of property storage.
///
/// Both [`NodeId`] and [`EdgeId`] implement it, so properties can be attached
/// to nodes as well as edges. The raw `u64` conversions let the compression
/// code handle IDs as plain integers without any `unsafe` transmute.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Converts the ID into its raw `u64` form.
    fn as_u64(self) -> u64;
    /// Builds an ID from a raw `u64` value.
    fn from_u64(v: u64) -> Self;
}
85
86impl EntityId for NodeId {
87    #[inline]
88    fn as_u64(self) -> u64 {
89        self.0
90    }
91    #[inline]
92    fn from_u64(v: u64) -> Self {
93        Self(v)
94    }
95}
96
97impl EntityId for EdgeId {
98    #[inline]
99    fn as_u64(self) -> u64 {
100        self.0
101    }
102    #[inline]
103    fn from_u64(v: u64) -> Self {
104        Self(v)
105    }
106}
107
108/// Thread-safe columnar property storage.
109///
110/// Each property key ("name", "age", etc.) gets its own column. This layout
111/// is great for analytical queries that filter on specific properties -
112/// you only touch the columns you need.
113///
114/// Generic over `Id` so the same storage works for nodes and edges.
115///
116/// # Example
117///
118/// ```
119/// use grafeo_core::graph::lpg::PropertyStorage;
120/// use grafeo_common::types::{NodeId, PropertyKey};
121///
122/// let storage = PropertyStorage::new();
123/// let alice = NodeId::new(1);
124///
125/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
126/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
127///
128/// // Fetch all properties at once
129/// let props = storage.get_all(alice);
130/// assert_eq!(props.len(), 2);
131/// ```
132pub struct PropertyStorage<Id: EntityId = NodeId> {
133    /// Map from property key to column.
134    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
135    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
136    /// Default compression mode for new columns.
137    default_compression: CompressionMode,
138    _marker: PhantomData<Id>,
139}
140
141impl<Id: EntityId> PropertyStorage<Id> {
142    /// Creates a new property storage.
143    #[must_use]
144    pub fn new() -> Self {
145        Self {
146            columns: RwLock::new(FxHashMap::default()),
147            default_compression: CompressionMode::None,
148            _marker: PhantomData,
149        }
150    }
151
152    /// Creates a new property storage with compression enabled.
153    #[must_use]
154    pub fn with_compression(mode: CompressionMode) -> Self {
155        Self {
156            columns: RwLock::new(FxHashMap::default()),
157            default_compression: mode,
158            _marker: PhantomData,
159        }
160    }
161
162    /// Sets the default compression mode for new columns.
163    pub fn set_default_compression(&mut self, mode: CompressionMode) {
164        self.default_compression = mode;
165    }
166
167    /// Sets a property value for an entity.
168    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
169        let mut columns = self.columns.write();
170        let mode = self.default_compression;
171        columns
172            .entry(key)
173            .or_insert_with(|| PropertyColumn::with_compression(mode))
174            .set(id, value);
175    }
176
177    /// Enables compression for a specific column.
178    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
179        let mut columns = self.columns.write();
180        if let Some(col) = columns.get_mut(key) {
181            col.set_compression_mode(mode);
182        }
183    }
184
185    /// Compresses all columns that have compression enabled.
186    pub fn compress_all(&self) {
187        let mut columns = self.columns.write();
188        for col in columns.values_mut() {
189            if col.compression_mode() != CompressionMode::None {
190                col.compress();
191            }
192        }
193    }
194
195    /// Forces compression on all columns regardless of mode.
196    pub fn force_compress_all(&self) {
197        let mut columns = self.columns.write();
198        for col in columns.values_mut() {
199            col.force_compress();
200        }
201    }
202
203    /// Returns compression statistics for all columns.
204    #[must_use]
205    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
206        let columns = self.columns.read();
207        columns
208            .iter()
209            .map(|(key, col)| (key.clone(), col.compression_stats()))
210            .collect()
211    }
212
213    /// Returns the total memory usage of all columns.
214    #[must_use]
215    pub fn memory_usage(&self) -> usize {
216        let columns = self.columns.read();
217        columns
218            .values()
219            .map(|col| col.compression_stats().compressed_size)
220            .sum()
221    }
222
223    /// Gets a property value for an entity.
224    #[must_use]
225    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
226        let columns = self.columns.read();
227        columns.get(key).and_then(|col| col.get(id))
228    }
229
230    /// Removes a property value for an entity.
231    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
232        let mut columns = self.columns.write();
233        columns.get_mut(key).and_then(|col| col.remove(id))
234    }
235
236    /// Removes all properties for an entity.
237    pub fn remove_all(&self, id: Id) {
238        let mut columns = self.columns.write();
239        for col in columns.values_mut() {
240            col.remove(id);
241        }
242    }
243
244    /// Gets all properties for an entity.
245    #[must_use]
246    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
247        let columns = self.columns.read();
248        let mut result = FxHashMap::default();
249        for (key, col) in columns.iter() {
250            if let Some(value) = col.get(id) {
251                result.insert(key.clone(), value);
252            }
253        }
254        result
255    }
256
257    /// Gets property values for multiple entities in a single lock acquisition.
258    ///
259    /// More efficient than calling [`Self::get`] in a loop because it acquires
260    /// the read lock only once.
261    ///
262    /// # Example
263    ///
264    /// ```
265    /// use grafeo_core::graph::lpg::PropertyStorage;
266    /// use grafeo_common::types::{PropertyKey, Value};
267    /// use grafeo_common::NodeId;
268    ///
269    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
270    /// let key = PropertyKey::new("age");
271    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
272    /// let values = storage.get_batch(&ids, &key);
273    /// // values[i] is the property value for ids[i], or None if not set
274    /// ```
275    #[must_use]
276    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
277        let columns = self.columns.read();
278        match columns.get(key) {
279            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
280            None => vec![None; ids.len()],
281        }
282    }
283
284    /// Gets all properties for multiple entities efficiently.
285    ///
286    /// More efficient than calling [`Self::get_all`] in a loop because it
287    /// acquires the read lock only once.
288    ///
289    /// # Example
290    ///
291    /// ```
292    /// use grafeo_core::graph::lpg::PropertyStorage;
293    /// use grafeo_common::types::{PropertyKey, Value};
294    /// use grafeo_common::NodeId;
295    ///
296    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
297    /// let ids = vec![NodeId(1), NodeId(2)];
298    /// let all_props = storage.get_all_batch(&ids);
299    /// // all_props[i] is a HashMap of all properties for ids[i]
300    /// ```
301    #[must_use]
302    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
303        let columns = self.columns.read();
304        let column_count = columns.len();
305
306        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
307        let mut results = Vec::with_capacity(ids.len());
308
309        for &id in ids {
310            // Pre-allocate HashMap with expected column count
311            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
312            for (key, col) in columns.iter() {
313                if let Some(value) = col.get(id) {
314                    result.insert(key.clone(), value);
315                }
316            }
317            results.push(result);
318        }
319
320        results
321    }
322
323    /// Gets selected properties for multiple entities efficiently (projection pushdown).
324    ///
325    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
326    /// of properties - it only iterates the requested columns instead of all columns.
327    ///
328    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
329    /// compared to O(N × C) for `get_all_batch` where C = total column count.
330    ///
331    /// # Example
332    ///
333    /// ```
334    /// use grafeo_core::graph::lpg::PropertyStorage;
335    /// use grafeo_common::types::{PropertyKey, Value};
336    /// use grafeo_common::NodeId;
337    ///
338    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
339    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
340    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
341    ///
342    /// // Only fetches "name" and "age" columns, ignoring other properties
343    /// let props = storage.get_selective_batch(&ids, &keys);
344    /// ```
345    #[must_use]
346    pub fn get_selective_batch(
347        &self,
348        ids: &[Id],
349        keys: &[PropertyKey],
350    ) -> Vec<FxHashMap<PropertyKey, Value>> {
351        if keys.is_empty() {
352            // No properties requested - return empty maps
353            return vec![FxHashMap::default(); ids.len()];
354        }
355
356        let columns = self.columns.read();
357
358        // Pre-collect only the columns we need (avoids re-lookup per id)
359        let requested_columns: Vec<_> = keys
360            .iter()
361            .filter_map(|key| columns.get(key).map(|col| (key, col)))
362            .collect();
363
364        // Pre-allocate result with exact capacity
365        let mut results = Vec::with_capacity(ids.len());
366
367        for &id in ids {
368            let mut result =
369                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
370            // Only iterate requested columns, not all columns
371            for (key, col) in &requested_columns {
372                if let Some(value) = col.get(id) {
373                    result.insert((*key).clone(), value);
374                }
375            }
376            results.push(result);
377        }
378
379        results
380    }
381
382    /// Returns the number of property columns.
383    #[must_use]
384    pub fn column_count(&self) -> usize {
385        self.columns.read().len()
386    }
387
388    /// Returns the keys of all columns.
389    #[must_use]
390    pub fn keys(&self) -> Vec<PropertyKey> {
391        self.columns.read().keys().cloned().collect()
392    }
393
394    /// Removes all property data.
395    pub fn clear(&self) {
396        self.columns.write().clear();
397    }
398
399    /// Gets a column by key for bulk access.
400    #[must_use]
401    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
402        let columns = self.columns.read();
403        if columns.contains_key(key) {
404            Some(PropertyColumnRef {
405                _guard: columns,
406                _key: key.clone(),
407                _marker: PhantomData,
408            })
409        } else {
410            None
411        }
412    }
413
414    /// Checks if a predicate might match any values (using zone maps).
415    ///
416    /// Returns `false` only when we're *certain* no values match - for example,
417    /// if you're looking for age > 100 but the max age is 80. Returns `true`
418    /// if the property doesn't exist (conservative - might match).
419    #[must_use]
420    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
421        let columns = self.columns.read();
422        columns
423            .get(key)
424            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
425    }
426
427    /// Gets the zone map for a property column.
428    #[must_use]
429    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
430        let columns = self.columns.read();
431        columns.get(key).map(|col| col.zone_map().clone())
432    }
433
434    /// Checks if a range predicate might match any values (using zone maps).
435    ///
436    /// Returns `false` only when we're *certain* no values match the range.
437    /// Returns `true` if the property doesn't exist (conservative - might match).
438    #[must_use]
439    pub fn might_match_range(
440        &self,
441        key: &PropertyKey,
442        min: Option<&Value>,
443        max: Option<&Value>,
444        min_inclusive: bool,
445        max_inclusive: bool,
446    ) -> bool {
447        let columns = self.columns.read();
448        columns.get(key).map_or(true, |col| {
449            col.zone_map()
450                .might_contain_range(min, max, min_inclusive, max_inclusive)
451        }) // No column = assume might match (conservative)
452    }
453
454    /// Rebuilds zone maps for all columns (call after bulk removes).
455    pub fn rebuild_zone_maps(&self) {
456        let mut columns = self.columns.write();
457        for col in columns.values_mut() {
458            col.rebuild_zone_map();
459        }
460    }
461}
462
463impl<Id: EntityId> Default for PropertyStorage<Id> {
464    fn default() -> Self {
465        Self::new()
466    }
467}
468
469/// Compressed storage for a property column.
470///
471/// Holds the compressed representation of values along with the index
472/// mapping entity IDs to positions in the compressed array.
473#[derive(Debug)]
474pub enum CompressedColumnData {
475    /// Compressed integers (Int64 values).
476    Integers {
477        /// Compressed data.
478        data: CompressedData,
479        /// Index: entity ID position -> compressed array index.
480        id_to_index: Vec<u64>,
481        /// Reverse index: compressed array index -> entity ID position.
482        index_to_id: Vec<u64>,
483    },
484    /// Dictionary-encoded strings.
485    Strings {
486        /// Dictionary encoding.
487        encoding: DictionaryEncoding,
488        /// Index: entity ID position -> dictionary index.
489        id_to_index: Vec<u64>,
490        /// Reverse index: dictionary index -> entity ID position.
491        index_to_id: Vec<u64>,
492    },
493    /// Compressed booleans.
494    Booleans {
495        /// Compressed data.
496        data: CompressedData,
497        /// Index: entity ID position -> bit index.
498        id_to_index: Vec<u64>,
499        /// Reverse index: bit index -> entity ID position.
500        index_to_id: Vec<u64>,
501    },
502}
503
504impl CompressedColumnData {
505    /// Returns the memory usage of the compressed data in bytes.
506    #[must_use]
507    pub fn memory_usage(&self) -> usize {
508        match self {
509            CompressedColumnData::Integers {
510                data,
511                id_to_index,
512                index_to_id,
513            } => {
514                data.data.len()
515                    + id_to_index.len() * std::mem::size_of::<u64>()
516                    + index_to_id.len() * std::mem::size_of::<u64>()
517            }
518            CompressedColumnData::Strings {
519                encoding,
520                id_to_index,
521                index_to_id,
522            } => {
523                encoding.codes().len() * std::mem::size_of::<u32>()
524                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
525                    + id_to_index.len() * std::mem::size_of::<u64>()
526                    + index_to_id.len() * std::mem::size_of::<u64>()
527            }
528            CompressedColumnData::Booleans {
529                data,
530                id_to_index,
531                index_to_id,
532            } => {
533                data.data.len()
534                    + id_to_index.len() * std::mem::size_of::<u64>()
535                    + index_to_id.len() * std::mem::size_of::<u64>()
536            }
537        }
538    }
539}
540
541/// Statistics about column compression.
542#[derive(Debug, Clone, Default)]
543pub struct CompressionStats {
544    /// Size of uncompressed data in bytes.
545    pub uncompressed_size: usize,
546    /// Size of compressed data in bytes.
547    pub compressed_size: usize,
548    /// Number of values in the column.
549    pub value_count: usize,
550    /// Codec used for compression.
551    pub codec: Option<CompressionCodec>,
552}
553
554impl CompressionStats {
555    /// Returns the compression ratio (uncompressed / compressed).
556    #[must_use]
557    pub fn compression_ratio(&self) -> f64 {
558        if self.compressed_size == 0 {
559            return 1.0;
560        }
561        self.uncompressed_size as f64 / self.compressed_size as f64
562    }
563}
564
565/// A single property column (e.g., all "age" values).
566///
567/// Maintains min/max/null_count for fast predicate evaluation. When you
568/// filter on `age > 50`, we first check if any age could possibly match
569/// before scanning the actual values.
570///
571/// Columns support optional compression for large datasets. When compression
572/// is enabled, the column automatically selects the best codec based on the
573/// data type and characteristics.
574pub struct PropertyColumn<Id: EntityId = NodeId> {
575    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
576    /// Used for recent writes and when compression is disabled.
577    values: FxHashMap<Id, Value>,
578    /// Zone map tracking min/max/null_count for predicate pushdown.
579    zone_map: ZoneMapEntry,
580    /// Whether zone map needs rebuild (after removes).
581    zone_map_dirty: bool,
582    /// Compression mode for this column.
583    compression_mode: CompressionMode,
584    /// Compressed data (when compression is enabled and triggered).
585    compressed: Option<CompressedColumnData>,
586    /// Number of values before last compression.
587    compressed_count: usize,
588}
589
590impl<Id: EntityId> PropertyColumn<Id> {
591    /// Creates a new empty column.
592    #[must_use]
593    pub fn new() -> Self {
594        Self {
595            values: FxHashMap::default(),
596            zone_map: ZoneMapEntry::new(),
597            zone_map_dirty: false,
598            compression_mode: CompressionMode::None,
599            compressed: None,
600            compressed_count: 0,
601        }
602    }
603
604    /// Creates a new column with the specified compression mode.
605    #[must_use]
606    pub fn with_compression(mode: CompressionMode) -> Self {
607        Self {
608            values: FxHashMap::default(),
609            zone_map: ZoneMapEntry::new(),
610            zone_map_dirty: false,
611            compression_mode: mode,
612            compressed: None,
613            compressed_count: 0,
614        }
615    }
616
617    /// Sets the compression mode for this column.
618    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
619        self.compression_mode = mode;
620        if mode == CompressionMode::None {
621            // Decompress if switching to no compression
622            if self.compressed.is_some() {
623                self.decompress_all();
624            }
625        }
626    }
627
628    /// Returns the compression mode for this column.
629    #[must_use]
630    pub fn compression_mode(&self) -> CompressionMode {
631        self.compression_mode
632    }
633
634    /// Sets a value for an entity.
635    pub fn set(&mut self, id: Id, value: Value) {
636        // Update zone map incrementally
637        self.update_zone_map_on_insert(&value);
638        self.values.insert(id, value);
639
640        // Check if we should compress (in Auto mode)
641        if self.compression_mode == CompressionMode::Auto {
642            let total_count = self.values.len() + self.compressed_count;
643            let hot_buffer_count = self.values.len();
644
645            // Compress when hot buffer exceeds threshold and total is large enough
646            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
647                self.compress();
648            }
649        }
650    }
651
652    /// Updates zone map when inserting a value.
653    fn update_zone_map_on_insert(&mut self, value: &Value) {
654        self.zone_map.row_count += 1;
655
656        if matches!(value, Value::Null) {
657            self.zone_map.null_count += 1;
658            return;
659        }
660
661        // Update min
662        match &self.zone_map.min {
663            None => self.zone_map.min = Some(value.clone()),
664            Some(current) => {
665                if compare_values(value, current) == Some(Ordering::Less) {
666                    self.zone_map.min = Some(value.clone());
667                }
668            }
669        }
670
671        // Update max
672        match &self.zone_map.max {
673            None => self.zone_map.max = Some(value.clone()),
674            Some(current) => {
675                if compare_values(value, current) == Some(Ordering::Greater) {
676                    self.zone_map.max = Some(value.clone());
677                }
678            }
679        }
680    }
681
682    /// Gets a value for an entity.
683    ///
684    /// First checks the hot buffer (uncompressed values), then falls back
685    /// to the compressed data if present.
686    #[must_use]
687    pub fn get(&self, id: Id) -> Option<Value> {
688        // First check hot buffer
689        if let Some(value) = self.values.get(&id) {
690            return Some(value.clone());
691        }
692
693        // For now, compressed data lookup is not implemented for sparse access
694        // because the compressed format stores values by index, not by entity ID.
695        // This would require maintaining an ID -> index map in CompressedColumnData.
696        // The compressed data is primarily useful for bulk/scan operations.
697        None
698    }
699
700    /// Removes a value for an entity.
701    pub fn remove(&mut self, id: Id) -> Option<Value> {
702        let removed = self.values.remove(&id);
703        if removed.is_some() {
704            // Mark zone map as dirty - would need full rebuild for accurate min/max
705            self.zone_map_dirty = true;
706        }
707        removed
708    }
709
710    /// Returns the number of values in this column (hot + compressed).
711    #[must_use]
712    pub fn len(&self) -> usize {
713        self.values.len() + self.compressed_count
714    }
715
716    /// Returns true if this column is empty.
717    #[cfg(test)]
718    #[must_use]
719    pub fn is_empty(&self) -> bool {
720        self.values.is_empty() && self.compressed_count == 0
721    }
722
723    /// Returns compression statistics for this column.
724    #[must_use]
725    pub fn compression_stats(&self) -> CompressionStats {
726        let hot_size = self.values.len() * std::mem::size_of::<Value>();
727        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
728        let codec = match &self.compressed {
729            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
730            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
731            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
732            None => None,
733        };
734
735        CompressionStats {
736            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
737            compressed_size: hot_size + compressed_size,
738            value_count: self.len(),
739            codec,
740        }
741    }
742
743    /// Returns whether the column has compressed data.
744    #[must_use]
745    #[cfg(test)]
746    pub fn is_compressed(&self) -> bool {
747        self.compressed.is_some()
748    }
749
750    /// Compresses the hot buffer values.
751    ///
752    /// This merges the hot buffer into the compressed data, selecting the
753    /// best codec based on the value types.
754    ///
755    /// Note: If compressed data already exists, this is a no-op to avoid
756    /// losing previously compressed values. Use `force_compress()` after
757    /// decompressing to re-compress with all values.
758    pub fn compress(&mut self) {
759        if self.values.is_empty() {
760            return;
761        }
762
763        // Don't re-compress if we already have compressed data
764        // (would need to decompress and merge first)
765        if self.compressed.is_some() {
766            return;
767        }
768
769        // Determine the dominant type
770        let (int_count, str_count, bool_count) = self.count_types();
771        let total = self.values.len();
772
773        if int_count > total / 2 {
774            self.compress_as_integers();
775        } else if str_count > total / 2 {
776            self.compress_as_strings();
777        } else if bool_count > total / 2 {
778            self.compress_as_booleans();
779        }
780        // If no dominant type, don't compress (mixed types don't compress well)
781    }
782
783    /// Counts values by type.
784    fn count_types(&self) -> (usize, usize, usize) {
785        let mut int_count = 0;
786        let mut str_count = 0;
787        let mut bool_count = 0;
788
789        for value in self.values.values() {
790            match value {
791                Value::Int64(_) => int_count += 1,
792                Value::String(_) => str_count += 1,
793                Value::Bool(_) => bool_count += 1,
794                _ => {}
795            }
796        }
797
798        (int_count, str_count, bool_count)
799    }
800
801    /// Compresses integer values.
802    fn compress_as_integers(&mut self) {
803        // Extract integer values and their IDs
804        let mut values: Vec<(u64, i64)> = Vec::new();
805        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
806
807        for (&id, value) in &self.values {
808            match value {
809                Value::Int64(v) => {
810                    let id_u64 = id.as_u64();
811                    values.push((id_u64, *v));
812                }
813                _ => {
814                    non_int_values.insert(id, value.clone());
815                }
816            }
817        }
818
819        if values.len() < 8 {
820            // Not worth compressing
821            return;
822        }
823
824        // Sort by ID for better compression
825        values.sort_by_key(|(id, _)| *id);
826
827        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
828        let index_to_id: Vec<u64> = id_to_index.clone();
829        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
830
831        // Compress using the optimal codec
832        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
833
834        // Only use compression if it actually saves space
835        if compressed.compression_ratio() > 1.2 {
836            self.compressed = Some(CompressedColumnData::Integers {
837                data: compressed,
838                id_to_index,
839                index_to_id,
840            });
841            self.compressed_count = values.len();
842            self.values = non_int_values;
843        }
844    }
845
846    /// Compresses string values using dictionary encoding.
847    fn compress_as_strings(&mut self) {
848        let mut values: Vec<(u64, ArcStr)> = Vec::new();
849        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
850
851        for (&id, value) in &self.values {
852            match value {
853                Value::String(s) => {
854                    values.push((id.as_u64(), s.clone()));
855                }
856                _ => {
857                    non_str_values.insert(id, value.clone());
858                }
859            }
860        }
861
862        if values.len() < 8 {
863            return;
864        }
865
866        // Sort by ID
867        values.sort_by_key(|(id, _)| *id);
868
869        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
870        let index_to_id: Vec<u64> = id_to_index.clone();
871
872        // Build dictionary
873        let mut builder = DictionaryBuilder::new();
874        for (_, s) in &values {
875            builder.add(s.as_ref());
876        }
877        let encoding = builder.build();
878
879        // Only use compression if it actually saves space
880        if encoding.compression_ratio() > 1.2 {
881            self.compressed = Some(CompressedColumnData::Strings {
882                encoding,
883                id_to_index,
884                index_to_id,
885            });
886            self.compressed_count = values.len();
887            self.values = non_str_values;
888        }
889    }
890
891    /// Compresses boolean values.
892    fn compress_as_booleans(&mut self) {
893        let mut values: Vec<(u64, bool)> = Vec::new();
894        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
895
896        for (&id, value) in &self.values {
897            match value {
898                Value::Bool(b) => {
899                    values.push((id.as_u64(), *b));
900                }
901                _ => {
902                    non_bool_values.insert(id, value.clone());
903                }
904            }
905        }
906
907        if values.len() < 8 {
908            return;
909        }
910
911        // Sort by ID
912        values.sort_by_key(|(id, _)| *id);
913
914        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
915        let index_to_id: Vec<u64> = id_to_index.clone();
916        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
917
918        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
919
920        // Booleans always compress well (8x)
921        self.compressed = Some(CompressedColumnData::Booleans {
922            data: compressed,
923            id_to_index,
924            index_to_id,
925        });
926        self.compressed_count = values.len();
927        self.values = non_bool_values;
928    }
929
930    /// Decompresses all values back to the hot buffer.
931    fn decompress_all(&mut self) {
932        let Some(compressed) = self.compressed.take() else {
933            return;
934        };
935
936        match compressed {
937            CompressedColumnData::Integers {
938                data, index_to_id, ..
939            } => {
940                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
941                    // Convert back to signed using zigzag decoding
942                    let signed: Vec<i64> = values
943                        .iter()
944                        .map(|&v| crate::storage::zigzag_decode(v))
945                        .collect();
946
947                    for (i, id_u64) in index_to_id.iter().enumerate() {
948                        if let Some(&value) = signed.get(i) {
949                            let id = Id::from_u64(*id_u64);
950                            self.values.insert(id, Value::Int64(value));
951                        }
952                    }
953                }
954            }
955            CompressedColumnData::Strings {
956                encoding,
957                index_to_id,
958                ..
959            } => {
960                for (i, id_u64) in index_to_id.iter().enumerate() {
961                    if let Some(s) = encoding.get(i) {
962                        let id = Id::from_u64(*id_u64);
963                        self.values.insert(id, Value::String(ArcStr::from(s)));
964                    }
965                }
966            }
967            CompressedColumnData::Booleans {
968                data, index_to_id, ..
969            } => {
970                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
971                    for (i, id_u64) in index_to_id.iter().enumerate() {
972                        if let Some(&value) = values.get(i) {
973                            let id = Id::from_u64(*id_u64);
974                            self.values.insert(id, Value::Bool(value));
975                        }
976                    }
977                }
978            }
979        }
980
981        self.compressed_count = 0;
982    }
983
984    /// Forces compression regardless of thresholds.
985    ///
986    /// Useful for bulk loading or when you know the column is complete.
987    pub fn force_compress(&mut self) {
988        self.compress();
989    }
990
991    /// Returns the zone map for this column.
992    #[must_use]
993    pub fn zone_map(&self) -> &ZoneMapEntry {
994        &self.zone_map
995    }
996
997    /// Uses zone map to check if any values could satisfy the predicate.
998    ///
999    /// Returns `false` when we can prove no values match (so the column
1000    /// can be skipped entirely). Returns `true` if values might match.
1001    #[must_use]
1002    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1003        if self.zone_map_dirty {
1004            // Conservative: can't skip if zone map is stale
1005            return true;
1006        }
1007
1008        match op {
1009            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1010            CompareOp::Ne => {
1011                // Can only skip if all values are equal to the value
1012                // (which means min == max == value)
1013                match (&self.zone_map.min, &self.zone_map.max) {
1014                    (Some(min), Some(max)) => {
1015                        !(compare_values(min, value) == Some(Ordering::Equal)
1016                            && compare_values(max, value) == Some(Ordering::Equal))
1017                    }
1018                    _ => true,
1019                }
1020            }
1021            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1022            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1023            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1024            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1025        }
1026    }
1027
1028    /// Rebuilds zone map from current values.
1029    pub fn rebuild_zone_map(&mut self) {
1030        let mut zone_map = ZoneMapEntry::new();
1031
1032        for value in self.values.values() {
1033            zone_map.row_count += 1;
1034
1035            if matches!(value, Value::Null) {
1036                zone_map.null_count += 1;
1037                continue;
1038            }
1039
1040            // Update min
1041            match &zone_map.min {
1042                None => zone_map.min = Some(value.clone()),
1043                Some(current) => {
1044                    if compare_values(value, current) == Some(Ordering::Less) {
1045                        zone_map.min = Some(value.clone());
1046                    }
1047                }
1048            }
1049
1050            // Update max
1051            match &zone_map.max {
1052                None => zone_map.max = Some(value.clone()),
1053                Some(current) => {
1054                    if compare_values(value, current) == Some(Ordering::Greater) {
1055                        zone_map.max = Some(value.clone());
1056                    }
1057                }
1058            }
1059        }
1060
1061        self.zone_map = zone_map;
1062        self.zone_map_dirty = false;
1063    }
1064}
1065
1066/// Compares two values for ordering.
1067fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1068    match (a, b) {
1069        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1070        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1071        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1072        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1073        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1074        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1075        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1076        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1077        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1078        _ => None,
1079    }
1080}
1081
1082impl<Id: EntityId> Default for PropertyColumn<Id> {
1083    fn default() -> Self {
1084        Self::new()
1085    }
1086}
1087
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
/// The guard covers the whole key → column map, so while a
/// `PropertyColumnRef` is alive, writers to *any* column behind that lock
/// are blocked. Keep it short-lived.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard over the entire property-key → column map.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Presumably the key of the column this reference targets within the
    /// guarded map. NOTE(review): no reader of this field is visible here; confirm.
    _key: PropertyKey,
    /// `Id` already appears in `_guard`'s type, so this marker is not required
    /// for the struct to compile; it only makes the `Id` dependency explicit.
    _marker: PhantomData<Id>,
}
1096
#[cfg(test)]
mod tests {
    // Unit tests for `PropertyStorage` and `PropertyColumn`: basic CRUD,
    // compression modes and statistics, and the batch read helpers.

    use super::*;
    use arcstr::ArcStr;

    /// Builds a property key from a literal name.
    fn key(name: &'static str) -> PropertyKey {
        PropertyKey::new(name)
    }

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();
        let (alice, bob) = (NodeId::new(1), NodeId::new(2));
        let name = key("name");
        let age = key("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        assert_eq!(storage.get(alice, &name), Some(Value::String("Alice".into())));
        assert_eq!(storage.get(alice, &age), Some(Value::Int64(30)));
        assert_eq!(storage.get(bob, &name), Some(Value::String("Bob".into())));
        // Bob never received an age.
        assert!(storage.get(bob, &age).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);
        let name = key("name");

        storage.set(node, name.clone(), "Alice".into());
        assert!(storage.get(node, &name).is_some());

        assert!(storage.remove(node, &name).is_some());
        assert!(storage.get(node, &name).is_none());
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        let entries: [(&'static str, Value); 3] = [
            ("name", "Alice".into()),
            ("age", 30i64.into()),
            ("active", true.into()),
        ];
        for (name, value) in entries {
            storage.set(node, key(name), value);
        }

        assert_eq!(storage.get_all(node).len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, key("name"), "Alice".into());
        storage.set(node, key("age"), 30i64.into());

        storage.remove_all(node);

        for name in ["name", "age"] {
            assert!(storage.get(node, &key(name)).is_none());
        }
    }

    #[test]
    fn test_property_column() {
        let mut column = PropertyColumn::new();

        column.set(NodeId::new(1), "Alice".into());
        column.set(NodeId::new(2), "Bob".into());

        assert_eq!(column.len(), 2);
        assert!(!column.is_empty());
        assert_eq!(
            column.get(NodeId::new(1)),
            Some(Value::String("Alice".into()))
        );

        column.remove(NodeId::new(1));
        assert!(column.get(NodeId::new(1)).is_none());
        assert_eq!(column.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        let plain: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = key("age");

        for i in 0..100 {
            storage.set(NodeId::new(i), age.clone(), Value::Int64(20 + (i as i64 % 50)));
        }

        // Nodes 0 and 50 both wrap around to the base age.
        for id in [0, 50] {
            assert_eq!(storage.get(NodeId::new(id), &age), Some(Value::Int64(20)));
        }
    }

    #[test]
    fn test_compress_integer_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Sequential integers: enough writes to cross the auto threshold.
        for i in 0..2000 {
            column.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Compressed plus hot-buffer values must account for every write.
        assert_eq!(column.compression_stats().value_count, 2000);

        // Compressed values are not readable through get(), so the newest
        // write only has to be visible if the column stayed uncompressed.
        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // A handful of repeating labels: ideal for dictionary encoding.
        let categories = ["Person", "Company", "Product", "Location"];
        for (i, category) in categories.iter().cycle().take(2000).enumerate() {
            column.set(NodeId::new(i as u64), Value::String(ArcStr::from(*category)));
        }

        assert_eq!(column.len(), 2000);

        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for i in 0..2000u64 {
            column.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }

        assert_eq!(column.len(), 2000);

        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        // Deliberately below the automatic threshold.
        for i in 0..100 {
            column.set(NodeId::new(i), Value::Int64(i as i64));
        }

        column.force_compress();

        // Whether or not compression paid off, no value may be lost.
        assert_eq!(column.compression_stats().value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            column.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = column.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let (age, name) = (key("age"), key("name"));

        for i in 0..100 {
            storage.set(NodeId::new(i), age.clone(), Value::Int64(i as i64));
            storage.set(
                NodeId::new(i),
                name.clone(),
                Value::String(ArcStr::from("Alice")),
            );
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // One entry per column.
        assert!(stats.contains_key(&age));
        assert!(stats.contains_key(&name));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();
        let value_key = key("value");

        for i in 0..100 {
            storage.set(NodeId::new(i), value_key.clone(), Value::Int64(i as i64));
        }

        assert!(storage.memory_usage() > 0);
    }

    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let age = key("age");

        storage.set(NodeId::new(1), age.clone(), 25i64.into());
        storage.set(NodeId::new(2), age.clone(), 30i64.into());
        // Node 3 deliberately has no age.

        let ids = [NodeId::new(1), NodeId::new(2), NodeId::new(3)];
        let values = storage.get_batch(&ids, &age);

        let expected = [Some(Value::Int64(25)), Some(Value::Int64(30)), None];
        assert_eq!(values.len(), expected.len());
        for (got, want) in values.iter().zip(&expected) {
            assert_eq!(got, want);
        }
    }

    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let ids = [NodeId::new(1), NodeId::new(2)];
        let values = storage.get_batch(&ids, &key("nonexistent"));

        assert_eq!(values.len(), 2);
        assert!(values.iter().all(Option::is_none));
    }

    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_batch(&[], &key("any")).is_empty());
    }

    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let (alice, bob, nobody) = (NodeId::new(1), NodeId::new(2), NodeId::new(3));

        storage.set(alice, key("name"), "Alice".into());
        storage.set(alice, key("age"), 25i64.into());
        storage.set(bob, key("name"), "Bob".into());
        // `nobody` never gets a property.

        let all_props = storage.get_all_batch(&[alice, bob, nobody]);

        let sizes: Vec<usize> = all_props.iter().map(|props| props.len()).collect();
        assert_eq!(sizes, [2, 1, 0]);

        assert_eq!(
            all_props[0].get(&key("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            all_props[1].get(&key("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_all_batch(&[]).is_empty());
    }
}