1use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25 CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub enum CompressionMode {
38 #[default]
40 None,
41 Auto,
43 Eager,
45}
46
47const COMPRESSION_THRESHOLD: usize = 1000;
49
50const HOT_BUFFER_SIZE: usize = 4096;
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum CompareOp {
61 Eq,
63 Ne,
65 Lt,
67 Le,
69 Gt,
71 Ge,
73}
74
75pub trait EntityId: Copy + Eq + Hash + 'static {}
79
80impl EntityId for NodeId {}
81impl EntityId for EdgeId {}
82
83pub struct PropertyStorage<Id: EntityId = NodeId> {
108 columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
111 default_compression: CompressionMode,
113 _marker: PhantomData<Id>,
114}
115
116impl<Id: EntityId> PropertyStorage<Id> {
117 #[must_use]
119 pub fn new() -> Self {
120 Self {
121 columns: RwLock::new(FxHashMap::default()),
122 default_compression: CompressionMode::None,
123 _marker: PhantomData,
124 }
125 }
126
127 #[must_use]
129 pub fn with_compression(mode: CompressionMode) -> Self {
130 Self {
131 columns: RwLock::new(FxHashMap::default()),
132 default_compression: mode,
133 _marker: PhantomData,
134 }
135 }
136
137 pub fn set_default_compression(&mut self, mode: CompressionMode) {
139 self.default_compression = mode;
140 }
141
142 pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
144 let mut columns = self.columns.write();
145 let mode = self.default_compression;
146 columns
147 .entry(key)
148 .or_insert_with(|| PropertyColumn::with_compression(mode))
149 .set(id, value);
150 }
151
152 pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
154 let mut columns = self.columns.write();
155 if let Some(col) = columns.get_mut(key) {
156 col.set_compression_mode(mode);
157 }
158 }
159
160 pub fn compress_all(&self) {
162 let mut columns = self.columns.write();
163 for col in columns.values_mut() {
164 if col.compression_mode() != CompressionMode::None {
165 col.compress();
166 }
167 }
168 }
169
170 pub fn force_compress_all(&self) {
172 let mut columns = self.columns.write();
173 for col in columns.values_mut() {
174 col.force_compress();
175 }
176 }
177
178 #[must_use]
180 pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
181 let columns = self.columns.read();
182 columns
183 .iter()
184 .map(|(key, col)| (key.clone(), col.compression_stats()))
185 .collect()
186 }
187
188 #[must_use]
190 pub fn memory_usage(&self) -> usize {
191 let columns = self.columns.read();
192 columns
193 .values()
194 .map(|col| col.compression_stats().compressed_size)
195 .sum()
196 }
197
198 #[must_use]
200 pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
201 let columns = self.columns.read();
202 columns.get(key).and_then(|col| col.get(id))
203 }
204
205 pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
207 let mut columns = self.columns.write();
208 columns.get_mut(key).and_then(|col| col.remove(id))
209 }
210
211 pub fn remove_all(&self, id: Id) {
213 let mut columns = self.columns.write();
214 for col in columns.values_mut() {
215 col.remove(id);
216 }
217 }
218
219 #[must_use]
221 pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
222 let columns = self.columns.read();
223 let mut result = FxHashMap::default();
224 for (key, col) in columns.iter() {
225 if let Some(value) = col.get(id) {
226 result.insert(key.clone(), value);
227 }
228 }
229 result
230 }
231
232 #[must_use]
251 pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
252 let columns = self.columns.read();
253 match columns.get(key) {
254 Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
255 None => vec![None; ids.len()],
256 }
257 }
258
259 #[must_use]
277 pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
278 let columns = self.columns.read();
279 let column_count = columns.len();
280
281 let mut results = Vec::with_capacity(ids.len());
283
284 for &id in ids {
285 let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
287 for (key, col) in columns.iter() {
288 if let Some(value) = col.get(id) {
289 result.insert(key.clone(), value);
290 }
291 }
292 results.push(result);
293 }
294
295 results
296 }
297
298 #[must_use]
321 pub fn get_selective_batch(
322 &self,
323 ids: &[Id],
324 keys: &[PropertyKey],
325 ) -> Vec<FxHashMap<PropertyKey, Value>> {
326 if keys.is_empty() {
327 return vec![FxHashMap::default(); ids.len()];
329 }
330
331 let columns = self.columns.read();
332
333 let requested_columns: Vec<_> = keys
335 .iter()
336 .filter_map(|key| columns.get(key).map(|col| (key, col)))
337 .collect();
338
339 let mut results = Vec::with_capacity(ids.len());
341
342 for &id in ids {
343 let mut result =
344 FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
345 for (key, col) in &requested_columns {
347 if let Some(value) = col.get(id) {
348 result.insert((*key).clone(), value);
349 }
350 }
351 results.push(result);
352 }
353
354 results
355 }
356
357 #[must_use]
359 pub fn column_count(&self) -> usize {
360 self.columns.read().len()
361 }
362
363 #[must_use]
365 pub fn keys(&self) -> Vec<PropertyKey> {
366 self.columns.read().keys().cloned().collect()
367 }
368
369 #[must_use]
371 pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
372 let columns = self.columns.read();
373 if columns.contains_key(key) {
374 Some(PropertyColumnRef {
375 _guard: columns,
376 key: key.clone(),
377 _marker: PhantomData,
378 })
379 } else {
380 None
381 }
382 }
383
384 #[must_use]
390 pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
391 let columns = self.columns.read();
392 columns
393 .get(key)
394 .map(|col| col.might_match(op, value))
395 .unwrap_or(true) }
397
398 #[must_use]
400 pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
401 let columns = self.columns.read();
402 columns.get(key).map(|col| col.zone_map().clone())
403 }
404
405 #[must_use]
410 pub fn might_match_range(
411 &self,
412 key: &PropertyKey,
413 min: Option<&Value>,
414 max: Option<&Value>,
415 min_inclusive: bool,
416 max_inclusive: bool,
417 ) -> bool {
418 let columns = self.columns.read();
419 columns
420 .get(key)
421 .map(|col| {
422 col.zone_map()
423 .might_contain_range(min, max, min_inclusive, max_inclusive)
424 })
425 .unwrap_or(true) }
427
428 pub fn rebuild_zone_maps(&self) {
430 let mut columns = self.columns.write();
431 for col in columns.values_mut() {
432 col.rebuild_zone_map();
433 }
434 }
435}
436
437impl<Id: EntityId> Default for PropertyStorage<Id> {
438 fn default() -> Self {
439 Self::new()
440 }
441}
442
443#[derive(Debug)]
448pub enum CompressedColumnData {
449 Integers {
451 data: CompressedData,
453 id_to_index: Vec<u64>,
455 index_to_id: Vec<u64>,
457 },
458 Strings {
460 encoding: DictionaryEncoding,
462 id_to_index: Vec<u64>,
464 index_to_id: Vec<u64>,
466 },
467 Booleans {
469 data: CompressedData,
471 id_to_index: Vec<u64>,
473 index_to_id: Vec<u64>,
475 },
476}
477
478impl CompressedColumnData {
479 #[must_use]
481 pub fn memory_usage(&self) -> usize {
482 match self {
483 CompressedColumnData::Integers {
484 data,
485 id_to_index,
486 index_to_id,
487 } => {
488 data.data.len()
489 + id_to_index.len() * std::mem::size_of::<u64>()
490 + index_to_id.len() * std::mem::size_of::<u64>()
491 }
492 CompressedColumnData::Strings {
493 encoding,
494 id_to_index,
495 index_to_id,
496 } => {
497 encoding.codes().len() * std::mem::size_of::<u32>()
498 + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
499 + id_to_index.len() * std::mem::size_of::<u64>()
500 + index_to_id.len() * std::mem::size_of::<u64>()
501 }
502 CompressedColumnData::Booleans {
503 data,
504 id_to_index,
505 index_to_id,
506 } => {
507 data.data.len()
508 + id_to_index.len() * std::mem::size_of::<u64>()
509 + index_to_id.len() * std::mem::size_of::<u64>()
510 }
511 }
512 }
513
514 #[must_use]
516 #[allow(dead_code)]
517 pub fn compression_ratio(&self) -> f64 {
518 match self {
519 CompressedColumnData::Integers { data, .. } => data.compression_ratio(),
520 CompressedColumnData::Strings { encoding, .. } => encoding.compression_ratio(),
521 CompressedColumnData::Booleans { data, .. } => data.compression_ratio(),
522 }
523 }
524}
525
526#[derive(Debug, Clone, Default)]
528pub struct CompressionStats {
529 pub uncompressed_size: usize,
531 pub compressed_size: usize,
533 pub value_count: usize,
535 pub codec: Option<CompressionCodec>,
537}
538
539impl CompressionStats {
540 #[must_use]
542 pub fn compression_ratio(&self) -> f64 {
543 if self.compressed_size == 0 {
544 return 1.0;
545 }
546 self.uncompressed_size as f64 / self.compressed_size as f64
547 }
548}
549
550pub struct PropertyColumn<Id: EntityId = NodeId> {
560 values: FxHashMap<Id, Value>,
563 zone_map: ZoneMapEntry,
565 zone_map_dirty: bool,
567 compression_mode: CompressionMode,
569 compressed: Option<CompressedColumnData>,
571 compressed_count: usize,
573}
574
575impl<Id: EntityId> PropertyColumn<Id> {
576 #[must_use]
578 pub fn new() -> Self {
579 Self {
580 values: FxHashMap::default(),
581 zone_map: ZoneMapEntry::new(),
582 zone_map_dirty: false,
583 compression_mode: CompressionMode::None,
584 compressed: None,
585 compressed_count: 0,
586 }
587 }
588
589 #[must_use]
591 pub fn with_compression(mode: CompressionMode) -> Self {
592 Self {
593 values: FxHashMap::default(),
594 zone_map: ZoneMapEntry::new(),
595 zone_map_dirty: false,
596 compression_mode: mode,
597 compressed: None,
598 compressed_count: 0,
599 }
600 }
601
602 pub fn set_compression_mode(&mut self, mode: CompressionMode) {
604 self.compression_mode = mode;
605 if mode == CompressionMode::None {
606 if self.compressed.is_some() {
608 self.decompress_all();
609 }
610 }
611 }
612
613 #[must_use]
615 pub fn compression_mode(&self) -> CompressionMode {
616 self.compression_mode
617 }
618
619 pub fn set(&mut self, id: Id, value: Value) {
621 self.update_zone_map_on_insert(&value);
623 self.values.insert(id, value);
624
625 if self.compression_mode == CompressionMode::Auto {
627 let total_count = self.values.len() + self.compressed_count;
628 let hot_buffer_count = self.values.len();
629
630 if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
632 self.compress();
633 }
634 }
635 }
636
637 fn update_zone_map_on_insert(&mut self, value: &Value) {
639 self.zone_map.row_count += 1;
640
641 if matches!(value, Value::Null) {
642 self.zone_map.null_count += 1;
643 return;
644 }
645
646 match &self.zone_map.min {
648 None => self.zone_map.min = Some(value.clone()),
649 Some(current) => {
650 if compare_values(value, current) == Some(Ordering::Less) {
651 self.zone_map.min = Some(value.clone());
652 }
653 }
654 }
655
656 match &self.zone_map.max {
658 None => self.zone_map.max = Some(value.clone()),
659 Some(current) => {
660 if compare_values(value, current) == Some(Ordering::Greater) {
661 self.zone_map.max = Some(value.clone());
662 }
663 }
664 }
665 }
666
667 #[must_use]
672 pub fn get(&self, id: Id) -> Option<Value> {
673 if let Some(value) = self.values.get(&id) {
675 return Some(value.clone());
676 }
677
678 None
683 }
684
685 pub fn remove(&mut self, id: Id) -> Option<Value> {
687 let removed = self.values.remove(&id);
688 if removed.is_some() {
689 self.zone_map_dirty = true;
691 }
692 removed
693 }
694
695 #[must_use]
697 #[allow(dead_code)]
698 pub fn len(&self) -> usize {
699 self.values.len() + self.compressed_count
700 }
701
702 #[must_use]
704 #[allow(dead_code)]
705 pub fn is_empty(&self) -> bool {
706 self.values.is_empty() && self.compressed_count == 0
707 }
708
709 #[allow(dead_code)]
714 pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
715 self.values.iter().map(|(&id, v)| (id, v))
716 }
717
718 #[must_use]
720 pub fn compression_stats(&self) -> CompressionStats {
721 let hot_size = self.values.len() * std::mem::size_of::<Value>();
722 let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
723 let codec = match &self.compressed {
724 Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
725 Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
726 Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
727 None => None,
728 };
729
730 CompressionStats {
731 uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
732 compressed_size: hot_size + compressed_size,
733 value_count: self.len(),
734 codec,
735 }
736 }
737
738 #[must_use]
740 #[allow(dead_code)]
741 pub fn is_compressed(&self) -> bool {
742 self.compressed.is_some()
743 }
744
745 pub fn compress(&mut self) {
754 if self.values.is_empty() {
755 return;
756 }
757
758 if self.compressed.is_some() {
761 return;
762 }
763
764 let (int_count, str_count, bool_count) = self.count_types();
766 let total = self.values.len();
767
768 if int_count > total / 2 {
769 self.compress_as_integers();
770 } else if str_count > total / 2 {
771 self.compress_as_strings();
772 } else if bool_count > total / 2 {
773 self.compress_as_booleans();
774 }
775 }
777
778 fn count_types(&self) -> (usize, usize, usize) {
780 let mut int_count = 0;
781 let mut str_count = 0;
782 let mut bool_count = 0;
783
784 for value in self.values.values() {
785 match value {
786 Value::Int64(_) => int_count += 1,
787 Value::String(_) => str_count += 1,
788 Value::Bool(_) => bool_count += 1,
789 _ => {}
790 }
791 }
792
793 (int_count, str_count, bool_count)
794 }
795
796 #[allow(unsafe_code)]
798 fn compress_as_integers(&mut self) {
799 let mut values: Vec<(u64, i64)> = Vec::new();
801 let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
802
803 for (&id, value) in &self.values {
804 match value {
805 Value::Int64(v) => {
806 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
808 values.push((id_u64, *v));
809 }
810 _ => {
811 non_int_values.insert(id, value.clone());
812 }
813 }
814 }
815
816 if values.len() < 8 {
817 return;
819 }
820
821 values.sort_by_key(|(id, _)| *id);
823
824 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
825 let index_to_id: Vec<u64> = id_to_index.clone();
826 let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
827
828 let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
830
831 if compressed.compression_ratio() > 1.2 {
833 self.compressed = Some(CompressedColumnData::Integers {
834 data: compressed,
835 id_to_index,
836 index_to_id,
837 });
838 self.compressed_count = values.len();
839 self.values = non_int_values;
840 }
841 }
842
843 #[allow(unsafe_code)]
845 fn compress_as_strings(&mut self) {
846 let mut values: Vec<(u64, ArcStr)> = Vec::new();
847 let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
848
849 for (&id, value) in &self.values {
850 match value {
851 Value::String(s) => {
852 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
853 values.push((id_u64, s.clone()));
854 }
855 _ => {
856 non_str_values.insert(id, value.clone());
857 }
858 }
859 }
860
861 if values.len() < 8 {
862 return;
863 }
864
865 values.sort_by_key(|(id, _)| *id);
867
868 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
869 let index_to_id: Vec<u64> = id_to_index.clone();
870
871 let mut builder = DictionaryBuilder::new();
873 for (_, s) in &values {
874 builder.add(s.as_ref());
875 }
876 let encoding = builder.build();
877
878 if encoding.compression_ratio() > 1.2 {
880 self.compressed = Some(CompressedColumnData::Strings {
881 encoding,
882 id_to_index,
883 index_to_id,
884 });
885 self.compressed_count = values.len();
886 self.values = non_str_values;
887 }
888 }
889
890 #[allow(unsafe_code)]
892 fn compress_as_booleans(&mut self) {
893 let mut values: Vec<(u64, bool)> = Vec::new();
894 let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
895
896 for (&id, value) in &self.values {
897 match value {
898 Value::Bool(b) => {
899 let id_u64 = unsafe { std::mem::transmute_copy::<Id, u64>(&id) };
900 values.push((id_u64, *b));
901 }
902 _ => {
903 non_bool_values.insert(id, value.clone());
904 }
905 }
906 }
907
908 if values.len() < 8 {
909 return;
910 }
911
912 values.sort_by_key(|(id, _)| *id);
914
915 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
916 let index_to_id: Vec<u64> = id_to_index.clone();
917 let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
918
919 let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
920
921 self.compressed = Some(CompressedColumnData::Booleans {
923 data: compressed,
924 id_to_index,
925 index_to_id,
926 });
927 self.compressed_count = values.len();
928 self.values = non_bool_values;
929 }
930
931 #[allow(unsafe_code)]
933 fn decompress_all(&mut self) {
934 let Some(compressed) = self.compressed.take() else {
935 return;
936 };
937
938 match compressed {
939 CompressedColumnData::Integers {
940 data, index_to_id, ..
941 } => {
942 if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
943 let signed: Vec<i64> = values
945 .iter()
946 .map(|&v| crate::storage::zigzag_decode(v))
947 .collect();
948
949 for (i, id_u64) in index_to_id.iter().enumerate() {
950 if let Some(&value) = signed.get(i) {
951 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
952 self.values.insert(id, Value::Int64(value));
953 }
954 }
955 }
956 }
957 CompressedColumnData::Strings {
958 encoding,
959 index_to_id,
960 ..
961 } => {
962 for (i, id_u64) in index_to_id.iter().enumerate() {
963 if let Some(s) = encoding.get(i) {
964 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
965 self.values.insert(id, Value::String(ArcStr::from(s)));
966 }
967 }
968 }
969 CompressedColumnData::Booleans {
970 data, index_to_id, ..
971 } => {
972 if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
973 for (i, id_u64) in index_to_id.iter().enumerate() {
974 if let Some(&value) = values.get(i) {
975 let id: Id = unsafe { std::mem::transmute_copy(id_u64) };
976 self.values.insert(id, Value::Bool(value));
977 }
978 }
979 }
980 }
981 }
982
983 self.compressed_count = 0;
984 }
985
986 pub fn force_compress(&mut self) {
990 self.compress();
991 }
992
993 #[must_use]
995 pub fn zone_map(&self) -> &ZoneMapEntry {
996 &self.zone_map
997 }
998
999 #[must_use]
1004 pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1005 if self.zone_map_dirty {
1006 return true;
1008 }
1009
1010 match op {
1011 CompareOp::Eq => self.zone_map.might_contain_equal(value),
1012 CompareOp::Ne => {
1013 match (&self.zone_map.min, &self.zone_map.max) {
1016 (Some(min), Some(max)) => {
1017 !(compare_values(min, value) == Some(Ordering::Equal)
1018 && compare_values(max, value) == Some(Ordering::Equal))
1019 }
1020 _ => true,
1021 }
1022 }
1023 CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1024 CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1025 CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1026 CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1027 }
1028 }
1029
1030 pub fn rebuild_zone_map(&mut self) {
1032 let mut zone_map = ZoneMapEntry::new();
1033
1034 for value in self.values.values() {
1035 zone_map.row_count += 1;
1036
1037 if matches!(value, Value::Null) {
1038 zone_map.null_count += 1;
1039 continue;
1040 }
1041
1042 match &zone_map.min {
1044 None => zone_map.min = Some(value.clone()),
1045 Some(current) => {
1046 if compare_values(value, current) == Some(Ordering::Less) {
1047 zone_map.min = Some(value.clone());
1048 }
1049 }
1050 }
1051
1052 match &zone_map.max {
1054 None => zone_map.max = Some(value.clone()),
1055 Some(current) => {
1056 if compare_values(value, current) == Some(Ordering::Greater) {
1057 zone_map.max = Some(value.clone());
1058 }
1059 }
1060 }
1061 }
1062
1063 self.zone_map = zone_map;
1064 self.zone_map_dirty = false;
1065 }
1066}
1067
1068fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1070 match (a, b) {
1071 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1072 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1073 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1074 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1075 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1076 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1077 _ => None,
1078 }
1079}
1080
1081impl<Id: EntityId> Default for PropertyColumn<Id> {
1082 fn default() -> Self {
1083 Self::new()
1084 }
1085}
1086
1087pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1091 _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1092 #[allow(dead_code)]
1093 key: PropertyKey,
1094 _marker: PhantomData<Id>,
1095}
1096
1097#[cfg(test)]
1098mod tests {
1099 use super::*;
1100 use arcstr::ArcStr;
1101
1102 #[test]
1103 fn test_property_storage_basic() {
1104 let storage = PropertyStorage::new();
1105
1106 let node1 = NodeId::new(1);
1107 let node2 = NodeId::new(2);
1108 let name_key = PropertyKey::new("name");
1109 let age_key = PropertyKey::new("age");
1110
1111 storage.set(node1, name_key.clone(), "Alice".into());
1112 storage.set(node1, age_key.clone(), 30i64.into());
1113 storage.set(node2, name_key.clone(), "Bob".into());
1114
1115 assert_eq!(
1116 storage.get(node1, &name_key),
1117 Some(Value::String("Alice".into()))
1118 );
1119 assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
1120 assert_eq!(
1121 storage.get(node2, &name_key),
1122 Some(Value::String("Bob".into()))
1123 );
1124 assert!(storage.get(node2, &age_key).is_none());
1125 }
1126
1127 #[test]
1128 fn test_property_storage_remove() {
1129 let storage = PropertyStorage::new();
1130
1131 let node = NodeId::new(1);
1132 let key = PropertyKey::new("name");
1133
1134 storage.set(node, key.clone(), "Alice".into());
1135 assert!(storage.get(node, &key).is_some());
1136
1137 let removed = storage.remove(node, &key);
1138 assert!(removed.is_some());
1139 assert!(storage.get(node, &key).is_none());
1140 }
1141
1142 #[test]
1143 fn test_property_storage_get_all() {
1144 let storage = PropertyStorage::new();
1145
1146 let node = NodeId::new(1);
1147 storage.set(node, PropertyKey::new("name"), "Alice".into());
1148 storage.set(node, PropertyKey::new("age"), 30i64.into());
1149 storage.set(node, PropertyKey::new("active"), true.into());
1150
1151 let props = storage.get_all(node);
1152 assert_eq!(props.len(), 3);
1153 }
1154
1155 #[test]
1156 fn test_property_storage_remove_all() {
1157 let storage = PropertyStorage::new();
1158
1159 let node = NodeId::new(1);
1160 storage.set(node, PropertyKey::new("name"), "Alice".into());
1161 storage.set(node, PropertyKey::new("age"), 30i64.into());
1162
1163 storage.remove_all(node);
1164
1165 assert!(storage.get(node, &PropertyKey::new("name")).is_none());
1166 assert!(storage.get(node, &PropertyKey::new("age")).is_none());
1167 }
1168
1169 #[test]
1170 fn test_property_column() {
1171 let mut col = PropertyColumn::new();
1172
1173 col.set(NodeId::new(1), "Alice".into());
1174 col.set(NodeId::new(2), "Bob".into());
1175
1176 assert_eq!(col.len(), 2);
1177 assert!(!col.is_empty());
1178
1179 assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alice".into())));
1180
1181 col.remove(NodeId::new(1));
1182 assert!(col.get(NodeId::new(1)).is_none());
1183 assert_eq!(col.len(), 1);
1184 }
1185
1186 #[test]
1187 fn test_compression_mode() {
1188 let col: PropertyColumn<NodeId> = PropertyColumn::new();
1189 assert_eq!(col.compression_mode(), CompressionMode::None);
1190
1191 let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
1192 assert_eq!(col.compression_mode(), CompressionMode::Auto);
1193 }
1194
1195 #[test]
1196 fn test_property_storage_with_compression() {
1197 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1198
1199 for i in 0..100 {
1200 storage.set(
1201 NodeId::new(i),
1202 PropertyKey::new("age"),
1203 Value::Int64(20 + (i as i64 % 50)),
1204 );
1205 }
1206
1207 assert_eq!(
1209 storage.get(NodeId::new(0), &PropertyKey::new("age")),
1210 Some(Value::Int64(20))
1211 );
1212 assert_eq!(
1213 storage.get(NodeId::new(50), &PropertyKey::new("age")),
1214 Some(Value::Int64(20))
1215 );
1216 }
1217
1218 #[test]
1219 fn test_compress_integer_column() {
1220 let mut col: PropertyColumn<NodeId> =
1221 PropertyColumn::with_compression(CompressionMode::Auto);
1222
1223 for i in 0..2000 {
1225 col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
1226 }
1227
1228 let stats = col.compression_stats();
1231 assert_eq!(stats.value_count, 2000);
1232
1233 let last_value = col.get(NodeId::new(1999));
1236 assert!(last_value.is_some() || col.is_compressed());
1237 }
1238
1239 #[test]
1240 fn test_compress_string_column() {
1241 let mut col: PropertyColumn<NodeId> =
1242 PropertyColumn::with_compression(CompressionMode::Auto);
1243
1244 let categories = ["Person", "Company", "Product", "Location"];
1246 for i in 0..2000 {
1247 let cat = categories[i % 4];
1248 col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
1249 }
1250
1251 assert_eq!(col.len(), 2000);
1253
1254 let last_value = col.get(NodeId::new(1999));
1256 assert!(last_value.is_some() || col.is_compressed());
1257 }
1258
1259 #[test]
1260 fn test_compress_boolean_column() {
1261 let mut col: PropertyColumn<NodeId> =
1262 PropertyColumn::with_compression(CompressionMode::Auto);
1263
1264 for i in 0..2000 {
1266 col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
1267 }
1268
1269 assert_eq!(col.len(), 2000);
1271
1272 let last_value = col.get(NodeId::new(1999));
1274 assert!(last_value.is_some() || col.is_compressed());
1275 }
1276
1277 #[test]
1278 fn test_force_compress() {
1279 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1280
1281 for i in 0..100 {
1283 col.set(NodeId::new(i), Value::Int64(i as i64));
1284 }
1285
1286 col.force_compress();
1288
1289 let stats = col.compression_stats();
1291 assert_eq!(stats.value_count, 100);
1292 }
1293
1294 #[test]
1295 fn test_compression_stats() {
1296 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1297
1298 for i in 0..50 {
1299 col.set(NodeId::new(i), Value::Int64(i as i64));
1300 }
1301
1302 let stats = col.compression_stats();
1303 assert_eq!(stats.value_count, 50);
1304 assert!(stats.uncompressed_size > 0);
1305 }
1306
1307 #[test]
1308 fn test_storage_compression_stats() {
1309 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1310
1311 for i in 0..100 {
1312 storage.set(
1313 NodeId::new(i),
1314 PropertyKey::new("age"),
1315 Value::Int64(i as i64),
1316 );
1317 storage.set(
1318 NodeId::new(i),
1319 PropertyKey::new("name"),
1320 Value::String(ArcStr::from("Alice")),
1321 );
1322 }
1323
1324 let stats = storage.compression_stats();
1325 assert_eq!(stats.len(), 2); assert!(stats.contains_key(&PropertyKey::new("age")));
1327 assert!(stats.contains_key(&PropertyKey::new("name")));
1328 }
1329
1330 #[test]
1331 fn test_memory_usage() {
1332 let storage = PropertyStorage::new();
1333
1334 for i in 0..100 {
1335 storage.set(
1336 NodeId::new(i),
1337 PropertyKey::new("value"),
1338 Value::Int64(i as i64),
1339 );
1340 }
1341
1342 let usage = storage.memory_usage();
1343 assert!(usage > 0);
1344 }
1345
1346 #[test]
1347 fn test_get_batch_single_property() {
1348 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1349
1350 let node1 = NodeId::new(1);
1351 let node2 = NodeId::new(2);
1352 let node3 = NodeId::new(3);
1353 let age_key = PropertyKey::new("age");
1354
1355 storage.set(node1, age_key.clone(), 25i64.into());
1356 storage.set(node2, age_key.clone(), 30i64.into());
1357 let ids = vec![node1, node2, node3];
1360 let values = storage.get_batch(&ids, &age_key);
1361
1362 assert_eq!(values.len(), 3);
1363 assert_eq!(values[0], Some(Value::Int64(25)));
1364 assert_eq!(values[1], Some(Value::Int64(30)));
1365 assert_eq!(values[2], None);
1366 }
1367
1368 #[test]
1369 fn test_get_batch_missing_column() {
1370 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1371
1372 let node1 = NodeId::new(1);
1373 let node2 = NodeId::new(2);
1374 let missing_key = PropertyKey::new("nonexistent");
1375
1376 let ids = vec![node1, node2];
1377 let values = storage.get_batch(&ids, &missing_key);
1378
1379 assert_eq!(values.len(), 2);
1380 assert_eq!(values[0], None);
1381 assert_eq!(values[1], None);
1382 }
1383
1384 #[test]
1385 fn test_get_batch_empty_ids() {
1386 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1387 let key = PropertyKey::new("any");
1388
1389 let values = storage.get_batch(&[], &key);
1390 assert!(values.is_empty());
1391 }
1392
1393 #[test]
1394 fn test_get_all_batch() {
1395 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1396
1397 let node1 = NodeId::new(1);
1398 let node2 = NodeId::new(2);
1399 let node3 = NodeId::new(3);
1400
1401 storage.set(node1, PropertyKey::new("name"), "Alice".into());
1402 storage.set(node1, PropertyKey::new("age"), 25i64.into());
1403 storage.set(node2, PropertyKey::new("name"), "Bob".into());
1404 let ids = vec![node1, node2, node3];
1407 let all_props = storage.get_all_batch(&ids);
1408
1409 assert_eq!(all_props.len(), 3);
1410 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
1415 all_props[0].get(&PropertyKey::new("name")),
1416 Some(&Value::String("Alice".into()))
1417 );
1418 assert_eq!(
1419 all_props[1].get(&PropertyKey::new("name")),
1420 Some(&Value::String("Bob".into()))
1421 );
1422 }
1423
1424 #[test]
1425 fn test_get_all_batch_empty_ids() {
1426 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1427
1428 let all_props = storage.get_all_batch(&[]);
1429 assert!(all_props.is_empty());
1430 }
1431}