Skip to main content

grafeo_core/graph/lpg/
property.rs

1//! Columnar property storage for nodes and edges.
2//!
3//! Properties are stored column-wise (all "name" values together, all "age"
4//! values together) rather than row-wise. This makes filtering fast - to find
5//! all nodes where age > 30, we only scan the age column.
6//!
7//! Each column also maintains a zone map (min/max/null_count) enabling the
8//! query optimizer to skip columns entirely when a predicate can't match.
9//!
10//! ## Compression
11//!
12//! Columns can be compressed to save memory. When compression is enabled,
13//! the column automatically selects the best codec based on the data type:
14//!
15//! | Data type | Codec | Typical savings |
16//! |-----------|-------|-----------------|
17//! | Int64 (sorted) | DeltaBitPacked | 5-20x |
18//! | Int64 (small) | BitPacked | 2-16x |
19//! | Int64 (repeated) | RunLength | 2-100x |
20//! | String (low cardinality) | Dictionary | 2-50x |
21//! | Bool | BitVector | 8x |
22
23use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25    CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
/// Compression mode for property columns.
///
/// Each [`PropertyColumn`] carries its own mode; [`PropertyStorage`] holds a
/// default that is applied to columns it creates. `None` is the `Default`
/// variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Never compress - always use sparse HashMap (default).
    ///
    /// Columns in this mode are skipped by `PropertyStorage::compress_all`.
    #[default]
    None,
    /// Automatically compress when beneficial (after threshold).
    ///
    /// `PropertyColumn::set` triggers compression in this mode once the hot
    /// buffer and total value count reach their configured limits.
    Auto,
    /// Eagerly compress on every flush.
    Eager,
}
46
/// Threshold for automatic compression (number of values).
///
/// In `Auto` mode a column is only compressed once its total value count
/// (hot buffer plus already-compressed values) is at least this large.
const COMPRESSION_THRESHOLD: usize = 1000;
49
/// Size of the hot buffer for recent writes (before compression).
/// Larger buffer (4096) keeps more recent data uncompressed for faster reads.
/// This trades ~64KB of memory overhead per column for 1.5-2x faster point lookups
/// on recently-written data.
///
/// In `Auto` mode a column attempts compression once its hot buffer holds at
/// least this many values.
const HOT_BUFFER_SIZE: usize = 4096;
55
/// Comparison operators used for zone map predicate checks.
///
/// These map directly to GQL comparison operators like `=`, `<`, `>=`.
///
/// An operator is passed together with the comparison value to
/// `PropertyStorage::might_match`, which uses the column's zone map to decide
/// whether any stored value could satisfy the predicate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Equal to value.
    Eq,
    /// Not equal to value.
    Ne,
    /// Less than value.
    Lt,
    /// Less than or equal to value.
    Le,
    /// Greater than value.
    Gt,
    /// Greater than or equal to value.
    Ge,
}
74
/// Trait for IDs that can key into property storage.
///
/// Implemented for [`NodeId`] and [`EdgeId`] - you can store properties on both.
/// Provides safe conversions to/from `u64` for compression, replacing unsafe transmute.
///
/// The `Copy + Eq + Hash` bounds let IDs be used directly as hash map keys in
/// a column's sparse storage.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Returns the raw `u64` value.
    fn as_u64(self) -> u64;
    /// Creates an ID from a raw `u64` value.
    fn from_u64(v: u64) -> Self;
}
85
86impl EntityId for NodeId {
87    #[inline]
88    fn as_u64(self) -> u64 {
89        self.0
90    }
91    #[inline]
92    fn from_u64(v: u64) -> Self {
93        Self(v)
94    }
95}
96
97impl EntityId for EdgeId {
98    #[inline]
99    fn as_u64(self) -> u64 {
100        self.0
101    }
102    #[inline]
103    fn from_u64(v: u64) -> Self {
104        Self(v)
105    }
106}
107
/// Thread-safe columnar property storage.
///
/// Each property key ("name", "age", etc.) gets its own column. This layout
/// is great for analytical queries that filter on specific properties -
/// you only touch the columns you need.
///
/// Generic over `Id` so the same storage works for nodes and edges.
///
/// All columns live behind a single `RwLock`: read-style methods take the
/// read lock, while anything that mutates a column takes the write lock.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::PropertyStorage;
/// use grafeo_common::types::{NodeId, PropertyKey};
///
/// let storage = PropertyStorage::new();
/// let alix = NodeId::new(1);
///
/// storage.set(alix, PropertyKey::new("name"), "Alix".into());
/// storage.set(alix, PropertyKey::new("age"), 30i64.into());
///
/// // Fetch all properties at once
/// let props = storage.get_all(alix);
/// assert_eq!(props.len(), 2);
/// ```
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Map from property key to column.
    /// Lock order: 9 (nested, acquired via LpgStore::node_properties/edge_properties)
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Default compression mode for new columns.
    default_compression: CompressionMode,
    /// Zero-sized marker tying the storage to its ID type.
    _marker: PhantomData<Id>,
}
140
141impl<Id: EntityId> PropertyStorage<Id> {
142    /// Creates a new property storage.
143    #[must_use]
144    pub fn new() -> Self {
145        Self {
146            columns: RwLock::new(FxHashMap::default()),
147            default_compression: CompressionMode::None,
148            _marker: PhantomData,
149        }
150    }
151
152    /// Creates a new property storage with compression enabled.
153    #[must_use]
154    pub fn with_compression(mode: CompressionMode) -> Self {
155        Self {
156            columns: RwLock::new(FxHashMap::default()),
157            default_compression: mode,
158            _marker: PhantomData,
159        }
160    }
161
    /// Sets the default compression mode for new columns.
    ///
    /// Only the stored default is changed here; columns that already exist
    /// are not touched by this method.
    pub fn set_default_compression(&mut self, mode: CompressionMode) {
        self.default_compression = mode;
    }
166
167    /// Sets a property value for an entity.
168    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
169        let mut columns = self.columns.write();
170        let mode = self.default_compression;
171        columns
172            .entry(key)
173            .or_insert_with(|| PropertyColumn::with_compression(mode))
174            .set(id, value);
175    }
176
177    /// Enables compression for a specific column.
178    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
179        let mut columns = self.columns.write();
180        if let Some(col) = columns.get_mut(key) {
181            col.set_compression_mode(mode);
182        }
183    }
184
185    /// Compresses all columns that have compression enabled.
186    pub fn compress_all(&self) {
187        let mut columns = self.columns.write();
188        for col in columns.values_mut() {
189            if col.compression_mode() != CompressionMode::None {
190                col.compress();
191            }
192        }
193    }
194
195    /// Forces compression on all columns regardless of mode.
196    pub fn force_compress_all(&self) {
197        let mut columns = self.columns.write();
198        for col in columns.values_mut() {
199            col.force_compress();
200        }
201    }
202
203    /// Returns compression statistics for all columns.
204    #[must_use]
205    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
206        let columns = self.columns.read();
207        columns
208            .iter()
209            .map(|(key, col)| (key.clone(), col.compression_stats()))
210            .collect()
211    }
212
213    /// Returns the total memory usage of all columns (compressed size estimate).
214    #[must_use]
215    pub fn memory_usage(&self) -> usize {
216        let columns = self.columns.read();
217        columns
218            .values()
219            .map(|col| col.compression_stats().compressed_size)
220            .sum()
221    }
222
223    /// Returns estimated heap memory for all columns including hash map overhead.
224    #[must_use]
225    pub fn heap_memory_bytes(&self) -> usize {
226        let columns = self.columns.read();
227        // Outer hash map capacity
228        let map_overhead = columns.capacity()
229            * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
230        // Sum of all column heap memory
231        let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
232        map_overhead + column_bytes
233    }
234
235    /// Gets a property value for an entity.
236    #[must_use]
237    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
238        let columns = self.columns.read();
239        columns.get(key).and_then(|col| col.get(id))
240    }
241
242    /// Removes a property value for an entity.
243    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
244        let mut columns = self.columns.write();
245        columns.get_mut(key).and_then(|col| col.remove(id))
246    }
247
248    /// Removes all properties for an entity.
249    pub fn remove_all(&self, id: Id) {
250        let mut columns = self.columns.write();
251        for col in columns.values_mut() {
252            col.remove(id);
253        }
254    }
255
256    /// Gets all properties for an entity.
257    #[must_use]
258    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
259        let columns = self.columns.read();
260        let mut result = FxHashMap::default();
261        for (key, col) in columns.iter() {
262            if let Some(value) = col.get(id) {
263                result.insert(key.clone(), value);
264            }
265        }
266        result
267    }
268
269    /// Gets property values for multiple entities in a single lock acquisition.
270    ///
271    /// More efficient than calling [`Self::get`] in a loop because it acquires
272    /// the read lock only once.
273    ///
274    /// # Example
275    ///
276    /// ```
277    /// use grafeo_core::graph::lpg::PropertyStorage;
278    /// use grafeo_common::types::{PropertyKey, Value};
279    /// use grafeo_common::NodeId;
280    ///
281    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
282    /// let key = PropertyKey::new("age");
283    /// let ids = vec![NodeId(1), NodeId(2), NodeId(3)];
284    /// let values = storage.get_batch(&ids, &key);
285    /// // values[i] is the property value for ids[i], or None if not set
286    /// ```
287    #[must_use]
288    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
289        let columns = self.columns.read();
290        match columns.get(key) {
291            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
292            None => vec![None; ids.len()],
293        }
294    }
295
296    /// Gets all properties for multiple entities efficiently.
297    ///
298    /// More efficient than calling [`Self::get_all`] in a loop because it
299    /// acquires the read lock only once.
300    ///
301    /// # Example
302    ///
303    /// ```
304    /// use grafeo_core::graph::lpg::PropertyStorage;
305    /// use grafeo_common::types::{PropertyKey, Value};
306    /// use grafeo_common::NodeId;
307    ///
308    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
309    /// let ids = vec![NodeId(1), NodeId(2)];
310    /// let all_props = storage.get_all_batch(&ids);
311    /// // all_props[i] is a HashMap of all properties for ids[i]
312    /// ```
313    #[must_use]
314    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
315        let columns = self.columns.read();
316        let column_count = columns.len();
317
318        // Pre-allocate result vector with exact capacity (NebulaGraph pattern)
319        let mut results = Vec::with_capacity(ids.len());
320
321        for &id in ids {
322            // Pre-allocate HashMap with expected column count
323            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
324            for (key, col) in columns.iter() {
325                if let Some(value) = col.get(id) {
326                    result.insert(key.clone(), value);
327                }
328            }
329            results.push(result);
330        }
331
332        results
333    }
334
335    /// Gets selected properties for multiple entities efficiently (projection pushdown).
336    ///
337    /// This is more efficient than [`Self::get_all_batch`] when you only need a subset
338    /// of properties - it only iterates the requested columns instead of all columns.
339    ///
340    /// **Performance**: O(N × K) where N = ids.len() and K = keys.len(),
341    /// compared to O(N × C) for `get_all_batch` where C = total column count.
342    ///
343    /// # Example
344    ///
345    /// ```
346    /// use grafeo_core::graph::lpg::PropertyStorage;
347    /// use grafeo_common::types::{PropertyKey, Value};
348    /// use grafeo_common::NodeId;
349    ///
350    /// let storage: PropertyStorage<NodeId> = PropertyStorage::new();
351    /// let ids = vec![NodeId::new(1), NodeId::new(2)];
352    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
353    ///
354    /// // Only fetches "name" and "age" columns, ignoring other properties
355    /// let props = storage.get_selective_batch(&ids, &keys);
356    /// ```
357    #[must_use]
358    pub fn get_selective_batch(
359        &self,
360        ids: &[Id],
361        keys: &[PropertyKey],
362    ) -> Vec<FxHashMap<PropertyKey, Value>> {
363        if keys.is_empty() {
364            // No properties requested - return empty maps
365            return vec![FxHashMap::default(); ids.len()];
366        }
367
368        let columns = self.columns.read();
369
370        // Pre-collect only the columns we need (avoids re-lookup per id)
371        let requested_columns: Vec<_> = keys
372            .iter()
373            .filter_map(|key| columns.get(key).map(|col| (key, col)))
374            .collect();
375
376        // Pre-allocate result with exact capacity
377        let mut results = Vec::with_capacity(ids.len());
378
379        for &id in ids {
380            let mut result =
381                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
382            // Only iterate requested columns, not all columns
383            for (key, col) in &requested_columns {
384                if let Some(value) = col.get(id) {
385                    result.insert((*key).clone(), value);
386                }
387            }
388            results.push(result);
389        }
390
391        results
392    }
393
394    /// Returns the number of property columns.
395    #[must_use]
396    pub fn column_count(&self) -> usize {
397        self.columns.read().len()
398    }
399
400    /// Returns the keys of all columns.
401    #[must_use]
402    pub fn keys(&self) -> Vec<PropertyKey> {
403        self.columns.read().keys().cloned().collect()
404    }
405
    /// Removes all property data.
    ///
    /// Takes the write lock and empties the column map, dropping every column.
    pub fn clear(&self) {
        self.columns.write().clear();
    }
410
411    /// Gets a column by key for bulk access.
412    #[must_use]
413    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
414        let columns = self.columns.read();
415        if columns.contains_key(key) {
416            Some(PropertyColumnRef {
417                _guard: columns,
418                _key: key.clone(),
419                _marker: PhantomData,
420            })
421        } else {
422            None
423        }
424    }
425
426    /// Checks if a predicate might match any values (using zone maps).
427    ///
428    /// Returns `false` only when we're *certain* no values match - for example,
429    /// if you're looking for age > 100 but the max age is 80. Returns `true`
430    /// if the property doesn't exist (conservative - might match).
431    #[must_use]
432    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
433        let columns = self.columns.read();
434        columns
435            .get(key)
436            .map_or(true, |col| col.might_match(op, value)) // No column = assume might match (conservative)
437    }
438
439    /// Gets the zone map for a property column.
440    #[must_use]
441    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
442        let columns = self.columns.read();
443        columns.get(key).map(|col| col.zone_map().clone())
444    }
445
446    /// Checks if a range predicate might match any values (using zone maps).
447    ///
448    /// Returns `false` only when we're *certain* no values match the range.
449    /// Returns `true` if the property doesn't exist (conservative - might match).
450    #[must_use]
451    pub fn might_match_range(
452        &self,
453        key: &PropertyKey,
454        min: Option<&Value>,
455        max: Option<&Value>,
456        min_inclusive: bool,
457        max_inclusive: bool,
458    ) -> bool {
459        let columns = self.columns.read();
460        columns.get(key).map_or(true, |col| {
461            col.zone_map()
462                .might_contain_range(min, max, min_inclusive, max_inclusive)
463        }) // No column = assume might match (conservative)
464    }
465
466    /// Rebuilds zone maps for all columns (call after bulk removes).
467    pub fn rebuild_zone_maps(&self) {
468        let mut columns = self.columns.write();
469        for col in columns.values_mut() {
470            col.rebuild_zone_map();
471        }
472    }
473}
474
/// The default storage is an empty one, identical to [`PropertyStorage::new`].
impl<Id: EntityId> Default for PropertyStorage<Id> {
    /// Delegates to [`PropertyStorage::new`].
    fn default() -> Self {
        Self::new()
    }
}
480
/// Compressed storage for a property column.
///
/// Holds the compressed representation of values along with the index
/// mapping entity IDs to positions in the compressed array.
///
/// NOTE(review): the integer and string compression routines in this file
/// fill `id_to_index` with the raw entity IDs sorted ascending (entry `i` is
/// the ID of the value at compressed position `i`) and set `index_to_id` to a
/// clone of the same vector.
#[derive(Debug)]
pub enum CompressedColumnData {
    /// Compressed integers (Int64 values).
    Integers {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> compressed array index.
        id_to_index: Vec<u64>,
        /// Reverse index: compressed array index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Dictionary-encoded strings.
    Strings {
        /// Dictionary encoding.
        encoding: DictionaryEncoding,
        /// Index: entity ID position -> dictionary index.
        id_to_index: Vec<u64>,
        /// Reverse index: dictionary index -> entity ID position.
        index_to_id: Vec<u64>,
    },
    /// Compressed booleans.
    Booleans {
        /// Compressed data.
        data: CompressedData,
        /// Index: entity ID position -> bit index.
        id_to_index: Vec<u64>,
        /// Reverse index: bit index -> entity ID position.
        index_to_id: Vec<u64>,
    },
}
515
516impl CompressedColumnData {
517    /// Returns the memory usage of the compressed data in bytes.
518    #[must_use]
519    pub fn memory_usage(&self) -> usize {
520        match self {
521            CompressedColumnData::Integers {
522                data,
523                id_to_index,
524                index_to_id,
525            } => {
526                data.data.len()
527                    + id_to_index.len() * std::mem::size_of::<u64>()
528                    + index_to_id.len() * std::mem::size_of::<u64>()
529            }
530            CompressedColumnData::Strings {
531                encoding,
532                id_to_index,
533                index_to_id,
534            } => {
535                encoding.codes().len() * std::mem::size_of::<u32>()
536                    + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
537                    + id_to_index.len() * std::mem::size_of::<u64>()
538                    + index_to_id.len() * std::mem::size_of::<u64>()
539            }
540            CompressedColumnData::Booleans {
541                data,
542                id_to_index,
543                index_to_id,
544            } => {
545                data.data.len()
546                    + id_to_index.len() * std::mem::size_of::<u64>()
547                    + index_to_id.len() * std::mem::size_of::<u64>()
548            }
549        }
550    }
551}
552
/// Statistics about column compression.
///
/// Produced per column by `PropertyColumn::compression_stats`; the derived
/// `Default` is all zeros with no codec.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Size of uncompressed data in bytes.
    pub uncompressed_size: usize,
    /// Size of compressed data in bytes.
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec used for compression.
    ///
    /// `None` when the column has no compressed data.
    pub codec: Option<CompressionCodec>,
}
565
566impl CompressionStats {
567    /// Returns the compression ratio (uncompressed / compressed).
568    #[must_use]
569    pub fn compression_ratio(&self) -> f64 {
570        if self.compressed_size == 0 {
571            return 1.0;
572        }
573        self.uncompressed_size as f64 / self.compressed_size as f64
574    }
575}
576
/// A single property column (e.g., all "age" values).
///
/// Maintains min/max/null_count for fast predicate evaluation. When you
/// filter on `age > 50`, we first check if any age could possibly match
/// before scanning the actual values.
///
/// Columns support optional compression for large datasets. When compression
/// is enabled, the column automatically selects the best codec based on the
/// data type and characteristics.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Sparse storage: entity ID -> value (hot buffer + uncompressed).
    /// Used for recent writes and when compression is disabled.
    values: FxHashMap<Id, Value>,
    /// Zone map tracking min/max/null_count for predicate pushdown.
    zone_map: ZoneMapEntry,
    /// Whether zone map needs rebuild (after removes).
    zone_map_dirty: bool,
    /// Compression mode for this column.
    compression_mode: CompressionMode,
    /// Compressed data (when compression is enabled and triggered).
    compressed: Option<CompressedColumnData>,
    /// Number of values currently held in `compressed`.
    ///
    /// Set by the compression routines to the count of values they moved out
    /// of the hot buffer; `len()` adds it to the hot buffer size.
    compressed_count: usize,
}
601
602impl<Id: EntityId> PropertyColumn<Id> {
603    /// Creates a new empty column.
604    #[must_use]
605    pub fn new() -> Self {
606        Self {
607            values: FxHashMap::default(),
608            zone_map: ZoneMapEntry::new(),
609            zone_map_dirty: false,
610            compression_mode: CompressionMode::None,
611            compressed: None,
612            compressed_count: 0,
613        }
614    }
615
616    /// Creates a new column with the specified compression mode.
617    #[must_use]
618    pub fn with_compression(mode: CompressionMode) -> Self {
619        Self {
620            values: FxHashMap::default(),
621            zone_map: ZoneMapEntry::new(),
622            zone_map_dirty: false,
623            compression_mode: mode,
624            compressed: None,
625            compressed_count: 0,
626        }
627    }
628
629    /// Sets the compression mode for this column.
630    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
631        self.compression_mode = mode;
632        if mode == CompressionMode::None {
633            // Decompress if switching to no compression
634            if self.compressed.is_some() {
635                self.decompress_all();
636            }
637        }
638    }
639
    /// Returns the compression mode for this column.
    ///
    /// This is the configured mode, not an indication of whether compressed
    /// data currently exists.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
645
646    /// Sets a value for an entity.
647    pub fn set(&mut self, id: Id, value: Value) {
648        // Update zone map incrementally
649        self.update_zone_map_on_insert(&value);
650        self.values.insert(id, value);
651
652        // Check if we should compress (in Auto mode)
653        if self.compression_mode == CompressionMode::Auto {
654            let total_count = self.values.len() + self.compressed_count;
655            let hot_buffer_count = self.values.len();
656
657            // Compress when hot buffer exceeds threshold and total is large enough
658            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
659                self.compress();
660            }
661        }
662    }
663
664    /// Updates zone map when inserting a value.
665    fn update_zone_map_on_insert(&mut self, value: &Value) {
666        self.zone_map.row_count += 1;
667
668        if matches!(value, Value::Null) {
669            self.zone_map.null_count += 1;
670            return;
671        }
672
673        // Update min
674        match &self.zone_map.min {
675            None => self.zone_map.min = Some(value.clone()),
676            Some(current) => {
677                if compare_values(value, current) == Some(Ordering::Less) {
678                    self.zone_map.min = Some(value.clone());
679                }
680            }
681        }
682
683        // Update max
684        match &self.zone_map.max {
685            None => self.zone_map.max = Some(value.clone()),
686            Some(current) => {
687                if compare_values(value, current) == Some(Ordering::Greater) {
688                    self.zone_map.max = Some(value.clone());
689                }
690            }
691        }
692    }
693
694    /// Gets a value for an entity.
695    ///
696    /// First checks the hot buffer (uncompressed values), then falls back
697    /// to the compressed data if present.
698    #[must_use]
699    pub fn get(&self, id: Id) -> Option<Value> {
700        // First check hot buffer
701        if let Some(value) = self.values.get(&id) {
702            return Some(value.clone());
703        }
704
705        // For now, compressed data lookup is not implemented for sparse access
706        // because the compressed format stores values by index, not by entity ID.
707        // This would require maintaining an ID -> index map in CompressedColumnData.
708        // The compressed data is primarily useful for bulk/scan operations.
709        None
710    }
711
712    /// Removes a value for an entity.
713    pub fn remove(&mut self, id: Id) -> Option<Value> {
714        let removed = self.values.remove(&id);
715        if removed.is_some() {
716            // Mark zone map as dirty - would need full rebuild for accurate min/max
717            self.zone_map_dirty = true;
718        }
719        removed
720    }
721
722    /// Returns the number of values in this column (hot + compressed).
723    #[must_use]
724    pub fn len(&self) -> usize {
725        self.values.len() + self.compressed_count
726    }
727
728    /// Returns true if this column is empty.
729    #[cfg(test)]
730    #[must_use]
731    pub fn is_empty(&self) -> bool {
732        self.values.is_empty() && self.compressed_count == 0
733    }
734
735    /// Returns compression statistics for this column.
736    #[must_use]
737    pub fn compression_stats(&self) -> CompressionStats {
738        let hot_size = self.values.len() * std::mem::size_of::<Value>();
739        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
740        let codec = match &self.compressed {
741            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
742            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
743            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
744            None => None,
745        };
746
747        CompressionStats {
748            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
749            compressed_size: hot_size + compressed_size,
750            value_count: self.len(),
751            codec,
752        }
753    }
754
755    /// Returns estimated heap memory for this column.
756    ///
757    /// Includes the hot buffer hash map capacity, zone map, and any
758    /// compressed data.
759    #[must_use]
760    pub fn heap_memory_bytes(&self) -> usize {
761        // Hot buffer: FxHashMap<Id, Value> capacity
762        let hot_bytes =
763            self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
764        // Compressed data
765        let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
766        // ZoneMapEntry is inline (no heap), so just hot + compressed
767        hot_bytes + compressed_bytes
768    }
769
770    /// Returns whether the column has compressed data.
771    #[must_use]
772    #[cfg(test)]
773    pub fn is_compressed(&self) -> bool {
774        self.compressed.is_some()
775    }
776
777    /// Compresses the hot buffer values.
778    ///
779    /// This merges the hot buffer into the compressed data, selecting the
780    /// best codec based on the value types.
781    ///
782    /// Note: If compressed data already exists, this is a no-op to avoid
783    /// losing previously compressed values. Use `force_compress()` after
784    /// decompressing to re-compress with all values.
785    pub fn compress(&mut self) {
786        if self.values.is_empty() {
787            return;
788        }
789
790        // Don't re-compress if we already have compressed data
791        // (would need to decompress and merge first)
792        if self.compressed.is_some() {
793            return;
794        }
795
796        // Determine the dominant type
797        let (int_count, str_count, bool_count) = self.count_types();
798        let total = self.values.len();
799
800        if int_count > total / 2 {
801            self.compress_as_integers();
802        } else if str_count > total / 2 {
803            self.compress_as_strings();
804        } else if bool_count > total / 2 {
805            self.compress_as_booleans();
806        }
807        // If no dominant type, don't compress (mixed types don't compress well)
808    }
809
810    /// Counts values by type.
811    fn count_types(&self) -> (usize, usize, usize) {
812        let mut int_count = 0;
813        let mut str_count = 0;
814        let mut bool_count = 0;
815
816        for value in self.values.values() {
817            match value {
818                Value::Int64(_) => int_count += 1,
819                Value::String(_) => str_count += 1,
820                Value::Bool(_) => bool_count += 1,
821                _ => {}
822            }
823        }
824
825        (int_count, str_count, bool_count)
826    }
827
828    /// Compresses integer values.
829    fn compress_as_integers(&mut self) {
830        // Extract integer values and their IDs
831        let mut values: Vec<(u64, i64)> = Vec::new();
832        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
833
834        for (&id, value) in &self.values {
835            match value {
836                Value::Int64(v) => {
837                    let id_u64 = id.as_u64();
838                    values.push((id_u64, *v));
839                }
840                _ => {
841                    non_int_values.insert(id, value.clone());
842                }
843            }
844        }
845
846        if values.len() < 8 {
847            // Not worth compressing
848            return;
849        }
850
851        // Sort by ID for better compression
852        values.sort_by_key(|(id, _)| *id);
853
854        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
855        let index_to_id: Vec<u64> = id_to_index.clone();
856        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
857
858        // Compress using the optimal codec
859        let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
860
861        // Only use compression if it actually saves space
862        if compressed.compression_ratio() > 1.2 {
863            self.compressed = Some(CompressedColumnData::Integers {
864                data: compressed,
865                id_to_index,
866                index_to_id,
867            });
868            self.compressed_count = values.len();
869            self.values = non_int_values;
870        }
871    }
872
873    /// Compresses string values using dictionary encoding.
874    fn compress_as_strings(&mut self) {
875        let mut values: Vec<(u64, ArcStr)> = Vec::new();
876        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
877
878        for (&id, value) in &self.values {
879            match value {
880                Value::String(s) => {
881                    values.push((id.as_u64(), s.clone()));
882                }
883                _ => {
884                    non_str_values.insert(id, value.clone());
885                }
886            }
887        }
888
889        if values.len() < 8 {
890            return;
891        }
892
893        // Sort by ID
894        values.sort_by_key(|(id, _)| *id);
895
896        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
897        let index_to_id: Vec<u64> = id_to_index.clone();
898
899        // Build dictionary
900        let mut builder = DictionaryBuilder::new();
901        for (_, s) in &values {
902            builder.add(s.as_ref());
903        }
904        let encoding = builder.build();
905
906        // Only use compression if it actually saves space
907        if encoding.compression_ratio() > 1.2 {
908            self.compressed = Some(CompressedColumnData::Strings {
909                encoding,
910                id_to_index,
911                index_to_id,
912            });
913            self.compressed_count = values.len();
914            self.values = non_str_values;
915        }
916    }
917
918    /// Compresses boolean values.
919    fn compress_as_booleans(&mut self) {
920        let mut values: Vec<(u64, bool)> = Vec::new();
921        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
922
923        for (&id, value) in &self.values {
924            match value {
925                Value::Bool(b) => {
926                    values.push((id.as_u64(), *b));
927                }
928                _ => {
929                    non_bool_values.insert(id, value.clone());
930                }
931            }
932        }
933
934        if values.len() < 8 {
935            return;
936        }
937
938        // Sort by ID
939        values.sort_by_key(|(id, _)| *id);
940
941        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
942        let index_to_id: Vec<u64> = id_to_index.clone();
943        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
944
945        let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
946
947        // Booleans always compress well (8x)
948        self.compressed = Some(CompressedColumnData::Booleans {
949            data: compressed,
950            id_to_index,
951            index_to_id,
952        });
953        self.compressed_count = values.len();
954        self.values = non_bool_values;
955    }
956
957    /// Decompresses all values back to the hot buffer.
958    fn decompress_all(&mut self) {
959        let Some(compressed) = self.compressed.take() else {
960            return;
961        };
962
963        match compressed {
964            CompressedColumnData::Integers {
965                data, index_to_id, ..
966            } => {
967                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
968                    // Convert back to signed using zigzag decoding
969                    let signed: Vec<i64> = values
970                        .iter()
971                        .map(|&v| crate::storage::zigzag_decode(v))
972                        .collect();
973
974                    for (i, id_u64) in index_to_id.iter().enumerate() {
975                        if let Some(&value) = signed.get(i) {
976                            let id = Id::from_u64(*id_u64);
977                            self.values.insert(id, Value::Int64(value));
978                        }
979                    }
980                }
981            }
982            CompressedColumnData::Strings {
983                encoding,
984                index_to_id,
985                ..
986            } => {
987                for (i, id_u64) in index_to_id.iter().enumerate() {
988                    if let Some(s) = encoding.get(i) {
989                        let id = Id::from_u64(*id_u64);
990                        self.values.insert(id, Value::String(ArcStr::from(s)));
991                    }
992                }
993            }
994            CompressedColumnData::Booleans {
995                data, index_to_id, ..
996            } => {
997                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
998                    for (i, id_u64) in index_to_id.iter().enumerate() {
999                        if let Some(&value) = values.get(i) {
1000                            let id = Id::from_u64(*id_u64);
1001                            self.values.insert(id, Value::Bool(value));
1002                        }
1003                    }
1004                }
1005            }
1006        }
1007
1008        self.compressed_count = 0;
1009    }
1010
1011    /// Forces compression regardless of thresholds.
1012    ///
1013    /// Useful for bulk loading or when you know the column is complete.
1014    pub fn force_compress(&mut self) {
1015        self.compress();
1016    }
1017
1018    /// Returns the zone map for this column.
1019    #[must_use]
1020    pub fn zone_map(&self) -> &ZoneMapEntry {
1021        &self.zone_map
1022    }
1023
1024    /// Uses zone map to check if any values could satisfy the predicate.
1025    ///
1026    /// Returns `false` when we can prove no values match (so the column
1027    /// can be skipped entirely). Returns `true` if values might match.
1028    #[must_use]
1029    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1030        if self.zone_map_dirty {
1031            // Conservative: can't skip if zone map is stale
1032            return true;
1033        }
1034
1035        match op {
1036            CompareOp::Eq => self.zone_map.might_contain_equal(value),
1037            CompareOp::Ne => {
1038                // Can only skip if all values are equal to the value
1039                // (which means min == max == value)
1040                match (&self.zone_map.min, &self.zone_map.max) {
1041                    (Some(min), Some(max)) => {
1042                        !(compare_values(min, value) == Some(Ordering::Equal)
1043                            && compare_values(max, value) == Some(Ordering::Equal))
1044                    }
1045                    _ => true,
1046                }
1047            }
1048            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1049            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1050            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1051            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1052        }
1053    }
1054
1055    /// Rebuilds zone map from current values.
1056    pub fn rebuild_zone_map(&mut self) {
1057        let mut zone_map = ZoneMapEntry::new();
1058
1059        for value in self.values.values() {
1060            zone_map.row_count += 1;
1061
1062            if matches!(value, Value::Null) {
1063                zone_map.null_count += 1;
1064                continue;
1065            }
1066
1067            // Update min
1068            match &zone_map.min {
1069                None => zone_map.min = Some(value.clone()),
1070                Some(current) => {
1071                    if compare_values(value, current) == Some(Ordering::Less) {
1072                        zone_map.min = Some(value.clone());
1073                    }
1074                }
1075            }
1076
1077            // Update max
1078            match &zone_map.max {
1079                None => zone_map.max = Some(value.clone()),
1080                Some(current) => {
1081                    if compare_values(value, current) == Some(Ordering::Greater) {
1082                        zone_map.max = Some(value.clone());
1083                    }
1084                }
1085            }
1086        }
1087
1088        self.zone_map = zone_map;
1089        self.zone_map_dirty = false;
1090    }
1091}
1092
1093/// Compares two values for ordering.
1094fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1095    match (a, b) {
1096        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1097        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1098        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1099        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1100        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1101        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1102        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1103        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1104        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1105        _ => None,
1106    }
1107}
1108
1109impl<Id: EntityId> Default for PropertyColumn<Id> {
1110    fn default() -> Self {
1111        Self::new()
1112    }
1113}
1114
1115/// A borrowed reference to a property column for bulk reads.
1116///
1117/// Holds the read lock so the column can't change while you're iterating.
1118pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1119    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1120    _key: PropertyKey,
1121    _marker: PhantomData<Id>,
1122}
1123
#[cfg(test)]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    /// Values written for two nodes can be read back; an unset property is `None`.
    #[test]
    fn test_property_storage_basic() {
        let store = PropertyStorage::new();

        let (first, second) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        store.set(first, name.clone(), "Alix".into());
        store.set(first, age.clone(), 30i64.into());
        store.set(second, name.clone(), "Gus".into());

        let expected_first_name = Some(Value::String("Alix".into()));
        let expected_second_name = Some(Value::String("Gus".into()));

        assert_eq!(store.get(first, &name), expected_first_name);
        assert_eq!(store.get(first, &age), Some(Value::Int64(30)));
        assert_eq!(store.get(second, &name), expected_second_name);
        assert!(store.get(second, &age).is_none());
    }

    /// Removing a property returns the old value and makes later reads miss.
    #[test]
    fn test_property_storage_remove() {
        let store = PropertyStorage::new();
        let entity = NodeId::new(1);
        let name = PropertyKey::new("name");

        store.set(entity, name.clone(), "Alix".into());
        assert!(store.get(entity, &name).is_some());

        assert!(store.remove(entity, &name).is_some());
        assert!(store.get(entity, &name).is_none());
    }

    /// `get_all` reports every property set on a node.
    #[test]
    fn test_property_storage_get_all() {
        let store = PropertyStorage::new();
        let entity = NodeId::new(1);

        let entries = [
            ("name", Value::from("Alix")),
            ("age", Value::from(30i64)),
            ("active", Value::from(true)),
        ];
        for (key, value) in entries {
            store.set(entity, PropertyKey::new(key), value);
        }

        assert_eq!(store.get_all(entity).len(), 3);
    }

    /// `remove_all` clears every property of a node.
    #[test]
    fn test_property_storage_remove_all() {
        let store = PropertyStorage::new();
        let entity = NodeId::new(1);

        store.set(entity, PropertyKey::new("name"), "Alix".into());
        store.set(entity, PropertyKey::new("age"), 30i64.into());

        store.remove_all(entity);

        for key in ["name", "age"] {
            assert!(store.get(entity, &PropertyKey::new(key)).is_none());
        }
    }

    /// Basic set / get / remove / len behaviour of a single column.
    #[test]
    fn test_property_column() {
        let mut column = PropertyColumn::new();
        let (first, second) = (NodeId::new(1), NodeId::new(2));

        column.set(first, "Alix".into());
        column.set(second, "Gus".into());

        assert_eq!(column.len(), 2);
        assert!(!column.is_empty());
        assert_eq!(column.get(first), Some(Value::String("Alix".into())));

        column.remove(first);

        assert!(column.get(first).is_none());
        assert_eq!(column.len(), 1);
    }

    /// A column reports the compression mode it was created with.
    #[test]
    fn test_compression_mode() {
        let plain: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(plain.compression_mode(), CompressionMode::None);

        let auto: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(auto.compression_mode(), CompressionMode::Auto);
    }

    /// Storage created with `Auto` compression still serves reads.
    #[test]
    fn test_property_storage_with_compression() {
        let store = PropertyStorage::with_compression(CompressionMode::Auto);
        let age = PropertyKey::new("age");

        for i in 0..100_u64 {
            let years = 20 + (i as i64 % 50);
            store.set(NodeId::new(i), PropertyKey::new("age"), Value::Int64(years));
        }

        // Values should still be readable
        assert_eq!(store.get(NodeId::new(0), &age), Some(Value::Int64(20)));
        assert_eq!(store.get(NodeId::new(50), &age), Some(Value::Int64(20)));
    }

    /// Sequential integers: the value count covers compressed and hot values.
    #[test]
    fn test_compress_integer_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add many sequential integers
        for i in 0..2000_u64 {
            column.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        // Should have triggered compression at some point
        // Total count should include both compressed and hot buffer values
        assert_eq!(column.compression_stats().value_count, 2000);

        // Values from the hot buffer should be readable
        // Note: Compressed values are not accessible via get() - see design note
        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    /// Low-cardinality strings: the length stays correct across compression.
    #[test]
    fn test_compress_string_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add repeated strings (good for dictionary compression)
        let labels = ["Person", "Company", "Product", "Location"];
        for (i, label) in labels.iter().cycle().take(2000).enumerate() {
            column.set(NodeId::new(i as u64), Value::String(ArcStr::from(*label)));
        }

        // Total count should be correct
        assert_eq!(column.len(), 2000);

        // Late values should be in hot buffer and readable
        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    /// Alternating booleans: the length stays correct across compression.
    #[test]
    fn test_compress_boolean_column() {
        let mut column: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Add booleans
        for i in 0..2000_u64 {
            column.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }

        // Verify total count
        assert_eq!(column.len(), 2000);

        // Late values should be readable
        let newest = column.get(NodeId::new(1999));
        assert!(newest.is_some() || column.is_compressed());
    }

    /// `force_compress` keeps the reported value count intact.
    #[test]
    fn test_force_compress() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        // Add fewer values than the threshold
        for i in 0..100_u64 {
            column.set(NodeId::new(i), Value::Int64(i as i64));
        }

        // Force compression
        column.force_compress();

        // Stats should show compression was applied if beneficial
        assert_eq!(column.compression_stats().value_count, 100);
    }

    /// Column stats report the value count and a non-zero uncompressed size.
    #[test]
    fn test_compression_stats() {
        let mut column: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50_u64 {
            column.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let report = column.compression_stats();
        assert_eq!(report.value_count, 50);
        assert!(report.uncompressed_size > 0);
    }

    /// Storage-level stats contain one entry per property key.
    #[test]
    fn test_storage_compression_stats() {
        let store = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100_u64 {
            let entity = NodeId::new(i);
            store.set(entity, PropertyKey::new("age"), Value::Int64(i as i64));
            store.set(
                entity,
                PropertyKey::new("name"),
                Value::String(ArcStr::from("Alix")),
            );
        }

        let report = store.compression_stats();
        assert_eq!(report.len(), 2); // Two columns
        for key in ["age", "name"] {
            assert!(report.contains_key(&PropertyKey::new(key)));
        }
    }

    /// A populated storage reports non-zero memory usage.
    #[test]
    fn test_memory_usage() {
        let store = PropertyStorage::new();

        for i in 0..100_u64 {
            store.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        assert!(store.memory_usage() > 0);
    }

    /// `get_batch` returns one slot per requested ID, `None` where unset.
    #[test]
    fn test_get_batch_single_property() {
        let store: PropertyStorage<NodeId> = PropertyStorage::new();
        let age = PropertyKey::new("age");

        let with_25 = NodeId::new(1);
        let with_30 = NodeId::new(2);
        let without_age = NodeId::new(3);

        store.set(with_25, age.clone(), 25i64.into());
        store.set(with_30, age.clone(), 30i64.into());
        // The third node has no age property

        let requested = [with_25, with_30, without_age];
        let fetched = store.get_batch(&requested, &age);

        assert_eq!(fetched.len(), 3);
        assert_eq!(fetched[0], Some(Value::Int64(25)));
        assert_eq!(fetched[1], Some(Value::Int64(30)));
        assert_eq!(fetched[2], None);
    }

    /// `get_batch` on an unknown key yields `None` for every ID.
    #[test]
    fn test_get_batch_missing_column() {
        let store: PropertyStorage<NodeId> = PropertyStorage::new();
        let unknown = PropertyKey::new("nonexistent");

        let requested = [NodeId::new(1), NodeId::new(2)];
        let fetched = store.get_batch(&requested, &unknown);

        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0], None);
        assert_eq!(fetched[1], None);
    }

    /// `get_batch` with no IDs returns an empty result.
    #[test]
    fn test_get_batch_empty_ids() {
        let store: PropertyStorage<NodeId> = PropertyStorage::new();
        let any_key = PropertyKey::new("any");

        assert!(store.get_batch(&[], &any_key).is_empty());
    }

    /// `get_all_batch` returns each node's full property set, in request order.
    #[test]
    fn test_get_all_batch() {
        let store: PropertyStorage<NodeId> = PropertyStorage::new();
        let name = PropertyKey::new("name");

        let alix = NodeId::new(1);
        let gus = NodeId::new(2);
        let bare = NodeId::new(3);

        store.set(alix, PropertyKey::new("name"), "Alix".into());
        store.set(alix, PropertyKey::new("age"), 25i64.into());
        store.set(gus, PropertyKey::new("name"), "Gus".into());
        // The third node has no properties

        let requested = [alix, gus, bare];
        let fetched = store.get_all_batch(&requested);

        assert_eq!(fetched.len(), 3);
        assert_eq!(fetched[0].len(), 2); // name and age
        assert_eq!(fetched[1].len(), 1); // name only
        assert_eq!(fetched[2].len(), 0); // no properties

        assert_eq!(fetched[0].get(&name), Some(&Value::String("Alix".into())));
        assert_eq!(fetched[1].get(&name), Some(&Value::String("Gus".into())));
    }

    /// `get_all_batch` with no IDs returns an empty result.
    #[test]
    fn test_get_all_batch_empty_ids() {
        let store: PropertyStorage<NodeId> = PropertyStorage::new();

        assert!(store.get_all_batch(&[]).is_empty());
    }
}