raphtory_core/entities/graph/
tgraph.rs

1use super::logical_to_physical::{InvalidNodeId, Mapping};
2use crate::{
3    entities::{
4        edges::edge_store::EdgeStore,
5        graph::{
6            tgraph_storage::GraphStorage,
7            timer::{MaxCounter, MinCounter, TimeCounterTrait},
8        },
9        nodes::{node_ref::NodeRef, node_store::NodeStore},
10        properties::graph_meta::GraphMeta,
11        LayerIds, EID, VID,
12    },
13    storage::{
14        raw_edges::EdgeWGuard,
15        timeindex::{AsTime, TimeIndexEntry},
16        NodeEntry, PairEntryMut,
17    },
18};
19use dashmap::DashSet;
20use either::Either;
21use raphtory_api::core::{
22    entities::{
23        properties::{meta::Meta, prop::Prop},
24        GidRef, Layer, Multiple, MAX_LAYER,
25    },
26    input::input_node::InputNode,
27    storage::{arc_str::ArcStr, dict_mapper::MaybeNew},
28    Direction,
29};
30use rustc_hash::FxHasher;
31use serde::{Deserialize, Serialize};
32use std::{fmt::Debug, hash::BuildHasherDefault, sync::atomic::AtomicUsize};
33use thiserror::Error;
34
/// A [`DashSet`] whose hasher is built from [`FxHasher`] via [`BuildHasherDefault`].
pub(crate) type FxDashSet<K> = DashSet<K, BuildHasherDefault<FxHasher>>;
36
/// A temporal graph: node/edge storage together with the id mapping, time bounds and
/// property metadata that accompany it.
#[derive(Serialize, Deserialize, Debug)]
pub struct TemporalGraph {
    /// Node and edge storage.
    pub storage: GraphStorage,
    /// Mapping between logical and physical ids.
    pub logical_to_physical: Mapping,
    /// Pool of string values shared through `resolve_str`.
    string_pool: FxDashSet<ArcStr>,
    /// Event counter; `new` initialises it to zero.
    /// NOTE(review): how it is advanced is not visible in this file.
    pub event_counter: AtomicUsize,
    /// Earliest time seen in this graph.
    pub earliest_time: MinCounter,
    /// Latest time seen in this graph.
    pub latest_time: MaxCounter,
    /// Props metadata for nodes (mapping between strings and ids).
    pub node_meta: Meta,
    /// Props metadata for edges (mapping between strings and ids).
    pub edge_meta: Meta,
    /// Graph properties.
    pub graph_meta: GraphMeta,
}
55
/// Error reported when a requested layer name cannot be found.
///
/// The message lists the offending name followed by the valid layer names.
#[derive(Error, Debug)]
#[error("Invalid layer: {invalid_layer}. Valid layers: {valid_layers}")]
pub struct InvalidLayer {
    /// The layer name that was rejected.
    invalid_layer: ArcStr,
    /// The valid layer names, already joined with `", "` (see `InvalidLayer::new`).
    valid_layers: String,
}
62
/// Error returned by `TemporalGraph::resolve_layer_inner` when a newly created layer id
/// is greater than `MAX_LAYER`.
#[derive(Error, Debug)]
#[error("At most {MAX_LAYER} layers are supported.")]
pub struct TooManyLayers;
66
67impl InvalidLayer {
68    pub fn new(invalid_layer: ArcStr, valid: Vec<String>) -> Self {
69        let valid_layers = valid.join(", ");
70        Self {
71            invalid_layer,
72            valid_layers,
73        }
74    }
75}
76
77impl std::fmt::Display for TemporalGraph {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "Graph(num_nodes={}, num_edges={})",
82            self.storage.nodes_len(),
83            self.storage.edges_len()
84        )
85    }
86}
87
88impl Default for TemporalGraph {
89    fn default() -> Self {
90        Self::new(rayon::current_num_threads())
91    }
92}
93
94impl TemporalGraph {
95    pub fn new(num_locks: usize) -> Self {
96        TemporalGraph {
97            logical_to_physical: Mapping::new(),
98            string_pool: Default::default(),
99            storage: GraphStorage::new(num_locks),
100            event_counter: AtomicUsize::new(0),
101            earliest_time: MinCounter::new(),
102            latest_time: MaxCounter::new(),
103            node_meta: Meta::new(),
104            edge_meta: Meta::new(),
105            graph_meta: GraphMeta::new(),
106        }
107    }
108
109    pub fn process_prop_value(&self, prop: &Prop) -> Prop {
110        match prop {
111            Prop::Str(value) => Prop::Str(self.resolve_str(value)),
112            _ => prop.clone(),
113        }
114    }
115
116    fn get_valid_layers(edge_meta: &Meta) -> Vec<String> {
117        edge_meta
118            .layer_meta()
119            .get_keys()
120            .iter()
121            .map(|x| x.to_string())
122            .collect::<Vec<_>>()
123    }
124
125    pub fn num_layers(&self) -> usize {
126        self.edge_meta.layer_meta().len()
127    }
128
129    pub fn resolve_node_inner(&self, id: NodeRef) -> Result<MaybeNew<VID>, InvalidNodeId> {
130        match id {
131            NodeRef::External(id) => self.logical_to_physical.get_or_init_node(id, || {
132                let node_store = NodeStore::empty(id.into());
133                self.storage.push_node(node_store)
134            }),
135            NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
136        }
137    }
138
139    /// map layer name to id and allocate a new layer if needed
140    pub fn resolve_layer_inner(
141        &self,
142        layer: Option<&str>,
143    ) -> Result<MaybeNew<usize>, TooManyLayers> {
144        let id = self.edge_meta.get_or_create_layer_id(layer);
145        if let MaybeNew::New(id) = id {
146            if id > MAX_LAYER {
147                Err(TooManyLayers)?;
148            }
149        }
150        Ok(id)
151    }
152
153    pub fn layer_ids(&self, key: Layer) -> Result<LayerIds, InvalidLayer> {
154        match key {
155            Layer::None => Ok(LayerIds::None),
156            Layer::All => Ok(LayerIds::All),
157            Layer::Default => Ok(LayerIds::One(0)),
158            Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
159                Some(id) => Ok(LayerIds::One(id)),
160                None => Err(InvalidLayer::new(
161                    id,
162                    Self::get_valid_layers(&self.edge_meta),
163                )),
164            },
165            Layer::Multiple(ids) => {
166                let mut new_layers = ids
167                    .iter()
168                    .map(|id| {
169                        self.edge_meta.get_layer_id(id).ok_or_else(|| {
170                            InvalidLayer::new(id.clone(), Self::get_valid_layers(&self.edge_meta))
171                        })
172                    })
173                    .collect::<Result<Vec<_>, InvalidLayer>>()?;
174                let num_layers = self.num_layers();
175                let num_new_layers = new_layers.len();
176                if num_new_layers == 0 {
177                    Ok(LayerIds::None)
178                } else if num_new_layers == 1 {
179                    Ok(LayerIds::One(new_layers[0]))
180                } else if num_new_layers == num_layers {
181                    Ok(LayerIds::All)
182                } else {
183                    new_layers.sort_unstable();
184                    new_layers.dedup();
185                    Ok(LayerIds::Multiple(new_layers.into()))
186                }
187            }
188        }
189    }
190
191    pub fn valid_layer_ids(&self, key: Layer) -> LayerIds {
192        match key {
193            Layer::None => LayerIds::None,
194            Layer::All => LayerIds::All,
195            Layer::Default => LayerIds::One(0),
196            Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
197                Some(id) => LayerIds::One(id),
198                None => LayerIds::None,
199            },
200            Layer::Multiple(ids) => {
201                let new_layers: Multiple = ids
202                    .iter()
203                    .flat_map(|id| self.edge_meta.get_layer_id(id))
204                    .collect();
205                let num_layers = self.num_layers();
206                let num_new_layers = new_layers.len();
207                if num_new_layers == 0 {
208                    LayerIds::None
209                } else if num_new_layers == 1 {
210                    LayerIds::One(new_layers.get_id_by_index(0).unwrap())
211                } else if num_new_layers == num_layers {
212                    LayerIds::All
213                } else {
214                    LayerIds::Multiple(new_layers)
215                }
216            }
217        }
218    }
219
220    pub fn get_layer_name(&self, layer: usize) -> ArcStr {
221        self.edge_meta.get_layer_name_by_id(layer)
222    }
223
224    #[inline]
225    pub fn graph_earliest_time(&self) -> Option<i64> {
226        Some(self.earliest_time.get()).filter(|t| *t != i64::MAX)
227    }
228
229    #[inline]
230    pub fn graph_latest_time(&self) -> Option<i64> {
231        Some(self.latest_time.get()).filter(|t| *t != i64::MIN)
232    }
233
234    #[inline]
235    pub fn internal_num_nodes(&self) -> usize {
236        self.storage.nodes.len()
237    }
238
239    #[inline]
240    pub fn update_time(&self, time: TimeIndexEntry) {
241        let t = time.t();
242        self.earliest_time.update(t);
243        self.latest_time.update(t);
244    }
245
246    pub(crate) fn link_nodes_inner(
247        &self,
248        node_pair: &mut PairEntryMut,
249        edge_id: EID,
250        t: TimeIndexEntry,
251        layer: usize,
252        is_deletion: bool,
253    ) {
254        self.update_time(t);
255        let src_id = node_pair.get_i().vid;
256        let dst_id = node_pair.get_j().vid;
257        let src = node_pair.get_mut_i();
258        let elid = if is_deletion {
259            edge_id.with_layer_deletion(layer)
260        } else {
261            edge_id.with_layer(layer)
262        };
263        src.add_edge(dst_id, Direction::OUT, layer, edge_id);
264        src.update_time(t, elid);
265        let dst = node_pair.get_mut_j();
266        dst.add_edge(src_id, Direction::IN, layer, edge_id);
267        dst.update_time(t, elid);
268    }
269
270    pub fn link_edge(
271        &self,
272        eid: EID,
273        t: TimeIndexEntry,
274        layer: usize,
275        is_deletion: bool,
276    ) -> EdgeWGuard {
277        let (src, dst) = {
278            let edge_r = self.storage.edges.get_edge(eid);
279            let edge_r = edge_r.as_mem_edge().edge_store();
280            (edge_r.src, edge_r.dst)
281        };
282        // need to get the node pair first to avoid deadlocks with link_nodes
283        let mut node_pair = self.storage.pair_node_mut(src, dst);
284        self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
285        self.storage.edges.get_edge_mut(eid)
286    }
287
288    pub fn link_nodes(
289        &self,
290        src_id: VID,
291        dst_id: VID,
292        t: TimeIndexEntry,
293        layer: usize,
294        is_deletion: bool,
295    ) -> MaybeNew<EdgeWGuard> {
296        let edge = {
297            let mut node_pair = self.storage.pair_node_mut(src_id, dst_id);
298            let src = node_pair.get_i();
299            let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
300                Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)),
301                None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))),
302            };
303            let eid = match edge.as_mut() {
304                Either::Left(edge) => edge.as_ref().eid(),
305                Either::Right(edge) => edge.value().eid,
306            };
307            self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
308            edge
309        };
310
311        match edge {
312            Either::Left(edge) => MaybeNew::Existing(edge),
313            Either::Right(edge) => {
314                let edge = edge.init();
315                MaybeNew::New(edge)
316            }
317        }
318    }
319
320    #[inline]
321    pub fn resolve_node_ref(&self, v: NodeRef) -> Option<VID> {
322        match v {
323            NodeRef::Internal(vid) => Some(vid),
324            NodeRef::External(GidRef::U64(gid)) => self.logical_to_physical.get_u64(gid),
325            NodeRef::External(GidRef::Str(string)) => self
326                .logical_to_physical
327                .get_str(string)
328                .or_else(|| self.logical_to_physical.get_u64(string.id())),
329        }
330    }
331
332    /// Checks if the same string value already exists and returns a pointer to the same existing value if it exists,
333    /// otherwise adds the string to the pool.
334    fn resolve_str(&self, value: &ArcStr) -> ArcStr {
335        match self.string_pool.get(value) {
336            Some(value) => value.clone(),
337            None => {
338                self.string_pool.insert(value.clone());
339                self.string_pool
340                    .get(value)
341                    .expect("value should exist as inserted above")
342                    .clone()
343            }
344        }
345    }
346
347    pub fn node(&self, id: VID) -> NodeEntry {
348        self.storage.get_node(id)
349    }
350}