Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::codec::CompressionCodec;
24#[cfg(not(feature = "temporal"))]
25use crate::codec::block::DEFAULT_BLOCK_ROWS;
26#[cfg(not(feature = "temporal"))]
27use crate::codec::{CompressedData, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor};
28use crate::index::zone_map::ZoneMapEntry;
29#[cfg(not(feature = "temporal"))]
30use arcstr::ArcStr;
31#[cfg(feature = "temporal")]
32use grafeo_common::temporal::VersionLog;
33#[cfg(feature = "temporal")]
34use grafeo_common::types::EpochId;
35use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
36use grafeo_common::utils::hash::FxHashMap;
37use parking_lot::RwLock;
38use std::cmp::Ordering;
39use std::hash::Hash;
40use std::marker::PhantomData;
41
/// Selects whether, and how eagerly, a property column compresses its values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum CompressionMode {
    /// Compression is off: values always stay in the sparse hash map.
    /// This is the default mode.
    #[default]
    None,
    /// Compress automatically once it becomes beneficial (after a size threshold).
    Auto,
    /// Compress eagerly on every flush.
    Eager,
}
54
/// Number of values a column must hold before automatic compression applies.
#[cfg(not(feature = "temporal"))]
const COMPRESSION_THRESHOLD: usize = 1_000;
58
/// Capacity (in values) of the hot buffer that holds recent, not yet
/// compressed writes.
///
/// A larger buffer (4096) keeps more recent data uncompressed for faster
/// reads: roughly 64KB of extra memory per column in exchange for 1.5-2x
/// faster point lookups on recently-written data.
#[cfg(not(feature = "temporal"))]
const HOT_BUFFER_SIZE: usize = 4_096;
65
/// Comparison operator evaluated against a zone map during predicate checks.
///
/// Each variant corresponds to a GQL comparison operator such as `=`, `<`
/// or `>=`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompareOp {
    /// `=`: equal to the value.
    Eq,
    /// `<>`: not equal to the value.
    Ne,
    /// `<`: strictly less than the value.
    Lt,
    /// `<=`: less than or equal to the value.
    Le,
    /// `>`: strictly greater than the value.
    Gt,
    /// `>=`: greater than or equal to the value.
    Ge,
}
85
/// Identifier types that can be used as keys in property storage.
///
/// [`NodeId`] and [`EdgeId`] both implement it, so properties can be attached
/// to nodes as well as edges. The `u64` round-trip gives compression code a
/// safe conversion path in place of an unsafe transmute.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Exposes the identifier as its raw `u64`.
    fn as_u64(self) -> u64;
    /// Rebuilds an identifier from a raw `u64`.
    fn from_u64(v: u64) -> Self;
}
96
97impl EntityId for NodeId {
98    #[inline]
99    fn as_u64(self) -> u64 {
100        self.0
101    }
102    #[inline]
103    fn from_u64(v: u64) -> Self {
104        Self(v)
105    }
106}
107
108impl EntityId for EdgeId {
109    #[inline]
110    fn as_u64(self) -> u64 {
111        self.0
112    }
113    #[inline]
114    fn from_u64(v: u64) -> Self {
115        Self(v)
116    }
117}
118
119/// Thread-safe columnar property storage.
120///
121/// Each property key ("name", "age", etc.) gets its own column. This layout
122/// is great for analytical queries that filter on specific properties -
123/// you only touch the columns you need.
124///
125/// Generic over `Id` so the same storage works for nodes and edges.
126///
127/// # Example
128///
129/// ```
130/// # #[cfg(not(feature = "temporal"))]
131/// # {
132/// use grafeo_core::graph::lpg::PropertyStorage;
133/// use grafeo_common::types::{NodeId, PropertyKey};
134///
135/// let storage = PropertyStorage::new();
136/// let alix = NodeId::new(1);
137///
138/// storage.set(alix, PropertyKey::new("name"), "Alix".into());
139/// storage.set(alix, PropertyKey::new("age"), 30i64.into());
140///
141/// // Fetch all properties at once
142/// let props = storage.get_all(alix);
143/// assert_eq!(props.len(), 2);
144/// # }
145/// ```
146pub struct PropertyStorage<Id: EntityId = NodeId> {
147    /// Map from property key to column.
148    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
149    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
150    /// Default compression mode for new columns.
151    default_compression: CompressionMode,
152    _marker: PhantomData<Id>,
153}
154
155impl<Id: EntityId> PropertyStorage<Id> {
156    /// Creates a new property storage.
157    #[must_use]
158    pub fn new() -> Self {
159        Self {
160            columns: RwLock::new(FxHashMap::default()),
161            default_compression: CompressionMode::None,
162            _marker: PhantomData,
163        }
164    }
165
166    /// Creates a new property storage with compression enabled.
167    #[must_use]
168    pub fn with_compression(mode: CompressionMode) -> Self {
169        Self {
170            columns: RwLock::new(FxHashMap::default()),
171            default_compression: mode,
172            _marker: PhantomData,
173        }
174    }
175
176    /// Sets the default compression mode for new columns.
177    pub fn set_default_compression(&mut self, mode: CompressionMode) {
178        self.default_compression = mode;
179    }
180
181    /// Sets a property value for an entity.
182    #[cfg(not(feature = "temporal"))]
183    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
184        let mut columns = self.columns.write();
185        let mode = self.default_compression;
186        columns
187            .entry(key)
188            .or_insert_with(|| PropertyColumn::with_compression(mode))
189            .set(id, value);
190    }
191
192    /// Sets a property value for an entity at a specific epoch.
193    ///
194    /// For non-transactional writes, pass the current epoch.
195    /// For transactional writes, pass `EpochId::PENDING`.
196    #[cfg(feature = "temporal")]
197    pub fn set(&self, id: Id, key: PropertyKey, value: Value, epoch: EpochId) {
198        let mut columns = self.columns.write();
199        let mode = self.default_compression;
200        columns
201            .entry(key)
202            .or_insert_with(|| PropertyColumn::with_compression(mode))
203            .set(id, value, epoch);
204    }
205
206    /// Enables compression for a specific column.
207    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
208        let mut columns = self.columns.write();
209        if let Some(col) = columns.get_mut(key) {
210            col.set_compression_mode(mode);
211        }
212    }
213
214    /// Compresses all columns that have compression enabled.
215    pub fn compress_all(&self) {
216        let mut columns = self.columns.write();
217        for col in columns.values_mut() {
218            if col.compression_mode() != CompressionMode::None {
219                col.compress();
220            }
221        }
222    }
223
224    /// Forces compression on all columns regardless of mode.
225    pub fn force_compress_all(&self) {
226        let mut columns = self.columns.write();
227        for col in columns.values_mut() {
228            col.force_compress();
229        }
230    }
231
232    /// Returns compression statistics for all columns.
233    #[must_use]
234    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
235        let columns = self.columns.read();
236        columns
237            .iter()
238            .map(|(key, col)| (key.clone(), col.compression_stats()))
239            .collect()
240    }
241
242    /// Returns the total memory usage of all columns (compressed size estimate).
243    #[must_use]
244    pub fn memory_usage(&self) -> usize {
245        let columns = self.columns.read();
246        columns
247            .values()
248            .map(|col| col.compression_stats().compressed_size)
249            .sum()
250    }
251
252    /// Returns estimated heap memory for all columns including hash map overhead.
253    #[must_use]
254    pub fn heap_memory_bytes(&self) -> usize {
255        let columns = self.columns.read();
256        // Outer hash map capacity
257        let map_overhead = columns.capacity()
258            * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
259        // Sum of all column heap memory
260        let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
261        map_overhead + column_bytes
262    }
263
264    /// Gets a property value for an entity.
265    #[must_use]
266    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
267        let columns = self.columns.read();
268        columns.get(key).and_then(|col| col.get(id))
269    }
270
271    /// Removes a property value for an entity.
272    #[cfg(not(feature = "temporal"))]
273    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
274        let mut columns = self.columns.write();
275        columns.get_mut(key).and_then(|col| col.remove(id))
276    }
277
278    /// Removes a property value for an entity (temporal: appends tombstone at epoch).
279    #[cfg(feature = "temporal")]
280    pub fn remove(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
281        let mut columns = self.columns.write();
282        columns.get_mut(key).and_then(|col| col.remove(id, epoch))
283    }
284
285    /// Removes all properties for an entity.
286    #[cfg(not(feature = "temporal"))]
287    pub fn remove_all(&self, id: Id) {
288        let mut columns = self.columns.write();
289        for col in columns.values_mut() {
290            col.remove(id);
291        }
292    }
293
294    /// Removes all properties for an entity (temporal: tombstones at current epoch).
295    #[cfg(feature = "temporal")]
296    pub fn remove_all(&self, id: Id, epoch: EpochId) {
297        let mut columns = self.columns.write();
298        for col in columns.values_mut() {
299            col.remove(id, epoch);
300        }
301    }
302
303    /// Gets all properties for an entity.
304    #[must_use]
305    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
306        let columns = self.columns.read();
307        let mut result = FxHashMap::default();
308        for (key, col) in columns.iter() {
309            if let Some(value) = col.get(id) {
310                result.insert(key.clone(), value);
311            }
312        }
313        result
314    }
315
316    /// Gets property values for multiple entities in a single lock acquisition.
317    ///
318    /// More efficient than calling [`Self::get`] in a loop because it acquires
319    /// the read lock only once.
320    ///
321    /// # Example
322    ///
323    /// ```
324    /// use grafeo_core::graph::lpg::PropertyStorage;
325    /// use grafeo_common::types::{PropertyKey, Value};
326    /// use grafeo_common::NodeId;
327    ///
328    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
329    /// let key = PropertyKey::new("age");
330    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
331    /// let values = storage.get_batch(&ids, &key);
332    /// // values[i] is the property value for ids[i], or None if not set
333    /// ```
334    #[must_use]
335    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
336        let columns = self.columns.read();
337        match columns.get(key) {
338            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
339            None => vec![None; ids.len()],
340        }
341    }
342
343    /// Gets all properties for multiple entities efficiently.
344    ///
345    /// More efficient than calling [`Self::get_all`] in a loop because it
346    /// acquires the read lock only once.
347    ///
348    /// # Example
349    ///
350    /// ```
351    /// use grafeo_core::graph::lpg::PropertyStorage;
352    /// use grafeo_common::types::{PropertyKey, Value};
353    /// use grafeo_common::NodeId;
354    ///
355    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
356    /// let ids = vec![NodeId(1), NodeId(2)];
357    /// let all_props = storage.get_all_batch(&ids);
358    /// // all_props[i] is a HashMap of all properties for ids[i]
359    /// ```
360    #[must_use]
361    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
362        let columns = self.columns.read();
363        let column_count = columns.len();
364
365        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
366        let mut results = Vec::with_capacity(ids.len());
367
368        for &id in ids {
369            // Pre-allocate HashMap with expected column count
370            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
371            for (key, col) in columns.iter() {
372                if let Some(value) = col.get(id) {
373                    result.insert(key.clone(), value);
374                }
375            }
376            results.push(result);
377        }
378
379        results
380    }
381
382    /// Gets selected properties for multiple entities efficiently (projection pushdown).
383    ///
384    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
385    /// of properties - it only iterates the requested columns instead of all columns.
386    ///
387    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
388    /// compared to O(N × C) for `get_all_batch` where C = total column count.
389    ///
390    /// # Example
391    ///
392    /// ```
393    /// use grafeo_core::graph::lpg::PropertyStorage;
394    /// use grafeo_common::types::{PropertyKey, Value};
395    /// use grafeo_common::NodeId;
396    ///
397    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
398    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
399    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
400    ///
401    /// // Only fetches "name" and "age" columns, ignoring other properties
402    /// let props = storage.get_selective_batch(&ids, &keys);
403    /// ```
404    #[must_use]
405    pub fn get_selective_batch(
406        &self,
407        ids: &[Id],
408        keys: &[PropertyKey],
409    ) -> Vec<FxHashMap<PropertyKey, Value>> {
410        if keys.is_empty() {
411            // No properties requested - return empty maps
412            return vec![FxHashMap::default(); ids.len()];
413        }
414
415        let columns = self.columns.read();
416
417        // Pre-collect only the columns we need (avoids re-lookup per id)
418        let requested_columns: Vec<_> = keys
419            .iter()
420            .filter_map(|key| columns.get(key).map(|col| (key, col)))
421            .collect();
422
423        // Pre-allocate result with exact capacity
424        let mut results = Vec::with_capacity(ids.len());
425
426        for &id in ids {
427            let mut result =
428                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
429            // Only iterate requested columns, not all columns
430            for (key, col) in &requested_columns {
431                if let Some(value) = col.get(id) {
432                    result.insert((*key).clone(), value);
433                }
434            }
435            results.push(result);
436        }
437
438        results
439    }
440
441    /// Returns the number of property columns.
442    #[must_use]
443    pub fn column_count(&self) -> usize {
444        self.columns.read().len()
445    }
446
447    /// Returns the keys of all columns.
448    #[must_use]
449    pub fn keys(&self) -> Vec<PropertyKey> {
450        self.columns.read().keys().cloned().collect()
451    }
452
453    /// Removes all property data.
454    pub fn clear(&self) {
455        self.columns.write().clear();
456    }
457
458    // ── Column-level spill / reload ────────────────────────────────
459
460    /// Evicts all values from a specific property column, freeing heap memory.
461    ///
462    /// Returns `(count, estimated_freed_bytes)`. The column stays registered
463    /// (zone map preserved) but `get()` returns `None` until `restore_column()`.
464    #[cfg(not(feature = "temporal"))]
465    pub fn evict_column(&self, key: &PropertyKey) -> (usize, usize) {
466        let mut columns = self.columns.write();
467        if let Some(column) = columns.get_mut(key) {
468            column.evict_values()
469        } else {
470            (0, 0)
471        }
472    }
473
474    /// Restores values into a previously evicted column.
475    ///
476    /// Clears the `spilled` flag on the column. If the column doesn't exist,
477    /// it is created.
478    #[cfg(not(feature = "temporal"))]
479    pub fn restore_column(&self, key: &PropertyKey, values: impl Iterator<Item = (Id, Value)>) {
480        let mut columns = self.columns.write();
481        let column = columns
482            .entry(key.clone())
483            .or_insert_with(|| PropertyColumn::with_compression(self.default_compression));
484        column.restore_values(values);
485    }
486
487    /// Drains all values from a column, returning them for export to disk.
488    ///
489    /// After this call, `is_column_spilled(key)` returns `true`.
490    /// The column remains registered (zone map preserved).
491    #[cfg(not(feature = "temporal"))]
492    pub fn drain_column(&self, key: &PropertyKey) -> Vec<(Id, Value)> {
493        let mut columns = self.columns.write();
494        if let Some(column) = columns.get_mut(key) {
495            column.drain_values()
496        } else {
497            Vec::new()
498        }
499    }
500
501    /// Whether a specific column has been spilled to disk.
502    #[cfg(not(feature = "temporal"))]
503    #[must_use]
504    pub fn is_column_spilled(&self, key: &PropertyKey) -> bool {
505        self.columns
506            .read()
507            .get(key)
508            .is_some_and(|col| col.is_spilled())
509    }
510
511    /// Marks a column as spilled without draining its values.
512    ///
513    /// Used on startup when re-establishing spill state: the column may
514    /// already be empty (loaded from a checkpoint that serialized after spill).
515    #[cfg(not(feature = "temporal"))]
516    pub fn mark_column_spilled(&self, key: &PropertyKey) {
517        let mut columns = self.columns.write();
518        let column = columns.entry(key.clone()).or_default();
519        column.mark_spilled();
520    }
521
522    /// Gets a column by key for bulk access.
523    #[must_use]
524    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
525        let columns = self.columns.read();
526        if columns.contains_key(key) {
527            Some(PropertyColumnRef {
528                _guard: columns,
529                _key: key.clone(),
530                _marker: PhantomData,
531            })
532        } else {
533            None
534        }
535    }
536
537    /// Checks if a predicate might match any values (using zone maps).
538    ///
539    /// Returns `false` only when we're *certain* no values match - for example,
540    /// if you're looking for age > 100 but the max age is 80. Returns `true`
541    /// if the property doesn't exist (conservative - might match).
542    #[must_use]
543    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
544        let columns = self.columns.read();
545        columns
546            .get(key)
547            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
548    }
549
550    /// Gets the zone map for a property column.
551    #[must_use]
552    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
553        let columns = self.columns.read();
554        columns.get(key).map(|col| col.zone_map().clone())
555    }
556
557    /// Returns the per-block zone maps for a property column, if any.
558    ///
559    /// Returns `None` when the column doesn't exist; returns `Some(empty)`
560    /// when the column exists but is uncompressed (the hot buffer is
561    /// unordered, so per-block pruning is meaningless there). Phase 4 will
562    /// treat "no per-block stats" as "fall back to the column-level zone
563    /// map".
564    ///
565    /// **Temporal mode:** always returns `Some(empty)` for any existing
566    /// column. Compression is disabled for `VersionLog`-backed columns,
567    /// so there is no sorted compressed array to chunk into blocks. Use
568    /// the column-level [`zone_map`](Self::zone_map) instead.
569    #[must_use]
570    pub fn block_zone_maps_for(&self, key: &PropertyKey) -> Option<Vec<ZoneMapEntry>> {
571        let columns = self.columns.read();
572        columns.get(key).map(|col| col.block_zone_maps().to_vec())
573    }
574
575    /// Returns the number of compressed blocks for a property column.
576    ///
577    /// Returns `None` when the column doesn't exist; returns `Some(0)` for
578    /// an uncompressed column.
579    #[cfg(not(feature = "temporal"))]
580    #[must_use]
581    pub fn block_count_for(&self, key: &PropertyKey) -> Option<usize> {
582        self.columns.read().get(key).map(|col| col.block_count())
583    }
584
585    /// Decodes a single compressed block of a property column.
586    ///
587    /// See [`PropertyColumn::decode_block`] for semantics. Returns `None`
588    /// when the column doesn't exist, the column is uncompressed, or
589    /// `block_idx` is out of range.
590    #[cfg(not(feature = "temporal"))]
591    #[must_use]
592    pub fn decode_block_for(
593        &self,
594        key: &PropertyKey,
595        block_idx: usize,
596    ) -> Option<DecodedBlock<Id>> {
597        self.columns
598            .read()
599            .get(key)
600            .and_then(|col| col.decode_block(block_idx))
601    }
602
603    /// Decodes every compressed block of a property column under a single
604    /// read-lock acquisition, returning them as a `Vec`.
605    ///
606    /// Returns an empty `Vec` when the column doesn't exist or is
607    /// uncompressed. Phase 4's iterator-bounds operator prefers
608    /// [`Self::decode_block_for`] (after pruning by zone map) over
609    /// decoding all blocks up-front; this method exists for tests and
610    /// debug tools that want the whole picture.
611    #[cfg(not(feature = "temporal"))]
612    #[must_use]
613    pub fn decoded_blocks_for(&self, key: &PropertyKey) -> Vec<DecodedBlock<Id>> {
614        let columns = self.columns.read();
615        match columns.get(key) {
616            Some(col) => col.iter_decoded_blocks().collect(),
617            None => Vec::new(),
618        }
619    }
620
621    /// Checks if a range predicate might match any values (using zone maps).
622    ///
623    /// Returns `false` only when we're *certain* no values match the range.
624    /// Returns `true` if the property doesn't exist (conservative - might match).
625    #[must_use]
626    pub fn might_match_range(
627        &self,
628        key: &PropertyKey,
629        min: Option<&Value>,
630        max: Option<&Value>,
631        min_inclusive: bool,
632        max_inclusive: bool,
633    ) -> bool {
634        let columns = self.columns.read();
635        columns.get(key).map_or(true, |col| {
636            col.zone_map()
637                .might_contain_range(min, max, min_inclusive, max_inclusive)
638        }) // No column = assume might match (conservative)
639    }
640
641    /// Rebuilds zone maps for all columns (call after bulk removes).
642    pub fn rebuild_zone_maps(&self) {
643        let mut columns = self.columns.write();
644        for col in columns.values_mut() {
645            col.rebuild_zone_map();
646        }
647    }
648}
649
650impl<Id: EntityId> Default for PropertyStorage<Id> {
651    fn default() -> Self {
652        Self::new()
653    }
654}
655
// === Temporal-only methods for PropertyStorage ===
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyStorage<Id> {
    /// Takes the write lock on the column map and returns the guard, for
    /// targeted rollback.
    pub(crate) fn columns_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, FxHashMap<PropertyKey, PropertyColumn<Id>>> {
        self.columns.write()
    }

    /// Looks up the value of property `key` for the entity `id` as of `epoch`.
    ///
    /// Returns `None` when the column does not exist or has no value to
    /// report for `id` at that epoch.
    #[must_use]
    pub fn get_at(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
        let guard = self.columns.read();
        let column = guard.get(key)?;
        column.get_at(id, epoch)
    }

    /// Collects every property of the entity `id` as of `epoch`.
    #[must_use]
    pub fn get_all_at(&self, id: Id, epoch: EpochId) -> FxHashMap<PropertyKey, Value> {
        let guard = self.columns.read();
        guard
            .iter()
            .filter_map(|(key, column)| {
                column.get_at(id, epoch).map(|value| (key.clone(), value))
            })
            .collect()
    }

    /// Replaces PENDING epochs with the real commit epoch in every column.
    pub fn finalize_pending(&self, real_epoch: EpochId) {
        let mut guard = self.columns.write();
        guard.values_mut().for_each(|column| {
            column.finalize_pending(real_epoch);
        });
    }

    /// Drops all PENDING entries from every column (transaction rollback).
    pub fn remove_pending(&self) {
        let mut guard = self.columns.write();
        guard.values_mut().for_each(|column| {
            column.remove_pending();
        });
    }

    /// Garbage-collects old versions from every column, passing `min_epoch`
    /// through to each column's `gc`.
    pub fn gc(&self, min_epoch: EpochId) {
        let mut guard = self.columns.write();
        guard.values_mut().for_each(|column| {
            column.gc(min_epoch);
        });
    }

    /// Returns the full version history of every property of an entity.
    ///
    /// Each entry is `(key, Vec<(epoch, value)>)`; properties whose history is
    /// empty are omitted. Useful for snapshot export that preserves temporal
    /// history.
    #[must_use]
    pub fn get_all_history(&self, id: Id) -> Vec<(PropertyKey, Vec<(EpochId, Value)>)> {
        let guard = self.columns.read();
        let mut histories = Vec::new();
        for (key, column) in guard.iter() {
            let Some(log) = column.values.get(&id) else {
                continue;
            };
            let entries: Vec<(EpochId, Value)> = log
                .history()
                .iter()
                .map(|(epoch, value)| (*epoch, value.clone()))
                .collect();
            if entries.is_empty() {
                continue;
            }
            histories.push((key.clone(), entries));
        }
        histories
    }

    /// Returns the version history of a single property of an entity.
    ///
    /// More efficient than `get_all_history` when only one property is needed.
    /// The result is empty when the column or the entity's log is missing.
    #[must_use]
    pub fn get_history(&self, id: Id, key: &PropertyKey) -> Vec<(EpochId, Value)> {
        let guard = self.columns.read();
        let Some(log) = guard.get(key).and_then(|column| column.values.get(&id)) else {
            return Vec::new();
        };
        log.history().iter().map(|(e, v)| (*e, v.clone())).collect()
    }
}
746
747/// Compressed storage for a property column.
748///
749/// Holds the compressed representation of values along with the index
750/// mapping entity IDs to positions in the compressed array.
751#[cfg(not(feature = "temporal"))]
752#[derive(Debug)]
753#[non_exhaustive]
754pub enum CompressedColumnData {
755    /// Compressed integers (Int64 values).
756    Integers {
757        /// Compressed data.
758        data: CompressedData,
759        /// Index: entity ID position -> compressed array index.
760        id_to_index: Vec<u64>,
761        /// Reverse index: compressed array index -> entity ID position.
762        index_to_id: Vec<u64>,
763    },
764    /// Dictionary-encoded strings.
765    Strings {
766        /// Dictionary encoding.
767        encoding: DictionaryEncoding,
768        /// Index: entity ID position -> dictionary index.
769        id_to_index: Vec<u64>,
770        /// Reverse index: dictionary index -> entity ID position.
771        index_to_id: Vec<u64>,
772    },
773    /// Compressed booleans.
774    Booleans {
775        /// Compressed data.
776        data: CompressedData,
777        /// Index: entity ID position -> bit index.
778        id_to_index: Vec<u64>,
779        /// Reverse index: bit index -> entity ID position.
780        index_to_id: Vec<u64>,
781    },
782}
783
784#[cfg(not(feature = "temporal"))]
785impl CompressedColumnData {
786    /// Returns the memory usage of the compressed data in bytes.
787    #[must_use]
788    pub fn memory_usage(&self) -> usize {
789        match self {
790            CompressedColumnData::Integers {
791                data,
792                id_to_index,
793                index_to_id,
794            } => {
795                data.data.len()
796                    + id_to_index.len() * std::mem::size_of::<u64>()
797                    + index_to_id.len() * std::mem::size_of::<u64>()
798            }
799            CompressedColumnData::Strings {
800                encoding,
801                id_to_index,
802                index_to_id,
803            } => {
804                encoding.code_count() * 4
805                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
806                    + id_to_index.len() * std::mem::size_of::<u64>()
807                    + index_to_id.len() * std::mem::size_of::<u64>()
808            }
809            CompressedColumnData::Booleans {
810                data,
811                id_to_index,
812                index_to_id,
813            } => {
814                data.data.len()
815                    + id_to_index.len() * std::mem::size_of::<u64>()
816                    + index_to_id.len() * std::mem::size_of::<u64>()
817            }
818        }
819    }
820}
821
822/// A decoded compressed block of `(id, value)` pairs from a property column.
823///
824/// Phase 4's iterator-bounds operator consumes these after pruning via
825/// per-block zone maps. The `entries` are sorted by entity id (matching
826/// the underlying compressed layout).
827#[cfg(not(feature = "temporal"))]
828#[derive(Debug, Clone)]
829pub struct DecodedBlock<Id: EntityId> {
830    /// Per-block min/max/null/row counts populated when the block was
831    /// compressed.
832    pub zone_map: ZoneMapEntry,
833    /// `(id, value)` pairs, sorted by id.
834    pub entries: Vec<(Id, Value)>,
835}
836
/// Statistics about column compression.
///
/// Produced by `PropertyColumn::compression_stats`. Sizes are estimates:
/// uncompressed values are costed at `size_of::<Value>()` each, so heap
/// payloads owned by a value (e.g. string contents) are not included.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Estimated size of the data in bytes if nothing were compressed.
    pub uncompressed_size: usize,
    /// Estimated size of the data in bytes as currently stored
    /// (uncompressed hot buffer plus compressed payload).
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression, or `None` when nothing is compressed.
    pub codec: Option<CompressionCodec>,
}
849
850impl CompressionStats {
851    /// Returns the compression ratio (uncompressed / compressed).
852    #[must_use]
853    pub fn compression_ratio(&self) -> f64 {
854        if self.compressed_size == 0 {
855            return 1.0;
856        }
857        self.uncompressed_size as f64 / self.compressed_size as f64
858    }
859}
860
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
///
/// The field set depends on the `temporal` feature: without it the column
/// holds plain values plus optional compressed data and a spill flag; with
/// it each entity maps to an append-only version log and the compression
/// and spill fields are compiled out.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    #[cfg(not(feature = "temporal"))]
    values: FxHashMap<Id, Value>,
    /// Versioned storage: entity ID -> append-only version log.
    /// Each value is tagged with the epoch it was written in.
    #[cfg(feature = "temporal")]
    values: FxHashMap<Id, VersionLog<Value>>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes). While set,
    /// `might_match` answers `true` unconditionally.
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    #[cfg(not(feature = "temporal"))]
    compressed: Option<CompressedColumnData>,
    /// Number of values currently held in `compressed` (0 when there is
    /// no compressed data). Added to the hot-buffer size by `len()`.
    #[cfg(not(feature = "temporal"))]
    compressed_count: usize,
    /// Whether this column's values have been spilled to disk.
    ///
    /// Set by `evict_values`, `drain_values` and `mark_spilled`; cleared by
    /// `restore_values`. Eviction empties the in-memory storage, so `get()`
    /// finds nothing until values are restored or written again. The
    /// column remains registered so the schema knows the property exists,
    /// but values are served from a mmap-backed store instead.
    #[cfg(not(feature = "temporal"))]
    spilled: bool,
    /// Per-block zone maps populated when the column is compressed.
    ///
    /// Each entry covers a contiguous slice of `DEFAULT_BLOCK_ROWS` rows of
    /// the sorted compressed array. Empty when the column is uncompressed
    /// (the hot buffer is a `HashMap` with no row order, so per-block
    /// pruning would be meaningless). Phase 4 consumes these for lazy
    /// `range_iter`-style scans.
    block_zone_maps: Vec<ZoneMapEntry>,
}
906
907#[cfg(not(feature = "temporal"))]
908impl<Id: EntityId> PropertyColumn<Id> {
909    /// Creates a new empty column.
910    #[must_use]
911    pub fn new() -> Self {
912        Self {
913            values: FxHashMap::default(),
914            zone_map: ZoneMapEntry::new(),
915            zone_map_dirty: false,
916            compression_mode: CompressionMode::None,
917            compressed: None,
918            compressed_count: 0,
919            spilled: false,
920            block_zone_maps: Vec::new(),
921        }
922    }
923
924    /// Creates a new column with the specified compression mode.
925    #[must_use]
926    pub fn with_compression(mode: CompressionMode) -> Self {
927        Self {
928            values: FxHashMap::default(),
929            zone_map: ZoneMapEntry::new(),
930            zone_map_dirty: false,
931            compression_mode: mode,
932            compressed: None,
933            compressed_count: 0,
934            spilled: false,
935            block_zone_maps: Vec::new(),
936        }
937    }
938
939    /// Sets the compression mode for this column.
940    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
941        self.compression_mode = mode;
942        if mode == CompressionMode::None {
943            // Decompress if switching to no compression
944            if self.compressed.is_some() {
945                self.decompress_all();
946            }
947        }
948    }
949
    /// Returns the compression mode currently configured for this column.
    ///
    /// This is the configured policy, not the storage state: the column
    /// may hold no compressed data even when the mode allows compression.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
955
956    /// Sets a value for an entity.
957    pub fn set(&mut self, id: Id, value: Value) {
958        // Update zone map incrementally
959        self.update_zone_map_on_insert(&value);
960        self.values.insert(id, value);
961
962        // Check if we should compress (in Auto mode)
963        if self.compression_mode == CompressionMode::Auto {
964            let total_count = self.values.len() + self.compressed_count;
965            let hot_buffer_count = self.values.len();
966
967            // Compress when hot buffer exceeds threshold and total is large enough
968            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
969                self.compress();
970            }
971        }
972    }
973
974    /// Updates zone map when inserting a value.
975    fn update_zone_map_on_insert(&mut self, value: &Value) {
976        self.zone_map.row_count += 1;
977
978        if matches!(value, Value::Null) {
979            self.zone_map.null_count += 1;
980            return;
981        }
982
983        // Update min
984        match &self.zone_map.min {
985            None => self.zone_map.min = Some(value.clone()),
986            Some(current) => {
987                if compare_values(value, current) == Some(Ordering::Less) {
988                    self.zone_map.min = Some(value.clone());
989                }
990            }
991        }
992
993        // Update max
994        match &self.zone_map.max {
995            None => self.zone_map.max = Some(value.clone()),
996            Some(current) => {
997                if compare_values(value, current) == Some(Ordering::Greater) {
998                    self.zone_map.max = Some(value.clone());
999                }
1000            }
1001        }
1002    }
1003
1004    /// Gets a value for an entity.
1005    ///
1006    /// First checks the hot buffer (uncompressed values), then falls back
1007    /// to the compressed data if present.
1008    #[must_use]
1009    pub fn get(&self, id: Id) -> Option<Value> {
1010        // First check hot buffer
1011        if let Some(value) = self.values.get(&id) {
1012            return Some(value.clone());
1013        }
1014
1015        // For now, compressed data lookup is not implemented for sparse access
1016        // because the compressed format stores values by index, not by entity ID.
1017        // This would require maintaining an ID -> index map in CompressedColumnData.
1018        // The compressed data is primarily useful for bulk/scan operations.
1019        None
1020    }
1021
1022    /// Removes a value for an entity.
1023    pub fn remove(&mut self, id: Id) -> Option<Value> {
1024        let removed = self.values.remove(&id);
1025        if removed.is_some() {
1026            // Mark zone map as dirty - would need full rebuild for accurate min/max
1027            self.zone_map_dirty = true;
1028        }
1029        removed
1030    }
1031
1032    // ── Spill / Reload ─────────────────────────────────────────────
1033
    /// Marks the column as spilled without clearing values.
    ///
    /// Used on startup when re-establishing spill state from persisted files.
    /// Only the flag changes: the hot buffer, compressed data and zone maps
    /// are left exactly as they are.
    pub fn mark_spilled(&mut self) {
        self.spilled = true;
    }
1040
    /// Whether this column's values have been spilled to disk.
    ///
    /// The flag is set by `mark_spilled`, `evict_values` and `drain_values`
    /// and cleared by `restore_values`. When spilled, values are served from
    /// an external mmap-backed store. New writes still go into this column
    /// (the accessor checks both), so `get()` only sees values that are
    /// still (or again) present in the in-memory hot buffer.
    #[must_use]
    pub fn is_spilled(&self) -> bool {
        self.spilled
    }
1050
1051    /// Evicts all values from this column, freeing their heap memory.
1052    ///
1053    /// Returns `(count, estimated_freed_bytes)`. After this call,
1054    /// `is_spilled()` returns `true` and `get()` returns `None` for all IDs.
1055    /// The column remains registered in the schema (zone map, compression
1056    /// metadata are preserved).
1057    pub fn evict_values(&mut self) -> (usize, usize) {
1058        let count = self.values.len();
1059        let freed_bytes = self.heap_memory_bytes();
1060        self.values.clear();
1061        self.values.shrink_to_fit();
1062        self.compressed = None;
1063        self.compressed_count = 0;
1064        self.block_zone_maps.clear();
1065        self.spilled = true;
1066        (count, freed_bytes)
1067    }
1068
1069    /// Drains all values from this column, returning them for export.
1070    ///
1071    /// After this call, `is_spilled()` returns `true`. This combines
1072    /// export + evict in one step to avoid cloning all values.
1073    pub fn drain_values(&mut self) -> Vec<(Id, Value)> {
1074        let drained: Vec<(Id, Value)> = self.values.drain().collect();
1075        self.values.shrink_to_fit();
1076        self.compressed = None;
1077        self.compressed_count = 0;
1078        self.block_zone_maps.clear();
1079        self.spilled = true;
1080        drained
1081    }
1082
1083    /// Restores values into this column after a reload from disk.
1084    ///
1085    /// Clears the `spilled` flag. Callers are responsible for providing
1086    /// the correct values (from `MmapStorage::export_all()` or similar).
1087    pub fn restore_values(&mut self, values: impl Iterator<Item = (Id, Value)>) {
1088        self.spilled = false;
1089        // Insert directly into the map without calling set(), which would
1090        // re-increment zone map counters (row_count, null_count) on top of
1091        // the already-preserved zone map from before eviction.
1092        for (id, value) in values {
1093            self.values.insert(id, value);
1094        }
1095    }
1096
    /// Returns the number of values in this column (hot + compressed).
    ///
    /// This is the hot-buffer size plus `compressed_count`. An id that was
    /// written again after its value had been compressed is present in both
    /// places and is therefore counted twice.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values.len() + self.compressed_count
    }
1102
1103    /// Returns true if this column is empty.
1104    #[cfg(test)]
1105    #[must_use]
1106    pub fn is_empty(&self) -> bool {
1107        self.values.is_empty() && self.compressed_count == 0
1108    }
1109
1110    /// Returns compression statistics for this column.
1111    #[must_use]
1112    pub fn compression_stats(&self) -> CompressionStats {
1113        let hot_size = self.values.len() * std::mem::size_of::<Value>();
1114        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
1115        let codec = match &self.compressed {
1116            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
1117            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
1118            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
1119            None => None,
1120        };
1121
1122        CompressionStats {
1123            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
1124            compressed_size: hot_size + compressed_size,
1125            value_count: self.len(),
1126            codec,
1127        }
1128    }
1129
1130    /// Returns estimated heap memory for this column.
1131    ///
1132    /// Includes the hot buffer hash map capacity, zone map, and any
1133    /// compressed data.
1134    #[must_use]
1135    pub fn heap_memory_bytes(&self) -> usize {
1136        // Hot buffer: FxHashMap<Id, Value> capacity
1137        let hot_bytes =
1138            self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
1139        // Compressed data
1140        let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
1141        // ZoneMapEntry is inline (no heap), so just hot + compressed
1142        hot_bytes + compressed_bytes
1143    }
1144
    /// Returns whether the column currently holds compressed data.
    ///
    /// `true` only after a compression pass actually produced a compressed
    /// payload; the configured compression mode alone does not matter.
    /// Test-only helper.
    #[must_use]
    #[cfg(test)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }
1151
1152    /// Compresses the hot buffer values.
1153    ///
1154    /// This merges the hot buffer into the compressed data, selecting the
1155    /// best codec based on the value types.
1156    ///
1157    /// Note: If compressed data already exists, this is a no-op to avoid
1158    /// losing previously compressed values. Use `force_compress()` after
1159    /// decompressing to re-compress with all values.
1160    pub fn compress(&mut self) {
1161        if self.values.is_empty() {
1162            return;
1163        }
1164
1165        // Don't re-compress if we already have compressed data
1166        // (would need to decompress and merge first)
1167        if self.compressed.is_some() {
1168            return;
1169        }
1170
1171        // Determine the dominant type
1172        let (int_count, str_count, bool_count) = self.count_types();
1173        let total = self.values.len();
1174
1175        if int_count > total / 2 {
1176            self.compress_as_integers();
1177        } else if str_count > total / 2 {
1178            self.compress_as_strings();
1179        } else if bool_count > total / 2 {
1180            self.compress_as_booleans();
1181        }
1182        // If no dominant type, don't compress (mixed types don't compress well)
1183    }
1184
1185    /// Counts values by type.
1186    fn count_types(&self) -> (usize, usize, usize) {
1187        let mut int_count = 0;
1188        let mut str_count = 0;
1189        let mut bool_count = 0;
1190
1191        for value in self.values.values() {
1192            match value {
1193                Value::Int64(_) => int_count += 1,
1194                Value::String(_) => str_count += 1,
1195                Value::Bool(_) => bool_count += 1,
1196                _ => {}
1197            }
1198        }
1199
1200        (int_count, str_count, bool_count)
1201    }
1202
1203    /// Compresses integer values.
1204    fn compress_as_integers(&mut self) {
1205        // Extract integer values and their IDs
1206        let mut values: Vec<(u64, i64)> = Vec::new();
1207        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
1208
1209        for (&id, value) in &self.values {
1210            match value {
1211                Value::Int64(v) => {
1212                    let id_u64 = id.as_u64();
1213                    values.push((id_u64, *v));
1214                }
1215                _ => {
1216                    non_int_values.insert(id, value.clone());
1217                }
1218            }
1219        }
1220
1221        if values.len() < 8 {
1222            // Not worth compressing
1223            return;
1224        }
1225
1226        // Sort by ID for better compression
1227        values.sort_by_key(|(id, _)| *id);
1228
1229        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1230        let index_to_id: Vec<u64> = id_to_index.clone();
1231        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
1232
1233        // Compress using the optimal codec
1234        let Ok(compressed) = TypeSpecificCompressor::compress_signed_integers(&int_values) else {
1235            return;
1236        };
1237
1238        // Only use compression if it actually saves space
1239        if compressed.compression_ratio() > 1.2 {
1240            self.block_zone_maps =
1241                compute_block_zone_maps(int_values.iter().map(|v| Value::Int64(*v)));
1242            self.compressed = Some(CompressedColumnData::Integers {
1243                data: compressed,
1244                id_to_index,
1245                index_to_id,
1246            });
1247            self.compressed_count = values.len();
1248            self.values = non_int_values;
1249        }
1250    }
1251
1252    /// Compresses string values using dictionary encoding.
1253    fn compress_as_strings(&mut self) {
1254        let mut values: Vec<(u64, ArcStr)> = Vec::new();
1255        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
1256
1257        for (&id, value) in &self.values {
1258            match value {
1259                Value::String(s) => {
1260                    values.push((id.as_u64(), s.clone()));
1261                }
1262                _ => {
1263                    non_str_values.insert(id, value.clone());
1264                }
1265            }
1266        }
1267
1268        if values.len() < 8 {
1269            return;
1270        }
1271
1272        // Sort by ID
1273        values.sort_by_key(|(id, _)| *id);
1274
1275        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1276        let index_to_id: Vec<u64> = id_to_index.clone();
1277
1278        // Build dictionary
1279        let mut builder = DictionaryBuilder::new();
1280        for (_, s) in &values {
1281            builder.add(s.as_ref());
1282        }
1283        let encoding = builder.build();
1284
1285        // Only use compression if it actually saves space
1286        if encoding.compression_ratio() > 1.2 {
1287            self.block_zone_maps =
1288                compute_block_zone_maps(values.iter().map(|(_, s)| Value::String(s.clone())));
1289            self.compressed = Some(CompressedColumnData::Strings {
1290                encoding,
1291                id_to_index,
1292                index_to_id,
1293            });
1294            self.compressed_count = values.len();
1295            self.values = non_str_values;
1296        }
1297    }
1298
1299    /// Compresses boolean values.
1300    fn compress_as_booleans(&mut self) {
1301        let mut values: Vec<(u64, bool)> = Vec::new();
1302        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
1303
1304        for (&id, value) in &self.values {
1305            match value {
1306                Value::Bool(b) => {
1307                    values.push((id.as_u64(), *b));
1308                }
1309                _ => {
1310                    non_bool_values.insert(id, value.clone());
1311                }
1312            }
1313        }
1314
1315        if values.len() < 8 {
1316            return;
1317        }
1318
1319        // Sort by ID
1320        values.sort_by_key(|(id, _)| *id);
1321
1322        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1323        let index_to_id: Vec<u64> = id_to_index.clone();
1324        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
1325
1326        let Ok(compressed) = TypeSpecificCompressor::compress_booleans(&bool_values) else {
1327            return;
1328        };
1329
1330        // Booleans always compress well (8x)
1331        self.block_zone_maps = compute_block_zone_maps(bool_values.iter().map(|b| Value::Bool(*b)));
1332        self.compressed = Some(CompressedColumnData::Booleans {
1333            data: compressed,
1334            id_to_index,
1335            index_to_id,
1336        });
1337        self.compressed_count = values.len();
1338        self.values = non_bool_values;
1339    }
1340
1341    /// Decompresses all values back to the hot buffer.
1342    fn decompress_all(&mut self) {
1343        let Some(compressed) = self.compressed.take() else {
1344            return;
1345        };
1346
1347        match compressed {
1348            CompressedColumnData::Integers {
1349                data, index_to_id, ..
1350            } => {
1351                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
1352                    // Convert back to signed using zigzag decoding
1353                    let signed: Vec<i64> = values
1354                        .iter()
1355                        .map(|&v| crate::codec::zigzag_decode(v))
1356                        .collect();
1357
1358                    for (i, id_u64) in index_to_id.iter().enumerate() {
1359                        if let Some(&value) = signed.get(i) {
1360                            let id = Id::from_u64(*id_u64);
1361                            self.values.insert(id, Value::Int64(value));
1362                        }
1363                    }
1364                }
1365            }
1366            CompressedColumnData::Strings {
1367                encoding,
1368                index_to_id,
1369                ..
1370            } => {
1371                for (i, id_u64) in index_to_id.iter().enumerate() {
1372                    if let Some(s) = encoding.get(i) {
1373                        let id = Id::from_u64(*id_u64);
1374                        self.values.insert(id, Value::String(ArcStr::from(s)));
1375                    }
1376                }
1377            }
1378            CompressedColumnData::Booleans {
1379                data, index_to_id, ..
1380            } => {
1381                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
1382                    for (i, id_u64) in index_to_id.iter().enumerate() {
1383                        if let Some(&value) = values.get(i) {
1384                            let id = Id::from_u64(*id_u64);
1385                            self.values.insert(id, Value::Bool(value));
1386                        }
1387                    }
1388                }
1389            }
1390        }
1391
1392        self.compressed_count = 0;
1393        self.block_zone_maps.clear();
1394    }
1395
    /// Forces compression regardless of thresholds.
    ///
    /// Useful for bulk loading or when you know the column is complete.
    ///
    /// This bypasses the `Auto`-mode size thresholds that `set()` checks and
    /// works in any compression mode. It delegates to `compress()`, so it is
    /// still a no-op when the hot buffer is empty, when compressed data
    /// already exists, or when no value type is dominant.
    pub fn force_compress(&mut self) {
        self.compress();
    }
1402
    /// Returns the column-level zone map (min/max/null/row counts).
    ///
    /// The map is maintained incrementally on writes and is not tightened
    /// on removal; `rebuild_zone_map` recomputes it from scratch.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }
1408
    /// Returns the per-block zone maps populated when the column was
    /// compressed.
    ///
    /// Each entry covers a contiguous slice of `DEFAULT_BLOCK_ROWS` rows
    /// of the sorted compressed array. Returns an empty slice when the
    /// column is uncompressed (the hot buffer is unordered, so per-block
    /// pruning would be meaningless). Phase 4 consumes these for lazy
    /// `range_iter`-style scans.
    ///
    /// The entry at index `i` describes the block that `decode_block(i)`
    /// returns.
    #[must_use]
    pub fn block_zone_maps(&self) -> &[ZoneMapEntry] {
        &self.block_zone_maps
    }
1421
    /// Returns the number of compressed blocks. Equal to
    /// `block_zone_maps().len()`. Returns 0 for uncompressed columns.
    ///
    /// Valid arguments for `decode_block` are `0..block_count()`.
    #[must_use]
    pub fn block_count(&self) -> usize {
        self.block_zone_maps.len()
    }
1428
1429    /// Decodes a single compressed block into `(id, value)` pairs.
1430    ///
1431    /// Returns `None` when the column is uncompressed or when `block_idx`
1432    /// is out of range. The block contains exactly the rows that
1433    /// [`block_zone_maps`](Self::block_zone_maps) at the same index
1434    /// describes (`row_count` entries).
1435    ///
1436    /// **Today** the implementation decodes the full compressed array
1437    /// once and slices the result; both calls are O(N). When Phase 5/6
1438    /// makes blocks independently decodable on-disk, the API contract
1439    /// stays the same and the implementation becomes per-block.
1440    #[must_use]
1441    pub fn decode_block(&self, block_idx: usize) -> Option<DecodedBlock<Id>> {
1442        let zone_map = self.block_zone_maps.get(block_idx)?.clone();
1443        let compressed = self.compressed.as_ref()?;
1444
1445        let block_size = DEFAULT_BLOCK_ROWS as usize;
1446        let start = block_idx * block_size;
1447        let end = match self.compressed_count.min(start + block_size) {
1448            // Bounds-check: the last block may be short.
1449            n if n > start => n,
1450            _ => return None,
1451        };
1452
1453        let entries = match compressed {
1454            CompressedColumnData::Integers {
1455                data, index_to_id, ..
1456            } => {
1457                let raw = TypeSpecificCompressor::decompress_integers(data).ok()?;
1458                let signed: Vec<i64> = raw
1459                    .iter()
1460                    .map(|&v| crate::codec::zigzag_decode(v))
1461                    .collect();
1462                index_to_id
1463                    .iter()
1464                    .zip(signed.iter())
1465                    .skip(start)
1466                    .take(end - start)
1467                    .map(|(&id_u64, &value)| (Id::from_u64(id_u64), Value::Int64(value)))
1468                    .collect()
1469            }
1470            CompressedColumnData::Strings {
1471                encoding,
1472                index_to_id,
1473                ..
1474            } => index_to_id
1475                .iter()
1476                .enumerate()
1477                .skip(start)
1478                .take(end - start)
1479                .filter_map(|(i, &id_u64)| {
1480                    encoding
1481                        .get(i)
1482                        .map(|s| (Id::from_u64(id_u64), Value::String(ArcStr::from(s))))
1483                })
1484                .collect(),
1485            CompressedColumnData::Booleans {
1486                data, index_to_id, ..
1487            } => {
1488                let raw = TypeSpecificCompressor::decompress_booleans(data).ok()?;
1489                index_to_id
1490                    .iter()
1491                    .zip(raw.iter())
1492                    .skip(start)
1493                    .take(end - start)
1494                    .map(|(&id_u64, &value)| (Id::from_u64(id_u64), Value::Bool(value)))
1495                    .collect()
1496            }
1497        };
1498
1499        Some(DecodedBlock { zone_map, entries })
1500    }
1501
1502    /// Iterates all compressed blocks, decoding each in turn.
1503    ///
1504    /// Empty for uncompressed columns. Phase 4's iterator-bounds operator
1505    /// will prefer `decode_block(idx)` after pruning via
1506    /// [`block_zone_maps`](Self::block_zone_maps); this iterator is the
1507    /// "decode everything" fallback.
1508    pub fn iter_decoded_blocks(&self) -> impl Iterator<Item = DecodedBlock<Id>> + '_ {
1509        (0..self.block_count()).filter_map(|idx| self.decode_block(idx))
1510    }
1511
1512    /// Uses zone map to check if any values could satisfy the predicate.
1513    ///
1514    /// Returns `false` when we can prove no values match (so the column
1515    /// can be skipped entirely). Returns `true` if values might match.
1516    #[must_use]
1517    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1518        if self.zone_map_dirty {
1519            // Conservative: can't skip if zone map is stale
1520            return true;
1521        }
1522
1523        match op {
1524            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1525            CompareOp::Ne => {
1526                // Can only skip if all values are equal to the value
1527                // (which means min == max == value)
1528                match (&self.zone_map.min, &self.zone_map.max) {
1529                    (Some(min), Some(max)) => {
1530                        !(compare_values(min, value) == Some(Ordering::Equal)
1531                            && compare_values(max, value) == Some(Ordering::Equal))
1532                    }
1533                    _ => true,
1534                }
1535            }
1536            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1537            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1538            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1539            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1540        }
1541    }
1542
1543    /// Rebuilds zone map from current values.
1544    pub fn rebuild_zone_map(&mut self) {
1545        let mut zone_map = ZoneMapEntry::new();
1546
1547        for value in self.values.values() {
1548            zone_map.row_count += 1;
1549
1550            if matches!(value, Value::Null) {
1551                zone_map.null_count += 1;
1552                continue;
1553            }
1554
1555            // Update min
1556            match &zone_map.min {
1557                None => zone_map.min = Some(value.clone()),
1558                Some(current) => {
1559                    if compare_values(value, current) == Some(Ordering::Less) {
1560                        zone_map.min = Some(value.clone());
1561                    }
1562                }
1563            }
1564
1565            // Update max
1566            match &zone_map.max {
1567                None => zone_map.max = Some(value.clone()),
1568                Some(current) => {
1569                    if compare_values(value, current) == Some(Ordering::Greater) {
1570                        zone_map.max = Some(value.clone());
1571                    }
1572                }
1573            }
1574        }
1575
1576        self.zone_map = zone_map;
1577        self.zone_map_dirty = false;
1578    }
1579}
1580
1581// === Temporal implementation: VersionLog-backed property column ===
1582//
1583// **Zone map limitation**: zone maps track min/max across the *latest* values
1584// only (see `rebuild_zone_map`). For temporal queries at old epochs, the zone
1585// map may produce false negatives: it could reject a column based on current
1586// min/max even though historical values would match. This is a known
1587// trade-off: temporal queries are conservative but never return wrong results
1588// (the `zone_map_dirty` fallback returns `true` = "might match").
1589//
1590// **Compression**: disabled in temporal mode because the underlying codecs
1591// (DeltaBitPacked, Dictionary, BitVector) operate on flat `FxHashMap<Id, Value>`
1592// arrays, not `FxHashMap<Id, VersionLog<Value>>`. Per-epoch compression is a
1593// potential future optimization.
1594#[cfg(feature = "temporal")]
1595impl<Id: EntityId> PropertyColumn<Id> {
1596    /// Creates a new empty column.
1597    #[must_use]
1598    pub fn new() -> Self {
1599        Self {
1600            values: FxHashMap::default(),
1601            zone_map: ZoneMapEntry::new(),
1602            zone_map_dirty: false,
1603            compression_mode: CompressionMode::None,
1604            block_zone_maps: Vec::new(),
1605        }
1606    }
1607
1608    /// Creates a new column with the specified compression mode.
1609    #[must_use]
1610    pub fn with_compression(mode: CompressionMode) -> Self {
1611        Self {
1612            values: FxHashMap::default(),
1613            zone_map: ZoneMapEntry::new(),
1614            zone_map_dirty: false,
1615            compression_mode: mode,
1616            block_zone_maps: Vec::new(),
1617        }
1618    }
1619
    /// Sets the compression mode for this column.
    ///
    /// Only the stored mode changes; this method neither compresses nor
    /// decompresses anything.
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
    }
1624
    /// Returns the compression mode recorded for this column.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
1630
1631    /// Sets a value for an entity, appending to its version log.
1632    ///
1633    /// For non-transactional writes, pass the current epoch.
1634    /// For transactional writes, pass `EpochId::PENDING`.
1635    pub fn set(&mut self, id: Id, value: Value, epoch: EpochId) {
1636        self.update_zone_map_on_insert(&value);
1637        self.values.entry(id).or_default().append(epoch, value);
1638    }
1639
1640    /// Updates zone map when inserting a value.
1641    fn update_zone_map_on_insert(&mut self, value: &Value) {
1642        self.zone_map.row_count += 1;
1643
1644        if matches!(value, Value::Null) {
1645            self.zone_map.null_count += 1;
1646            return;
1647        }
1648
1649        match &self.zone_map.min {
1650            None => self.zone_map.min = Some(value.clone()),
1651            Some(current) => {
1652                if compare_values(value, current) == Some(Ordering::Less) {
1653                    self.zone_map.min = Some(value.clone());
1654                }
1655            }
1656        }
1657
1658        match &self.zone_map.max {
1659            None => self.zone_map.max = Some(value.clone()),
1660            Some(current) => {
1661                if compare_values(value, current) == Some(Ordering::Greater) {
1662                    self.zone_map.max = Some(value.clone());
1663                }
1664            }
1665        }
1666    }
1667
1668    /// Gets the latest value for an entity, filtering out tombstones (Null).
1669    #[must_use]
1670    pub fn get(&self, id: Id) -> Option<Value> {
1671        self.values
1672            .get(&id)
1673            .and_then(|log| log.latest())
1674            .filter(|v| !v.is_null())
1675            .cloned()
1676    }
1677
1678    /// Removes a value by appending a tombstone (Null) at the given epoch.
1679    pub fn remove(&mut self, id: Id, epoch: EpochId) -> Option<Value> {
1680        let previous = self.get(id);
1681        if previous.is_some() {
1682            self.values
1683                .entry(id)
1684                .or_default()
1685                .append(epoch, Value::Null);
1686            self.zone_map_dirty = true;
1687        }
1688        previous
1689    }
1690
1691    /// Returns the number of live (non-tombstoned) values in this column.
1692    #[must_use]
1693    pub fn len(&self) -> usize {
1694        self.values
1695            .values()
1696            .filter(|log| log.latest().is_some_and(|v| !v.is_null()))
1697            .count()
1698    }
1699
1700    /// Returns true if this column is empty.
1701    #[cfg(test)]
1702    #[must_use]
1703    #[allow(dead_code)]
1704    pub fn is_empty(&self) -> bool {
1705        self.len() == 0
1706    }
1707
1708    /// Returns compression statistics for this column.
1709    ///
1710    /// In temporal mode, compression is not used. Reports live value count only.
1711    #[must_use]
1712    pub fn compression_stats(&self) -> CompressionStats {
1713        let live_count = self.len();
1714        let hot_size = live_count * std::mem::size_of::<Value>();
1715
1716        CompressionStats {
1717            uncompressed_size: hot_size,
1718            compressed_size: hot_size,
1719            value_count: live_count,
1720            codec: None,
1721        }
1722    }
1723
1724    /// Returns estimated heap memory for this column.
1725    #[must_use]
1726    pub fn heap_memory_bytes(&self) -> usize {
1727        self.values.capacity()
1728            * (std::mem::size_of::<Id>() + std::mem::size_of::<VersionLog<Value>>() + 1)
1729    }
1730
1731    /// Compression is not supported in temporal mode (no-op).
1732    pub fn compress(&mut self) {}
1733
1734    /// Forces compression (no-op in temporal mode).
1735    pub fn force_compress(&mut self) {}
1736
1737    /// Returns the zone map for this column.
1738    #[must_use]
1739    pub fn zone_map(&self) -> &ZoneMapEntry {
1740        &self.zone_map
1741    }
1742
1743    /// Returns the per-block zone maps for this column.
1744    ///
1745    /// Always empty in temporal mode: compression is disabled for
1746    /// `VersionLog`-backed columns (see module-level note), so there is
1747    /// no sorted compressed array to chunk into blocks.
1748    #[must_use]
1749    pub fn block_zone_maps(&self) -> &[ZoneMapEntry] {
1750        &self.block_zone_maps
1751    }
1752
1753    /// Uses zone map to check if any values could satisfy the predicate.
1754    #[must_use]
1755    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1756        if self.zone_map_dirty {
1757            return true;
1758        }
1759
1760        match op {
1761            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1762            CompareOp::Ne => match (&self.zone_map.min, &self.zone_map.max) {
1763                (Some(min), Some(max)) => {
1764                    !(compare_values(min, value) == Some(Ordering::Equal)
1765                        && compare_values(max, value) == Some(Ordering::Equal))
1766                }
1767                _ => true,
1768            },
1769            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1770            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1771            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1772            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1773        }
1774    }
1775
1776    /// Rebuilds zone map from current (latest) values.
1777    pub fn rebuild_zone_map(&mut self) {
1778        let mut zone_map = ZoneMapEntry::new();
1779
1780        for log in self.values.values() {
1781            if let Some(value) = log.latest() {
1782                zone_map.row_count += 1;
1783
1784                if matches!(value, Value::Null) {
1785                    zone_map.null_count += 1;
1786                    continue;
1787                }
1788
1789                match &zone_map.min {
1790                    None => zone_map.min = Some(value.clone()),
1791                    Some(current) => {
1792                        if compare_values(value, current) == Some(Ordering::Less) {
1793                            zone_map.min = Some(value.clone());
1794                        }
1795                    }
1796                }
1797
1798                match &zone_map.max {
1799                    None => zone_map.max = Some(value.clone()),
1800                    Some(current) => {
1801                        if compare_values(value, current) == Some(Ordering::Greater) {
1802                            zone_map.max = Some(value.clone());
1803                        }
1804                    }
1805                }
1806            }
1807        }
1808
1809        self.zone_map = zone_map;
1810        self.zone_map_dirty = false;
1811    }
1812
1813    // === Temporal-only methods ===
1814
1815    /// Gets the value at a specific epoch via binary search, filtering tombstones.
1816    #[must_use]
1817    pub fn get_at(&self, id: Id, epoch: EpochId) -> Option<Value> {
1818        self.values
1819            .get(&id)
1820            .and_then(|log| log.at(epoch))
1821            .filter(|v| !v.is_null())
1822            .cloned()
1823    }
1824
1825    /// Replaces PENDING epochs with the real commit epoch in all version logs.
1826    pub fn finalize_pending(&mut self, real_epoch: EpochId) {
1827        for log in self.values.values_mut() {
1828            log.finalize_pending(real_epoch);
1829        }
1830    }
1831
1832    /// Removes all PENDING entries from all version logs (transaction rollback).
1833    pub fn remove_pending(&mut self) {
1834        for log in self.values.values_mut() {
1835            log.remove_pending();
1836        }
1837        self.values.retain(|_, log| !log.is_empty());
1838    }
1839
1840    /// Garbage-collects old versions from all version logs.
1841    pub fn gc(&mut self, min_epoch: EpochId) {
1842        for log in self.values.values_mut() {
1843            log.gc(min_epoch);
1844        }
1845        self.values.retain(|_, log| !log.is_empty());
1846    }
1847
1848    /// Removes PENDING entries for a specific entity (targeted rollback).
1849    pub fn remove_pending_for(&mut self, id: Id) {
1850        if let Some(log) = self.values.get_mut(&id) {
1851            log.remove_pending();
1852            if log.is_empty() {
1853                self.values.remove(&id);
1854            }
1855        }
1856    }
1857
1858    /// Removes up to `n` PENDING entries for a specific entity.
1859    ///
1860    /// Used by savepoint rollback to pop only the entries added after the
1861    /// savepoint, leaving earlier PENDING entries intact.
1862    pub fn pop_n_pending_for(&mut self, id: Id, n: usize) {
1863        if let Some(log) = self.values.get_mut(&id) {
1864            log.pop_n_pending(n);
1865            if log.is_empty() {
1866                self.values.remove(&id);
1867            }
1868        }
1869    }
1870}
1871
1872/// Computes per-block zone maps for a sorted column by chunking the values
1873/// into blocks of [`DEFAULT_BLOCK_ROWS`] rows.
1874///
1875/// Used by `compress_as_*` to populate `PropertyColumn::block_zone_maps`.
1876/// The values must already be in the order in which they will be stored
1877/// (sorted by entity id today). Each block records min/max/null/row counts;
1878/// `Float64` and other variants without a defined `Ord` impl skip min/max
1879/// updates and contribute only to row/null counts (matching the column-
1880/// level zone map's behavior).
1881#[cfg(not(feature = "temporal"))]
1882fn compute_block_zone_maps(values: impl IntoIterator<Item = Value>) -> Vec<ZoneMapEntry> {
1883    let block_size = DEFAULT_BLOCK_ROWS as usize;
1884    let mut blocks: Vec<ZoneMapEntry> = Vec::new();
1885    let mut current = ZoneMapEntry::new();
1886    let mut current_rows: usize = 0;
1887
1888    for value in values {
1889        if current_rows == block_size {
1890            blocks.push(current);
1891            current = ZoneMapEntry::new();
1892            current_rows = 0;
1893        }
1894        current.row_count += 1;
1895        current_rows += 1;
1896
1897        if matches!(value, Value::Null) {
1898            current.null_count += 1;
1899            continue;
1900        }
1901
1902        // Reflexive-comparison guard: only seed/update min/max with values
1903        // that have a defined ordering against themselves. This filters out
1904        // `Float64(NaN)` (and any future variant whose `compare_values` arm
1905        // returns `None`) so a NaN can never poison the running min/max.
1906        if compare_values(&value, &value) != Some(Ordering::Equal) {
1907            continue;
1908        }
1909
1910        let is_less_than_min = match &current.min {
1911            None => true,
1912            Some(existing) => compare_values(&value, existing) == Some(Ordering::Less),
1913        };
1914        let is_greater_than_max = match &current.max {
1915            None => true,
1916            Some(existing) => compare_values(&value, existing) == Some(Ordering::Greater),
1917        };
1918        if is_less_than_min {
1919            current.min = Some(value.clone());
1920        }
1921        if is_greater_than_max {
1922            current.max = Some(value);
1923        }
1924    }
1925
1926    if current_rows > 0 {
1927        blocks.push(current);
1928    }
1929    blocks
1930}
1931
1932/// Compares two values for ordering.
1933fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1934    match (a, b) {
1935        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1936        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1937        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1938        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1939        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1940        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1941        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1942        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1943        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1944        _ => None,
1945    }
1946}
1947
1948impl<Id: EntityId> Default for PropertyColumn<Id> {
1949    fn default() -> Self {
1950        Self::new()
1951    }
1952}
1953
1954/// A borrowed reference to a property column for bulk reads.
1955///
1956/// Holds the read lock so the column can't change while you're iterating.
1957pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1958    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1959    _key: PropertyKey,
1960    _marker: PhantomData<Id>,
1961}
1962
1963#[cfg(test)]
1964#[cfg(not(feature = "temporal"))]
1965mod tests {
1966    use super::*;
1967    use arcstr::ArcStr;
1968
1969    #[test]
1970    fn test_property_storage_basic() {
1971        let storage = PropertyStorage::new();
1972
1973        let node1 = NodeId::new(1);
1974        let node2 = NodeId::new(2);
1975        let name_key = PropertyKey::new("name");
1976        let age_key = PropertyKey::new("age");
1977
1978        storage.set(node1, name_key.clone(), "Alix".into());
1979        storage.set(node1, age_key.clone(), 30i64.into());
1980        storage.set(node2, name_key.clone(), "Gus".into());
1981
1982        assert_eq!(
1983            storage.get(node1, &name_key),
1984            Some(Value::String("Alix".into()))
1985        );
1986        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
1987        assert_eq!(
1988            storage.get(node2, &name_key),
1989            Some(Value::String("Gus".into()))
1990        );
1991        assert!(storage.get(node2, &age_key).is_none());
1992    }
1993
1994    #[test]
1995    fn test_property_storage_remove() {
1996        let storage = PropertyStorage::new();
1997
1998        let node = NodeId::new(1);
1999        let key = PropertyKey::new("name");
2000
2001        storage.set(node, key.clone(), "Alix".into());
2002        assert!(storage.get(node, &key).is_some());
2003
2004        let removed = storage.remove(node, &key);
2005        assert!(removed.is_some());
2006        assert!(storage.get(node, &key).is_none());
2007    }
2008
2009    #[test]
2010    fn test_property_storage_get_all() {
2011        let storage = PropertyStorage::new();
2012
2013        let node = NodeId::new(1);
2014        storage.set(node, PropertyKey::new("name"), "Alix".into());
2015        storage.set(node, PropertyKey::new("age"), 30i64.into());
2016        storage.set(node, PropertyKey::new("active"), true.into());
2017
2018        let props = storage.get_all(node);
2019        assert_eq!(props.len(), 3);
2020    }
2021
2022    #[test]
2023    fn test_property_storage_remove_all() {
2024        let storage = PropertyStorage::new();
2025
2026        let node = NodeId::new(1);
2027        storage.set(node, PropertyKey::new("name"), "Alix".into());
2028        storage.set(node, PropertyKey::new("age"), 30i64.into());
2029
2030        storage.remove_all(node);
2031
2032        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
2033        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
2034    }
2035
2036    #[test]
2037    fn test_property_column() {
2038        let mut col = PropertyColumn::new();
2039
2040        col.set(NodeId::new(1), "Alix".into());
2041        col.set(NodeId::new(2), "Gus".into());
2042
2043        assert_eq!(col.len(), 2);
2044        assert!(!col.is_empty());
2045
2046        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));
2047
2048        col.remove(NodeId::new(1));
2049        assert!(col.get(NodeId::new(1)).is_none());
2050        assert_eq!(col.len(), 1);
2051    }
2052
2053    #[test]
2054    fn test_compression_mode() {
2055        let col: PropertyColumn<NodeId> = PropertyColumn::new();
2056        assert_eq!(col.compression_mode(), CompressionMode::None);
2057
2058        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
2059        assert_eq!(col.compression_mode(), CompressionMode::Auto);
2060    }
2061
2062    #[test]
2063    fn test_property_storage_with_compression() {
2064        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
2065
2066        for i in 0u64..100 {
2067            let age = 20 + i64::try_from(i % 50).unwrap();
2068            storage.set(NodeId::new(i), PropertyKey::new("age"), Value::Int64(age));
2069        }
2070
2071        // Values should still be readable
2072        assert_eq!(
2073            storage.get(NodeId::new(0), &PropertyKey::new("age")),
2074            Some(Value::Int64(20))
2075        );
2076        assert_eq!(
2077            storage.get(NodeId::new(50), &PropertyKey::new("age")),
2078            Some(Value::Int64(20))
2079        );
2080    }
2081
2082    #[test]
2083    fn test_compress_integer_column() {
2084        let mut col: PropertyColumn<NodeId> =
2085            PropertyColumn::with_compression(CompressionMode::Auto);
2086
2087        // Add many sequential integers
2088        for i in 0u64..2000 {
2089            col.set(
2090                NodeId::new(i),
2091                Value::Int64(1000 + i64::try_from(i).unwrap()),
2092            );
2093        }
2094
2095        // Should have triggered compression at some point
2096        // Total count should include both compressed and hot buffer values
2097        let stats = col.compression_stats();
2098        assert_eq!(stats.value_count, 2000);
2099
2100        // Values from the hot buffer should be readable
2101        // Note: Compressed values are not accessible via get() - see design note
2102        let last_value = col.get(NodeId::new(1999));
2103        assert!(last_value.is_some() || col.is_compressed());
2104    }
2105
2106    #[test]
2107    fn test_compress_string_column() {
2108        let mut col: PropertyColumn<NodeId> =
2109            PropertyColumn::with_compression(CompressionMode::Auto);
2110
2111        // Add repeated strings (good for dictionary compression)
2112        let categories = ["Person", "Company", "Product", "Location"];
2113        for i in 0..2000 {
2114            let cat = categories[i % 4];
2115            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
2116        }
2117
2118        // Total count should be correct
2119        assert_eq!(col.len(), 2000);
2120
2121        // Late values should be in hot buffer and readable
2122        let last_value = col.get(NodeId::new(1999));
2123        assert!(last_value.is_some() || col.is_compressed());
2124    }
2125
2126    #[test]
2127    fn test_compress_boolean_column() {
2128        let mut col: PropertyColumn<NodeId> =
2129            PropertyColumn::with_compression(CompressionMode::Auto);
2130
2131        // Add booleans
2132        for i in 0u64..2000 {
2133            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
2134        }
2135
2136        // Verify total count
2137        assert_eq!(col.len(), 2000);
2138
2139        // Late values should be readable
2140        let last_value = col.get(NodeId::new(1999));
2141        assert!(last_value.is_some() || col.is_compressed());
2142    }
2143
2144    #[test]
2145    fn test_force_compress() {
2146        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2147
2148        // Add fewer values than the threshold
2149        for i in 0u64..100 {
2150            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2151        }
2152
2153        // Force compression
2154        col.force_compress();
2155
2156        // Stats should show compression was applied if beneficial
2157        let stats = col.compression_stats();
2158        assert_eq!(stats.value_count, 100);
2159    }
2160
2161    #[test]
2162    fn test_compression_stats() {
2163        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2164
2165        for i in 0u64..50 {
2166            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2167        }
2168
2169        let stats = col.compression_stats();
2170        assert_eq!(stats.value_count, 50);
2171        assert!(stats.uncompressed_size > 0);
2172    }
2173
2174    #[test]
2175    fn test_storage_compression_stats() {
2176        let storage = PropertyStorage::with_compression(CompressionMode::Auto);
2177
2178        for i in 0u64..100 {
2179            storage.set(
2180                NodeId::new(i),
2181                PropertyKey::new("age"),
2182                Value::Int64(i64::try_from(i).unwrap()),
2183            );
2184            storage.set(
2185                NodeId::new(i),
2186                PropertyKey::new("name"),
2187                Value::String(ArcStr::from("Alix")),
2188            );
2189        }
2190
2191        let stats = storage.compression_stats();
2192        assert_eq!(stats.len(), 2); // Two columns
2193        assert!(stats.contains_key(&PropertyKey::new("age")));
2194        assert!(stats.contains_key(&PropertyKey::new("name")));
2195    }
2196
2197    #[test]
2198    fn test_memory_usage() {
2199        let storage = PropertyStorage::new();
2200
2201        for i in 0u64..100 {
2202            storage.set(
2203                NodeId::new(i),
2204                PropertyKey::new("value"),
2205                Value::Int64(i64::try_from(i).unwrap()),
2206            );
2207        }
2208
2209        let usage = storage.memory_usage();
2210        assert!(usage > 0);
2211    }
2212
2213    #[test]
2214    fn test_get_batch_single_property() {
2215        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2216
2217        let node1 = NodeId::new(1);
2218        let node2 = NodeId::new(2);
2219        let node3 = NodeId::new(3);
2220        let age_key = PropertyKey::new("age");
2221
2222        storage.set(node1, age_key.clone(), 25i64.into());
2223        storage.set(node2, age_key.clone(), 30i64.into());
2224        // node3 has no age property
2225
2226        let ids = vec![node1, node2, node3];
2227        let values = storage.get_batch(&ids, &age_key);
2228
2229        assert_eq!(values.len(), 3);
2230        assert_eq!(values[0], Some(Value::Int64(25)));
2231        assert_eq!(values[1], Some(Value::Int64(30)));
2232        assert_eq!(values[2], None);
2233    }
2234
2235    #[test]
2236    fn test_get_batch_missing_column() {
2237        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2238
2239        let node1 = NodeId::new(1);
2240        let node2 = NodeId::new(2);
2241        let missing_key = PropertyKey::new("nonexistent");
2242
2243        let ids = vec![node1, node2];
2244        let values = storage.get_batch(&ids, &missing_key);
2245
2246        assert_eq!(values.len(), 2);
2247        assert_eq!(values[0], None);
2248        assert_eq!(values[1], None);
2249    }
2250
2251    #[test]
2252    fn test_get_batch_empty_ids() {
2253        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2254        let key = PropertyKey::new("any");
2255
2256        let values = storage.get_batch(&[], &key);
2257        assert!(values.is_empty());
2258    }
2259
2260    #[test]
2261    fn test_get_all_batch() {
2262        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2263
2264        let node1 = NodeId::new(1);
2265        let node2 = NodeId::new(2);
2266        let node3 = NodeId::new(3);
2267
2268        storage.set(node1, PropertyKey::new("name"), "Alix".into());
2269        storage.set(node1, PropertyKey::new("age"), 25i64.into());
2270        storage.set(node2, PropertyKey::new("name"), "Gus".into());
2271        // node3 has no properties
2272
2273        let ids = vec![node1, node2, node3];
2274        let all_props = storage.get_all_batch(&ids);
2275
2276        assert_eq!(all_props.len(), 3);
2277        assert_eq!(all_props[0].len(), 2); // name and age
2278        assert_eq!(all_props[1].len(), 1); // name only
2279        assert_eq!(all_props[2].len(), 0); // no properties
2280
2281        assert_eq!(
2282            all_props[0].get(&PropertyKey::new("name")),
2283            Some(&Value::String("Alix".into()))
2284        );
2285        assert_eq!(
2286            all_props[1].get(&PropertyKey::new("name")),
2287            Some(&Value::String("Gus".into()))
2288        );
2289    }
2290
2291    #[test]
2292    fn test_get_all_batch_empty_ids() {
2293        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2294
2295        let all_props = storage.get_all_batch(&[]);
2296        assert!(all_props.is_empty());
2297    }
2298
2299    // ── Phase 2d: per-block zone maps ─────────────────────────────────
2300
2301    #[test]
2302    fn test_block_zone_maps_empty_for_uncompressed_column() {
2303        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2304        for i in 0u64..50 {
2305            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2306        }
2307        assert!(col.block_zone_maps().is_empty());
2308    }
2309
2310    #[test]
2311    fn test_block_zone_maps_integer_compressed() {
2312        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2313        // 2500 sequential integers → 1024 + 1024 + 452 = 3 blocks at default size
2314        for i in 0u64..2500 {
2315            col.set(
2316                NodeId::new(i),
2317                Value::Int64(1000 + i64::try_from(i).unwrap()),
2318            );
2319        }
2320        col.force_compress();
2321
2322        let blocks = col.block_zone_maps();
2323        assert_eq!(blocks.len(), 3, "2500 rows / 1024 = 3 blocks");
2324
2325        assert_eq!(blocks[0].row_count, 1024);
2326        assert_eq!(blocks[0].min, Some(Value::Int64(1000)));
2327        assert_eq!(blocks[0].max, Some(Value::Int64(2023)));
2328
2329        assert_eq!(blocks[1].row_count, 1024);
2330        assert_eq!(blocks[1].min, Some(Value::Int64(2024)));
2331        assert_eq!(blocks[1].max, Some(Value::Int64(3047)));
2332
2333        assert_eq!(blocks[2].row_count, 452);
2334        assert_eq!(blocks[2].min, Some(Value::Int64(3048)));
2335        assert_eq!(blocks[2].max, Some(Value::Int64(3499)));
2336    }
2337
2338    #[test]
2339    fn test_block_zone_maps_string_compressed() {
2340        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2341        // Low-cardinality cycle (good for dictionary compression). Every
2342        // block contains all four strings, so per-block min/max match the
2343        // overall range, but row counts still segment into 1024+1024+rest.
2344        let strings = ["alpha", "bravo", "charlie", "delta"];
2345        for i in 0u64..2500 {
2346            col.set(
2347                NodeId::new(i),
2348                Value::String(ArcStr::from(strings[(i % 4) as usize])),
2349            );
2350        }
2351        col.force_compress();
2352
2353        let blocks = col.block_zone_maps();
2354        assert_eq!(blocks.len(), 3);
2355        assert_eq!(blocks[0].row_count, 1024);
2356        assert_eq!(blocks[1].row_count, 1024);
2357        assert_eq!(blocks[2].row_count, 452);
2358        for block in blocks {
2359            assert_eq!(block.min, Some(Value::String(ArcStr::from("alpha"))));
2360            assert_eq!(block.max, Some(Value::String(ArcStr::from("delta"))));
2361        }
2362    }
2363
2364    #[test]
2365    fn test_block_zone_maps_boolean_compressed() {
2366        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2367        // 2500 alternating bools sorted by id: every block contains both true and false.
2368        for i in 0u64..2500 {
2369            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
2370        }
2371        col.force_compress();
2372
2373        let blocks = col.block_zone_maps();
2374        assert_eq!(blocks.len(), 3);
2375        for block in blocks {
2376            assert_eq!(block.min, Some(Value::Bool(false)));
2377            assert_eq!(block.max, Some(Value::Bool(true)));
2378            assert_eq!(block.null_count, 0);
2379        }
2380    }
2381
2382    #[test]
2383    fn test_block_zone_maps_cleared_after_evict() {
2384        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2385        for i in 0u64..2500 {
2386            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2387        }
2388        col.force_compress();
2389        assert!(!col.block_zone_maps().is_empty());
2390
2391        col.evict_values();
2392        assert!(
2393            col.block_zone_maps().is_empty(),
2394            "evict drops compressed data, so per-block stats must reset"
2395        );
2396    }
2397
2398    #[test]
2399    fn test_block_might_match_prunes_disjoint_range() {
2400        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2401        for i in 0u64..2500 {
2402            col.set(
2403                NodeId::new(i),
2404                Value::Int64(1000 + i64::try_from(i).unwrap()),
2405            );
2406        }
2407        col.force_compress();
2408
2409        // Block 0 covers values 1000..=2023; querying for 5000 must prune all blocks.
2410        let target = Value::Int64(5000);
2411        let blocks = col.block_zone_maps();
2412        let any_match = blocks.iter().any(|zm| zm.might_contain_equal(&target));
2413        assert!(!any_match, "no block should claim to contain 5000");
2414    }
2415
2416    #[test]
2417    fn test_storage_block_zone_maps_for_returns_blocks() {
2418        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2419        for i in 0u64..2500 {
2420            storage.set(
2421                NodeId::new(i),
2422                PropertyKey::new("age"),
2423                Value::Int64(20 + i64::try_from(i).unwrap()),
2424            );
2425        }
2426        storage.force_compress_all();
2427
2428        let blocks = storage
2429            .block_zone_maps_for(&PropertyKey::new("age"))
2430            .expect("compressed column must expose block stats");
2431        assert_eq!(blocks.len(), 3);
2432        assert_eq!(blocks[0].min, Some(Value::Int64(20)));
2433        assert_eq!(blocks.last().unwrap().max, Some(Value::Int64(2519)));
2434    }
2435
2436    #[test]
2437    fn test_storage_block_zone_maps_for_missing_column_returns_none() {
2438        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
2439        assert!(
2440            storage
2441                .block_zone_maps_for(&PropertyKey::new("missing"))
2442                .is_none()
2443        );
2444    }
2445
2446    #[test]
2447    fn test_compute_block_zone_maps_float64_finite_min_max() {
2448        // Direct test of the helper: Float64 isn't dispatched by compress_as_*
2449        // today, but the helper must handle Float64 correctly so a future
2450        // compression path can feed Float64 streams without a behavior change.
2451        let values: Vec<Value> = (0u32..2500)
2452            .map(|i| Value::Float64(f64::from(i) * 0.5))
2453            .collect();
2454        let blocks = compute_block_zone_maps(values);
2455
2456        assert_eq!(blocks.len(), 3);
2457        assert_eq!(blocks[0].row_count, 1024);
2458        assert_eq!(blocks[0].min, Some(Value::Float64(0.0)));
2459        assert_eq!(blocks[0].max, Some(Value::Float64(1023.0 * 0.5)));
2460        assert_eq!(blocks[1].min, Some(Value::Float64(1024.0 * 0.5)));
2461    }
2462
2463    // ── Phase 2e: vectorized batch decoders ──────────────────────────
2464
2465    #[test]
2466    fn test_block_count_zero_for_uncompressed_column() {
2467        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2468        for i in 0u64..50 {
2469            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2470        }
2471        assert_eq!(col.block_count(), 0);
2472    }
2473
2474    #[test]
2475    fn test_block_count_matches_zone_maps_after_compression() {
2476        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2477        for i in 0u64..2500 {
2478            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2479        }
2480        col.force_compress();
2481        assert_eq!(col.block_count(), col.block_zone_maps().len());
2482        assert_eq!(col.block_count(), 3);
2483    }
2484
2485    #[test]
2486    fn test_decode_block_returns_correct_pairs_integer() {
2487        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2488        for i in 0u64..2500 {
2489            col.set(
2490                NodeId::new(i),
2491                Value::Int64(1000 + i64::try_from(i).unwrap()),
2492            );
2493        }
2494        col.force_compress();
2495
2496        let block0 = col.decode_block(0).expect("block 0 must exist");
2497        assert_eq!(block0.entries.len(), 1024);
2498        assert_eq!(block0.entries[0], (NodeId::new(0), Value::Int64(1000)));
2499        assert_eq!(
2500            block0.entries[1023],
2501            (NodeId::new(1023), Value::Int64(2023))
2502        );
2503        assert_eq!(block0.zone_map.row_count, 1024);
2504
2505        let block2 = col.decode_block(2).expect("block 2 must exist");
2506        assert_eq!(block2.entries.len(), 452);
2507        assert_eq!(block2.entries[0], (NodeId::new(2048), Value::Int64(3048)));
2508    }
2509
2510    #[test]
2511    fn test_decode_block_returns_correct_pairs_boolean() {
2512        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2513        for i in 0u64..2500 {
2514            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
2515        }
2516        col.force_compress();
2517
2518        let block0 = col.decode_block(0).expect("block 0 must exist");
2519        assert_eq!(block0.entries.len(), 1024);
2520        assert_eq!(block0.entries[0], (NodeId::new(0), Value::Bool(true)));
2521        assert_eq!(block0.entries[1], (NodeId::new(1), Value::Bool(false)));
2522    }
2523
2524    #[test]
2525    fn test_decode_block_returns_correct_pairs_string() {
2526        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2527        let strings = ["alpha", "bravo", "charlie", "delta"];
2528        for i in 0u64..2500 {
2529            col.set(
2530                NodeId::new(i),
2531                Value::String(ArcStr::from(strings[(i % 4) as usize])),
2532            );
2533        }
2534        col.force_compress();
2535
2536        let block0 = col.decode_block(0).expect("block 0 must exist");
2537        assert_eq!(block0.entries.len(), 1024);
2538        assert_eq!(
2539            block0.entries[0],
2540            (NodeId::new(0), Value::String(ArcStr::from("alpha")))
2541        );
2542        assert_eq!(
2543            block0.entries[3],
2544            (NodeId::new(3), Value::String(ArcStr::from("delta")))
2545        );
2546    }
2547
2548    #[test]
2549    fn test_decode_block_out_of_range_returns_none() {
2550        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2551        for i in 0u64..2500 {
2552            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2553        }
2554        col.force_compress();
2555
2556        assert!(col.decode_block(99).is_none());
2557    }
2558
2559    #[test]
2560    fn test_decode_block_uncompressed_returns_none() {
2561        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2562        for i in 0u64..50 {
2563            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2564        }
2565        // No force_compress — column is uncompressed.
2566        assert!(col.decode_block(0).is_none());
2567    }
2568
2569    #[test]
2570    fn test_iter_decoded_blocks_yields_all_blocks() {
2571        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
2572        for i in 0u64..2500 {
2573            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
2574        }
2575        col.force_compress();
2576
2577        let blocks: Vec<_> = col.iter_decoded_blocks().collect();
2578        assert_eq!(blocks.len(), 3);
2579        let total_rows: usize = blocks.iter().map(|b| b.entries.len()).sum();
2580        assert_eq!(total_rows, 2500);
2581    }
2582
2583    #[test]
2584    fn test_compute_block_zone_maps_float64_nan_does_not_poison() {
2585        // NaN must never seed or displace min/max: comparisons against NaN
2586        // return None, which would otherwise leave min/max permanently
2587        // unrecoverable. Reflexive-comparison guard catches this.
2588        let mut values: Vec<Value> = vec![Value::Float64(f64::NAN)];
2589        values.extend((0u32..50).map(|i| Value::Float64(f64::from(i))));
2590        values.push(Value::Float64(f64::NAN));
2591
2592        let blocks = compute_block_zone_maps(values);
2593        assert_eq!(blocks.len(), 1);
2594        assert_eq!(blocks[0].row_count, 52, "NaN values still count as rows");
2595        assert_eq!(blocks[0].null_count, 0, "NaN is not null");
2596        assert_eq!(blocks[0].min, Some(Value::Float64(0.0)));
2597        assert_eq!(blocks[0].max, Some(Value::Float64(49.0)));
2598    }
2599}