Skip to main content

raphtory_core/storage/
mod.rs

1use crate::{
2    entities::{
3        nodes::node_store::NodeStore,
4        properties::{props::TPropError, tprop::IllegalPropType},
5    },
6    loop_lock_write,
7    storage::lazy_vec::IllegalSet,
8};
9use bigdecimal::BigDecimal;
10use itertools::Itertools;
11use lazy_vec::LazyVec;
12use lock_api;
13use node_entry::NodePtr;
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15#[cfg(feature = "arrow")]
16use raphtory_api::core::entities::properties::prop::PropArray;
17use raphtory_api::core::{
18    entities::{
19        properties::prop::{Prop, PropType},
20        GidRef, VID,
21    },
22    storage::arc_str::ArcStr,
23};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26use serde::{Deserialize, Serialize};
27use std::{
28    collections::HashMap,
29    fmt::{Debug, Formatter},
30    marker::PhantomData,
31    ops::{Deref, DerefMut, Index, IndexMut},
32    sync::{
33        atomic::{AtomicUsize, Ordering},
34        Arc,
35    },
36};
37use thiserror::Error;
38
39pub mod lazy_vec;
40pub mod locked_view;
41pub mod node_entry;
42pub mod raw_edges;
43pub mod timeindex;
44
/// Read guard that owns its own `Arc` to the lock, so it is not tied to the
/// lifetime of a borrow of that lock.
type ArcRwLockReadGuard<T> = lock_api::ArcRwLockReadGuard<parking_lot::RawRwLock, T>;
/// A value reserved for slot `offset` of a write-locked shard but not yet stored.
///
/// The shard's write lock (`guard`) is held for as long as this entry is alive;
/// call [`UninitialisedEntry::init`] to actually write `value` into the slot.
#[must_use]
pub struct UninitialisedEntry<'a, T, TS> {
    // Position of the reserved slot inside the shard.
    offset: usize,
    // Write lock on the shard that will receive `value`.
    guard: RwLockWriteGuard<'a, TS>,
    // The value to be written on `init`.
    value: T,
}
52
53impl<'a, T: Default, TS: DerefMut<Target = Vec<T>>> UninitialisedEntry<'a, T, TS> {
54    pub fn init(mut self) {
55        if self.offset >= self.guard.len() {
56            self.guard.resize_with(self.offset + 1, Default::default);
57        }
58        self.guard[self.offset] = self.value;
59    }
60    pub fn value(&self) -> &T {
61        &self.value
62    }
63}
64
/// Splits a global index into `(bucket, offset)` for round-robin sharding:
/// index `i` lives in bucket `i % num_buckets` at position `i / num_buckets`.
///
/// Panics if `num_buckets` is zero.
#[inline]
fn resolve(index: usize, num_buckets: usize) -> (usize, usize) {
    (index % num_buckets, index / num_buckets)
}
71
/// One shard of node storage: a reference-counted lock around a [`NodeSlot`].
///
/// Cloning a `NodeVec` clones the `Arc`, so clones share the same slot.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeVec {
    data: Arc<RwLock<NodeSlot>>,
}

/// Contents of one shard: the node records plus the temporal-property log
/// shared by all nodes in the shard.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct NodeSlot {
    nodes: Vec<NodeStore>,
    t_props_log: TColumns, // not the same size as nodes
}

/// Column-oriented log of temporal property rows.
///
/// `t_props_log[prop_id]` is the column for property `prop_id`. After each
/// successful `push`, every column is padded to `num_rows` entries, so a row
/// id indexes all columns consistently.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct TColumns {
    t_props_log: Vec<TPropColumn>,
    num_rows: usize,
}
88
89impl TColumns {
90    pub fn push(
91        &mut self,
92        row: impl IntoIterator<Item = (usize, Prop)>,
93    ) -> Result<Option<usize>, TPropError> {
94        let id = self.num_rows;
95        let mut has_props = false;
96
97        for (prop_id, prop) in row {
98            match self.t_props_log.get_mut(prop_id) {
99                Some(col) => col.push(prop)?,
100                None => {
101                    let col: TPropColumn = TPropColumn::new(self.num_rows, prop);
102                    self.t_props_log
103                        .resize_with(prop_id + 1, || TPropColumn::Empty(id));
104                    self.t_props_log[prop_id] = col;
105                }
106            }
107            has_props = true;
108        }
109
110        if has_props {
111            self.num_rows += 1;
112            for col in self.t_props_log.iter_mut() {
113                col.grow(self.num_rows);
114            }
115            Ok(Some(id))
116        } else {
117            Ok(None)
118        }
119    }
120
121    pub(crate) fn get(&self, prop_id: usize) -> Option<&TPropColumn> {
122        self.t_props_log.get(prop_id)
123    }
124
125    pub fn len(&self) -> usize {
126        self.num_rows
127    }
128
129    pub fn is_empty(&self) -> bool {
130        self.num_rows == 0
131    }
132
133    pub fn iter(&self) -> impl Iterator<Item = &TPropColumn> {
134        self.t_props_log.iter()
135    }
136}
137
/// A single typed column of temporal property values.
///
/// `Empty(n)` is a column of `n` null entries whose value type has not been
/// fixed yet. The first non-null value pushed or set converts it to the
/// matching typed variant. Every other variant stores nullable values in a
/// [`LazyVec`].
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum TPropColumn {
    Empty(usize),
    Bool(LazyVec<bool>),
    U8(LazyVec<u8>),
    U16(LazyVec<u16>),
    U32(LazyVec<u32>),
    U64(LazyVec<u64>),
    I32(LazyVec<i32>),
    I64(LazyVec<i64>),
    F32(LazyVec<f32>),
    F64(LazyVec<f64>),
    Str(LazyVec<ArcStr>),
    #[cfg(feature = "arrow")]
    Array(LazyVec<PropArray>),
    List(LazyVec<Arc<Vec<Prop>>>),
    Map(LazyVec<Arc<FxHashMap<ArcStr, Prop>>>),
    NDTime(LazyVec<chrono::NaiveDateTime>),
    DTime(LazyVec<chrono::DateTime<chrono::Utc>>),
    Decimal(LazyVec<BigDecimal>),
}
159
/// Errors returned by [`TPropColumn::set`].
///
/// Either the underlying [`LazyVec`] rejected the write (one `IllegalSet*`
/// variant per value type), or the property's type did not match the column
/// (`IllegalPropType`).
///
/// NOTE(review): the `BigDecimal` variant is named `Decimal` rather than
/// `IllegalSetDecimal`, unlike its siblings; renaming it would break callers.
#[derive(Error, Debug)]
pub enum TPropColumnError {
    #[error(transparent)]
    IllegalSetBool(#[from] IllegalSet<bool>),
    #[error(transparent)]
    IllegalSetU8(#[from] IllegalSet<u8>),
    #[error(transparent)]
    IllegalSetU16(#[from] IllegalSet<u16>),
    #[error(transparent)]
    IllegalSetU32(#[from] IllegalSet<u32>),
    #[error(transparent)]
    IllegalSetU64(#[from] IllegalSet<u64>),
    #[error(transparent)]
    IllegalSetI32(#[from] IllegalSet<i32>),
    #[error(transparent)]
    IllegalSetI64(#[from] IllegalSet<i64>),
    #[error(transparent)]
    IllegalSetF32(#[from] IllegalSet<f32>),
    #[error(transparent)]
    IllegalSetF64(#[from] IllegalSet<f64>),
    #[error(transparent)]
    IllegalSetStr(#[from] IllegalSet<ArcStr>),
    #[cfg(feature = "arrow")]
    #[error(transparent)]
    IllegalSetArray(#[from] IllegalSet<PropArray>),
    #[error(transparent)]
    IllegalSetList(#[from] IllegalSet<Arc<Vec<Prop>>>),
    #[error(transparent)]
    IllegalSetMap(#[from] IllegalSet<Arc<FxHashMap<ArcStr, Prop>>>),
    #[error(transparent)]
    IllegalSetNDTime(#[from] IllegalSet<chrono::NaiveDateTime>),
    #[error(transparent)]
    IllegalSetDTime(#[from] IllegalSet<chrono::DateTime<chrono::Utc>>),
    #[error(transparent)]
    Decimal(#[from] IllegalSet<BigDecimal>),
    #[error(transparent)]
    IllegalPropType(#[from] IllegalPropType),
}
198
199impl Default for TPropColumn {
200    fn default() -> Self {
201        TPropColumn::Empty(0)
202    }
203}
204
impl TPropColumn {
    /// Creates a column whose only non-null value is `prop` at position `idx`.
    ///
    /// Panics if `set` fails.
    /// NOTE(review): on a freshly created column `set` presumably cannot fail —
    /// confirm against `LazyVec::set`.
    pub(crate) fn new(idx: usize, prop: Prop) -> Self {
        let mut col = TPropColumn::default();
        col.set(idx, prop).unwrap();
        col
    }

    /// Returns the property type stored in this column.
    ///
    /// The column does not track element/key types or decimal scale, so
    /// `Array`, `List`, `Map` and `Decimal` report placeholder inner types
    /// (`Empty`, an empty map, `scale: 0`).
    pub(crate) fn dtype(&self) -> PropType {
        match self {
            TPropColumn::Empty(_) => PropType::Empty,
            TPropColumn::Bool(_) => PropType::Bool,
            TPropColumn::U8(_) => PropType::U8,
            TPropColumn::U16(_) => PropType::U16,
            TPropColumn::U32(_) => PropType::U32,
            TPropColumn::U64(_) => PropType::U64,
            TPropColumn::I32(_) => PropType::I32,
            TPropColumn::I64(_) => PropType::I64,
            TPropColumn::F32(_) => PropType::F32,
            TPropColumn::F64(_) => PropType::F64,
            TPropColumn::Str(_) => PropType::Str,
            #[cfg(feature = "arrow")]
            TPropColumn::Array(_) => PropType::Array(Box::new(PropType::Empty)),
            TPropColumn::List(_) => PropType::List(Box::new(PropType::Empty)),
            TPropColumn::Map(_) => PropType::Map(HashMap::new().into()),
            TPropColumn::NDTime(_) => PropType::NDTime,
            TPropColumn::DTime(_) => PropType::DTime,
            TPropColumn::Decimal(_) => PropType::Decimal { scale: 0 },
        }
    }

    /// Pads the column with nulls until it has at least `new_len` entries.
    /// Never shrinks the column.
    pub(crate) fn grow(&mut self, new_len: usize) {
        while self.len() < new_len {
            self.push_null();
        }
    }

    /// Writes `prop` at position `index`.
    ///
    /// An `Empty` column is first converted to the typed variant matching
    /// `prop`.
    ///
    /// # Errors
    /// - `IllegalPropType` if `prop`'s variant does not match the column's type.
    /// - Any error from the underlying `LazyVec::set`, converted into the
    ///   matching `IllegalSet*` variant.
    pub(crate) fn set(&mut self, index: usize, prop: Prop) -> Result<(), TPropColumnError> {
        self.init_empty_col(&prop);
        match (self, prop) {
            (TPropColumn::Bool(col), Prop::Bool(v)) => col.set(index, v)?,
            (TPropColumn::I64(col), Prop::I64(v)) => col.set(index, v)?,
            (TPropColumn::U32(col), Prop::U32(v)) => col.set(index, v)?,
            (TPropColumn::U64(col), Prop::U64(v)) => col.set(index, v)?,
            (TPropColumn::F32(col), Prop::F32(v)) => col.set(index, v)?,
            (TPropColumn::F64(col), Prop::F64(v)) => col.set(index, v)?,
            (TPropColumn::Str(col), Prop::Str(v)) => col.set(index, v)?,
            #[cfg(feature = "arrow")]
            (TPropColumn::Array(col), Prop::Array(v)) => col.set(index, v)?,
            (TPropColumn::U8(col), Prop::U8(v)) => col.set(index, v)?,
            (TPropColumn::U16(col), Prop::U16(v)) => col.set(index, v)?,
            (TPropColumn::I32(col), Prop::I32(v)) => col.set(index, v)?,
            (TPropColumn::List(col), Prop::List(v)) => col.set(index, v)?,
            (TPropColumn::Map(col), Prop::Map(v)) => col.set(index, v)?,
            (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.set(index, v)?,
            (TPropColumn::DTime(col), Prop::DTime(v)) => col.set(index, v)?,
            (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.set(index, v)?,
            // Typed column whose variant differs from `prop`.
            (col, prop) => {
                Err(IllegalPropType {
                    expected: col.dtype(),
                    actual: prop.dtype(),
                })?;
            }
        }
        Ok(())
    }

    /// Appends `prop` as a new last entry.
    ///
    /// An `Empty` column is first converted to the typed variant matching
    /// `prop`.
    ///
    /// # Errors
    /// Returns `IllegalPropType` if `prop`'s variant does not match the
    /// column's type. In that case the column is left unchanged.
    pub(crate) fn push(&mut self, prop: Prop) -> Result<(), IllegalPropType> {
        self.init_empty_col(&prop);
        match (self, prop) {
            (TPropColumn::Bool(col), Prop::Bool(v)) => col.push(Some(v)),
            (TPropColumn::U8(col), Prop::U8(v)) => col.push(Some(v)),
            (TPropColumn::I64(col), Prop::I64(v)) => col.push(Some(v)),
            (TPropColumn::U32(col), Prop::U32(v)) => col.push(Some(v)),
            (TPropColumn::U64(col), Prop::U64(v)) => col.push(Some(v)),
            (TPropColumn::F32(col), Prop::F32(v)) => col.push(Some(v)),
            (TPropColumn::F64(col), Prop::F64(v)) => col.push(Some(v)),
            (TPropColumn::Str(col), Prop::Str(v)) => col.push(Some(v)),
            #[cfg(feature = "arrow")]
            (TPropColumn::Array(col), Prop::Array(v)) => col.push(Some(v)),
            (TPropColumn::U16(col), Prop::U16(v)) => col.push(Some(v)),
            (TPropColumn::I32(col), Prop::I32(v)) => col.push(Some(v)),
            (TPropColumn::List(col), Prop::List(v)) => col.push(Some(v)),
            (TPropColumn::Map(col), Prop::Map(v)) => col.push(Some(v)),
            (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.push(Some(v)),
            (TPropColumn::DTime(col), Prop::DTime(v)) => col.push(Some(v)),
            (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.push(Some(v)),
            // Typed column whose variant differs from `prop`; nothing was written.
            (col, prop) => {
                return Err(IllegalPropType {
                    expected: col.dtype(),
                    actual: prop.dtype(),
                })
            }
        }
        Ok(())
    }

    /// If the column is still `Empty(len)`, replaces it with the typed variant
    /// matching `prop`, built with `LazyVec::with_len(len)`. Presumably that
    /// keeps the `len` existing null entries — confirm against `LazyVec`.
    /// Does nothing for an already-typed column.
    fn init_empty_col(&mut self, prop: &Prop) {
        if let TPropColumn::Empty(len) = self {
            match prop {
                Prop::Bool(_) => *self = TPropColumn::Bool(LazyVec::with_len(*len)),
                Prop::I64(_) => *self = TPropColumn::I64(LazyVec::with_len(*len)),
                Prop::U32(_) => *self = TPropColumn::U32(LazyVec::with_len(*len)),
                Prop::U64(_) => *self = TPropColumn::U64(LazyVec::with_len(*len)),
                Prop::F32(_) => *self = TPropColumn::F32(LazyVec::with_len(*len)),
                Prop::F64(_) => *self = TPropColumn::F64(LazyVec::with_len(*len)),
                Prop::Str(_) => *self = TPropColumn::Str(LazyVec::with_len(*len)),
                #[cfg(feature = "arrow")]
                Prop::Array(_) => *self = TPropColumn::Array(LazyVec::with_len(*len)),
                Prop::U8(_) => *self = TPropColumn::U8(LazyVec::with_len(*len)),
                Prop::U16(_) => *self = TPropColumn::U16(LazyVec::with_len(*len)),
                Prop::I32(_) => *self = TPropColumn::I32(LazyVec::with_len(*len)),
                Prop::List(_) => *self = TPropColumn::List(LazyVec::with_len(*len)),
                Prop::Map(_) => *self = TPropColumn::Map(LazyVec::with_len(*len)),
                Prop::NDTime(_) => *self = TPropColumn::NDTime(LazyVec::with_len(*len)),
                Prop::DTime(_) => *self = TPropColumn::DTime(LazyVec::with_len(*len)),
                Prop::Decimal(_) => *self = TPropColumn::Decimal(LazyVec::with_len(*len)),
            }
        }
    }

    /// Returns `true` if the column has no value type yet (the `Empty` variant).
    ///
    /// NOTE: this is *not* `len() == 0`. An `Empty(n)` column reports
    /// `len() == n`, and a typed column may have zero entries.
    fn is_empty(&self) -> bool {
        matches!(self, TPropColumn::Empty(_))
    }

    /// Appends a null entry. An `Empty` column just increments its length.
    pub(crate) fn push_null(&mut self) {
        match self {
            TPropColumn::Bool(col) => col.push(None),
            TPropColumn::I64(col) => col.push(None),
            TPropColumn::U32(col) => col.push(None),
            TPropColumn::U64(col) => col.push(None),
            TPropColumn::F32(col) => col.push(None),
            TPropColumn::F64(col) => col.push(None),
            TPropColumn::Str(col) => col.push(None),
            #[cfg(feature = "arrow")]
            TPropColumn::Array(col) => col.push(None),
            TPropColumn::U8(col) => col.push(None),
            TPropColumn::U16(col) => col.push(None),
            TPropColumn::I32(col) => col.push(None),
            TPropColumn::List(col) => col.push(None),
            TPropColumn::Map(col) => col.push(None),
            TPropColumn::NDTime(col) => col.push(None),
            TPropColumn::DTime(col) => col.push(None),
            TPropColumn::Decimal(col) => col.push(None),
            TPropColumn::Empty(count) => {
                *count += 1;
            }
        }
    }

    /// Returns the value at `index` as a [`Prop`].
    ///
    /// Always `None` for an `Empty` column. For typed columns the result comes
    /// from `LazyVec::get_opt`, presumably `None` for null or out-of-range
    /// entries.
    pub fn get(&self, index: usize) -> Option<Prop> {
        match self {
            TPropColumn::Bool(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::I64(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::U32(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::U64(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::F32(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::F64(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::Str(col) => col.get_opt(index).map(|prop| prop.into()),
            #[cfg(feature = "arrow")]
            TPropColumn::Array(col) => col.get_opt(index).map(|prop| Prop::Array(prop.clone())),
            TPropColumn::U8(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::U16(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::I32(col) => col.get_opt(index).map(|prop| (*prop).into()),
            TPropColumn::List(col) => col.get_opt(index).map(|prop| Prop::List(prop.clone())),
            TPropColumn::Map(col) => col.get_opt(index).map(|prop| Prop::Map(prop.clone())),
            TPropColumn::NDTime(col) => col.get_opt(index).map(|prop| Prop::NDTime(*prop)),
            TPropColumn::DTime(col) => col.get_opt(index).map(|prop| Prop::DTime(*prop)),
            TPropColumn::Decimal(col) => col.get_opt(index).map(|prop| Prop::Decimal(prop.clone())),
            TPropColumn::Empty(_) => None,
        }
    }

    /// Number of entries (null or not) in the column.
    pub(crate) fn len(&self) -> usize {
        match self {
            TPropColumn::Bool(col) => col.len(),
            TPropColumn::I64(col) => col.len(),
            TPropColumn::U32(col) => col.len(),
            TPropColumn::U64(col) => col.len(),
            TPropColumn::F32(col) => col.len(),
            TPropColumn::F64(col) => col.len(),
            TPropColumn::Str(col) => col.len(),
            #[cfg(feature = "arrow")]
            TPropColumn::Array(col) => col.len(),
            TPropColumn::U8(col) => col.len(),
            TPropColumn::U16(col) => col.len(),
            TPropColumn::I32(col) => col.len(),
            TPropColumn::List(col) => col.len(),
            TPropColumn::Map(col) => col.len(),
            TPropColumn::NDTime(col) => col.len(),
            TPropColumn::DTime(col) => col.len(),
            TPropColumn::Decimal(col) => col.len(),
            TPropColumn::Empty(count) => *count,
        }
    }
}
400
401impl NodeSlot {
402    pub fn t_props_log(&self) -> &TColumns {
403        &self.t_props_log
404    }
405
406    pub fn t_props_log_mut(&mut self) -> &mut TColumns {
407        &mut self.t_props_log
408    }
409
410    pub fn iter(&self) -> impl Iterator<Item = NodePtr<'_>> {
411        self.nodes
412            .iter()
413            .filter(|v| v.is_initialised())
414            .map(|ns| NodePtr::new(ns, &self.t_props_log))
415    }
416
417    pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr<'_>> {
418        self.nodes
419            .par_iter()
420            .filter(|v| v.is_initialised())
421            .map(|ns| NodePtr::new(ns, &self.t_props_log))
422    }
423}
424
425impl Index<usize> for NodeSlot {
426    type Output = NodeStore;
427
428    fn index(&self, index: usize) -> &Self::Output {
429        &self.nodes[index]
430    }
431}
432
433impl IndexMut<usize> for NodeSlot {
434    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
435        &mut self.nodes[index]
436    }
437}
438
439impl Deref for NodeSlot {
440    type Target = Vec<NodeStore>;
441
442    fn deref(&self) -> &Self::Target {
443        &self.nodes
444    }
445}
446
447impl DerefMut for NodeSlot {
448    fn deref_mut(&mut self) -> &mut Self::Target {
449        &mut self.nodes
450    }
451}
452
453impl PartialEq for NodeVec {
454    fn eq(&self, other: &Self) -> bool {
455        let a = self.data.read_recursive();
456        let b = other.data.read_recursive();
457        a.deref() == b.deref()
458    }
459}
460
461impl Default for NodeVec {
462    fn default() -> Self {
463        Self::new()
464    }
465}
466
467impl NodeVec {
468    pub fn new() -> Self {
469        Self {
470            data: Arc::new(RwLock::new(Default::default())),
471        }
472    }
473
474    #[inline]
475    pub fn read_arc_lock(&self) -> ArcRwLockReadGuard<NodeSlot> {
476        RwLock::read_arc_recursive(&self.data)
477    }
478
479    #[inline]
480    pub fn write(&self) -> impl DerefMut<Target = NodeSlot> + '_ {
481        loop_lock_write(&self.data)
482    }
483
484    #[inline]
485    pub fn read(&self) -> impl Deref<Target = NodeSlot> + '_ {
486        self.data.read_recursive()
487    }
488}
489
/// Node storage sharded across `data.len()` independently locked shards.
///
/// Node index `i` lives in shard `i % data.len()` at offset `i / data.len()`.
#[derive(Serialize, Deserialize)]
pub struct NodeStorage {
    pub(crate) data: Box<[NodeVec]>,
    // Number of node indices handed out (or highest index set + 1).
    len: AtomicUsize,
}
495
496impl Debug for NodeStorage {
497    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
498        f.debug_struct("NodeStorage")
499            .field("len", &self.len())
500            .field("data", &self.read_lock().iter().collect_vec())
501            .finish()
502    }
503}
504
505impl PartialEq for NodeStorage {
506    fn eq(&self, other: &Self) -> bool {
507        self.data.eq(&other.data)
508    }
509}
510
/// Snapshot holding a read lock on every shard of a [`NodeStorage`].
#[derive(Debug)]
pub struct ReadLockedStorage {
    // One guard per shard, in shard order.
    pub(crate) locks: Vec<Arc<ArcRwLockReadGuard<NodeSlot>>>,
    // Storage length read when the locks were taken.
    len: usize,
}
516
517impl ReadLockedStorage {
518    fn resolve(&self, index: VID) -> (usize, usize) {
519        let index: usize = index.into();
520        let n = self.locks.len();
521        let bucket = index % n;
522        let offset = index / n;
523        (bucket, offset)
524    }
525
526    pub fn len(&self) -> usize {
527        self.len
528    }
529
530    pub fn is_empty(&self) -> bool {
531        self.len == 0
532    }
533
534    #[cfg(test)]
535    pub fn get(&self, index: VID) -> &NodeStore {
536        let (bucket, offset) = self.resolve(index);
537        let bucket = &self.locks[bucket];
538        &bucket[offset]
539    }
540
541    #[inline]
542    pub fn get_entry(&self, index: VID) -> NodePtr<'_> {
543        let (bucket, offset) = self.resolve(index);
544        let bucket = &self.locks[bucket];
545        NodePtr::new(&bucket[offset], &bucket.t_props_log)
546    }
547
548    #[inline]
549    pub fn try_get_entry(&self, index: VID) -> Option<NodePtr<'_>> {
550        let (bucket, offset) = self.resolve(index);
551        let bucket = self.locks.get(bucket)?;
552        let node = bucket.get(offset)?;
553        if node.is_initialised() {
554            Some(NodePtr::new(node, &bucket.t_props_log))
555        } else {
556            None
557        }
558    }
559
560    pub fn iter(&self) -> impl Iterator<Item = NodePtr<'_>> + '_ {
561        self.locks.iter().flat_map(|v| v.iter())
562    }
563
564    pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr<'_>> + '_ {
565        self.locks.par_iter().flat_map(|v| v.par_iter())
566    }
567}
568
569impl NodeStorage {
570    pub fn count_with_filter<F: Fn(NodePtr<'_>) -> bool + Send + Sync>(&self, f: F) -> usize {
571        self.read_lock().par_iter().filter(|x| f(*x)).count()
572    }
573}
574
575impl NodeStorage {
576    #[inline]
577    fn resolve(&self, index: usize) -> (usize, usize) {
578        resolve(index, self.data.len())
579    }
580
581    #[inline]
582    pub fn read_lock(&self) -> ReadLockedStorage {
583        let guards = self
584            .data
585            .iter()
586            .map(|v| Arc::new(v.read_arc_lock()))
587            .collect();
588        ReadLockedStorage {
589            locks: guards,
590            len: self.len(),
591        }
592    }
593
594    pub fn write_lock(&self) -> WriteLockedNodes<'_> {
595        WriteLockedNodes {
596            guards: self.data.iter().map(|lock| lock.data.write()).collect(),
597            global_len: &self.len,
598        }
599    }
600
601    pub fn new(n_locks: usize) -> Self {
602        let data: Box<[NodeVec]> = (0..n_locks)
603            .map(|_| NodeVec::new())
604            .collect::<Vec<_>>()
605            .into();
606
607        Self {
608            data,
609            len: AtomicUsize::new(0),
610        }
611    }
612
613    pub fn push(&self, mut value: NodeStore) -> UninitialisedEntry<'_, NodeStore, NodeSlot> {
614        let index = self.len.fetch_add(1, Ordering::Relaxed);
615        value.vid = VID(index);
616        let (bucket, offset) = self.resolve(index);
617        let guard = loop_lock_write(&self.data[bucket].data);
618        UninitialisedEntry {
619            offset,
620            guard,
621            value,
622        }
623    }
624
625    pub fn set(&self, value: NodeStore) {
626        let VID(index) = value.vid;
627        self.len.fetch_max(index + 1, Ordering::Relaxed);
628        let (bucket, offset) = self.resolve(index);
629        let mut guard = loop_lock_write(&self.data[bucket].data);
630        if guard.len() <= offset {
631            guard.resize_with(offset + 1, NodeStore::default)
632        }
633        guard[offset] = value
634    }
635
636    #[inline]
637    pub fn entry(&self, index: VID) -> NodeEntry<'_> {
638        let index = index.into();
639        let (bucket, offset) = self.resolve(index);
640        let guard = self.data[bucket].data.read_recursive();
641        NodeEntry { offset, guard }
642    }
643
644    /// Get the node if it is initialised
645    pub fn try_entry(&self, index: VID) -> Option<NodeEntry<'_>> {
646        let (bucket, offset) = self.resolve(index.index());
647        let guard = self.data.get(bucket)?.data.read_recursive();
648        if guard.get(offset)?.is_initialised() {
649            Some(NodeEntry { offset, guard })
650        } else {
651            None
652        }
653    }
654
655    pub fn entry_mut(&self, index: VID) -> EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>> {
656        let index = index.into();
657        let (bucket, offset) = self.resolve(index);
658        let guard = loop_lock_write(&self.data[bucket].data);
659        EntryMut {
660            i: offset,
661            guard,
662            _pd: PhantomData,
663        }
664    }
665
666    pub fn prop_entry_mut(&self, index: VID) -> impl DerefMut<Target = TColumns> + '_ {
667        let index = index.into();
668        let (bucket, _) = self.resolve(index);
669        let lock = loop_lock_write(&self.data[bucket].data);
670        RwLockWriteGuard::map(lock, |data| &mut data.t_props_log)
671    }
672
673    // This helps get the right locks when adding an edge
674    #[deprecated(note = "use loop_pair_entry_mut instead")]
675    pub fn pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
676        let i = i.into();
677        let j = j.into();
678        let (bucket_i, offset_i) = self.resolve(i);
679        let (bucket_j, offset_j) = self.resolve(j);
680        // always acquire lock for smaller bucket first to avoid deadlock between two updates for the same pair of buckets
681        if bucket_i < bucket_j {
682            let guard_i = self.data[bucket_i].data.write();
683            let guard_j = self.data[bucket_j].data.write();
684            PairEntryMut::Different {
685                i: offset_i,
686                j: offset_j,
687                guard1: guard_i,
688                guard2: guard_j,
689            }
690        } else if bucket_i > bucket_j {
691            let guard_j = self.data[bucket_j].data.write();
692            let guard_i = self.data[bucket_i].data.write();
693            PairEntryMut::Different {
694                i: offset_i,
695                j: offset_j,
696                guard1: guard_i,
697                guard2: guard_j,
698            }
699        } else {
700            PairEntryMut::Same {
701                i: offset_i,
702                j: offset_j,
703                guard: self.data[bucket_i].data.write(),
704            }
705        }
706    }
707
708    pub fn loop_pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
709        let i = i.into();
710        let j = j.into();
711        let (bucket_i, offset_i) = self.resolve(i);
712        let (bucket_j, offset_j) = self.resolve(j);
713        loop {
714            if bucket_i < bucket_j {
715                let guard_i = self.data[bucket_i].data.try_write();
716                let guard_j = self.data[bucket_j].data.try_write();
717                let maybe_guards =
718                    guard_i
719                        .zip(guard_j)
720                        .map(|(guard_i, guard_j)| PairEntryMut::Different {
721                            i: offset_i,
722                            j: offset_j,
723                            guard1: guard_i,
724                            guard2: guard_j,
725                        });
726                if let Some(guards) = maybe_guards {
727                    return guards;
728                }
729            } else if bucket_i > bucket_j {
730                let guard_j = self.data[bucket_j].data.try_write();
731                let guard_i = self.data[bucket_i].data.try_write();
732                let maybe_guards =
733                    guard_i
734                        .zip(guard_j)
735                        .map(|(guard_i, guard_j)| PairEntryMut::Different {
736                            i: offset_i,
737                            j: offset_j,
738                            guard1: guard_i,
739                            guard2: guard_j,
740                        });
741                if let Some(guards) = maybe_guards {
742                    return guards;
743                }
744            } else {
745                let maybe_guard = self.data[bucket_i].data.try_write();
746                if let Some(guard) = maybe_guard {
747                    return PairEntryMut::Same {
748                        i: offset_i,
749                        j: offset_j,
750                        guard,
751                    };
752                }
753            }
754        }
755    }
756
757    #[inline]
758    pub fn len(&self) -> usize {
759        self.len.load(Ordering::SeqCst)
760    }
761
762    pub fn is_empty(&self) -> bool {
763        self.len() == 0
764    }
765
766    pub fn next_id(&self) -> VID {
767        VID(self.len.fetch_add(1, Ordering::Relaxed))
768    }
769}
770
/// Write locks on every shard of a [`NodeStorage`], plus its shared length counter.
pub struct WriteLockedNodes<'a> {
    // One guard per shard, in shard order.
    guards: Vec<RwLockWriteGuard<'a, NodeSlot>>,
    // The storage's length counter, raised when shards grow.
    global_len: &'a AtomicUsize,
}
775
/// Mutable view of a single shard, used to write nodes in parallel across shards.
///
/// Nodes are addressed by global [`VID`]; ids that belong to other shards are
/// rejected (`None`) rather than redirected.
pub struct NodeShardWriter<'a, S> {
    // Locked or borrowed slot for this shard.
    shard: S,
    // Position of this shard among `num_shards`.
    shard_id: usize,
    num_shards: usize,
    // The storage's length counter, raised when this shard grows.
    global_len: &'a AtomicUsize,
}
782
783impl<'a, S> NodeShardWriter<'a, S>
784where
785    S: DerefMut<Target = NodeSlot>,
786{
787    #[inline]
788    fn resolve(&self, index: VID) -> Option<usize> {
789        let (shard_id, offset) = resolve(index.into(), self.num_shards);
790        (shard_id == self.shard_id).then_some(offset)
791    }
792
793    #[inline]
794    pub fn get_mut(&mut self, index: VID) -> Option<&mut NodeStore> {
795        self.resolve(index).map(|offset| &mut self.shard[offset])
796    }
797
798    #[inline]
799    pub fn get_mut_entry(&mut self, index: VID) -> Option<EntryMut<'_, &mut S>> {
800        self.resolve(index).map(|offset| EntryMut {
801            i: offset,
802            guard: &mut self.shard,
803            _pd: PhantomData,
804        })
805    }
806
807    #[inline]
808    pub fn get(&self, index: VID) -> Option<&NodeStore> {
809        self.resolve(index).map(|offset| &self.shard[offset])
810    }
811
812    #[inline]
813    pub fn t_prop_log_mut(&mut self) -> &mut TColumns {
814        &mut self.shard.t_props_log
815    }
816
817    pub fn set(&mut self, vid: VID, gid: GidRef) -> Option<EntryMut<'_, &mut S>> {
818        self.resolve(vid).map(|offset| {
819            if offset >= self.shard.len() {
820                self.shard.resize_with(offset + 1, NodeStore::default);
821                self.global_len
822                    .fetch_max(vid.index() + 1, Ordering::Relaxed);
823            }
824            self.shard[offset] = NodeStore::resolved(gid.to_owned(), vid);
825
826            EntryMut {
827                i: offset,
828                guard: &mut self.shard,
829                _pd: PhantomData,
830            }
831        })
832    }
833
834    pub fn shard_id(&self) -> usize {
835        self.shard_id
836    }
837
838    fn resize(&mut self, new_global_len: usize) {
839        let mut new_len = new_global_len / self.num_shards;
840        if self.shard_id < new_global_len % self.num_shards {
841            new_len += 1;
842        }
843        if new_len > self.shard.len() {
844            self.shard.resize_with(new_len, Default::default);
845            self.global_len.fetch_max(new_global_len, Ordering::Relaxed);
846        }
847    }
848}
849
850impl<'a> WriteLockedNodes<'a> {
851    pub fn par_iter_mut(
852        &mut self,
853    ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'_, &mut NodeSlot>> + '_ {
854        let num_shards = self.guards.len();
855        let global_len = self.global_len;
856        let shards: Vec<&mut NodeSlot> = self
857            .guards
858            .iter_mut()
859            .map(|guard| guard.deref_mut())
860            .collect();
861        shards
862            .into_par_iter()
863            .enumerate()
864            .map(move |(shard_id, shard)| NodeShardWriter {
865                shard,
866                shard_id,
867                num_shards,
868                global_len,
869            })
870    }
871
872    pub fn into_par_iter_mut(
873        self,
874    ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'a, RwLockWriteGuard<'a, NodeSlot>>> + 'a
875    {
876        let num_shards = self.guards.len();
877        let global_len = self.global_len;
878        self.guards
879            .into_par_iter()
880            .enumerate()
881            .map(move |(shard_id, shard)| NodeShardWriter {
882                shard,
883                shard_id,
884                num_shards,
885                global_len,
886            })
887    }
888
889    pub fn resize(&mut self, new_len: usize) {
890        self.par_iter_mut()
891            .for_each(|mut shard| shard.resize(new_len))
892    }
893
894    pub fn num_shards(&self) -> usize {
895        self.guards.len()
896    }
897}
898
899#[derive(Debug)]
900pub struct NodeEntry<'a> {
901    offset: usize,
902    guard: RwLockReadGuard<'a, NodeSlot>,
903}
904
905impl NodeEntry<'_> {
906    #[inline]
907    pub fn as_ref(&self) -> NodePtr<'_> {
908        NodePtr::new(&self.guard[self.offset], &self.guard.t_props_log)
909    }
910}
911
912pub enum PairEntryMut<'a> {
913    Same {
914        i: usize,
915        j: usize,
916        guard: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
917    },
918    Different {
919        i: usize,
920        j: usize,
921        guard1: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
922        guard2: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
923    },
924}
925
926impl<'a> PairEntryMut<'a> {
927    pub(crate) fn get_i(&self) -> &NodeStore {
928        match self {
929            PairEntryMut::Same { i, guard, .. } => &guard[*i],
930            PairEntryMut::Different { i, guard1, .. } => &guard1[*i],
931        }
932    }
933    pub(crate) fn get_mut_i(&mut self) -> &mut NodeStore {
934        match self {
935            PairEntryMut::Same { i, guard, .. } => &mut guard[*i],
936            PairEntryMut::Different { i, guard1, .. } => &mut guard1[*i],
937        }
938    }
939
940    pub(crate) fn get_j(&self) -> &NodeStore {
941        match self {
942            PairEntryMut::Same { j, guard, .. } => &guard[*j],
943            PairEntryMut::Different { j, guard2, .. } => &guard2[*j],
944        }
945    }
946
947    pub(crate) fn get_mut_j(&mut self) -> &mut NodeStore {
948        match self {
949            PairEntryMut::Same { j, guard, .. } => &mut guard[*j],
950            PairEntryMut::Different { j, guard2, .. } => &mut guard2[*j],
951        }
952    }
953}
954
955pub struct EntryMut<'a, NS: 'a> {
956    i: usize,
957    guard: NS,
958    _pd: PhantomData<&'a ()>,
959}
960
961impl<'a, NS> EntryMut<'a, NS> {
962    pub fn to_mut(&mut self) -> EntryMut<'a, &mut NS> {
963        EntryMut {
964            i: self.i,
965            guard: &mut self.guard,
966            _pd: self._pd,
967        }
968    }
969}
970
971impl<'a, NS: DerefMut<Target = NodeSlot>> AsMut<NodeStore> for EntryMut<'a, NS> {
972    fn as_mut(&mut self) -> &mut NodeStore {
973        let slots = self.guard.deref_mut();
974        &mut slots[self.i]
975    }
976}
977
978impl<'a, NS: DerefMut<Target = NodeSlot> + 'a> EntryMut<'a, &'a mut NS> {
979    pub fn node_store_mut(&mut self) -> &mut NodeStore {
980        &mut self.guard[self.i]
981    }
982
983    pub fn t_props_log_mut(&mut self) -> &mut TColumns {
984        &mut self.guard.t_props_log
985    }
986}
987
#[cfg(test)]
mod test {
    use super::{NodeStorage, TColumns};
    use crate::entities::nodes::node_store::NodeStore;
    use proptest::{arbitrary::any, prop_assert_eq, proptest};
    use raphtory_api::core::entities::{properties::prop::Prop, GID, VID};
    use rayon::prelude::*;
    use std::borrow::Cow;

    /// Reads rows `0..rows` of column `col_id`. A missing column or an empty
    /// cell shows up as `None`.
    fn column_values(t_cols: &TColumns, col_id: usize, rows: usize) -> Vec<Option<Prop>> {
        (0..rows)
            .map(|row| t_cols.get(col_id).and_then(|col| col.get(row)))
            .collect()
    }

    #[test]
    fn tcolumns_append_1() {
        let mut t_cols = TColumns::default();
        t_cols.push([(1, Prop::U64(1))]).unwrap();

        // Writing only column 1 still yields a column 0 of the same length.
        for col_id in 0..2 {
            let col = t_cols.get(col_id).unwrap();
            assert_eq!(col.len(), 1);
        }
    }

    #[test]
    fn tcolumns_append_3_rows() {
        let mut t_cols = TColumns::default();
        t_cols
            .push([(1, Prop::U64(1)), (0, Prop::Str("a".into()))])
            .unwrap();
        t_cols
            .push([(0, Prop::Str("c".into())), (2, Prop::I64(9))])
            .unwrap();
        t_cols
            .push([(1, Prop::U64(1)), (3, Prop::Str("c".into()))])
            .unwrap();

        assert_eq!(t_cols.len(), 3);
        // Every column spans all rows, even where it has gaps.
        for col_id in 0..4 {
            assert_eq!(t_cols.get(col_id).unwrap().len(), 3);
        }

        assert_eq!(
            column_values(&t_cols, 0, 3),
            vec![
                Some(Prop::Str("a".into())),
                Some(Prop::Str("c".into())),
                None
            ]
        );
        assert_eq!(
            column_values(&t_cols, 1, 3),
            vec![Some(Prop::U64(1)), None, Some(Prop::U64(1))]
        );
        assert_eq!(
            column_values(&t_cols, 2, 3),
            vec![None, Some(Prop::I64(9)), None]
        );
        assert_eq!(
            column_values(&t_cols, 3, 3),
            vec![None, None, Some(Prop::Str("c".into()))]
        );
    }

    #[test]
    fn tcolumns_append_2_columns_12_items() {
        let mut t_cols = TColumns::default();

        for value in 0..12u64 {
            if value % 2 == 0 {
                t_cols
                    .push([
                        (1, Prop::U64(value)),
                        (0, Prop::Str(value.to_string().into())),
                    ])
                    .unwrap();
            } else {
                t_cols.push([(1, Prop::U64(value))]).unwrap();
            }
        }

        assert_eq!(t_cols.len(), 12);

        // Column 0 is written only on even rows; column 1 on every row.
        let expected_col0: Vec<Option<Prop>> = (0..12u64)
            .map(|value| (value % 2 == 0).then(|| Prop::Str(value.to_string().into())))
            .collect();
        assert_eq!(column_values(&t_cols, 0, 12), expected_col0);

        let expected_col1: Vec<Option<Prop>> =
            (0..12u64).map(|value| Some(Prop::U64(value))).collect();
        assert_eq!(column_values(&t_cols, 1, 12), expected_col1);
    }

    #[test]
    fn add_5_values_to_storage() {
        let storage = NodeStorage::new(2);

        for i in 0..5 {
            storage.push(NodeStore::empty(i.into())).init();
        }

        assert_eq!(storage.len(), 5);

        for i in 0..5 {
            let entry = storage.entry(VID(i));
            assert_eq!(entry.as_ref().node().vid, VID(i));
        }

        // Iteration walks shard by shard: with two shards, shard 0 holds the
        // even ids and shard 1 the odd ones.
        let locked = storage.read_lock();
        let vids: Vec<usize> = locked.iter().map(|n| n.node().vid.index()).collect();
        assert_eq!(vids, vec![0, 2, 4, 1, 3]);
    }

    #[test]
    fn test_index_correctness() {
        let storage = NodeStorage::new(2);

        for i in 0..5 {
            storage.push(NodeStore::empty(i.into())).init();
        }
        let locked = storage.read_lock();
        let actual: Vec<_> = (0..5)
            .map(|i| (i, locked.get(VID(i)).global_id.to_str()))
            .collect();

        // Each VID must resolve back to the node pushed at that position.
        let expected: Vec<(usize, Cow<str>)> = (0..5)
            .map(|i: usize| (i, Cow::Owned(i.to_string())))
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_entry() {
        let storage = NodeStorage::new(2);

        for i in 0..5 {
            storage.push(NodeStore::empty(i.into())).init();
        }

        for i in 0..5 {
            let entry = storage.entry(VID(i));
            assert_eq!(*entry.as_ref().node().global_id.to_str(), i.to_string());
        }
    }

    #[test]
    fn concurrent_push() {
        proptest!(|(v in any::<Vec<u64>>())| {
            let storage = NodeStorage::new(16);
            // Push from rayon workers concurrently. The stored ids must match
            // the pushed ids as a multiset.
            let mut expected = v
                .into_par_iter()
                .map(|gid| {
                    storage.push(NodeStore::empty(GID::U64(gid))).init();
                    gid
                })
                .collect::<Vec<_>>();

            let locked = storage.read_lock();
            let mut actual: Vec<_> = locked
                .iter()
                .map(|n| n.node().global_id.as_u64().unwrap())
                .collect();

            actual.sort_unstable();
            expected.sort_unstable();
            prop_assert_eq!(actual, expected)
        })
    }
}
1207}