1use crate::index::zone_map::ZoneMapEntry;
8use graphos_common::types::{EdgeId, NodeId, PropertyKey, Value};
9use graphos_common::utils::hash::FxHashMap;
10use parking_lot::RwLock;
11use std::cmp::Ordering;
12use std::hash::Hash;
13use std::marker::PhantomData;
14
/// Comparison operator used when asking whether a column could contain a
/// value satisfying a predicate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompareOp {
    /// Equal to (`==`).
    Eq,
    /// Not equal to (`!=`).
    Ne,
    /// Strictly less than (`<`).
    Lt,
    /// Less than or equal to (`<=`).
    Le,
    /// Strictly greater than (`>`).
    Gt,
    /// Greater than or equal to (`>=`).
    Ge,
}
31
32pub trait EntityId: Copy + Eq + Hash + 'static {}
34
35impl EntityId for NodeId {}
36impl EntityId for EdgeId {}
37
38pub struct PropertyStorage<Id: EntityId = NodeId> {
46 columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
48 _marker: PhantomData<Id>,
49}
50
51impl<Id: EntityId> PropertyStorage<Id> {
52 #[must_use]
54 pub fn new() -> Self {
55 Self {
56 columns: RwLock::new(FxHashMap::default()),
57 _marker: PhantomData,
58 }
59 }
60
61 pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
63 let mut columns = self.columns.write();
64 columns
65 .entry(key)
66 .or_insert_with(PropertyColumn::new)
67 .set(id, value);
68 }
69
70 #[must_use]
72 pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
73 let columns = self.columns.read();
74 columns.get(key).and_then(|col| col.get(id))
75 }
76
77 pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
79 let mut columns = self.columns.write();
80 columns.get_mut(key).and_then(|col| col.remove(id))
81 }
82
83 pub fn remove_all(&self, id: Id) {
85 let mut columns = self.columns.write();
86 for col in columns.values_mut() {
87 col.remove(id);
88 }
89 }
90
91 #[must_use]
93 pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
94 let columns = self.columns.read();
95 let mut result = FxHashMap::default();
96 for (key, col) in columns.iter() {
97 if let Some(value) = col.get(id) {
98 result.insert(key.clone(), value);
99 }
100 }
101 result
102 }
103
104 #[must_use]
106 pub fn column_count(&self) -> usize {
107 self.columns.read().len()
108 }
109
110 #[must_use]
112 pub fn keys(&self) -> Vec<PropertyKey> {
113 self.columns.read().keys().cloned().collect()
114 }
115
116 #[must_use]
118 pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
119 let columns = self.columns.read();
120 if columns.contains_key(key) {
121 Some(PropertyColumnRef {
122 _guard: columns,
123 key: key.clone(),
124 _marker: PhantomData,
125 })
126 } else {
127 None
128 }
129 }
130
131 #[must_use]
136 pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
137 let columns = self.columns.read();
138 columns
139 .get(key)
140 .map(|col| col.might_match(op, value))
141 .unwrap_or(true) }
143
144 #[must_use]
146 pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
147 let columns = self.columns.read();
148 columns.get(key).map(|col| col.zone_map().clone())
149 }
150
151 pub fn rebuild_zone_maps(&self) {
153 let mut columns = self.columns.write();
154 for col in columns.values_mut() {
155 col.rebuild_zone_map();
156 }
157 }
158}
159
160impl<Id: EntityId> Default for PropertyStorage<Id> {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166pub struct PropertyColumn<Id: EntityId = NodeId> {
171 values: FxHashMap<Id, Value>,
174 zone_map: ZoneMapEntry,
176 zone_map_dirty: bool,
178}
179
180impl<Id: EntityId> PropertyColumn<Id> {
181 #[must_use]
183 pub fn new() -> Self {
184 Self {
185 values: FxHashMap::default(),
186 zone_map: ZoneMapEntry::new(),
187 zone_map_dirty: false,
188 }
189 }
190
191 pub fn set(&mut self, id: Id, value: Value) {
193 self.update_zone_map_on_insert(&value);
195 self.values.insert(id, value);
196 }
197
198 fn update_zone_map_on_insert(&mut self, value: &Value) {
200 self.zone_map.row_count += 1;
201
202 if matches!(value, Value::Null) {
203 self.zone_map.null_count += 1;
204 return;
205 }
206
207 match &self.zone_map.min {
209 None => self.zone_map.min = Some(value.clone()),
210 Some(current) => {
211 if compare_values(value, current) == Some(Ordering::Less) {
212 self.zone_map.min = Some(value.clone());
213 }
214 }
215 }
216
217 match &self.zone_map.max {
219 None => self.zone_map.max = Some(value.clone()),
220 Some(current) => {
221 if compare_values(value, current) == Some(Ordering::Greater) {
222 self.zone_map.max = Some(value.clone());
223 }
224 }
225 }
226 }
227
228 #[must_use]
230 pub fn get(&self, id: Id) -> Option<Value> {
231 self.values.get(&id).cloned()
232 }
233
234 pub fn remove(&mut self, id: Id) -> Option<Value> {
236 let removed = self.values.remove(&id);
237 if removed.is_some() {
238 self.zone_map_dirty = true;
240 }
241 removed
242 }
243
244 #[must_use]
246 #[allow(dead_code)]
247 pub fn len(&self) -> usize {
248 self.values.len()
249 }
250
251 #[must_use]
253 #[allow(dead_code)]
254 pub fn is_empty(&self) -> bool {
255 self.values.is_empty()
256 }
257
258 #[allow(dead_code)]
260 pub fn iter(&self) -> impl Iterator<Item = (Id, &Value)> {
261 self.values.iter().map(|(&id, v)| (id, v))
262 }
263
264 #[must_use]
266 pub fn zone_map(&self) -> &ZoneMapEntry {
267 &self.zone_map
268 }
269
270 #[must_use]
275 pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
276 if self.zone_map_dirty {
277 return true;
279 }
280
281 match op {
282 CompareOp::Eq => self.zone_map.might_contain_equal(value),
283 CompareOp::Ne => {
284 match (&self.zone_map.min, &self.zone_map.max) {
287 (Some(min), Some(max)) => {
288 !(compare_values(min, value) == Some(Ordering::Equal)
289 && compare_values(max, value) == Some(Ordering::Equal))
290 }
291 _ => true,
292 }
293 }
294 CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
295 CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
296 CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
297 CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
298 }
299 }
300
301 pub fn rebuild_zone_map(&mut self) {
303 let mut zone_map = ZoneMapEntry::new();
304
305 for value in self.values.values() {
306 zone_map.row_count += 1;
307
308 if matches!(value, Value::Null) {
309 zone_map.null_count += 1;
310 continue;
311 }
312
313 match &zone_map.min {
315 None => zone_map.min = Some(value.clone()),
316 Some(current) => {
317 if compare_values(value, current) == Some(Ordering::Less) {
318 zone_map.min = Some(value.clone());
319 }
320 }
321 }
322
323 match &zone_map.max {
325 None => zone_map.max = Some(value.clone()),
326 Some(current) => {
327 if compare_values(value, current) == Some(Ordering::Greater) {
328 zone_map.max = Some(value.clone());
329 }
330 }
331 }
332 }
333
334 self.zone_map = zone_map;
335 self.zone_map_dirty = false;
336 }
337}
338
339fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
341 match (a, b) {
342 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
343 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
344 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
345 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
346 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
347 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
348 _ => None,
349 }
350}
351
352impl<Id: EntityId> Default for PropertyColumn<Id> {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
358pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
360 _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
361 #[allow(dead_code)]
362 key: PropertyKey,
363 _marker: PhantomData<Id>,
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Values written for different nodes and keys are read back
    /// independently; a key never written for a node reads as `None`.
    #[test]
    fn test_property_storage_basic() {
        let store = PropertyStorage::new();
        let (alice, bob) = (NodeId::new(1), NodeId::new(2));
        let name = PropertyKey::new("name");
        let age = PropertyKey::new("age");

        store.set(alice, name.clone(), "Alice".into());
        store.set(alice, age.clone(), 30i64.into());
        store.set(bob, name.clone(), "Bob".into());

        let alice_name = Some(Value::String("Alice".into()));
        let bob_name = Some(Value::String("Bob".into()));
        assert_eq!(store.get(alice, &name), alice_name);
        assert_eq!(store.get(alice, &age), Some(Value::Int64(30)));
        assert_eq!(store.get(bob, &name), bob_name);
        assert!(store.get(bob, &age).is_none());
    }

    /// Removing a property returns the old value and makes later reads miss.
    #[test]
    fn test_property_storage_remove() {
        let store = PropertyStorage::new();
        let node = NodeId::new(1);
        let key = PropertyKey::new("name");

        store.set(node, key.clone(), "Alice".into());
        assert!(store.get(node, &key).is_some());

        let removed = store.remove(node, &key);
        assert!(removed.is_some());
        assert!(store.get(node, &key).is_none());
    }

    /// `get_all` gathers one entry per property set on the node.
    #[test]
    fn test_property_storage_get_all() {
        let store = PropertyStorage::new();
        let node = NodeId::new(1);

        let entries: Vec<(&str, Value)> = vec![
            ("name", "Alice".into()),
            ("age", 30i64.into()),
            ("active", true.into()),
        ];
        for (name, value) in entries {
            store.set(node, PropertyKey::new(name), value);
        }

        let props = store.get_all(node);
        assert_eq!(props.len(), 3);
    }

    /// `remove_all` clears the node's value from every column.
    #[test]
    fn test_property_storage_remove_all() {
        let store = PropertyStorage::new();
        let node = NodeId::new(1);

        store.set(node, PropertyKey::new("name"), "Alice".into());
        store.set(node, PropertyKey::new("age"), 30i64.into());

        store.remove_all(node);

        for name in ["name", "age"] {
            assert!(store.get(node, &PropertyKey::new(name)).is_none());
        }
    }

    /// A bare column supports set, get, remove and length queries.
    #[test]
    fn test_property_column() {
        let mut column = PropertyColumn::new();
        let (first, second) = (NodeId::new(1), NodeId::new(2));

        column.set(first, "Alice".into());
        column.set(second, "Bob".into());

        assert_eq!(column.len(), 2);
        assert!(!column.is_empty());

        assert_eq!(column.get(first), Some(Value::String("Alice".into())));

        column.remove(first);
        assert!(column.get(first).is_none());
        assert_eq!(column.len(), 1);
    }
}