1use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
4use crate::graph::Direction;
5use crate::index::adjacency::ChunkedAdjacency;
6use graphos_common::types::{EdgeId, EpochId, NodeId, PropertyKey, Value};
7use graphos_common::utils::hash::FxHashMap;
8use parking_lot::RwLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
/// Construction-time settings for an `LpgStore`.
///
/// The type is plain data (a flag and two sizes), so it derives equality in
/// addition to `Debug`/`Clone`; this lets callers and tests compare configs
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// Whether the store should maintain a reverse (incoming-edge) adjacency
    /// structure in addition to the forward one.
    pub backward_edges: bool,
    /// Expected number of nodes; intended as an initial capacity hint.
    pub initial_node_capacity: usize,
    /// Expected number of edges; intended as an initial capacity hint.
    pub initial_edge_capacity: usize,
}
22
23impl Default for LpgStoreConfig {
24 fn default() -> Self {
25 Self {
26 backward_edges: true,
27 initial_node_capacity: 1024,
28 initial_edge_capacity: 4096,
29 }
30 }
31}
32
33pub struct LpgStore {
38 config: LpgStoreConfig,
40
41 nodes: RwLock<FxHashMap<NodeId, NodeRecord>>,
43
44 edges: RwLock<FxHashMap<EdgeId, EdgeRecord>>,
46
47 node_properties: PropertyStorage<NodeId>,
49
50 edge_properties: PropertyStorage<EdgeId>,
52
53 label_to_id: RwLock<FxHashMap<Arc<str>, u8>>,
55
56 id_to_label: RwLock<Vec<Arc<str>>>,
58
59 edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
61
62 id_to_edge_type: RwLock<Vec<Arc<str>>>,
64
65 forward_adj: ChunkedAdjacency,
67
68 backward_adj: Option<ChunkedAdjacency>,
71
72 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
74
75 next_node_id: AtomicU64,
77
78 next_edge_id: AtomicU64,
80
81 current_epoch: AtomicU64,
83}
84
85impl LpgStore {
86 #[must_use]
88 pub fn new() -> Self {
89 Self::with_config(LpgStoreConfig::default())
90 }
91
92 #[must_use]
94 pub fn with_config(config: LpgStoreConfig) -> Self {
95 let backward_adj = if config.backward_edges {
96 Some(ChunkedAdjacency::new())
97 } else {
98 None
99 };
100
101 Self {
102 nodes: RwLock::new(FxHashMap::default()),
103 edges: RwLock::new(FxHashMap::default()),
104 node_properties: PropertyStorage::new(),
105 edge_properties: PropertyStorage::new(),
106 label_to_id: RwLock::new(FxHashMap::default()),
107 id_to_label: RwLock::new(Vec::new()),
108 edge_type_to_id: RwLock::new(FxHashMap::default()),
109 id_to_edge_type: RwLock::new(Vec::new()),
110 forward_adj: ChunkedAdjacency::new(),
111 backward_adj,
112 label_index: RwLock::new(Vec::new()),
113 next_node_id: AtomicU64::new(0),
114 next_edge_id: AtomicU64::new(0),
115 current_epoch: AtomicU64::new(0),
116 config,
117 }
118 }
119
120 #[must_use]
122 pub fn current_epoch(&self) -> EpochId {
123 EpochId::new(self.current_epoch.load(Ordering::Acquire))
124 }
125
126 pub fn new_epoch(&self) -> EpochId {
128 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
129 EpochId::new(id)
130 }
131
132 pub fn create_node(&self, labels: &[&str]) -> NodeId {
136 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
137 let epoch = self.current_epoch();
138
139 let mut record = NodeRecord::new(id, epoch);
140
141 for label in labels {
143 let label_id = self.get_or_create_label_id(*label);
144 record.set_label_bit(label_id);
145
146 let mut index = self.label_index.write();
148 while index.len() <= label_id as usize {
149 index.push(FxHashMap::default());
150 }
151 index[label_id as usize].insert(id, ());
152 }
153
154 self.nodes.write().insert(id, record);
155 id
156 }
157
158 pub fn create_node_with_props(
160 &self,
161 labels: &[&str],
162 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
163 ) -> NodeId {
164 let id = self.create_node(labels);
165
166 for (key, value) in properties {
167 self.node_properties.set(id, key.into(), value.into());
168 }
169
170 let count = self.node_properties.get_all(id).len() as u16;
172 if let Some(record) = self.nodes.write().get_mut(&id) {
173 record.props_count = count;
174 }
175
176 id
177 }
178
179 #[must_use]
181 pub fn get_node(&self, id: NodeId) -> Option<Node> {
182 let nodes = self.nodes.read();
183 let record = nodes.get(&id)?;
184
185 if record.is_deleted() {
186 return None;
187 }
188
189 let mut node = Node::new(id);
190
191 let id_to_label = self.id_to_label.read();
193 for bit in record.label_bits_iter() {
194 if let Some(label) = id_to_label.get(bit as usize) {
195 node.labels.push(label.clone());
196 }
197 }
198
199 node.properties = self
201 .node_properties
202 .get_all(id)
203 .into_iter()
204 .collect();
205
206 Some(node)
207 }
208
209 pub fn delete_node(&self, id: NodeId) -> bool {
211 let mut nodes = self.nodes.write();
212 if let Some(record) = nodes.get_mut(&id) {
213 if record.is_deleted() {
214 return false;
215 }
216
217 record.set_deleted(true);
218
219 let mut index = self.label_index.write();
221 for bit in record.label_bits_iter() {
222 if let Some(set) = index.get_mut(bit as usize) {
223 set.remove(&id);
224 }
225 }
226
227 drop(nodes); self.node_properties.remove_all(id);
230
231 true
234 } else {
235 false
236 }
237 }
238
239 #[must_use]
241 pub fn node_count(&self) -> usize {
242 self.nodes
243 .read()
244 .values()
245 .filter(|r| !r.is_deleted())
246 .count()
247 }
248
249 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
253 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
254 let epoch = self.current_epoch();
255 let type_id = self.get_or_create_edge_type_id(edge_type);
256
257 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
258 self.edges.write().insert(id, record);
259
260 self.forward_adj.add_edge(src, dst, id);
262 if let Some(ref backward) = self.backward_adj {
263 backward.add_edge(dst, src, id);
264 }
265
266 id
267 }
268
269 pub fn create_edge_with_props(
271 &self,
272 src: NodeId,
273 dst: NodeId,
274 edge_type: &str,
275 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
276 ) -> EdgeId {
277 let id = self.create_edge(src, dst, edge_type);
278
279 for (key, value) in properties {
280 self.edge_properties.set(id, key.into(), value.into());
281 }
282
283 id
284 }
285
286 #[must_use]
288 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
289 let edges = self.edges.read();
290 let record = edges.get(&id)?;
291
292 if record.is_deleted() {
293 return None;
294 }
295
296 let edge_type = {
297 let id_to_type = self.id_to_edge_type.read();
298 id_to_type.get(record.type_id as usize)?.clone()
299 };
300
301 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
302
303 edge.properties = self
305 .edge_properties
306 .get_all(id)
307 .into_iter()
308 .collect();
309
310 Some(edge)
311 }
312
313 pub fn delete_edge(&self, id: EdgeId) -> bool {
315 let mut edges = self.edges.write();
316 if let Some(record) = edges.get_mut(&id) {
317 if record.is_deleted() {
318 return false;
319 }
320
321 let src = record.src;
322 let dst = record.dst;
323
324 record.set_deleted(true);
325
326 drop(edges); self.forward_adj.mark_deleted(src, id);
330 if let Some(ref backward) = self.backward_adj {
331 backward.mark_deleted(dst, id);
332 }
333
334 self.edge_properties.remove_all(id);
336
337 true
338 } else {
339 false
340 }
341 }
342
343 #[must_use]
345 pub fn edge_count(&self) -> usize {
346 self.edges
347 .read()
348 .values()
349 .filter(|r| !r.is_deleted())
350 .count()
351 }
352
353 pub fn neighbors(&self, node: NodeId, direction: Direction) -> impl Iterator<Item = NodeId> + '_ {
357 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
358 Direction::Outgoing | Direction::Both => {
359 Box::new(self.forward_adj.neighbors(node))
360 }
361 Direction::Incoming => Box::new(std::iter::empty()),
362 };
363
364 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
365 Direction::Incoming | Direction::Both => {
366 if let Some(ref adj) = self.backward_adj {
367 Box::new(adj.neighbors(node))
368 } else {
369 Box::new(std::iter::empty())
370 }
371 }
372 Direction::Outgoing => Box::new(std::iter::empty()),
373 };
374
375 forward.chain(backward)
376 }
377
378 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
380 let label_to_id = self.label_to_id.read();
381 if let Some(&label_id) = label_to_id.get(label) {
382 let index = self.label_index.read();
383 if let Some(set) = index.get(label_id as usize) {
384 return set.keys().copied().collect();
385 }
386 }
387 Vec::new()
388 }
389
390 fn get_or_create_label_id(&self, label: &str) -> u8 {
393 {
394 let label_to_id = self.label_to_id.read();
395 if let Some(&id) = label_to_id.get(label) {
396 return id;
397 }
398 }
399
400 let mut label_to_id = self.label_to_id.write();
401 let mut id_to_label = self.id_to_label.write();
402
403 if let Some(&id) = label_to_id.get(label) {
405 return id;
406 }
407
408 let id = id_to_label.len() as u8;
409 assert!(id < 64, "Maximum 64 labels supported");
410
411 let label: Arc<str> = label.into();
412 label_to_id.insert(label.clone(), id);
413 id_to_label.push(label);
414
415 id
416 }
417
418 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
419 {
420 let type_to_id = self.edge_type_to_id.read();
421 if let Some(&id) = type_to_id.get(edge_type) {
422 return id;
423 }
424 }
425
426 let mut type_to_id = self.edge_type_to_id.write();
427 let mut id_to_type = self.id_to_edge_type.write();
428
429 if let Some(&id) = type_to_id.get(edge_type) {
431 return id;
432 }
433
434 let id = id_to_type.len() as u32;
435 let edge_type: Arc<str> = edge_type.into();
436 type_to_id.insert(edge_type.clone(), id);
437 id_to_type.push(edge_type);
438
439 id
440 }
441}
442
443impl Default for LpgStore {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created node is retrievable and reports exactly its labels.
    #[test]
    fn test_create_node() {
        let store = LpgStore::new();

        let person = store.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = store.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties passed at creation time are readable from the node.
    #[test]
    fn test_create_node_with_props() {
        let store = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let id = store.create_node_with_props(&["Person"], props);

        let node = store.get_node(id).unwrap();

        let name = node.get_property("name").and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));

        let age = node.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting hides the node and is reported only once.
    #[test]
    fn test_delete_node() {
        let store = LpgStore::new();

        let id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        let first_attempt = store.delete_node(id);
        assert!(first_attempt);
        assert_eq!(store.node_count(), 0);
        assert!(store.get_node(id).is_none());

        // A second delete of the same node must report failure.
        let second_attempt = store.delete_node(id);
        assert!(!second_attempt);
    }

    /// A created edge keeps its endpoints and type name.
    #[test]
    fn test_create_edge() {
        let store = LpgStore::new();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        let knows = store.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let edge = store.get_edge(knows).unwrap();
        assert_eq!(edge.src, alice);
        assert_eq!(edge.dst, bob);
        assert_eq!(edge.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming neighbor queries follow edge direction.
    #[test]
    fn test_neighbors() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, b, "KNOWS");
        store.create_edge(a, c, "KNOWS");

        let from_a: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
        assert_eq!(from_a.len(), 2);
        assert!(from_a.contains(&b));
        assert!(from_a.contains(&c));

        let into_b: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
        assert_eq!(into_b.len(), 1);
        assert!(into_b.contains(&a));
    }

    /// The label index returns exactly the nodes carrying a label.
    #[test]
    fn test_nodes_by_label() {
        let store = LpgStore::new();

        let p1 = store.create_node(&["Person"]);
        let p2 = store.create_node(&["Person"]);
        let _a = store.create_node(&["Animal"]);

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&p1));
        assert!(persons.contains(&p2));

        let animals = store.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let knows = store.create_edge(a, b, "KNOWS");

        assert_eq!(store.edge_count(), 1);

        assert!(store.delete_edge(knows));
        assert_eq!(store.edge_count(), 0);
        assert!(store.get_edge(knows).is_none());
    }
}