1use crate::codec::CompressionCodec;
24#[cfg(not(feature = "temporal"))]
25use crate::codec::{CompressedData, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor};
26use crate::index::zone_map::ZoneMapEntry;
27#[cfg(not(feature = "temporal"))]
28use arcstr::ArcStr;
29#[cfg(feature = "temporal")]
30use grafeo_common::temporal::VersionLog;
31#[cfg(feature = "temporal")]
32use grafeo_common::types::EpochId;
33use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
34use grafeo_common::utils::hash::FxHashMap;
35use parking_lot::RwLock;
36use std::cmp::Ordering;
37use std::hash::Hash;
38use std::marker::PhantomData;
39
/// Controls when a property column compresses its stored values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum CompressionMode {
    /// Never compress; all values stay in the hot (uncompressed) buffer.
    #[default]
    None,
    /// Compress automatically once a column crosses internal size thresholds
    /// (see `HOT_BUFFER_SIZE` / `COMPRESSION_THRESHOLD`).
    Auto,
    /// Eager compression. NOTE(review): no automatic trigger for this mode is
    /// visible in this file — presumably callers invoke `compress_all`; confirm.
    Eager,
}
52
/// Minimum total value count (hot + compressed) before `Auto` mode compresses.
#[cfg(not(feature = "temporal"))]
const COMPRESSION_THRESHOLD: usize = 1000;

/// Hot-buffer occupancy that, together with `COMPRESSION_THRESHOLD`, triggers
/// automatic compression in `PropertyColumn::set` under `Auto` mode.
#[cfg(not(feature = "temporal"))]
const HOT_BUFFER_SIZE: usize = 4096;
63
/// Comparison operator used for zone-map predicate pruning
/// (see [`PropertyStorage::might_match`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompareOp {
    /// Equal (`=`).
    Eq,
    /// Not equal (`!=`).
    Ne,
    /// Less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Le,
    /// Greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Ge,
}
83
/// An entity identifier (node or edge id) that round-trips through a `u64`.
///
/// Implementors must guarantee `from_u64(x.as_u64()) == x`; the compressed
/// column encoding relies on this to store ids as raw `u64`s.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Returns the raw `u64` representation of this id.
    fn as_u64(self) -> u64;
    /// Reconstructs an id from its raw `u64` representation.
    fn from_u64(v: u64) -> Self;
}
94
95impl EntityId for NodeId {
96 #[inline]
97 fn as_u64(self) -> u64 {
98 self.0
99 }
100 #[inline]
101 fn from_u64(v: u64) -> Self {
102 Self(v)
103 }
104}
105
106impl EntityId for EdgeId {
107 #[inline]
108 fn as_u64(self) -> u64 {
109 self.0
110 }
111 #[inline]
112 fn from_u64(v: u64) -> Self {
113 Self(v)
114 }
115}
116
/// Columnar property storage: one [`PropertyColumn`] per property key,
/// generic over the entity id type (defaults to [`NodeId`]).
pub struct PropertyStorage<Id: EntityId = NodeId> {
    // All columns behind a single RwLock; reads take the shared lock.
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    // Compression mode applied to columns created after construction.
    default_compression: CompressionMode,
    // NOTE(review): `Id` already appears in `columns`, so this marker looks
    // redundant — confirm nothing relies on it before removing.
    _marker: PhantomData<Id>,
}
152
impl<Id: EntityId> PropertyStorage<Id> {
    /// Creates an empty storage with compression disabled.
    #[must_use]
    pub fn new() -> Self {
        Self {
            columns: RwLock::new(FxHashMap::default()),
            default_compression: CompressionMode::None,
            _marker: PhantomData,
        }
    }

    /// Creates an empty storage whose newly created columns use `mode`.
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            columns: RwLock::new(FxHashMap::default()),
            default_compression: mode,
            _marker: PhantomData,
        }
    }

    /// Sets the compression mode for columns created after this call.
    /// Existing columns keep their current mode.
    pub fn set_default_compression(&mut self, mode: CompressionMode) {
        self.default_compression = mode;
    }

    /// Sets property `key` to `value` for entity `id`, creating the column
    /// on first use with the storage's default compression mode.
    #[cfg(not(feature = "temporal"))]
    pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
        let mut columns = self.columns.write();
        let mode = self.default_compression;
        columns
            .entry(key)
            .or_insert_with(|| PropertyColumn::with_compression(mode))
            .set(id, value);
    }

    /// Sets property `key` to `value` for entity `id` at `epoch`, creating
    /// the column on first use with the storage's default compression mode.
    #[cfg(feature = "temporal")]
    pub fn set(&self, id: Id, key: PropertyKey, value: Value, epoch: EpochId) {
        let mut columns = self.columns.write();
        let mode = self.default_compression;
        columns
            .entry(key)
            .or_insert_with(|| PropertyColumn::with_compression(mode))
            .set(id, value, epoch);
    }

    /// Changes the compression mode of an existing column; no-op if the
    /// column does not exist.
    pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
        let mut columns = self.columns.write();
        if let Some(col) = columns.get_mut(key) {
            col.set_compression_mode(mode);
        }
    }

    /// Compresses every column whose mode is not `None`.
    pub fn compress_all(&self) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            if col.compression_mode() != CompressionMode::None {
                col.compress();
            }
        }
    }

    /// Compresses every column regardless of its configured mode.
    pub fn force_compress_all(&self) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.force_compress();
        }
    }

    /// Returns per-column compression statistics keyed by property key.
    #[must_use]
    pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
        let columns = self.columns.read();
        columns
            .iter()
            .map(|(key, col)| (key.clone(), col.compression_stats()))
            .collect()
    }

    /// Total (post-compression) memory usage across all columns, in bytes.
    #[must_use]
    pub fn memory_usage(&self) -> usize {
        let columns = self.columns.read();
        columns
            .values()
            .map(|col| col.compression_stats().compressed_size)
            .sum()
    }

    /// Estimated heap bytes held by the column map plus all columns.
    /// The `+ 1` per slot approximates hash-map bookkeeping overhead —
    /// this is an estimate, not an exact accounting.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        let columns = self.columns.read();
        let map_overhead = columns.capacity()
            * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
        let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
        map_overhead + column_bytes
    }

    /// Returns the value of `key` for entity `id`, if present.
    #[must_use]
    pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
        let columns = self.columns.read();
        columns.get(key).and_then(|col| col.get(id))
    }

    /// Removes `key` from entity `id`, returning the previous value.
    #[cfg(not(feature = "temporal"))]
    pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
        let mut columns = self.columns.write();
        columns.get_mut(key).and_then(|col| col.remove(id))
    }

    /// Removes `key` from entity `id` at `epoch`, returning the previous value.
    #[cfg(feature = "temporal")]
    pub fn remove(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
        let mut columns = self.columns.write();
        columns.get_mut(key).and_then(|col| col.remove(id, epoch))
    }

    /// Removes all properties of entity `id` across every column.
    #[cfg(not(feature = "temporal"))]
    pub fn remove_all(&self, id: Id) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.remove(id);
        }
    }

    /// Removes all properties of entity `id` across every column at `epoch`.
    #[cfg(feature = "temporal")]
    pub fn remove_all(&self, id: Id, epoch: EpochId) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.remove(id, epoch);
        }
    }

    /// Returns all properties of entity `id` as a key → value map.
    #[must_use]
    pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
        let columns = self.columns.read();
        let mut result = FxHashMap::default();
        for (key, col) in columns.iter() {
            if let Some(value) = col.get(id) {
                result.insert(key.clone(), value);
            }
        }
        result
    }

    /// Batched lookup of one property for many entities; the read lock is
    /// taken once for the whole batch. Result is index-aligned with `ids`.
    #[must_use]
    pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
        let columns = self.columns.read();
        match columns.get(key) {
            Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
            None => vec![None; ids.len()],
        }
    }

    /// Batched variant of [`Self::get_all`]: one map per entity,
    /// index-aligned with `ids`, under a single read lock.
    #[must_use]
    pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
        let columns = self.columns.read();
        let column_count = columns.len();

        let mut results = Vec::with_capacity(ids.len());

        for &id in ids {
            // Pre-size each map to the column count (upper bound on entries).
            let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
            for (key, col) in columns.iter() {
                if let Some(value) = col.get(id) {
                    result.insert(key.clone(), value);
                }
            }
            results.push(result);
        }

        results
    }

    /// Like [`Self::get_all_batch`] but restricted to the requested `keys`.
    /// Unknown keys are silently skipped.
    #[must_use]
    pub fn get_selective_batch(
        &self,
        ids: &[Id],
        keys: &[PropertyKey],
    ) -> Vec<FxHashMap<PropertyKey, Value>> {
        if keys.is_empty() {
            return vec![FxHashMap::default(); ids.len()];
        }

        let columns = self.columns.read();

        // Resolve requested keys to columns once, outside the per-id loop.
        let requested_columns: Vec<_> = keys
            .iter()
            .filter_map(|key| columns.get(key).map(|col| (key, col)))
            .collect();

        let mut results = Vec::with_capacity(ids.len());

        for &id in ids {
            let mut result =
                FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
            for (key, col) in &requested_columns {
                if let Some(value) = col.get(id) {
                    result.insert((*key).clone(), value);
                }
            }
            results.push(result);
        }

        results
    }

    /// Number of property columns currently stored.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.columns.read().len()
    }

    /// All property keys currently stored (unordered).
    #[must_use]
    pub fn keys(&self) -> Vec<PropertyKey> {
        self.columns.read().keys().cloned().collect()
    }

    /// Drops every column and all stored values.
    pub fn clear(&self) {
        self.columns.write().clear();
    }

    /// Evicts a column's in-memory values (e.g. after spilling to disk),
    /// returning `(value_count, freed_bytes)`; `(0, 0)` if the column is
    /// unknown.
    #[cfg(not(feature = "temporal"))]
    pub fn evict_column(&self, key: &PropertyKey) -> (usize, usize) {
        let mut columns = self.columns.write();
        if let Some(column) = columns.get_mut(key) {
            column.evict_values()
        } else {
            (0, 0)
        }
    }

    /// Restores previously evicted/drained values into a column, creating it
    /// if necessary, and clears its spilled flag.
    #[cfg(not(feature = "temporal"))]
    pub fn restore_column(&self, key: &PropertyKey, values: impl Iterator<Item = (Id, Value)>) {
        let mut columns = self.columns.write();
        let column = columns
            .entry(key.clone())
            .or_insert_with(|| PropertyColumn::with_compression(self.default_compression));
        column.restore_values(values);
    }

    /// Drains a column's values (marking it spilled) and returns them;
    /// empty if the column is unknown.
    #[cfg(not(feature = "temporal"))]
    pub fn drain_column(&self, key: &PropertyKey) -> Vec<(Id, Value)> {
        let mut columns = self.columns.write();
        if let Some(column) = columns.get_mut(key) {
            column.drain_values()
        } else {
            Vec::new()
        }
    }

    /// Whether the column exists and is currently marked as spilled.
    #[cfg(not(feature = "temporal"))]
    #[must_use]
    pub fn is_column_spilled(&self, key: &PropertyKey) -> bool {
        self.columns
            .read()
            .get(key)
            .is_some_and(|col| col.is_spilled())
    }

    /// Marks a column as spilled, creating an empty column if needed.
    #[cfg(not(feature = "temporal"))]
    pub fn mark_column_spilled(&self, key: &PropertyKey) {
        let mut columns = self.columns.write();
        let column = columns.entry(key.clone()).or_default();
        column.mark_spilled();
    }

    /// Returns a handle that holds the storage read lock while the column
    /// exists. NOTE(review): the handle exposes no accessors here (its fields
    /// are unused placeholders) — confirm intended use at call sites.
    #[must_use]
    pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
        let columns = self.columns.read();
        if columns.contains_key(key) {
            Some(PropertyColumnRef {
                _guard: columns,
                _key: key.clone(),
                _marker: PhantomData,
            })
        } else {
            None
        }
    }

    /// Zone-map pre-filter: `false` means no value in the column can satisfy
    /// `op value`; `true` is inconclusive. Unknown columns return `true`
    /// (cannot prune what we cannot see).
    #[must_use]
    pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
        let columns = self.columns.read();
        columns
            .get(key)
            .map_or(true, |col| col.might_match(op, value)) }

    /// Returns a clone of the column's zone map, if the column exists.
    #[must_use]
    pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
        let columns = self.columns.read();
        columns.get(key).map(|col| col.zone_map().clone())
    }

    /// Zone-map pre-filter for range predicates; `None` bounds are open.
    /// Unknown columns return `true` (inconclusive).
    #[must_use]
    pub fn might_match_range(
        &self,
        key: &PropertyKey,
        min: Option<&Value>,
        max: Option<&Value>,
        min_inclusive: bool,
        max_inclusive: bool,
    ) -> bool {
        let columns = self.columns.read();
        columns.get(key).map_or(true, |col| {
            col.zone_map()
                .might_contain_range(min, max, min_inclusive, max_inclusive)
        }) }

    /// Recomputes every column's zone map from its current values
    /// (needed after removals mark zone maps dirty).
    pub fn rebuild_zone_maps(&self) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.rebuild_zone_map();
        }
    }
}
583
584impl<Id: EntityId> Default for PropertyStorage<Id> {
585 fn default() -> Self {
586 Self::new()
587 }
588}
589
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyStorage<Id> {
    /// Crate-internal escape hatch: write access to the raw column map.
    pub(crate) fn columns_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, FxHashMap<PropertyKey, PropertyColumn<Id>>> {
        self.columns.write()
    }

    /// Returns the value of `key` for entity `id` as visible at `epoch`.
    #[must_use]
    pub fn get_at(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
        let columns = self.columns.read();
        columns.get(key).and_then(|col| col.get_at(id, epoch))
    }

    /// Returns all properties of entity `id` as visible at `epoch`.
    #[must_use]
    pub fn get_all_at(&self, id: Id, epoch: EpochId) -> FxHashMap<PropertyKey, Value> {
        let columns = self.columns.read();
        let mut result = FxHashMap::default();
        for (key, col) in columns.iter() {
            if let Some(value) = col.get_at(id, epoch) {
                result.insert(key.clone(), value);
            }
        }
        result
    }

    /// Commits pending (uncommitted) versions in every column to `real_epoch`.
    pub fn finalize_pending(&self, real_epoch: EpochId) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.finalize_pending(real_epoch);
        }
    }

    /// Discards pending (uncommitted) versions in every column (rollback).
    pub fn remove_pending(&self) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.remove_pending();
        }
    }

    /// Garbage-collects versions no longer visible at or after `min_epoch`.
    pub fn gc(&self, min_epoch: EpochId) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.gc(min_epoch);
        }
    }

    /// Full version history of every property of entity `id`:
    /// `(key, [(epoch, value), ...])` for keys with a non-empty history.
    #[must_use]
    pub fn get_all_history(&self, id: Id) -> Vec<(PropertyKey, Vec<(EpochId, Value)>)> {
        let columns = self.columns.read();
        let mut result = Vec::new();
        for (key, col) in columns.iter() {
            if let Some(log) = col.values.get(&id) {
                let entries: Vec<(EpochId, Value)> = log
                    .history()
                    .iter()
                    .map(|(epoch, value)| (*epoch, value.clone()))
                    .collect();
                if !entries.is_empty() {
                    result.push((key.clone(), entries));
                }
            }
        }
        result
    }

    /// Version history of a single property of entity `id`; empty when the
    /// column or the entity's log is absent.
    #[must_use]
    pub fn get_history(&self, id: Id, key: &PropertyKey) -> Vec<(EpochId, Value)> {
        let columns = self.columns.read();
        columns
            .get(key)
            .and_then(|col| col.values.get(&id))
            .map(|log| log.history().iter().map(|(e, v)| (*e, v.clone())).collect())
            .unwrap_or_default()
    }
}
680
/// Type-specific compressed payload of a [`PropertyColumn`], together with
/// id ↔ index mappings (both vectors are sorted by raw id; see
/// `compress_as_*` which builds them from the same sorted list).
#[cfg(not(feature = "temporal"))]
#[derive(Debug)]
#[non_exhaustive]
pub enum CompressedColumnData {
    /// `Value::Int64` values, compressed via `TypeSpecificCompressor`.
    Integers {
        // Compressed integer payload.
        data: CompressedData,
        // Raw entity ids, sorted; position i maps to compressed value i.
        id_to_index: Vec<u64>,
        // Inverse mapping (identical contents here since ids are sorted).
        index_to_id: Vec<u64>,
    },
    /// `Value::String` values, dictionary-encoded.
    Strings {
        // Dictionary codes plus the dictionary itself.
        encoding: DictionaryEncoding,
        id_to_index: Vec<u64>,
        index_to_id: Vec<u64>,
    },
    /// `Value::Bool` values, bit-packed via `TypeSpecificCompressor`.
    Booleans {
        data: CompressedData,
        id_to_index: Vec<u64>,
        index_to_id: Vec<u64>,
    },
}
717
718#[cfg(not(feature = "temporal"))]
719impl CompressedColumnData {
720 #[must_use]
722 pub fn memory_usage(&self) -> usize {
723 match self {
724 CompressedColumnData::Integers {
725 data,
726 id_to_index,
727 index_to_id,
728 } => {
729 data.data.len()
730 + id_to_index.len() * std::mem::size_of::<u64>()
731 + index_to_id.len() * std::mem::size_of::<u64>()
732 }
733 CompressedColumnData::Strings {
734 encoding,
735 id_to_index,
736 index_to_id,
737 } => {
738 encoding.codes().len() * std::mem::size_of::<u32>()
739 + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
740 + id_to_index.len() * std::mem::size_of::<u64>()
741 + index_to_id.len() * std::mem::size_of::<u64>()
742 }
743 CompressedColumnData::Booleans {
744 data,
745 id_to_index,
746 index_to_id,
747 } => {
748 data.data.len()
749 + id_to_index.len() * std::mem::size_of::<u64>()
750 + index_to_id.len() * std::mem::size_of::<u64>()
751 }
752 }
753 }
754}
755
/// Size and codec statistics for one property column.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Estimated size if every value were held uncompressed, in bytes.
    pub uncompressed_size: usize,
    /// Actual current size (hot buffer + compressed payload), in bytes.
    pub compressed_size: usize,
    /// Total number of stored values.
    pub value_count: usize,
    /// Codec in use, or `None` when nothing is compressed.
    pub codec: Option<CompressionCodec>,
}
768
769impl CompressionStats {
770 #[must_use]
772 pub fn compression_ratio(&self) -> f64 {
773 if self.compressed_size == 0 {
774 return 1.0;
775 }
776 self.uncompressed_size as f64 / self.compressed_size as f64
777 }
778}
779
/// A single property column: per-entity values plus a zone map, and
/// (non-temporal builds only) optional type-specific compression state.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    // Hot (uncompressed) values per entity.
    #[cfg(not(feature = "temporal"))]
    values: FxHashMap<Id, Value>,
    // Full version log per entity (temporal builds).
    #[cfg(feature = "temporal")]
    values: FxHashMap<Id, VersionLog<Value>>,
    // Min/max/null statistics used for predicate pruning.
    zone_map: ZoneMapEntry,
    // Set on removal; a dirty zone map is treated as inconclusive until
    // `rebuild_zone_map` runs.
    zone_map_dirty: bool,
    // When/whether this column compresses (see `CompressionMode`).
    compression_mode: CompressionMode,
    // Compressed payload, if `compress()` succeeded.
    #[cfg(not(feature = "temporal"))]
    compressed: Option<CompressedColumnData>,
    // Number of values that live in `compressed` rather than `values`.
    #[cfg(not(feature = "temporal"))]
    compressed_count: usize,
    // True after values were evicted/drained (e.g. spilled to disk).
    #[cfg(not(feature = "temporal"))]
    spilled: bool,
}
817
#[cfg(not(feature = "temporal"))]
impl<Id: EntityId> PropertyColumn<Id> {
    /// Creates an empty column with compression disabled.
    #[must_use]
    pub fn new() -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: CompressionMode::None,
            compressed: None,
            compressed_count: 0,
            spilled: false,
        }
    }

    /// Creates an empty column using the given compression mode.
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
            compressed: None,
            compressed_count: 0,
            spilled: false,
        }
    }

    /// Changes the compression mode; switching to `None` also decompresses
    /// any existing compressed payload back into the hot buffer.
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
        if mode == CompressionMode::None {
            if self.compressed.is_some() {
                self.decompress_all();
            }
        }
    }

    /// Current compression mode.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }

    /// Inserts or overwrites the value for `id`, updating the zone map.
    /// In `Auto` mode this may trigger compression once the hot buffer and
    /// the total value count cross their thresholds.
    pub fn set(&mut self, id: Id, value: Value) {
        self.update_zone_map_on_insert(&value);
        self.values.insert(id, value);

        if self.compression_mode == CompressionMode::Auto {
            let total_count = self.values.len() + self.compressed_count;
            let hot_buffer_count = self.values.len();

            if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
                self.compress();
            }
        }
    }

    /// Folds one inserted value into the zone map (row count, null count,
    /// min/max). Note: overwrites are not distinguished from fresh inserts,
    /// so counts are upper bounds until `rebuild_zone_map`.
    fn update_zone_map_on_insert(&mut self, value: &Value) {
        self.zone_map.row_count += 1;

        if matches!(value, Value::Null) {
            self.zone_map.null_count += 1;
            return;
        }

        match &self.zone_map.min {
            None => self.zone_map.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Less) {
                    self.zone_map.min = Some(value.clone());
                }
            }
        }

        match &self.zone_map.max {
            None => self.zone_map.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Greater) {
                    self.zone_map.max = Some(value.clone());
                }
            }
        }
    }

    /// Returns the value for `id` from the hot buffer.
    /// NOTE(review): values moved into `compressed` by `compress()` are NOT
    /// consulted here and appear absent until `decompress_all` runs —
    /// confirm this is the intended access pattern.
    #[must_use]
    pub fn get(&self, id: Id) -> Option<Value> {
        if let Some(value) = self.values.get(&id) {
            return Some(value.clone());
        }

        None
    }

    /// Removes `id` from the hot buffer, returning its value and marking the
    /// zone map dirty. NOTE(review): compressed values are not removed here.
    pub fn remove(&mut self, id: Id) -> Option<Value> {
        let removed = self.values.remove(&id);
        if removed.is_some() {
            self.zone_map_dirty = true;
        }
        removed
    }

    /// Marks the column as spilled (values live elsewhere, e.g. on disk).
    pub fn mark_spilled(&mut self) {
        self.spilled = true;
    }

    /// Whether the column is currently marked as spilled.
    #[must_use]
    pub fn is_spilled(&self) -> bool {
        self.spilled
    }

    /// Drops all values (hot and compressed), marks the column spilled, and
    /// returns `(evicted_value_count, freed_heap_bytes)`. Note the count
    /// covers only hot values while the byte figure covers both.
    pub fn evict_values(&mut self) -> (usize, usize) {
        let count = self.values.len();
        let freed_bytes = self.heap_memory_bytes();
        self.values.clear();
        self.values.shrink_to_fit();
        self.compressed = None;
        self.compressed_count = 0;
        self.spilled = true;
        (count, freed_bytes)
    }

    /// Moves all hot values out (dropping any compressed payload) and marks
    /// the column spilled; returns the drained `(id, value)` pairs.
    pub fn drain_values(&mut self) -> Vec<(Id, Value)> {
        let drained: Vec<(Id, Value)> = self.values.drain().collect();
        self.values.shrink_to_fit();
        self.compressed = None;
        self.compressed_count = 0;
        self.spilled = true;
        drained
    }

    /// Reinserts previously drained/evicted values and clears the spilled
    /// flag. The zone map is not rebuilt here.
    pub fn restore_values(&mut self, values: impl Iterator<Item = (Id, Value)>) {
        self.spilled = false;
        for (id, value) in values {
            self.values.insert(id, value);
        }
    }

    /// Total stored values: hot buffer plus compressed.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values.len() + self.compressed_count
    }

    /// Whether the column holds no values at all.
    #[cfg(test)]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.values.is_empty() && self.compressed_count == 0
    }

    /// Size/codec statistics. Uncompressed size is estimated as
    /// `count * size_of::<Value>()`; hot values count toward both sides.
    #[must_use]
    pub fn compression_stats(&self) -> CompressionStats {
        let hot_size = self.values.len() * std::mem::size_of::<Value>();
        let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
        let codec = match &self.compressed {
            Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
            Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
            Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
            None => None,
        };

        CompressionStats {
            uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
            compressed_size: hot_size + compressed_size,
            value_count: self.len(),
            codec,
        }
    }

    /// Estimated heap bytes for the hot map (the `+ 1` per slot approximates
    /// hash-map bookkeeping) plus the compressed payload.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        let hot_bytes =
            self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
        let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
        hot_bytes + compressed_bytes
    }

    /// Whether a compressed payload currently exists.
    #[must_use]
    #[cfg(test)]
    pub fn is_compressed(&self) -> bool {
        self.compressed.is_some()
    }

    /// Attempts to compress the column. Picks the type (int/string/bool)
    /// that makes up a strict majority of hot values; does nothing when the
    /// column is empty, already compressed, or no type has a majority.
    pub fn compress(&mut self) {
        if self.values.is_empty() {
            return;
        }

        if self.compressed.is_some() {
            // Re-compressing a partially compressed column is not supported.
            return;
        }

        let (int_count, str_count, bool_count) = self.count_types();
        let total = self.values.len();

        if int_count > total / 2 {
            self.compress_as_integers();
        } else if str_count > total / 2 {
            self.compress_as_strings();
        } else if bool_count > total / 2 {
            self.compress_as_booleans();
        }
    }

    /// Counts hot values by compressible type: (ints, strings, bools).
    fn count_types(&self) -> (usize, usize, usize) {
        let mut int_count = 0;
        let mut str_count = 0;
        let mut bool_count = 0;

        for value in self.values.values() {
            match value {
                Value::Int64(_) => int_count += 1,
                Value::String(_) => str_count += 1,
                Value::Bool(_) => bool_count += 1,
                _ => {}
            }
        }

        (int_count, str_count, bool_count)
    }

    /// Compresses the `Int64` values; other-typed values stay hot. Applies
    /// only when at least 8 ints exist and the codec achieves > 1.2x ratio.
    fn compress_as_integers(&mut self) {
        let mut values: Vec<(u64, i64)> = Vec::new();
        let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::Int64(v) => {
                    let id_u64 = id.as_u64();
                    values.push((id_u64, *v));
                }
                _ => {
                    non_int_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            // Too few values for compression to pay off.
            return;
        }

        // Sort by id so the id/index mappings are binary-searchable.
        values.sort_by_key(|(id, _)| *id);

        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();
        let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();

        let Ok(compressed) = TypeSpecificCompressor::compress_signed_integers(&int_values) else {
            return;
        };

        if compressed.compression_ratio() > 1.2 {
            self.compressed = Some(CompressedColumnData::Integers {
                data: compressed,
                id_to_index,
                index_to_id,
            });
            self.compressed_count = values.len();
            self.values = non_int_values;
        }
    }

    /// Dictionary-encodes the `String` values; other-typed values stay hot.
    /// Applies only with at least 8 strings and > 1.2x achieved ratio.
    fn compress_as_strings(&mut self) {
        let mut values: Vec<(u64, ArcStr)> = Vec::new();
        let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::String(s) => {
                    values.push((id.as_u64(), s.clone()));
                }
                _ => {
                    non_str_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            return;
        }

        // Sort by id so the id/index mappings are binary-searchable.
        values.sort_by_key(|(id, _)| *id);

        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();

        // Dictionary order must match `values` order: code i ↔ id i.
        let mut builder = DictionaryBuilder::new();
        for (_, s) in &values {
            builder.add(s.as_ref());
        }
        let encoding = builder.build();

        if encoding.compression_ratio() > 1.2 {
            self.compressed = Some(CompressedColumnData::Strings {
                encoding,
                id_to_index,
                index_to_id,
            });
            self.compressed_count = values.len();
            self.values = non_str_values;
        }
    }

    /// Bit-packs the `Bool` values; other-typed values stay hot. Unlike the
    /// int/string paths there is no ratio gate — bit-packing always wins
    /// once at least 8 booleans exist.
    fn compress_as_booleans(&mut self) {
        let mut values: Vec<(u64, bool)> = Vec::new();
        let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();

        for (&id, value) in &self.values {
            match value {
                Value::Bool(b) => {
                    values.push((id.as_u64(), *b));
                }
                _ => {
                    non_bool_values.insert(id, value.clone());
                }
            }
        }

        if values.len() < 8 {
            return;
        }

        // Sort by id so the id/index mappings are binary-searchable.
        values.sort_by_key(|(id, _)| *id);

        let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
        let index_to_id: Vec<u64> = id_to_index.clone();
        let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();

        let Ok(compressed) = TypeSpecificCompressor::compress_booleans(&bool_values) else {
            return;
        };

        self.compressed = Some(CompressedColumnData::Booleans {
            data: compressed,
            id_to_index,
            index_to_id,
        });
        self.compressed_count = values.len();
        self.values = non_bool_values;
    }

    /// Moves every compressed value back into the hot buffer, mirroring the
    /// encodings produced by the `compress_as_*` helpers.
    fn decompress_all(&mut self) {
        let Some(compressed) = self.compressed.take() else {
            return;
        };

        match compressed {
            CompressedColumnData::Integers {
                data, index_to_id, ..
            } => {
                if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
                    // Undo the zigzag applied by `compress_signed_integers`.
                    let signed: Vec<i64> = values
                        .iter()
                        .map(|&v| crate::codec::zigzag_decode(v))
                        .collect();

                    for (i, id_u64) in index_to_id.iter().enumerate() {
                        if let Some(&value) = signed.get(i) {
                            let id = Id::from_u64(*id_u64);
                            self.values.insert(id, Value::Int64(value));
                        }
                    }
                }
            }
            CompressedColumnData::Strings {
                encoding,
                index_to_id,
                ..
            } => {
                for (i, id_u64) in index_to_id.iter().enumerate() {
                    if let Some(s) = encoding.get(i) {
                        let id = Id::from_u64(*id_u64);
                        self.values.insert(id, Value::String(ArcStr::from(s)));
                    }
                }
            }
            CompressedColumnData::Booleans {
                data, index_to_id, ..
            } => {
                if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
                    for (i, id_u64) in index_to_id.iter().enumerate() {
                        if let Some(&value) = values.get(i) {
                            let id = Id::from_u64(*id_u64);
                            self.values.insert(id, Value::Bool(value));
                        }
                    }
                }
            }
        }

        self.compressed_count = 0;
    }

    /// NOTE(review): despite the name this just delegates to `compress()`,
    /// which still applies the majority-type, min-count, and ratio
    /// heuristics — it does not force anything. Confirm intended.
    pub fn force_compress(&mut self) {
        self.compress();
    }

    /// The column's zone map (min/max/null statistics).
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }

    /// Zone-map pre-filter: `false` means no stored value can satisfy
    /// `op value`; `true` is inconclusive. A dirty zone map (after removals)
    /// always returns `true`.
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        if self.zone_map_dirty {
            return true;
        }

        match op {
            CompareOp::Eq => self.zone_map.might_contain_equal(value),
            CompareOp::Ne => {
                // Ne can only be ruled out when every value equals `value`,
                // i.e. min == value == max.
                match (&self.zone_map.min, &self.zone_map.max) {
                    (Some(min), Some(max)) => {
                        !(compare_values(min, value) == Some(Ordering::Equal)
                            && compare_values(max, value) == Some(Ordering::Equal))
                    }
                    _ => true,
                }
            }
            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
        }
    }

    /// Recomputes the zone map from the hot values and clears the dirty
    /// flag. NOTE(review): compressed values are not included in the rebuilt
    /// statistics — confirm acceptable.
    pub fn rebuild_zone_map(&mut self) {
        let mut zone_map = ZoneMapEntry::new();

        for value in self.values.values() {
            zone_map.row_count += 1;

            if matches!(value, Value::Null) {
                zone_map.null_count += 1;
                continue;
            }

            match &zone_map.min {
                None => zone_map.min = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Less) {
                        zone_map.min = Some(value.clone());
                    }
                }
            }

            match &zone_map.max {
                None => zone_map.max = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Greater) {
                        zone_map.max = Some(value.clone());
                    }
                }
            }
        }

        self.zone_map = zone_map;
        self.zone_map_dirty = false;
    }
}
1378
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyColumn<Id> {
    /// Creates an empty versioned column with compression disabled.
    #[must_use]
    pub fn new() -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: CompressionMode::None,
        }
    }

    /// Creates an empty versioned column recording the given mode.
    /// (Temporal builds do not actually compress; see `compress`.)
    #[must_use]
    pub fn with_compression(mode: CompressionMode) -> Self {
        Self {
            values: FxHashMap::default(),
            zone_map: ZoneMapEntry::new(),
            zone_map_dirty: false,
            compression_mode: mode,
        }
    }

    /// Records the compression mode (no effect in temporal builds).
    pub fn set_compression_mode(&mut self, mode: CompressionMode) {
        self.compression_mode = mode;
    }

    /// Current compression mode.
    #[must_use]
    pub fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }

    /// Appends a new version of `id`'s value at `epoch` and updates the
    /// zone map.
    pub fn set(&mut self, id: Id, value: Value, epoch: EpochId) {
        self.update_zone_map_on_insert(&value);
        self.values.entry(id).or_default().append(epoch, value);
    }

    /// Folds one appended value into the zone map (row count, null count,
    /// min/max). Historical versions are not subtracted, so counts are
    /// upper bounds until `rebuild_zone_map`.
    fn update_zone_map_on_insert(&mut self, value: &Value) {
        self.zone_map.row_count += 1;

        if matches!(value, Value::Null) {
            self.zone_map.null_count += 1;
            return;
        }

        match &self.zone_map.min {
            None => self.zone_map.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Less) {
                    self.zone_map.min = Some(value.clone());
                }
            }
        }

        match &self.zone_map.max {
            None => self.zone_map.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Greater) {
                    self.zone_map.max = Some(value.clone());
                }
            }
        }
    }

    /// Returns the latest value for `id`, treating a trailing `Value::Null`
    /// as a deletion tombstone (see `remove`).
    #[must_use]
    pub fn get(&self, id: Id) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.latest())
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Logically removes `id` at `epoch` by appending a `Value::Null`
    /// tombstone; returns the previously visible value, if any.
    pub fn remove(&mut self, id: Id, epoch: EpochId) -> Option<Value> {
        let previous = self.get(id);
        if previous.is_some() {
            self.values
                .entry(id)
                .or_default()
                .append(epoch, Value::Null);
            self.zone_map_dirty = true;
        }
        previous
    }

    /// Number of entities whose latest version is a live (non-tombstone)
    /// value.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values
            .values()
            .filter(|log| log.latest().is_some_and(|v| !v.is_null()))
            .count()
    }

    /// Whether no live values exist.
    #[cfg(test)]
    #[must_use]
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Statistics for API parity with non-temporal builds: nothing is
    /// compressed, so both sizes are the same estimate and `codec` is `None`.
    #[must_use]
    pub fn compression_stats(&self) -> CompressionStats {
        let live_count = self.len();
        let hot_size = live_count * std::mem::size_of::<Value>();

        CompressionStats {
            uncompressed_size: hot_size,
            compressed_size: hot_size,
            value_count: live_count,
            codec: None,
        }
    }

    /// Estimated heap bytes of the per-entity version-log map; the `+ 1`
    /// per slot approximates hash-map bookkeeping. Bytes held inside each
    /// `VersionLog` are not counted here.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        self.values.capacity()
            * (std::mem::size_of::<Id>() + std::mem::size_of::<VersionLog<Value>>() + 1)
    }

    /// No-op: temporal columns are never compressed (API parity).
    pub fn compress(&mut self) {}

    /// No-op: temporal columns are never compressed (API parity).
    pub fn force_compress(&mut self) {}

    /// The column's zone map (min/max/null statistics).
    #[must_use]
    pub fn zone_map(&self) -> &ZoneMapEntry {
        &self.zone_map
    }

    /// Zone-map pre-filter: `false` means no stored value can satisfy
    /// `op value`; `true` is inconclusive. A dirty zone map (after removals)
    /// always returns `true`.
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        if self.zone_map_dirty {
            return true;
        }

        match op {
            CompareOp::Eq => self.zone_map.might_contain_equal(value),
            // Ne can only be ruled out when min == value == max.
            CompareOp::Ne => match (&self.zone_map.min, &self.zone_map.max) {
                (Some(min), Some(max)) => {
                    !(compare_values(min, value) == Some(Ordering::Equal)
                        && compare_values(max, value) == Some(Ordering::Equal))
                }
                _ => true,
            },
            CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
            CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
            CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
            CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
        }
    }

    /// Recomputes the zone map from each entity's latest version and clears
    /// the dirty flag.
    pub fn rebuild_zone_map(&mut self) {
        let mut zone_map = ZoneMapEntry::new();

        for log in self.values.values() {
            if let Some(value) = log.latest() {
                zone_map.row_count += 1;

                if matches!(value, Value::Null) {
                    zone_map.null_count += 1;
                    continue;
                }

                match &zone_map.min {
                    None => zone_map.min = Some(value.clone()),
                    Some(current) => {
                        if compare_values(value, current) == Some(Ordering::Less) {
                            zone_map.min = Some(value.clone());
                        }
                    }
                }

                match &zone_map.max {
                    None => zone_map.max = Some(value.clone()),
                    Some(current) => {
                        if compare_values(value, current) == Some(Ordering::Greater) {
                            zone_map.max = Some(value.clone());
                        }
                    }
                }
            }
        }

        self.zone_map = zone_map;
        self.zone_map_dirty = false;
    }

    /// Returns the value of `id` as visible at `epoch`, filtering tombstones.
    #[must_use]
    pub fn get_at(&self, id: Id, epoch: EpochId) -> Option<Value> {
        self.values
            .get(&id)
            .and_then(|log| log.at(epoch))
            .filter(|v| !v.is_null())
            .cloned()
    }

    /// Commits all pending versions in every log to `real_epoch`.
    pub fn finalize_pending(&mut self, real_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.finalize_pending(real_epoch);
        }
    }

    /// Discards all pending versions (rollback), dropping logs left empty.
    pub fn remove_pending(&mut self) {
        for log in self.values.values_mut() {
            log.remove_pending();
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Garbage-collects versions older than `min_epoch`, dropping logs left
    /// empty.
    pub fn gc(&mut self, min_epoch: EpochId) {
        for log in self.values.values_mut() {
            log.gc(min_epoch);
        }
        self.values.retain(|_, log| !log.is_empty());
    }

    /// Discards pending versions for a single entity, dropping its log if
    /// emptied.
    pub fn remove_pending_for(&mut self, id: Id) {
        if let Some(log) = self.values.get_mut(&id) {
            log.remove_pending();
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }

    /// Pops up to `n` pending versions for a single entity (partial undo),
    /// dropping its log if emptied.
    pub fn pop_n_pending_for(&mut self, id: Id, n: usize) {
        if let Some(log) = self.values.get_mut(&id) {
            log.pop_n_pending(n);
            if log.is_empty() {
                self.values.remove(&id);
            }
        }
    }
}
1657
1658fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1660 match (a, b) {
1661 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1662 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1663 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1664 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1665 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1666 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1667 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1668 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1669 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1670 _ => None,
1671 }
1672}
1673
impl<Id: EntityId> Default for PropertyColumn<Id> {
    // Delegates to `PropertyColumn::new()` — an empty column with the
    // default compression behavior.
    fn default() -> Self {
        Self::new()
    }
}
1679
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    // Read guard over the storage's column map, held so the columns cannot be
    // mutated or dropped while this handle is alive (underscore: held for its
    // lock effect only, never read).
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    // Key identifying which column this handle refers to.
    _key: PropertyKey,
    // Ties the handle to the entity-id type without storing one.
    _marker: PhantomData<Id>,
}
1688
#[cfg(test)]
#[cfg(not(feature = "temporal"))]
mod tests {
    use super::*;
    use arcstr::ArcStr;

    #[test]
    fn test_property_storage_basic() {
        let storage = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let name_key = PropertyKey::new("name");
        let age_key = PropertyKey::new("age");

        storage.set(node1, name_key.clone(), "Alix".into());
        storage.set(node1, age_key.clone(), 30i64.into());
        storage.set(node2, name_key.clone(), "Gus".into());

        assert_eq!(
            storage.get(node1, &name_key),
            Some(Value::String("Alix".into()))
        );
        assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
        assert_eq!(
            storage.get(node2, &name_key),
            Some(Value::String("Gus".into()))
        );
        // node2 never had an age set.
        assert!(storage.get(node2, &age_key).is_none());
    }

    #[test]
    fn test_property_storage_remove() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        storage.set(node, key.clone(), "Alix".into());
        assert!(storage.get(node, &key).is_some());

        // Remove returns the old value; subsequent gets see nothing.
        let removed = storage.remove(node, &key);
        assert!(removed.is_some());
        assert!(storage.get(node, &key).is_none());
    }

    #[test]
    fn test_property_storage_get_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());
        storage.set(node, PropertyKey::new("active"), true.into());

        let props = storage.get_all(node);
        assert_eq!(props.len(), 3);
    }

    #[test]
    fn test_property_storage_remove_all() {
        let storage = PropertyStorage::new();

        let node = NodeId::new(1);
        storage.set(node, PropertyKey::new("name"), "Alix".into());
        storage.set(node, PropertyKey::new("age"), 30i64.into());

        storage.remove_all(node);

        assert!(storage.get(node, &PropertyKey::new("name")).is_none());
        assert!(storage.get(node, &PropertyKey::new("age")).is_none());
    }

    #[test]
    fn test_property_column() {
        let mut col = PropertyColumn::new();

        col.set(NodeId::new(1), "Alix".into());
        col.set(NodeId::new(2), "Gus".into());

        assert_eq!(col.len(), 2);
        assert!(!col.is_empty());

        assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));

        col.remove(NodeId::new(1));
        assert!(col.get(NodeId::new(1)).is_none());
        assert_eq!(col.len(), 1);
    }

    #[test]
    fn test_compression_mode() {
        let col: PropertyColumn<NodeId> = PropertyColumn::new();
        assert_eq!(col.compression_mode(), CompressionMode::None);

        let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
        assert_eq!(col.compression_mode(), CompressionMode::Auto);
    }

    #[test]
    fn test_property_storage_with_compression() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(20 + (i as i64 % 50)),
            );
        }

        // Values must round-trip regardless of compression state.
        assert_eq!(
            storage.get(NodeId::new(0), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
        // 20 + (50 % 50) == 20.
        assert_eq!(
            storage.get(NodeId::new(50), &PropertyKey::new("age")),
            Some(Value::Int64(20))
        );
    }

    #[test]
    fn test_compress_integer_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Enough sequential values to cross the auto-compression threshold.
        for i in 0..2000 {
            col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 2000);

        // Reads must still work whether or not compression kicked in.
        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_string_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        // Low-cardinality strings are a good dictionary-encoding candidate.
        let categories = ["Person", "Company", "Product", "Location"];
        for i in 0..2000 {
            let cat = categories[i % 4];
            col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
        }

        assert_eq!(col.len(), 2000);

        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_compress_boolean_column() {
        let mut col: PropertyColumn<NodeId> =
            PropertyColumn::with_compression(CompressionMode::Auto);

        for i in 0..2000 {
            col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
        }

        assert_eq!(col.len(), 2000);

        let last_value = col.get(NodeId::new(1999));
        assert!(last_value.is_some() || col.is_compressed());
    }

    #[test]
    fn test_force_compress() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        // Well below the auto threshold; compression must still be forceable.
        for i in 0..100 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        col.force_compress();

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 100);
    }

    #[test]
    fn test_compression_stats() {
        let mut col: PropertyColumn<NodeId> = PropertyColumn::new();

        for i in 0..50 {
            col.set(NodeId::new(i), Value::Int64(i as i64));
        }

        let stats = col.compression_stats();
        assert_eq!(stats.value_count, 50);
        assert!(stats.uncompressed_size > 0);
    }

    #[test]
    fn test_storage_compression_stats() {
        let storage = PropertyStorage::with_compression(CompressionMode::Auto);

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("age"),
                Value::Int64(i as i64),
            );
            storage.set(
                NodeId::new(i),
                PropertyKey::new("name"),
                Value::String(ArcStr::from("Alix")),
            );
        }

        let stats = storage.compression_stats();
        // One stats entry per property column.
        assert_eq!(stats.len(), 2);
        assert!(stats.contains_key(&PropertyKey::new("age")));
        assert!(stats.contains_key(&PropertyKey::new("name")));
    }

    #[test]
    fn test_memory_usage() {
        let storage = PropertyStorage::new();

        for i in 0..100 {
            storage.set(
                NodeId::new(i),
                PropertyKey::new("value"),
                Value::Int64(i as i64),
            );
        }

        let usage = storage.memory_usage();
        assert!(usage > 0);
    }

    #[test]
    fn test_get_batch_single_property() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);
        let age_key = PropertyKey::new("age");

        storage.set(node1, age_key.clone(), 25i64.into());
        storage.set(node2, age_key.clone(), 30i64.into());
        // node3 intentionally has no "age" property.

        let ids = vec![node1, node2, node3];
        let values = storage.get_batch(&ids, &age_key);

        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Some(Value::Int64(25)));
        assert_eq!(values[1], Some(Value::Int64(30)));
        assert_eq!(values[2], None);
    }

    #[test]
    fn test_get_batch_missing_column() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let missing_key = PropertyKey::new("nonexistent");

        // A key with no column still yields a full-length, all-None result.
        let ids = vec![node1, node2];
        let values = storage.get_batch(&ids, &missing_key);

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], None);
        assert_eq!(values[1], None);
    }

    #[test]
    fn test_get_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();
        let key = PropertyKey::new("any");

        let values = storage.get_batch(&[], &key);
        assert!(values.is_empty());
    }

    #[test]
    fn test_get_all_batch() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);
        let node3 = NodeId::new(3);

        storage.set(node1, PropertyKey::new("name"), "Alix".into());
        storage.set(node1, PropertyKey::new("age"), 25i64.into());
        storage.set(node2, PropertyKey::new("name"), "Gus".into());

        let ids = vec![node1, node2, node3];
        let all_props = storage.get_all_batch(&ids);

        assert_eq!(all_props.len(), 3);
        assert_eq!(all_props[0].len(), 2); // node1: name + age
        assert_eq!(all_props[1].len(), 1); // node2: name only
        assert_eq!(all_props[2].len(), 0); // node3: nothing set

        assert_eq!(
            all_props[0].get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(
            all_props[1].get(&PropertyKey::new("name")),
            Some(&Value::String("Gus".into()))
        );
    }

    #[test]
    fn test_get_all_batch_empty_ids() {
        let storage: PropertyStorage<NodeId> = PropertyStorage::new();

        let all_props = storage.get_all_batch(&[]);
        assert!(all_props.is_empty());
    }
}