Skip to main content

raphtory/db/api/storage/
storage.rs

1#[cfg(feature = "search")]
2use crate::search::graph_index::GraphIndex;
3use crate::{
4    core::entities::{graph::tgraph::TemporalGraph, nodes::node_ref::NodeRef},
5    db::api::view::{
6        internal::{InheritEdgeHistoryFilter, InheritNodeHistoryFilter, InternalStorageOps},
7        Base, InheritViewOps,
8    },
9};
10use parking_lot::{RwLock, RwLockWriteGuard};
11use raphtory_api::core::{
12    entities::{EID, VID},
13    storage::{dict_mapper::MaybeNew, timeindex::EventTime},
14};
15use raphtory_storage::graph::graph::GraphStorage;
16use serde::{Deserialize, Serialize};
17use std::{
18    fmt::{Display, Formatter},
19    ops::{Deref, DerefMut},
20    sync::Arc,
21};
22use tracing::info;
23
24#[cfg(feature = "search")]
25use crate::search::graph_index::MutableGraphIndex;
26use crate::{db::api::view::IndexSpec, errors::GraphError};
27use raphtory_api::core::entities::{
28    properties::prop::{Prop, PropType},
29    GidRef,
30};
31use raphtory_core::storage::{
32    raw_edges::{EdgeWGuard, WriteLockedEdges},
33    EntryMut, NodeSlot, WriteLockedNodes,
34};
35use raphtory_storage::{
36    core_ops::InheritCoreGraphOps,
37    graph::{locked::WriteLockedGraph, nodes::node_storage_ops::NodeStorageOps},
38    layer_ops::InheritLayerOps,
39    mutation::{
40        addition_ops::InternalAdditionOps, deletion_ops::InternalDeletionOps,
41        property_addition_ops::InternalPropertyAdditionOps,
42    },
43};
44#[cfg(feature = "proto")]
45use {
46    crate::serialise::incremental::{GraphWriter, InternalCache},
47    crate::serialise::GraphFolder,
48    once_cell::sync::OnceCell,
49};
50
/// Wrapper around a [`GraphStorage`] plus optional, feature-gated side structures.
///
/// Only `graph` takes part in (de)serialisation; the feature-gated fields are
/// marked `#[serde(skip)]`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Storage {
    /// The wrapped graph storage that all operations are forwarded to.
    graph: GraphStorage,
    /// Set-once writer (`OnceCell`); when it is set, mutations performed through
    /// `Storage` are also reported to it (see the `if_cache` call sites).
    #[cfg(feature = "proto")]
    #[serde(skip)]
    pub(crate) cache: OnceCell<GraphWriter>,
    /// Search index behind a read/write lock; constructors in this file start it
    /// as `GraphIndex::Empty`.
    #[cfg(feature = "search")]
    #[serde(skip)]
    pub(crate) index: RwLock<GraphIndex>,
    // vector index
}
62
63impl From<GraphStorage> for Storage {
64    fn from(graph: GraphStorage) -> Self {
65        Self::from_inner(graph)
66    }
67}
68
// Marker impls with empty bodies: `Storage` defines no layer or core-graph items itself.
// NOTE(review): presumably these are inherited from the `Base` graph via blanket impls —
// confirm against the trait definitions.
impl InheritLayerOps for Storage {}
impl InheritCoreGraphOps for Storage {}
71
72impl Display for Storage {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        Display::fmt(&self.graph, f)
75    }
76}
77
78impl Base for Storage {
79    type Base = GraphStorage;
80
81    #[inline]
82    fn base(&self) -> &Self::Base {
83        &self.graph
84    }
85}
86
/// Message logged at `info` level when persisting is requested for an index that has no
/// on-disk path; the persist call then returns `Ok(())` without writing anything.
#[cfg(feature = "search")]
const IN_MEMORY_INDEX_NOT_PERSISTED: &str = "In-memory index not persisted. Not supported";
89
90impl Storage {
91    pub(crate) fn new(num_locks: usize) -> Self {
92        Self {
93            graph: GraphStorage::Unlocked(Arc::new(TemporalGraph::new(num_locks))),
94            #[cfg(feature = "proto")]
95            cache: OnceCell::new(),
96            #[cfg(feature = "search")]
97            index: RwLock::new(GraphIndex::Empty),
98        }
99    }
100
101    pub(crate) fn from_inner(graph: GraphStorage) -> Self {
102        Self {
103            graph,
104            #[cfg(feature = "proto")]
105            cache: OnceCell::new(),
106            #[cfg(feature = "search")]
107            index: RwLock::new(GraphIndex::Empty),
108        }
109    }
110
111    #[cfg(feature = "proto")]
112    #[inline]
113    fn if_cache(&self, map_fn: impl FnOnce(&GraphWriter)) {
114        if let Some(cache) = self.cache.get() {
115            map_fn(cache)
116        }
117    }
118
119    #[cfg(feature = "search")]
120    #[inline]
121    fn if_index(
122        &self,
123        map_fn: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
124    ) -> Result<(), GraphError> {
125        map_fn(&self.index.read_recursive())?;
126        Ok(())
127    }
128
129    #[cfg(feature = "search")]
130    #[inline]
131    fn if_index_mut(
132        &self,
133        map_fn: impl FnOnce(&MutableGraphIndex) -> Result<(), GraphError>,
134    ) -> Result<(), GraphError> {
135        let guard = self.index.read_recursive();
136        match guard.deref() {
137            GraphIndex::Empty => {}
138            GraphIndex::Mutable(i) => map_fn(i)?,
139            GraphIndex::Immutable(_) => {
140                drop(guard);
141                let mut guard = self.index.write();
142                guard.make_mutable_if_needed()?;
143                if let GraphIndex::Mutable(m) = guard.deref_mut() {
144                    map_fn(m)?
145                }
146            }
147        }
148        Ok(())
149    }
150}
151
#[cfg(feature = "search")]
impl Storage {
    /// Returns the index spec reported by the current index.
    pub(crate) fn get_index_spec(&self) -> Result<IndexSpec, GraphError> {
        let index = self.index.read_recursive();
        Ok(index.index_spec())
    }

    /// Loads the index from `path`, but only when the current index is `Empty`.
    ///
    /// Emptiness is checked under a read lock first and re-checked after the write lock
    /// has been taken, so a concurrently installed index is never overwritten.
    pub(crate) fn load_index_if_empty(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let currently_empty = matches!(self.index.read_recursive().deref(), GraphIndex::Empty);
        if !currently_empty {
            return Ok(());
        }

        let mut slot = self.index.write();
        if matches!(slot.deref(), GraphIndex::Empty) {
            *slot = GraphIndex::load_from_path(&path)?;
        }
        Ok(())
    }

    /// Creates an index when none exists yet, then updates the index with `index_spec`.
    ///
    /// A newly created index is given the cache folder (if a cache is present) as its path.
    pub(crate) fn create_index_if_empty(&self, index_spec: IndexSpec) -> Result<(), GraphError> {
        let currently_empty = matches!(self.index.read_recursive().deref(), GraphIndex::Empty);
        if currently_empty {
            let mut slot = self.index.write();
            // Re-check under the write lock: another caller may have created the index.
            if matches!(slot.deref(), GraphIndex::Empty) {
                let cached_graph_path = self.get_cache().map(|cache| cache.folder.clone());
                *slot = GraphIndex::create(&self.graph, false, cached_graph_path)?;
            }
        }

        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Creates an in-RAM index when none exists yet, then updates it with `index_spec`.
    ///
    /// # Errors
    ///
    /// Returns `GraphError::OnDiskIndexAlreadyExists` if the index in place has a path.
    pub(crate) fn create_index_in_ram_if_empty(
        &self,
        index_spec: IndexSpec,
    ) -> Result<(), GraphError> {
        let currently_empty = matches!(self.index.read_recursive().deref(), GraphIndex::Empty);
        if currently_empty {
            let mut slot = self.index.write();
            // Re-check under the write lock: another caller may have created the index.
            if matches!(slot.deref(), GraphIndex::Empty) {
                *slot = GraphIndex::create(&self.graph, true, None)?;
            }
        }

        let has_path = self.index.read_recursive().path().is_some();
        if has_path {
            return Err(GraphError::OnDiskIndexAlreadyExists);
        }

        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Gives access to the lock guarding the index.
    pub(crate) fn get_index(&self) -> &RwLock<GraphIndex> {
        &self.index
    }

    /// Reports whether the current index says it is indexed.
    pub(crate) fn is_indexed(&self) -> bool {
        let index = self.index.read_recursive();
        index.is_indexed()
    }

    /// Persists the index to `path`.
    ///
    /// Does nothing when the graph is not indexed. When the index has no path, only an
    /// info message is logged and `Ok(())` is returned.
    pub(crate) fn persist_index_to_disk(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let index = self.get_index().read_recursive();
        if !index.is_indexed() {
            return Ok(());
        }
        if index.path().is_none() {
            info!("{}", IN_MEMORY_INDEX_NOT_PERSISTED);
            return Ok(());
        }
        self.if_index(|current| current.persist_to_disk(path))
    }

    /// Zip variant of [`Storage::persist_index_to_disk`]: same guards, but calls
    /// `persist_to_disk_zip` on the index.
    pub(crate) fn persist_index_to_disk_zip(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let index = self.get_index().read_recursive();
        if !index.is_indexed() {
            return Ok(());
        }
        if index.path().is_none() {
            info!("{}", IN_MEMORY_INDEX_NOT_PERSISTED);
            return Ok(());
        }
        self.if_index(|current| current.persist_to_disk_zip(path))
    }

    /// Resets the index to `GraphIndex::Empty`; always succeeds.
    pub(crate) fn drop_index(&self) -> Result<(), GraphError> {
        *self.index.write() = GraphIndex::Empty;
        Ok(())
    }
}
257
impl InternalStorageOps for Storage {
    /// `Storage` is itself the storage, so this always returns `Some(self)`.
    fn get_storage(&self) -> Option<&Storage> {
        Some(self)
    }
}
263
// Marker impls with empty bodies: `Storage` defines no history-filter or view items itself.
// NOTE(review): presumably these are inherited from the `Base` graph via blanket impls —
// confirm against the trait definitions.
impl InheritNodeHistoryFilter for Storage {}
impl InheritEdgeHistoryFilter for Storage {}

impl InheritViewOps for Storage {}
268
269impl InternalAdditionOps for Storage {
270    type Error = GraphError;
271
272    fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error> {
273        Ok(self.graph.write_lock()?)
274    }
275
276    fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error> {
277        Ok(self.graph.write_lock_nodes()?)
278    }
279
280    fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error> {
281        Ok(self.graph.write_lock_edges()?)
282    }
283
284    fn next_event_id(&self) -> Result<usize, Self::Error> {
285        Ok(self.graph.next_event_id()?)
286    }
287
288    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
289        Ok(self.graph.reserve_event_ids(num_ids)?)
290    }
291
292    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, GraphError> {
293        let id = self.graph.resolve_layer(layer)?;
294
295        #[cfg(feature = "proto")]
296        self.if_cache(|cache| cache.resolve_layer(layer, id));
297
298        Ok(id)
299    }
300
301    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, GraphError> {
302        match id {
303            NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
304            NodeRef::External(gid) => {
305                let id = self.graph.resolve_node(id)?;
306
307                #[cfg(feature = "proto")]
308                self.if_cache(|cache| cache.resolve_node(id, gid));
309
310                Ok(id)
311            }
312        }
313    }
314
315    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
316        Ok(self.graph.set_node(gid, vid)?)
317    }
318
319    fn resolve_node_and_type(
320        &self,
321        id: NodeRef,
322        node_type: &str,
323    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, GraphError> {
324        let node_and_type = self.graph.resolve_node_and_type(id, node_type)?;
325
326        #[cfg(feature = "proto")]
327        self.if_cache(|cache| {
328            let (vid, _) = node_and_type.inner();
329            let node_entry = self.graph.core_node(vid.inner());
330            cache.resolve_node_and_type(node_and_type, node_type, node_entry.id())
331        });
332
333        Ok(node_and_type)
334    }
335
336    fn resolve_graph_property(
337        &self,
338        prop: &str,
339        dtype: PropType,
340        is_static: bool,
341    ) -> Result<MaybeNew<usize>, GraphError> {
342        let id = self
343            .graph
344            .resolve_graph_property(prop, dtype.clone(), is_static)?;
345
346        #[cfg(feature = "proto")]
347        self.if_cache(|cache| cache.resolve_graph_property(prop, id, dtype, is_static));
348
349        Ok(id)
350    }
351
352    fn resolve_node_property(
353        &self,
354        prop: &str,
355        dtype: PropType,
356        is_static: bool,
357    ) -> Result<MaybeNew<usize>, GraphError> {
358        let id = self
359            .graph
360            .resolve_node_property(prop, dtype.clone(), is_static)?;
361
362        #[cfg(feature = "proto")]
363        self.if_cache(|cache| cache.resolve_node_property(prop, id, &dtype, is_static));
364
365        Ok(id)
366    }
367
368    fn resolve_edge_property(
369        &self,
370        prop: &str,
371        dtype: PropType,
372        is_static: bool,
373    ) -> Result<MaybeNew<usize>, GraphError> {
374        let id = self
375            .graph
376            .resolve_edge_property(prop, dtype.clone(), is_static)?;
377
378        #[cfg(feature = "proto")]
379        self.if_cache(|cache| cache.resolve_edge_property(prop, id, &dtype, is_static));
380
381        Ok(id)
382    }
383
384    fn internal_add_node(
385        &self,
386        t: EventTime,
387        v: VID,
388        props: &[(usize, Prop)],
389    ) -> Result<(), GraphError> {
390        self.graph.internal_add_node(t, v, props)?;
391
392        #[cfg(feature = "proto")]
393        self.if_cache(|cache| cache.add_node_update(t, v, props));
394
395        #[cfg(feature = "search")]
396        self.if_index_mut(|index| index.add_node_update(&self.graph, t, MaybeNew::New(v), props))?;
397
398        Ok(())
399    }
400
401    fn internal_add_edge(
402        &self,
403        t: EventTime,
404        src: VID,
405        dst: VID,
406        props: &[(usize, Prop)],
407        layer: usize,
408    ) -> Result<MaybeNew<EID>, GraphError> {
409        let id = self.graph.internal_add_edge(t, src, dst, props, layer)?;
410
411        #[cfg(feature = "proto")]
412        self.if_cache(|cache| {
413            cache.resolve_edge(id, src, dst);
414            cache.add_edge_update(t, id.inner(), props, layer);
415        });
416
417        #[cfg(feature = "search")]
418        self.if_index_mut(|index| index.add_edge_update(&self.graph, id, t, layer, props))?;
419
420        Ok(id)
421    }
422
423    fn internal_add_edge_update(
424        &self,
425        t: EventTime,
426        edge: EID,
427        props: &[(usize, Prop)],
428        layer: usize,
429    ) -> Result<(), GraphError> {
430        self.graph.internal_add_edge_update(t, edge, props, layer)?;
431
432        #[cfg(feature = "proto")]
433        self.if_cache(|cache| cache.add_edge_update(t, edge, props, layer));
434
435        #[cfg(feature = "search")]
436        self.if_index_mut(|index| {
437            index.add_edge_update(&self.graph, MaybeNew::Existing(edge), t, layer, props)
438        })?;
439
440        Ok(())
441    }
442}
443
444impl InternalPropertyAdditionOps for Storage {
445    type Error = GraphError;
446    fn internal_add_properties(
447        &self,
448        t: EventTime,
449        props: &[(usize, Prop)],
450    ) -> Result<(), GraphError> {
451        self.graph.internal_add_properties(t, props)?;
452
453        #[cfg(feature = "proto")]
454        self.if_cache(|cache| cache.add_graph_tprops(t, props));
455
456        Ok(())
457    }
458
459    fn internal_add_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
460        self.graph.internal_add_metadata(props)?;
461
462        #[cfg(feature = "proto")]
463        self.if_cache(|cache| cache.add_graph_cprops(props));
464
465        Ok(())
466    }
467
468    fn internal_update_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
469        self.graph.internal_update_metadata(props)?;
470
471        #[cfg(feature = "proto")]
472        self.if_cache(|cache| cache.add_graph_cprops(props));
473
474        Ok(())
475    }
476
477    fn internal_add_node_metadata(
478        &self,
479        vid: VID,
480        props: &[(usize, Prop)],
481    ) -> Result<EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>>, Self::Error> {
482        let lock = self.graph.internal_add_node_metadata(vid, props)?;
483
484        #[cfg(feature = "proto")]
485        self.if_cache(|cache| cache.add_node_cprops(vid, props));
486
487        #[cfg(feature = "search")]
488        self.if_index_mut(|index| index.add_node_metadata(vid, props))?;
489
490        Ok(lock)
491    }
492
493    fn internal_update_node_metadata(
494        &self,
495        vid: VID,
496        props: &[(usize, Prop)],
497    ) -> Result<EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>>, Self::Error> {
498        let lock = self.graph.internal_update_node_metadata(vid, props)?;
499
500        #[cfg(feature = "proto")]
501        self.if_cache(|cache| cache.add_node_cprops(vid, props));
502
503        #[cfg(feature = "search")]
504        self.if_index_mut(|index| index.update_node_metadata(vid, props))?;
505
506        Ok(lock)
507    }
508
509    fn internal_add_edge_metadata(
510        &self,
511        eid: EID,
512        layer: usize,
513        props: &[(usize, Prop)],
514    ) -> Result<EdgeWGuard<'_>, Self::Error> {
515        let lock = self.graph.internal_add_edge_metadata(eid, layer, props)?;
516
517        #[cfg(feature = "proto")]
518        self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
519
520        #[cfg(feature = "search")]
521        self.if_index_mut(|index| index.add_edge_metadata(eid, layer, props))?;
522
523        Ok(lock)
524    }
525
526    fn internal_update_edge_metadata(
527        &self,
528        eid: EID,
529        layer: usize,
530        props: &[(usize, Prop)],
531    ) -> Result<EdgeWGuard<'_>, Self::Error> {
532        let lock = self
533            .graph
534            .internal_update_edge_metadata(eid, layer, props)?;
535
536        #[cfg(feature = "proto")]
537        self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
538
539        #[cfg(feature = "search")]
540        self.if_index_mut(|index| index.update_edge_metadata(eid, layer, props))?;
541
542        Ok(lock)
543    }
544}
545
546impl InternalDeletionOps for Storage {
547    type Error = GraphError;
548    fn internal_delete_edge(
549        &self,
550        t: EventTime,
551        src: VID,
552        dst: VID,
553        layer: usize,
554    ) -> Result<MaybeNew<EID>, GraphError> {
555        let eid = self.graph.internal_delete_edge(t, src, dst, layer)?;
556
557        #[cfg(feature = "proto")]
558        self.if_cache(|cache| {
559            cache.resolve_edge(eid, src, dst);
560            cache.delete_edge(eid.inner(), t, layer);
561        });
562
563        Ok(eid)
564    }
565
566    fn internal_delete_existing_edge(
567        &self,
568        t: EventTime,
569        eid: EID,
570        layer: usize,
571    ) -> Result<(), GraphError> {
572        self.graph.internal_delete_existing_edge(t, eid, layer)?;
573
574        #[cfg(feature = "proto")]
575        self.if_cache(|cache| cache.delete_edge(eid, t, layer));
576
577        Ok(())
578    }
579}