Skip to main content

raphtory_core/entities/graph/
tgraph.rs

1use super::logical_to_physical::{InvalidNodeId, Mapping};
2use crate::{
3    entities::{
4        edges::edge_store::EdgeStore,
5        graph::{
6            tgraph_storage::GraphStorage,
7            timer::{MaxCounter, MinCounter, TimeCounterTrait},
8        },
9        nodes::{node_ref::NodeRef, node_store::NodeStore},
10        properties::graph_meta::GraphMeta,
11        LayerIds, EID, VID,
12    },
13    storage::{
14        raw_edges::EdgeWGuard,
15        timeindex::{AsTime, EventTime},
16        NodeEntry, PairEntryMut,
17    },
18};
19use dashmap::DashSet;
20use either::Either;
21use raphtory_api::core::{
22    entities::{
23        properties::{meta::Meta, prop::Prop},
24        GidRef, Layer, Multiple, MAX_LAYER,
25    },
26    input::input_node::InputNode,
27    storage::{arc_str::ArcStr, dict_mapper::MaybeNew},
28    Direction,
29};
30use rustc_hash::FxHasher;
31use serde::{Deserialize, Serialize};
32use std::{fmt::Debug, hash::BuildHasherDefault, sync::atomic::AtomicUsize};
33use thiserror::Error;
34
35pub(crate) type FxDashSet<K> = DashSet<K, BuildHasherDefault<FxHasher>>;
36
37#[derive(Serialize, Deserialize, Debug)]
38pub struct TemporalGraph {
39    pub storage: GraphStorage,
40    // mapping between logical and physical ids
41    pub logical_to_physical: Mapping,
42    string_pool: FxDashSet<ArcStr>,
43    pub event_counter: AtomicUsize,
44    //earliest time seen in this graph
45    pub earliest_time: MinCounter,
46    //latest time seen in this graph
47    pub latest_time: MaxCounter,
48    // props meta data for nodes (mapping between strings and ids)
49    pub node_meta: Meta,
50    // props meta data for edges (mapping between strings and ids)
51    pub edge_meta: Meta,
52    // graph properties
53    pub graph_meta: GraphMeta,
54}
55
56#[derive(Error, Debug)]
57#[error("Invalid layer: {invalid_layer}. Valid layers: {valid_layers:?}")]
58pub struct InvalidLayer {
59    invalid_layer: ArcStr,
60    valid_layers: Vec<String>,
61}
62
63#[derive(Error, Debug)]
64#[error("At most {MAX_LAYER} layers are supported.")]
65pub struct TooManyLayers;
66
67impl InvalidLayer {
68    pub fn new(invalid_layer: ArcStr, valid_layers: Vec<String>) -> Self {
69        Self {
70            invalid_layer,
71            valid_layers,
72        }
73    }
74}
75
76impl std::fmt::Display for TemporalGraph {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        write!(
79            f,
80            "Graph(num_nodes={}, num_edges={})",
81            self.storage.nodes_len(),
82            self.storage.edges_len()
83        )
84    }
85}
86
87impl Default for TemporalGraph {
88    fn default() -> Self {
89        Self::new(rayon::current_num_threads())
90    }
91}
92
93impl TemporalGraph {
94    pub fn new(num_locks: usize) -> Self {
95        TemporalGraph {
96            logical_to_physical: Mapping::new(),
97            string_pool: Default::default(),
98            storage: GraphStorage::new(num_locks),
99            event_counter: AtomicUsize::new(0),
100            earliest_time: MinCounter::new(),
101            latest_time: MaxCounter::new(),
102            node_meta: Meta::new(),
103            edge_meta: Meta::new(),
104            graph_meta: GraphMeta::new(),
105        }
106    }
107
108    pub fn process_prop_value(&self, prop: &Prop) -> Prop {
109        match prop {
110            Prop::Str(value) => Prop::Str(self.resolve_str(value)),
111            _ => prop.clone(),
112        }
113    }
114
115    fn get_valid_layers(edge_meta: &Meta) -> Vec<String> {
116        edge_meta
117            .layer_meta()
118            .get_keys()
119            .iter()
120            .map(|x| x.to_string())
121            .collect::<Vec<_>>()
122    }
123
124    pub fn num_layers(&self) -> usize {
125        self.edge_meta.layer_meta().len()
126    }
127
128    pub fn resolve_node_inner(&self, id: NodeRef) -> Result<MaybeNew<VID>, InvalidNodeId> {
129        match id {
130            NodeRef::External(id) => self.logical_to_physical.get_or_init_node(id, || {
131                let node_store = NodeStore::empty(id.into());
132                self.storage.push_node(node_store)
133            }),
134            NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
135        }
136    }
137
138    /// map layer name to id and allocate a new layer if needed
139    pub fn resolve_layer_inner(
140        &self,
141        layer: Option<&str>,
142    ) -> Result<MaybeNew<usize>, TooManyLayers> {
143        let id = self.edge_meta.get_or_create_layer_id(layer);
144        if let MaybeNew::New(id) = id {
145            if id > MAX_LAYER {
146                Err(TooManyLayers)?;
147            }
148        }
149        Ok(id)
150    }
151
152    pub fn layer_ids(&self, key: Layer) -> Result<LayerIds, InvalidLayer> {
153        match key {
154            Layer::None => Ok(LayerIds::None),
155            Layer::All => Ok(LayerIds::All),
156            Layer::Default => Ok(LayerIds::One(0)),
157            Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
158                Some(id) => Ok(LayerIds::One(id)),
159                None => Err(InvalidLayer::new(
160                    id,
161                    Self::get_valid_layers(&self.edge_meta),
162                )),
163            },
164            Layer::Multiple(ids) => {
165                let mut new_layers = ids
166                    .iter()
167                    .map(|id| {
168                        self.edge_meta.get_layer_id(id).ok_or_else(|| {
169                            InvalidLayer::new(id.clone(), Self::get_valid_layers(&self.edge_meta))
170                        })
171                    })
172                    .collect::<Result<Vec<_>, InvalidLayer>>()?;
173                let num_layers = self.num_layers();
174                let num_new_layers = new_layers.len();
175                if num_new_layers == 0 {
176                    Ok(LayerIds::None)
177                } else if num_new_layers == 1 {
178                    Ok(LayerIds::One(new_layers[0]))
179                } else if num_new_layers == num_layers {
180                    Ok(LayerIds::All)
181                } else {
182                    new_layers.sort_unstable();
183                    new_layers.dedup();
184                    Ok(LayerIds::Multiple(new_layers.into()))
185                }
186            }
187        }
188    }
189
190    pub fn valid_layer_ids(&self, key: Layer) -> LayerIds {
191        match key {
192            Layer::None => LayerIds::None,
193            Layer::All => LayerIds::All,
194            Layer::Default => LayerIds::One(0),
195            Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
196                Some(id) => LayerIds::One(id),
197                None => LayerIds::None,
198            },
199            Layer::Multiple(ids) => {
200                let new_layers: Multiple = ids
201                    .iter()
202                    .flat_map(|id| self.edge_meta.get_layer_id(id))
203                    .collect();
204                let num_layers = self.num_layers();
205                let num_new_layers = new_layers.len();
206                if num_new_layers == 0 {
207                    LayerIds::None
208                } else if num_new_layers == 1 {
209                    LayerIds::One(new_layers.get_id_by_index(0).unwrap())
210                } else if num_new_layers == num_layers {
211                    LayerIds::All
212                } else {
213                    LayerIds::Multiple(new_layers)
214                }
215            }
216        }
217    }
218
219    pub fn get_layer_name(&self, layer: usize) -> ArcStr {
220        self.edge_meta.get_layer_name_by_id(layer)
221    }
222
223    #[inline]
224    pub fn graph_earliest_time(&self) -> Option<i64> {
225        Some(self.earliest_time.get()).filter(|t| *t != i64::MAX)
226    }
227
228    #[inline]
229    pub fn graph_latest_time(&self) -> Option<i64> {
230        Some(self.latest_time.get()).filter(|t| *t != i64::MIN)
231    }
232
233    #[inline]
234    pub fn internal_num_nodes(&self) -> usize {
235        self.storage.nodes.len()
236    }
237
238    #[inline]
239    pub fn update_time(&self, time: EventTime) {
240        let t = time.t();
241        self.earliest_time.update(t);
242        self.latest_time.update(t);
243    }
244
245    pub(crate) fn link_nodes_inner(
246        &self,
247        node_pair: &mut PairEntryMut,
248        edge_id: EID,
249        t: EventTime,
250        layer: usize,
251        is_deletion: bool,
252    ) {
253        self.update_time(t);
254        let src_id = node_pair.get_i().vid;
255        let dst_id = node_pair.get_j().vid;
256        let src = node_pair.get_mut_i();
257        let elid = if is_deletion {
258            edge_id.with_layer_deletion(layer)
259        } else {
260            edge_id.with_layer(layer)
261        };
262        src.add_edge(dst_id, Direction::OUT, layer, edge_id);
263        src.update_time(t, elid);
264        let dst = node_pair.get_mut_j();
265        dst.add_edge(src_id, Direction::IN, layer, edge_id);
266        dst.update_time(t, elid);
267    }
268
269    pub fn link_edge(
270        &self,
271        eid: EID,
272        t: EventTime,
273        layer: usize,
274        is_deletion: bool,
275    ) -> EdgeWGuard<'_> {
276        let (src, dst) = {
277            let edge_r = self.storage.edges.get_edge(eid);
278            let edge_r = edge_r.as_mem_edge().edge_store();
279            (edge_r.src, edge_r.dst)
280        };
281        // need to get the node pair first to avoid deadlocks with link_nodes
282        let mut node_pair = self.storage.pair_node_mut(src, dst);
283        self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
284        self.storage.edges.get_edge_mut(eid)
285    }
286
287    pub fn link_nodes(
288        &self,
289        src_id: VID,
290        dst_id: VID,
291        t: EventTime,
292        layer: usize,
293        is_deletion: bool,
294    ) -> MaybeNew<EdgeWGuard<'_>> {
295        let edge = {
296            let mut node_pair = self.storage.pair_node_mut(src_id, dst_id);
297            let src = node_pair.get_i();
298            let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
299                Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)),
300                None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))),
301            };
302            let eid = match edge.as_mut() {
303                Either::Left(edge) => edge.as_ref().eid(),
304                Either::Right(edge) => edge.value().eid,
305            };
306            self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
307            edge
308        };
309
310        match edge {
311            Either::Left(edge) => MaybeNew::Existing(edge),
312            Either::Right(edge) => {
313                let edge = edge.init();
314                MaybeNew::New(edge)
315            }
316        }
317    }
318
319    #[inline]
320    pub fn resolve_node_ref(&self, v: NodeRef) -> Option<VID> {
321        match v {
322            NodeRef::Internal(vid) => Some(vid),
323            NodeRef::External(GidRef::U64(gid)) => self.logical_to_physical.get_u64(gid),
324            NodeRef::External(GidRef::Str(string)) => self
325                .logical_to_physical
326                .get_str(string)
327                .or_else(|| self.logical_to_physical.get_u64(string.id())),
328        }
329    }
330
331    /// Checks if the same string value already exists and returns a pointer to the same existing value if it exists,
332    /// otherwise adds the string to the pool.
333    fn resolve_str(&self, value: &ArcStr) -> ArcStr {
334        match self.string_pool.get(value) {
335            Some(value) => value.clone(),
336            None => {
337                self.string_pool.insert(value.clone());
338                self.string_pool
339                    .get(value)
340                    .expect("value should exist as inserted above")
341                    .clone()
342            }
343        }
344    }
345
346    pub fn node(&self, id: VID) -> NodeEntry<'_> {
347        self.storage.get_node(id)
348    }
349}