Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// How a property column decides whether to compress its values.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CompressionMode {
    /// Compression is off and values stay in the sparse hash map (the default).
    #[default]
    None,
    /// Compress automatically once enough values have accumulated.
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
46
/// Minimum total number of values a column must hold before automatic
/// compression is considered.
const COMPRESSION_THRESHOLD: usize = 1_000;

/// Number of recently written values kept uncompressed in a column's hot buffer.
///
/// In `Auto` mode a column considers compressing only once its hot buffer
/// reaches this size. A larger buffer keeps more recent data in the fast,
/// uncompressed map at the cost of roughly `HOT_BUFFER_SIZE * size_of::<Value>()`
/// bytes of payload per column. The original tuning note estimated ~64KB and
/// 1.5-2x faster point lookups on recent data; that has not been re-measured here.
const HOT_BUFFER_SIZE: usize = 4_096;
55
/// Comparison operator evaluated against a column's zone map.
///
/// Each variant mirrors a GQL comparison operator such as `=`, `<` or `>=`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// `value == operand`.
    Eq,
    /// `value != operand`.
    Ne,
    /// `value < operand`.
    Lt,
    /// `value <= operand`.
    Le,
    /// `value > operand`.
    Gt,
    /// `value >= operand`.
    Ge,
}
74
75/// Marker trait for IDs that can key into property storage.
76///
77/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
78pub trait EntityId: Copy + Eq + Hash + 'static {}
79
80impl EntityId for NodeId {}
81impl EntityId for EdgeId {}
82
83/// Thread-safe columnar property storage.
84///
85/// Each property key ("name", "age", etc.) gets its own column. This layout
86/// is great for analytical queries that filter on specific properties -
87/// you only touch the columns you need.
88///
89/// Generic over `Id` so the same storage works for nodes and edges.
90///
91/// # Example
92///
93/// ```
94/// use grafeo_core::graph::lpg::PropertyStorage;
95/// use grafeo_common::types::{NodeId, PropertyKey};
96///
97/// let storage = PropertyStorage::new();
98/// let alice = NodeId::new(1);
99///
100/// storage.set(alice, PropertyKey::new("name"), "Alice".into());
101/// storage.set(alice, PropertyKey::new("age"), 30i64.into());
102///
103/// // Fetch all properties at once
104/// let props = storage.get_all(alice);
105/// assert_eq!(props.len(), 2);
106/// ```
107pub struct PropertyStorage<Id: EntityId = NodeId> {
108    /// Map from property key to column.
109    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
110    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
111    /// Default compression mode for new columns.
112    default_compression: CompressionMode,
113    _marker: PhantomData<Id>,
114}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117    /// Creates a new property storage.
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            columns: RwLock::new(FxHashMap::default()),
122            default_compression: CompressionMode::None,
123            _marker: PhantomData,
124        }
125    }
126
127    /// Creates a new property storage with compression enabled.
128    #[must_use]
129    pub fn with_compression(mode: CompressionMode) -> Self {
130        Self {
131            columns: RwLock::new(FxHashMap::default()),
132            default_compression: mode,
133            _marker: PhantomData,
134        }
135    }
136
137    /// Sets the default compression mode for new columns.
138    pub fn set_default_compression(&mut self, mode: CompressionMode) {
139        self.default_compression = mode;
140    }
141
142    /// Sets a property value for an entity.
143    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144        let mut columns = self.columns.write();
145        let mode = self.default_compression;
146        columns
147            .entry(key)
148            .or_insert_with(|| PropertyColumn::with_compression(mode))
149            .set(id, value);
150    }
151
152    /// Enables compression for a specific column.
153    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154        let mut columns = self.columns.write();
155        if let Some(col) = columns.get_mut(key) {
156            col.set_compression_mode(mode);
157        }
158    }
159
160    /// Compresses all columns that have compression enabled.
161    pub fn compress_all(&self) {
162        let mut columns = self.columns.write();
163        for col in columns.values_mut() {
164            if col.compression_mode() != CompressionMode::None {
165                col.compress();
166            }
167        }
168    }
169
170    /// Forces compression on all columns regardless of mode.
171    pub fn force_compress_all(&self) {
172        let mut columns = self.columns.write();
173        for col in columns.values_mut() {
174            col.force_compress();
175        }
176    }
177
178    /// Returns compression statistics for all columns.
179    #[must_use]
180    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181        let columns = self.columns.read();
182        columns
183            .iter()
184            .map(|(key, col)| (key.clone(), col.compression_stats()))
185            .collect()
186    }
187
188    /// Returns the total memory usage of all columns.
189    #[must_use]
190    pub fn memory_usage(&self) -> usize {
191        let columns = self.columns.read();
192        columns
193            .values()
194            .map(|col| col.compression_stats().compressed_size)
195            .sum()
196    }
197
198    /// Gets a property value for an entity.
199    #[must_use]
200    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201        let columns = self.columns.read();
202        columns.get(key).and_then(|col| col.get(id))
203    }
204
205    /// Removes a property value for an entity.
206    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207        let mut columns = self.columns.write();
208        columns.get_mut(key).and_then(|col| col.remove(id))
209    }
210
211    /// Removes all properties for an entity.
212    pub fn remove_all(&self, id: Id) {
213        let mut columns = self.columns.write();
214        for col in columns.values_mut() {
215            col.remove(id);
216        }
217    }
218
219    /// Gets all properties for an entity.
220    #[must_use]
221    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222        let columns = self.columns.read();
223        let mut result = FxHashMap::default();
224        for (key, col) in columns.iter() {
225            if let Some(value) = col.get(id) {
226                result.insert(key.clone(), value);
227            }
228        }
229        result
230    }
231
232    /// Gets property values for multiple entities in a single lock acquisition.
233    ///
234    /// More efficient than calling [`Self::get`] in a loop because it acquires
235    /// the read lock only once.
236    ///
237    /// # Example
238    ///
239    /// ```
240    /// use grafeo_core::graph::lpg::PropertyStorage;
241    /// use grafeo_common::types::{PropertyKey, Value};
242    /// use grafeo_common::NodeId;
243    ///
244    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
245    /// let key = PropertyKey::new("age");
246    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
247    /// let values = storage.get_batch(&ids, &key);
248    /// // values[i] is the property value for ids[i], or None if not set
249    /// ```
250    #[must_use]
251    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252        let columns = self.columns.read();
253        match columns.get(key) {
254            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255            None => vec![None; ids.len()],
256        }
257    }
258
259    /// Gets all properties for multiple entities efficiently.
260    ///
261    /// More efficient than calling [`Self::get_all`] in a loop because it
262    /// acquires the read lock only once.
263    ///
264    /// # Example
265    ///
266    /// ```
267    /// use grafeo_core::graph::lpg::PropertyStorage;
268    /// use grafeo_common::types::{PropertyKey, Value};
269    /// use grafeo_common::NodeId;
270    ///
271    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
272    /// let ids = vec![NodeId(1), NodeId(2)];
273    /// let all_props = storage.get_all_batch(&ids);
274    /// // all_props[i] is a HashMap of all properties for ids[i]
275    /// ```
276    #[must_use]
277    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278        let columns = self.columns.read();
279        ids.iter()
280            .map(|&id| {
281                let mut result = FxHashMap::default();
282                for (key, col) in columns.iter() {
283                    if let Some(value) = col.get(id) {
284                        result.insert(key.clone(), value);
285                    }
286                }
287                result
288            })
289            .collect()
290    }
291
292    /// Returns the number of property columns.
293    #[must_use]
294    pub fn column_count(&self) -> usize {
295        self.columns.read().len()
296    }
297
298    /// Returns the keys of all columns.
299    #[must_use]
300    pub fn keys(&self) -> Vec<PropertyKey> {
301        self.columns.read().keys().cloned().collect()
302    }
303
304    /// Gets a column by key for bulk access.
305    #[must_use]
306    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
307        let columns = self.columns.read();
308        if columns.contains_key(key) {
309            Some(PropertyColumnRef {
310                _guard: columns,
311                key: key.clone(),
312                _marker: PhantomData,
313            })
314        } else {
315            None
316        }
317    }
318
319    /// Checks if a predicate might match any values (using zone maps).
320    ///
321    /// Returns `false` only when we're *certain* no values match - for example,
322    /// if you're looking for age > 100 but the max age is 80. Returns `true`
323    /// if the property doesn't exist (conservative - might match).
324    #[must_use]
325    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
326        let columns = self.columns.read();
327        columns
328            .get(key)
329            .map(|col| col.might_match(op, value))
330            .unwrap_or(true) // No column = assume might match (conservative)
331    }
332
333    /// Gets the zone map for a property column.
334    #[must_use]
335    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
336        let columns = self.columns.read();
337        columns.get(key).map(|col| col.zone_map().clone())
338    }
339
340    /// Checks if a range predicate might match any values (using zone maps).
341    ///
342    /// Returns `false` only when we're *certain* no values match the range.
343    /// Returns `true` if the property doesn't exist (conservative - might match).
344    #[must_use]
345    pub fn might_match_range(
346        &self,
347        key: &PropertyKey,
348        min: Option<&Value>,
349        max: Option<&Value>,
350        min_inclusive: bool,
351        max_inclusive: bool,
352    ) -> bool {
353        let columns = self.columns.read();
354        columns
355            .get(key)
356            .map(|col| {
357                col.zone_map()
358                    .might_contain_range(min, max, min_inclusive, max_inclusive)
359            })
360            .unwrap_or(true) // No column = assume might match (conservative)
361    }
362
363    /// Rebuilds zone maps for all columns (call after bulk removes).
364    pub fn rebuild_zone_maps(&self) {
365        let mut columns = self.columns.write();
366        for col in columns.values_mut() {
367            col.rebuild_zone_map();
368        }
369    }
370}
371
372impl<Id: EntityId> Default for PropertyStorage<Id> {
373    fn default() -> Self {
374        Self::new()
375    }
376}
377
378/// Compressed storage for a property column.
379///
380/// Holds the compressed representation of values along with the index
381/// mapping entity IDs to positions in the compressed array.
382#[derive(Debug)]
383pub enum CompressedColumnData {
384    /// Compressed integers (Int64 values).
385    Integers {
386        /// Compressed data.
387        data: CompressedData,
388        /// Index: entity ID position -> compressed array index.
389        id_to_index: Vec<u64>,
390        /// Reverse index: compressed array index -> entity ID position.
391        index_to_id: Vec<u64>,
392    },
393    /// Dictionary-encoded strings.
394    Strings {
395        /// Dictionary encoding.
396        encoding: DictionaryEncoding,
397        /// Index: entity ID position -> dictionary index.
398        id_to_index: Vec<u64>,
399        /// Reverse index: dictionary index -> entity ID position.
400        index_to_id: Vec<u64>,
401    },
402    /// Compressed booleans.
403    Booleans {
404        /// Compressed data.
405        data: CompressedData,
406        /// Index: entity ID position -> bit index.
407        id_to_index: Vec<u64>,
408        /// Reverse index: bit index -> entity ID position.
409        index_to_id: Vec<u64>,
410    },
411}
412
413impl CompressedColumnData {
414    /// Returns the memory usage of the compressed data in bytes.
415    #[must_use]
416    pub fn memory_usage(&self) -> usize {
417        match self {
418            CompressedColumnData::Integers {
419                data,
420                id_to_index,
421                index_to_id,
422            } => {
423                data.data.len()
424                    + id_to_index.len() * std::mem::size_of::<u64>()
425                    + index_to_id.len() * std::mem::size_of::<u64>()
426            }
427            CompressedColumnData::Strings {
428                encoding,
429                id_to_index,
430                index_to_id,
431            } => {
432                encoding.codes().len() * std::mem::size_of::<u32>()
433                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
434                    + id_to_index.len() * std::mem::size_of::<u64>()
435                    + index_to_id.len() * std::mem::size_of::<u64>()
436            }
437            CompressedColumnData::Booleans {
438                data,
439                id_to_index,
440                index_to_id,
441            } => {
442                data.data.len()
443                    + id_to_index.len() * std::mem::size_of::<u64>()
444                    + index_to_id.len() * std::mem::size_of::<u64>()
445            }
446        }
447    }
448
449    /// Returns the compression ratio.
450    #[must_use]
451    #[allow(dead_code)]
452    pub fn compression_ratio(&self) -> f64 {
453        match self {
454            CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
455            CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
456            CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
457        }
458    }
459}
460
461/// Statistics about column compression.
462#[derive(Debug, Clone, Default)]
463pub struct CompressionStats {
464    /// Size of uncompressed data in bytes.
465    pub uncompressed_size: usize,
466    /// Size of compressed data in bytes.
467    pub compressed_size: usize,
468    /// Number of values in the column.
469    pub value_count: usize,
470    /// Codec used for compression.
471    pub codec: Option<CompressionCodec>,
472}
473
474impl CompressionStats {
475    /// Returns the compression ratio (uncompressed / compressed).
476    #[must_use]
477    pub fn compression_ratio(&self) -> f64 {
478        if self.compressed_size == 0 {
479            return 1.0;
480        }
481        self.uncompressed_size as f64 / self.compressed_size as f64
482    }
483}
484
485/// A single property column (e.g., all "age" values).
486///
487/// Maintains min/max/null_count for fast predicate evaluation. When you
488/// filter on `age > 50`, we first check if any age could possibly match
489/// before scanning the actual values.
490///
491/// Columns support optional compression for large datasets. When compression
492/// is enabled, the column automatically selects the best codec based on the
493/// data type and characteristics.
494pub struct PropertyColumn<Id: EntityId = NodeId> {
495    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
496    /// Used for recent writes and when compression is disabled.
497    values: FxHashMap<Id, Value>,
498    /// Zone map tracking min/max/null_count for predicate pushdown.
499    zone_map: ZoneMapEntry,
500    /// Whether zone map needs rebuild (after removes).
501    zone_map_dirty: bool,
502    /// Compression mode for this column.
503    compression_mode: CompressionMode,
504    /// Compressed data (when compression is enabled and triggered).
505    compressed: Option<CompressedColumnData>,
506    /// Number of values before last compression.
507    compressed_count: usize,
508}
509
510impl<Id: EntityId> PropertyColumn<Id> {
511    /// Creates a new empty column.
512    #[must_use]
513    pub fn new() -> Self {
514        Self {
515            values: FxHashMap::default(),
516            zone_map: ZoneMapEntry::new(),
517            zone_map_dirty: false,
518            compression_mode: CompressionMode::None,
519            compressed: None,
520            compressed_count: 0,
521        }
522    }
523
524    /// Creates a new column with the specified compression mode.
525    #[must_use]
526    pub fn with_compression(mode: CompressionMode) -> Self {
527        Self {
528            values: FxHashMap::default(),
529            zone_map: ZoneMapEntry::new(),
530            zone_map_dirty: false,
531            compression_mode: mode,
532            compressed: None,
533            compressed_count: 0,
534        }
535    }
536
537    /// Sets the compression mode for this column.
538    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
539        self.compression_mode = mode;
540        if mode == CompressionMode::None {
541            // Decompress if switching to no compression
542            if self.compressed.is_some() {
543                self.decompress_all();
544            }
545        }
546    }
547
548    /// Returns the compression mode for this column.
549    #[must_use]
550    pub fn compression_mode(&self) -> CompressionMode {
551        self.compression_mode
552    }
553
554    /// Sets a value for an entity.
555    pub fn set(&mut self, id: Id, value: Value) {
556        // Update zone map incrementally
557        self.update_zone_map_on_insert(&value);
558        self.values.insert(id, value);
559
560        // Check if we should compress (in Auto mode)
561        if self.compression_mode == CompressionMode::Auto {
562            let total_count = self.values.len() + self.compressed_count;
563            let hot_buffer_count = self.values.len();
564
565            // Compress when hot buffer exceeds threshold and total is large enough
566            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
567                self.compress();
568            }
569        }
570    }
571
572    /// Updates zone map when inserting a value.
573    fn update_zone_map_on_insert(&mut self, value: &Value) {
574        self.zone_map.row_count += 1;
575
576        if matches!(value, Value::Null) {
577            self.zone_map.null_count += 1;
578            return;
579        }
580
581        // Update min
582        match &self.zone_map.min {
583            None => self.zone_map.min = Some(value.clone()),
584            Some(current) => {
585                if compare_values(value, current) == Some(Ordering::Less) {
586                    self.zone_map.min = Some(value.clone());
587                }
588            }
589        }
590
591        // Update max
592        match &self.zone_map.max {
593            None => self.zone_map.max = Some(value.clone()),
594            Some(current) => {
595                if compare_values(value, current) == Some(Ordering::Greater) {
596                    self.zone_map.max = Some(value.clone());
597                }
598            }
599        }
600    }
601
602    /// Gets a value for an entity.
603    ///
604    /// First checks the hot buffer (uncompressed values), then falls back
605    /// to the compressed data if present.
606    #[must_use]
607    pub fn get(&self, id: Id) -> Option<Value> {
608        // First check hot buffer
609        if let Some(value) = self.values.get(&id) {
610            return Some(value.clone());
611        }
612
613        // For now, compressed data lookup is not implemented for sparse access
614        // because the compressed format stores values by index, not by entity ID.
615        // This would require maintaining an ID -> index map in CompressedColumnData.
616        // The compressed data is primarily useful for bulk/scan operations.
617        None
618    }
619
620    /// Removes a value for an entity.
621    pub fn remove(&mut self, id: Id) -> Option<Value> {
622        let removed = self.values.remove(&id);
623        if removed.is_some() {
624            // Mark zone map as dirty - would need full rebuild for accurate min/max
625            self.zone_map_dirty = true;
626        }
627        removed
628    }
629
630    /// Returns the number of values in this column (hot + compressed).
631    #[must_use]
632    #[allow(dead_code)]
633    pub fn len(&self) -> usize {
634        self.values.len() + self.compressed_count
635    }
636
637    /// Returns true if this column is empty.
638    #[must_use]
639    #[allow(dead_code)]
640    pub fn is_empty(&self) -> bool {
641        self.values.is_empty() && self.compressed_count == 0
642    }
643
644    /// Iterates over all (id, value) pairs in the hot buffer.
645    ///
646    /// Note: This only iterates over uncompressed values. For full iteration
647    /// including compressed values, use [`iter_all`].
648    #[allow(dead_code)]
649    pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
650        self.values.iter().map(|(&id, v)| (id, v))
651    }
652
653    /// Returns compression statistics for this column.
654    #[must_use]
655    pub fn compression_stats(&self) -> CompressionStats {
656        let hot_size = self.values.len() * std::mem::size_of::<Value>();
657        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
658        let codec = match &self.compressed {
659            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
660            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
661            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
662            None => None,
663        };
664
665        CompressionStats {
666            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
667            compressed_size: hot_size + compressed_size,
668            value_count: self.len(),
669            codec,
670        }
671    }
672
673    /// Returns whether the column has compressed data.
674    #[must_use]
675    #[allow(dead_code)]
676    pub fn is_compressed(&self) -> bool {
677        self.compressed.is_some()
678    }
679
680    /// Compresses the hot buffer values.
681    ///
682    /// This merges the hot buffer into the compressed data, selecting the
683    /// best codec based on the value types.
684    ///
685    /// Note: If compressed data already exists, this is a no-op to avoid
686    /// losing previously compressed values. Use `force_compress()` after
687    /// decompressing to re-compress with all values.
688    pub fn compress(&mut self) {
689        if self.values.is_empty() {
690            return;
691        }
692
693        // Don't re-compress if we already have compressed data
694        // (would need to decompress and merge first)
695        if self.compressed.is_some() {
696            return;
697        }
698
699        // Determine the dominant type
700        let (int_count, str_count, bool_count) = self.count_types();
701        let total = self.values.len();
702
703        if int_count > total / 2 {
704            self.compress_as_integers();
705        } else if str_count > total / 2 {
706            self.compress_as_strings();
707        } else if bool_count > total / 2 {
708            self.compress_as_booleans();
709        }
710        // If no dominant type, don't compress (mixed types don't compress well)
711    }
712
713    /// Counts values by type.
714    fn count_types(&self) -> (usize, usize, usize) {
715        let mut int_count = 0;
716        let mut str_count = 0;
717        let mut bool_count = 0;
718
719        for value in self.values.values() {
720            match value {
721                Value::Int64(_) => int_count += 1,
722                Value::String(_) => str_count += 1,
723                Value::Bool(_) => bool_count += 1,
724                _ => {}
725            }
726        }
727
728        (int_count, str_count, bool_count)
729    }
730
731    /// Compresses integer values.
732    #[allow(unsafe_code)]
733    fn compress_as_integers(&mut self) {
734        // Extract integer values and their IDs
735        let mut values: Vec<(u64, i64)> = Vec::new();
736        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
737
738        for (&id, value) in &self.values {
739            match value {
740                Value::Int64(v) => {
741                    // Convert Id to u64 for indexing (assumes Id can be converted)
742                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
743                    values.push((id_u64, *v));
744                }
745                _ => {
746                    non_int_values.insert(id, value.clone());
747                }
748            }
749        }
750
751        if values.len() < 8 {
752            // Not worth compressing
753            return;
754        }
755
756        // Sort by ID for better compression
757        values.sort_by_key(|(id, _)| *id);
758
759        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
760        let index_to_id: Vec<u64> = id_to_index.clone();
761        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
762
763        // Compress using the optimal codec
764        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
765
766        // Only use compression if it actually saves space
767        if compressed.compression_ratio() > 1.2 {
768            self.compressed = Some(CompressedColumnData::Integers {
769                data: compressed,
770                id_to_index,
771                index_to_id,
772            });
773            self.compressed_count = values.len();
774            self.values = non_int_values;
775        }
776    }
777
778    /// Compresses string values using dictionary encoding.
779    #[allow(unsafe_code)]
780    fn compress_as_strings(&mut self) {
781        let mut values: Vec<(u64, ArcStr)> = Vec::new();
782        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
783
784        for (&id, value) in &self.values {
785            match value {
786                Value::String(s) => {
787                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
788                    values.push((id_u64, s.clone()));
789                }
790                _ => {
791                    non_str_values.insert(id, value.clone());
792                }
793            }
794        }
795
796        if values.len() < 8 {
797            return;
798        }
799
800        // Sort by ID
801        values.sort_by_key(|(id, _)| *id);
802
803        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
804        let index_to_id: Vec<u64> = id_to_index.clone();
805
806        // Build dictionary
807        let mut builder = DictionaryBuilder::new();
808        for (_, s) in &values {
809            builder.add(s.as_ref());
810        }
811        let encoding = builder.build();
812
813        // Only use compression if it actually saves space
814        if encoding.compression_ratio() > 1.2 {
815            self.compressed = Some(CompressedColumnData::Strings {
816                encoding,
817                id_to_index,
818                index_to_id,
819            });
820            self.compressed_count = values.len();
821            self.values = non_str_values;
822        }
823    }
824
825    /// Compresses boolean values.
826    #[allow(unsafe_code)]
827    fn compress_as_booleans(&mut self) {
828        let mut values: Vec<(u64, bool)> = Vec::new();
829        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
830
831        for (&id, value) in &self.values {
832            match value {
833                Value::Bool(b) => {
834                    let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
835                    values.push((id_u64, *b));
836                }
837                _ => {
838                    non_bool_values.insert(id, value.clone());
839                }
840            }
841        }
842
843        if values.len() < 8 {
844            return;
845        }
846
847        // Sort by ID
848        values.sort_by_key(|(id, _)| *id);
849
850        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
851        let index_to_id: Vec<u64> = id_to_index.clone();
852        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
853
854        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
855
856        // Booleans always compress well (8x)
857        self.compressed = Some(CompressedColumnData::Booleans {
858            data: compressed,
859            id_to_index,
860            index_to_id,
861        });
862        self.compressed_count = values.len();
863        self.values = non_bool_values;
864    }
865
866    /// Decompresses all values back to the hot buffer.
867    #[allow(unsafe_code)]
868    fn decompress_all(&mut self) {
869        let Some(compressed) = self.compressed.take() else {
870            return;
871        };
872
873        match compressed {
874            CompressedColumnData::Integers {
875                data, index_to_id, ..
876            } => {
877                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
878                    // Convert back to signed using zigzag decoding
879                    let signed: Vec<i64> = values
880                        .iter()
881                        .map(|&v| crate::storage::zigzag_decode(v))
882                        .collect();
883
884                    for (i, id_u64) in index_to_id.iter().enumerate() {
885                        if let Some(&value) = signed.get(i) {
886                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
887                            self.values.insert(id, Value::Int64(value));
888                        }
889                    }
890                }
891            }
892            CompressedColumnData::Strings {
893                encoding,
894                index_to_id,
895                ..
896            } => {
897                for (i, id_u64) in index_to_id.iter().enumerate() {
898                    if let Some(s) = encoding.get(i) {
899                        let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
900                        self.values.insert(id, Value::String(ArcStr::from(s)));
901                    }
902                }
903            }
904            CompressedColumnData::Booleans {
905                data, index_to_id, ..
906            } => {
907                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
908                    for (i, id_u64) in index_to_id.iter().enumerate() {
909                        if let Some(&value) = values.get(i) {
910                            let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
911                            self.values.insert(id, Value::Bool(value));
912                        }
913                    }
914                }
915            }
916        }
917
918        self.compressed_count = 0;
919    }
920
921    /// Forces compression regardless of thresholds.
922    ///
923    /// Useful for bulk loading or when you know the column is complete.
924    pub fn force_compress(&mut self) {
925        self.compress();
926    }
927
928    /// Returns the zone map for this column.
929    #[must_use]
930    pub fn zone_map(&self) -> &ZoneMapEntry {
931        &self.zone_map
932    }
933
934    /// Uses zone map to check if any values could satisfy the predicate.
935    ///
936    /// Returns `false` when we can prove no values match (so the column
937    /// can be skipped entirely). Returns `true` if values might match.
938    #[must_use]
939    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
940        if self.zone_map_dirty {
941            // Conservative: can't skip if zone map is stale
942            return true;
943        }
944
945        match op {
946            CompareOp::Eq => self.zone_map.might_contain_equal(value),
947            CompareOp::Ne => {
948                // Can only skip if all values are equal to the value
949                // (which means min == max == value)
950                match (&self.zone_map.min, &self.zone_map.max) {
951                    (Some(min), Some(max)) => {
952                        !(compare_values(min, value) == Some(Ordering::Equal)
953                            && compare_values(max, value) == Some(Ordering::Equal))
954                    }
955                    _ => true,
956                }
957            }
958            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
959            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
960            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
961            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
962        }
963    }
964
965    /// Rebuilds zone map from current values.
966    pub fn rebuild_zone_map(&mut self) {
967        let mut zone_map = ZoneMapEntry::new();
968
969        for value in self.values.values() {
970            zone_map.row_count += 1;
971
972            if matches!(value, Value::Null) {
973                zone_map.null_count += 1;
974                continue;
975            }
976
977            // Update min
978            match &zone_map.min {
979                None => zone_map.min = Some(value.clone()),
980                Some(current) => {
981                    if compare_values(value, current) == Some(Ordering::Less) {
982                        zone_map.min = Some(value.clone());
983                    }
984                }
985            }
986
987            // Update max
988            match &zone_map.max {
989                None => zone_map.max = Some(value.clone()),
990                Some(current) => {
991                    if compare_values(value, current) == Some(Ordering::Greater) {
992                        zone_map.max = Some(value.clone());
993                    }
994                }
995            }
996        }
997
998        self.zone_map = zone_map;
999        self.zone_map_dirty = false;
1000    }
1001}
1002
1003/// Compares two values for ordering.
1004fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1005    match (a, b) {
1006        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1007        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1008        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1009        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1010        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1011        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1012        _ => None,
1013    }
1014}
1015
1016impl<Id: EntityId> Default for PropertyColumn<Id> {
1017    fn default() -> Self {
1018        Self::new()
1019    }
1020}
1021
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    /// Read guard over the whole key -> column map. Keeping it alive for `'a`
    /// is what blocks writers while this reference exists.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Key of the column this reference refers to.
    // NOTE(review): marked `dead_code`, so nothing visible reads it yet;
    // presumably reserved for a future lookup accessor — confirm or remove.
    #[allow(dead_code)]
    key: PropertyKey,
    /// Type marker for `Id`. `Id` already appears in `_guard`'s type, so the
    /// compiler does not require this field.
    _marker: PhantomData<Id>,
}
1031
/// Unit tests for `PropertyStorage` and `PropertyColumn`: basic CRUD, batch
/// reads, and the compression/statistics surface.
#[cfg(test)]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alice".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Bob".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alice".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Bob".into()))
        );
        // node2 never had an age set, so the lookup must miss.
        assert!(storage.get(node2, &age_key).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alice".into());
        assert!(storage.get(node, &key).is_some());

        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alice".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alice".into());
        col.set(NodeId::new(2), "Bob".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alice".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        // `new()` uses the default mode; `with_compression` overrides it.
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        // 100 writes, well below COMPRESSION_THRESHOLD (1000); presumably no
        // automatic compression has kicked in by the time of the reads below.
        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + (i as i64 % 50)),
            );
        }

        // Values should still be readable
        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        // 20 + (50 % 50) == 20: the value pattern repeats every 50 nodes.
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add many sequential integers
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Should have triggered compression at some point
        // Total count should include both compressed and hot buffer values
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        // Values from the hot buffer should be readable
        // Note: Compressed values are not accessible via get() - see design note
        // NOTE(review): the `||` makes this pass even when `get` returns None,
        // as long as the column reports being compressed — a weak check.
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add repeated strings (good for dictionary compression)
        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
        }

        // Total count should be correct
        assert_eq!(col.len(), 2000);

        // Late values should be in hot buffer and readable
        // NOTE(review): same weak `is_some() || is_compressed()` check as above.
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add booleans
        for i in 0..2000 {
            col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
        }

        // Verify total count
        assert_eq!(col.len(), 2000);

        // Late values should be readable
        // NOTE(review): same weak `is_some() || is_compressed()` check as above.
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Add fewer values than the threshold
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        // Force compression
        col.force_compress();

        // Stats should show compression was applied if beneficial
        // (only the total count is pinned; whether compression happened is not).
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i as i64),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(ArcStr::from("Alice")),
            );
        }

        // Stats are reported per property key, one entry per column.
        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // Two columns
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);
        let age_key = PropertyKey::new("age");

        storage.set(node1, age_key.clone(), 25i64.into());
        storage.set(node2, age_key.clone(), 30i64.into());
        // node3 has no age property

        // Results are positional: one slot per requested id, in request order.
        let ids = vec![node1, node2, node3];
        let values = storage.get_batch(&ids, &age_key);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let missing_key = PropertyKey::new("nonexistent");

        // An unknown key still yields one `None` per requested id.
        let ids = vec![node1, node2];
        let values = storage.get_batch(&ids, &missing_key);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        let values = storage.get_batch(&[], &key);
        assert!(values.is_empty());
    }

    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);

        storage.set(node1, PropertyKey::new("name"), "Alice".into());
        storage.set(node1, PropertyKey::new("age"), 25i64.into());
        storage.set(node2, PropertyKey::new("name"), "Bob".into());
        // node3 has no properties

        let ids = vec![node1, node2, node3];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // name and age
        assert_eq!(all_props[1].len(), 1); // name only
        assert_eq!(all_props[2].len(), 0); // no properties

        assert_eq!(
            all_props[0].get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            all_props[1].get(&PropertyKey::new("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let all_props = storage.get_all_batch(&[]);
        assert!(all_props.is_empty());
    }
}