raphtory_core/entities/graph/
tgraph.rs1use super::logical_to_physical::{InvalidNodeId, Mapping};
2use crate::{
3 entities::{
4 edges::edge_store::EdgeStore,
5 graph::{
6 tgraph_storage::GraphStorage,
7 timer::{MaxCounter, MinCounter, TimeCounterTrait},
8 },
9 nodes::{node_ref::NodeRef, node_store::NodeStore},
10 properties::graph_meta::GraphMeta,
11 LayerIds, EID, VID,
12 },
13 storage::{
14 raw_edges::EdgeWGuard,
15 timeindex::{AsTime, TimeIndexEntry},
16 NodeEntry, PairEntryMut,
17 },
18};
19use dashmap::DashSet;
20use either::Either;
21use raphtory_api::core::{
22 entities::{
23 properties::{meta::Meta, prop::Prop},
24 GidRef, Layer, Multiple, MAX_LAYER,
25 },
26 input::input_node::InputNode,
27 storage::{arc_str::ArcStr, dict_mapper::MaybeNew},
28 Direction,
29};
30use rustc_hash::FxHasher;
31use serde::{Deserialize, Serialize};
32use std::{fmt::Debug, hash::BuildHasherDefault, sync::atomic::AtomicUsize};
33use thiserror::Error;
34
35pub(crate) type FxDashSet<K> = DashSet<K, BuildHasherDefault<FxHasher>>;
36
37#[derive(Serialize, Deserialize, Debug)]
38pub struct TemporalGraph {
39 pub storage: GraphStorage,
40 pub logical_to_physical: Mapping,
42 string_pool: FxDashSet<ArcStr>,
43 pub event_counter: AtomicUsize,
44 pub earliest_time: MinCounter,
46 pub latest_time: MaxCounter,
48 pub node_meta: Meta,
50 pub edge_meta: Meta,
52 pub graph_meta: GraphMeta,
54}
55
56#[derive(Error, Debug)]
57#[error("Invalid layer: {invalid_layer}. Valid layers: {valid_layers}")]
58pub struct InvalidLayer {
59 invalid_layer: ArcStr,
60 valid_layers: String,
61}
62
63#[derive(Error, Debug)]
64#[error("At most {MAX_LAYER} layers are supported.")]
65pub struct TooManyLayers;
66
67impl InvalidLayer {
68 pub fn new(invalid_layer: ArcStr, valid: Vec<String>) -> Self {
69 let valid_layers = valid.join(", ");
70 Self {
71 invalid_layer,
72 valid_layers,
73 }
74 }
75}
76
77impl std::fmt::Display for TemporalGraph {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "Graph(num_nodes={}, num_edges={})",
82 self.storage.nodes_len(),
83 self.storage.edges_len()
84 )
85 }
86}
87
88impl Default for TemporalGraph {
89 fn default() -> Self {
90 Self::new(rayon::current_num_threads())
91 }
92}
93
94impl TemporalGraph {
95 pub fn new(num_locks: usize) -> Self {
96 TemporalGraph {
97 logical_to_physical: Mapping::new(),
98 string_pool: Default::default(),
99 storage: GraphStorage::new(num_locks),
100 event_counter: AtomicUsize::new(0),
101 earliest_time: MinCounter::new(),
102 latest_time: MaxCounter::new(),
103 node_meta: Meta::new(),
104 edge_meta: Meta::new(),
105 graph_meta: GraphMeta::new(),
106 }
107 }
108
109 pub fn process_prop_value(&self, prop: &Prop) -> Prop {
110 match prop {
111 Prop::Str(value) => Prop::Str(self.resolve_str(value)),
112 _ => prop.clone(),
113 }
114 }
115
116 fn get_valid_layers(edge_meta: &Meta) -> Vec<String> {
117 edge_meta
118 .layer_meta()
119 .get_keys()
120 .iter()
121 .map(|x| x.to_string())
122 .collect::<Vec<_>>()
123 }
124
125 pub fn num_layers(&self) -> usize {
126 self.edge_meta.layer_meta().len()
127 }
128
129 pub fn resolve_node_inner(&self, id: NodeRef) -> Result<MaybeNew<VID>, InvalidNodeId> {
130 match id {
131 NodeRef::External(id) => self.logical_to_physical.get_or_init_node(id, || {
132 let node_store = NodeStore::empty(id.into());
133 self.storage.push_node(node_store)
134 }),
135 NodeRef::Internal(id) => Ok(MaybeNew::Existing(id)),
136 }
137 }
138
139 pub fn resolve_layer_inner(
141 &self,
142 layer: Option<&str>,
143 ) -> Result<MaybeNew<usize>, TooManyLayers> {
144 let id = self.edge_meta.get_or_create_layer_id(layer);
145 if let MaybeNew::New(id) = id {
146 if id > MAX_LAYER {
147 Err(TooManyLayers)?;
148 }
149 }
150 Ok(id)
151 }
152
153 pub fn layer_ids(&self, key: Layer) -> Result<LayerIds, InvalidLayer> {
154 match key {
155 Layer::None => Ok(LayerIds::None),
156 Layer::All => Ok(LayerIds::All),
157 Layer::Default => Ok(LayerIds::One(0)),
158 Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
159 Some(id) => Ok(LayerIds::One(id)),
160 None => Err(InvalidLayer::new(
161 id,
162 Self::get_valid_layers(&self.edge_meta),
163 )),
164 },
165 Layer::Multiple(ids) => {
166 let mut new_layers = ids
167 .iter()
168 .map(|id| {
169 self.edge_meta.get_layer_id(id).ok_or_else(|| {
170 InvalidLayer::new(id.clone(), Self::get_valid_layers(&self.edge_meta))
171 })
172 })
173 .collect::<Result<Vec<_>, InvalidLayer>>()?;
174 let num_layers = self.num_layers();
175 let num_new_layers = new_layers.len();
176 if num_new_layers == 0 {
177 Ok(LayerIds::None)
178 } else if num_new_layers == 1 {
179 Ok(LayerIds::One(new_layers[0]))
180 } else if num_new_layers == num_layers {
181 Ok(LayerIds::All)
182 } else {
183 new_layers.sort_unstable();
184 new_layers.dedup();
185 Ok(LayerIds::Multiple(new_layers.into()))
186 }
187 }
188 }
189 }
190
191 pub fn valid_layer_ids(&self, key: Layer) -> LayerIds {
192 match key {
193 Layer::None => LayerIds::None,
194 Layer::All => LayerIds::All,
195 Layer::Default => LayerIds::One(0),
196 Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
197 Some(id) => LayerIds::One(id),
198 None => LayerIds::None,
199 },
200 Layer::Multiple(ids) => {
201 let new_layers: Multiple = ids
202 .iter()
203 .flat_map(|id| self.edge_meta.get_layer_id(id))
204 .collect();
205 let num_layers = self.num_layers();
206 let num_new_layers = new_layers.len();
207 if num_new_layers == 0 {
208 LayerIds::None
209 } else if num_new_layers == 1 {
210 LayerIds::One(new_layers.get_id_by_index(0).unwrap())
211 } else if num_new_layers == num_layers {
212 LayerIds::All
213 } else {
214 LayerIds::Multiple(new_layers)
215 }
216 }
217 }
218 }
219
220 pub fn get_layer_name(&self, layer: usize) -> ArcStr {
221 self.edge_meta.get_layer_name_by_id(layer)
222 }
223
224 #[inline]
225 pub fn graph_earliest_time(&self) -> Option<i64> {
226 Some(self.earliest_time.get()).filter(|t| *t != i64::MAX)
227 }
228
229 #[inline]
230 pub fn graph_latest_time(&self) -> Option<i64> {
231 Some(self.latest_time.get()).filter(|t| *t != i64::MIN)
232 }
233
234 #[inline]
235 pub fn internal_num_nodes(&self) -> usize {
236 self.storage.nodes.len()
237 }
238
239 #[inline]
240 pub fn update_time(&self, time: TimeIndexEntry) {
241 let t = time.t();
242 self.earliest_time.update(t);
243 self.latest_time.update(t);
244 }
245
246 pub(crate) fn link_nodes_inner(
247 &self,
248 node_pair: &mut PairEntryMut,
249 edge_id: EID,
250 t: TimeIndexEntry,
251 layer: usize,
252 is_deletion: bool,
253 ) {
254 self.update_time(t);
255 let src_id = node_pair.get_i().vid;
256 let dst_id = node_pair.get_j().vid;
257 let src = node_pair.get_mut_i();
258 let elid = if is_deletion {
259 edge_id.with_layer_deletion(layer)
260 } else {
261 edge_id.with_layer(layer)
262 };
263 src.add_edge(dst_id, Direction::OUT, layer, edge_id);
264 src.update_time(t, elid);
265 let dst = node_pair.get_mut_j();
266 dst.add_edge(src_id, Direction::IN, layer, edge_id);
267 dst.update_time(t, elid);
268 }
269
270 pub fn link_edge(
271 &self,
272 eid: EID,
273 t: TimeIndexEntry,
274 layer: usize,
275 is_deletion: bool,
276 ) -> EdgeWGuard<'_> {
277 let (src, dst) = {
278 let edge_r = self.storage.edges.get_edge(eid);
279 let edge_r = edge_r.as_mem_edge().edge_store();
280 (edge_r.src, edge_r.dst)
281 };
282 let mut node_pair = self.storage.pair_node_mut(src, dst);
284 self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
285 self.storage.edges.get_edge_mut(eid)
286 }
287
288 pub fn link_nodes(
289 &self,
290 src_id: VID,
291 dst_id: VID,
292 t: TimeIndexEntry,
293 layer: usize,
294 is_deletion: bool,
295 ) -> MaybeNew<EdgeWGuard<'_>> {
296 let edge = {
297 let mut node_pair = self.storage.pair_node_mut(src_id, dst_id);
298 let src = node_pair.get_i();
299 let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
300 Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)),
301 None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))),
302 };
303 let eid = match edge.as_mut() {
304 Either::Left(edge) => edge.as_ref().eid(),
305 Either::Right(edge) => edge.value().eid,
306 };
307 self.link_nodes_inner(&mut node_pair, eid, t, layer, is_deletion);
308 edge
309 };
310
311 match edge {
312 Either::Left(edge) => MaybeNew::Existing(edge),
313 Either::Right(edge) => {
314 let edge = edge.init();
315 MaybeNew::New(edge)
316 }
317 }
318 }
319
320 #[inline]
321 pub fn resolve_node_ref(&self, v: NodeRef) -> Option<VID> {
322 match v {
323 NodeRef::Internal(vid) => Some(vid),
324 NodeRef::External(GidRef::U64(gid)) => self.logical_to_physical.get_u64(gid),
325 NodeRef::External(GidRef::Str(string)) => self
326 .logical_to_physical
327 .get_str(string)
328 .or_else(|| self.logical_to_physical.get_u64(string.id())),
329 }
330 }
331
332 fn resolve_str(&self, value: &ArcStr) -> ArcStr {
335 match self.string_pool.get(value) {
336 Some(value) => value.clone(),
337 None => {
338 self.string_pool.insert(value.clone());
339 self.string_pool
340 .get(value)
341 .expect("value should exist as inserted above")
342 .clone()
343 }
344 }
345 }
346
347 pub fn node(&self, id: VID) -> NodeEntry<'_> {
348 self.storage.get_node(id)
349 }
350}