raphtory_storage/mutation/
addition_ops.rs

1use crate::{
2    graph::{graph::GraphStorage, locked::WriteLockedGraph},
3    mutation::MutationError,
4};
5use raphtory_api::{
6    core::{
7        entities::{
8            properties::prop::{Prop, PropType},
9            GidRef, EID, VID,
10        },
11        storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry},
12    },
13    inherit::Base,
14};
15use raphtory_core::{
16    entities::{graph::tgraph::TemporalGraph, nodes::node_ref::NodeRef},
17    storage::{raw_edges::WriteLockedEdges, WriteLockedNodes},
18};
19use std::sync::atomic::Ordering;
20
21pub trait InternalAdditionOps {
22    type Error: From<MutationError>;
23    fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error>;
24    fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error>;
25    fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error>;
26    /// get the sequence id for the next event
27    fn next_event_id(&self) -> Result<usize, Self::Error>;
28    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error>;
29    /// map layer name to id and allocate a new layer if needed
30    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, Self::Error>;
31    /// map external node id to internal id, allocating a new empty node if needed
32    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, Self::Error>;
33    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error>;
34    /// resolve a node and corresponding type, outer MaybeNew tracks whether the type assignment is new for the node even if both node and type already existed.
35    fn resolve_node_and_type(
36        &self,
37        id: NodeRef,
38        node_type: &str,
39    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, Self::Error>;
40    /// map property key to internal id, allocating new property if needed
41    fn resolve_graph_property(
42        &self,
43        prop: &str,
44        dtype: PropType,
45        is_static: bool,
46    ) -> Result<MaybeNew<usize>, Self::Error>;
47    /// map property key to internal id, allocating new property if needed and checking property type.
48    /// returns `None` if the type does not match
49    fn resolve_node_property(
50        &self,
51        prop: &str,
52        dtype: PropType,
53        is_static: bool,
54    ) -> Result<MaybeNew<usize>, Self::Error>;
55    fn resolve_edge_property(
56        &self,
57        prop: &str,
58        dtype: PropType,
59        is_static: bool,
60    ) -> Result<MaybeNew<usize>, Self::Error>;
61    /// add node update
62    fn internal_add_node(
63        &self,
64        t: TimeIndexEntry,
65        v: VID,
66        props: &[(usize, Prop)],
67    ) -> Result<(), Self::Error>;
68    /// add edge update
69    fn internal_add_edge(
70        &self,
71        t: TimeIndexEntry,
72        src: VID,
73        dst: VID,
74        props: &[(usize, Prop)],
75        layer: usize,
76    ) -> Result<MaybeNew<EID>, Self::Error>;
77    /// add update for an existing edge
78    fn internal_add_edge_update(
79        &self,
80        t: TimeIndexEntry,
81        edge: EID,
82        props: &[(usize, Prop)],
83        layer: usize,
84    ) -> Result<(), Self::Error>;
85}
86
87impl InternalAdditionOps for TemporalGraph {
88    type Error = MutationError;
89
90    fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error> {
91        Ok(WriteLockedGraph::new(self))
92    }
93
94    fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error> {
95        Ok(self.storage.nodes.write_lock())
96    }
97
98    fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error> {
99        Ok(self.storage.edges.write_lock())
100    }
101
102    /// get the sequence id for the next event
103    fn next_event_id(&self) -> Result<usize, Self::Error> {
104        Ok(self.event_counter.fetch_add(1, Ordering::Relaxed))
105    }
106
107    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
108        Ok(self.event_counter.fetch_add(num_ids, Ordering::Relaxed))
109    }
110
111    /// map layer name to id and allocate a new layer if needed
112    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, Self::Error> {
113        let id = self
114            .resolve_layer_inner(layer)
115            .map_err(MutationError::from)?;
116        Ok(id)
117    }
118
119    /// map external node id to internal id, allocating a new empty node if needed
120    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, Self::Error> {
121        Ok(self.resolve_node_inner(id)?)
122    }
123
124    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
125        Ok(self.logical_to_physical.set(gid, vid)?)
126    }
127
128    /// resolve a node and corresponding type, outer MaybeNew tracks whether the type assignment is new for the node even if both node and type already existed.
129    fn resolve_node_and_type(
130        &self,
131        id: NodeRef,
132        node_type: &str,
133    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, Self::Error> {
134        let vid = self.resolve_node(id)?;
135        let mut entry = self.storage.get_node_mut(vid.inner());
136        let mut entry_ref = entry.to_mut();
137        let node_store = entry_ref.node_store_mut();
138        if node_store.node_type == 0 {
139            let node_type_id = self.node_meta.get_or_create_node_type_id(node_type);
140            node_store.update_node_type(node_type_id.inner());
141            Ok(MaybeNew::New((vid, node_type_id)))
142        } else {
143            let node_type_id = self
144                .node_meta
145                .get_node_type_id(node_type)
146                .filter(|&node_type| node_type == node_store.node_type)
147                .ok_or(MutationError::NodeTypeError)?;
148            Ok(MaybeNew::Existing((vid, MaybeNew::Existing(node_type_id))))
149        }
150    }
151
152    /// map property key to internal id, allocating new property if needed
153    fn resolve_graph_property(
154        &self,
155        prop: &str,
156        dtype: PropType,
157        is_static: bool,
158    ) -> Result<MaybeNew<usize>, Self::Error> {
159        Ok(self.graph_meta.resolve_property(prop, dtype, is_static)?)
160    }
161
162    /// map property key to internal id, allocating new property if needed and checking property type.
163    /// returns `None` if the type does not match
164    fn resolve_node_property(
165        &self,
166        prop: &str,
167        dtype: PropType,
168        is_static: bool,
169    ) -> Result<MaybeNew<usize>, Self::Error> {
170        Ok(self.node_meta.resolve_prop_id(prop, dtype, is_static)?)
171    }
172
173    fn resolve_edge_property(
174        &self,
175        prop: &str,
176        dtype: PropType,
177        is_static: bool,
178    ) -> Result<MaybeNew<usize>, Self::Error> {
179        Ok(self.edge_meta.resolve_prop_id(prop, dtype, is_static)?)
180    }
181
182    /// add node update
183    fn internal_add_node(
184        &self,
185        t: TimeIndexEntry,
186        v: VID,
187        props: &[(usize, Prop)],
188    ) -> Result<(), Self::Error> {
189        self.update_time(t);
190        let mut entry = self.storage.get_node_mut(v);
191        let mut node = entry.to_mut();
192        let prop_i = node
193            .t_props_log_mut()
194            .push(props.iter().map(|(prop_id, prop)| {
195                let prop = self.process_prop_value(prop);
196                (*prop_id, prop)
197            }))
198            .map_err(MutationError::from)?;
199        node.node_store_mut().update_t_prop_time(t, prop_i);
200        Ok(())
201    }
202
203    /// add edge update
204    fn internal_add_edge(
205        &self,
206        t: TimeIndexEntry,
207        src: VID,
208        dst: VID,
209        props: &[(usize, Prop)],
210        layer: usize,
211    ) -> Result<MaybeNew<EID>, Self::Error> {
212        let edge = self.link_nodes(src, dst, t, layer, false);
213        edge.try_map(|mut edge| {
214            let eid = edge.eid();
215            let mut edge = edge.as_mut();
216            edge.additions_mut(layer).insert(t);
217            if !props.is_empty() {
218                let edge_layer = edge.layer_mut(layer);
219                for (prop_id, prop) in props {
220                    let prop = self.process_prop_value(prop);
221                    edge_layer
222                        .add_prop(t, *prop_id, prop)
223                        .map_err(MutationError::from)?;
224                }
225            }
226            Ok(eid)
227        })
228    }
229
230    /// add update for an existing edge
231    fn internal_add_edge_update(
232        &self,
233        t: TimeIndexEntry,
234        edge: EID,
235        props: &[(usize, Prop)],
236        layer: usize,
237    ) -> Result<(), Self::Error> {
238        let mut edge = self.link_edge(edge, t, layer, false);
239        let mut edge = edge.as_mut();
240        edge.additions_mut(layer).insert(t);
241        if !props.is_empty() {
242            let edge_layer = edge.layer_mut(layer);
243            for (prop_id, prop) in props {
244                let prop = self.process_prop_value(prop);
245                edge_layer
246                    .add_prop(t, *prop_id, prop)
247                    .map_err(MutationError::from)?
248            }
249        }
250        Ok(())
251    }
252}
253
254impl InternalAdditionOps for GraphStorage {
255    type Error = MutationError;
256
257    fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error> {
258        self.mutable()?.write_lock()
259    }
260
261    fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error> {
262        self.mutable()?.write_lock_nodes()
263    }
264
265    fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error> {
266        self.mutable()?.write_lock_edges()
267    }
268
269    fn next_event_id(&self) -> Result<usize, Self::Error> {
270        self.mutable()?.next_event_id()
271    }
272
273    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
274        self.mutable()?.reserve_event_ids(num_ids)
275    }
276
277    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, Self::Error> {
278        self.mutable()?.resolve_layer(layer)
279    }
280
281    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, Self::Error> {
282        self.mutable()?.resolve_node(id)
283    }
284
285    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
286        self.mutable()?.set_node(gid, vid)
287    }
288
289    fn resolve_node_and_type(
290        &self,
291        id: NodeRef,
292        node_type: &str,
293    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, Self::Error> {
294        self.mutable()?.resolve_node_and_type(id, node_type)
295    }
296
297    fn resolve_graph_property(
298        &self,
299        prop: &str,
300        dtype: PropType,
301        is_static: bool,
302    ) -> Result<MaybeNew<usize>, Self::Error> {
303        self.mutable()?
304            .resolve_graph_property(prop, dtype, is_static)
305    }
306
307    fn resolve_node_property(
308        &self,
309        prop: &str,
310        dtype: PropType,
311        is_static: bool,
312    ) -> Result<MaybeNew<usize>, Self::Error> {
313        self.mutable()?
314            .resolve_node_property(prop, dtype, is_static)
315    }
316
317    fn resolve_edge_property(
318        &self,
319        prop: &str,
320        dtype: PropType,
321        is_static: bool,
322    ) -> Result<MaybeNew<usize>, Self::Error> {
323        self.mutable()?
324            .resolve_edge_property(prop, dtype, is_static)
325    }
326
327    fn internal_add_node(
328        &self,
329        t: TimeIndexEntry,
330        v: VID,
331        props: &[(usize, Prop)],
332    ) -> Result<(), Self::Error> {
333        self.mutable()?.internal_add_node(t, v, props)
334    }
335
336    fn internal_add_edge(
337        &self,
338        t: TimeIndexEntry,
339        src: VID,
340        dst: VID,
341        props: &[(usize, Prop)],
342        layer: usize,
343    ) -> Result<MaybeNew<EID>, Self::Error> {
344        self.mutable()?.internal_add_edge(t, src, dst, props, layer)
345    }
346
347    fn internal_add_edge_update(
348        &self,
349        t: TimeIndexEntry,
350        edge: EID,
351        props: &[(usize, Prop)],
352        layer: usize,
353    ) -> Result<(), Self::Error> {
354        self.mutable()?
355            .internal_add_edge_update(t, edge, props, layer)
356    }
357}
358
359pub trait InheritAdditionOps: Base {}
360
361impl<G: InheritAdditionOps> InternalAdditionOps for G
362where
363    G::Base: InternalAdditionOps,
364{
365    type Error = <G::Base as InternalAdditionOps>::Error;
366
367    #[inline]
368    fn write_lock(&self) -> Result<WriteLockedGraph<'_>, Self::Error> {
369        self.base().write_lock()
370    }
371
372    #[inline]
373    fn write_lock_nodes(&self) -> Result<WriteLockedNodes<'_>, Self::Error> {
374        self.base().write_lock_nodes()
375    }
376
377    #[inline]
378    fn write_lock_edges(&self) -> Result<WriteLockedEdges<'_>, Self::Error> {
379        self.base().write_lock_edges()
380    }
381
382    #[inline]
383    fn next_event_id(&self) -> Result<usize, Self::Error> {
384        self.base().next_event_id()
385    }
386
387    #[inline]
388    fn reserve_event_ids(&self, num_ids: usize) -> Result<usize, Self::Error> {
389        self.base().reserve_event_ids(num_ids)
390    }
391
392    #[inline]
393    fn resolve_layer(&self, layer: Option<&str>) -> Result<MaybeNew<usize>, Self::Error> {
394        self.base().resolve_layer(layer)
395    }
396
397    #[inline]
398    fn resolve_node(&self, id: NodeRef) -> Result<MaybeNew<VID>, Self::Error> {
399        self.base().resolve_node(id)
400    }
401
402    #[inline]
403    fn set_node(&self, gid: GidRef, vid: VID) -> Result<(), Self::Error> {
404        self.base().set_node(gid, vid)
405    }
406
407    #[inline]
408    fn resolve_node_and_type(
409        &self,
410        id: NodeRef,
411        node_type: &str,
412    ) -> Result<MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>, Self::Error> {
413        self.base().resolve_node_and_type(id, node_type)
414    }
415
416    #[inline]
417    fn resolve_graph_property(
418        &self,
419        prop: &str,
420        dtype: PropType,
421        is_static: bool,
422    ) -> Result<MaybeNew<usize>, Self::Error> {
423        self.base().resolve_graph_property(prop, dtype, is_static)
424    }
425
426    #[inline]
427    fn resolve_node_property(
428        &self,
429        prop: &str,
430        dtype: PropType,
431        is_static: bool,
432    ) -> Result<MaybeNew<usize>, Self::Error> {
433        self.base().resolve_node_property(prop, dtype, is_static)
434    }
435
436    #[inline]
437    fn resolve_edge_property(
438        &self,
439        prop: &str,
440        dtype: PropType,
441        is_static: bool,
442    ) -> Result<MaybeNew<usize>, Self::Error> {
443        self.base().resolve_edge_property(prop, dtype, is_static)
444    }
445
446    #[inline]
447    fn internal_add_node(
448        &self,
449        t: TimeIndexEntry,
450        v: VID,
451        props: &[(usize, Prop)],
452    ) -> Result<(), Self::Error> {
453        self.base().internal_add_node(t, v, props)
454    }
455
456    #[inline]
457    fn internal_add_edge(
458        &self,
459        t: TimeIndexEntry,
460        src: VID,
461        dst: VID,
462        props: &[(usize, Prop)],
463        layer: usize,
464    ) -> Result<MaybeNew<EID>, Self::Error> {
465        self.base().internal_add_edge(t, src, dst, props, layer)
466    }
467
468    #[inline]
469    fn internal_add_edge_update(
470        &self,
471        t: TimeIndexEntry,
472        edge: EID,
473        props: &[(usize, Prop)],
474        layer: usize,
475    ) -> Result<(), Self::Error> {
476        self.base().internal_add_edge_update(t, edge, props, layer)
477    }
478}