raphtory/db/api/storage/
storage.rs

1#[cfg(feature = "search")]
2use crate::search::graph_index::GraphIndex;
3use crate::{
4    core::entities::{graph::tgraph::TemporalGraph, nodes::node_ref::NodeRef},
5    db::api::view::{
6        internal::{InheritEdgeHistoryFilter, InheritNodeHistoryFilter, InternalStorageOps},
7        Base, InheritViewOps,
8    },
9};
10use parking_lot::{RwLock, RwLockWriteGuard};
11use raphtory_api::core::{
12    entities::{EID, VID},
13    storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry},
14};
15use raphtory_storage::graph::graph::GraphStorage;
16use serde::{Deserialize, Serialize};
17use std::{
18    fmt::{Display, Formatter},
19    ops::{Deref, DerefMut},
20    sync::Arc,
21};
22use tracing::info;
23
24#[cfg(feature = "search")]
25use crate::search::graph_index::MutableGraphIndex;
26use crate::{db::api::view::IndexSpec, errors::GraphError};
27use raphtory_api::core::entities::{
28    properties::prop::{Prop, PropType},
29    GidRef,
30};
31use raphtory_core::storage::{
32    raw_edges::{EdgeWGuard, WriteLockedEdges},
33    EntryMut, NodeSlot, WriteLockedNodes,
34};
35use raphtory_storage::{
36    core_ops::InheritCoreGraphOps,
37    graph::{locked::WriteLockedGraph, nodes::node_storage_ops::NodeStorageOps},
38    layer_ops::InheritLayerOps,
39    mutation::{
40        addition_ops::InternalAdditionOps, deletion_ops::InternalDeletionOps,
41        property_addition_ops::InternalPropertyAdditionOps,
42    },
43};
44#[cfg(feature = "proto")]
45use {
46    crate::serialise::incremental::{GraphWriter, InternalCache},
47    crate::serialise::GraphFolder,
48    once_cell::sync::OnceCell,
49};
50
/// Wrapper around a [`GraphStorage`] that, depending on enabled features, also
/// carries a proto cache writer and a search index next to the graph.
///
/// Both optional fields are marked `#[serde(skip)]`, so only `graph` takes part
/// in (de)serialisation.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Storage {
    /// The wrapped graph storage; every operation in this file is forwarded to it.
    graph: GraphStorage,
    /// `OnceCell` holding the `GraphWriter`; only compiled with the `proto` feature.
    #[cfg(feature = "proto")]
    #[serde(skip)]
    pub(crate) cache: OnceCell<GraphWriter>,
    /// Search index behind a read-write lock; only compiled with the `search` feature.
    #[cfg(feature = "search")]
    #[serde(skip)]
    pub(crate) index: RwLock<GraphIndex>,
    // vector index
    // NOTE(review): the comment above looks like a placeholder for a field that
    // is not present in this struct — confirm whether it is still planned.
}
62
// Empty impls: `Storage` provides no items of its own for these two traits.
// NOTE(review): presumably their behaviour is derived from the `GraphStorage`
// exposed through `Base::base` — confirm against the trait definitions.
impl InheritLayerOps for Storage {}
impl InheritCoreGraphOps for Storage {}
65
66impl Display for Storage {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        Display::fmt(&self.graph, f)
69    }
70}
71
72impl Base for Storage {
73    type Base = GraphStorage;
74
75    #[inline]
76    fn base(&self) -> &Self::Base {
77        &self.graph
78    }
79}
80
/// Message logged by the `persist_index_to_disk*` methods when the index has no
/// on-disk path and is therefore skipped instead of being persisted.
#[cfg(feature = "search")]
const IN_MEMORY_INDEX_NOT_PERSISTED: &str = "In-memory index not persisted. Not supported";
83
84impl Storage {
85    pub(crate) fn new(num_locks: usize) -> Self {
86        Self {
87            graph: GraphStorage::Unlocked(Arc::new(TemporalGraph::new(num_locks))),
88            #[cfg(feature = "proto")]
89            cache: OnceCell::new(),
90            #[cfg(feature = "search")]
91            index: RwLock::new(GraphIndex::Empty),
92        }
93    }
94
95    pub(crate) fn from_inner(graph: GraphStorage) -> Self {
96        Self {
97            graph,
98            #[cfg(feature = "proto")]
99            cache: OnceCell::new(),
100            #[cfg(feature = "search")]
101            index: RwLock::new(GraphIndex::Empty),
102        }
103    }
104
105    #[cfg(feature = "proto")]
106    #[inline]
107    fn if_cache(&self, map_fn: impl FnOnce(&GraphWriter)) {
108        if let Some(cache) = self.cache.get() {
109            map_fn(cache)
110        }
111    }
112
113    #[cfg(feature = "search")]
114    #[inline]
115    fn if_index(
116        &self,
117        map_fn: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
118    ) -> Result<(), GraphError> {
119        map_fn(&self.index.read())?;
120        Ok(())
121    }
122
123    #[cfg(feature = "search")]
124    #[inline]
125    fn if_index_mut(
126        &self,
127        map_fn: impl FnOnce(&MutableGraphIndex) -> Result<(), GraphError>,
128    ) -> Result<(), GraphError> {
129        let guard = self.index.read();
130        match guard.deref() {
131            GraphIndex::Empty => {}
132            GraphIndex::Mutable(i) => map_fn(i)?,
133            GraphIndex::Immutable(_) => {
134                drop(guard);
135                let mut guard = self.index.write();
136                guard.make_mutable_if_needed()?;
137                if let GraphIndex::Mutable(m) = guard.deref_mut() {
138                    map_fn(m)?
139                }
140            }
141        }
142        Ok(())
143    }
144}
145
#[cfg(feature = "search")]
impl Storage {
    /// Returns the `IndexSpec` reported by the current index.
    pub(crate) fn get_index_spec(&self) -> Result<IndexSpec, GraphError> {
        Ok(self.index.read().index_spec())
    }

    /// Loads the index from `path` if the current index is `GraphIndex::Empty`;
    /// otherwise does nothing.
    ///
    /// # Errors
    /// Propagates any error from `GraphIndex::load_from_path`.
    pub(crate) fn load_index_if_empty(&self, path: &GraphFolder) -> Result<(), GraphError> {
        // The read guard is a temporary of this statement, so it is released
        // before the write lock is requested below.
        let is_empty = matches!(self.index.read().deref(), GraphIndex::Empty);
        if is_empty {
            let mut guard = self.index.write();
            // Re-check under the write lock: the index may have been set
            // between releasing the read lock and acquiring the write lock.
            if let e @ GraphIndex::Empty = guard.deref_mut() {
                *e = GraphIndex::load_from_path(&path)?;
            }
        }
        Ok(())
    }

    /// Creates an index (passing `false` as the in-RAM flag and the cache
    /// folder, if any) when the current index is `GraphIndex::Empty`, then
    /// applies `index_spec` through `if_index_mut`.
    ///
    /// # Errors
    /// Propagates errors from `GraphIndex::create` and from the index update.
    pub(crate) fn create_index_if_empty(&self, index_spec: IndexSpec) -> Result<(), GraphError> {
        let is_empty = matches!(self.index.read().deref(), GraphIndex::Empty);
        if is_empty {
            let mut guard = self.index.write();
            // Re-check under the write lock before replacing the slot.
            if let e @ GraphIndex::Empty = guard.deref_mut() {
                let cached_graph_path = self.get_cache().map(|cache| cache.folder.clone());
                *e = GraphIndex::create(&self.graph, false, cached_graph_path)?;
            }
        }
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Creates an in-RAM index (no path) when the current index is
    /// `GraphIndex::Empty`, then applies `index_spec` through `if_index_mut`.
    ///
    /// # Errors
    /// Returns `GraphError::OnDiskIndexAlreadyExists` if the index present
    /// after the creation step has a path. Also propagates errors from
    /// `GraphIndex::create` and from the index update.
    pub(crate) fn create_index_in_ram_if_empty(
        &self,
        index_spec: IndexSpec,
    ) -> Result<(), GraphError> {
        let is_empty = matches!(self.index.read().deref(), GraphIndex::Empty);
        if is_empty {
            let mut guard = self.index.write();
            // Re-check under the write lock before replacing the slot.
            if let e @ GraphIndex::Empty = guard.deref_mut() {
                *e = GraphIndex::create(&self.graph, true, None)?;
            }
        }
        let has_path = self.index.read().path().is_some();
        if has_path {
            return Err(GraphError::OnDiskIndexAlreadyExists);
        }
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Gives access to the lock guarding the index.
    pub(crate) fn get_index(&self) -> &RwLock<GraphIndex> {
        &self.index
    }

    /// Reports whether the current index considers itself indexed.
    pub(crate) fn is_indexed(&self) -> bool {
        self.index.read().is_indexed()
    }

    /// Shared body of the `persist_index_to_disk*` methods.
    ///
    /// The checks and the `persist` call all run under one read lock taken by
    /// `if_index`. The previous code took a second read lock while still
    /// holding the first; with `parking_lot`'s task-fair `RwLock` that can
    /// deadlock when a writer queues between the two acquisitions.
    fn persist_index_with(
        &self,
        persist: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
    ) -> Result<(), GraphError> {
        self.if_index(|index| {
            if !index.is_indexed() {
                return Ok(());
            }
            if index.path().is_none() {
                // In-memory indexes are logged and skipped, not treated as an error.
                info!("{}", IN_MEMORY_INDEX_NOT_PERSISTED);
                return Ok(());
            }
            persist(index)
        })
    }

    /// Persists the index to `path` via `GraphIndex::persist_to_disk`.
    ///
    /// Does nothing when the graph is not indexed. An index without an on-disk
    /// path is logged and skipped.
    ///
    /// # Errors
    /// Propagates any error from `persist_to_disk`.
    pub(crate) fn persist_index_to_disk(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk(path))
    }

    /// Persists the index to `path` via `GraphIndex::persist_to_disk_zip`.
    ///
    /// Does nothing when the graph is not indexed. An index without an on-disk
    /// path is logged and skipped.
    ///
    /// # Errors
    /// Propagates any error from `persist_to_disk_zip`.
    pub(crate) fn persist_index_to_disk_zip(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk_zip(path))
    }

    /// Resets the index to `GraphIndex::Empty` under the write lock.
    pub(crate) fn drop_index(&self) -> Result<(), GraphError> {
        *self.index.write() = GraphIndex::Empty;
        Ok(())
    }
}
251
252impl InternalStorageOps for Storage {
253    fn get_storage(&self) -> Option<&Storage> {
254        Some(self)
255    }
256}
257
// Empty impls: `Storage` provides no items of its own for these traits.
// NOTE(review): presumably their behaviour is derived from the `GraphStorage`
// exposed through `Base::base` — confirm against the trait definitions.
impl InheritNodeHistoryFilter for Storage {}
impl InheritEdgeHistoryFilter for Storage {}

impl InheritViewOps for Storage {}
262
263impl InternalAdditionOps for Storage {
264    type Error = GraphError;
265
266    fn write_lock(&self) -> Result<WriteLockedGraph, Self::Error> {
267        Ok(self.graph.write_lock()?)
268    }
269
270    fn write_lock_nodes(&self) -> Result<WriteLockedNodes, Self::Error> {
271        Ok(self.graph.write_lock_nodes()?)
272    }
273
274    fn write_lock_edges(&self) -> Result<WriteLockedEdges, Self::Error> {
275        Ok(self.graph.write_lock_edges()?)
276    }
277
278    fn next_event_id(&self) -> Result<usize, Self::Error> {
279        Ok(self.graph.next_event_id()?)
280    }
281
282    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
283        Ok(self.graph.reserve_event_ids(num_ids)?)
284    }
285
286    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, GraphError> {
287        let id = self.graph.resolve_layer(layer)?;
288
289        #[cfg(feature = "proto")]
290        self.if_cache(|cache| cache.resolve_layer(layer, id));
291
292        Ok(id)
293    }
294
295    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, GraphError> {
296        match id {
297            NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
298            NodeRef::External(gid) => {
299                let id = self.graph.resolve_node(id)?;
300
301                #[cfg(feature = "proto")]
302                self.if_cache(|cache| cache.resolve_node(id, gid));
303
304                Ok(id)
305            }
306        }
307    }
308
309    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
310        Ok(self.graph.set_node(gid, vid)?)
311    }
312
313    fn resolve_node_and_type(
314        &self,
315        id: NodeRef,
316        node_type: &str,
317    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, GraphError> {
318        let node_and_type = self.graph.resolve_node_and_type(id, node_type)?;
319
320        #[cfg(feature = "proto")]
321        self.if_cache(|cache| {
322            let (vid, _) = node_and_type.inner();
323            let node_entry = self.graph.core_node(vid.inner());
324            cache.resolve_node_and_type(node_and_type, node_type, node_entry.id())
325        });
326
327        Ok(node_and_type)
328    }
329
330    fn resolve_graph_property(
331        &self,
332        prop: &str,
333        dtype: PropType,
334        is_static: bool,
335    ) -> Result<MaybeNew<usize>, GraphError> {
336        let id = self
337            .graph
338            .resolve_graph_property(prop, dtype.clone(), is_static)?;
339
340        #[cfg(feature = "proto")]
341        self.if_cache(|cache| cache.resolve_graph_property(prop, id, dtype, is_static));
342
343        Ok(id)
344    }
345
346    fn resolve_node_property(
347        &self,
348        prop: &str,
349        dtype: PropType,
350        is_static: bool,
351    ) -> Result<MaybeNew<usize>, GraphError> {
352        let id = self
353            .graph
354            .resolve_node_property(prop, dtype.clone(), is_static)?;
355
356        #[cfg(feature = "proto")]
357        self.if_cache(|cache| cache.resolve_node_property(prop, id, &dtype, is_static));
358
359        Ok(id)
360    }
361
362    fn resolve_edge_property(
363        &self,
364        prop: &str,
365        dtype: PropType,
366        is_static: bool,
367    ) -> Result<MaybeNew<usize>, GraphError> {
368        let id = self
369            .graph
370            .resolve_edge_property(prop, dtype.clone(), is_static)?;
371
372        #[cfg(feature = "proto")]
373        self.if_cache(|cache| cache.resolve_edge_property(prop, id, &dtype, is_static));
374
375        Ok(id)
376    }
377
378    fn internal_add_node(
379        &self,
380        t: TimeIndexEntry,
381        v: VID,
382        props: &[(usize, Prop)],
383    ) -> Result<(), GraphError> {
384        self.graph.internal_add_node(t, v, props)?;
385
386        #[cfg(feature = "proto")]
387        self.if_cache(|cache| cache.add_node_update(t, v, props));
388
389        #[cfg(feature = "search")]
390        self.if_index_mut(|index| index.add_node_update(&self.graph, t, MaybeNew::New(v), props))?;
391
392        Ok(())
393    }
394
395    fn internal_add_edge(
396        &self,
397        t: TimeIndexEntry,
398        src: VID,
399        dst: VID,
400        props: &[(usize, Prop)],
401        layer: usize,
402    ) -> Result<MaybeNew<EID>, GraphError> {
403        let id = self.graph.internal_add_edge(t, src, dst, props, layer)?;
404
405        #[cfg(feature = "proto")]
406        self.if_cache(|cache| {
407            cache.resolve_edge(id, src, dst);
408            cache.add_edge_update(t, id.inner(), props, layer);
409        });
410
411        #[cfg(feature = "search")]
412        self.if_index_mut(|index| index.add_edge_update(&self.graph, id, t, layer, props))?;
413
414        Ok(id)
415    }
416
417    fn internal_add_edge_update(
418        &self,
419        t: TimeIndexEntry,
420        edge: EID,
421        props: &[(usize, Prop)],
422        layer: usize,
423    ) -> Result<(), GraphError> {
424        self.graph.internal_add_edge_update(t, edge, props, layer)?;
425
426        #[cfg(feature = "proto")]
427        self.if_cache(|cache| cache.add_edge_update(t, edge, props, layer));
428
429        #[cfg(feature = "search")]
430        self.if_index_mut(|index| {
431            index.add_edge_update(&self.graph, MaybeNew::Existing(edge), t, layer, props)
432        })?;
433
434        Ok(())
435    }
436}
437
438impl InternalPropertyAdditionOps for Storage {
439    type Error = GraphError;
440    fn internal_add_properties(
441        &self,
442        t: TimeIndexEntry,
443        props: &[(usize, Prop)],
444    ) -> Result<(), GraphError> {
445        self.graph.internal_add_properties(t, props)?;
446
447        #[cfg(feature = "proto")]
448        self.if_cache(|cache| cache.add_graph_tprops(t, props));
449
450        Ok(())
451    }
452
453    fn internal_add_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
454        self.graph.internal_add_metadata(props)?;
455
456        #[cfg(feature = "proto")]
457        self.if_cache(|cache| cache.add_graph_cprops(props));
458
459        Ok(())
460    }
461
462    fn internal_update_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
463        self.graph.internal_update_metadata(props)?;
464
465        #[cfg(feature = "proto")]
466        self.if_cache(|cache| cache.add_graph_cprops(props));
467
468        Ok(())
469    }
470
471    fn internal_add_node_metadata(
472        &self,
473        vid: VID,
474        props: &[(usize, Prop)],
475    ) -> Result<EntryMut<RwLockWriteGuard<NodeSlot>>, Self::Error> {
476        let lock = self.graph.internal_add_node_metadata(vid, props)?;
477
478        #[cfg(feature = "proto")]
479        self.if_cache(|cache| cache.add_node_cprops(vid, props));
480
481        #[cfg(feature = "search")]
482        self.if_index_mut(|index| index.add_node_metadata(vid, props))?;
483
484        Ok(lock)
485    }
486
487    fn internal_update_node_metadata(
488        &self,
489        vid: VID,
490        props: &[(usize, Prop)],
491    ) -> Result<EntryMut<RwLockWriteGuard<NodeSlot>>, Self::Error> {
492        let lock = self.graph.internal_update_node_metadata(vid, props)?;
493
494        #[cfg(feature = "proto")]
495        self.if_cache(|cache| cache.add_node_cprops(vid, props));
496
497        #[cfg(feature = "search")]
498        self.if_index_mut(|index| index.update_node_metadata(vid, props))?;
499
500        Ok(lock)
501    }
502
503    fn internal_add_edge_metadata(
504        &self,
505        eid: EID,
506        layer: usize,
507        props: &[(usize, Prop)],
508    ) -> Result<EdgeWGuard, Self::Error> {
509        let lock = self.graph.internal_add_edge_metadata(eid, layer, props)?;
510
511        #[cfg(feature = "proto")]
512        self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
513
514        #[cfg(feature = "search")]
515        self.if_index_mut(|index| index.add_edge_metadata(eid, layer, props))?;
516
517        Ok(lock)
518    }
519
520    fn internal_update_edge_metadata(
521        &self,
522        eid: EID,
523        layer: usize,
524        props: &[(usize, Prop)],
525    ) -> Result<EdgeWGuard, Self::Error> {
526        let lock = self
527            .graph
528            .internal_update_edge_metadata(eid, layer, props)?;
529
530        #[cfg(feature = "proto")]
531        self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
532
533        #[cfg(feature = "search")]
534        self.if_index_mut(|index| index.update_edge_metadata(eid, layer, props))?;
535
536        Ok(lock)
537    }
538}
539
540impl InternalDeletionOps for Storage {
541    type Error = GraphError;
542    fn internal_delete_edge(
543        &self,
544        t: TimeIndexEntry,
545        src: VID,
546        dst: VID,
547        layer: usize,
548    ) -> Result<MaybeNew<EID>, GraphError> {
549        let eid = self.graph.internal_delete_edge(t, src, dst, layer)?;
550
551        #[cfg(feature = "proto")]
552        self.if_cache(|cache| {
553            cache.resolve_edge(eid, src, dst);
554            cache.delete_edge(eid.inner(), t, layer);
555        });
556
557        Ok(eid)
558    }
559
560    fn internal_delete_existing_edge(
561        &self,
562        t: TimeIndexEntry,
563        eid: EID,
564        layer: usize,
565    ) -> Result<(), GraphError> {
566        self.graph.internal_delete_existing_edge(t, eid, layer)?;
567
568        #[cfg(feature = "proto")]
569        self.if_cache(|cache| cache.delete_edge(eid, t, layer));
570
571        Ok(())
572    }
573}