1#[cfg(feature = "search")]
2use crate::search::graph_index::GraphIndex;
3use crate::{
4 core::entities::{graph::tgraph::TemporalGraph, nodes::node_ref::NodeRef},
5 db::api::view::{
6 internal::{InheritEdgeHistoryFilter, InheritNodeHistoryFilter, InternalStorageOps},
7 Base, InheritViewOps,
8 },
9};
10use parking_lot::{RwLock, RwLockWriteGuard};
11use raphtory_api::core::{
12 entities::{EID, VID},
13 storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry},
14};
15use raphtory_storage::graph::graph::GraphStorage;
16use serde::{Deserialize, Serialize};
17use std::{
18 fmt::{Display, Formatter},
19 ops::{Deref, DerefMut},
20 sync::Arc,
21};
22use tracing::info;
23
24#[cfg(feature = "search")]
25use crate::search::graph_index::MutableGraphIndex;
26use crate::{db::api::view::IndexSpec, errors::GraphError};
27use raphtory_api::core::entities::{
28 properties::prop::{Prop, PropType},
29 GidRef,
30};
31use raphtory_core::storage::{
32 raw_edges::{EdgeWGuard, WriteLockedEdges},
33 EntryMut, NodeSlot, WriteLockedNodes,
34};
35use raphtory_storage::{
36 core_ops::InheritCoreGraphOps,
37 graph::{locked::WriteLockedGraph, nodes::node_storage_ops::NodeStorageOps},
38 layer_ops::InheritLayerOps,
39 mutation::{
40 addition_ops::InternalAdditionOps, deletion_ops::InternalDeletionOps,
41 property_addition_ops::InternalPropertyAdditionOps,
42 },
43};
44#[cfg(feature = "proto")]
45use {
46 crate::serialise::incremental::{GraphWriter, InternalCache},
47 crate::serialise::GraphFolder,
48 once_cell::sync::OnceCell,
49};
50
51#[derive(Debug, Default, Serialize, Deserialize)]
52pub struct Storage {
53 graph: GraphStorage,
54 #[cfg(feature = "proto")]
55 #[serde(skip)]
56 pub(crate) cache: OnceCell<GraphWriter>,
57 #[cfg(feature = "search")]
58 #[serde(skip)]
59 pub(crate) index: RwLock<GraphIndex>,
60 }
62
63impl InheritLayerOps for Storage {}
64impl InheritCoreGraphOps for Storage {}
65
66impl Display for Storage {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 Display::fmt(&self.graph, f)
69 }
70}
71
72impl Base for Storage {
73 type Base = GraphStorage;
74
75 #[inline]
76 fn base(&self) -> &Self::Base {
77 &self.graph
78 }
79}
80
/// Message logged at `info` level when a persist request finds an index that
/// has no on-disk path.
#[cfg(feature = "search")]
const IN_MEMORY_INDEX_NOT_PERSISTED: &'static str = "In-memory index not persisted. Not supported";
83
84impl Storage {
85 pub(crate) fn new(num_locks: usize) -> Self {
86 Self {
87 graph: GraphStorage::Unlocked(Arc::new(TemporalGraph::new(num_locks))),
88 #[cfg(feature = "proto")]
89 cache: OnceCell::new(),
90 #[cfg(feature = "search")]
91 index: RwLock::new(GraphIndex::Empty),
92 }
93 }
94
95 pub(crate) fn from_inner(graph: GraphStorage) -> Self {
96 Self {
97 graph,
98 #[cfg(feature = "proto")]
99 cache: OnceCell::new(),
100 #[cfg(feature = "search")]
101 index: RwLock::new(GraphIndex::Empty),
102 }
103 }
104
105 #[cfg(feature = "proto")]
106 #[inline]
107 fn if_cache(&self, map_fn: impl FnOnce(&GraphWriter)) {
108 if let Some(cache) = self.cache.get() {
109 map_fn(cache)
110 }
111 }
112
113 #[cfg(feature = "search")]
114 #[inline]
115 fn if_index(
116 &self,
117 map_fn: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
118 ) -> Result<(), GraphError> {
119 map_fn(&self.index.read())?;
120 Ok(())
121 }
122
123 #[cfg(feature = "search")]
124 #[inline]
125 fn if_index_mut(
126 &self,
127 map_fn: impl FnOnce(&MutableGraphIndex) -> Result<(), GraphError>,
128 ) -> Result<(), GraphError> {
129 let guard = self.index.read();
130 match guard.deref() {
131 GraphIndex::Empty => {}
132 GraphIndex::Mutable(i) => map_fn(i)?,
133 GraphIndex::Immutable(_) => {
134 drop(guard);
135 let mut guard = self.index.write();
136 guard.make_mutable_if_needed()?;
137 if let GraphIndex::Mutable(m) = guard.deref_mut() {
138 map_fn(m)?
139 }
140 }
141 }
142 Ok(())
143 }
144}
145
#[cfg(feature = "search")]
impl Storage {
    /// Returns the [`IndexSpec`] reported by the current index.
    pub(crate) fn get_index_spec(&self) -> Result<IndexSpec, GraphError> {
        Ok(self.index.read().index_spec())
    }

    /// Installs the index produced by `build` if, and only if, the current
    /// index is `GraphIndex::Empty`.
    ///
    /// `build` runs while the exclusive lock is held, so at most one caller
    /// builds an index for an empty slot.
    ///
    /// # Errors
    /// Returns whatever error `build` returns; the index is left untouched in
    /// that case.
    fn init_index_if_empty(
        &self,
        build: impl FnOnce() -> Result<GraphIndex, GraphError>,
    ) -> Result<(), GraphError> {
        // Check under the shared lock first. The guard is a temporary of this
        // statement, so it is released before the exclusive lock is requested.
        let is_empty = matches!(self.index.read().deref(), GraphIndex::Empty);
        if is_empty {
            let mut guard = self.index.write();
            // Re-check: another thread may have installed an index between
            // releasing the shared lock and acquiring the exclusive one.
            if matches!(guard.deref(), GraphIndex::Empty) {
                *guard = build()?;
            }
        }
        Ok(())
    }

    /// Shared body of the `persist_index_to_disk*` methods.
    ///
    /// Does nothing when the storage is not indexed. When the index has no
    /// on-disk path, logs [`IN_MEMORY_INDEX_NOT_PERSISTED`] and returns `Ok`.
    /// Otherwise calls `persist` on the index.
    ///
    /// The checks and the persist call share a single read guard. A
    /// `parking_lot` read lock must not be re-acquired by a thread that
    /// already holds one: with a writer queued in between, the second `read()`
    /// can deadlock.
    fn persist_index_with(
        &self,
        persist: impl FnOnce(&GraphIndex) -> Result<(), GraphError>,
    ) -> Result<(), GraphError> {
        self.if_index(|index| {
            if !index.is_indexed() {
                return Ok(());
            }
            if index.path().is_none() {
                info!("{}", IN_MEMORY_INDEX_NOT_PERSISTED);
                return Ok(());
            }
            persist(index)
        })
    }

    /// Loads the index found at `path`, unless an index is already present.
    ///
    /// # Errors
    /// Propagates any failure of `GraphIndex::load_from_path`.
    pub(crate) fn load_index_if_empty(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.init_index_if_empty(|| Ok(GraphIndex::load_from_path(&path)?))
    }

    /// Creates an index if none exists, then applies `index_spec` to it.
    ///
    /// A newly created index is built with `GraphIndex::create(graph, false,
    /// folder)`, where `folder` is the cache's folder when a cache is set.
    ///
    /// # Errors
    /// Propagates failures from index creation and from the spec update.
    pub(crate) fn create_index_if_empty(&self, index_spec: IndexSpec) -> Result<(), GraphError> {
        self.init_index_if_empty(|| {
            let cached_graph_path = self.get_cache().map(|cache| cache.folder.clone());
            Ok(GraphIndex::create(&self.graph, false, cached_graph_path)?)
        })?;
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Creates an index with no on-disk path if none exists, then applies
    /// `index_spec` to it.
    ///
    /// A newly created index is built with `GraphIndex::create(graph, true,
    /// None)`.
    ///
    /// # Errors
    /// * `GraphError::OnDiskIndexAlreadyExists` when the index in place
    ///   reports a path.
    /// * Any failure from index creation or from the spec update.
    pub(crate) fn create_index_in_ram_if_empty(
        &self,
        index_spec: IndexSpec,
    ) -> Result<(), GraphError> {
        self.init_index_if_empty(|| Ok(GraphIndex::create(&self.graph, true, None)?))?;

        // The read guard is a temporary of this statement and is released
        // before `if_index_mut` locks again.
        let has_disk_path = self.index.read().path().is_some();
        if has_disk_path {
            return Err(GraphError::OnDiskIndexAlreadyExists);
        }
        self.if_index_mut(|index| index.update(&self.graph, index_spec))
    }

    /// Exposes the lock that guards the index.
    pub(crate) fn get_index(&self) -> &RwLock<GraphIndex> {
        &self.index
    }

    /// Reports whether the current index considers itself indexed.
    pub(crate) fn is_indexed(&self) -> bool {
        self.index.read().is_indexed()
    }

    /// Persists the index to `path` through `GraphIndex::persist_to_disk`.
    ///
    /// A storage that is not indexed, or whose index has no on-disk path, is
    /// left alone and `Ok(())` is returned (the latter case is logged).
    pub(crate) fn persist_index_to_disk(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk(path))
    }

    /// Persists the index to `path` through `GraphIndex::persist_to_disk_zip`.
    ///
    /// A storage that is not indexed, or whose index has no on-disk path, is
    /// left alone and `Ok(())` is returned (the latter case is logged).
    pub(crate) fn persist_index_to_disk_zip(&self, path: &GraphFolder) -> Result<(), GraphError> {
        self.persist_index_with(|index| index.persist_to_disk_zip(path))
    }

    /// Resets the index to `GraphIndex::Empty`. Always returns `Ok(())`.
    pub(crate) fn drop_index(&self) -> Result<(), GraphError> {
        *self.index.write() = GraphIndex::Empty;
        Ok(())
    }
}
251
252impl InternalStorageOps for Storage {
253 fn get_storage(&self) -> Option<&Storage> {
254 Some(self)
255 }
256}
257
258impl InheritNodeHistoryFilter for Storage {}
259impl InheritEdgeHistoryFilter for Storage {}
260
261impl InheritViewOps for Storage {}
262
263impl InternalAdditionOps for Storage {
264 type Error = GraphError;
265
266 fn write_lock(&self) -> Result<WriteLockedGraph, Self::Error> {
267 Ok(self.graph.write_lock()?)
268 }
269
270 fn write_lock_nodes(&self) -> Result<WriteLockedNodes, Self::Error> {
271 Ok(self.graph.write_lock_nodes()?)
272 }
273
274 fn write_lock_edges(&self) -> Result<WriteLockedEdges, Self::Error> {
275 Ok(self.graph.write_lock_edges()?)
276 }
277
278 fn next_event_id(&self) -> Result<usize, Self::Error> {
279 Ok(self.graph.next_event_id()?)
280 }
281
282 fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
283 Ok(self.graph.reserve_event_ids(num_ids)?)
284 }
285
286 fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, GraphError> {
287 let id = self.graph.resolve_layer(layer)?;
288
289 #[cfg(feature = "proto")]
290 self.if_cache(|cache| cache.resolve_layer(layer, id));
291
292 Ok(id)
293 }
294
295 fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, GraphError> {
296 match id {
297 NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
298 NodeRef::External(gid) => {
299 let id = self.graph.resolve_node(id)?;
300
301 #[cfg(feature = "proto")]
302 self.if_cache(|cache| cache.resolve_node(id, gid));
303
304 Ok(id)
305 }
306 }
307 }
308
309 fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
310 Ok(self.graph.set_node(gid, vid)?)
311 }
312
313 fn resolve_node_and_type(
314 &self,
315 id: NodeRef,
316 node_type: &str,
317 ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, GraphError> {
318 let node_and_type = self.graph.resolve_node_and_type(id, node_type)?;
319
320 #[cfg(feature = "proto")]
321 self.if_cache(|cache| {
322 let (vid, _) = node_and_type.inner();
323 let node_entry = self.graph.core_node(vid.inner());
324 cache.resolve_node_and_type(node_and_type, node_type, node_entry.id())
325 });
326
327 Ok(node_and_type)
328 }
329
330 fn resolve_graph_property(
331 &self,
332 prop: &str,
333 dtype: PropType,
334 is_static: bool,
335 ) -> Result<MaybeNew<usize>, GraphError> {
336 let id = self
337 .graph
338 .resolve_graph_property(prop, dtype.clone(), is_static)?;
339
340 #[cfg(feature = "proto")]
341 self.if_cache(|cache| cache.resolve_graph_property(prop, id, dtype, is_static));
342
343 Ok(id)
344 }
345
346 fn resolve_node_property(
347 &self,
348 prop: &str,
349 dtype: PropType,
350 is_static: bool,
351 ) -> Result<MaybeNew<usize>, GraphError> {
352 let id = self
353 .graph
354 .resolve_node_property(prop, dtype.clone(), is_static)?;
355
356 #[cfg(feature = "proto")]
357 self.if_cache(|cache| cache.resolve_node_property(prop, id, &dtype, is_static));
358
359 Ok(id)
360 }
361
362 fn resolve_edge_property(
363 &self,
364 prop: &str,
365 dtype: PropType,
366 is_static: bool,
367 ) -> Result<MaybeNew<usize>, GraphError> {
368 let id = self
369 .graph
370 .resolve_edge_property(prop, dtype.clone(), is_static)?;
371
372 #[cfg(feature = "proto")]
373 self.if_cache(|cache| cache.resolve_edge_property(prop, id, &dtype, is_static));
374
375 Ok(id)
376 }
377
378 fn internal_add_node(
379 &self,
380 t: TimeIndexEntry,
381 v: VID,
382 props: &[(usize, Prop)],
383 ) -> Result<(), GraphError> {
384 self.graph.internal_add_node(t, v, props)?;
385
386 #[cfg(feature = "proto")]
387 self.if_cache(|cache| cache.add_node_update(t, v, props));
388
389 #[cfg(feature = "search")]
390 self.if_index_mut(|index| index.add_node_update(&self.graph, t, MaybeNew::New(v), props))?;
391
392 Ok(())
393 }
394
395 fn internal_add_edge(
396 &self,
397 t: TimeIndexEntry,
398 src: VID,
399 dst: VID,
400 props: &[(usize, Prop)],
401 layer: usize,
402 ) -> Result<MaybeNew<EID>, GraphError> {
403 let id = self.graph.internal_add_edge(t, src, dst, props, layer)?;
404
405 #[cfg(feature = "proto")]
406 self.if_cache(|cache| {
407 cache.resolve_edge(id, src, dst);
408 cache.add_edge_update(t, id.inner(), props, layer);
409 });
410
411 #[cfg(feature = "search")]
412 self.if_index_mut(|index| index.add_edge_update(&self.graph, id, t, layer, props))?;
413
414 Ok(id)
415 }
416
417 fn internal_add_edge_update(
418 &self,
419 t: TimeIndexEntry,
420 edge: EID,
421 props: &[(usize, Prop)],
422 layer: usize,
423 ) -> Result<(), GraphError> {
424 self.graph.internal_add_edge_update(t, edge, props, layer)?;
425
426 #[cfg(feature = "proto")]
427 self.if_cache(|cache| cache.add_edge_update(t, edge, props, layer));
428
429 #[cfg(feature = "search")]
430 self.if_index_mut(|index| {
431 index.add_edge_update(&self.graph, MaybeNew::Existing(edge), t, layer, props)
432 })?;
433
434 Ok(())
435 }
436}
437
438impl InternalPropertyAdditionOps for Storage {
439 type Error = GraphError;
440 fn internal_add_properties(
441 &self,
442 t: TimeIndexEntry,
443 props: &[(usize, Prop)],
444 ) -> Result<(), GraphError> {
445 self.graph.internal_add_properties(t, props)?;
446
447 #[cfg(feature = "proto")]
448 self.if_cache(|cache| cache.add_graph_tprops(t, props));
449
450 Ok(())
451 }
452
453 fn internal_add_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
454 self.graph.internal_add_metadata(props)?;
455
456 #[cfg(feature = "proto")]
457 self.if_cache(|cache| cache.add_graph_cprops(props));
458
459 Ok(())
460 }
461
462 fn internal_update_metadata(&self, props: &[(usize, Prop)]) -> Result<(), GraphError> {
463 self.graph.internal_update_metadata(props)?;
464
465 #[cfg(feature = "proto")]
466 self.if_cache(|cache| cache.add_graph_cprops(props));
467
468 Ok(())
469 }
470
471 fn internal_add_node_metadata(
472 &self,
473 vid: VID,
474 props: &[(usize, Prop)],
475 ) -> Result<EntryMut<RwLockWriteGuard<NodeSlot>>, Self::Error> {
476 let lock = self.graph.internal_add_node_metadata(vid, props)?;
477
478 #[cfg(feature = "proto")]
479 self.if_cache(|cache| cache.add_node_cprops(vid, props));
480
481 #[cfg(feature = "search")]
482 self.if_index_mut(|index| index.add_node_metadata(vid, props))?;
483
484 Ok(lock)
485 }
486
487 fn internal_update_node_metadata(
488 &self,
489 vid: VID,
490 props: &[(usize, Prop)],
491 ) -> Result<EntryMut<RwLockWriteGuard<NodeSlot>>, Self::Error> {
492 let lock = self.graph.internal_update_node_metadata(vid, props)?;
493
494 #[cfg(feature = "proto")]
495 self.if_cache(|cache| cache.add_node_cprops(vid, props));
496
497 #[cfg(feature = "search")]
498 self.if_index_mut(|index| index.update_node_metadata(vid, props))?;
499
500 Ok(lock)
501 }
502
503 fn internal_add_edge_metadata(
504 &self,
505 eid: EID,
506 layer: usize,
507 props: &[(usize, Prop)],
508 ) -> Result<EdgeWGuard, Self::Error> {
509 let lock = self.graph.internal_add_edge_metadata(eid, layer, props)?;
510
511 #[cfg(feature = "proto")]
512 self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
513
514 #[cfg(feature = "search")]
515 self.if_index_mut(|index| index.add_edge_metadata(eid, layer, props))?;
516
517 Ok(lock)
518 }
519
520 fn internal_update_edge_metadata(
521 &self,
522 eid: EID,
523 layer: usize,
524 props: &[(usize, Prop)],
525 ) -> Result<EdgeWGuard, Self::Error> {
526 let lock = self
527 .graph
528 .internal_update_edge_metadata(eid, layer, props)?;
529
530 #[cfg(feature = "proto")]
531 self.if_cache(|cache| cache.add_edge_cprops(eid, layer, props));
532
533 #[cfg(feature = "search")]
534 self.if_index_mut(|index| index.update_edge_metadata(eid, layer, props))?;
535
536 Ok(lock)
537 }
538}
539
540impl InternalDeletionOps for Storage {
541 type Error = GraphError;
542 fn internal_delete_edge(
543 &self,
544 t: TimeIndexEntry,
545 src: VID,
546 dst: VID,
547 layer: usize,
548 ) -> Result<MaybeNew<EID>, GraphError> {
549 let eid = self.graph.internal_delete_edge(t, src, dst, layer)?;
550
551 #[cfg(feature = "proto")]
552 self.if_cache(|cache| {
553 cache.resolve_edge(eid, src, dst);
554 cache.delete_edge(eid.inner(), t, layer);
555 });
556
557 Ok(eid)
558 }
559
560 fn internal_delete_existing_edge(
561 &self,
562 t: TimeIndexEntry,
563 eid: EID,
564 layer: usize,
565 ) -> Result<(), GraphError> {
566 self.graph.internal_delete_existing_edge(t, eid, layer)?;
567
568 #[cfg(feature = "proto")]
569 self.if_cache(|cache| cache.delete_edge(eid, t, layer));
570
571 Ok(())
572 }
573}