1use crate::index::zone_map::ZoneMapEntry;
24use crate::storage::{
25 CompressedData, CompressionCodec, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor,
26};
27use arcstr::ArcStr;
28use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
29use grafeo_common::utils::hash::FxHashMap;
30use parking_lot::RwLock;
31use std::cmp::Ordering;
32use std::hash::Hash;
33use std::marker::PhantomData;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub enum CompressionMode {
38 #[default]
40 None,
41 Auto,
43 Eager,
45}
46
47const COMPRESSION_THRESHOLD: usize = 1000;
49
50const HOT_BUFFER_SIZE: usize = 4096;
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum CompareOp {
61 Eq,
63 Ne,
65 Lt,
67 Le,
69 Gt,
71 Ge,
73}
74
75pub trait EntityId: Copy + Eq + Hash + 'static {
80 fn as_u64(self) -> u64;
82 fn from_u64(v: u64) -> Self;
84}
85
86impl EntityId for NodeId {
87 #[inline]
88 fn as_u64(self) -> u64 {
89 self.0
90 }
91 #[inline]
92 fn from_u64(v: u64) -> Self {
93 Self(v)
94 }
95}
96
97impl EntityId for EdgeId {
98 #[inline]
99 fn as_u64(self) -> u64 {
100 self.0
101 }
102 #[inline]
103 fn from_u64(v: u64) -> Self {
104 Self(v)
105 }
106}
107
108pub struct PropertyStorage<Id: EntityId = NodeId> {
133 columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
136 default_compression: CompressionMode,
138 _marker: PhantomData<Id>,
139}
140
141impl<Id: EntityId> PropertyStorage<Id> {
142 #[must_use]
144 pub fn new() -> Self {
145 Self {
146 columns: RwLock::new(FxHashMap::default()),
147 default_compression: CompressionMode::None,
148 _marker: PhantomData,
149 }
150 }
151
152 #[must_use]
154 pub fn with_compression(mode: CompressionMode) -> Self {
155 Self {
156 columns: RwLock::new(FxHashMap::default()),
157 default_compression: mode,
158 _marker: PhantomData,
159 }
160 }
161
162 pub fn set_default_compression(&mut self, mode: CompressionMode) {
164 self.default_compression = mode;
165 }
166
167 pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
169 let mut columns = self.columns.write();
170 let mode = self.default_compression;
171 columns
172 .entry(key)
173 .or_insert_with(|| PropertyColumn::with_compression(mode))
174 .set(id, value);
175 }
176
177 pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
179 let mut columns = self.columns.write();
180 if let Some(col) = columns.get_mut(key) {
181 col.set_compression_mode(mode);
182 }
183 }
184
185 pub fn compress_all(&self) {
187 let mut columns = self.columns.write();
188 for col in columns.values_mut() {
189 if col.compression_mode() != CompressionMode::None {
190 col.compress();
191 }
192 }
193 }
194
195 pub fn force_compress_all(&self) {
197 let mut columns = self.columns.write();
198 for col in columns.values_mut() {
199 col.force_compress();
200 }
201 }
202
203 #[must_use]
205 pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
206 let columns = self.columns.read();
207 columns
208 .iter()
209 .map(|(key, col)| (key.clone(), col.compression_stats()))
210 .collect()
211 }
212
213 #[must_use]
215 pub fn memory_usage(&self) -> usize {
216 let columns = self.columns.read();
217 columns
218 .values()
219 .map(|col| col.compression_stats().compressed_size)
220 .sum()
221 }
222
223 #[must_use]
225 pub fn heap_memory_bytes(&self) -> usize {
226 let columns = self.columns.read();
227 let map_overhead = columns.capacity()
229 * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
230 let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
232 map_overhead + column_bytes
233 }
234
235 #[must_use]
237 pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
238 let columns = self.columns.read();
239 columns.get(key).and_then(|col| col.get(id))
240 }
241
242 pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
244 let mut columns = self.columns.write();
245 columns.get_mut(key).and_then(|col| col.remove(id))
246 }
247
248 pub fn remove_all(&self, id: Id) {
250 let mut columns = self.columns.write();
251 for col in columns.values_mut() {
252 col.remove(id);
253 }
254 }
255
256 #[must_use]
258 pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
259 let columns = self.columns.read();
260 let mut result = FxHashMap::default();
261 for (key, col) in columns.iter() {
262 if let Some(value) = col.get(id) {
263 result.insert(key.clone(), value);
264 }
265 }
266 result
267 }
268
269 #[must_use]
288 pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
289 let columns = self.columns.read();
290 match columns.get(key) {
291 Some(col) => ids.iter().map(|&id| col.get(id)).collect(),
292 None => vec![None; ids.len()],
293 }
294 }
295
296 #[must_use]
314 pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
315 let columns = self.columns.read();
316 let column_count = columns.len();
317
318 let mut results = Vec::with_capacity(ids.len());
320
321 for &id in ids {
322 let mut result = FxHashMap::with_capacity_and_hasher(column_count, Default::default());
324 for (key, col) in columns.iter() {
325 if let Some(value) = col.get(id) {
326 result.insert(key.clone(), value);
327 }
328 }
329 results.push(result);
330 }
331
332 results
333 }
334
335 #[must_use]
358 pub fn get_selective_batch(
359 &self,
360 ids: &[Id],
361 keys: &[PropertyKey],
362 ) -> Vec<FxHashMap<PropertyKey, Value>> {
363 if keys.is_empty() {
364 return vec![FxHashMap::default(); ids.len()];
366 }
367
368 let columns = self.columns.read();
369
370 let requested_columns: Vec<_> = keys
372 .iter()
373 .filter_map(|key| columns.get(key).map(|col| (key, col)))
374 .collect();
375
376 let mut results = Vec::with_capacity(ids.len());
378
379 for &id in ids {
380 let mut result =
381 FxHashMap::with_capacity_and_hasher(requested_columns.len(), Default::default());
382 for (key, col) in &requested_columns {
384 if let Some(value) = col.get(id) {
385 result.insert((*key).clone(), value);
386 }
387 }
388 results.push(result);
389 }
390
391 results
392 }
393
394 #[must_use]
396 pub fn column_count(&self) -> usize {
397 self.columns.read().len()
398 }
399
400 #[must_use]
402 pub fn keys(&self) -> Vec<PropertyKey> {
403 self.columns.read().keys().cloned().collect()
404 }
405
406 pub fn clear(&self) {
408 self.columns.write().clear();
409 }
410
411 #[must_use]
413 pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
414 let columns = self.columns.read();
415 if columns.contains_key(key) {
416 Some(PropertyColumnRef {
417 _guard: columns,
418 _key: key.clone(),
419 _marker: PhantomData,
420 })
421 } else {
422 None
423 }
424 }
425
426 #[must_use]
432 pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
433 let columns = self.columns.read();
434 columns
435 .get(key)
436 .map_or(true, |col| col.might_match(op, value)) }
438
439 #[must_use]
441 pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
442 let columns = self.columns.read();
443 columns.get(key).map(|col| col.zone_map().clone())
444 }
445
446 #[must_use]
451 pub fn might_match_range(
452 &self,
453 key: &PropertyKey,
454 min: Option<&Value>,
455 max: Option<&Value>,
456 min_inclusive: bool,
457 max_inclusive: bool,
458 ) -> bool {
459 let columns = self.columns.read();
460 columns.get(key).map_or(true, |col| {
461 col.zone_map()
462 .might_contain_range(min, max, min_inclusive, max_inclusive)
463 }) }
465
466 pub fn rebuild_zone_maps(&self) {
468 let mut columns = self.columns.write();
469 for col in columns.values_mut() {
470 col.rebuild_zone_map();
471 }
472 }
473}
474
475impl<Id: EntityId> Default for PropertyStorage<Id> {
476 fn default() -> Self {
477 Self::new()
478 }
479}
480
481#[derive(Debug)]
486pub enum CompressedColumnData {
487 Integers {
489 data: CompressedData,
491 id_to_index: Vec<u64>,
493 index_to_id: Vec<u64>,
495 },
496 Strings {
498 encoding: DictionaryEncoding,
500 id_to_index: Vec<u64>,
502 index_to_id: Vec<u64>,
504 },
505 Booleans {
507 data: CompressedData,
509 id_to_index: Vec<u64>,
511 index_to_id: Vec<u64>,
513 },
514}
515
516impl CompressedColumnData {
517 #[must_use]
519 pub fn memory_usage(&self) -> usize {
520 match self {
521 CompressedColumnData::Integers {
522 data,
523 id_to_index,
524 index_to_id,
525 } => {
526 data.data.len()
527 + id_to_index.len() * std::mem::size_of::<u64>()
528 + index_to_id.len() * std::mem::size_of::<u64>()
529 }
530 CompressedColumnData::Strings {
531 encoding,
532 id_to_index,
533 index_to_id,
534 } => {
535 encoding.codes().len() * std::mem::size_of::<u32>()
536 + encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
537 + id_to_index.len() * std::mem::size_of::<u64>()
538 + index_to_id.len() * std::mem::size_of::<u64>()
539 }
540 CompressedColumnData::Booleans {
541 data,
542 id_to_index,
543 index_to_id,
544 } => {
545 data.data.len()
546 + id_to_index.len() * std::mem::size_of::<u64>()
547 + index_to_id.len() * std::mem::size_of::<u64>()
548 }
549 }
550 }
551}
552
553#[derive(Debug, Clone, Default)]
555pub struct CompressionStats {
556 pub uncompressed_size: usize,
558 pub compressed_size: usize,
560 pub value_count: usize,
562 pub codec: Option<CompressionCodec>,
564}
565
566impl CompressionStats {
567 #[must_use]
569 pub fn compression_ratio(&self) -> f64 {
570 if self.compressed_size == 0 {
571 return 1.0;
572 }
573 self.uncompressed_size as f64 / self.compressed_size as f64
574 }
575}
576
577pub struct PropertyColumn<Id: EntityId = NodeId> {
587 values: FxHashMap<Id, Value>,
590 zone_map: ZoneMapEntry,
592 zone_map_dirty: bool,
594 compression_mode: CompressionMode,
596 compressed: Option<CompressedColumnData>,
598 compressed_count: usize,
600}
601
602impl<Id: EntityId> PropertyColumn<Id> {
603 #[must_use]
605 pub fn new() -> Self {
606 Self {
607 values: FxHashMap::default(),
608 zone_map: ZoneMapEntry::new(),
609 zone_map_dirty: false,
610 compression_mode: CompressionMode::None,
611 compressed: None,
612 compressed_count: 0,
613 }
614 }
615
616 #[must_use]
618 pub fn with_compression(mode: CompressionMode) -> Self {
619 Self {
620 values: FxHashMap::default(),
621 zone_map: ZoneMapEntry::new(),
622 zone_map_dirty: false,
623 compression_mode: mode,
624 compressed: None,
625 compressed_count: 0,
626 }
627 }
628
629 pub fn set_compression_mode(&mut self, mode: CompressionMode) {
631 self.compression_mode = mode;
632 if mode == CompressionMode::None {
633 if self.compressed.is_some() {
635 self.decompress_all();
636 }
637 }
638 }
639
640 #[must_use]
642 pub fn compression_mode(&self) -> CompressionMode {
643 self.compression_mode
644 }
645
646 pub fn set(&mut self, id: Id, value: Value) {
648 self.update_zone_map_on_insert(&value);
650 self.values.insert(id, value);
651
652 if self.compression_mode == CompressionMode::Auto {
654 let total_count = self.values.len() + self.compressed_count;
655 let hot_buffer_count = self.values.len();
656
657 if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
659 self.compress();
660 }
661 }
662 }
663
664 fn update_zone_map_on_insert(&mut self, value: &Value) {
666 self.zone_map.row_count += 1;
667
668 if matches!(value, Value::Null) {
669 self.zone_map.null_count += 1;
670 return;
671 }
672
673 match &self.zone_map.min {
675 None => self.zone_map.min = Some(value.clone()),
676 Some(current) => {
677 if compare_values(value, current) == Some(Ordering::Less) {
678 self.zone_map.min = Some(value.clone());
679 }
680 }
681 }
682
683 match &self.zone_map.max {
685 None => self.zone_map.max = Some(value.clone()),
686 Some(current) => {
687 if compare_values(value, current) == Some(Ordering::Greater) {
688 self.zone_map.max = Some(value.clone());
689 }
690 }
691 }
692 }
693
694 #[must_use]
699 pub fn get(&self, id: Id) -> Option<Value> {
700 if let Some(value) = self.values.get(&id) {
702 return Some(value.clone());
703 }
704
705 None
710 }
711
712 pub fn remove(&mut self, id: Id) -> Option<Value> {
714 let removed = self.values.remove(&id);
715 if removed.is_some() {
716 self.zone_map_dirty = true;
718 }
719 removed
720 }
721
722 #[must_use]
724 pub fn len(&self) -> usize {
725 self.values.len() + self.compressed_count
726 }
727
728 #[cfg(test)]
730 #[must_use]
731 pub fn is_empty(&self) -> bool {
732 self.values.is_empty() && self.compressed_count == 0
733 }
734
735 #[must_use]
737 pub fn compression_stats(&self) -> CompressionStats {
738 let hot_size = self.values.len() * std::mem::size_of::<Value>();
739 let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
740 let codec = match &self.compressed {
741 Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
742 Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
743 Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
744 None => None,
745 };
746
747 CompressionStats {
748 uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
749 compressed_size: hot_size + compressed_size,
750 value_count: self.len(),
751 codec,
752 }
753 }
754
755 #[must_use]
760 pub fn heap_memory_bytes(&self) -> usize {
761 let hot_bytes =
763 self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
764 let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
766 hot_bytes + compressed_bytes
768 }
769
770 #[must_use]
772 #[cfg(test)]
773 pub fn is_compressed(&self) -> bool {
774 self.compressed.is_some()
775 }
776
777 pub fn compress(&mut self) {
786 if self.values.is_empty() {
787 return;
788 }
789
790 if self.compressed.is_some() {
793 return;
794 }
795
796 let (int_count, str_count, bool_count) = self.count_types();
798 let total = self.values.len();
799
800 if int_count > total / 2 {
801 self.compress_as_integers();
802 } else if str_count > total / 2 {
803 self.compress_as_strings();
804 } else if bool_count > total / 2 {
805 self.compress_as_booleans();
806 }
807 }
809
810 fn count_types(&self) -> (usize, usize, usize) {
812 let mut int_count = 0;
813 let mut str_count = 0;
814 let mut bool_count = 0;
815
816 for value in self.values.values() {
817 match value {
818 Value::Int64(_) => int_count += 1,
819 Value::String(_) => str_count += 1,
820 Value::Bool(_) => bool_count += 1,
821 _ => {}
822 }
823 }
824
825 (int_count, str_count, bool_count)
826 }
827
828 fn compress_as_integers(&mut self) {
830 let mut values: Vec<(u64, i64)> = Vec::new();
832 let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
833
834 for (&id, value) in &self.values {
835 match value {
836 Value::Int64(v) => {
837 let id_u64 = id.as_u64();
838 values.push((id_u64, *v));
839 }
840 _ => {
841 non_int_values.insert(id, value.clone());
842 }
843 }
844 }
845
846 if values.len() < 8 {
847 return;
849 }
850
851 values.sort_by_key(|(id, _)| *id);
853
854 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
855 let index_to_id: Vec<u64> = id_to_index.clone();
856 let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
857
858 let compressed = TypeSpecificCompressor::compress_signed_integers(&int_values);
860
861 if compressed.compression_ratio() > 1.2 {
863 self.compressed = Some(CompressedColumnData::Integers {
864 data: compressed,
865 id_to_index,
866 index_to_id,
867 });
868 self.compressed_count = values.len();
869 self.values = non_int_values;
870 }
871 }
872
873 fn compress_as_strings(&mut self) {
875 let mut values: Vec<(u64, ArcStr)> = Vec::new();
876 let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
877
878 for (&id, value) in &self.values {
879 match value {
880 Value::String(s) => {
881 values.push((id.as_u64(), s.clone()));
882 }
883 _ => {
884 non_str_values.insert(id, value.clone());
885 }
886 }
887 }
888
889 if values.len() < 8 {
890 return;
891 }
892
893 values.sort_by_key(|(id, _)| *id);
895
896 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
897 let index_to_id: Vec<u64> = id_to_index.clone();
898
899 let mut builder = DictionaryBuilder::new();
901 for (_, s) in &values {
902 builder.add(s.as_ref());
903 }
904 let encoding = builder.build();
905
906 if encoding.compression_ratio() > 1.2 {
908 self.compressed = Some(CompressedColumnData::Strings {
909 encoding,
910 id_to_index,
911 index_to_id,
912 });
913 self.compressed_count = values.len();
914 self.values = non_str_values;
915 }
916 }
917
918 fn compress_as_booleans(&mut self) {
920 let mut values: Vec<(u64, bool)> = Vec::new();
921 let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
922
923 for (&id, value) in &self.values {
924 match value {
925 Value::Bool(b) => {
926 values.push((id.as_u64(), *b));
927 }
928 _ => {
929 non_bool_values.insert(id, value.clone());
930 }
931 }
932 }
933
934 if values.len() < 8 {
935 return;
936 }
937
938 values.sort_by_key(|(id, _)| *id);
940
941 let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
942 let index_to_id: Vec<u64> = id_to_index.clone();
943 let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
944
945 let compressed = TypeSpecificCompressor::compress_booleans(&bool_values);
946
947 self.compressed = Some(CompressedColumnData::Booleans {
949 data: compressed,
950 id_to_index,
951 index_to_id,
952 });
953 self.compressed_count = values.len();
954 self.values = non_bool_values;
955 }
956
957 fn decompress_all(&mut self) {
959 let Some(compressed) = self.compressed.take() else {
960 return;
961 };
962
963 match compressed {
964 CompressedColumnData::Integers {
965 data, index_to_id, ..
966 } => {
967 if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
968 let signed: Vec<i64> = values
970 .iter()
971 .map(|&v| crate::storage::zigzag_decode(v))
972 .collect();
973
974 for (i, id_u64) in index_to_id.iter().enumerate() {
975 if let Some(&value) = signed.get(i) {
976 let id = Id::from_u64(*id_u64);
977 self.values.insert(id, Value::Int64(value));
978 }
979 }
980 }
981 }
982 CompressedColumnData::Strings {
983 encoding,
984 index_to_id,
985 ..
986 } => {
987 for (i, id_u64) in index_to_id.iter().enumerate() {
988 if let Some(s) = encoding.get(i) {
989 let id = Id::from_u64(*id_u64);
990 self.values.insert(id, Value::String(ArcStr::from(s)));
991 }
992 }
993 }
994 CompressedColumnData::Booleans {
995 data, index_to_id, ..
996 } => {
997 if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
998 for (i, id_u64) in index_to_id.iter().enumerate() {
999 if let Some(&value) = values.get(i) {
1000 let id = Id::from_u64(*id_u64);
1001 self.values.insert(id, Value::Bool(value));
1002 }
1003 }
1004 }
1005 }
1006 }
1007
1008 self.compressed_count = 0;
1009 }
1010
1011 pub fn force_compress(&mut self) {
1015 self.compress();
1016 }
1017
1018 #[must_use]
1020 pub fn zone_map(&self) -> &ZoneMapEntry {
1021 &self.zone_map
1022 }
1023
1024 #[must_use]
1029 pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
1030 if self.zone_map_dirty {
1031 return true;
1033 }
1034
1035 match op {
1036 CompareOp::Eq => self.zone_map.might_contain_equal(value),
1037 CompareOp::Ne => {
1038 match (&self.zone_map.min, &self.zone_map.max) {
1041 (Some(min), Some(max)) => {
1042 !(compare_values(min, value) == Some(Ordering::Equal)
1043 && compare_values(max, value) == Some(Ordering::Equal))
1044 }
1045 _ => true,
1046 }
1047 }
1048 CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
1049 CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
1050 CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
1051 CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
1052 }
1053 }
1054
1055 pub fn rebuild_zone_map(&mut self) {
1057 let mut zone_map = ZoneMapEntry::new();
1058
1059 for value in self.values.values() {
1060 zone_map.row_count += 1;
1061
1062 if matches!(value, Value::Null) {
1063 zone_map.null_count += 1;
1064 continue;
1065 }
1066
1067 match &zone_map.min {
1069 None => zone_map.min = Some(value.clone()),
1070 Some(current) => {
1071 if compare_values(value, current) == Some(Ordering::Less) {
1072 zone_map.min = Some(value.clone());
1073 }
1074 }
1075 }
1076
1077 match &zone_map.max {
1079 None => zone_map.max = Some(value.clone()),
1080 Some(current) => {
1081 if compare_values(value, current) == Some(Ordering::Greater) {
1082 zone_map.max = Some(value.clone());
1083 }
1084 }
1085 }
1086 }
1087
1088 self.zone_map = zone_map;
1089 self.zone_map_dirty = false;
1090 }
1091}
1092
1093fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1095 match (a, b) {
1096 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1097 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1098 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1099 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1100 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1101 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1102 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1103 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1104 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1105 _ => None,
1106 }
1107}
1108
1109impl<Id: EntityId> Default for PropertyColumn<Id> {
1110 fn default() -> Self {
1111 Self::new()
1112 }
1113}
1114
1115pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
1119 _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
1120 _key: PropertyKey,
1121 _marker: PhantomData<Id>,
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126 use super::*;
1127 use arcstr::ArcStr;
1128
1129 #[test]
1130 fn test_property_storage_basic() {
1131 let storage = PropertyStorage::new();
1132
1133 let node1 = NodeId::new(1);
1134 let node2 = NodeId::new(2);
1135 let name_key = PropertyKey::new("name");
1136 let age_key = PropertyKey::new("age");
1137
1138 storage.set(node1, name_key.clone(), "Alix".into());
1139 storage.set(node1, age_key.clone(), 30i64.into());
1140 storage.set(node2, name_key.clone(), "Gus".into());
1141
1142 assert_eq!(
1143 storage.get(node1, &name_key),
1144 Some(Value::String("Alix".into()))
1145 );
1146 assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
1147 assert_eq!(
1148 storage.get(node2, &name_key),
1149 Some(Value::String("Gus".into()))
1150 );
1151 assert!(storage.get(node2, &age_key).is_none());
1152 }
1153
1154 #[test]
1155 fn test_property_storage_remove() {
1156 let storage = PropertyStorage::new();
1157
1158 let node = NodeId::new(1);
1159 let key = PropertyKey::new("name");
1160
1161 storage.set(node, key.clone(), "Alix".into());
1162 assert!(storage.get(node, &key).is_some());
1163
1164 let removed = storage.remove(node, &key);
1165 assert!(removed.is_some());
1166 assert!(storage.get(node, &key).is_none());
1167 }
1168
1169 #[test]
1170 fn test_property_storage_get_all() {
1171 let storage = PropertyStorage::new();
1172
1173 let node = NodeId::new(1);
1174 storage.set(node, PropertyKey::new("name"), "Alix".into());
1175 storage.set(node, PropertyKey::new("age"), 30i64.into());
1176 storage.set(node, PropertyKey::new("active"), true.into());
1177
1178 let props = storage.get_all(node);
1179 assert_eq!(props.len(), 3);
1180 }
1181
1182 #[test]
1183 fn test_property_storage_remove_all() {
1184 let storage = PropertyStorage::new();
1185
1186 let node = NodeId::new(1);
1187 storage.set(node, PropertyKey::new("name"), "Alix".into());
1188 storage.set(node, PropertyKey::new("age"), 30i64.into());
1189
1190 storage.remove_all(node);
1191
1192 assert!(storage.get(node, &PropertyKey::new("name")).is_none());
1193 assert!(storage.get(node, &PropertyKey::new("age")).is_none());
1194 }
1195
1196 #[test]
1197 fn test_property_column() {
1198 let mut col = PropertyColumn::new();
1199
1200 col.set(NodeId::new(1), "Alix".into());
1201 col.set(NodeId::new(2), "Gus".into());
1202
1203 assert_eq!(col.len(), 2);
1204 assert!(!col.is_empty());
1205
1206 assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));
1207
1208 col.remove(NodeId::new(1));
1209 assert!(col.get(NodeId::new(1)).is_none());
1210 assert_eq!(col.len(), 1);
1211 }
1212
1213 #[test]
1214 fn test_compression_mode() {
1215 let col: PropertyColumn<NodeId> = PropertyColumn::new();
1216 assert_eq!(col.compression_mode(), CompressionMode::None);
1217
1218 let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
1219 assert_eq!(col.compression_mode(), CompressionMode::Auto);
1220 }
1221
1222 #[test]
1223 fn test_property_storage_with_compression() {
1224 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1225
1226 for i in 0..100 {
1227 storage.set(
1228 NodeId::new(i),
1229 PropertyKey::new("age"),
1230 Value::Int64(20 + (i as i64 % 50)),
1231 );
1232 }
1233
1234 assert_eq!(
1236 storage.get(NodeId::new(0), &PropertyKey::new("age")),
1237 Some(Value::Int64(20))
1238 );
1239 assert_eq!(
1240 storage.get(NodeId::new(50), &PropertyKey::new("age")),
1241 Some(Value::Int64(20))
1242 );
1243 }
1244
1245 #[test]
1246 fn test_compress_integer_column() {
1247 let mut col: PropertyColumn<NodeId> =
1248 PropertyColumn::with_compression(CompressionMode::Auto);
1249
1250 for i in 0..2000 {
1252 col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
1253 }
1254
1255 let stats = col.compression_stats();
1258 assert_eq!(stats.value_count, 2000);
1259
1260 let last_value = col.get(NodeId::new(1999));
1263 assert!(last_value.is_some() || col.is_compressed());
1264 }
1265
1266 #[test]
1267 fn test_compress_string_column() {
1268 let mut col: PropertyColumn<NodeId> =
1269 PropertyColumn::with_compression(CompressionMode::Auto);
1270
1271 let categories = ["Person", "Company", "Product", "Location"];
1273 for i in 0..2000 {
1274 let cat = categories[i % 4];
1275 col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
1276 }
1277
1278 assert_eq!(col.len(), 2000);
1280
1281 let last_value = col.get(NodeId::new(1999));
1283 assert!(last_value.is_some() || col.is_compressed());
1284 }
1285
1286 #[test]
1287 fn test_compress_boolean_column() {
1288 let mut col: PropertyColumn<NodeId> =
1289 PropertyColumn::with_compression(CompressionMode::Auto);
1290
1291 for i in 0..2000 {
1293 col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
1294 }
1295
1296 assert_eq!(col.len(), 2000);
1298
1299 let last_value = col.get(NodeId::new(1999));
1301 assert!(last_value.is_some() || col.is_compressed());
1302 }
1303
1304 #[test]
1305 fn test_force_compress() {
1306 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1307
1308 for i in 0..100 {
1310 col.set(NodeId::new(i), Value::Int64(i as i64));
1311 }
1312
1313 col.force_compress();
1315
1316 let stats = col.compression_stats();
1318 assert_eq!(stats.value_count, 100);
1319 }
1320
1321 #[test]
1322 fn test_compression_stats() {
1323 let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
1324
1325 for i in 0..50 {
1326 col.set(NodeId::new(i), Value::Int64(i as i64));
1327 }
1328
1329 let stats = col.compression_stats();
1330 assert_eq!(stats.value_count, 50);
1331 assert!(stats.uncompressed_size > 0);
1332 }
1333
1334 #[test]
1335 fn test_storage_compression_stats() {
1336 let storage = PropertyStorage::with_compression(CompressionMode::Auto);
1337
1338 for i in 0..100 {
1339 storage.set(
1340 NodeId::new(i),
1341 PropertyKey::new("age"),
1342 Value::Int64(i as i64),
1343 );
1344 storage.set(
1345 NodeId::new(i),
1346 PropertyKey::new("name"),
1347 Value::String(ArcStr::from("Alix")),
1348 );
1349 }
1350
1351 let stats = storage.compression_stats();
1352 assert_eq!(stats.len(), 2); assert!(stats.contains_key(&PropertyKey::new("age")));
1354 assert!(stats.contains_key(&PropertyKey::new("name")));
1355 }
1356
1357 #[test]
1358 fn test_memory_usage() {
1359 let storage = PropertyStorage::new();
1360
1361 for i in 0..100 {
1362 storage.set(
1363 NodeId::new(i),
1364 PropertyKey::new("value"),
1365 Value::Int64(i as i64),
1366 );
1367 }
1368
1369 let usage = storage.memory_usage();
1370 assert!(usage > 0);
1371 }
1372
1373 #[test]
1374 fn test_get_batch_single_property() {
1375 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1376
1377 let node1 = NodeId::new(1);
1378 let node2 = NodeId::new(2);
1379 let node3 = NodeId::new(3);
1380 let age_key = PropertyKey::new("age");
1381
1382 storage.set(node1, age_key.clone(), 25i64.into());
1383 storage.set(node2, age_key.clone(), 30i64.into());
1384 let ids = vec![node1, node2, node3];
1387 let values = storage.get_batch(&ids, &age_key);
1388
1389 assert_eq!(values.len(), 3);
1390 assert_eq!(values[0], Some(Value::Int64(25)));
1391 assert_eq!(values[1], Some(Value::Int64(30)));
1392 assert_eq!(values[2], None);
1393 }
1394
1395 #[test]
1396 fn test_get_batch_missing_column() {
1397 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1398
1399 let node1 = NodeId::new(1);
1400 let node2 = NodeId::new(2);
1401 let missing_key = PropertyKey::new("nonexistent");
1402
1403 let ids = vec![node1, node2];
1404 let values = storage.get_batch(&ids, &missing_key);
1405
1406 assert_eq!(values.len(), 2);
1407 assert_eq!(values[0], None);
1408 assert_eq!(values[1], None);
1409 }
1410
1411 #[test]
1412 fn test_get_batch_empty_ids() {
1413 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1414 let key = PropertyKey::new("any");
1415
1416 let values = storage.get_batch(&[], &key);
1417 assert!(values.is_empty());
1418 }
1419
1420 #[test]
1421 fn test_get_all_batch() {
1422 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1423
1424 let node1 = NodeId::new(1);
1425 let node2 = NodeId::new(2);
1426 let node3 = NodeId::new(3);
1427
1428 storage.set(node1, PropertyKey::new("name"), "Alix".into());
1429 storage.set(node1, PropertyKey::new("age"), 25i64.into());
1430 storage.set(node2, PropertyKey::new("name"), "Gus".into());
1431 let ids = vec![node1, node2, node3];
1434 let all_props = storage.get_all_batch(&ids);
1435
1436 assert_eq!(all_props.len(), 3);
1437 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
1442 all_props[0].get(&PropertyKey::new("name")),
1443 Some(&Value::String("Alix".into()))
1444 );
1445 assert_eq!(
1446 all_props[1].get(&PropertyKey::new("name")),
1447 Some(&Value::String("Gus".into()))
1448 );
1449 }
1450
1451 #[test]
1452 fn test_get_all_batch_empty_ids() {
1453 let storage: PropertyStorage<NodeId> = PropertyStorage::new();
1454
1455 let all_props = storage.get_all_batch(&[]);
1456 assert!(all_props.is_empty());
1457 }
1458}