// grafeo_core/graph/lpg/property.rs
1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::codec::CompressionCodec;
24#[cfg(not(feature = "temporal"))]
25use crate::codec::{CompressedData, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor};
26use crate::index::zone_map::ZoneMapEntry;
27#[cfg(not(feature = "temporal"))]
28use arcstr::ArcStr;
29#[cfg(feature = "temporal")]
30use grafeo_common::temporal::VersionLog;
31#[cfg(feature = "temporal")]
32use grafeo_common::types::EpochId;
33use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
34use grafeo_common::utils::hash::FxHashMap;
35use parking_lot::RwLock;
36use std::cmp::Ordering;
37use std::hash::Hash;
38use std::marker::PhantomData;
39
/// Compression mode for property columns.
///
/// Controls when a column converts its sparse in-memory values into a
/// compressed representation (see the module docs for codec selection).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum CompressionMode {
    /// Never compress - always use sparse HashMap (default).
    #[default]
    None,
    /// Automatically compress when beneficial (after threshold).
    Auto,
    /// Eagerly compress on every flush.
    Eager,
}
52
/// Threshold for automatic compression (number of values).
/// Columns smaller than this are never auto-compressed in `Auto` mode.
#[cfg(not(feature = "temporal"))]
const COMPRESSION_THRESHOLD: usize = 1000;

/// Size of the hot buffer for recent writes (before compression).
/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
/// on recently-written data.
#[cfg(not(feature = "temporal"))]
const HOT_BUFFER_SIZE: usize = 4096;
63
/// Comparison operators used for zone map predicate checks.
///
/// These map directly to GQL comparison operators like `=`, `<`, `>=`.
/// Consumed by [`PropertyStorage::might_match`] for predicate pushdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompareOp {
    /// Equal to value.
    Eq,
    /// Not equal to value.
    Ne,
    /// Less than value.
    Lt,
    /// Less than or equal to value.
    Le,
    /// Greater than value.
    Gt,
    /// Greater than or equal to value.
    Ge,
}
83
/// Trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
/// Provides safe conversions to/from `u64` for compression, replacing unsafe transmute.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Returns the raw `u64` value.
    fn as_u64(self) -> u64;
    /// Creates an ID from a raw `u64` value.
    /// Must be the exact inverse of [`Self::as_u64`].
    fn from_u64(v: u64) -> Self;
}
94
95impl EntityId for NodeId {
96    #[inline]
97    fn as_u64(self) -> u64 {
98        self.0
99    }
100    #[inline]
101    fn from_u64(v: u64) -> Self {
102        Self(v)
103    }
104}
105
106impl EntityId for EdgeId {
107    #[inline]
108    fn as_u64(self) -> u64 {
109        self.0
110    }
111    #[inline]
112    fn from_u64(v: u64) -> Self {
113        Self(v)
114    }
115}
116
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges.
///
/// # Example
///
/// ```
/// # #[cfg(not(feature = "temporal"))]
/// # {
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alix = NodeId::new(1);
///
/// storage.set(alix, PropertyKey::new("name"), "Alix".into());
/// storage.set(alix, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alix);
/// assert_eq!(props.len(), 2);
/// # }
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    /// Existing columns are unaffected by later changes to this field.
    default_compression: CompressionMode,
    // NOTE(review): `Id` already appears in `columns`, so this marker looks
    // redundant - kept for layout/compat; confirm before removing.
    _marker: PhantomData<Id>,
}
152
153impl<Id: EntityId> PropertyStorage<Id> {
154    /// Creates a new property storage.
155    #[must_use]
156    pub fn new() -> Self {
157        Self {
158            columns: RwLock::new(FxHashMap::default()),
159            default_compression: CompressionMode::None,
160            _marker: PhantomData,
161        }
162    }
163
164    /// Creates a new property storage with compression enabled.
165    #[must_use]
166    pub fn with_compression(mode: CompressionMode) -> Self {
167        Self {
168            columns: RwLock::new(FxHashMap::default()),
169            default_compression: mode,
170            _marker: PhantomData,
171        }
172    }
173
    /// Sets the default compression mode for new columns.
    ///
    /// Existing columns keep their current mode; use
    /// [`Self::enable_compression`] to change a specific column.
    pub fn set_default_compression(&mut self, mode: CompressionMode) {
        self.default_compression = mode;
    }
178
179    /// Sets a property value for an entity.
180    #[cfg(not(feature = "temporal"))]
181    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
182        let mut columns = self.columns.write();
183        let mode = self.default_compression;
184        columns
185            .entry(key)
186            .or_insert_with(|| PropertyColumn::with_compression(mode))
187            .set(id, value);
188    }
189
190    /// Sets a property value for an entity at a specific epoch.
191    ///
192    /// For non-transactional writes, pass the current epoch.
193    /// For transactional writes, pass `EpochId::PENDING`.
194    #[cfg(feature = "temporal")]
195    pub fn set(&self, id: Id, key: PropertyKey, value: Value, epoch: EpochId) {
196        let mut columns = self.columns.write();
197        let mode = self.default_compression;
198        columns
199            .entry(key)
200            .or_insert_with(|| PropertyColumn::with_compression(mode))
201            .set(id, value, epoch);
202    }
203
204    /// Enables compression for a specific column.
205    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
206        let mut columns = self.columns.write();
207        if let Some(col) = columns.get_mut(key) {
208            col.set_compression_mode(mode);
209        }
210    }
211
212    /// Compresses all columns that have compression enabled.
213    pub fn compress_all(&self) {
214        let mut columns = self.columns.write();
215        for col in columns.values_mut() {
216            if col.compression_mode() != CompressionMode::None {
217                col.compress();
218            }
219        }
220    }
221
222    /// Forces compression on all columns regardless of mode.
223    pub fn force_compress_all(&self) {
224        let mut columns = self.columns.write();
225        for col in columns.values_mut() {
226            col.force_compress();
227        }
228    }
229
230    /// Returns compression statistics for all columns.
231    #[must_use]
232    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
233        let columns = self.columns.read();
234        columns
235            .iter()
236            .map(|(key, col)| (key.clone(), col.compression_stats()))
237            .collect()
238    }
239
240    /// Returns the total memory usage of all columns (compressed size estimate).
241    #[must_use]
242    pub fn memory_usage(&self) -> usize {
243        let columns = self.columns.read();
244        columns
245            .values()
246            .map(|col| col.compression_stats().compressed_size)
247            .sum()
248    }
249
250    /// Returns estimated heap memory for all columns including hash map overhead.
251    #[must_use]
252    pub fn heap_memory_bytes(&self) -> usize {
253        let columns = self.columns.read();
254        // Outer hash map capacity
255        let map_overhead = columns.capacity()
256            * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
257        // Sum of all column heap memory
258        let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
259        map_overhead + column_bytes
260    }
261
262    /// Gets a property value for an entity.
263    #[must_use]
264    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
265        let columns = self.columns.read();
266        columns.get(key).and_then(|col| col.get(id))
267    }
268
269    /// Removes a property value for an entity.
270    #[cfg(not(feature = "temporal"))]
271    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
272        let mut columns = self.columns.write();
273        columns.get_mut(key).and_then(|col| col.remove(id))
274    }
275
276    /// Removes a property value for an entity (temporal: appends tombstone at epoch).
277    #[cfg(feature = "temporal")]
278    pub fn remove(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
279        let mut columns = self.columns.write();
280        columns.get_mut(key).and_then(|col| col.remove(id, epoch))
281    }
282
283    /// Removes all properties for an entity.
284    #[cfg(not(feature = "temporal"))]
285    pub fn remove_all(&self, id: Id) {
286        let mut columns = self.columns.write();
287        for col in columns.values_mut() {
288            col.remove(id);
289        }
290    }
291
292    /// Removes all properties for an entity (temporal: tombstones at current epoch).
293    #[cfg(feature = "temporal")]
294    pub fn remove_all(&self, id: Id, epoch: EpochId) {
295        let mut columns = self.columns.write();
296        for col in columns.values_mut() {
297            col.remove(id, epoch);
298        }
299    }
300
301    /// Gets all properties for an entity.
302    #[must_use]
303    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
304        let columns = self.columns.read();
305        let mut result = FxHashMap::default();
306        for (key, col) in columns.iter() {
307            if let Some(value) = col.get(id) {
308                result.insert(key.clone(), value);
309            }
310        }
311        result
312    }
313
314    /// Gets property values for multiple entities in a single lock acquisition.
315    ///
316    /// More efficient than calling [`Self::get`] in a loop because it acquires
317    /// the read lock only once.
318    ///
319    /// # Example
320    ///
321    /// ```
322    /// use grafeo_core::graph::lpg::PropertyStorage;
323    /// use grafeo_common::types::{PropertyKey, Value};
324    /// use grafeo_common::NodeId;
325    ///
326    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
327    /// let key = PropertyKey::new("age");
328    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
329    /// let values = storage.get_batch(&ids, &key);
330    /// // values[i] is the property value for ids[i], or None if not set
331    /// ```
332    #[must_use]
333    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
334        let columns = self.columns.read();
335        match columns.get(key) {
336            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
337            None => vec![None; ids.len()],
338        }
339    }
340
341    /// Gets all properties for multiple entities efficiently.
342    ///
343    /// More efficient than calling [`Self::get_all`] in a loop because it
344    /// acquires the read lock only once.
345    ///
346    /// # Example
347    ///
348    /// ```
349    /// use grafeo_core::graph::lpg::PropertyStorage;
350    /// use grafeo_common::types::{PropertyKey, Value};
351    /// use grafeo_common::NodeId;
352    ///
353    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
354    /// let ids = vec![NodeId(1), NodeId(2)];
355    /// let all_props = storage.get_all_batch(&ids);
356    /// // all_props[i] is a HashMap of all properties for ids[i]
357    /// ```
358    #[must_use]
359    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
360        let columns = self.columns.read();
361        let column_count = columns.len();
362
363        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
364        let mut results = Vec::with_capacity(ids.len());
365
366        for &id in ids {
367            // Pre-allocate HashMap with expected column count
368            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
369            for (key, col) in columns.iter() {
370                if let Some(value) = col.get(id) {
371                    result.insert(key.clone(), value);
372                }
373            }
374            results.push(result);
375        }
376
377        results
378    }
379
380    /// Gets selected properties for multiple entities efficiently (projection pushdown).
381    ///
382    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
383    /// of properties - it only iterates the requested columns instead of all columns.
384    ///
385    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
386    /// compared to O(N × C) for `get_all_batch` where C = total column count.
387    ///
388    /// # Example
389    ///
390    /// ```
391    /// use grafeo_core::graph::lpg::PropertyStorage;
392    /// use grafeo_common::types::{PropertyKey, Value};
393    /// use grafeo_common::NodeId;
394    ///
395    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
396    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
397    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
398    ///
399    /// // Only fetches "name" and "age" columns, ignoring other properties
400    /// let props = storage.get_selective_batch(&ids, &keys);
401    /// ```
402    #[must_use]
403    pub fn get_selective_batch(
404        &self,
405        ids: &[Id],
406        keys: &[PropertyKey],
407    ) -> Vec<FxHashMap<PropertyKey, Value>> {
408        if keys.is_empty() {
409            // No properties requested - return empty maps
410            return vec![FxHashMap::default(); ids.len()];
411        }
412
413        let columns = self.columns.read();
414
415        // Pre-collect only the columns we need (avoids re-lookup per id)
416        let requested_columns: Vec<_> = keys
417            .iter()
418            .filter_map(|key| columns.get(key).map(|col| (key, col)))
419            .collect();
420
421        // Pre-allocate result with exact capacity
422        let mut results = Vec::with_capacity(ids.len());
423
424        for &id in ids {
425            let mut result =
426                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
427            // Only iterate requested columns, not all columns
428            for (key, col) in &requested_columns {
429                if let Some(value) = col.get(id) {
430                    result.insert((*key).clone(), value);
431                }
432            }
433            results.push(result);
434        }
435
436        results
437    }
438
439    /// Returns the number of property columns.
440    #[must_use]
441    pub fn column_count(&self) -> usize {
442        self.columns.read().len()
443    }
444
445    /// Returns the keys of all columns.
446    #[must_use]
447    pub fn keys(&self) -> Vec<PropertyKey> {
448        self.columns.read().keys().cloned().collect()
449    }
450
    /// Removes all property data (drops every column).
    pub fn clear(&self) {
        self.columns.write().clear();
    }
455
456    // ── Column-level spill / reload ────────────────────────────────
457
458    /// Evicts all values from a specific property column, freeing heap memory.
459    ///
460    /// Returns `(count, estimated_freed_bytes)`. The column stays registered
461    /// (zone map preserved) but `get()` returns `None` until `restore_column()`.
462    #[cfg(not(feature = "temporal"))]
463    pub fn evict_column(&self, key: &PropertyKey) -> (usize, usize) {
464        let mut columns = self.columns.write();
465        if let Some(column) = columns.get_mut(key) {
466            column.evict_values()
467        } else {
468            (0, 0)
469        }
470    }
471
472    /// Restores values into a previously evicted column.
473    ///
474    /// Clears the `spilled` flag on the column. If the column doesn't exist,
475    /// it is created.
476    #[cfg(not(feature = "temporal"))]
477    pub fn restore_column(&self, key: &PropertyKey, values: impl Iterator<Item = (Id, Value)>) {
478        let mut columns = self.columns.write();
479        let column = columns
480            .entry(key.clone())
481            .or_insert_with(|| PropertyColumn::with_compression(self.default_compression));
482        column.restore_values(values);
483    }
484
485    /// Drains all values from a column, returning them for export to disk.
486    ///
487    /// After this call, `is_column_spilled(key)` returns `true`.
488    /// The column remains registered (zone map preserved).
489    #[cfg(not(feature = "temporal"))]
490    pub fn drain_column(&self, key: &PropertyKey) -> Vec<(Id, Value)> {
491        let mut columns = self.columns.write();
492        if let Some(column) = columns.get_mut(key) {
493            column.drain_values()
494        } else {
495            Vec::new()
496        }
497    }
498
499    /// Whether a specific column has been spilled to disk.
500    #[cfg(not(feature = "temporal"))]
501    #[must_use]
502    pub fn is_column_spilled(&self, key: &PropertyKey) -> bool {
503        self.columns
504            .read()
505            .get(key)
506            .is_some_and(|col| col.is_spilled())
507    }
508
509    /// Marks a column as spilled without draining its values.
510    ///
511    /// Used on startup when re-establishing spill state: the column may
512    /// already be empty (loaded from a checkpoint that serialized after spill).
513    #[cfg(not(feature = "temporal"))]
514    pub fn mark_column_spilled(&self, key: &PropertyKey) {
515        let mut columns = self.columns.write();
516        let column = columns.entry(key.clone()).or_default();
517        column.mark_spilled();
518    }
519
520    /// Gets a column by key for bulk access.
521    #[must_use]
522    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
523        let columns = self.columns.read();
524        if columns.contains_key(key) {
525            Some(PropertyColumnRef {
526                _guard: columns,
527                _key: key.clone(),
528                _marker: PhantomData,
529            })
530        } else {
531            None
532        }
533    }
534
535    /// Checks if a predicate might match any values (using zone maps).
536    ///
537    /// Returns `false` only when we're *certain* no values match - for example,
538    /// if you're looking for age > 100 but the max age is 80. Returns `true`
539    /// if the property doesn't exist (conservative - might match).
540    #[must_use]
541    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
542        let columns = self.columns.read();
543        columns
544            .get(key)
545            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
546    }
547
548    /// Gets the zone map for a property column.
549    #[must_use]
550    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
551        let columns = self.columns.read();
552        columns.get(key).map(|col| col.zone_map().clone())
553    }
554
555    /// Checks if a range predicate might match any values (using zone maps).
556    ///
557    /// Returns `false` only when we're *certain* no values match the range.
558    /// Returns `true` if the property doesn't exist (conservative - might match).
559    #[must_use]
560    pub fn might_match_range(
561        &self,
562        key: &PropertyKey,
563        min: Option<&Value>,
564        max: Option<&Value>,
565        min_inclusive: bool,
566        max_inclusive: bool,
567    ) -> bool {
568        let columns = self.columns.read();
569        columns.get(key).map_or(true, |col| {
570            col.zone_map()
571                .might_contain_range(min, max, min_inclusive, max_inclusive)
572        }) // No column = assume might match (conservative)
573    }
574
575    /// Rebuilds zone maps for all columns (call after bulk removes).
576    pub fn rebuild_zone_maps(&self) {
577        let mut columns = self.columns.write();
578        for col in columns.values_mut() {
579            col.rebuild_zone_map();
580        }
581    }
582}
583
impl<Id: EntityId> Default for PropertyStorage<Id> {
    /// Equivalent to [`PropertyStorage::new`] (compression disabled).
    fn default() -> Self {
        Self::new()
    }
}
589
590// === Temporal-only methods for PropertyStorage ===
591#[cfg(feature = "temporal")]
592impl<Id: EntityId> PropertyStorage<Id> {
    /// Returns a write guard to the columns map for targeted rollback.
    ///
    /// Crate-internal escape hatch; callers must respect the lock order
    /// documented on the `columns` field (order 9, nested).
    pub(crate) fn columns_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, FxHashMap<PropertyKey, PropertyColumn<Id>>> {
        self.columns.write()
    }
599
600    /// Gets a property value at a specific epoch.
601    #[must_use]
602    pub fn get_at(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
603        let columns = self.columns.read();
604        columns.get(key).and_then(|col| col.get_at(id, epoch))
605    }
606
607    /// Gets all properties for an entity at a specific epoch.
608    #[must_use]
609    pub fn get_all_at(&self, id: Id, epoch: EpochId) -> FxHashMap<PropertyKey, Value> {
610        let columns = self.columns.read();
611        let mut result = FxHashMap::default();
612        for (key, col) in columns.iter() {
613            if let Some(value) = col.get_at(id, epoch) {
614                result.insert(key.clone(), value);
615            }
616        }
617        result
618    }
619
620    /// Replaces PENDING epochs with the real commit epoch in all columns.
621    pub fn finalize_pending(&self, real_epoch: EpochId) {
622        let mut columns = self.columns.write();
623        for col in columns.values_mut() {
624            col.finalize_pending(real_epoch);
625        }
626    }
627
628    /// Removes all PENDING entries from all columns (transaction rollback).
629    pub fn remove_pending(&self) {
630        let mut columns = self.columns.write();
631        for col in columns.values_mut() {
632            col.remove_pending();
633        }
634    }
635
636    /// Garbage-collects old versions from all columns.
637    pub fn gc(&self, min_epoch: EpochId) {
638        let mut columns = self.columns.write();
639        for col in columns.values_mut() {
640            col.gc(min_epoch);
641        }
642    }
643
644    /// Returns the full version history for all properties of an entity.
645    ///
646    /// Each entry is `(key, Vec<(epoch, value)>)`. Useful for snapshot
647    /// export that preserves temporal history.
648    #[must_use]
649    pub fn get_all_history(&self, id: Id) -> Vec<(PropertyKey, Vec<(EpochId, Value)>)> {
650        let columns = self.columns.read();
651        let mut result = Vec::new();
652        for (key, col) in columns.iter() {
653            if let Some(log) = col.values.get(&id) {
654                let entries: Vec<(EpochId, Value)> = log
655                    .history()
656                    .iter()
657                    .map(|(epoch, value)| (*epoch, value.clone()))
658                    .collect();
659                if !entries.is_empty() {
660                    result.push((key.clone(), entries));
661                }
662            }
663        }
664        result
665    }
666
667    /// Returns the version history for a single property of an entity.
668    ///
669    /// More efficient than `get_all_history` when only one property is needed.
670    #[must_use]
671    pub fn get_history(&self, id: Id, key: &PropertyKey) -> Vec<(EpochId, Value)> {
672        let columns = self.columns.read();
673        columns
674            .get(key)
675            .and_then(|col| col.values.get(&id))
676            .map(|log| log.history().iter().map(|(e, v)| (*e, v.clone())).collect())
677            .unwrap_or_default()
678    }
679}
680
/// Compressed storage for a property column.
///
/// Holds the compressed representation of values along with the index
/// mapping entity IDs to positions in the compressed array. One variant
/// per supported value type.
#[cfg(not(feature = "temporal"))]
#[derive(Debug)]
#[non_exhaustive]
pub enum CompressedColumnData {
    /// Compressed integers (Int64 values).
    Integers {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> compressed array index.
        id_to_index: Vec<u64>,
        /// Reverse index: compressed array index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Dictionary-encoded strings.
    Strings {
        /// Dictionary encoding.
        encoding: DictionaryEncoding,
        /// Index: entity ID position -> dictionary index.
        id_to_index: Vec<u64>,
        /// Reverse index: dictionary index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Compressed booleans.
    Booleans {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> bit index.
        id_to_index: Vec<u64>,
        /// Reverse index: bit index -> entity ID position.
        index_to_id: Vec<u64>,
    },
}
717
718#[cfg(not(feature = "temporal"))]
719impl CompressedColumnData {
720    /// Returns the memory usage of the compressed data in bytes.
721    #[must_use]
722    pub fn memory_usage(&self) -> usize {
723        match self {
724            CompressedColumnData::Integers {
725                data,
726                id_to_index,
727                index_to_id,
728            } => {
729                data.data.len()
730                    + id_to_index.len() * std::mem::size_of::<u64>()
731                    + index_to_id.len() * std::mem::size_of::<u64>()
732            }
733            CompressedColumnData::Strings {
734                encoding,
735                id_to_index,
736                index_to_id,
737            } => {
738                encoding.codes().len() * std::mem::size_of::<u32>()
739                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
740                    + id_to_index.len() * std::mem::size_of::<u64>()
741                    + index_to_id.len() * std::mem::size_of::<u64>()
742            }
743            CompressedColumnData::Booleans {
744                data,
745                id_to_index,
746                index_to_id,
747            } => {
748                data.data.len()
749                    + id_to_index.len() * std::mem::size_of::<u64>()
750                    + index_to_id.len() * std::mem::size_of::<u64>()
751            }
752        }
753    }
754}
755
/// Statistics about column compression.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Size of uncompressed data in bytes.
    pub uncompressed_size: usize,
    /// Size of compressed data in bytes.
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression (`None` when nothing is compressed).
    pub codec: Option<CompressionCodec>,
}
768
769impl CompressionStats {
770    /// Returns the compression ratio (uncompressed / compressed).
771    #[must_use]
772    pub fn compression_ratio(&self) -> f64 {
773        if self.compressed_size == 0 {
774            return 1.0;
775        }
776        self.uncompressed_size as f64 / self.compressed_size as f64
777    }
778}
779
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    #[cfg(not(feature = "temporal"))]
    values: FxHashMap<Id, Value>,
    /// Versioned storage: entity ID -> append-only version log.
    /// Each value is tagged with the epoch it was written in.
    #[cfg(feature = "temporal")]
    values: FxHashMap<Id, VersionLog<Value>>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    #[cfg(not(feature = "temporal"))]
    compressed: Option<CompressedColumnData>,
    /// Number of values before last compression.
    #[cfg(not(feature = "temporal"))]
    compressed_count: usize,
    /// Whether this column's values have been spilled to disk.
    /// When true, `get()` returns `None` for all IDs. The column remains
    /// registered so the schema knows the property exists, but values are
    /// served from a mmap-backed store instead.
    #[cfg(not(feature = "temporal"))]
    spilled: bool,
}
817
#[cfg(not(feature = "temporal"))]
impl<Id: EntityId> PropertyColumn<Id> {
    /// Creates a new empty column with compression disabled
    /// ([`CompressionMode::None`]).
    #[must_use]
    pub fn new() -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: CompressionMode::None,
            compressed: None,
            compressed_count: 0,
            spilled: false,
        }
    }

    /// Creates a new column with the specified compression mode.
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
            compressed: None,
            compressed_count: 0,
            spilled: false,
        }
    }

    /// Sets the compression mode for this column.
    ///
    /// Switching to [`CompressionMode::None`] eagerly decompresses any
    /// existing compressed data back into the hot buffer so `get()` can
    /// serve every value again.
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
        if mode == CompressionMode::None {
            // Decompress if switching to no compression
            if self.compressed.is_some() {
                self.decompress_all();
            }
        }
    }

    /// Returns the compression mode for this column.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }

    /// Sets a value for an entity.
    ///
    /// In [`CompressionMode::Auto`], a compression pass is triggered once the
    /// hot buffer holds at least `HOT_BUFFER_SIZE` entries and the column's
    /// total (hot + compressed) size reaches `COMPRESSION_THRESHOLD`.
    ///
    /// NOTE(review): the zone map is updated unconditionally, so replacing an
    /// existing value still increments `row_count` (and `null_count` for
    /// nulls), and a superseded min/max is never retracted. Counters drift
    /// above `values.len()` under repeated updates until `rebuild_zone_map()`
    /// runs. min/max stay conservative (they only widen), so pruning remains
    /// safe — confirm no consumer relies on `row_count` being exact.
    pub fn set(&mut self, id: Id, value: Value) {
        // Update zone map incrementally
        self.update_zone_map_on_insert(&value);
        self.values.insert(id, value);

        // Check if we should compress (in Auto mode)
        if self.compression_mode == CompressionMode::Auto {
            let total_count = self.values.len() + self.compressed_count;
            let hot_buffer_count = self.values.len();

            // Compress when hot buffer exceeds threshold and total is large enough
            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
                self.compress();
            }
        }
    }

    /// Updates zone map when inserting a value.
    ///
    /// Nulls only bump `null_count`; non-null values can only widen the
    /// min/max bounds, never tighten them.
    fn update_zone_map_on_insert(&mut self, value: &Value) {
        self.zone_map.row_count += 1;

        if matches!(value, Value::Null) {
            self.zone_map.null_count += 1;
            return;
        }

        // Update min
        match &self.zone_map.min {
            None => self.zone_map.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Less) {
                    self.zone_map.min = Some(value.clone());
                }
            }
        }

        // Update max
        match &self.zone_map.max {
            None => self.zone_map.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Greater) {
                    self.zone_map.max = Some(value.clone());
                }
            }
        }
    }

    /// Gets a value for an entity.
    ///
    /// First checks the hot buffer (uncompressed values), then falls back
    /// to the compressed data if present.
    ///
    /// NOTE(review): the compressed fallback is not implemented, so a value
    /// that was moved into the compressed region is invisible to point
    /// lookups until `decompress_all()` runs (e.g. via
    /// `set_compression_mode(None)`) — confirm callers tolerate this.
    #[must_use]
    pub fn get(&self, id: Id) -> Option<Value> {
        // First check hot buffer
        if let Some(value) = self.values.get(&id) {
            return Some(value.clone());
        }

        // For now, compressed data lookup is not implemented for sparse access
        // because the compressed format stores values by index, not by entity ID.
        // This would require maintaining an ID -> index map in CompressedColumnData.
        // The compressed data is primarily useful for bulk/scan operations.
        None
    }

    /// Removes a value for an entity, returning it if it was present.
    ///
    /// Only the hot buffer is consulted; a value living in the compressed
    /// region is not removed here. On removal the zone map is marked dirty
    /// (min/max may now be stale) rather than rebuilt immediately.
    pub fn remove(&mut self, id: Id) -> Option<Value> {
        let removed = self.values.remove(&id);
        if removed.is_some() {
            // Mark zone map as dirty - would need full rebuild for accurate min/max
            self.zone_map_dirty = true;
        }
        removed
    }

    // ── Spill / Reload ─────────────────────────────────────────────

    /// Marks the column as spilled without clearing values.
    ///
    /// Used on startup when re-establishing spill state from persisted files.
    pub fn mark_spilled(&mut self) {
        self.spilled = true;
    }

    /// Whether this column's values have been spilled to disk.
    ///
    /// When spilled, `get()` returns `None` and values are served from an
    /// external mmap-backed store. New writes still go into this column
    /// (the accessor checks both).
    #[must_use]
    pub fn is_spilled(&self) -> bool {
        self.spilled
    }

    /// Evicts all values from this column, freeing their heap memory.
    ///
    /// Returns `(count, estimated_freed_bytes)`. After this call,
    /// `is_spilled()` returns `true` and `get()` returns `None` for all IDs.
    /// The column remains registered in the schema (zone map, compression
    /// metadata are preserved).
    ///
    /// NOTE(review): `count` only reflects the hot buffer; the compressed
    /// region (`compressed_count` values) is discarded without being counted
    /// or exported. `freed_bytes` does include it. Callers must have already
    /// persisted every value externally — confirm at call sites.
    pub fn evict_values(&mut self) -> (usize, usize) {
        let count = self.values.len();
        let freed_bytes = self.heap_memory_bytes();
        self.values.clear();
        self.values.shrink_to_fit();
        self.compressed = None;
        self.compressed_count = 0;
        self.spilled = true;
        (count, freed_bytes)
    }

    /// Drains all values from this column, returning them for export.
    ///
    /// After this call, `is_spilled()` returns `true`. This combines
    /// export + evict in one step to avoid cloning all values.
    ///
    /// NOTE(review): only the hot buffer is drained; compressed values are
    /// dropped (`compressed = None`) without appearing in the returned Vec.
    /// If a column can be both Auto-compressed and spilled, this silently
    /// loses data — confirm spilling only targets uncompressed columns.
    pub fn drain_values(&mut self) -> Vec<(Id, Value)> {
        let drained: Vec<(Id, Value)> = self.values.drain().collect();
        self.values.shrink_to_fit();
        self.compressed = None;
        self.compressed_count = 0;
        self.spilled = true;
        drained
    }

    /// Restores values into this column after a reload from disk.
    ///
    /// Clears the `spilled` flag. Callers are responsible for providing
    /// the correct values (from `MmapStorage::export_all()` or similar).
    pub fn restore_values(&mut self, values: impl Iterator<Item = (Id, Value)>) {
        self.spilled = false;
        // Insert directly into the map without calling set(), which would
        // re-increment zone map counters (row_count, null_count) on top of
        // the already-preserved zone map from before eviction.
        for (id, value) in values {
            self.values.insert(id, value);
        }
    }

    /// Returns the number of values in this column (hot + compressed).
    #[must_use]
    pub fn len(&self) -> usize {
        self.values.len() + self.compressed_count
    }

    /// Returns true if this column is empty.
    #[cfg(test)]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.values.is_empty() && self.compressed_count == 0
    }

    /// Returns compression statistics for this column.
    ///
    /// `uncompressed_size` estimates what all values would occupy as plain
    /// `Value`s; `compressed_size` is the current footprint (hot values
    /// plus the compressed region's actual memory usage).
    #[must_use]
    pub fn compression_stats(&self) -> CompressionStats {
        let hot_size = self.values.len() * std::mem::size_of::<Value>();
        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
        // Strings are always dictionary-encoded; ints/bools record the codec
        // chosen by the compressor.
        let codec = match &self.compressed {
            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
            None => None,
        };

        CompressionStats {
            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
            compressed_size: hot_size + compressed_size,
            value_count: self.len(),
            codec,
        }
    }

    /// Returns estimated heap memory for this column.
    ///
    /// Includes the hot buffer hash map capacity, zone map, and any
    /// compressed data. The `+ 1` per slot approximates the hash map's
    /// control-byte overhead.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        // Hot buffer: FxHashMap<Id, Value> capacity
        let hot_bytes =
            self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
        // Compressed data
        let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
        // ZoneMapEntry is inline (no heap), so just hot + compressed
        hot_bytes + compressed_bytes
    }

    /// Returns whether the column has compressed data.
    #[must_use]
    #[cfg(test)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }

    /// Compresses the hot buffer values.
    ///
    /// This merges the hot buffer into the compressed data, selecting the
    /// best codec based on the value types. Only a strictly dominant type
    /// (more than half the values) is compressed; mixed-type columns are
    /// left uncompressed.
    ///
    /// Note: If compressed data already exists, this is a no-op to avoid
    /// losing previously compressed values. Use `force_compress()` after
    /// decompressing to re-compress with all values.
    pub fn compress(&mut self) {
        if self.values.is_empty() {
            return;
        }

        // Don't re-compress if we already have compressed data
        // (would need to decompress and merge first)
        if self.compressed.is_some() {
            return;
        }

        // Determine the dominant type
        let (int_count, str_count, bool_count) = self.count_types();
        let total = self.values.len();

        if int_count > total / 2 {
            self.compress_as_integers();
        } else if str_count > total / 2 {
            self.compress_as_strings();
        } else if bool_count > total / 2 {
            self.compress_as_booleans();
        }
        // If no dominant type, don't compress (mixed types don't compress well)
    }

    /// Counts hot-buffer values by type: `(int_count, str_count, bool_count)`.
    /// Other value types (floats, nulls, …) are not counted.
    fn count_types(&self) -> (usize, usize, usize) {
        let mut int_count = 0;
        let mut str_count = 0;
        let mut bool_count = 0;

        for value in self.values.values() {
            match value {
                Value::Int64(_) => int_count += 1,
                Value::String(_) => str_count += 1,
                Value::Bool(_) => bool_count += 1,
                _ => {}
            }
        }

        (int_count, str_count, bool_count)
    }

    /// Compresses integer values.
    ///
    /// Non-integer values stay in the hot buffer. Compression is skipped for
    /// fewer than 8 integers or when the codec saves less than ~17%
    /// (ratio <= 1.2); in that case the column is left untouched.
    fn compress_as_integers(&mut self) {
        // Extract integer values and their IDs
        let mut values: Vec<(u64, i64)> = Vec::new();
        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::Int64(v) => {
                    let id_u64 = id.as_u64();
                    values.push((id_u64, *v));
                }
                _ => {
                    non_int_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            // Not worth compressing
            return;
        }

        // Sort by ID for better compression
        values.sort_by_key(|(id, _)| *id);

        // NOTE(review): id_to_index and index_to_id hold identical sorted-ID
        // arrays here; the distinction presumably matters once sparse lookup
        // into compressed data is implemented — confirm before consolidating.
        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();
        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();

        // Compress using the optimal codec
        let Ok(compressed) = TypeSpecificCompressor::compress_signed_integers(&int_values) else {
            return;
        };

        // Only use compression if it actually saves space
        if compressed.compression_ratio() > 1.2 {
            self.compressed = Some(CompressedColumnData::Integers {
                data: compressed,
                id_to_index,
                index_to_id,
            });
            self.compressed_count = values.len();
            self.values = non_int_values;
        }
    }

    /// Compresses string values using dictionary encoding.
    ///
    /// Same skip rules as integers: fewer than 8 strings or a ratio <= 1.2
    /// leaves the hot buffer untouched.
    fn compress_as_strings(&mut self) {
        let mut values: Vec<(u64, ArcStr)> = Vec::new();
        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::String(s) => {
                    values.push((id.as_u64(), s.clone()));
                }
                _ => {
                    non_str_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            return;
        }

        // Sort by ID
        values.sort_by_key(|(id, _)| *id);

        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();

        // Build dictionary (insertion order = ID order, mirrored by index_to_id)
        let mut builder = DictionaryBuilder::new();
        for (_, s) in &values {
            builder.add(s.as_ref());
        }
        let encoding = builder.build();

        // Only use compression if it actually saves space
        if encoding.compression_ratio() > 1.2 {
            self.compressed = Some(CompressedColumnData::Strings {
                encoding,
                id_to_index,
                index_to_id,
            });
            self.compressed_count = values.len();
            self.values = non_str_values;
        }
    }

    /// Compresses boolean values.
    ///
    /// Unlike ints/strings there is no ratio check: bit-vector packing of
    /// booleans is assumed to always pay off (~8x).
    fn compress_as_booleans(&mut self) {
        let mut values: Vec<(u64, bool)> = Vec::new();
        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::Bool(b) => {
                    values.push((id.as_u64(), *b));
                }
                _ => {
                    non_bool_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            return;
        }

        // Sort by ID
        values.sort_by_key(|(id, _)| *id);

        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();
        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();

        let Ok(compressed) = TypeSpecificCompressor::compress_booleans(&bool_values) else {
            return;
        };

        // Booleans always compress well (8x)
        self.compressed = Some(CompressedColumnData::Booleans {
            data: compressed,
            id_to_index,
            index_to_id,
        });
        self.compressed_count = values.len();
        self.values = non_bool_values;
    }

    /// Decompresses all values back to the hot buffer.
    ///
    /// Entries are matched positionally: `index_to_id[i]` pairs with the
    /// i-th decoded value. Decode failures (the `if let Ok`/`if let Some`
    /// guards) silently drop the affected values.
    fn decompress_all(&mut self) {
        let Some(compressed) = self.compressed.take() else {
            return;
        };

        match compressed {
            CompressedColumnData::Integers {
                data, index_to_id, ..
            } => {
                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
                    // Convert back to signed using zigzag decoding.
                    // NOTE(review): assumes compress_signed_integers() stored
                    // zigzag-encoded u64s — confirm against the codec module.
                    let signed: Vec<i64> = values
                        .iter()
                        .map(|&v| crate::codec::zigzag_decode(v))
                        .collect();

                    for (i, id_u64) in index_to_id.iter().enumerate() {
                        if let Some(&value) = signed.get(i) {
                            let id = Id::from_u64(*id_u64);
                            self.values.insert(id, Value::Int64(value));
                        }
                    }
                }
            }
            CompressedColumnData::Strings {
                encoding,
                index_to_id,
                ..
            } => {
                for (i, id_u64) in index_to_id.iter().enumerate() {
                    if let Some(s) = encoding.get(i) {
                        let id = Id::from_u64(*id_u64);
                        self.values.insert(id, Value::String(ArcStr::from(s)));
                    }
                }
            }
            CompressedColumnData::Booleans {
                data, index_to_id, ..
            } => {
                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
                    for (i, id_u64) in index_to_id.iter().enumerate() {
                        if let Some(&value) = values.get(i) {
                            let id = Id::from_u64(*id_u64);
                            self.values.insert(id, Value::Bool(value));
                        }
                    }
                }
            }
        }

        self.compressed_count = 0;
    }

    /// Forces compression regardless of thresholds.
    ///
    /// Useful for bulk loading or when you know the column is complete.
    ///
    /// NOTE(review): this only bypasses the Auto-mode size thresholds in
    /// `set()`; `compress()` itself still no-ops when compressed data exists
    /// or fewer than 8 values of the dominant type are present — confirm the
    /// name matches the intended contract.
    pub fn force_compress(&mut self) {
        self.compress();
    }

    /// Returns the zone map for this column.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }

    /// Uses zone map to check if any values could satisfy the predicate.
    ///
    /// Returns `false` when we can prove no values match (so the column
    /// can be skipped entirely). Returns `true` if values might match.
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        if self.zone_map_dirty {
            // Conservative: can't skip if zone map is stale
            return true;
        }

        match op {
            CompareOp::Eq => self.zone_map.might_contain_equal(value),
            CompareOp::Ne => {
                // Can only skip if all values are equal to the value
                // (which means min == max == value)
                match (&self.zone_map.min, &self.zone_map.max) {
                    (Some(min), Some(max)) => {
                        !(compare_values(min, value) == Some(Ordering::Equal)
                            && compare_values(max, value) == Some(Ordering::Equal))
                    }
                    _ => true,
                }
            }
            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
        }
    }

    /// Rebuilds zone map from current values.
    ///
    /// Scans only the hot buffer (compressed values are not included) and
    /// clears the dirty flag. This is the authoritative reset for counters
    /// that the incremental path in `set()` can only grow.
    pub fn rebuild_zone_map(&mut self) {
        let mut zone_map = ZoneMapEntry::new();

        for value in self.values.values() {
            zone_map.row_count += 1;

            if matches!(value, Value::Null) {
                zone_map.null_count += 1;
                continue;
            }

            // Update min
            match &zone_map.min {
                None => zone_map.min = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Less) {
                        zone_map.min = Some(value.clone());
                    }
                }
            }

            // Update max
            match &zone_map.max {
                None => zone_map.max = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Greater) {
                        zone_map.max = Some(value.clone());
                    }
                }
            }
        }

        self.zone_map = zone_map;
        self.zone_map_dirty = false;
    }
}
1378
1379// === Temporal implementation: VersionLog-backed property column ===
1380//
1381// **Zone map limitation**: zone maps track min/max across the *latest* values
1382// only (see `rebuild_zone_map`). For temporal queries at old epochs, the zone
1383// map may produce false negatives: it could reject a column based on current
1384// min/max even though historical values would match. This is a known
1385// trade-off: temporal queries are conservative but never return wrong results
1386// (the `zone_map_dirty` fallback returns `true` = "might match").
1387//
1388// **Compression**: disabled in temporal mode because the underlying codecs
1389// (DeltaBitPacked, Dictionary, BitVector) operate on flat `FxHashMap<Id, Value>`
1390// arrays, not `FxHashMap<Id, VersionLog<Value>>`. Per-epoch compression is a
1391// potential future optimization.
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyColumn<Id> {
    /// Creates a new empty column.
    ///
    /// The temporal struct has no `compressed`/`spilled` fields (they are
    /// cfg-gated out), so only the shared fields are initialized here.
    #[must_use]
    pub fn new() -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: CompressionMode::None,
        }
    }

    /// Creates a new column with the specified compression mode.
    ///
    /// The mode is recorded for API parity but has no effect in temporal
    /// mode (see the module comment above: compression is disabled).
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
        }
    }

    /// Sets the compression mode for this column (recorded but unused in
    /// temporal mode).
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
    }

    /// Returns the compression mode for this column.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }

    /// Sets a value for an entity, appending to its version log.
    ///
    /// For non-transactional writes, pass the current epoch.
    /// For transactional writes, pass `EpochId::PENDING`.
    ///
    /// NOTE(review): every appended version bumps the zone-map counters, so
    /// `row_count` counts versions, not live entities, until
    /// `rebuild_zone_map()` runs. min/max only widen, which keeps pruning
    /// conservative — confirm nothing relies on exact counts.
    pub fn set(&mut self, id: Id, value: Value, epoch: EpochId) {
        self.update_zone_map_on_insert(&value);
        self.values.entry(id).or_default().append(epoch, value);
    }

    /// Updates zone map when inserting a value.
    ///
    /// Nulls only bump `null_count`; non-null values can only widen min/max.
    fn update_zone_map_on_insert(&mut self, value: &Value) {
        self.zone_map.row_count += 1;

        if matches!(value, Value::Null) {
            self.zone_map.null_count += 1;
            return;
        }

        match &self.zone_map.min {
            None => self.zone_map.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Less) {
                    self.zone_map.min = Some(value.clone());
                }
            }
        }

        match &self.zone_map.max {
            None => self.zone_map.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Greater) {
                    self.zone_map.max = Some(value.clone());
                }
            }
        }
    }

    /// Gets the latest value for an entity, filtering out tombstones (Null).
    #[must_use]
    pub fn get(&self, id: Id) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.latest())
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Removes a value by appending a tombstone (Null) at the given epoch.
    ///
    /// History is preserved: earlier versions remain readable via `get_at()`.
    /// Returns the value that was live before the tombstone, or `None` if the
    /// entity had no live value (in which case nothing is appended).
    pub fn remove(&mut self, id: Id, epoch: EpochId) -> Option<Value> {
        let previous = self.get(id);
        if previous.is_some() {
            self.values
                .entry(id)
                .or_default()
                .append(epoch, Value::Null);
            self.zone_map_dirty = true;
        }
        previous
    }

    /// Returns the number of live (non-tombstoned) values in this column.
    ///
    /// O(n) in the number of entities: it inspects each log's latest entry.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values
            .values()
            .filter(|log| log.latest().is_some_and(|v| !v.is_null()))
            .count()
    }

    /// Returns true if this column is empty.
    #[cfg(test)]
    #[must_use]
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns compression statistics for this column.
    ///
    /// In temporal mode, compression is not used. Reports live value count only.
    #[must_use]
    pub fn compression_stats(&self) -> CompressionStats {
        let live_count = self.len();
        let hot_size = live_count * std::mem::size_of::<Value>();

        CompressionStats {
            uncompressed_size: hot_size,
            compressed_size: hot_size,
            value_count: live_count,
            codec: None,
        }
    }

    /// Returns estimated heap memory for this column.
    ///
    /// Approximates the outer map only (`+ 1` per slot for hash-map control
    /// bytes). NOTE(review): per-log version Vec allocations are not counted,
    /// so this undercounts columns with long histories — confirm acceptable.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        self.values.capacity()
            * (std::mem::size_of::<Id>() + std::mem::size_of::<VersionLog<Value>>() + 1)
    }

    /// Compression is not supported in temporal mode (no-op).
    pub fn compress(&mut self) {}

    /// Forces compression (no-op in temporal mode).
    pub fn force_compress(&mut self) {}

    /// Returns the zone map for this column.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }

    /// Uses zone map to check if any values could satisfy the predicate.
    ///
    /// Conservative: returns `true` (might match) whenever the zone map is
    /// stale, and is based on latest values only (see module comment on the
    /// temporal zone-map limitation).
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        if self.zone_map_dirty {
            return true;
        }

        match op {
            CompareOp::Eq => self.zone_map.might_contain_equal(value),
            CompareOp::Ne => match (&self.zone_map.min, &self.zone_map.max) {
                // Skip only when every value equals `value` (min == max == value).
                (Some(min), Some(max)) => {
                    !(compare_values(min, value) == Some(Ordering::Equal)
                        && compare_values(max, value) == Some(Ordering::Equal))
                }
                _ => true,
            },
            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
        }
    }

    /// Rebuilds zone map from current (latest) values.
    ///
    /// Historical versions are ignored; tombstones count toward `null_count`.
    /// Clears the dirty flag.
    pub fn rebuild_zone_map(&mut self) {
        let mut zone_map = ZoneMapEntry::new();

        for log in self.values.values() {
            if let Some(value) = log.latest() {
                zone_map.row_count += 1;

                if matches!(value, Value::Null) {
                    zone_map.null_count += 1;
                    continue;
                }

                match &zone_map.min {
                    None => zone_map.min = Some(value.clone()),
                    Some(current) => {
                        if compare_values(value, current) == Some(Ordering::Less) {
                            zone_map.min = Some(value.clone());
                        }
                    }
                }

                match &zone_map.max {
                    None => zone_map.max = Some(value.clone()),
                    Some(current) => {
                        if compare_values(value, current) == Some(Ordering::Greater) {
                            zone_map.max = Some(value.clone());
                        }
                    }
                }
            }
        }

        self.zone_map = zone_map;
        self.zone_map_dirty = false;
    }

    // === Temporal-only methods ===

    /// Gets the value at a specific epoch via binary search, filtering tombstones.
    #[must_use]
    pub fn get_at(&self, id: Id, epoch: EpochId) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.at(epoch))
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Replaces PENDING epochs with the real commit epoch in all version logs
    /// (transaction commit).
    pub fn finalize_pending(&mut self, real_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.finalize_pending(real_epoch);
        }
    }

    /// Removes all PENDING entries from all version logs (transaction rollback).
    /// Logs emptied by the rollback are dropped entirely.
    pub fn remove_pending(&mut self) {
        for log in self.values.values_mut() {
            log.remove_pending();
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Garbage-collects old versions from all version logs, dropping any log
    /// that ends up empty.
    pub fn gc(&mut self, min_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.gc(min_epoch);
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Removes PENDING entries for a specific entity (targeted rollback).
    pub fn remove_pending_for(&mut self, id: Id) {
        if let Some(log) = self.values.get_mut(&id) {
            log.remove_pending();
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }

    /// Removes up to `n` PENDING entries for a specific entity.
    ///
    /// Used by savepoint rollback to pop only the entries added after the
    /// savepoint, leaving earlier PENDING entries intact.
    pub fn pop_n_pending_for(&mut self, id: Id, n: usize) {
        if let Some(log) = self.values.get_mut(&id) {
            log.pop_n_pending(n);
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }
}
1657
1658/// Compares two values for ordering.
1659fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1660    match (a, b) {
1661        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1662        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1663        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1664        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1665        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1666        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1667        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1668        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1669        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1670        _ => None,
1671    }
1672}
1673
// Default delegates to `new()`, which exists under both the temporal and
// non-temporal cfg variants, so this single impl covers both builds.
impl<Id: EntityId> Default for PropertyColumn<Id> {
    fn default() -> Self {
        Self::new()
    }
}
1679
/// A borrowed reference to a property column for bulk reads.
///
/// Holds the read lock so the column can't change while you're iterating.
///
/// NOTE(review): all fields are underscore-prefixed and nothing in this
/// chunk reads them — the struct currently acts purely as a lifetime-bound
/// lock holder; presumably accessor methods live (or are planned) elsewhere.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    // Pins the outer map's read lock for 'a; dropping this ref releases it.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    // The property key this ref was taken for.
    _key: PropertyKey,
    _marker: PhantomData<Id>,
}
1688
#[cfg(test)]
#[cfg(not(feature = "temporal"))]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    /// Set/get round-trip: per-node values are independent across keys and
    /// nodes, and a key never set for a node reads back as `None`.
    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alix".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Gus".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alix".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Gus".into()))
        );
        assert!(storage.get(node2, &age_key).is_none());
    }

    /// `remove` returns the old value and subsequent `get` sees `None`.
    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alix".into());
        assert!(storage.get(node, &key).is_some());

        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    /// `get_all` gathers one entry per property key set on the node.
    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    /// `remove_all` clears every property of the node across all columns.
    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    /// Direct column API: set/get/remove keep `len` and `is_empty` in sync.
    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alix".into());
        col.set(NodeId::new(2), "Gus".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    /// `new()` defaults to `CompressionMode::None`; `with_compression`
    /// records the requested mode.
    #[test]
    fn test_compression_mode() {
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    /// Auto-compression must not change observable reads for values that
    /// were written through the storage API.
    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + (i as i64 % 50)),
            );
        }

        // Values should still be readable
        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        // 20 + (50 % 50) == 20
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    /// Writing 2000 sequential integers under Auto mode keeps the total
    /// value count consistent across the compressed and hot-buffer parts.
    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add many sequential integers
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Should have triggered compression at some point
        // Total count should include both compressed and hot buffer values
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        // Values from the hot buffer should be readable
        // Note: Compressed values are not accessible via get() - see design note
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// Low-cardinality strings (4 distinct values) under Auto mode: length
    /// stays correct whether or not dictionary compression kicked in.
    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add repeated strings (good for dictionary compression)
        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
        }

        // Total count should be correct
        assert_eq!(col.len(), 2000);

        // Late values should be in hot buffer and readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// Alternating booleans under Auto mode: length stays correct and late
    /// writes remain readable (or the column reports itself compressed).
    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add booleans
        for i in 0..2000 {
            col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
        }

        // Verify total count
        assert_eq!(col.len(), 2000);

        // Late values should be readable
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    /// `force_compress` on a below-threshold column must not lose values.
    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Add fewer values than the threshold
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        // Force compression
        col.force_compress();

        // Stats should show compression was applied if beneficial
        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    /// Stats report the value count and a non-zero uncompressed size even
    /// when no compression has occurred.
    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    /// Storage-level stats map has one entry per property column.
    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i as i64),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(ArcStr::from("Alix")),
            );
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2); // Two columns
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    /// Sanity check: memory accounting reports something positive once data
    /// has been written.
    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    /// `get_batch` preserves input order and yields `None` for ids without
    /// the property.
    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);
        let age_key = PropertyKey::new("age");

        storage.set(node1, age_key.clone(), 25i64.into());
        storage.set(node2, age_key.clone(), 30i64.into());
        // node3 has no age property

        let ids = vec![node1, node2, node3];
        let values = storage.get_batch(&ids, &age_key);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    /// A key with no backing column still yields a full-length `None` vec.
    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let missing_key = PropertyKey::new("nonexistent");

        let ids = vec![node1, node2];
        let values = storage.get_batch(&ids, &missing_key);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    /// Empty id slice yields an empty result, not a panic.
    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        let values = storage.get_batch(&[], &key);
        assert!(values.is_empty());
    }

    /// `get_all_batch` returns one property map per id, in input order,
    /// with an empty map for ids that have no properties.
    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);

        storage.set(node1, PropertyKey::new("name"), "Alix".into());
        storage.set(node1, PropertyKey::new("age"), 25i64.into());
        storage.set(node2, PropertyKey::new("name"), "Gus".into());
        // node3 has no properties

        let ids = vec![node1, node2, node3];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // name and age
        assert_eq!(all_props[1].len(), 1); // name only
        assert_eq!(all_props[2].len(), 0); // no properties

        assert_eq!(
            all_props[0].get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(
            all_props[1].get(&PropertyKey::new("name")),
            Some(&Value::String("Gus".into()))
        );
    }

    /// Batch variant of the empty-input check.
    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let all_props = storage.get_all_batch(&[]);
        assert!(all_props.is_empty());
    }
}