raphtory_core/entities/graph/
tgraph.rs1use super::logical_to_physical::{InvalidNodeId, Mapping};
2use crate::{
3 entities::{
4 edges::edge_store::EdgeStore,
5 graph::{
6 tgraph_storage::GraphStorage,
7 timer::{MaxCounter, MinCounter, TimeCounterTrait},
8 },
9 nodes::{node_ref::NodeRef, node_store::NodeStore},
10 properties::graph_meta::GraphMeta,
11 LayerIds, EID, VID,
12 },
13 storage::{
14 raw_edges::EdgeWGuard,
15 timeindex::{AsTime, EventTime},
16 NodeEntry, PairEntryMut,
17 },
18};
19use dashmap::DashSet;
20use either::Either;
21use raphtory_api::core::{
22 entities::{
23 properties::{meta::Meta, prop::Prop},
24 GidRef, Layer, Multiple, MAX_LAYER,
25 },
26 input::input_node::InputNode,
27 storage::{arc_str::ArcStr, dict_mapper::MaybeNew},
28 Direction,
29};
30use rustc_hash::FxHasher;
31use serde::{Deserialize, Serialize};
32use std::{fmt::Debug, hash::BuildHasherDefault, sync::atomic::AtomicUsize};
33use thiserror::Error;
34
35pub(crate) type FxDashSet<K> = DashSet<K, BuildHasherDefault<FxHasher>>;
36
37#[derive(Serialize, Deserialize, Debug)]
38pub struct TemporalGraph {
39 pub storage: GraphStorage,
40 pub logical_to_physical: Mapping,
42 string_pool: FxDashSet<ArcStr>,
43 pub event_counter: AtomicUsize,
44 pub earliest_time: MinCounter,
46 pub latest_time: MaxCounter,
48 pub node_meta: Meta,
50 pub edge_meta: Meta,
52 pub graph_meta: GraphMeta,
54}
55
56#[derive(Error, Debug)]
57#[error("Invalid layer: {invalid_layer}. Valid layers: {valid_layers:?}")]
58pub struct InvalidLayer {
59 invalid_layer: ArcStr,
60 valid_layers: Vec<String>,
61}
62
63#[derive(Error, Debug)]
64#[error("At most {MAX_LAYER} layers are supported.")]
65pub struct TooManyLayers;
66
67impl InvalidLayer {
68 pub fn new(invalid_layer: ArcStr, valid_layers: Vec<String>) -> Self {
69 Self {
70 invalid_layer,
71 valid_layers,
72 }
73 }
74}
75
76impl std::fmt::Display for TemporalGraph {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 write!(
79 f,
80 "Graph(num_nodes={}, num_edges={})",
81 self.storage.nodes_len(),
82 self.storage.edges_len()
83 )
84 }
85}
86
87impl Default for TemporalGraph {
88 fn default() -> Self {
89 Self::new(rayon::current_num_threads())
90 }
91}
92
93impl TemporalGraph {
94 pub fn new(num_locks: usize) -> Self {
95 TemporalGraph {
96 logical_to_physical: Mapping::new(),
97 string_pool: Default::default(),
98 storage: GraphStorage::new(num_locks),
99 event_counter: AtomicUsize::new(0),
100 earliest_time: MinCounter::new(),
101 latest_time: MaxCounter::new(),
102 node_meta: Meta::new(),
103 edge_meta: Meta::new(),
104 graph_meta: GraphMeta::new(),
105 }
106 }
107
108 pub fn process_prop_value(&self, prop: &Prop) -> Prop {
109 match prop {
110 Prop::Str(value) => Prop::Str(self.resolve_str(value)),
111 _ => prop.clone(),
112 }
113 }
114
115 fn get_valid_layers(edge_meta: &Meta) -> Vec<String> {
116 edge_meta
117 .layer_meta()
118 .get_keys()
119 .iter()
120 .map(|x| x.to_string())
121 .collect::<Vec<_>>()
122 }
123
124 pub fn num_layers(&self) -> usize {
125 self.edge_meta.layer_meta().len()
126 }
127
128 pub fn resolve_node_inner(&self, id: NodeRef) -> Result<MaybeNew<VID>, InvalidNodeId> {
129 match id {
130 NodeRef::External(id) => self.logical_to_physical.get_or_init_node(id, || {
131 let node_store = NodeStore::empty(id.into());
132 self.storage.push_node(node_store)
133 }),
134 NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
135 }
136 }
137
138 pub fn resolve_layer_inner(
140 &self,
141 layer: Option<&str>,
142 ) -> Result<MaybeNew<usize>, TooManyLayers> {
143 let id = self.edge_meta.get_or_create_layer_id(layer);
144 if let MaybeNew::New(id) = id {
145 if id > MAX_LAYER {
146 Err(TooManyLayers)?;
147 }
148 }
149 Ok(id)
150 }
151
152 pub fn layer_ids(&self, key: Layer) -> Result<LayerIds, InvalidLayer> {
153 match key {
154 Layer::None => Ok(LayerIds::None),
155 Layer::All => Ok(LayerIds::All),
156 Layer::Default => Ok(LayerIds::One(0)),
157 Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
158 Some(id) => Ok(LayerIds::One(id)),
159 None => Err(InvalidLayer::new(
160 id,
161 Self::get_valid_layers(&self.edge_meta),
162 )),
163 },
164 Layer::Multiple(ids) => {
165 let mut new_layers = ids
166 .iter()
167 .map(|id| {
168 self.edge_meta.get_layer_id(id).ok_or_else(|| {
169 InvalidLayer::new(id.clone(), Self::get_valid_layers(&self.edge_meta))
170 })
171 })
172 .collect::<Result<Vec<_>, InvalidLayer>>()?;
173 let num_layers = self.num_layers();
174 let num_new_layers = new_layers.len();
175 if num_new_layers == 0 {
176 Ok(LayerIds::None)
177 } else if num_new_layers == 1 {
178 Ok(LayerIds::One(new_layers[0]))
179 } else if num_new_layers == num_layers {
180 Ok(LayerIds::All)
181 } else {
182 new_layers.sort_unstable();
183 new_layers.dedup();
184 Ok(LayerIds::Multiple(new_layers.into()))
185 }
186 }
187 }
188 }
189
190 pub fn valid_layer_ids(&self, key: Layer) -> LayerIds {
191 match key {
192 Layer::None => LayerIds::None,
193 Layer::All => LayerIds::All,
194 Layer::Default => LayerIds::One(0),
195 Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
196 Some(id) => LayerIds::One(id),
197 None => LayerIds::None,
198 },
199 Layer::Multiple(ids) => {
200 let new_layers: Multiple = ids
201 .iter()
202 .flat_map(|id| self.edge_meta.get_layer_id(id))
203 .collect();
204 let num_layers = self.num_layers();
205 let num_new_layers = new_layers.len();
206 if num_new_layers == 0 {
207 LayerIds::None
208 } else if num_new_layers == 1 {
209 LayerIds::One(new_layers.get_id_by_index(0).unwrap())
210 } else if num_new_layers == num_layers {
211 LayerIds::All
212 } else {
213 LayerIds::Multiple(new_layers)
214 }
215 }
216 }
217 }
218
219 pub fn get_layer_name(&self, layer: usize) -> ArcStr {
220 self.edge_meta.get_layer_name_by_id(layer)
221 }
222
223 #[inline]
224 pub fn graph_earliest_time(&self) -> Option<i64> {
225 Some(self.earliest_time.get()).filter(|t| *t != i64::MAX)
226 }
227
228 #[inline]
229 pub fn graph_latest_time(&self) -> Option<i64> {
230 Some(self.latest_time.get()).filter(|t| *t != i64::MIN)
231 }
232
233 #[inline]
234 pub fn internal_num_nodes(&self) -> usize {
235 self.storage.nodes.len()
236 }
237
238 #[inline]
239 pub fn update_time(&self, time: EventTime) {
240 let t = time.t();
241 self.earliest_time.update(t);
242 self.latest_time.update(t);
243 }
244
245 pub(crate) fn link_nodes_inner(
246 &self,
247 node_pair: &mut PairEntryMut,
248 edge_id: EID,
249 t: EventTime,
250 layer: usize,
251 is_deletion: bool,
252 ) {
253 self.update_time(t);
254 let src_id = node_pair.get_i().vid;
255 let dst_id = node_pair.get_j().vid;
256 let src = node_pair.get_mut_i();
257 let elid = if is_deletion {
258 edge_id.with_layer_deletion(layer)
259 } else {
260 edge_id.with_layer(layer)
261 };
262 src.add_edge(dst_id, Direction::OUT, layer, edge_id);
263 src.update_time(t, elid);
264 let dst = node_pair.get_mut_j();
265 dst.add_edge(src_id, Direction::IN, layer, edge_id);
266 dst.update_time(t, elid);
267 }
268
269 pub fn link_edge(
270 &self,
271 eid: EID,
272 t: EventTime,
273 layer: usize,
274 is_deletion: bool,
275 ) -> EdgeWGuard<'_> {
276 let (src, dst) = {
277 let edge_r = self.storage.edges.get_edge(eid);
278 let edge_r = edge_r.as_mem_edge().edge_store();
279 (edge_r.src, edge_r.dst)
280 };
281 let mut node_pair = self.storage.pair_node_mut(src, dst);
283 self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
284 self.storage.edges.get_edge_mut(eid)
285 }
286
287 pub fn link_nodes(
288 &self,
289 src_id: VID,
290 dst_id: VID,
291 t: EventTime,
292 layer: usize,
293 is_deletion: bool,
294 ) -> MaybeNew<EdgeWGuard<'_>> {
295 let edge = {
296 let mut node_pair = self.storage.pair_node_mut(src_id, dst_id);
297 let src = node_pair.get_i();
298 let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
299 Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)),
300 None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))),
301 };
302 let eid = match edge.as_mut() {
303 Either::Left(edge) => edge.as_ref().eid(),
304 Either::Right(edge) => edge.value().eid,
305 };
306 self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
307 edge
308 };
309
310 match edge {
311 Either::Left(edge) => MaybeNew::Existing(edge),
312 Either::Right(edge) => {
313 let edge = edge.init();
314 MaybeNew::New(edge)
315 }
316 }
317 }
318
319 #[inline]
320 pub fn resolve_node_ref(&self, v: NodeRef) -> Option<VID> {
321 match v {
322 NodeRef::Internal(vid) => Some(vid),
323 NodeRef::External(GidRef::U64(gid)) => self.logical_to_physical.get_u64(gid),
324 NodeRef::External(GidRef::Str(string)) => self
325 .logical_to_physical
326 .get_str(string)
327 .or_else(|| self.logical_to_physical.get_u64(string.id())),
328 }
329 }
330
331 fn resolve_str(&self, value: &ArcStr) -> ArcStr {
334 match self.string_pool.get(value) {
335 Some(value) => value.clone(),
336 None => {
337 self.string_pool.insert(value.clone());
338 self.string_pool
339 .get(value)
340 .expect("value should exist as inserted above")
341 .clone()
342 }
343 }
344 }
345
346 pub fn node(&self, id: VID) -> NodeEntry<'_> {
347 self.storage.get_node(id)
348 }
349}