1#[cfg(feature = "search")]
2use crate::search::graph_index::GraphIndex;
3use crate::{
4 core::entities::{graph::tgraph::TemporalGraph, nodes::node_ref::NodeRef},
5 db::api::view::{
6 internal::{InheritEdgeHistoryFilter, InheritNodeHistoryFilter, InternalStorageOps},
7 Base, InheritViewOps,
8 },
9};
10use parking_lot::{RwLock, RwLockWriteGuard};
11use raphtory_api::core::{
12 entities::{EID, VID},
13 storage::{dict_mapper::MaybeNew, timeindex::EventTime},
14};
15use raphtory_storage::graph::graph::GraphStorage;
16use serde::{Deserialize, Serialize};
17use std::{
18 fmt::{Display, Formatter},
19 ops::{Deref, DerefMut},
20 sync::Arc,
21};
22use tracing::info;
23
24#[cfg(feature = "search")]
25use crate::search::graph_index::MutableGraphIndex;
26use crate::{db::api::view::IndexSpec, errors::GraphError};
27use raphtory_api::core::entities::{
28 properties::prop::{Prop, PropType},
29 GidRef,
30};
31use raphtory_core::storage::{
32 raw_edges::{EdgeWGuard, WriteLockedEdges},
33 EntryMut, NodeSlot, WriteLockedNodes,
34};
35use raphtory_storage::{
36 core_ops::InheritCoreGraphOps,
37 graph::{locked::WriteLockedGraph, nodes::node_storage_ops::NodeStorageOps},
38 layer_ops::InheritLayerOps,
39 mutation::{
40 addition_ops::InternalAdditionOps, deletion_ops::InternalDeletionOps,
41 property_addition_ops::InternalPropertyAdditionOps,
42 },
43};
44#[cfg(feature = "proto")]
45use {
46 crate::serialise::incremental::{GraphWriter, InternalCache},
47 crate::serialise::GraphFolder,
48 once_cell::sync::OnceCell,
49};
50
/// Wrapper around a [`GraphStorage`] that can additionally carry an
/// incremental write cache (`proto` feature) and a search index (`search`
/// feature).
///
/// Only `graph` takes part in (de)serialisation; the feature-gated fields are
/// marked `#[serde(skip)]`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Storage {
    /// The wrapped graph storage; every operation in this file delegates to it.
    graph: GraphStorage,
    /// Write cache held in a `OnceCell`; when it has been set, mutations are
    /// mirrored to it through `if_cache`.
    #[cfg(feature = "proto")]
    #[serde(skip)]
    pub(crate) cache: OnceCell<GraphWriter>,
    /// Search index behind a read/write lock; the constructors in this file
    /// start it as `GraphIndex::Empty`.
    #[cfg(feature = "search")]
    #[serde(skip)]
    pub(crate) index: RwLock<GraphIndex>,
}
62
63impl From<GraphStorage> for Storage {
64 fn from(graph: GraphStorage) -> Self {
65 Self::from_inner(graph)
66 }
67}
68
// Empty-bodied impls: `Storage` provides no overrides for these traits.
// NOTE(review): presumably the inherited behaviour forwards to the inner
// `GraphStorage` exposed through `Base` — confirm against the trait definitions.
impl InheritLayerOps for Storage {}
impl InheritCoreGraphOps for Storage {}
71
72impl Display for Storage {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 Display::fmt(&self.graph, f)
75 }
76}
77
78impl Base for Storage {
79 type Base = GraphStorage;
80
81 #[inline]
82 fn base(&self) -> &Self::Base {
83 &self.graph
84 }
85}
86
/// Message logged (at `info` level) by the persist functions when the index
/// has no on-disk path, in which case persisting is skipped.
#[cfg(feature = "search")]
const IN_MEMORY_INDEX_NOT_PERSISTED: &str = "In-memory index not persisted. Not supported";
89
90impl Storage {
91 pub(crate) fn new(num_locks: usize) -> Self {
92 Self {
93 graph: GraphStorage::Unlocked(Arc::new(TemporalGraph::new(num_locks))),
94 #[cfg(feature = "proto")]
95 cache: OnceCell::new(),
96 #[cfg(feature = "search")]
97 index: RwLock::new(GraphIndex::Empty),
98 }
99 }
100
101 pub(crate) fn from_inner(graph: GraphStorage) -> Self {
102 Self {
103 graph,
104 #[cfg(feature = "proto")]
105 cache: OnceCell::new(),
106 #[cfg(feature = "search")]
107 index: RwLock::new(GraphIndex::Empty),
108 }
109 }
110
111 #[cfg(feature = "proto")]
112 #[inline]
113 fn if_cache(&self, map_fn: impl FnOnce(&GraphWriter)) {
114 if let Some(cache) = self.cache.get() {
115 map_fn(cache)
116 }
117 }
118
119 #[cfg(feature = "search")]
120 #[inline]
121 fn if_index(
122 &self,
123 map_fn: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
124 ) -> Result<(), GraphError> {
125 map_fn(&self.index.read_recursive())?;
126 Ok(())
127 }
128
129 #[cfg(feature = "search")]
130 #[inline]
131 fn if_index_mut(
132 &self,
133 map_fn: impl FnOnce(&MutableGraphIndex) -> Result<(), GraphError>,
134 ) -> Result<(), GraphError> {
135 let guard = self.index.read_recursive();
136 match guard.deref() {
137 GraphIndex::Empty => {}
138 GraphIndex::Mutable(i) => map_fn(i)?,
139 GraphIndex::Immutable(_) => {
140 drop(guard);
141 let mut guard = self.index.write();
142 guard.make_mutable_if_needed()?;
143 if let GraphIndex::Mutable(m) = guard.deref_mut() {
144 map_fn(m)?
145 }
146 }
147 }
148 Ok(())
149 }
150}
151
#[cfg(feature = "search")]
impl Storage {
    /// Returns the [`IndexSpec`] reported by the current index.
    pub(crate) fn get_index_spec(&self) -> Result<IndexSpec, GraphError> {
        Ok(self.index.read_recursive().index_spec())
    }

    /// Installs the index produced by `build` if, and only if, the slot is
    /// still `GraphIndex::Empty`.
    ///
    /// Emptiness is checked first under a shared lock and then again under
    /// the exclusive lock, because the shared lock is released before the
    /// exclusive one is requested and the state may change in between.
    /// `build` runs while the exclusive lock is held. Both guards are released
    /// before this function returns, so callers may lock the index afterwards.
    ///
    /// # Errors
    /// Propagates any error returned by `build`, converted into `GraphError`.
    fn init_index_if_empty<F, E>(&self, build: F) -> Result<(), GraphError>
    where
        F: FnOnce() -> Result<GraphIndex, E>,
        GraphError: From<E>,
    {
        // The temporary read guard is dropped at the end of this statement.
        let is_empty = matches!(self.index.read_recursive().deref(), GraphIndex::Empty);
        if is_empty {
            let mut guard = self.index.write();
            if let slot @ GraphIndex::Empty = guard.deref_mut() {
                *slot = build()?;
            }
        }
        Ok(())
    }

    /// Shared body of the persist functions: under a single read lock, skips
    /// when nothing is indexed, logs and skips when the index has no on-disk
    /// path, and otherwise runs `persist` on the index.
    fn persist_index_with(
        &self,
        persist: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
    ) -> Result<(), GraphError> {
        self.if_index(|index| {
            if !index.is_indexed() {
                return Ok(());
            }
            if index.path().is_none() {
                info!("{}", IN_MEMORY_INDEX_NOT_PERSISTED);
                return Ok(());
            }
            persist(index)
        })
    }

    /// Loads the index from `path` when no index is currently present;
    /// otherwise leaves the existing index untouched.
    ///
    /// # Errors
    /// Returns the error from `GraphIndex::load_from_path` if loading fails.
    pub(crate) fn load_index_if_empty(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.init_index_if_empty(|| GraphIndex::load_from_path(&path))
    }

    /// Creates an index when none is present, passing the write cache's
    /// folder (if a cache is set) to `GraphIndex::create`, and then updates
    /// the index with `index_spec`.
    ///
    /// # Errors
    /// Returns any error from creating or updating the index.
    pub(crate) fn create_index_if_empty(&self, index_spec: IndexSpec) -> Result<(), GraphError> {
        self.init_index_if_empty(|| {
            let cached_graph_path = self.get_cache().map(|cache| cache.folder.clone());
            GraphIndex::create(&self.graph, false, cached_graph_path)
        })?;
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Creates an in-RAM index when none is present and then updates it with
    /// `index_spec`.
    ///
    /// # Errors
    /// Returns `GraphError::OnDiskIndexAlreadyExists` when the current index
    /// reports an on-disk path, plus any error from creating or updating the
    /// index.
    pub(crate) fn create_index_in_ram_if_empty(
        &self,
        index_spec: IndexSpec,
    ) -> Result<(), GraphError> {
        self.init_index_if_empty(|| GraphIndex::create(&self.graph, true, None))?;
        if self.index.read_recursive().path().is_some() {
            return Err(GraphError::OnDiskIndexAlreadyExists);
        }
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Borrows the lock that guards the index.
    pub(crate) fn get_index(&self) -> &RwLock<GraphIndex> {
        &self.index
    }

    /// Reports whether the current index considers itself indexed.
    pub(crate) fn is_indexed(&self) -> bool {
        self.index.read_recursive().is_indexed()
    }

    /// Persists the index to `path`. Does nothing when there is no index, and
    /// only logs a message when the index has no on-disk path.
    pub(crate) fn persist_index_to_disk(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk(path))
    }

    /// Zip variant of [`Storage::persist_index_to_disk`], with the same skip
    /// conditions.
    pub(crate) fn persist_index_to_disk_zip(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk_zip(path))
    }

    /// Replaces the current index with `GraphIndex::Empty`. Always returns `Ok`.
    pub(crate) fn drop_index(&self) -> Result<(), GraphError> {
        let mut guard = self.index.write();
        *guard = GraphIndex::Empty;
        Ok(())
    }
}
257
258impl InternalStorageOps for Storage {
259 fn get_storage(&self) -> Option<&Storage> {
260 Some(self)
261 }
262}
263
// Empty-bodied impls: `Storage` provides no overrides for these traits.
// NOTE(review): presumably the inherited behaviour comes from the inner
// `GraphStorage` exposed through `Base` — confirm against the trait definitions.
impl InheritNodeHistoryFilter for Storage {}
impl InheritEdgeHistoryFilter for Storage {}

impl InheritViewOps for Storage {}
268
269impl InternalAdditionOps for Storage {
270 type Error = GraphError;
271
272 fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error> {
273 Ok(self.graph.write_lock()?)
274 }
275
276 fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error> {
277 Ok(self.graph.write_lock_nodes()?)
278 }
279
280 fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error> {
281 Ok(self.graph.write_lock_edges()?)
282 }
283
284 fn next_event_id(&self) -> Result<usize, Self::Error> {
285 Ok(self.graph.next_event_id()?)
286 }
287
288 fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
289 Ok(self.graph.reserve_event_ids(num_ids)?)
290 }
291
292 fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, GraphError> {
293 let id = self.graph.resolve_layer(layer)?;
294
295 #[cfg(feature = "proto")]
296 self.if_cache(|cache| cache.resolve_layer(layer, id));
297
298 Ok(id)
299 }
300
301 fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, GraphError> {
302 match id {
303 NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
304 NodeRef::External(gid) => {
305 let id = self.graph.resolve_node(id)?;
306
307 #[cfg(feature = "proto")]
308 self.if_cache(|cache| cache.resolve_node(id, gid));
309
310 Ok(id)
311 }
312 }
313 }
314
315 fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
316 Ok(self.graph.set_node(gid, vid)?)
317 }
318
319 fn resolve_node_and_type(
320 &self,
321 id: NodeRef,
322 node_type: &str,
323 ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, GraphError> {
324 let node_and_type = self.graph.resolve_node_and_type(id, node_type)?;
325
326 #[cfg(feature = "proto")]
327 self.if_cache(|cache| {
328 let (vid, _) = node_and_type.inner();
329 let node_entry = self.graph.core_node(vid.inner());
330 cache.resolve_node_and_type(node_and_type, node_type, node_entry.id())
331 });
332
333 Ok(node_and_type)
334 }
335
336 fn resolve_graph_property(
337 &self,
338 prop: &str,
339 dtype: PropType,
340 is_static: bool,
341 ) -> Result<MaybeNew<usize>, GraphError> {
342 let id = self
343 .graph
344 .resolve_graph_property(prop, dtype.clone(), is_static)?;
345
346 #[cfg(feature = "proto")]
347 self.if_cache(|cache| cache.resolve_graph_property(prop, id, dtype, is_static));
348
349 Ok(id)
350 }
351
352 fn resolve_node_property(
353 &self,
354 prop: &str,
355 dtype: PropType,
356 is_static: bool,
357 ) -> Result<MaybeNew<usize>, GraphError> {
358 let id = self
359 .graph
360 .resolve_node_property(prop, dtype.clone(), is_static)?;
361
362 #[cfg(feature = "proto")]
363 self.if_cache(|cache| cache.resolve_node_property(prop, id, &dtype, is_static));
364
365 Ok(id)
366 }
367
368 fn resolve_edge_property(
369 &self,
370 prop: &str,
371 dtype: PropType,
372 is_static: bool,
373 ) -> Result<MaybeNew<usize>, GraphError> {
374 let id = self
375 .graph
376 .resolve_edge_property(prop, dtype.clone(), is_static)?;
377
378 #[cfg(feature = "proto")]
379 self.if_cache(|cache| cache.resolve_edge_property(prop, id, &dtype, is_static));
380
381 Ok(id)
382 }
383
384 fn internal_add_node(
385 &self,
386 t: EventTime,
387 v: VID,
388 props: &[(usize, Prop)],
389 ) -> Result<(), GraphError> {
390 self.graph.internal_add_node(t, v, props)?;
391
392 #[cfg(feature = "proto")]
393 self.if_cache(|cache| cache.add_node_update(t, v, props));
394
395 #[cfg(feature = "search")]
396 self.if_index_mut(|index| index.add_node_update(&self.graph, t, MaybeNew::New(v), props))?;
397
398 Ok(())
399 }
400
401 fn internal_add_edge(
402 &self,
403 t: EventTime,
404 src: VID,
405 dst: VID,
406 props: &[(usize, Prop)],
407 layer: usize,
408 ) -> Result<MaybeNew<EID>, GraphError> {
409 let id = self.graph.internal_add_edge(t, src, dst, props, layer)?;
410
411 #[cfg(feature = "proto")]
412 self.if_cache(|cache| {
413 cache.resolve_edge(id, src, dst);
414 cache.add_edge_update(t, id.inner(), props, layer);
415 });
416
417 #[cfg(feature = "search")]
418 self.if_index_mut(|index| index.add_edge_update(&self.graph, id, t, layer, props))?;
419
420 Ok(id)
421 }
422
423 fn internal_add_edge_update(
424 &self,
425 t: EventTime,
426 edge: EID,
427 props: &[(usize, Prop)],
428 layer: usize,
429 ) -> Result<(), GraphError> {
430 self.graph.internal_add_edge_update(t, edge, props, layer)?;
431
432 #[cfg(feature = "proto")]
433 self.if_cache(|cache| cache.add_edge_update(t, edge, props, layer));
434
435 #[cfg(feature = "search")]
436 self.if_index_mut(|index| {
437 index.add_edge_update(&self.graph, MaybeNew::Existing(edge), t, layer, props)
438 })?;
439
440 Ok(())
441 }
442}
443
444impl InternalPropertyAdditionOps for Storage {
445 type Error = GraphError;
446 fn internal_add_properties(
447 &self,
448 t: EventTime,
449 props: &[(usize, Prop)],
450 ) -> Result<(), GraphError> {
451 self.graph.internal_add_properties(t, props)?;
452
453 #[cfg(feature = "proto")]
454 self.if_cache(|cache| cache.add_graph_tprops(t, props));
455
456 Ok(())
457 }
458
459 fn internal_add_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
460 self.graph.internal_add_metadata(props)?;
461
462 #[cfg(feature = "proto")]
463 self.if_cache(|cache| cache.add_graph_cprops(props));
464
465 Ok(())
466 }
467
468 fn internal_update_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
469 self.graph.internal_update_metadata(props)?;
470
471 #[cfg(feature = "proto")]
472 self.if_cache(|cache| cache.add_graph_cprops(props));
473
474 Ok(())
475 }
476
477 fn internal_add_node_metadata(
478 &self,
479 vid: VID,
480 props: &[(usize, Prop)],
481 ) -> Result<EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>>, Self::Error> {
482 let lock = self.graph.internal_add_node_metadata(vid, props)?;
483
484 #[cfg(feature = "proto")]
485 self.if_cache(|cache| cache.add_node_cprops(vid, props));
486
487 #[cfg(feature = "search")]
488 self.if_index_mut(|index| index.add_node_metadata(vid, props))?;
489
490 Ok(lock)
491 }
492
493 fn internal_update_node_metadata(
494 &self,
495 vid: VID,
496 props: &[(usize, Prop)],
497 ) -> Result<EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>>, Self::Error> {
498 let lock = self.graph.internal_update_node_metadata(vid, props)?;
499
500 #[cfg(feature = "proto")]
501 self.if_cache(|cache| cache.add_node_cprops(vid, props));
502
503 #[cfg(feature = "search")]
504 self.if_index_mut(|index| index.update_node_metadata(vid, props))?;
505
506 Ok(lock)
507 }
508
509 fn internal_add_edge_metadata(
510 &self,
511 eid: EID,
512 layer: usize,
513 props: &[(usize, Prop)],
514 ) -> Result<EdgeWGuard<'_>, Self::Error> {
515 let lock = self.graph.internal_add_edge_metadata(eid, layer, props)?;
516
517 #[cfg(feature = "proto")]
518 self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
519
520 #[cfg(feature = "search")]
521 self.if_index_mut(|index| index.add_edge_metadata(eid, layer, props))?;
522
523 Ok(lock)
524 }
525
526 fn internal_update_edge_metadata(
527 &self,
528 eid: EID,
529 layer: usize,
530 props: &[(usize, Prop)],
531 ) -> Result<EdgeWGuard<'_>, Self::Error> {
532 let lock = self
533 .graph
534 .internal_update_edge_metadata(eid, layer, props)?;
535
536 #[cfg(feature = "proto")]
537 self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
538
539 #[cfg(feature = "search")]
540 self.if_index_mut(|index| index.update_edge_metadata(eid, layer, props))?;
541
542 Ok(lock)
543 }
544}
545
546impl InternalDeletionOps for Storage {
547 type Error = GraphError;
548 fn internal_delete_edge(
549 &self,
550 t: EventTime,
551 src: VID,
552 dst: VID,
553 layer: usize,
554 ) -> Result<MaybeNew<EID>, GraphError> {
555 let eid = self.graph.internal_delete_edge(t, src, dst, layer)?;
556
557 #[cfg(feature = "proto")]
558 self.if_cache(|cache| {
559 cache.resolve_edge(eid, src, dst);
560 cache.delete_edge(eid.inner(), t, layer);
561 });
562
563 Ok(eid)
564 }
565
566 fn internal_delete_existing_edge(
567 &self,
568 t: EventTime,
569 eid: EID,
570 layer: usize,
571 ) -> Result<(), GraphError> {
572 self.graph.internal_delete_existing_edge(t, eid, layer)?;
573
574 #[cfg(feature = "proto")]
575 self.if_cache(|cache| cache.delete_edge(eid, t, layer));
576
577 Ok(())
578 }
579}