1use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25 CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub enum CompressionMode {
38 #[default]
40 None,
41 Auto,
43 Eager,
45}
46
47const COMPRESSION_THRESHOLD: usize = 1000;
49
50const HOT_BUFFER_SIZE: usize = 4096;
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum CompareOp {
61 Eq,
63 Ne,
65 Lt,
67 Le,
69 Gt,
71 Ge,
73}
74
75pub trait EntityId: Copy + Eq + Hash + 'static {}
79
80impl EntityId for NodeId {}
81impl EntityId for EdgeId {}
82
83pub struct PropertyStorage<Id: EntityId = NodeId> {
108 columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
111 default_compression: CompressionMode,
113 _marker: PhantomData<Id>,
114}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117 #[must_use]
119 pub fn new() -> Self {
120 Self {
121 columns: RwLock::new(FxHashMap::default()),
122 default_compression: CompressionMode::None,
123 _marker: PhantomData,
124 }
125 }
126
127 #[must_use]
129 pub fn with_compression(mode: CompressionMode) -> Self {
130 Self {
131 columns: RwLock::new(FxHashMap::default()),
132 default_compression: mode,
133 _marker: PhantomData,
134 }
135 }
136
137 pub fn set_default_compression(&mut self, mode: CompressionMode) {
139 self.default_compression = mode;
140 }
141
142 pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144 let mut columns = self.columns.write();
145 let mode = self.default_compression;
146 columns
147 .entry(key)
148 .or_insert_with(|| PropertyColumn::with_compression(mode))
149 .set(id, value);
150 }
151
152 pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154 let mut columns = self.columns.write();
155 if let Some(col) = columns.get_mut(key) {
156 col.set_compression_mode(mode);
157 }
158 }
159
160 pub fn compress_all(&self) {
162 let mut columns = self.columns.write();
163 for col in columns.values_mut() {
164 if col.compression_mode() != CompressionMode::None {
165 col.compress();
166 }
167 }
168 }
169
170 pub fn force_compress_all(&self) {
172 let mut columns = self.columns.write();
173 for col in columns.values_mut() {
174 col.force_compress();
175 }
176 }
177
178 #[must_use]
180 pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181 let columns = self.columns.read();
182 columns
183 .iter()
184 .map(|(key, col)| (key.clone(), col.compression_stats()))
185 .collect()
186 }
187
188 #[must_use]
190 pub fn memory_usage(&self) -> usize {
191 let columns = self.columns.read();
192 columns
193 .values()
194 .map(|col| col.compression_stats().compressed_size)
195 .sum()
196 }
197
198 #[must_use]
200 pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201 let columns = self.columns.read();
202 columns.get(key).and_then(|col| col.get(id))
203 }
204
205 pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207 let mut columns = self.columns.write();
208 columns.get_mut(key).and_then(|col| col.remove(id))
209 }
210
211 pub fn remove_all(&self, id: Id) {
213 let mut columns = self.columns.write();
214 for col in columns.values_mut() {
215 col.remove(id);
216 }
217 }
218
219 #[must_use]
221 pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222 let columns = self.columns.read();
223 let mut result = FxHashMap::default();
224 for (key, col) in columns.iter() {
225 if let Some(value) = col.get(id) {
226 result.insert(key.clone(), value);
227 }
228 }
229 result
230 }
231
232 #[must_use]
251 pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252 let columns = self.columns.read();
253 match columns.get(key) {
254 Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255 None => vec![None; ids.len()],
256 }
257 }
258
259 #[must_use]
277 pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278 let columns = self.columns.read();
279 let column_count = columns.len();
280
281 let mut results = Vec::with_capacity(ids.len());
283
284 for &id in ids {
285 let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
287 for (key, col) in columns.iter() {
288 if let Some(value) = col.get(id) {
289 result.insert(key.clone(), value);
290 }
291 }
292 results.push(result);
293 }
294
295 results
296 }
297
298 #[must_use]
321 pub fn get_selective_batch(
322 &self,
323 ids: &[Id],
324 keys: &[PropertyKey],
325 ) -> Vec<FxHashMap<PropertyKey, Value>> {
326 if keys.is_empty() {
327 return vec![FxHashMap::default(); ids.len()];
329 }
330
331 let columns = self.columns.read();
332
333 let requested_columns: Vec<_> = keys
335 .iter()
336 .filter_map(|key| columns.get(key).map(|col| (key, col)))
337 .collect();
338
339 let mut results = Vec::with_capacity(ids.len());
341
342 for &id in ids {
343 let mut result =
344 FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
345 for (key, col) in &requested_columns {
347 if let Some(value) = col.get(id) {
348 result.insert((*key).clone(), value);
349 }
350 }
351 results.push(result);
352 }
353
354 results
355 }
356
357 #[must_use]
359 pub fn column_count(&self) -> usize {
360 self.columns.read().len()
361 }
362
363 #[must_use]
365 pub fn keys(&self) -> Vec<PropertyKey> {
366 self.columns.read().keys().cloned().collect()
367 }
368
369 #[must_use]
371 pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
372 let columns = self.columns.read();
373 if columns.contains_key(key) {
374 Some(PropertyColumnRef {
375 _guard: columns,
376 key: key.clone(),
377 _marker: PhantomData,
378 })
379 } else {
380 None
381 }
382 }
383
384 #[must_use]
390 pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
391 let columns = self.columns.read();
392 columns
393 .get(key)
394 .map_or(true, |col| col.might_match(op, value)) }
396
397 #[must_use]
399 pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
400 let columns = self.columns.read();
401 columns.get(key).map(|col| col.zone_map().clone())
402 }
403
404 #[must_use]
409 pub fn might_match_range(
410 &self,
411 key: &PropertyKey,
412 min: Option<&Value>,
413 max: Option<&Value>,
414 min_inclusive: bool,
415 max_inclusive: bool,
416 ) -> bool {
417 let columns = self.columns.read();
418 columns.get(key).map_or(true, |col| {
419 col.zone_map()
420 .might_contain_range(min, max, min_inclusive, max_inclusive)
421 }) }
423
424 pub fn rebuild_zone_maps(&self) {
426 let mut columns = self.columns.write();
427 for col in columns.values_mut() {
428 col.rebuild_zone_map();
429 }
430 }
431}
432
433impl<Id: EntityId> Default for PropertyStorage<Id> {
434 fn default() -> Self {
435 Self::new()
436 }
437}
438
439#[derive(Debug)]
444pub enum CompressedColumnData {
445 Integers {
447 data: CompressedData,
449 id_to_index: Vec<u64>,
451 index_to_id: Vec<u64>,
453 },
454 Strings {
456 encoding: DictionaryEncoding,
458 id_to_index: Vec<u64>,
460 index_to_id: Vec<u64>,
462 },
463 Booleans {
465 data: CompressedData,
467 id_to_index: Vec<u64>,
469 index_to_id: Vec<u64>,
471 },
472}
473
474impl CompressedColumnData {
475 #[must_use]
477 pub fn memory_usage(&self) -> usize {
478 match self {
479 CompressedColumnData::Integers {
480 data,
481 id_to_index,
482 index_to_id,
483 } => {
484 data.data.len()
485 + id_to_index.len() * std::mem::size_of::<u64>()
486 + index_to_id.len() * std::mem::size_of::<u64>()
487 }
488 CompressedColumnData::Strings {
489 encoding,
490 id_to_index,
491 index_to_id,
492 } => {
493 encoding.codes().len() * std::mem::size_of::<u32>()
494 + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
495 + id_to_index.len() * std::mem::size_of::<u64>()
496 + index_to_id.len() * std::mem::size_of::<u64>()
497 }
498 CompressedColumnData::Booleans {
499 data,
500 id_to_index,
501 index_to_id,
502 } => {
503 data.data.len()
504 + id_to_index.len() * std::mem::size_of::<u64>()
505 + index_to_id.len() * std::mem::size_of::<u64>()
506 }
507 }
508 }
509
510 #[must_use]
512 #[allow(dead_code)]
513 pub fn compression_ratio(&self) -> f64 {
514 match self {
515 CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
516 CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
517 CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
518 }
519 }
520}
521
522#[derive(Debug, Clone, Default)]
524pub struct CompressionStats {
525 pub uncompressed_size: usize,
527 pub compressed_size: usize,
529 pub value_count: usize,
531 pub codec: Option<CompressionCodec>,
533}
534
535impl CompressionStats {
536 #[must_use]
538 pub fn compression_ratio(&self) -> f64 {
539 if self.compressed_size == 0 {
540 return 1.0;
541 }
542 self.uncompressed_size as f64 / self.compressed_size as f64
543 }
544}
545
546pub struct PropertyColumn<Id: EntityId = NodeId> {
556 values: FxHashMap<Id, Value>,
559 zone_map: ZoneMapEntry,
561 zone_map_dirty: bool,
563 compression_mode: CompressionMode,
565 compressed: Option<CompressedColumnData>,
567 compressed_count: usize,
569}
570
571impl<Id: EntityId> PropertyColumn<Id> {
572 #[must_use]
574 pub fn new() -> Self {
575 Self {
576 values: FxHashMap::default(),
577 zone_map: ZoneMapEntry::new(),
578 zone_map_dirty: false,
579 compression_mode: CompressionMode::None,
580 compressed: None,
581 compressed_count: 0,
582 }
583 }
584
585 #[must_use]
587 pub fn with_compression(mode: CompressionMode) -> Self {
588 Self {
589 values: FxHashMap::default(),
590 zone_map: ZoneMapEntry::new(),
591 zone_map_dirty: false,
592 compression_mode: mode,
593 compressed: None,
594 compressed_count: 0,
595 }
596 }
597
598 pub fn set_compression_mode(&mut self, mode: CompressionMode) {
600 self.compression_mode = mode;
601 if mode == CompressionMode::None {
602 if self.compressed.is_some() {
604 self.decompress_all();
605 }
606 }
607 }
608
609 #[must_use]
611 pub fn compression_mode(&self) -> CompressionMode {
612 self.compression_mode
613 }
614
615 pub fn set(&mut self, id: Id, value: Value) {
617 self.update_zone_map_on_insert(&value);
619 self.values.insert(id, value);
620
621 if self.compression_mode == CompressionMode::Auto {
623 let total_count = self.values.len() + self.compressed_count;
624 let hot_buffer_count = self.values.len();
625
626 if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
628 self.compress();
629 }
630 }
631 }
632
633 fn update_zone_map_on_insert(&mut self, value: &Value) {
635 self.zone_map.row_count += 1;
636
637 if matches!(value, Value::Null) {
638 self.zone_map.null_count += 1;
639 return;
640 }
641
642 match &self.zone_map.min {
644 None => self.zone_map.min = Some(value.clone()),
645 Some(current) => {
646 if compare_values(value, current) == Some(Ordering::Less) {
647 self.zone_map.min = Some(value.clone());
648 }
649 }
650 }
651
652 match &self.zone_map.max {
654 None => self.zone_map.max = Some(value.clone()),
655 Some(current) => {
656 if compare_values(value, current) == Some(Ordering::Greater) {
657 self.zone_map.max = Some(value.clone());
658 }
659 }
660 }
661 }
662
663 #[must_use]
668 pub fn get(&self, id: Id) -> Option<Value> {
669 if let Some(value) = self.values.get(&id) {
671 return Some(value.clone());
672 }
673
674 None
679 }
680
681 pub fn remove(&mut self, id: Id) -> Option<Value> {
683 let removed = self.values.remove(&id);
684 if removed.is_some() {
685 self.zone_map_dirty = true;
687 }
688 removed
689 }
690
691 #[must_use]
693 #[allow(dead_code)]
694 pub fn len(&self) -> usize {
695 self.values.len() + self.compressed_count
696 }
697
698 #[must_use]
700 #[allow(dead_code)]
701 pub fn is_empty(&self) -> bool {
702 self.values.is_empty() && self.compressed_count == 0
703 }
704
705 #[allow(dead_code)]
710 pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
711 self.values.iter().map(|(&id, v)| (id, v))
712 }
713
714 #[must_use]
716 pub fn compression_stats(&self) -> CompressionStats {
717 let hot_size = self.values.len() * std::mem::size_of::<Value>();
718 let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
719 let codec = match &self.compressed {
720 Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
721 Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
722 Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
723 None => None,
724 };
725
726 CompressionStats {
727 uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
728 compressed_size: hot_size + compressed_size,
729 value_count: self.len(),
730 codec,
731 }
732 }
733
734 #[must_use]
736 #[allow(dead_code)]
737 pub fn is_compressed(&self) -> bool {
738 self.compressed.is_some()
739 }
740
741 pub fn compress(&mut self) {
750 if self.values.is_empty() {
751 return;
752 }
753
754 if self.compressed.is_some() {
757 return;
758 }
759
760 let (int_count, str_count, bool_count) = self.count_types();
762 let total = self.values.len();
763
764 if int_count > total / 2 {
765 self.compress_as_integers();
766 } else if str_count > total / 2 {
767 self.compress_as_strings();
768 } else if bool_count > total / 2 {
769 self.compress_as_booleans();
770 }
771 }
773
774 fn count_types(&self) -> (usize, usize, usize) {
776 let mut int_count = 0;
777 let mut str_count = 0;
778 let mut bool_count = 0;
779
780 for value in self.values.values() {
781 match value {
782 Value::Int64(_) => int_count += 1,
783 Value::String(_) => str_count += 1,
784 Value::Bool(_) => bool_count += 1,
785 _ => {}
786 }
787 }
788
789 (int_count, str_count, bool_count)
790 }
791
792 #[allow(unsafe_code)]
794 fn compress_as_integers(&mut self) {
795 let mut values: Vec<(u64, i64)> = Vec::new();
797 let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
798
799 for (&id, value) in &self.values {
800 match value {
801 Value::Int64(v) => {
802 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
804 values.push((id_u64, *v));
805 }
806 _ => {
807 non_int_values.insert(id, value.clone());
808 }
809 }
810 }
811
812 if values.len() < 8 {
813 return;
815 }
816
817 values.sort_by_key(|(id, _)| *id);
819
820 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
821 let index_to_id: Vec<u64> = id_to_index.clone();
822 let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
823
824 let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
826
827 if compressed.compression_ratio() > 1.2 {
829 self.compressed = Some(CompressedColumnData::Integers {
830 data: compressed,
831 id_to_index,
832 index_to_id,
833 });
834 self.compressed_count = values.len();
835 self.values = non_int_values;
836 }
837 }
838
839 #[allow(unsafe_code)]
841 fn compress_as_strings(&mut self) {
842 let mut values: Vec<(u64, ArcStr)> = Vec::new();
843 let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
844
845 for (&id, value) in &self.values {
846 match value {
847 Value::String(s) => {
848 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
849 values.push((id_u64, s.clone()));
850 }
851 _ => {
852 non_str_values.insert(id, value.clone());
853 }
854 }
855 }
856
857 if values.len() < 8 {
858 return;
859 }
860
861 values.sort_by_key(|(id, _)| *id);
863
864 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
865 let index_to_id: Vec<u64> = id_to_index.clone();
866
867 let mut builder = DictionaryBuilder::new();
869 for (_, s) in &values {
870 builder.add(s.as_ref());
871 }
872 let encoding = builder.build();
873
874 if encoding.compression_ratio() > 1.2 {
876 self.compressed = Some(CompressedColumnData::Strings {
877 encoding,
878 id_to_index,
879 index_to_id,
880 });
881 self.compressed_count = values.len();
882 self.values = non_str_values;
883 }
884 }
885
886 #[allow(unsafe_code)]
888 fn compress_as_booleans(&mut self) {
889 let mut values: Vec<(u64, bool)> = Vec::new();
890 let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
891
892 for (&id, value) in &self.values {
893 match value {
894 Value::Bool(b) => {
895 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
896 values.push((id_u64, *b));
897 }
898 _ => {
899 non_bool_values.insert(id, value.clone());
900 }
901 }
902 }
903
904 if values.len() < 8 {
905 return;
906 }
907
908 values.sort_by_key(|(id, _)| *id);
910
911 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
912 let index_to_id: Vec<u64> = id_to_index.clone();
913 let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
914
915 let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
916
917 self.compressed = Some(CompressedColumnData::Booleans {
919 data: compressed,
920 id_to_index,
921 index_to_id,
922 });
923 self.compressed_count = values.len();
924 self.values = non_bool_values;
925 }
926
927 #[allow(unsafe_code)]
929 fn decompress_all(&mut self) {
930 let Some(compressed) = self.compressed.take() else {
931 return;
932 };
933
934 match compressed {
935 CompressedColumnData::Integers {
936 data, index_to_id, ..
937 } => {
938 if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
939 let signed: Vec<i64> = values
941 .iter()
942 .map(|&v| crate::storage::zigzag_decode(v))
943 .collect();
944
945 for (i, id_u64) in index_to_id.iter().enumerate() {
946 if let Some(&value) = signed.get(i) {
947 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
948 self.values.insert(id, Value::Int64(value));
949 }
950 }
951 }
952 }
953 CompressedColumnData::Strings {
954 encoding,
955 index_to_id,
956 ..
957 } => {
958 for (i, id_u64) in index_to_id.iter().enumerate() {
959 if let Some(s) = encoding.get(i) {
960 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
961 self.values.insert(id, Value::String(ArcStr::from(s)));
962 }
963 }
964 }
965 CompressedColumnData::Booleans {
966 data, index_to_id, ..
967 } => {
968 if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
969 for (i, id_u64) in index_to_id.iter().enumerate() {
970 if let Some(&value) = values.get(i) {
971 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
972 self.values.insert(id, Value::Bool(value));
973 }
974 }
975 }
976 }
977 }
978
979 self.compressed_count = 0;
980 }
981
982 pub fn force_compress(&mut self) {
986 self.compress();
987 }
988
989 #[must_use]
991 pub fn zone_map(&self) -> &ZoneMapEntry {
992 &self.zone_map
993 }
994
995 #[must_use]
1000 pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1001 if self.zone_map_dirty {
1002 return true;
1004 }
1005
1006 match op {
1007 CompareOp::Eq => self.zone_map.might_contain_equal(value),
1008 CompareOp::Ne => {
1009 match (&self.zone_map.min, &self.zone_map.max) {
1012 (Some(min), Some(max)) => {
1013 !(compare_values(min, value) == Some(Ordering::Equal)
1014 && compare_values(max, value) == Some(Ordering::Equal))
1015 }
1016 _ => true,
1017 }
1018 }
1019 CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1020 CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1021 CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1022 CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1023 }
1024 }
1025
1026 pub fn rebuild_zone_map(&mut self) {
1028 let mut zone_map = ZoneMapEntry::new();
1029
1030 for value in self.values.values() {
1031 zone_map.row_count += 1;
1032
1033 if matches!(value, Value::Null) {
1034 zone_map.null_count += 1;
1035 continue;
1036 }
1037
1038 match &zone_map.min {
1040 None => zone_map.min = Some(value.clone()),
1041 Some(current) => {
1042 if compare_values(value, current) == Some(Ordering::Less) {
1043 zone_map.min = Some(value.clone());
1044 }
1045 }
1046 }
1047
1048 match &zone_map.max {
1050 None => zone_map.max = Some(value.clone()),
1051 Some(current) => {
1052 if compare_values(value, current) == Some(Ordering::Greater) {
1053 zone_map.max = Some(value.clone());
1054 }
1055 }
1056 }
1057 }
1058
1059 self.zone_map = zone_map;
1060 self.zone_map_dirty = false;
1061 }
1062}
1063
1064fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1066 match (a, b) {
1067 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1068 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1069 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1070 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1071 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1072 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1073 _ => None,
1074 }
1075}
1076
1077impl<Id: EntityId> Default for PropertyColumn<Id> {
1078 fn default() -> Self {
1079 Self::new()
1080 }
1081}
1082
1083pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1087 _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1088 #[allow(dead_code)]
1089 key: PropertyKey,
1090 _marker: PhantomData<Id>,
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095 use super::*;
1096 use arcstr::ArcStr;
1097
1098 #[test]
1099 fn test_property_storage_basic() {
1100 let storage = PropertyStorage::new();
1101
1102 let node1 = NodeId::new(1);
1103 let node2 = NodeId::new(2);
1104 let name_key = PropertyKey::new("name");
1105 let age_key = PropertyKey::new("age");
1106
1107 storage.set(node1, name_key.clone(), "Alice".into());
1108 storage.set(node1, age_key.clone(), 30i64.into());
1109 storage.set(node2, name_key.clone(), "Bob".into());
1110
1111 assert_eq!(
1112 storage.get(node1, &name_key),
1113 Some(Value::String("Alice".into()))
1114 );
1115 assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
1116 assert_eq!(
1117 storage.get(node2, &name_key),
1118 Some(Value::String("Bob".into()))
1119 );
1120 assert!(storage.get(node2, &age_key).is_none());
1121 }
1122
1123 #[test]
1124 fn test_property_storage_remove() {
1125 let storage = PropertyStorage::new();
1126
1127 let node = NodeId::new(1);
1128 let key = PropertyKey::new("name");
1129
1130 storage.set(node, key.clone(), "Alice".into());
1131 assert!(storage.get(node, &key).is_some());
1132
1133 let removed = storage.remove(node, &key);
1134 assert!(removed.is_some());
1135 assert!(storage.get(node, &key).is_none());
1136 }
1137
1138 #[test]
1139 fn test_property_storage_get_all() {
1140 let storage = PropertyStorage::new();
1141
1142 let node = NodeId::new(1);
1143 storage.set(node, PropertyKey::new("name"), "Alice".into());
1144 storage.set(node, PropertyKey::new("age"), 30i64.into());
1145 storage.set(node, PropertyKey::new("active"), true.into());
1146
1147 let props = storage.get_all(node);
1148 assert_eq!(props.len(), 3);
1149 }
1150
1151 #[test]
1152 fn test_property_storage_remove_all() {
1153 let storage = PropertyStorage::new();
1154
1155 let node = NodeId::new(1);
1156 storage.set(node, PropertyKey::new("name"), "Alice".into());
1157 storage.set(node, PropertyKey::new("age"), 30i64.into());
1158
1159 storage.remove_all(node);
1160
1161 assert!(storage.get(node, &PropertyKey::new("name")).is_none());
1162 assert!(storage.get(node, &PropertyKey::new("age")).is_none());
1163 }
1164
1165 #[test]
1166 fn test_property_column() {
1167 let mut col = PropertyColumn::new();
1168
1169 col.set(NodeId::new(1), "Alice".into());
1170 col.set(NodeId::new(2), "Bob".into());
1171
1172 assert_eq!(col.len(), 2);
1173 assert!(!col.is_empty());
1174
1175 assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alice".into())));
1176
1177 col.remove(NodeId::new(1));
1178 assert!(col.get(NodeId::new(1)).is_none());
1179 assert_eq!(col.len(), 1);
1180 }
1181
1182 #[test]
1183 fn test_compression_mode() {
1184 let col: PropertyColumn<NodeId> = PropertyColumn::new();
1185 assert_eq!(col.compression_mode(), CompressionMode::None);
1186
1187 let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
1188 assert_eq!(col.compression_mode(), CompressionMode::Auto);
1189 }
1190
1191 #[test]
1192 fn test_property_storage_with_compression() {
1193 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1194
1195 for i in 0..100 {
1196 storage.set(
1197 NodeId::new(i),
1198 PropertyKey::new("age"),
1199 Value::Int64(20 + (i as i64 % 50)),
1200 );
1201 }
1202
1203 assert_eq!(
1205 storage.get(NodeId::new(0), &PropertyKey::new("age")),
1206 Some(Value::Int64(20))
1207 );
1208 assert_eq!(
1209 storage.get(NodeId::new(50), &PropertyKey::new("age")),
1210 Some(Value::Int64(20))
1211 );
1212 }
1213
1214 #[test]
1215 fn test_compress_integer_column() {
1216 let mut col: PropertyColumn<NodeId> =
1217 PropertyColumn::with_compression(CompressionMode::Auto);
1218
1219 for i in 0..2000 {
1221 col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
1222 }
1223
1224 let stats = col.compression_stats();
1227 assert_eq!(stats.value_count, 2000);
1228
1229 let last_value = col.get(NodeId::new(1999));
1232 assert!(last_value.is_some() || col.is_compressed());
1233 }
1234
1235 #[test]
1236 fn test_compress_string_column() {
1237 let mut col: PropertyColumn<NodeId> =
1238 PropertyColumn::with_compression(CompressionMode::Auto);
1239
1240 let categories = ["Person", "Company", "Product", "Location"];
1242 for i in 0..2000 {
1243 let cat = categories[i % 4];
1244 col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
1245 }
1246
1247 assert_eq!(col.len(), 2000);
1249
1250 let last_value = col.get(NodeId::new(1999));
1252 assert!(last_value.is_some() || col.is_compressed());
1253 }
1254
1255 #[test]
1256 fn test_compress_boolean_column() {
1257 let mut col: PropertyColumn<NodeId> =
1258 PropertyColumn::with_compression(CompressionMode::Auto);
1259
1260 for i in 0..2000 {
1262 col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
1263 }
1264
1265 assert_eq!(col.len(), 2000);
1267
1268 let last_value = col.get(NodeId::new(1999));
1270 assert!(last_value.is_some() || col.is_compressed());
1271 }
1272
1273 #[test]
1274 fn test_force_compress() {
1275 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1276
1277 for i in 0..100 {
1279 col.set(NodeId::new(i), Value::Int64(i as i64));
1280 }
1281
1282 col.force_compress();
1284
1285 let stats = col.compression_stats();
1287 assert_eq!(stats.value_count, 100);
1288 }
1289
1290 #[test]
1291 fn test_compression_stats() {
1292 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1293
1294 for i in 0..50 {
1295 col.set(NodeId::new(i), Value::Int64(i as i64));
1296 }
1297
1298 let stats = col.compression_stats();
1299 assert_eq!(stats.value_count, 50);
1300 assert!(stats.uncompressed_size > 0);
1301 }
1302
1303 #[test]
1304 fn test_storage_compression_stats() {
1305 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1306
1307 for i in 0..100 {
1308 storage.set(
1309 NodeId::new(i),
1310 PropertyKey::new("age"),
1311 Value::Int64(i as i64),
1312 );
1313 storage.set(
1314 NodeId::new(i),
1315 PropertyKey::new("name"),
1316 Value::String(ArcStr::from("Alice")),
1317 );
1318 }
1319
1320 let stats = storage.compression_stats();
1321 assert_eq!(stats.len(), 2); assert!(stats.contains_key(&PropertyKey::new("age")));
1323 assert!(stats.contains_key(&PropertyKey::new("name")));
1324 }
1325
1326 #[test]
1327 fn test_memory_usage() {
1328 let storage = PropertyStorage::new();
1329
1330 for i in 0..100 {
1331 storage.set(
1332 NodeId::new(i),
1333 PropertyKey::new("value"),
1334 Value::Int64(i as i64),
1335 );
1336 }
1337
1338 let usage = storage.memory_usage();
1339 assert!(usage > 0);
1340 }
1341
1342 #[test]
1343 fn test_get_batch_single_property() {
1344 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1345
1346 let node1 = NodeId::new(1);
1347 let node2 = NodeId::new(2);
1348 let node3 = NodeId::new(3);
1349 let age_key = PropertyKey::new("age");
1350
1351 storage.set(node1, age_key.clone(), 25i64.into());
1352 storage.set(node2, age_key.clone(), 30i64.into());
1353 let ids = vec![node1, node2, node3];
1356 let values = storage.get_batch(&ids, &age_key);
1357
1358 assert_eq!(values.len(), 3);
1359 assert_eq!(values[0], Some(Value::Int64(25)));
1360 assert_eq!(values[1], Some(Value::Int64(30)));
1361 assert_eq!(values[2], None);
1362 }
1363
1364 #[test]
1365 fn test_get_batch_missing_column() {
1366 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1367
1368 let node1 = NodeId::new(1);
1369 let node2 = NodeId::new(2);
1370 let missing_key = PropertyKey::new("nonexistent");
1371
1372 let ids = vec![node1, node2];
1373 let values = storage.get_batch(&ids, &missing_key);
1374
1375 assert_eq!(values.len(), 2);
1376 assert_eq!(values[0], None);
1377 assert_eq!(values[1], None);
1378 }
1379
1380 #[test]
1381 fn test_get_batch_empty_ids() {
1382 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1383 let key = PropertyKey::new("any");
1384
1385 let values = storage.get_batch(&[], &key);
1386 assert!(values.is_empty());
1387 }
1388
1389 #[test]
1390 fn test_get_all_batch() {
1391 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1392
1393 let node1 = NodeId::new(1);
1394 let node2 = NodeId::new(2);
1395 let node3 = NodeId::new(3);
1396
1397 storage.set(node1, PropertyKey::new("name"), "Alice".into());
1398 storage.set(node1, PropertyKey::new("age"), 25i64.into());
1399 storage.set(node2, PropertyKey::new("name"), "Bob".into());
1400 let ids = vec![node1, node2, node3];
1403 let all_props = storage.get_all_batch(&ids);
1404
1405 assert_eq!(all_props.len(), 3);
1406 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
1411 all_props[0].get(&PropertyKey::new("name")),
1412 Some(&Value::String("Alice".into()))
1413 );
1414 assert_eq!(
1415 all_props[1].get(&PropertyKey::new("name")),
1416 Some(&Value::String("Bob".into()))
1417 );
1418 }
1419
1420 #[test]
1421 fn test_get_all_batch_empty_ids() {
1422 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1423
1424 let all_props = storage.get_all_batch(&[]);
1425 assert!(all_props.is_empty());
1426 }
1427}