Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// Compression mode for property columns.
///
/// The mode is stored per column; [`PropertyStorage`] hands its default mode
/// to every column it creates.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Never compress - always use sparse HashMap (default).
    ///
    /// Switching a column to this mode through
    /// [`PropertyColumn::set_compression_mode`] decompresses any data the
    /// column had already compressed.
    #[default]
    None,
    /// Automatically compress when beneficial (after threshold).
    ///
    /// [`PropertyColumn::set`] attempts compression once the hot buffer holds
    /// at least `HOT_BUFFER_SIZE` values and the column holds at least
    /// `COMPRESSION_THRESHOLD` values in total.
    Auto,
    /// Eagerly compress on every flush.
    // NOTE(review): the code in this part of the file treats `Eager` like any
    // other non-`None` mode (see `PropertyStorage::compress_all`); confirm
    // where the per-flush behavior is implemented.
    Eager,
}
46
/// Threshold for automatic compression (number of values).
///
/// In [`CompressionMode::Auto`], a column is only compressed once it holds at
/// least this many values (hot buffer plus already-compressed values).
const COMPRESSION_THRESHOLD: usize = 1000;

/// Size of the hot buffer for recent writes (before compression).
/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
/// on recently-written data.
///
/// In [`CompressionMode::Auto`], compression is attempted once the hot buffer
/// reaches this many values. Because this value is larger than
/// `COMPRESSION_THRESHOLD`, reaching it already satisfies that threshold.
const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operators used for zone map predicate checks.
///
/// These map directly to GQL comparison operators like `=`, `<`, `>=`.
/// An operator is passed together with a comparison value to
/// [`PropertyStorage::might_match`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Equal to value (`=`).
    Eq,
    /// Not equal to value (`<>`).
    Ne,
    /// Less than value (`<`).
    Lt,
    /// Less than or equal to value (`<=`).
    Le,
    /// Greater than value (`>`).
    Gt,
    /// Greater than or equal to value (`>=`).
    Ge,
}
74
/// Marker trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
///
/// The column compression routines in this file reinterpret an `Id` as a
/// `u64` with `std::mem::transmute_copy`, so implementors are expected to be
/// exactly eight bytes wide.
pub trait EntityId: Copy + Eq + Hash + 'static {}

impl EntityId for NodeId {}
impl EntityId for EdgeId {}
82
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges.
///
/// All columns live behind a single [`RwLock`]: read accessors take the read
/// lock, while anything that mutates a column takes the write lock.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alice = NodeId::new(1);
///
/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alice);
/// assert_eq!(props.len(), 2);
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    ///
    /// Only consulted when a column is first created by `set`; existing
    /// columns keep the mode they were created with.
    default_compression: CompressionMode,
    /// Zero-sized marker for the ID type this storage is keyed by.
    _marker: PhantomData<Id>,
}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117    /// Creates a new property storage.
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            columns: RwLock::new(FxHashMap::default()),
122            default_compression: CompressionMode::None,
123            _marker: PhantomData,
124        }
125    }
126
127    /// Creates a new property storage with compression enabled.
128    #[must_use]
129    pub fn with_compression(mode: CompressionMode) -> Self {
130        Self {
131            columns: RwLock::new(FxHashMap::default()),
132            default_compression: mode,
133            _marker: PhantomData,
134        }
135    }
136
    /// Sets the default compression mode for new columns.
    ///
    /// Columns that already exist are not touched; use
    /// [`Self::enable_compression`] to change the mode of an existing column.
    pub fn set_default_compression(&mut self, mode: CompressionMode) {
        self.default_compression = mode;
    }
141
142    /// Sets a property value for an entity.
143    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144        let mut columns = self.columns.write();
145        let mode = self.default_compression;
146        columns
147            .entry(key)
148            .or_insert_with(|| PropertyColumn::with_compression(mode))
149            .set(id, value);
150    }
151
152    /// Enables compression for a specific column.
153    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154        let mut columns = self.columns.write();
155        if let Some(col) = columns.get_mut(key) {
156            col.set_compression_mode(mode);
157        }
158    }
159
160    /// Compresses all columns that have compression enabled.
161    pub fn compress_all(&self) {
162        let mut columns = self.columns.write();
163        for col in columns.values_mut() {
164            if col.compression_mode() != CompressionMode::None {
165                col.compress();
166            }
167        }
168    }
169
170    /// Forces compression on all columns regardless of mode.
171    pub fn force_compress_all(&self) {
172        let mut columns = self.columns.write();
173        for col in columns.values_mut() {
174            col.force_compress();
175        }
176    }
177
178    /// Returns compression statistics for all columns.
179    #[must_use]
180    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181        let columns = self.columns.read();
182        columns
183            .iter()
184            .map(|(key, col)| (key.clone(), col.compression_stats()))
185            .collect()
186    }
187
188    /// Returns the total memory usage of all columns.
189    #[must_use]
190    pub fn memory_usage(&self) -> usize {
191        let columns = self.columns.read();
192        columns
193            .values()
194            .map(|col| col.compression_stats().compressed_size)
195            .sum()
196    }
197
198    /// Gets a property value for an entity.
199    #[must_use]
200    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201        let columns = self.columns.read();
202        columns.get(key).and_then(|col| col.get(id))
203    }
204
205    /// Removes a property value for an entity.
206    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207        let mut columns = self.columns.write();
208        columns.get_mut(key).and_then(|col| col.remove(id))
209    }
210
211    /// Removes all properties for an entity.
212    pub fn remove_all(&self, id: Id) {
213        let mut columns = self.columns.write();
214        for col in columns.values_mut() {
215            col.remove(id);
216        }
217    }
218
219    /// Gets all properties for an entity.
220    #[must_use]
221    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222        let columns = self.columns.read();
223        let mut result = FxHashMap::default();
224        for (key, col) in columns.iter() {
225            if let Some(value) = col.get(id) {
226                result.insert(key.clone(), value);
227            }
228        }
229        result
230    }
231
232    /// Gets property values for multiple entities in a single lock acquisition.
233    ///
234    /// More efficient than calling [`Self::get`] in a loop because it acquires
235    /// the read lock only once.
236    ///
237    /// # Example
238    ///
239    /// ```
240    /// use grafeo_core::graph::lpg::PropertyStorage;
241    /// use grafeo_common::types::{PropertyKey, Value};
242    /// use grafeo_common::NodeId;
243    ///
244    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
245    /// let key = PropertyKey::new("age");
246    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
247    /// let values = storage.get_batch(&ids, &key);
248    /// // values[i] is the property value for ids[i], or None if not set
249    /// ```
250    #[must_use]
251    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252        let columns = self.columns.read();
253        match columns.get(key) {
254            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255            None => vec![None; ids.len()],
256        }
257    }
258
259    /// Gets all properties for multiple entities efficiently.
260    ///
261    /// More efficient than calling [`Self::get_all`] in a loop because it
262    /// acquires the read lock only once.
263    ///
264    /// # Example
265    ///
266    /// ```
267    /// use grafeo_core::graph::lpg::PropertyStorage;
268    /// use grafeo_common::types::{PropertyKey, Value};
269    /// use grafeo_common::NodeId;
270    ///
271    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
272    /// let ids = vec![NodeId(1), NodeId(2)];
273    /// let all_props = storage.get_all_batch(&ids);
274    /// // all_props[i] is a HashMap of all properties for ids[i]
275    /// ```
276    #[must_use]
277    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278        let columns = self.columns.read();
279        let column_count = columns.len();
280
281        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
282        let mut results = Vec::with_capacity(ids.len());
283
284        for &id in ids {
285            // Pre-allocate HashMap with expected column count
286            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
287            for (key, col) in columns.iter() {
288                if let Some(value) = col.get(id) {
289                    result.insert(key.clone(), value);
290                }
291            }
292            results.push(result);
293        }
294
295        results
296    }
297
298    /// Gets selected properties for multiple entities efficiently (projection pushdown).
299    ///
300    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
301    /// of properties - it only iterates the requested columns instead of all columns.
302    ///
303    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
304    /// compared to O(N × C) for `get_all_batch` where C = total column count.
305    ///
306    /// # Example
307    ///
308    /// ```
309    /// use grafeo_core::graph::lpg::PropertyStorage;
310    /// use grafeo_common::types::{PropertyKey, Value};
311    /// use grafeo_common::NodeId;
312    ///
313    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
314    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
315    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
316    ///
317    /// // Only fetches "name" and "age" columns, ignoring other properties
318    /// let props = storage.get_selective_batch(&ids, &keys);
319    /// ```
320    #[must_use]
321    pub fn get_selective_batch(
322        &self,
323        ids: &[Id],
324        keys: &[PropertyKey],
325    ) -> Vec<FxHashMap<PropertyKey, Value>> {
326        if keys.is_empty() {
327            // No properties requested - return empty maps
328            return vec![FxHashMap::default(); ids.len()];
329        }
330
331        let columns = self.columns.read();
332
333        // Pre-collect only the columns we need (avoids re-lookup per id)
334        let requested_columns: Vec<_> = keys
335            .iter()
336            .filter_map(|key| columns.get(key).map(|col| (key, col)))
337            .collect();
338
339        // Pre-allocate result with exact capacity
340        let mut results = Vec::with_capacity(ids.len());
341
342        for &id in ids {
343            let mut result =
344                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
345            // Only iterate requested columns, not all columns
346            for (key, col) in &requested_columns {
347                if let Some(value) = col.get(id) {
348                    result.insert((*key).clone(), value);
349                }
350            }
351            results.push(result);
352        }
353
354        results
355    }
356
    /// Returns the number of property columns.
    ///
    /// A column exists for every key that has been passed to [`Self::set`];
    /// nothing in the methods shown here removes a column again.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.columns.read().len()
    }
362
    /// Returns the keys of all columns.
    ///
    /// The keys are cloned out of the map under the read lock; their order is
    /// the map's iteration order.
    #[must_use]
    pub fn keys(&self) -> Vec<PropertyKey> {
        self.columns.read().keys().cloned().collect()
    }
368
369    /// Gets a column by key for bulk access.
370    #[must_use]
371    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
372        let columns = self.columns.read();
373        if columns.contains_key(key) {
374            Some(PropertyColumnRef {
375                _guard: columns,
376                key: key.clone(),
377                _marker: PhantomData,
378            })
379        } else {
380            None
381        }
382    }
383
384    /// Checks if a predicate might match any values (using zone maps).
385    ///
386    /// Returns `false` only when we're *certain* no values match - for example,
387    /// if you're looking for age > 100 but the max age is 80. Returns `true`
388    /// if the property doesn't exist (conservative - might match).
389    #[must_use]
390    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
391        let columns = self.columns.read();
392        columns
393            .get(key)
394            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
395    }
396
397    /// Gets the zone map for a property column.
398    #[must_use]
399    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
400        let columns = self.columns.read();
401        columns.get(key).map(|col| col.zone_map().clone())
402    }
403
404    /// Checks if a range predicate might match any values (using zone maps).
405    ///
406    /// Returns `false` only when we're *certain* no values match the range.
407    /// Returns `true` if the property doesn't exist (conservative - might match).
408    #[must_use]
409    pub fn might_match_range(
410        &self,
411        key: &PropertyKey,
412        min: Option<&Value>,
413        max: Option<&Value>,
414        min_inclusive: bool,
415        max_inclusive: bool,
416    ) -> bool {
417        let columns = self.columns.read();
418        columns.get(key).map_or(true, |col| {
419            col.zone_map()
420                .might_contain_range(min, max, min_inclusive, max_inclusive)
421        }) // No column = assume might match (conservative)
422    }
423
424    /// Rebuilds zone maps for all columns (call after bulk removes).
425    pub fn rebuild_zone_maps(&self) {
426        let mut columns = self.columns.write();
427        for col in columns.values_mut() {
428            col.rebuild_zone_map();
429        }
430    }
431}
432
/// The default storage is the same as [`PropertyStorage::new`]: no columns
/// and compression disabled for new columns.
impl<Id: EntityId> Default for PropertyStorage<Id> {
    fn default() -> Self {
        Self::new()
    }
}
438
/// Compressed storage for a property column.
///
/// Holds the compressed representation of values along with the index
/// mapping entity IDs to positions in the compressed array.
///
/// As filled in by the `compress_as_*` routines of [`PropertyColumn`], both
/// index vectors of a variant contain the same data: the entity IDs
/// (reinterpreted as `u64`) in ascending order, where the ID at position `i`
/// owns the `i`-th compressed value.
#[derive(Debug)]
pub enum CompressedColumnData {
    /// Compressed integers (Int64 values).
    Integers {
        /// Compressed data.
        data: CompressedData,
        /// Sorted entity IDs; position `i` belongs to compressed value `i`.
        id_to_index: Vec<u64>,
        /// Copy of `id_to_index` (compressed array index -> entity ID).
        index_to_id: Vec<u64>,
    },
    /// Dictionary-encoded strings.
    Strings {
        /// Dictionary encoding.
        encoding: DictionaryEncoding,
        /// Sorted entity IDs; position `i` belongs to the `i`-th encoded string.
        id_to_index: Vec<u64>,
        /// Copy of `id_to_index` (encoded position -> entity ID).
        index_to_id: Vec<u64>,
    },
    /// Compressed booleans.
    Booleans {
        /// Compressed data.
        data: CompressedData,
        /// Sorted entity IDs; position `i` belongs to bit `i`.
        id_to_index: Vec<u64>,
        /// Copy of `id_to_index` (bit index -> entity ID).
        index_to_id: Vec<u64>,
    },
}
473
474impl CompressedColumnData {
475    /// Returns the memory usage of the compressed data in bytes.
476    #[must_use]
477    pub fn memory_usage(&self) -> usize {
478        match self {
479            CompressedColumnData::Integers {
480                data,
481                id_to_index,
482                index_to_id,
483            } => {
484                data.data.len()
485                    + id_to_index.len() * std::mem::size_of::<u64>()
486                    + index_to_id.len() * std::mem::size_of::<u64>()
487            }
488            CompressedColumnData::Strings {
489                encoding,
490                id_to_index,
491                index_to_id,
492            } => {
493                encoding.codes().len() * std::mem::size_of::<u32>()
494                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
495                    + id_to_index.len() * std::mem::size_of::<u64>()
496                    + index_to_id.len() * std::mem::size_of::<u64>()
497            }
498            CompressedColumnData::Booleans {
499                data,
500                id_to_index,
501                index_to_id,
502            } => {
503                data.data.len()
504                    + id_to_index.len() * std::mem::size_of::<u64>()
505                    + index_to_id.len() * std::mem::size_of::<u64>()
506            }
507        }
508    }
509
510    /// Returns the compression ratio.
511    #[must_use]
512    #[allow(dead_code)]
513    pub fn compression_ratio(&self) -> f64 {
514        match self {
515            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
516            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
517            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
518        }
519    }
520}
521
/// Statistics about column compression.
///
/// Produced by [`PropertyColumn::compression_stats`]; the sizes are estimates
/// derived from value counts and `size_of::<Value>()`.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Size of uncompressed data in bytes: what the column would occupy if
    /// every value (hot and compressed) were held as a plain `Value`.
    pub uncompressed_size: usize,
    /// Size of compressed data in bytes. Includes the hot buffer, which is
    /// stored uncompressed, plus the compressed representation.
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression, or `None` when nothing is compressed.
    pub codec: Option<CompressionCodec>,
}
534
535impl CompressionStats {
536    /// Returns the compression ratio (uncompressed / compressed).
537    #[must_use]
538    pub fn compression_ratio(&self) -> f64 {
539        if self.compressed_size == 0 {
540            return 1.0;
541        }
542        self.uncompressed_size as f64 / self.compressed_size as f64
543    }
544}
545
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    /// A compression pass moves the values it compressed out of this map.
    values: FxHashMap<Id, Value>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    compressed: Option<CompressedColumnData>,
    /// Number of values currently held in `compressed`; set by the
    /// `compress_as_*` routines to the number of values they moved there.
    compressed_count: usize,
}
570
571impl<Id: EntityId> PropertyColumn<Id> {
572    /// Creates a new empty column.
573    #[must_use]
574    pub fn new() -> Self {
575        Self {
576            values: FxHashMap::default(),
577            zone_map: ZoneMapEntry::new(),
578            zone_map_dirty: false,
579            compression_mode: CompressionMode::None,
580            compressed: None,
581            compressed_count: 0,
582        }
583    }
584
    /// Creates a new column with the specified compression mode.
    ///
    /// The column starts empty: no hot values, a fresh zone map that is not
    /// marked dirty, and no compressed data.
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
            compressed: None,
            compressed_count: 0,
        }
    }
597
598    /// Sets the compression mode for this column.
599    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
600        self.compression_mode = mode;
601        if mode == CompressionMode::None {
602            // Decompress if switching to no compression
603            if self.compressed.is_some() {
604                self.decompress_all();
605            }
606        }
607    }
608
    /// Returns the compression mode for this column.
    ///
    /// This is the configured mode; it does not tell whether any data is
    /// actually compressed at the moment.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
614
615    /// Sets a value for an entity.
616    pub fn set(&mut self, id: Id, value: Value) {
617        // Update zone map incrementally
618        self.update_zone_map_on_insert(&value);
619        self.values.insert(id, value);
620
621        // Check if we should compress (in Auto mode)
622        if self.compression_mode == CompressionMode::Auto {
623            let total_count = self.values.len() + self.compressed_count;
624            let hot_buffer_count = self.values.len();
625
626            // Compress when hot buffer exceeds threshold and total is large enough
627            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
628                self.compress();
629            }
630        }
631    }
632
633    /// Updates zone map when inserting a value.
634    fn update_zone_map_on_insert(&mut self, value: &Value) {
635        self.zone_map.row_count += 1;
636
637        if matches!(value, Value::Null) {
638            self.zone_map.null_count += 1;
639            return;
640        }
641
642        // Update min
643        match &self.zone_map.min {
644            None => self.zone_map.min = Some(value.clone()),
645            Some(current) => {
646                if compare_values(value, current) == Some(Ordering::Less) {
647                    self.zone_map.min = Some(value.clone());
648                }
649            }
650        }
651
652        // Update max
653        match &self.zone_map.max {
654            None => self.zone_map.max = Some(value.clone()),
655            Some(current) => {
656                if compare_values(value, current) == Some(Ordering::Greater) {
657                    self.zone_map.max = Some(value.clone());
658                }
659            }
660        }
661    }
662
663    /// Gets a value for an entity.
664    ///
665    /// First checks the hot buffer (uncompressed values), then falls back
666    /// to the compressed data if present.
667    #[must_use]
668    pub fn get(&self, id: Id) -> Option<Value> {
669        // First check hot buffer
670        if let Some(value) = self.values.get(&id) {
671            return Some(value.clone());
672        }
673
674        // For now, compressed data lookup is not implemented for sparse access
675        // because the compressed format stores values by index, not by entity ID.
676        // This would require maintaining an ID -> index map in CompressedColumnData.
677        // The compressed data is primarily useful for bulk/scan operations.
678        None
679    }
680
681    /// Removes a value for an entity.
682    pub fn remove(&mut self, id: Id) -> Option<Value> {
683        let removed = self.values.remove(&id);
684        if removed.is_some() {
685            // Mark zone map as dirty - would need full rebuild for accurate min/max
686            self.zone_map_dirty = true;
687        }
688        removed
689    }
690
    /// Returns the number of values in this column (hot + compressed).
    ///
    /// Computed as the hot buffer size plus `compressed_count`.
    #[must_use]
    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.values.len() + self.compressed_count
    }
697
    /// Returns true if this column is empty, i.e. it holds neither hot values
    /// nor compressed values.
    #[must_use]
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.values.is_empty() && self.compressed_count == 0
    }
704
705    /// Returns compression statistics for this column.
706    #[must_use]
707    pub fn compression_stats(&self) -> CompressionStats {
708        let hot_size = self.values.len() * std::mem::size_of::<Value>();
709        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
710        let codec = match &self.compressed {
711            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
712            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
713            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
714            None => None,
715        };
716
717        CompressionStats {
718            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
719            compressed_size: hot_size + compressed_size,
720            value_count: self.len(),
721            codec,
722        }
723    }
724
    /// Returns whether the column has compressed data.
    ///
    /// Only compiled for tests.
    #[must_use]
    #[cfg(test)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }
731
732    /// Compresses the hot buffer values.
733    ///
734    /// This merges the hot buffer into the compressed data, selecting the
735    /// best codec based on the value types.
736    ///
737    /// Note: If compressed data already exists, this is a no-op to avoid
738    /// losing previously compressed values. Use `force_compress()` after
739    /// decompressing to re-compress with all values.
740    pub fn compress(&mut self) {
741        if self.values.is_empty() {
742            return;
743        }
744
745        // Don't re-compress if we already have compressed data
746        // (would need to decompress and merge first)
747        if self.compressed.is_some() {
748            return;
749        }
750
751        // Determine the dominant type
752        let (int_count, str_count, bool_count) = self.count_types();
753        let total = self.values.len();
754
755        if int_count > total / 2 {
756            self.compress_as_integers();
757        } else if str_count > total / 2 {
758            self.compress_as_strings();
759        } else if bool_count > total / 2 {
760            self.compress_as_booleans();
761        }
762        // If no dominant type, don't compress (mixed types don't compress well)
763    }
764
765    /// Counts values by type.
766    fn count_types(&self) -> (usize, usize, usize) {
767        let mut int_count = 0;
768        let mut str_count = 0;
769        let mut bool_count = 0;
770
771        for value in self.values.values() {
772            match value {
773                Value::Int64(_) => int_count += 1,
774                Value::String(_) => str_count += 1,
775                Value::Bool(_) => bool_count += 1,
776                _ => {}
777            }
778        }
779
780        (int_count, str_count, bool_count)
781    }
782
783    /// Compresses integer values.
784    #[allow(unsafe_code)]
785    fn compress_as_integers(&mut self) {
786        // Extract integer values and their IDs
787        let mut values: Vec<(u64, i64)> = Vec::new();
788        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
789
790        for (&id, value) in &self.values {
791            match value {
792                Value::Int64(v) => {
793                    // Convert Id to u64 for indexing (assumes Id can be converted)
794                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
795                    values.push((id_u64, *v));
796                }
797                _ => {
798                    non_int_values.insert(id, value.clone());
799                }
800            }
801        }
802
803        if values.len() < 8 {
804            // Not worth compressing
805            return;
806        }
807
808        // Sort by ID for better compression
809        values.sort_by_key(|(id, _)| *id);
810
811        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
812        let index_to_id: Vec<u64> = id_to_index.clone();
813        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
814
815        // Compress using the optimal codec
816        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
817
818        // Only use compression if it actually saves space
819        if compressed.compression_ratio() > 1.2 {
820            self.compressed = Some(CompressedColumnData::Integers {
821                data: compressed,
822                id_to_index,
823                index_to_id,
824            });
825            self.compressed_count = values.len();
826            self.values = non_int_values;
827        }
828    }
829
830    /// Compresses string values using dictionary encoding.
831    #[allow(unsafe_code)]
832    fn compress_as_strings(&mut self) {
833        let mut values: Vec<(u64, ArcStr)> = Vec::new();
834        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
835
836        for (&id, value) in &self.values {
837            match value {
838                Value::String(s) => {
839                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
840                    values.push((id_u64, s.clone()));
841                }
842                _ => {
843                    non_str_values.insert(id, value.clone());
844                }
845            }
846        }
847
848        if values.len() < 8 {
849            return;
850        }
851
852        // Sort by ID
853        values.sort_by_key(|(id, _)| *id);
854
855        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
856        let index_to_id: Vec<u64> = id_to_index.clone();
857
858        // Build dictionary
859        let mut builder = DictionaryBuilder::new();
860        for (_, s) in &values {
861            builder.add(s.as_ref());
862        }
863        let encoding = builder.build();
864
865        // Only use compression if it actually saves space
866        if encoding.compression_ratio() > 1.2 {
867            self.compressed = Some(CompressedColumnData::Strings {
868                encoding,
869                id_to_index,
870                index_to_id,
871            });
872            self.compressed_count = values.len();
873            self.values = non_str_values;
874        }
875    }
876
877    /// Compresses boolean values.
878    #[allow(unsafe_code)]
879    fn compress_as_booleans(&mut self) {
880        let mut values: Vec<(u64, bool)> = Vec::new();
881        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
882
883        for (&id, value) in &self.values {
884            match value {
885                Value::Bool(b) => {
886                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
887                    values.push((id_u64, *b));
888                }
889                _ => {
890                    non_bool_values.insert(id, value.clone());
891                }
892            }
893        }
894
895        if values.len() < 8 {
896            return;
897        }
898
899        // Sort by ID
900        values.sort_by_key(|(id, _)| *id);
901
902        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
903        let index_to_id: Vec<u64> = id_to_index.clone();
904        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
905
906        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
907
908        // Booleans always compress well (8x)
909        self.compressed = Some(CompressedColumnData::Booleans {
910            data: compressed,
911            id_to_index,
912            index_to_id,
913        });
914        self.compressed_count = values.len();
915        self.values = non_bool_values;
916    }
917
918    /// Decompresses all values back to the hot buffer.
919    #[allow(unsafe_code)]
920    fn decompress_all(&mut self) {
921        let Some(compressed) = self.compressed.take() else {
922            return;
923        };
924
925        match compressed {
926            CompressedColumnData::Integers {
927                data, index_to_id, ..
928            } => {
929                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
930                    // Convert back to signed using zigzag decoding
931                    let signed: Vec<i64> = values
932                        .iter()
933                        .map(|&v| crate::storage::zigzag_decode(v))
934                        .collect();
935
936                    for (i, id_u64) in index_to_id.iter().enumerate() {
937                        if let Some(&value) = signed.get(i) {
938                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
939                            self.values.insert(id, Value::Int64(value));
940                        }
941                    }
942                }
943            }
944            CompressedColumnData::Strings {
945                encoding,
946                index_to_id,
947                ..
948            } => {
949                for (i, id_u64) in index_to_id.iter().enumerate() {
950                    if let Some(s) = encoding.get(i) {
951                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
952                        self.values.insert(id, Value::String(ArcStr::from(s)));
953                    }
954                }
955            }
956            CompressedColumnData::Booleans {
957                data, index_to_id, ..
958            } => {
959                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
960                    for (i, id_u64) in index_to_id.iter().enumerate() {
961                        if let Some(&value) = values.get(i) {
962                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
963                            self.values.insert(id, Value::Bool(value));
964                        }
965                    }
966                }
967            }
968        }
969
970        self.compressed_count = 0;
971    }
972
973    /// Forces compression regardless of thresholds.
974    ///
975    /// Useful for bulk loading or when you know the column is complete.
976    pub fn force_compress(&mut self) {
977        self.compress();
978    }
979
980    /// Returns the zone map for this column.
981    #[must_use]
982    pub fn zone_map(&self) -> &ZoneMapEntry {
983        &self.zone_map
984    }
985
986    /// Uses zone map to check if any values could satisfy the predicate.
987    ///
988    /// Returns `false` when we can prove no values match (so the column
989    /// can be skipped entirely). Returns `true` if values might match.
990    #[must_use]
991    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
992        if self.zone_map_dirty {
993            // Conservative: can't skip if zone map is stale
994            return true;
995        }
996
997        match op {
998            CompareOp::Eq => self.zone_map.might_contain_equal(value),
999            CompareOp::Ne => {
1000                // Can only skip if all values are equal to the value
1001                // (which means min == max == value)
1002                match (&self.zone_map.min, &self.zone_map.max) {
1003                    (Some(min), Some(max)) => {
1004                        !(compare_values(min, value) == Some(Ordering::Equal)
1005                            && compare_values(max, value) == Some(Ordering::Equal))
1006                    }
1007                    _ => true,
1008                }
1009            }
1010            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1011            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1012            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1013            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1014        }
1015    }
1016
1017    /// Rebuilds zone map from current values.
1018    pub fn rebuild_zone_map(&mut self) {
1019        let mut zone_map = ZoneMapEntry::new();
1020
1021        for value in self.values.values() {
1022            zone_map.row_count += 1;
1023
1024            if matches!(value, Value::Null) {
1025                zone_map.null_count += 1;
1026                continue;
1027            }
1028
1029            // Update min
1030            match &zone_map.min {
1031                None => zone_map.min = Some(value.clone()),
1032                Some(current) => {
1033                    if compare_values(value, current) == Some(Ordering::Less) {
1034                        zone_map.min = Some(value.clone());
1035                    }
1036                }
1037            }
1038
1039            // Update max
1040            match &zone_map.max {
1041                None => zone_map.max = Some(value.clone()),
1042                Some(current) => {
1043                    if compare_values(value, current) == Some(Ordering::Greater) {
1044                        zone_map.max = Some(value.clone());
1045                    }
1046                }
1047            }
1048        }
1049
1050        self.zone_map = zone_map;
1051        self.zone_map_dirty = false;
1052    }
1053}
1054
1055/// Compares two values for ordering.
1056fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1057    match (a, b) {
1058        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1059        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1060        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1061        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1062        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1063        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1064        _ => None,
1065    }
1066}
1067
1068impl<Id: EntityId> Default for PropertyColumn<Id> {
1069    fn default() -> Self {
1070        Self::new()
1071    }
1072}
1073
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
/// The entity ID type defaults to `NodeId`.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard over the key-to-column map; stored only so the lock stays
    /// held for as long as this value is alive.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Property key carried alongside the guard. It is not read anywhere yet,
    /// hence the `dead_code` allowance.
    /// NOTE(review): presumably the key of the referenced column - confirm.
    #[allow(dead_code)]
    key: PropertyKey,
    /// Zero-sized marker for the entity ID type.
    _marker: PhantomData<Id>,
}
1083
#[cfg(test)]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    /// Values read back per (entity, key) pair; an unset pair reads as `None`.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let alice = NodeId::new(1);
        let bob = NodeId::new(2);
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let alice_name = storage.get(alice, &name);
        assert_eq!(alice_name, Some(Value::String("Alice".into())));

        let alice_age = storage.get(alice, &age);
        assert_eq!(alice_age, Some(Value::Int64(30)));

        let bob_name = storage.get(bob, &name);
        assert_eq!(bob_name, Some(Value::String("Bob".into())));

        // Bob never received an age.
        assert!(storage.get(bob, &age).is_none());
    }

    /// Removing a property hands back the old value and clears the slot.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let key = PropertyKey::new("name");
        let node = NodeId::new(1);

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        assert!(storage.remove(node, &key).is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` reports every property stored for one entity.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        assert_eq!(storage.get_all(node).len(), 3);
    }

    /// `remove_all` clears every property of the entity.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let node = NodeId::new(1);

        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        for key in ["name", "age"] {
            assert!(storage.get(node, &PropertyKey::new(key)).is_none());
        }
    }

    /// Basic set / get / remove / len behavior of a single column.
    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();
        let first = NodeId::new(1);

        col.set(first, "Alice".into());
        col.set(NodeId::new(2), "Bob".into());

        assert!(!col.is_empty());
        assert_eq!(col.len(), 2);
        assert_eq!(col.get(first), Some(Value::String("Alice".into())));

        col.remove(first);
        assert_eq!(col.len(), 1);
        assert!(col.get(first).is_none());
    }

    /// A column reports the compression mode it was created with.
    #[test]
    fn test_compression_mode() {
        let plain = PropertyColumn::<NodeId>::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    /// Storage created in `Auto` mode still serves reads for written values.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");

        for i in 0..100 {
            let years = 20 + (i as i64 % 50);
            storage.set(NodeId::new(i), age.clone(), Value::Int64(years));
        }

        // IDs 0 and 50 both map to 20 because of the modulo above.
        for id in [0, 50] {
            assert_eq!(storage.get(NodeId::new(id), &age), Some(Value::Int64(20)));
        }
    }

    /// Sequential integers in `Auto` mode: the value count covers all writes.
    #[test]
    fn test_compress_integer_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        // Enough sequential values that auto-compression may kick in.
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // The count spans compressed values and hot-buffer values alike.
        assert_eq!(col.compression_stats().value_count, 2000);

        // The newest write is either readable or the column got compressed.
        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    /// Low-cardinality strings in `Auto` mode: `len` covers all writes.
    #[test]
    fn test_compress_string_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        // Four labels repeated over and over suit a dictionary well.
        let categories = ["Person", "Company", "Product", "Location"];
        for (i, label) in categories.iter().cycle().take(2000).enumerate() {
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(*label)));
        }

        assert_eq!(col.len(), 2000);

        // The newest write is either readable or the column got compressed.
        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    /// Alternating booleans in `Auto` mode: `len` covers all writes.
    #[test]
    fn test_compress_boolean_column() {
        let mut col = PropertyColumn::<NodeId>::with_compression(CompressionMode::Auto);

        for i in 0..2000u64 {
            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }

        assert_eq!(col.len(), 2000);

        // The newest write is either readable or the column got compressed.
        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    /// `force_compress` on a small column keeps the reported value count.
    #[test]
    fn test_force_compress() {
        let mut col = PropertyColumn::<NodeId>::new();

        // Deliberately stay below the automatic threshold.
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        col.force_compress();

        // Whether or not compression was applied, no value may go missing.
        assert_eq!(col.compression_stats().value_count, 100);
    }

    /// Stats of an uncompressed column report its count and a non-zero size.
    #[test]
    fn test_compression_stats() {
        let mut col = PropertyColumn::<NodeId>::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert!(stats.uncompressed_size > 0);
        assert_eq!(stats.value_count, 50);
    }

    /// Storage-level stats contain one entry per property key.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");
        let name = PropertyKey::new("name");

        for i in 0..100 {
            let node = NodeId::new(i);
            storage.set(node, age.clone(), Value::Int64(i as i64));
            storage.set(node, name.clone(), Value::String(ArcStr::from("Alice")));
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // one entry per column
        assert!(stats.contains_key(&age));
        assert!(stats.contains_key(&name));
    }

    /// A populated storage reports a non-zero memory footprint.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();
        let key = PropertyKey::new("value");

        for i in 0..100 {
            storage.set(NodeId::new(i), key.clone(), Value::Int64(i as i64));
        }

        assert!(storage.memory_usage() > 0);
    }

    /// `get_batch` answers position by position, with `None` for unset IDs.
    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let age = PropertyKey::new("age");

        let aged_25 = NodeId::new(1);
        let aged_30 = NodeId::new(2);
        let ageless = NodeId::new(3); // never receives an age

        storage.set(aged_25, age.clone(), 25i64.into());
        storage.set(aged_30, age.clone(), 30i64.into());

        let ids = vec![aged_25, aged_30, ageless];
        let values = storage.get_batch(&ids, &age);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    /// Asking for a key without a column yields `None` for every ID.
    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let absent = PropertyKey::new("nonexistent");

        let ids = vec![NodeId::new(1), NodeId::new(2)];
        let values = storage.get_batch(&ids, &absent);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    /// An empty ID slice produces an empty result.
    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        assert!(storage.get_batch(&[], &key).is_empty());
    }

    /// `get_all_batch` returns one property map per requested ID, in order.
    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let name = PropertyKey::new("name");

        let alice = NodeId::new(1);
        let bob = NodeId::new(2);
        let nobody = NodeId::new(3); // no properties at all

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, PropertyKey::new("age"), 25i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        let ids = vec![alice, bob, nobody];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // name and age
        assert_eq!(all_props[1].len(), 1); // name only
        assert_eq!(all_props[2].len(), 0); // nothing stored

        let alice_name = all_props[0].get(&name);
        assert_eq!(alice_name, Some(&Value::String("Alice".into())));

        let bob_name = all_props[1].get(&name);
        assert_eq!(bob_name, Some(&Value::String("Bob".into())));
    }

    /// An empty ID slice produces no property maps.
    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_all_batch(&[]).is_empty());
    }
}