1use crate::codec::CompressionCodec;
24#[cfg(not(feature = "temporal"))]
25use crate::codec::block::DEFAULT_BLOCK_ROWS;
26#[cfg(not(feature = "temporal"))]
27use crate::codec::{CompressedData, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor};
28use crate::index::zone_map::ZoneMapEntry;
29#[cfg(not(feature = "temporal"))]
30use arcstr::ArcStr;
31#[cfg(feature = "temporal")]
32use grafeo_common::temporal::VersionLog;
33#[cfg(feature = "temporal")]
34use grafeo_common::types::EpochId;
35use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
36use grafeo_common::utils::hash::FxHashMap;
37use parking_lot::RwLock;
38use std::cmp::Ordering;
39use std::hash::Hash;
40use std::marker::PhantomData;
41
/// Compression policy for a property column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum CompressionMode {
    /// Never compress (the default). In non-temporal builds, switching a
    /// column to this mode decompresses any data it already compressed.
    #[default]
    None,
    /// In non-temporal builds, compression is attempted automatically on
    /// write once the hot buffer holds `HOT_BUFFER_SIZE` values and the
    /// column holds at least `COMPRESSION_THRESHOLD` values. Also compressed
    /// by `PropertyStorage::compress_all`.
    Auto,
    /// Not compressed on write; compressed when `PropertyStorage::compress_all`
    /// (or an explicit `compress`) runs.
    /// NOTE(review): the name suggests compression on every write — confirm
    /// the intended semantics.
    Eager,
}
54
#[cfg(not(feature = "temporal"))]
/// Minimum total number of values (hot + compressed) a column must hold
/// before `CompressionMode::Auto` attempts compression on write.
const COMPRESSION_THRESHOLD: usize = 1000;
58
#[cfg(not(feature = "temporal"))]
/// Number of uncompressed ("hot") values that must accumulate in a column
/// before `CompressionMode::Auto` attempts compression on write.
const HOT_BUFFER_SIZE: usize = 4096;
65
/// Comparison operator used when asking a column's zone map whether any
/// value might satisfy a predicate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompareOp {
    /// Equal to.
    Eq,
    /// Not equal to.
    Ne,
    /// Strictly less than.
    Lt,
    /// Less than or equal to.
    Le,
    /// Strictly greater than.
    Gt,
    /// Greater than or equal to.
    Ge,
}
85
/// Identifier type that can key a property column.
///
/// Compressed columns store ids as raw `u64` values (via [`EntityId::as_u64`])
/// and rebuild them with [`EntityId::from_u64`], so the two must round-trip.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Returns the raw `u64` representation of this id.
    fn as_u64(self) -> u64;
    /// Rebuilds an id from a raw value produced by [`EntityId::as_u64`].
    fn from_u64(v: u64) -> Self;
}
96
97impl EntityId for NodeId {
98 #[inline]
99 fn as_u64(self) -> u64 {
100 self.0
101 }
102 #[inline]
103 fn from_u64(v: u64) -> Self {
104 Self(v)
105 }
106}
107
108impl EntityId for EdgeId {
109 #[inline]
110 fn as_u64(self) -> u64 {
111 self.0
112 }
113 #[inline]
114 fn from_u64(v: u64) -> Self {
115 Self(v)
116 }
117}
118
/// Columnar property store: one [`PropertyColumn`] per property key, each
/// mapping entity ids of type `Id` to values.
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Columns keyed by property, all guarded by a single reader/writer lock.
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Compression mode used when `set` / `restore_column` create a new column.
    default_compression: CompressionMode,
    /// Ties the `Id` type parameter to the struct.
    _marker: PhantomData<Id>,
}
154
155impl<Id: EntityId> PropertyStorage<Id> {
156 #[must_use]
158 pub fn new() -> Self {
159 Self {
160 columns: RwLock::new(FxHashMap::default()),
161 default_compression: CompressionMode::None,
162 _marker: PhantomData,
163 }
164 }
165
166 #[must_use]
168 pub fn with_compression(mode: CompressionMode) -> Self {
169 Self {
170 columns: RwLock::new(FxHashMap::default()),
171 default_compression: mode,
172 _marker: PhantomData,
173 }
174 }
175
176 pub fn set_default_compression(&mut self, mode: CompressionMode) {
178 self.default_compression = mode;
179 }
180
181 #[cfg(not(feature = "temporal"))]
183 pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
184 let mut columns = self.columns.write();
185 let mode = self.default_compression;
186 columns
187 .entry(key)
188 .or_insert_with(|| PropertyColumn::with_compression(mode))
189 .set(id, value);
190 }
191
192 #[cfg(feature = "temporal")]
197 pub fn set(&self, id: Id, key: PropertyKey, value: Value, epoch: EpochId) {
198 let mut columns = self.columns.write();
199 let mode = self.default_compression;
200 columns
201 .entry(key)
202 .or_insert_with(|| PropertyColumn::with_compression(mode))
203 .set(id, value, epoch);
204 }
205
206 pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
208 let mut columns = self.columns.write();
209 if let Some(col) = columns.get_mut(key) {
210 col.set_compression_mode(mode);
211 }
212 }
213
214 pub fn compress_all(&self) {
216 let mut columns = self.columns.write();
217 for col in columns.values_mut() {
218 if col.compression_mode() != CompressionMode::None {
219 col.compress();
220 }
221 }
222 }
223
224 pub fn force_compress_all(&self) {
226 let mut columns = self.columns.write();
227 for col in columns.values_mut() {
228 col.force_compress();
229 }
230 }
231
232 #[must_use]
234 pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
235 let columns = self.columns.read();
236 columns
237 .iter()
238 .map(|(key, col)| (key.clone(), col.compression_stats()))
239 .collect()
240 }
241
242 #[must_use]
244 pub fn memory_usage(&self) -> usize {
245 let columns = self.columns.read();
246 columns
247 .values()
248 .map(|col| col.compression_stats().compressed_size)
249 .sum()
250 }
251
252 #[must_use]
254 pub fn heap_memory_bytes(&self) -> usize {
255 let columns = self.columns.read();
256 let map_overhead = columns.capacity()
258 * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
259 let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
261 map_overhead + column_bytes
262 }
263
264 #[must_use]
266 pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
267 let columns = self.columns.read();
268 columns.get(key).and_then(|col| col.get(id))
269 }
270
271 #[cfg(not(feature = "temporal"))]
273 pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
274 let mut columns = self.columns.write();
275 columns.get_mut(key).and_then(|col| col.remove(id))
276 }
277
278 #[cfg(feature = "temporal")]
280 pub fn remove(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
281 let mut columns = self.columns.write();
282 columns.get_mut(key).and_then(|col| col.remove(id, epoch))
283 }
284
285 #[cfg(not(feature = "temporal"))]
287 pub fn remove_all(&self, id: Id) {
288 let mut columns = self.columns.write();
289 for col in columns.values_mut() {
290 col.remove(id);
291 }
292 }
293
294 #[cfg(feature = "temporal")]
296 pub fn remove_all(&self, id: Id, epoch: EpochId) {
297 let mut columns = self.columns.write();
298 for col in columns.values_mut() {
299 col.remove(id, epoch);
300 }
301 }
302
303 #[must_use]
305 pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
306 let columns = self.columns.read();
307 let mut result = FxHashMap::default();
308 for (key, col) in columns.iter() {
309 if let Some(value) = col.get(id) {
310 result.insert(key.clone(), value);
311 }
312 }
313 result
314 }
315
316 #[must_use]
335 pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
336 let columns = self.columns.read();
337 match columns.get(key) {
338 Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
339 None => vec![None; ids.len()],
340 }
341 }
342
343 #[must_use]
361 pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
362 let columns = self.columns.read();
363 let column_count = columns.len();
364
365 let mut results = Vec::with_capacity(ids.len());
367
368 for &id in ids {
369 let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
371 for (key, col) in columns.iter() {
372 if let Some(value) = col.get(id) {
373 result.insert(key.clone(), value);
374 }
375 }
376 results.push(result);
377 }
378
379 results
380 }
381
382 #[must_use]
405 pub fn get_selective_batch(
406 &self,
407 ids: &[Id],
408 keys: &[PropertyKey],
409 ) -> Vec<FxHashMap<PropertyKey, Value>> {
410 if keys.is_empty() {
411 return vec![FxHashMap::default(); ids.len()];
413 }
414
415 let columns = self.columns.read();
416
417 let requested_columns: Vec<_> = keys
419 .iter()
420 .filter_map(|key| columns.get(key).map(|col| (key, col)))
421 .collect();
422
423 let mut results = Vec::with_capacity(ids.len());
425
426 for &id in ids {
427 let mut result =
428 FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
429 for (key, col) in &requested_columns {
431 if let Some(value) = col.get(id) {
432 result.insert((*key).clone(), value);
433 }
434 }
435 results.push(result);
436 }
437
438 results
439 }
440
441 #[must_use]
443 pub fn column_count(&self) -> usize {
444 self.columns.read().len()
445 }
446
447 #[must_use]
449 pub fn keys(&self) -> Vec<PropertyKey> {
450 self.columns.read().keys().cloned().collect()
451 }
452
453 pub fn clear(&self) {
455 self.columns.write().clear();
456 }
457
458 #[cfg(not(feature = "temporal"))]
465 pub fn evict_column(&self, key: &PropertyKey) -> (usize, usize) {
466 let mut columns = self.columns.write();
467 if let Some(column) = columns.get_mut(key) {
468 column.evict_values()
469 } else {
470 (0, 0)
471 }
472 }
473
474 #[cfg(not(feature = "temporal"))]
479 pub fn restore_column(&self, key: &PropertyKey, values: impl Iterator<Item = (Id, Value)>) {
480 let mut columns = self.columns.write();
481 let column = columns
482 .entry(key.clone())
483 .or_insert_with(|| PropertyColumn::with_compression(self.default_compression));
484 column.restore_values(values);
485 }
486
487 #[cfg(not(feature = "temporal"))]
492 pub fn drain_column(&self, key: &PropertyKey) -> Vec<(Id, Value)> {
493 let mut columns = self.columns.write();
494 if let Some(column) = columns.get_mut(key) {
495 column.drain_values()
496 } else {
497 Vec::new()
498 }
499 }
500
501 #[cfg(not(feature = "temporal"))]
503 #[must_use]
504 pub fn is_column_spilled(&self, key: &PropertyKey) -> bool {
505 self.columns
506 .read()
507 .get(key)
508 .is_some_and(|col| col.is_spilled())
509 }
510
511 #[cfg(not(feature = "temporal"))]
516 pub fn mark_column_spilled(&self, key: &PropertyKey) {
517 let mut columns = self.columns.write();
518 let column = columns.entry(key.clone()).or_default();
519 column.mark_spilled();
520 }
521
522 #[must_use]
524 pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
525 let columns = self.columns.read();
526 if columns.contains_key(key) {
527 Some(PropertyColumnRef {
528 _guard: columns,
529 _key: key.clone(),
530 _marker: PhantomData,
531 })
532 } else {
533 None
534 }
535 }
536
537 #[must_use]
543 pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
544 let columns = self.columns.read();
545 columns
546 .get(key)
547 .map_or(true, |col| col.might_match(op, value)) }
549
550 #[must_use]
552 pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
553 let columns = self.columns.read();
554 columns.get(key).map(|col| col.zone_map().clone())
555 }
556
557 #[must_use]
570 pub fn block_zone_maps_for(&self, key: &PropertyKey) -> Option<Vec<ZoneMapEntry>> {
571 let columns = self.columns.read();
572 columns.get(key).map(|col| col.block_zone_maps().to_vec())
573 }
574
575 #[cfg(not(feature = "temporal"))]
580 #[must_use]
581 pub fn block_count_for(&self, key: &PropertyKey) -> Option<usize> {
582 self.columns.read().get(key).map(|col| col.block_count())
583 }
584
585 #[cfg(not(feature = "temporal"))]
591 #[must_use]
592 pub fn decode_block_for(
593 &self,
594 key: &PropertyKey,
595 block_idx: usize,
596 ) -> Option<DecodedBlock<Id>> {
597 self.columns
598 .read()
599 .get(key)
600 .and_then(|col| col.decode_block(block_idx))
601 }
602
603 #[cfg(not(feature = "temporal"))]
612 #[must_use]
613 pub fn decoded_blocks_for(&self, key: &PropertyKey) -> Vec<DecodedBlock<Id>> {
614 let columns = self.columns.read();
615 match columns.get(key) {
616 Some(col) => col.iter_decoded_blocks().collect(),
617 None => Vec::new(),
618 }
619 }
620
621 #[must_use]
626 pub fn might_match_range(
627 &self,
628 key: &PropertyKey,
629 min: Option<&Value>,
630 max: Option<&Value>,
631 min_inclusive: bool,
632 max_inclusive: bool,
633 ) -> bool {
634 let columns = self.columns.read();
635 columns.get(key).map_or(true, |col| {
636 col.zone_map()
637 .might_contain_range(min, max, min_inclusive, max_inclusive)
638 }) }
640
641 pub fn rebuild_zone_maps(&self) {
643 let mut columns = self.columns.write();
644 for col in columns.values_mut() {
645 col.rebuild_zone_map();
646 }
647 }
648}
649
650impl<Id: EntityId> Default for PropertyStorage<Id> {
651 fn default() -> Self {
652 Self::new()
653 }
654}
655
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyStorage<Id> {
    /// Hands crate-internal callers a write guard over the raw column map.
    pub(crate) fn columns_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, FxHashMap<PropertyKey, PropertyColumn<Id>>> {
        self.columns.write()
    }

    /// Returns property `key` of entity `id` as seen at `epoch`.
    #[must_use]
    pub fn get_at(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
        let guard = self.columns.read();
        let column = guard.get(key)?;
        column.get_at(id, epoch)
    }

    /// Returns every property of entity `id` visible at `epoch`.
    #[must_use]
    pub fn get_all_at(&self, id: Id, epoch: EpochId) -> FxHashMap<PropertyKey, Value> {
        let guard = self.columns.read();
        guard
            .iter()
            .filter_map(|(key, column)| {
                column
                    .get_at(id, epoch)
                    .map(|value| (key.clone(), value))
            })
            .collect()
    }

    /// Forwards `finalize_pending(real_epoch)` to every column.
    pub fn finalize_pending(&self, real_epoch: EpochId) {
        self.columns
            .write()
            .values_mut()
            .for_each(|column| column.finalize_pending(real_epoch));
    }

    /// Forwards `remove_pending()` to every column.
    pub fn remove_pending(&self) {
        self.columns
            .write()
            .values_mut()
            .for_each(|column| column.remove_pending());
    }

    /// Forwards `gc(min_epoch)` to every column.
    pub fn gc(&self, min_epoch: EpochId) {
        self.columns
            .write()
            .values_mut()
            .for_each(|column| column.gc(min_epoch));
    }

    /// Returns, per property, the recorded `(epoch, value)` history of entity
    /// `id`. Properties with an empty history are omitted.
    #[must_use]
    pub fn get_all_history(&self, id: Id) -> Vec<(PropertyKey, Vec<(EpochId, Value)>)> {
        let guard = self.columns.read();
        guard
            .iter()
            .filter_map(|(key, column)| {
                let log = column.values.get(&id)?;
                let entries: Vec<(EpochId, Value)> = log
                    .history()
                    .iter()
                    .map(|(epoch, value)| (*epoch, value.clone()))
                    .collect();
                if entries.is_empty() {
                    None
                } else {
                    Some((key.clone(), entries))
                }
            })
            .collect()
    }

    /// Returns the recorded `(epoch, value)` history of property `key` for
    /// entity `id` (empty if there is none).
    #[must_use]
    pub fn get_history(&self, id: Id, key: &PropertyKey) -> Vec<(EpochId, Value)> {
        self.columns
            .read()
            .get(key)
            .and_then(|column| column.values.get(&id))
            .map_or_else(Vec::new, |log| {
                log.history()
                    .iter()
                    .map(|(epoch, value)| (*epoch, value.clone()))
                    .collect()
            })
    }
}
746
#[cfg(not(feature = "temporal"))]
/// Compressed payload holding a column's majority value type.
///
/// In every variant, `id_to_index` holds the entity ids (as `u64`) sorted
/// ascending; position `i` of that list corresponds to entry `i` of the
/// payload. `index_to_id` is currently built as an identical copy.
#[derive(Debug)]
#[non_exhaustive]
pub enum CompressedColumnData {
    /// `Int64` values, zigzag-encoded and compressed as integers.
    Integers {
        /// Compressed integer payload.
        data: CompressedData,
        /// Sorted entity ids; position = payload index.
        id_to_index: Vec<u64>,
        /// Payload index -> entity id.
        index_to_id: Vec<u64>,
    },
    /// `String` values, dictionary-encoded.
    Strings {
        /// Dictionary and per-entry codes.
        encoding: DictionaryEncoding,
        /// Sorted entity ids; position = payload index.
        id_to_index: Vec<u64>,
        /// Payload index -> entity id.
        index_to_id: Vec<u64>,
    },
    /// `Bool` values compressed by `TypeSpecificCompressor::compress_booleans`.
    Booleans {
        /// Compressed boolean payload.
        data: CompressedData,
        /// Sorted entity ids; position = payload index.
        id_to_index: Vec<u64>,
        /// Payload index -> entity id.
        index_to_id: Vec<u64>,
    },
}
783
784#[cfg(not(feature = "temporal"))]
785impl CompressedColumnData {
786 #[must_use]
788 pub fn memory_usage(&self) -> usize {
789 match self {
790 CompressedColumnData::Integers {
791 data,
792 id_to_index,
793 index_to_id,
794 } => {
795 data.data.len()
796 + id_to_index.len() * std::mem::size_of::<u64>()
797 + index_to_id.len() * std::mem::size_of::<u64>()
798 }
799 CompressedColumnData::Strings {
800 encoding,
801 id_to_index,
802 index_to_id,
803 } => {
804 encoding.code_count() * 4
805 + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
806 + id_to_index.len() * std::mem::size_of::<u64>()
807 + index_to_id.len() * std::mem::size_of::<u64>()
808 }
809 CompressedColumnData::Booleans {
810 data,
811 id_to_index,
812 index_to_id,
813 } => {
814 data.data.len()
815 + id_to_index.len() * std::mem::size_of::<u64>()
816 + index_to_id.len() * std::mem::size_of::<u64>()
817 }
818 }
819 }
820}
821
#[cfg(not(feature = "temporal"))]
/// One block of a compressed column, decoded back into `(id, value)` pairs.
#[derive(Debug, Clone)]
pub struct DecodedBlock<Id: EntityId> {
    /// Zone map summarizing this block's values.
    pub zone_map: ZoneMapEntry,
    /// The block's entries, in ascending id order.
    pub entries: Vec<(Id, Value)>,
}
836
/// Size estimates for one column's storage.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Estimated bytes if every value were held as an uncompressed `Value`.
    pub uncompressed_size: usize,
    /// Estimated bytes actually used (hot values plus compressed payload).
    pub compressed_size: usize,
    /// Number of values in the column.
    pub value_count: usize,
    /// Codec of the compressed payload, or `None` when nothing is compressed.
    pub codec: Option<CompressionCodec>,
}
849
850impl CompressionStats {
851 #[must_use]
853 pub fn compression_ratio(&self) -> f64 {
854 if self.compressed_size == 0 {
855 return 1.0;
856 }
857 self.uncompressed_size as f64 / self.compressed_size as f64
858 }
859}
860
/// Storage for a single property across all entities.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Uncompressed ("hot") values keyed by entity id.
    #[cfg(not(feature = "temporal"))]
    values: FxHashMap<Id, Value>,
    /// Per-entity version log of the value.
    #[cfg(feature = "temporal")]
    values: FxHashMap<Id, VersionLog<Value>>,
    /// Column-wide min/max/null summary, widened on every `set`.
    zone_map: ZoneMapEntry,
    /// Set by removals; cleared by `rebuild_zone_map`.
    zone_map_dirty: bool,
    /// Compression policy for this column.
    compression_mode: CompressionMode,
    /// Compressed payload holding the majority-type values, if any.
    #[cfg(not(feature = "temporal"))]
    compressed: Option<CompressedColumnData>,
    /// Number of entries in `compressed`.
    #[cfg(not(feature = "temporal"))]
    compressed_count: usize,
    /// Set by `mark_spilled`, `evict_values` and `drain_values`; cleared by
    /// `restore_values`.
    #[cfg(not(feature = "temporal"))]
    spilled: bool,
    /// Per-block zone maps for the compressed payload.
    block_zone_maps: Vec<ZoneMapEntry>,
}
906
907#[cfg(not(feature = "temporal"))]
908impl<Id: EntityId> PropertyColumn<Id> {
909 #[must_use]
911 pub fn new() -> Self {
912 Self {
913 values: FxHashMap::default(),
914 zone_map: ZoneMapEntry::new(),
915 zone_map_dirty: false,
916 compression_mode: CompressionMode::None,
917 compressed: None,
918 compressed_count: 0,
919 spilled: false,
920 block_zone_maps: Vec::new(),
921 }
922 }
923
924 #[must_use]
926 pub fn with_compression(mode: CompressionMode) -> Self {
927 Self {
928 values: FxHashMap::default(),
929 zone_map: ZoneMapEntry::new(),
930 zone_map_dirty: false,
931 compression_mode: mode,
932 compressed: None,
933 compressed_count: 0,
934 spilled: false,
935 block_zone_maps: Vec::new(),
936 }
937 }
938
939 pub fn set_compression_mode(&mut self, mode: CompressionMode) {
941 self.compression_mode = mode;
942 if mode == CompressionMode::None {
943 if self.compressed.is_some() {
945 self.decompress_all();
946 }
947 }
948 }
949
950 #[must_use]
952 pub fn compression_mode(&self) -> CompressionMode {
953 self.compression_mode
954 }
955
956 pub fn set(&mut self, id: Id, value: Value) {
958 self.update_zone_map_on_insert(&value);
960 self.values.insert(id, value);
961
962 if self.compression_mode == CompressionMode::Auto {
964 let total_count = self.values.len() + self.compressed_count;
965 let hot_buffer_count = self.values.len();
966
967 if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
969 self.compress();
970 }
971 }
972 }
973
974 fn update_zone_map_on_insert(&mut self, value: &Value) {
976 self.zone_map.row_count += 1;
977
978 if matches!(value, Value::Null) {
979 self.zone_map.null_count += 1;
980 return;
981 }
982
983 match &self.zone_map.min {
985 None => self.zone_map.min = Some(value.clone()),
986 Some(current) => {
987 if compare_values(value, current) == Some(Ordering::Less) {
988 self.zone_map.min = Some(value.clone());
989 }
990 }
991 }
992
993 match &self.zone_map.max {
995 None => self.zone_map.max = Some(value.clone()),
996 Some(current) => {
997 if compare_values(value, current) == Some(Ordering::Greater) {
998 self.zone_map.max = Some(value.clone());
999 }
1000 }
1001 }
1002 }
1003
1004 #[must_use]
1009 pub fn get(&self, id: Id) -> Option<Value> {
1010 if let Some(value) = self.values.get(&id) {
1012 return Some(value.clone());
1013 }
1014
1015 None
1020 }
1021
1022 pub fn remove(&mut self, id: Id) -> Option<Value> {
1024 let removed = self.values.remove(&id);
1025 if removed.is_some() {
1026 self.zone_map_dirty = true;
1028 }
1029 removed
1030 }
1031
1032 pub fn mark_spilled(&mut self) {
1038 self.spilled = true;
1039 }
1040
1041 #[must_use]
1047 pub fn is_spilled(&self) -> bool {
1048 self.spilled
1049 }
1050
1051 pub fn evict_values(&mut self) -> (usize, usize) {
1058 let count = self.values.len();
1059 let freed_bytes = self.heap_memory_bytes();
1060 self.values.clear();
1061 self.values.shrink_to_fit();
1062 self.compressed = None;
1063 self.compressed_count = 0;
1064 self.block_zone_maps.clear();
1065 self.spilled = true;
1066 (count, freed_bytes)
1067 }
1068
1069 pub fn drain_values(&mut self) -> Vec<(Id, Value)> {
1074 let drained: Vec<(Id, Value)> = self.values.drain().collect();
1075 self.values.shrink_to_fit();
1076 self.compressed = None;
1077 self.compressed_count = 0;
1078 self.block_zone_maps.clear();
1079 self.spilled = true;
1080 drained
1081 }
1082
1083 pub fn restore_values(&mut self, values: impl Iterator<Item = (Id, Value)>) {
1088 self.spilled = false;
1089 for (id, value) in values {
1093 self.values.insert(id, value);
1094 }
1095 }
1096
1097 #[must_use]
1099 pub fn len(&self) -> usize {
1100 self.values.len() + self.compressed_count
1101 }
1102
1103 #[cfg(test)]
1105 #[must_use]
1106 pub fn is_empty(&self) -> bool {
1107 self.values.is_empty() && self.compressed_count == 0
1108 }
1109
1110 #[must_use]
1112 pub fn compression_stats(&self) -> CompressionStats {
1113 let hot_size = self.values.len() * std::mem::size_of::<Value>();
1114 let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
1115 let codec = match &self.compressed {
1116 Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
1117 Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
1118 Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
1119 None => None,
1120 };
1121
1122 CompressionStats {
1123 uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
1124 compressed_size: hot_size + compressed_size,
1125 value_count: self.len(),
1126 codec,
1127 }
1128 }
1129
1130 #[must_use]
1135 pub fn heap_memory_bytes(&self) -> usize {
1136 let hot_bytes =
1138 self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
1139 let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
1141 hot_bytes + compressed_bytes
1143 }
1144
1145 #[must_use]
1147 #[cfg(test)]
1148 pub fn is_compressed(&self) -> bool {
1149 self.compressed.is_some()
1150 }
1151
1152 pub fn compress(&mut self) {
1161 if self.values.is_empty() {
1162 return;
1163 }
1164
1165 if self.compressed.is_some() {
1168 return;
1169 }
1170
1171 let (int_count, str_count, bool_count) = self.count_types();
1173 let total = self.values.len();
1174
1175 if int_count > total / 2 {
1176 self.compress_as_integers();
1177 } else if str_count > total / 2 {
1178 self.compress_as_strings();
1179 } else if bool_count > total / 2 {
1180 self.compress_as_booleans();
1181 }
1182 }
1184
1185 fn count_types(&self) -> (usize, usize, usize) {
1187 let mut int_count = 0;
1188 let mut str_count = 0;
1189 let mut bool_count = 0;
1190
1191 for value in self.values.values() {
1192 match value {
1193 Value::Int64(_) => int_count += 1,
1194 Value::String(_) => str_count += 1,
1195 Value::Bool(_) => bool_count += 1,
1196 _ => {}
1197 }
1198 }
1199
1200 (int_count, str_count, bool_count)
1201 }
1202
1203 fn compress_as_integers(&mut self) {
1205 let mut values: Vec<(u64, i64)> = Vec::new();
1207 let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
1208
1209 for (&id, value) in &self.values {
1210 match value {
1211 Value::Int64(v) => {
1212 let id_u64 = id.as_u64();
1213 values.push((id_u64, *v));
1214 }
1215 _ => {
1216 non_int_values.insert(id, value.clone());
1217 }
1218 }
1219 }
1220
1221 if values.len() < 8 {
1222 return;
1224 }
1225
1226 values.sort_by_key(|(id, _)| *id);
1228
1229 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1230 let index_to_id: Vec<u64> = id_to_index.clone();
1231 let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
1232
1233 let Ok(compressed) = TypeSpecificCompressor::compress_signed_integers(&int_values) else {
1235 return;
1236 };
1237
1238 if compressed.compression_ratio() > 1.2 {
1240 self.block_zone_maps =
1241 compute_block_zone_maps(int_values.iter().map(|v| Value::Int64(*v)));
1242 self.compressed = Some(CompressedColumnData::Integers {
1243 data: compressed,
1244 id_to_index,
1245 index_to_id,
1246 });
1247 self.compressed_count = values.len();
1248 self.values = non_int_values;
1249 }
1250 }
1251
1252 fn compress_as_strings(&mut self) {
1254 let mut values: Vec<(u64, ArcStr)> = Vec::new();
1255 let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
1256
1257 for (&id, value) in &self.values {
1258 match value {
1259 Value::String(s) => {
1260 values.push((id.as_u64(), s.clone()));
1261 }
1262 _ => {
1263 non_str_values.insert(id, value.clone());
1264 }
1265 }
1266 }
1267
1268 if values.len() < 8 {
1269 return;
1270 }
1271
1272 values.sort_by_key(|(id, _)| *id);
1274
1275 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1276 let index_to_id: Vec<u64> = id_to_index.clone();
1277
1278 let mut builder = DictionaryBuilder::new();
1280 for (_, s) in &values {
1281 builder.add(s.as_ref());
1282 }
1283 let encoding = builder.build();
1284
1285 if encoding.compression_ratio() > 1.2 {
1287 self.block_zone_maps =
1288 compute_block_zone_maps(values.iter().map(|(_, s)| Value::String(s.clone())));
1289 self.compressed = Some(CompressedColumnData::Strings {
1290 encoding,
1291 id_to_index,
1292 index_to_id,
1293 });
1294 self.compressed_count = values.len();
1295 self.values = non_str_values;
1296 }
1297 }
1298
1299 fn compress_as_booleans(&mut self) {
1301 let mut values: Vec<(u64, bool)> = Vec::new();
1302 let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
1303
1304 for (&id, value) in &self.values {
1305 match value {
1306 Value::Bool(b) => {
1307 values.push((id.as_u64(), *b));
1308 }
1309 _ => {
1310 non_bool_values.insert(id, value.clone());
1311 }
1312 }
1313 }
1314
1315 if values.len() < 8 {
1316 return;
1317 }
1318
1319 values.sort_by_key(|(id, _)| *id);
1321
1322 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
1323 let index_to_id: Vec<u64> = id_to_index.clone();
1324 let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
1325
1326 let Ok(compressed) = TypeSpecificCompressor::compress_booleans(&bool_values) else {
1327 return;
1328 };
1329
1330 self.block_zone_maps = compute_block_zone_maps(bool_values.iter().map(|b| Value::Bool(*b)));
1332 self.compressed = Some(CompressedColumnData::Booleans {
1333 data: compressed,
1334 id_to_index,
1335 index_to_id,
1336 });
1337 self.compressed_count = values.len();
1338 self.values = non_bool_values;
1339 }
1340
1341 fn decompress_all(&mut self) {
1343 let Some(compressed) = self.compressed.take() else {
1344 return;
1345 };
1346
1347 match compressed {
1348 CompressedColumnData::Integers {
1349 data, index_to_id, ..
1350 } => {
1351 if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
1352 let signed: Vec<i64> = values
1354 .iter()
1355 .map(|&v| crate::codec::zigzag_decode(v))
1356 .collect();
1357
1358 for (i, id_u64) in index_to_id.iter().enumerate() {
1359 if let Some(&value) = signed.get(i) {
1360 let id = Id::from_u64(*id_u64);
1361 self.values.insert(id, Value::Int64(value));
1362 }
1363 }
1364 }
1365 }
1366 CompressedColumnData::Strings {
1367 encoding,
1368 index_to_id,
1369 ..
1370 } => {
1371 for (i, id_u64) in index_to_id.iter().enumerate() {
1372 if let Some(s) = encoding.get(i) {
1373 let id = Id::from_u64(*id_u64);
1374 self.values.insert(id, Value::String(ArcStr::from(s)));
1375 }
1376 }
1377 }
1378 CompressedColumnData::Booleans {
1379 data, index_to_id, ..
1380 } => {
1381 if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
1382 for (i, id_u64) in index_to_id.iter().enumerate() {
1383 if let Some(&value) = values.get(i) {
1384 let id = Id::from_u64(*id_u64);
1385 self.values.insert(id, Value::Bool(value));
1386 }
1387 }
1388 }
1389 }
1390 }
1391
1392 self.compressed_count = 0;
1393 self.block_zone_maps.clear();
1394 }
1395
1396 pub fn force_compress(&mut self) {
1400 self.compress();
1401 }
1402
1403 #[must_use]
1405 pub fn zone_map(&self) -> &ZoneMapEntry {
1406 &self.zone_map
1407 }
1408
1409 #[must_use]
1418 pub fn block_zone_maps(&self) -> &[ZoneMapEntry] {
1419 &self.block_zone_maps
1420 }
1421
1422 #[must_use]
1425 pub fn block_count(&self) -> usize {
1426 self.block_zone_maps.len()
1427 }
1428
1429 #[must_use]
1441 pub fn decode_block(&self, block_idx: usize) -> Option<DecodedBlock<Id>> {
1442 let zone_map = self.block_zone_maps.get(block_idx)?.clone();
1443 let compressed = self.compressed.as_ref()?;
1444
1445 let block_size = DEFAULT_BLOCK_ROWS as usize;
1446 let start = block_idx * block_size;
1447 let end = match self.compressed_count.min(start + block_size) {
1448 n if n > start => n,
1450 _ => return None,
1451 };
1452
1453 let entries = match compressed {
1454 CompressedColumnData::Integers {
1455 data, index_to_id, ..
1456 } => {
1457 let raw = TypeSpecificCompressor::decompress_integers(data).ok()?;
1458 let signed: Vec<i64> = raw
1459 .iter()
1460 .map(|&v| crate::codec::zigzag_decode(v))
1461 .collect();
1462 index_to_id
1463 .iter()
1464 .zip(signed.iter())
1465 .skip(start)
1466 .take(end - start)
1467 .map(|(&id_u64, &value)| (Id::from_u64(id_u64), Value::Int64(value)))
1468 .collect()
1469 }
1470 CompressedColumnData::Strings {
1471 encoding,
1472 index_to_id,
1473 ..
1474 } => index_to_id
1475 .iter()
1476 .enumerate()
1477 .skip(start)
1478 .take(end - start)
1479 .filter_map(|(i, &id_u64)| {
1480 encoding
1481 .get(i)
1482 .map(|s| (Id::from_u64(id_u64), Value::String(ArcStr::from(s))))
1483 })
1484 .collect(),
1485 CompressedColumnData::Booleans {
1486 data, index_to_id, ..
1487 } => {
1488 let raw = TypeSpecificCompressor::decompress_booleans(data).ok()?;
1489 index_to_id
1490 .iter()
1491 .zip(raw.iter())
1492 .skip(start)
1493 .take(end - start)
1494 .map(|(&id_u64, &value)| (Id::from_u64(id_u64), Value::Bool(value)))
1495 .collect()
1496 }
1497 };
1498
1499 Some(DecodedBlock { zone_map, entries })
1500 }
1501
1502 pub fn iter_decoded_blocks(&self) -> impl Iterator<Item = DecodedBlock<Id>> + '_ {
1509 (0..self.block_count()).filter_map(|idx| self.decode_block(idx))
1510 }
1511
1512 #[must_use]
1517 pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1518 if self.zone_map_dirty {
1519 return true;
1521 }
1522
1523 match op {
1524 CompareOp::Eq => self.zone_map.might_contain_equal(value),
1525 CompareOp::Ne => {
1526 match (&self.zone_map.min, &self.zone_map.max) {
1529 (Some(min), Some(max)) => {
1530 !(compare_values(min, value) == Some(Ordering::Equal)
1531 && compare_values(max, value) == Some(Ordering::Equal))
1532 }
1533 _ => true,
1534 }
1535 }
1536 CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1537 CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1538 CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1539 CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1540 }
1541 }
1542
1543 pub fn rebuild_zone_map(&mut self) {
1545 let mut zone_map = ZoneMapEntry::new();
1546
1547 for value in self.values.values() {
1548 zone_map.row_count += 1;
1549
1550 if matches!(value, Value::Null) {
1551 zone_map.null_count += 1;
1552 continue;
1553 }
1554
1555 match &zone_map.min {
1557 None => zone_map.min = Some(value.clone()),
1558 Some(current) => {
1559 if compare_values(value, current) == Some(Ordering::Less) {
1560 zone_map.min = Some(value.clone());
1561 }
1562 }
1563 }
1564
1565 match &zone_map.max {
1567 None => zone_map.max = Some(value.clone()),
1568 Some(current) => {
1569 if compare_values(value, current) == Some(Ordering::Greater) {
1570 zone_map.max = Some(value.clone());
1571 }
1572 }
1573 }
1574 }
1575
1576 self.zone_map = zone_map;
1577 self.zone_map_dirty = false;
1578 }
1579}
1580
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyColumn<Id> {
    /// Creates an empty column with compression mode [`CompressionMode::None`].
    #[must_use]
    pub fn new() -> Self {
        Self::with_compression(CompressionMode::None)
    }

    /// Creates an empty column that records `mode` as its compression mode.
    ///
    /// In this build `compress` and `force_compress` are no-ops, so the mode is
    /// stored and reported but does not change how values are held.
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
            block_zone_maps: Vec::new(),
        }
    }

    /// Replaces the recorded compression mode.
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
    }

    /// Returns the recorded compression mode.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }

    /// Appends `value` to `id`'s version log at `epoch` and widens the column
    /// zone map to cover it.
    ///
    /// Every call bumps the zone map's `row_count`, including overwrites of an
    /// existing id. Between rebuilds the count can therefore exceed [`Self::len`].
    /// `min`/`max` only ever widen, which stays conservative for pruning.
    pub fn set(&mut self, id: Id, value: Value, epoch: EpochId) {
        self.update_zone_map_on_insert(&value);
        self.values.entry(id).or_default().append(epoch, value);
    }

    /// Folds a newly inserted value into the column-level zone map.
    fn update_zone_map_on_insert(&mut self, value: &Value) {
        Self::fold_into_zone_map(&mut self.zone_map, value);
    }

    /// Accumulates one observed value into `zone_map`.
    ///
    /// Used by both the incremental path and [`Self::rebuild_zone_map`], so the
    /// two always agree. Every value counts toward `row_count`; `Null` also counts
    /// toward `null_count`. Values that do not compare equal to themselves under
    /// `compare_values` (NaN, or types it cannot order) are kept out of
    /// `min`/`max`.
    fn fold_into_zone_map(zone_map: &mut ZoneMapEntry, value: &Value) {
        zone_map.row_count += 1;

        if matches!(value, Value::Null) {
            zone_map.null_count += 1;
            return;
        }

        // If such a value became a bound, every later comparison against it would
        // return `None` and min/max would be frozen at it.
        if compare_values(value, value) != Some(Ordering::Equal) {
            return;
        }

        let below_min = match &zone_map.min {
            None => true,
            Some(min) => compare_values(value, min) == Some(Ordering::Less),
        };
        let above_max = match &zone_map.max {
            None => true,
            Some(max) => compare_values(value, max) == Some(Ordering::Greater),
        };
        if below_min {
            zone_map.min = Some(value.clone());
        }
        if above_max {
            zone_map.max = Some(value.clone());
        }
    }

    /// Returns the latest value of `id`.
    ///
    /// Returns `None` if the id has no log or its latest version is `Null`
    /// (the tombstone written by [`Self::remove`]).
    #[must_use]
    pub fn get(&self, id: Id) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.latest())
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Removes `id`'s property as of `epoch` by appending a `Null` tombstone.
    ///
    /// Returns the value that was live before the call. If there was none, it
    /// returns `None` and writes nothing. On removal the zone map is marked dirty,
    /// so [`Self::might_match`] stops pruning until [`Self::rebuild_zone_map`] runs.
    pub fn remove(&mut self, id: Id, epoch: EpochId) -> Option<Value> {
        let previous = self.get(id)?;
        // `get` found a live value, so a log for `id` already exists; `entry` reuses it.
        self.values
            .entry(id)
            .or_default()
            .append(epoch, Value::Null);
        self.zone_map_dirty = true;
        Some(previous)
    }

    /// Counts entities whose latest version is a non-null value.
    ///
    /// Walks every log, so the cost is linear in the number of tracked ids.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values
            .values()
            .filter(|log| log.latest().is_some_and(|v| !v.is_null()))
            .count()
    }

    /// Returns `true` when no entity has a live (non-null) latest value.
    #[cfg(test)]
    #[must_use]
    // `dead_code` is allowed because a test build with `temporal` may never call this.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Reports size statistics for the live values.
    ///
    /// Nothing is compressed in this build, so the compressed and uncompressed
    /// sizes are the same estimate (`live count * size_of::<Value>()`) and
    /// `codec` is `None`.
    #[must_use]
    pub fn compression_stats(&self) -> CompressionStats {
        let live_count = self.len();
        let hot_size = live_count * std::mem::size_of::<Value>();

        CompressionStats {
            uncompressed_size: hot_size,
            compressed_size: hot_size,
            value_count: live_count,
            codec: None,
        }
    }

    /// Approximate heap footprint of the id-to-log map.
    ///
    /// The estimate is table capacity times (id size + `VersionLog` size + one
    /// byte per slot, presumably the hash table's control byte). Heap memory
    /// owned inside each `VersionLog` is not counted.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        self.values.capacity()
            * (std::mem::size_of::<Id>() + std::mem::size_of::<VersionLog<Value>>() + 1)
    }

    /// No-op: values stay in their version logs in this build.
    pub fn compress(&mut self) {}

    /// No-op: values stay in their version logs in this build.
    pub fn force_compress(&mut self) {}

    /// Returns the column-level zone map.
    ///
    /// After [`Self::remove`] it may be stale until [`Self::rebuild_zone_map`] runs.
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }

    /// Returns the per-block zone maps.
    ///
    /// Nothing in this impl populates them, so they stay at whatever the
    /// constructor set (an empty `Vec`).
    #[must_use]
    pub fn block_zone_maps(&self) -> &[ZoneMapEntry] {
        &self.block_zone_maps
    }

    /// Returns `false` only when the zone map proves that no value in the column
    /// can satisfy `column <op> value`.
    ///
    /// The check is always conservative: it answers `true` while the zone map is
    /// dirty. For `Ne` it prunes only when both `min` and `max` equal `value`.
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        if self.zone_map_dirty {
            return true;
        }

        match op {
            CompareOp::Eq => self.zone_map.might_contain_equal(value),
            CompareOp::Ne => match (&self.zone_map.min, &self.zone_map.max) {
                (Some(min), Some(max)) => {
                    !(compare_values(min, value) == Some(Ordering::Equal)
                        && compare_values(max, value) == Some(Ordering::Equal))
                }
                _ => true,
            },
            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
        }
    }

    /// Recomputes the column zone map from the latest version of every log and
    /// clears the dirty flag.
    ///
    /// A `Null` latest version (a removal tombstone) counts as a null row.
    pub fn rebuild_zone_map(&mut self) {
        let mut zone_map = ZoneMapEntry::new();

        for value in self.values.values().filter_map(|log| log.latest()) {
            Self::fold_into_zone_map(&mut zone_map, value);
        }

        self.zone_map = zone_map;
        self.zone_map_dirty = false;
    }

    /// Returns `id`'s value as seen at `epoch` (per `VersionLog::at`).
    ///
    /// Returns `None` when no version is visible at `epoch` or the visible
    /// version is a `Null` tombstone.
    #[must_use]
    pub fn get_at(&self, id: Id, epoch: EpochId) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.at(epoch))
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Calls `VersionLog::finalize_pending(real_epoch)` on every log.
    ///
    /// Presumably this stamps provisionally written versions with their commit
    /// epoch; confirm against `VersionLog`.
    pub fn finalize_pending(&mut self, real_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.finalize_pending(real_epoch);
        }
    }

    /// Calls `VersionLog::remove_pending` on every log, then drops logs left empty.
    pub fn remove_pending(&mut self) {
        for log in self.values.values_mut() {
            log.remove_pending();
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Calls `VersionLog::gc(min_epoch)` on every log, then drops logs left empty.
    pub fn gc(&mut self, min_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.gc(min_epoch);
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Calls `VersionLog::remove_pending` on `id`'s log only, dropping the log
    /// if it becomes empty. Unknown ids are ignored.
    pub fn remove_pending_for(&mut self, id: Id) {
        if let Some(log) = self.values.get_mut(&id) {
            log.remove_pending();
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }

    /// Calls `VersionLog::pop_n_pending(n)` on `id`'s log only, dropping the log
    /// if it becomes empty. Unknown ids are ignored.
    pub fn pop_n_pending_for(&mut self, id: Id, n: usize) {
        if let Some(log) = self.values.get_mut(&id) {
            log.pop_n_pending(n);
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }
}
1871
1872#[cfg(not(feature = "temporal"))]
1882fn compute_block_zone_maps(values: impl IntoIterator<Item = Value>) -> Vec<ZoneMapEntry> {
1883 let block_size = DEFAULT_BLOCK_ROWS as usize;
1884 let mut blocks: Vec<ZoneMapEntry> = Vec::new();
1885 let mut current = ZoneMapEntry::new();
1886 let mut current_rows: usize = 0;
1887
1888 for value in values {
1889 if current_rows == block_size {
1890 blocks.push(current);
1891 current = ZoneMapEntry::new();
1892 current_rows = 0;
1893 }
1894 current.row_count += 1;
1895 current_rows += 1;
1896
1897 if matches!(value, Value::Null) {
1898 current.null_count += 1;
1899 continue;
1900 }
1901
1902 if compare_values(&value, &value) != Some(Ordering::Equal) {
1907 continue;
1908 }
1909
1910 let is_less_than_min = match ¤t.min {
1911 None => true,
1912 Some(existing) => compare_values(&value, existing) == Some(Ordering::Less),
1913 };
1914 let is_greater_than_max = match ¤t.max {
1915 None => true,
1916 Some(existing) => compare_values(&value, existing) == Some(Ordering::Greater),
1917 };
1918 if is_less_than_min {
1919 current.min = Some(value.clone());
1920 }
1921 if is_greater_than_max {
1922 current.max = Some(value);
1923 }
1924 }
1925
1926 if current_rows > 0 {
1927 blocks.push(current);
1928 }
1929 blocks
1930}
1931
1932fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1934 match (a, b) {
1935 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1936 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1937 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1938 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1939 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1940 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1941 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1942 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1943 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1944 _ => None,
1945 }
1946}
1947
1948impl<Id: EntityId> Default for PropertyColumn<Id> {
1949 fn default() -> Self {
1950 Self::new()
1951 }
1952}
1953
/// A handle tied to one property column while holding a read lock on the whole
/// column map.
///
/// The read guard keeps the map's `RwLock` held for as long as this value lives.
/// NOTE(review): all fields are underscore-prefixed, so this type presumably
/// exists only to pin the lock for `_key`'s column; confirm against its users.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    // Read guard over every column; dropping the handle releases the lock.
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    // Key of the column this handle refers to.
    _key: PropertyKey,
    // Ties the handle to the entity id type `Id`.
    _marker: PhantomData<Id>,
}
1962
#[cfg(test)]
#[cfg(not(feature = "temporal"))]
mod tests {
    // Unit tests for the non-temporal property storage: basic CRUD, compression,
    // batch reads, and per-block zone maps / decoding.
    use super::*;
    use arcstr::ArcStr;

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alix".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Gus".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alix".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Gus".into()))
        );
        assert!(storage.get(node2, &age_key).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alix".into());
        assert!(storage.get(node, &key).is_some());

        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alix".into());
        col.set(NodeId::new(2), "Gus".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0u64..100 {
            let age = 20 + i64::try_from(i % 50).unwrap();
            storage.set(NodeId::new(i), PropertyKey::new("age"), Value::Int64(age));
        }

        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for i in 0u64..2000 {
            col.set(
                NodeId::new(i),
                Value::Int64(1000 + i64::try_from(i).unwrap()),
            );
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
        }

        assert_eq!(col.len(), 2000);

        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for i in 0u64..2000 {
            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }

        assert_eq!(col.len(), 2000);

        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0u64..100 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }

        col.force_compress();

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0u64..50 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0u64..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i64::try_from(i).unwrap()),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(ArcStr::from("Alix")),
            );
        }

        let stats = storage.compression_stats();
        assert_eq!(stats.len(), 2);
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0u64..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i64::try_from(i).unwrap()),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);
        let age_key = PropertyKey::new("age");

        storage.set(node1, age_key.clone(), 25i64.into());
        storage.set(node2, age_key.clone(), 30i64.into());
        // node3 deliberately has no "age".
        let ids = [node1, node2, node3];
        let values = storage.get_batch(&ids, &age_key);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let missing_key = PropertyKey::new("nonexistent");

        let ids = [node1, node2];
        let values = storage.get_batch(&ids, &missing_key);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        let values = storage.get_batch(&[], &key);
        assert!(values.is_empty());
    }

    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);

        storage.set(node1, PropertyKey::new("name"), "Alix".into());
        storage.set(node1, PropertyKey::new("age"), 25i64.into());
        storage.set(node2, PropertyKey::new("name"), "Gus".into());
        // node3 deliberately has no properties.
        let ids = [node1, node2, node3];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2);
        assert_eq!(all_props[1].len(), 1);
        assert_eq!(all_props[2].len(), 0);
        assert_eq!(
            all_props[0].get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(
            all_props[1].get(&PropertyKey::new("name")),
            Some(&Value::String("Gus".into()))
        );
    }

    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let all_props = storage.get_all_batch(&[]);
        assert!(all_props.is_empty());
    }

    #[test]
    fn test_block_zone_maps_empty_for_uncompressed_column() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..50 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        assert!(col.block_zone_maps().is_empty());
    }

    #[test]
    fn test_block_zone_maps_integer_compressed() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(
                NodeId::new(i),
                Value::Int64(1000 + i64::try_from(i).unwrap()),
            );
        }
        col.force_compress();

        let blocks = col.block_zone_maps();
        assert_eq!(blocks.len(), 3, "2500 rows / 1024 = 3 blocks");

        assert_eq!(blocks[0].row_count, 1024);
        assert_eq!(blocks[0].min, Some(Value::Int64(1000)));
        assert_eq!(blocks[0].max, Some(Value::Int64(2023)));

        assert_eq!(blocks[1].row_count, 1024);
        assert_eq!(blocks[1].min, Some(Value::Int64(2024)));
        assert_eq!(blocks[1].max, Some(Value::Int64(3047)));

        assert_eq!(blocks[2].row_count, 452);
        assert_eq!(blocks[2].min, Some(Value::Int64(3048)));
        assert_eq!(blocks[2].max, Some(Value::Int64(3499)));
    }

    #[test]
    fn test_block_zone_maps_string_compressed() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        let strings = ["alpha", "bravo", "charlie", "delta"];
        for i in 0u64..2500 {
            col.set(
                NodeId::new(i),
                Value::String(ArcStr::from(strings[(i % 4) as usize])),
            );
        }
        col.force_compress();

        let blocks = col.block_zone_maps();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].row_count, 1024);
        assert_eq!(blocks[1].row_count, 1024);
        assert_eq!(blocks[2].row_count, 452);
        for block in blocks {
            assert_eq!(block.min, Some(Value::String(ArcStr::from("alpha"))));
            assert_eq!(block.max, Some(Value::String(ArcStr::from("delta"))));
        }
    }

    #[test]
    fn test_block_zone_maps_boolean_compressed() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }
        col.force_compress();

        let blocks = col.block_zone_maps();
        assert_eq!(blocks.len(), 3);
        for block in blocks {
            assert_eq!(block.min, Some(Value::Bool(false)));
            assert_eq!(block.max, Some(Value::Bool(true)));
            assert_eq!(block.null_count, 0);
        }
    }

    #[test]
    fn test_block_zone_maps_cleared_after_evict() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        col.force_compress();
        assert!(!col.block_zone_maps().is_empty());

        col.evict_values();
        assert!(
            col.block_zone_maps().is_empty(),
            "evict drops compressed data, so per-block stats must reset"
        );
    }

    #[test]
    fn test_block_might_match_prunes_disjoint_range() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(
                NodeId::new(i),
                Value::Int64(1000 + i64::try_from(i).unwrap()),
            );
        }
        col.force_compress();

        // Stored values span 1000..=3499, so 5000 lies outside every block.
        let target = Value::Int64(5000);
        let blocks = col.block_zone_maps();
        let any_match = blocks.iter().any(|zm| zm.might_contain_equal(&target));
        assert!(!any_match, "no block should claim to contain 5000");
    }

    #[test]
    fn test_storage_block_zone_maps_for_returns_blocks() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        for i in 0u64..2500 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + i64::try_from(i).unwrap()),
            );
        }
        storage.force_compress_all();

        let blocks = storage
            .block_zone_maps_for(&PropertyKey::new("age"))
            .expect("compressed column must expose block stats");
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].min, Some(Value::Int64(20)));
        assert_eq!(blocks.last().unwrap().max, Some(Value::Int64(2519)));
    }

    #[test]
    fn test_storage_block_zone_maps_for_missing_column_returns_none() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        assert!(
            storage
                .block_zone_maps_for(&PropertyKey::new("missing"))
                .is_none()
        );
    }

    #[test]
    fn test_compute_block_zone_maps_float64_finite_min_max() {
        let values: Vec<Value> = (0u32..2500)
            .map(|i| Value::Float64(f64::from(i) * 0.5))
            .collect();
        let blocks = compute_block_zone_maps(values);

        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].row_count, 1024);
        assert_eq!(blocks[0].min, Some(Value::Float64(0.0)));
        assert_eq!(blocks[0].max, Some(Value::Float64(1023.0 * 0.5)));
        assert_eq!(blocks[1].min, Some(Value::Float64(1024.0 * 0.5)));
    }

    #[test]
    fn test_block_count_zero_for_uncompressed_column() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..50 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        assert_eq!(col.block_count(), 0);
    }

    #[test]
    fn test_block_count_matches_zone_maps_after_compression() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        col.force_compress();
        assert_eq!(col.block_count(), col.block_zone_maps().len());
        assert_eq!(col.block_count(), 3);
    }

    #[test]
    fn test_decode_block_returns_correct_pairs_integer() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(
                NodeId::new(i),
                Value::Int64(1000 + i64::try_from(i).unwrap()),
            );
        }
        col.force_compress();

        let block0 = col.decode_block(0).expect("block 0 must exist");
        assert_eq!(block0.entries.len(), 1024);
        assert_eq!(block0.entries[0], (NodeId::new(0), Value::Int64(1000)));
        assert_eq!(
            block0.entries[1023],
            (NodeId::new(1023), Value::Int64(2023))
        );
        assert_eq!(block0.zone_map.row_count, 1024);

        let block2 = col.decode_block(2).expect("block 2 must exist");
        assert_eq!(block2.entries.len(), 452);
        assert_eq!(block2.entries[0], (NodeId::new(2048), Value::Int64(3048)));
    }

    #[test]
    fn test_decode_block_returns_correct_pairs_boolean() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Bool(i % 2 == 0));
        }
        col.force_compress();

        let block0 = col.decode_block(0).expect("block 0 must exist");
        assert_eq!(block0.entries.len(), 1024);
        assert_eq!(block0.entries[0], (NodeId::new(0), Value::Bool(true)));
        assert_eq!(block0.entries[1], (NodeId::new(1), Value::Bool(false)));
    }

    #[test]
    fn test_decode_block_returns_correct_pairs_string() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        let strings = ["alpha", "bravo", "charlie", "delta"];
        for i in 0u64..2500 {
            col.set(
                NodeId::new(i),
                Value::String(ArcStr::from(strings[(i % 4) as usize])),
            );
        }
        col.force_compress();

        let block0 = col.decode_block(0).expect("block 0 must exist");
        assert_eq!(block0.entries.len(), 1024);
        assert_eq!(
            block0.entries[0],
            (NodeId::new(0), Value::String(ArcStr::from("alpha")))
        );
        assert_eq!(
            block0.entries[3],
            (NodeId::new(3), Value::String(ArcStr::from("delta")))
        );
    }

    #[test]
    fn test_decode_block_out_of_range_returns_none() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        col.force_compress();

        assert!(col.decode_block(99).is_none());
    }

    #[test]
    fn test_decode_block_uncompressed_returns_none() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..50 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        assert!(col.decode_block(0).is_none());
    }

    #[test]
    fn test_iter_decoded_blocks_yields_all_blocks() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
        for i in 0u64..2500 {
            col.set(NodeId::new(i), Value::Int64(i64::try_from(i).unwrap()));
        }
        col.force_compress();

        let blocks: Vec<_> = col.iter_decoded_blocks().collect();
        assert_eq!(blocks.len(), 3);
        let total_rows: usize = blocks.iter().map(|b| b.entries.len()).sum();
        assert_eq!(total_rows, 2500);
    }

    #[test]
    fn test_compute_block_zone_maps_float64_nan_does_not_poison() {
        // A leading NaN must not become the bound that every later comparison fails against.
        let mut values: Vec<Value> = vec![Value::Float64(f64::NAN)];
        values.extend((0u32..50).map(|i| Value::Float64(f64::from(i))));
        values.push(Value::Float64(f64::NAN));

        let blocks = compute_block_zone_maps(values);
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].row_count, 52, "NaN values still count as rows");
        assert_eq!(blocks[0].null_count, 0, "NaN is not null");
        assert_eq!(blocks[0].min, Some(Value::Float64(0.0)));
        assert_eq!(blocks[0].max, Some(Value::Float64(49.0)));
    }

    #[test]
    fn test_compute_block_zone_maps_empty_input_yields_no_blocks() {
        let values: Vec<Value> = Vec::new();
        assert!(compute_block_zone_maps(values).is_empty());
    }

    #[test]
    fn test_compute_block_zone_maps_counts_nulls_without_bounding_them() {
        let blocks = compute_block_zone_maps([
            Value::Null,
            Value::Int64(7),
            Value::Null,
            Value::Int64(3),
        ]);
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].row_count, 4);
        assert_eq!(blocks[0].null_count, 2);
        assert_eq!(blocks[0].min, Some(Value::Int64(3)));
        assert_eq!(blocks[0].max, Some(Value::Int64(7)));
    }
}
2599}