Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// How (and whether) a property column compresses its values.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionMode {
    /// Compression disabled: values stay in the sparse hash map. This is the default.
    #[default]
    None,
    /// Compress automatically once the column has grown past its thresholds.
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
46
/// Minimum number of values (hot buffer plus compressed segment) a column must
/// hold before `Auto` mode will try to compress it.
const COMPRESSION_THRESHOLD: usize = 1_000;
49
/// Number of uncompressed values the hot buffer accumulates before `Auto` mode
/// attempts compression.
///
/// A larger buffer keeps more recent writes uncompressed (and therefore directly
/// readable) at a memory cost of roughly `HOT_BUFFER_SIZE * size_of::<Value>()`
/// bytes per column, plus hash-map overhead.
const HOT_BUFFER_SIZE: usize = 4_096;
55
/// Comparison operator checked against a column's zone map.
///
/// The variants mirror the GQL comparison operators (`=`, `<`, `>=`, ...).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompareOp {
    /// Value equals the operand.
    Eq,
    /// Value differs from the operand.
    Ne,
    /// Value is strictly below the operand.
    Lt,
    /// Value is below or equal to the operand.
    Le,
    /// Value is strictly above the operand.
    Gt,
    /// Value is above or equal to the operand.
    Ge,
}
74
75/// Marker trait for IDs that can key into property storage.
76///
77/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
78pub trait EntityId: Copy + Eq + Hash + 'static {}
79
80impl EntityId for NodeId {}
81impl EntityId for EdgeId {}
82
83/// Thread-safe columnar property storage.
84///
85/// Each property key ("name", "age", etc.) gets its own column. This layout
86/// is great for analytical queries that filter on specific properties -
87/// you only touch the columns you need.
88///
89/// Generic over `Id` so the same storage works for nodes and edges.
90///
91/// # Example
92///
93/// ```
94/// use grafeo_core::graph::lpg::PropertyStorage;
95/// use grafeo_common::types::{NodeId, PropertyKey};
96///
97/// let storage = PropertyStorage::new();
98/// let alice = NodeId::new(1);
99///
100/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
101/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
102///
103/// // Fetch all properties at once
104/// let props = storage.get_all(alice);
105/// assert_eq!(props.len(), 2);
106/// ```
107pub struct PropertyStorage<Id: EntityId = NodeId> {
108    /// Map from property key to column.
109    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
110    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
111    /// Default compression mode for new columns.
112    default_compression: CompressionMode,
113    _marker: PhantomData<Id>,
114}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117    /// Creates a new property storage.
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            columns: RwLock::new(FxHashMap::default()),
122            default_compression: CompressionMode::None,
123            _marker: PhantomData,
124        }
125    }
126
127    /// Creates a new property storage with compression enabled.
128    #[must_use]
129    pub fn with_compression(mode: CompressionMode) -> Self {
130        Self {
131            columns: RwLock::new(FxHashMap::default()),
132            default_compression: mode,
133            _marker: PhantomData,
134        }
135    }
136
137    /// Sets the default compression mode for new columns.
138    pub fn set_default_compression(&mut self, mode: CompressionMode) {
139        self.default_compression = mode;
140    }
141
142    /// Sets a property value for an entity.
143    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144        let mut columns = self.columns.write();
145        let mode = self.default_compression;
146        columns
147            .entry(key)
148            .or_insert_with(|| PropertyColumn::with_compression(mode))
149            .set(id, value);
150    }
151
152    /// Enables compression for a specific column.
153    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154        let mut columns = self.columns.write();
155        if let Some(col) = columns.get_mut(key) {
156            col.set_compression_mode(mode);
157        }
158    }
159
160    /// Compresses all columns that have compression enabled.
161    pub fn compress_all(&self) {
162        let mut columns = self.columns.write();
163        for col in columns.values_mut() {
164            if col.compression_mode() != CompressionMode::None {
165                col.compress();
166            }
167        }
168    }
169
170    /// Forces compression on all columns regardless of mode.
171    pub fn force_compress_all(&self) {
172        let mut columns = self.columns.write();
173        for col in columns.values_mut() {
174            col.force_compress();
175        }
176    }
177
178    /// Returns compression statistics for all columns.
179    #[must_use]
180    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181        let columns = self.columns.read();
182        columns
183            .iter()
184            .map(|(key, col)| (key.clone(), col.compression_stats()))
185            .collect()
186    }
187
188    /// Returns the total memory usage of all columns.
189    #[must_use]
190    pub fn memory_usage(&self) -> usize {
191        let columns = self.columns.read();
192        columns
193            .values()
194            .map(|col| col.compression_stats().compressed_size)
195            .sum()
196    }
197
198    /// Gets a property value for an entity.
199    #[must_use]
200    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201        let columns = self.columns.read();
202        columns.get(key).and_then(|col| col.get(id))
203    }
204
205    /// Removes a property value for an entity.
206    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207        let mut columns = self.columns.write();
208        columns.get_mut(key).and_then(|col| col.remove(id))
209    }
210
211    /// Removes all properties for an entity.
212    pub fn remove_all(&self, id: Id) {
213        let mut columns = self.columns.write();
214        for col in columns.values_mut() {
215            col.remove(id);
216        }
217    }
218
219    /// Gets all properties for an entity.
220    #[must_use]
221    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222        let columns = self.columns.read();
223        let mut result = FxHashMap::default();
224        for (key, col) in columns.iter() {
225            if let Some(value) = col.get(id) {
226                result.insert(key.clone(), value);
227            }
228        }
229        result
230    }
231
232    /// Gets property values for multiple entities in a single lock acquisition.
233    ///
234    /// More efficient than calling [`Self::get`] in a loop because it acquires
235    /// the read lock only once.
236    ///
237    /// # Example
238    ///
239    /// ```
240    /// use grafeo_core::graph::lpg::PropertyStorage;
241    /// use grafeo_common::types::{PropertyKey, Value};
242    /// use grafeo_common::NodeId;
243    ///
244    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
245    /// let key = PropertyKey::new("age");
246    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
247    /// let values = storage.get_batch(&ids, &key);
248    /// // values[i] is the property value for ids[i], or None if not set
249    /// ```
250    #[must_use]
251    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252        let columns = self.columns.read();
253        match columns.get(key) {
254            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255            None => vec![None; ids.len()],
256        }
257    }
258
259    /// Gets all properties for multiple entities efficiently.
260    ///
261    /// More efficient than calling [`Self::get_all`] in a loop because it
262    /// acquires the read lock only once.
263    ///
264    /// # Example
265    ///
266    /// ```
267    /// use grafeo_core::graph::lpg::PropertyStorage;
268    /// use grafeo_common::types::{PropertyKey, Value};
269    /// use grafeo_common::NodeId;
270    ///
271    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
272    /// let ids = vec![NodeId(1), NodeId(2)];
273    /// let all_props = storage.get_all_batch(&ids);
274    /// // all_props[i] is a HashMap of all properties for ids[i]
275    /// ```
276    #[must_use]
277    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278        let columns = self.columns.read();
279        let column_count = columns.len();
280
281        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
282        let mut results = Vec::with_capacity(ids.len());
283
284        for &id in ids {
285            // Pre-allocate HashMap with expected column count
286            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
287            for (key, col) in columns.iter() {
288                if let Some(value) = col.get(id) {
289                    result.insert(key.clone(), value);
290                }
291            }
292            results.push(result);
293        }
294
295        results
296    }
297
298    /// Gets selected properties for multiple entities efficiently (projection pushdown).
299    ///
300    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
301    /// of properties - it only iterates the requested columns instead of all columns.
302    ///
303    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
304    /// compared to O(N × C) for `get_all_batch` where C = total column count.
305    ///
306    /// # Example
307    ///
308    /// ```
309    /// use grafeo_core::graph::lpg::PropertyStorage;
310    /// use grafeo_common::types::{PropertyKey, Value};
311    /// use grafeo_common::NodeId;
312    ///
313    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
314    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
315    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
316    ///
317    /// // Only fetches "name" and "age" columns, ignoring other properties
318    /// let props = storage.get_selective_batch(&ids, &keys);
319    /// ```
320    #[must_use]
321    pub fn get_selective_batch(
322        &self,
323        ids: &[Id],
324        keys: &[PropertyKey],
325    ) -> Vec<FxHashMap<PropertyKey, Value>> {
326        if keys.is_empty() {
327            // No properties requested - return empty maps
328            return vec![FxHashMap::default(); ids.len()];
329        }
330
331        let columns = self.columns.read();
332
333        // Pre-collect only the columns we need (avoids re-lookup per id)
334        let requested_columns: Vec<_> = keys
335            .iter()
336            .filter_map(|key| columns.get(key).map(|col| (key, col)))
337            .collect();
338
339        // Pre-allocate result with exact capacity
340        let mut results = Vec::with_capacity(ids.len());
341
342        for &id in ids {
343            let mut result =
344                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
345            // Only iterate requested columns, not all columns
346            for (key, col) in &requested_columns {
347                if let Some(value) = col.get(id) {
348                    result.insert((*key).clone(), value);
349                }
350            }
351            results.push(result);
352        }
353
354        results
355    }
356
357    /// Returns the number of property columns.
358    #[must_use]
359    pub fn column_count(&self) -> usize {
360        self.columns.read().len()
361    }
362
363    /// Returns the keys of all columns.
364    #[must_use]
365    pub fn keys(&self) -> Vec<PropertyKey> {
366        self.columns.read().keys().cloned().collect()
367    }
368
369    /// Gets a column by key for bulk access.
370    #[must_use]
371    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
372        let columns = self.columns.read();
373        if columns.contains_key(key) {
374            Some(PropertyColumnRef {
375                _guard: columns,
376                key: key.clone(),
377                _marker: PhantomData,
378            })
379        } else {
380            None
381        }
382    }
383
384    /// Checks if a predicate might match any values (using zone maps).
385    ///
386    /// Returns `false` only when we're *certain* no values match - for example,
387    /// if you're looking for age > 100 but the max age is 80. Returns `true`
388    /// if the property doesn't exist (conservative - might match).
389    #[must_use]
390    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
391        let columns = self.columns.read();
392        columns
393            .get(key)
394            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
395    }
396
397    /// Gets the zone map for a property column.
398    #[must_use]
399    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
400        let columns = self.columns.read();
401        columns.get(key).map(|col| col.zone_map().clone())
402    }
403
404    /// Checks if a range predicate might match any values (using zone maps).
405    ///
406    /// Returns `false` only when we're *certain* no values match the range.
407    /// Returns `true` if the property doesn't exist (conservative - might match).
408    #[must_use]
409    pub fn might_match_range(
410        &self,
411        key: &PropertyKey,
412        min: Option<&Value>,
413        max: Option<&Value>,
414        min_inclusive: bool,
415        max_inclusive: bool,
416    ) -> bool {
417        let columns = self.columns.read();
418        columns.get(key).map_or(true, |col| {
419            col.zone_map()
420                .might_contain_range(min, max, min_inclusive, max_inclusive)
421        }) // No column = assume might match (conservative)
422    }
423
424    /// Rebuilds zone maps for all columns (call after bulk removes).
425    pub fn rebuild_zone_maps(&self) {
426        let mut columns = self.columns.write();
427        for col in columns.values_mut() {
428            col.rebuild_zone_map();
429        }
430    }
431}
432
433impl<Id: EntityId> Default for PropertyStorage<Id> {
434    fn default() -> Self {
435        Self::new()
436    }
437}
438
439/// Compressed storage for a property column.
440///
441/// Holds the compressed representation of values along with the index
442/// mapping entity IDs to positions in the compressed array.
443#[derive(Debug)]
444pub enum CompressedColumnData {
445    /// Compressed integers (Int64 values).
446    Integers {
447        /// Compressed data.
448        data: CompressedData,
449        /// Index: entity ID position -> compressed array index.
450        id_to_index: Vec<u64>,
451        /// Reverse index: compressed array index -> entity ID position.
452        index_to_id: Vec<u64>,
453    },
454    /// Dictionary-encoded strings.
455    Strings {
456        /// Dictionary encoding.
457        encoding: DictionaryEncoding,
458        /// Index: entity ID position -> dictionary index.
459        id_to_index: Vec<u64>,
460        /// Reverse index: dictionary index -> entity ID position.
461        index_to_id: Vec<u64>,
462    },
463    /// Compressed booleans.
464    Booleans {
465        /// Compressed data.
466        data: CompressedData,
467        /// Index: entity ID position -> bit index.
468        id_to_index: Vec<u64>,
469        /// Reverse index: bit index -> entity ID position.
470        index_to_id: Vec<u64>,
471    },
472}
473
474impl CompressedColumnData {
475    /// Returns the memory usage of the compressed data in bytes.
476    #[must_use]
477    pub fn memory_usage(&self) -> usize {
478        match self {
479            CompressedColumnData::Integers {
480                data,
481                id_to_index,
482                index_to_id,
483            } => {
484                data.data.len()
485                    + id_to_index.len() * std::mem::size_of::<u64>()
486                    + index_to_id.len() * std::mem::size_of::<u64>()
487            }
488            CompressedColumnData::Strings {
489                encoding,
490                id_to_index,
491                index_to_id,
492            } => {
493                encoding.codes().len() * std::mem::size_of::<u32>()
494                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
495                    + id_to_index.len() * std::mem::size_of::<u64>()
496                    + index_to_id.len() * std::mem::size_of::<u64>()
497            }
498            CompressedColumnData::Booleans {
499                data,
500                id_to_index,
501                index_to_id,
502            } => {
503                data.data.len()
504                    + id_to_index.len() * std::mem::size_of::<u64>()
505                    + index_to_id.len() * std::mem::size_of::<u64>()
506            }
507        }
508    }
509
510    /// Returns the compression ratio.
511    #[must_use]
512    #[allow(dead_code)]
513    pub fn compression_ratio(&self) -> f64 {
514        match self {
515            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
516            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
517            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
518        }
519    }
520}
521
522/// Statistics about column compression.
523#[derive(Debug, Clone, Default)]
524pub struct CompressionStats {
525    /// Size of uncompressed data in bytes.
526    pub uncompressed_size: usize,
527    /// Size of compressed data in bytes.
528    pub compressed_size: usize,
529    /// Number of values in the column.
530    pub value_count: usize,
531    /// Codec used for compression.
532    pub codec: Option<CompressionCodec>,
533}
534
535impl CompressionStats {
536    /// Returns the compression ratio (uncompressed / compressed).
537    #[must_use]
538    pub fn compression_ratio(&self) -> f64 {
539        if self.compressed_size == 0 {
540            return 1.0;
541        }
542        self.uncompressed_size as f64 / self.compressed_size as f64
543    }
544}
545
546/// A single property column (e.g., all "age" values).
547///
548/// Maintains min/max/null_count for fast predicate evaluation. When you
549/// filter on `age > 50`, we first check if any age could possibly match
550/// before scanning the actual values.
551///
552/// Columns support optional compression for large datasets. When compression
553/// is enabled, the column automatically selects the best codec based on the
554/// data type and characteristics.
555pub struct PropertyColumn<Id: EntityId = NodeId> {
556    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
557    /// Used for recent writes and when compression is disabled.
558    values: FxHashMap<Id, Value>,
559    /// Zone map tracking min/max/null_count for predicate pushdown.
560    zone_map: ZoneMapEntry,
561    /// Whether zone map needs rebuild (after removes).
562    zone_map_dirty: bool,
563    /// Compression mode for this column.
564    compression_mode: CompressionMode,
565    /// Compressed data (when compression is enabled and triggered).
566    compressed: Option<CompressedColumnData>,
567    /// Number of values before last compression.
568    compressed_count: usize,
569}
570
571impl<Id: EntityId> PropertyColumn<Id> {
572    /// Creates a new empty column.
573    #[must_use]
574    pub fn new() -> Self {
575        Self {
576            values: FxHashMap::default(),
577            zone_map: ZoneMapEntry::new(),
578            zone_map_dirty: false,
579            compression_mode: CompressionMode::None,
580            compressed: None,
581            compressed_count: 0,
582        }
583    }
584
585    /// Creates a new column with the specified compression mode.
586    #[must_use]
587    pub fn with_compression(mode: CompressionMode) -> Self {
588        Self {
589            values: FxHashMap::default(),
590            zone_map: ZoneMapEntry::new(),
591            zone_map_dirty: false,
592            compression_mode: mode,
593            compressed: None,
594            compressed_count: 0,
595        }
596    }
597
598    /// Sets the compression mode for this column.
599    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
600        self.compression_mode = mode;
601        if mode == CompressionMode::None {
602            // Decompress if switching to no compression
603            if self.compressed.is_some() {
604                self.decompress_all();
605            }
606        }
607    }
608
609    /// Returns the compression mode for this column.
610    #[must_use]
611    pub fn compression_mode(&self) -> CompressionMode {
612        self.compression_mode
613    }
614
615    /// Sets a value for an entity.
616    pub fn set(&mut self, id: Id, value: Value) {
617        // Update zone map incrementally
618        self.update_zone_map_on_insert(&value);
619        self.values.insert(id, value);
620
621        // Check if we should compress (in Auto mode)
622        if self.compression_mode == CompressionMode::Auto {
623            let total_count = self.values.len() + self.compressed_count;
624            let hot_buffer_count = self.values.len();
625
626            // Compress when hot buffer exceeds threshold and total is large enough
627            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
628                self.compress();
629            }
630        }
631    }
632
633    /// Updates zone map when inserting a value.
634    fn update_zone_map_on_insert(&mut self, value: &Value) {
635        self.zone_map.row_count += 1;
636
637        if matches!(value, Value::Null) {
638            self.zone_map.null_count += 1;
639            return;
640        }
641
642        // Update min
643        match &self.zone_map.min {
644            None => self.zone_map.min = Some(value.clone()),
645            Some(current) => {
646                if compare_values(value, current) == Some(Ordering::Less) {
647                    self.zone_map.min = Some(value.clone());
648                }
649            }
650        }
651
652        // Update max
653        match &self.zone_map.max {
654            None => self.zone_map.max = Some(value.clone()),
655            Some(current) => {
656                if compare_values(value, current) == Some(Ordering::Greater) {
657                    self.zone_map.max = Some(value.clone());
658                }
659            }
660        }
661    }
662
663    /// Gets a value for an entity.
664    ///
665    /// First checks the hot buffer (uncompressed values), then falls back
666    /// to the compressed data if present.
667    #[must_use]
668    pub fn get(&self, id: Id) -> Option<Value> {
669        // First check hot buffer
670        if let Some(value) = self.values.get(&id) {
671            return Some(value.clone());
672        }
673
674        // For now, compressed data lookup is not implemented for sparse access
675        // because the compressed format stores values by index, not by entity ID.
676        // This would require maintaining an ID -> index map in CompressedColumnData.
677        // The compressed data is primarily useful for bulk/scan operations.
678        None
679    }
680
681    /// Removes a value for an entity.
682    pub fn remove(&mut self, id: Id) -> Option<Value> {
683        let removed = self.values.remove(&id);
684        if removed.is_some() {
685            // Mark zone map as dirty - would need full rebuild for accurate min/max
686            self.zone_map_dirty = true;
687        }
688        removed
689    }
690
691    /// Returns the number of values in this column (hot + compressed).
692    #[must_use]
693    #[allow(dead_code)]
694    pub fn len(&self) -> usize {
695        self.values.len() + self.compressed_count
696    }
697
698    /// Returns true if this column is empty.
699    #[must_use]
700    #[allow(dead_code)]
701    pub fn is_empty(&self) -> bool {
702        self.values.is_empty() && self.compressed_count == 0
703    }
704
705    /// Iterates over all (id, value) pairs in the hot buffer.
706    ///
707    /// Note: This only iterates over uncompressed values. For full iteration
708    /// including compressed values, use [`iter_all`].
709    #[allow(dead_code)]
710    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
711        self.values.iter().map(|(&id, v)| (id, v))
712    }
713
714    /// Returns compression statistics for this column.
715    #[must_use]
716    pub fn compression_stats(&self) -> CompressionStats {
717        let hot_size = self.values.len() * std::mem::size_of::<Value>();
718        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
719        let codec = match &self.compressed {
720            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
721            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
722            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
723            None => None,
724        };
725
726        CompressionStats {
727            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
728            compressed_size: hot_size + compressed_size,
729            value_count: self.len(),
730            codec,
731        }
732    }
733
734    /// Returns whether the column has compressed data.
735    #[must_use]
736    #[allow(dead_code)]
737    pub fn is_compressed(&self) -> bool {
738        self.compressed.is_some()
739    }
740
741    /// Compresses the hot buffer values.
742    ///
743    /// This merges the hot buffer into the compressed data, selecting the
744    /// best codec based on the value types.
745    ///
746    /// Note: If compressed data already exists, this is a no-op to avoid
747    /// losing previously compressed values. Use `force_compress()` after
748    /// decompressing to re-compress with all values.
749    pub fn compress(&mut self) {
750        if self.values.is_empty() {
751            return;
752        }
753
754        // Don't re-compress if we already have compressed data
755        // (would need to decompress and merge first)
756        if self.compressed.is_some() {
757            return;
758        }
759
760        // Determine the dominant type
761        let (int_count, str_count, bool_count) = self.count_types();
762        let total = self.values.len();
763
764        if int_count > total / 2 {
765            self.compress_as_integers();
766        } else if str_count > total / 2 {
767            self.compress_as_strings();
768        } else if bool_count > total / 2 {
769            self.compress_as_booleans();
770        }
771        // If no dominant type, don't compress (mixed types don't compress well)
772    }
773
774    /// Counts values by type.
775    fn count_types(&self) -> (usize, usize, usize) {
776        let mut int_count = 0;
777        let mut str_count = 0;
778        let mut bool_count = 0;
779
780        for value in self.values.values() {
781            match value {
782                Value::Int64(_) => int_count += 1,
783                Value::String(_) => str_count += 1,
784                Value::Bool(_) => bool_count += 1,
785                _ => {}
786            }
787        }
788
789        (int_count, str_count, bool_count)
790    }
791
792    /// Compresses integer values.
793    #[allow(unsafe_code)]
794    fn compress_as_integers(&mut self) {
795        // Extract integer values and their IDs
796        let mut values: Vec<(u64, i64)> = Vec::new();
797        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
798
799        for (&id, value) in &self.values {
800            match value {
801                Value::Int64(v) => {
802                    // Convert Id to u64 for indexing (assumes Id can be converted)
803                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
804                    values.push((id_u64, *v));
805                }
806                _ => {
807                    non_int_values.insert(id, value.clone());
808                }
809            }
810        }
811
812        if values.len() < 8 {
813            // Not worth compressing
814            return;
815        }
816
817        // Sort by ID for better compression
818        values.sort_by_key(|(id, _)| *id);
819
820        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
821        let index_to_id: Vec<u64> = id_to_index.clone();
822        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
823
824        // Compress using the optimal codec
825        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
826
827        // Only use compression if it actually saves space
828        if compressed.compression_ratio() > 1.2 {
829            self.compressed = Some(CompressedColumnData::Integers {
830                data: compressed,
831                id_to_index,
832                index_to_id,
833            });
834            self.compressed_count = values.len();
835            self.values = non_int_values;
836        }
837    }
838
839    /// Compresses string values using dictionary encoding.
840    #[allow(unsafe_code)]
841    fn compress_as_strings(&mut self) {
842        let mut values: Vec<(u64, ArcStr)> = Vec::new();
843        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
844
845        for (&id, value) in &self.values {
846            match value {
847                Value::String(s) => {
848                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
849                    values.push((id_u64, s.clone()));
850                }
851                _ => {
852                    non_str_values.insert(id, value.clone());
853                }
854            }
855        }
856
857        if values.len() < 8 {
858            return;
859        }
860
861        // Sort by ID
862        values.sort_by_key(|(id, _)| *id);
863
864        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
865        let index_to_id: Vec<u64> = id_to_index.clone();
866
867        // Build dictionary
868        let mut builder = DictionaryBuilder::new();
869        for (_, s) in &values {
870            builder.add(s.as_ref());
871        }
872        let encoding = builder.build();
873
874        // Only use compression if it actually saves space
875        if encoding.compression_ratio() > 1.2 {
876            self.compressed = Some(CompressedColumnData::Strings {
877                encoding,
878                id_to_index,
879                index_to_id,
880            });
881            self.compressed_count = values.len();
882            self.values = non_str_values;
883        }
884    }
885
886    /// Compresses boolean values.
887    #[allow(unsafe_code)]
888    fn compress_as_booleans(&mut self) {
889        let mut values: Vec<(u64, bool)> = Vec::new();
890        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
891
892        for (&id, value) in &self.values {
893            match value {
894                Value::Bool(b) => {
895                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
896                    values.push((id_u64, *b));
897                }
898                _ => {
899                    non_bool_values.insert(id, value.clone());
900                }
901            }
902        }
903
904        if values.len() < 8 {
905            return;
906        }
907
908        // Sort by ID
909        values.sort_by_key(|(id, _)| *id);
910
911        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
912        let index_to_id: Vec<u64> = id_to_index.clone();
913        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
914
915        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
916
917        // Booleans always compress well (8x)
918        self.compressed = Some(CompressedColumnData::Booleans {
919            data: compressed,
920            id_to_index,
921            index_to_id,
922        });
923        self.compressed_count = values.len();
924        self.values = non_bool_values;
925    }
926
927    /// Decompresses all values back to the hot buffer.
928    #[allow(unsafe_code)]
929    fn decompress_all(&mut self) {
930        let Some(compressed) = self.compressed.take() else {
931            return;
932        };
933
934        match compressed {
935            CompressedColumnData::Integers {
936                data, index_to_id, ..
937            } => {
938                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
939                    // Convert back to signed using zigzag decoding
940                    let signed: Vec<i64> = values
941                        .iter()
942                        .map(|&v| crate::storage::zigzag_decode(v))
943                        .collect();
944
945                    for (i, id_u64) in index_to_id.iter().enumerate() {
946                        if let Some(&value) = signed.get(i) {
947                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
948                            self.values.insert(id, Value::Int64(value));
949                        }
950                    }
951                }
952            }
953            CompressedColumnData::Strings {
954                encoding,
955                index_to_id,
956                ..
957            } => {
958                for (i, id_u64) in index_to_id.iter().enumerate() {
959                    if let Some(s) = encoding.get(i) {
960                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
961                        self.values.insert(id, Value::String(ArcStr::from(s)));
962                    }
963                }
964            }
965            CompressedColumnData::Booleans {
966                data, index_to_id, ..
967            } => {
968                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
969                    for (i, id_u64) in index_to_id.iter().enumerate() {
970                        if let Some(&value) = values.get(i) {
971                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
972                            self.values.insert(id, Value::Bool(value));
973                        }
974                    }
975                }
976            }
977        }
978
979        self.compressed_count = 0;
980    }
981
982    /// Forces compression regardless of thresholds.
983    ///
984    /// Useful for bulk loading or when you know the column is complete.
985    pub fn force_compress(&mut self) {
986        self.compress();
987    }
988
989    /// Returns the zone map for this column.
990    #[must_use]
991    pub fn zone_map(&self) -> &ZoneMapEntry {
992        &self.zone_map
993    }
994
995    /// Uses zone map to check if any values could satisfy the predicate.
996    ///
997    /// Returns `false` when we can prove no values match (so the column
998    /// can be skipped entirely). Returns `true` if values might match.
999    #[must_use]
1000    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1001        if self.zone_map_dirty {
1002            // Conservative: can't skip if zone map is stale
1003            return true;
1004        }
1005
1006        match op {
1007            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1008            CompareOp::Ne => {
1009                // Can only skip if all values are equal to the value
1010                // (which means min == max == value)
1011                match (&self.zone_map.min, &self.zone_map.max) {
1012                    (Some(min), Some(max)) => {
1013                        !(compare_values(min, value) == Some(Ordering::Equal)
1014                            && compare_values(max, value) == Some(Ordering::Equal))
1015                    }
1016                    _ => true,
1017                }
1018            }
1019            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1020            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1021            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1022            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1023        }
1024    }
1025
1026    /// Rebuilds zone map from current values.
1027    pub fn rebuild_zone_map(&mut self) {
1028        let mut zone_map = ZoneMapEntry::new();
1029
1030        for value in self.values.values() {
1031            zone_map.row_count += 1;
1032
1033            if matches!(value, Value::Null) {
1034                zone_map.null_count += 1;
1035                continue;
1036            }
1037
1038            // Update min
1039            match &zone_map.min {
1040                None => zone_map.min = Some(value.clone()),
1041                Some(current) => {
1042                    if compare_values(value, current) == Some(Ordering::Less) {
1043                        zone_map.min = Some(value.clone());
1044                    }
1045                }
1046            }
1047
1048            // Update max
1049            match &zone_map.max {
1050                None => zone_map.max = Some(value.clone()),
1051                Some(current) => {
1052                    if compare_values(value, current) == Some(Ordering::Greater) {
1053                        zone_map.max = Some(value.clone());
1054                    }
1055                }
1056            }
1057        }
1058
1059        self.zone_map = zone_map;
1060        self.zone_map_dirty = false;
1061    }
1062}
1063
1064/// Compares two values for ordering.
1065fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1066    match (a, b) {
1067        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1068        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1069        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1070        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1071        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1072        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1073        _ => None,
1074    }
1075}
1076
1077impl<Id: EntityId> Default for PropertyColumn<Id> {
1078    fn default() -> Self {
1079        Self::new()
1080    }
1081}
1082
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
/// The guard is a read guard over the whole `PropertyKey -> PropertyColumn`
/// map, not just one column. While this value is alive, nobody can take that
/// map's write lock, so any write path that needs it is blocked for every
/// column.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard on the column map. Held only to keep the lock for this value's lifetime.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    // NOTE(review): the dead_code allow suggests `key` is never read. Presumably it
    // is meant for looking up this reference's column inside `_guard`; confirm.
    #[allow(dead_code)]
    /// Key of the column this reference was created for.
    key: PropertyKey,
    // NOTE(review): `Id` already appears in `_guard`'s type, so this marker looks
    // redundant; kept as-is.
    _marker: PhantomData<Id>,
}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    /// Expected `Value` for a string property.
    fn string_value(text: &str) -> Value {
        Value::String(ArcStr::from(text))
    }

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();
        let (alice, bob) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        storage.set(alice, name.clone(), "Alice".into());
        storage.set(alice, age.clone(), 30i64.into());
        storage.set(bob, name.clone(), "Bob".into());

        assert_eq!(storage.get(alice, &name), Some(string_value("Alice")));
        assert_eq!(storage.get(alice, &age), Some(Value::Int64(30)));
        assert_eq!(storage.get(bob, &name), Some(string_value("Bob")));
        // Bob never received an age.
        assert!(storage.get(bob, &age).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();
        let id = NodeId::new(1);
        let name = PropertyKey::new("name");

        storage.set(id, name.clone(), "Alice".into());
        assert!(storage.get(id, &name).is_some(), "value should exist after set");

        assert!(storage.remove(id, &name).is_some(), "remove should hand back the value");
        assert!(storage.get(id, &name).is_none(), "value should be gone after remove");
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();
        let id = NodeId::new(1);

        let props: [(&str, Value); 3] = [
            ("name", "Alice".into()),
            ("age", 30i64.into()),
            ("active", true.into()),
        ];
        for (key, value) in props {
            storage.set(id, PropertyKey::new(key), value);
        }

        assert_eq!(storage.get_all(id).len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();
        let id = NodeId::new(1);

        storage.set(id, PropertyKey::new("name"), "Alice".into());
        storage.set(id, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(id);

        for key in ["name", "age"] {
            assert!(storage.get(id, &PropertyKey::new(key)).is_none());
        }
    }

    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();
        let (first, second) = (NodeId::new(1), NodeId::new(2));

        col.set(first, "Alice".into());
        col.set(second, "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());
        assert_eq!(col.get(first), Some(string_value("Alice")));

        col.remove(first);
        assert!(col.get(first).is_none());
        assert_eq!(col.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        let plain: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");

        for i in 0..100 {
            storage.set(NodeId::new(i), age.clone(), Value::Int64(20 + (i as i64 % 50)));
        }

        // Nodes 0 and 50 both wrap around to the base age.
        for probe in [0, 50] {
            assert_eq!(storage.get(NodeId::new(probe), &age), Some(Value::Int64(20)));
        }
    }

    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Sequential integers: enough to cross the automatic compression trigger.
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // The count spans both the compressed segment and the hot buffer.
        assert_eq!(col.compression_stats().value_count, 2000);

        // Note: Compressed values are not accessible via get() - see design note
        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // A handful of repeated labels is the ideal dictionary workload.
        let categories = ["Person", "Company", "Product", "Location"];
        for (i, cat) in categories.iter().cycle().take(2000).enumerate() {
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(*cat)));
        }

        assert_eq!(col.len(), 2000);

        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for i in 0..2000u64 {
            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }

        assert_eq!(col.len(), 2000);

        let newest = col.get(NodeId::new(1999));
        assert!(newest.is_some() || col.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Deliberately fewer values than the automatic threshold.
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        col.force_compress();

        // Whether or not compression paid off, no value may be lost.
        assert_eq!(col.compression_stats().value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let summary = col.compression_stats();
        assert_eq!(summary.value_count, 50);
        assert!(summary.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
        let (age, name) = (PropertyKey::new("age"), PropertyKey::new("name"));

        for i in 0..100 {
            let node = NodeId::new(i);
            storage.set(node, age.clone(), Value::Int64(i as i64));
            storage.set(node, name.clone(), Value::String(ArcStr::from("Alice")));
        }

        let per_column = storage.compression_stats();
        assert_eq!(per_column.len(), 2, "expected one entry per column");
        assert!(per_column.contains_key(&age));
        assert!(per_column.contains_key(&name));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();
        let key = PropertyKey::new("value");

        for i in 0..100 {
            storage.set(NodeId::new(i), key.clone(), Value::Int64(i as i64));
        }

        assert!(storage.memory_usage() > 0);
    }

    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let age = PropertyKey::new("age");
        let ids = [NodeId::new(1), NodeId::new(2), NodeId::new(3)];

        storage.set(ids[0], age.clone(), 25i64.into());
        storage.set(ids[1], age.clone(), 30i64.into());
        // ids[2] deliberately has no age property.

        let values = storage.get_batch(&ids, &age);

        let expected = [Some(Value::Int64(25)), Some(Value::Int64(30)), None];
        assert_eq!(values.len(), expected.len());
        for (i, want) in expected.iter().enumerate() {
            assert_eq!(&values[i], want, "mismatch at position {i}");
        }
    }

    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let ids = [NodeId::new(1), NodeId::new(2)];

        let values = storage.get_batch(&ids, &PropertyKey::new("nonexistent"));

        assert_eq!(values.len(), 2);
        assert!(values[0].is_none());
        assert!(values[1].is_none());
    }

    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_batch(&[], &PropertyKey::new("any")).is_empty());
    }

    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let (n1, n2, n3) = (NodeId::new(1), NodeId::new(2), NodeId::new(3));
        let name = PropertyKey::new("name");

        storage.set(n1, name.clone(), "Alice".into());
        storage.set(n1, PropertyKey::new("age"), 25i64.into());
        storage.set(n2, name.clone(), "Bob".into());
        // n3 gets no properties at all.

        let all_props = storage.get_all_batch(&[n1, n2, n3]);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2, "n1 has name and age");
        assert_eq!(all_props[1].len(), 1, "n2 has name only");
        assert_eq!(all_props[2].len(), 0, "n3 has nothing");

        assert_eq!(all_props[0].get(&name), Some(&string_value("Alice")));
        assert_eq!(all_props[1].get(&name), Some(&string_value("Bob")));
    }

    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(storage.get_all_batch(&[]).is_empty());
    }
}