raphtory_core/storage/
mod.rs

1use crate::{
2    entities::{
3        nodes::node_store::NodeStore,
4        properties::{props::TPropError, tprop::IllegalPropType},
5    },
6    storage::lazy_vec::IllegalSet,
7};
8use bigdecimal::BigDecimal;
9use itertools::Itertools;
10use lazy_vec::LazyVec;
11use lock_api;
12use node_entry::NodePtr;
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14#[cfg(feature = "arrow")]
15use raphtory_api::core::entities::properties::prop::PropArray;
16use raphtory_api::core::{
17    entities::{
18        properties::prop::{Prop, PropType},
19        GidRef, VID,
20    },
21    storage::arc_str::ArcStr,
22};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25use serde::{Deserialize, Serialize};
26use std::{
27    collections::HashMap,
28    fmt::{Debug, Formatter},
29    marker::PhantomData,
30    ops::{Deref, DerefMut, Index, IndexMut},
31    sync::{
32        atomic::{AtomicUsize, Ordering},
33        Arc,
34    },
35};
36use thiserror::Error;
37
38pub mod lazy_vec;
39pub mod locked_view;
40pub mod node_entry;
41pub mod raw_edges;
42pub mod timeindex;
43
/// Shorthand for a `lock_api` `Arc`-based read guard specialised to `parking_lot`'s raw lock.
type ArcRwLockReadGuard<T> = lock_api::ArcRwLockReadGuard<parking_lot::RawRwLock, T>;
/// A reserved position in a write-locked bucket whose value has not been stored yet.
///
/// The entry carries the bucket's write guard, the target `offset` and the pending
/// `value`; the value is only written when `init` is called.
#[must_use]
pub struct UninitialisedEntry<'a, T, TS> {
    /// Position inside the locked bucket at which `value` will be stored.
    offset: usize,
    /// Write guard over the bucket, held for as long as this entry exists.
    guard: RwLockWriteGuard<'a, TS>,
    /// The value waiting to be stored.
    value: T,
}
51
52impl<'a, T: Default, TS: DerefMut<Target = Vec<T>>> UninitialisedEntry<'a, T, TS> {
53    pub fn init(mut self) {
54        if self.offset >= self.guard.len() {
55            self.guard.resize_with(self.offset + 1, Default::default);
56        }
57        self.guard[self.offset] = self.value;
58    }
59    pub fn value(&self) -> &T {
60        &self.value
61    }
62}
63
/// Splits a global index into `(bucket, offset)` for storage striped over `num_buckets` buckets.
///
/// The bucket is `index % num_buckets` and the offset inside that bucket is
/// `index / num_buckets`. Panics if `num_buckets` is zero.
#[inline]
fn resolve(index: usize, num_buckets: usize) -> (usize, usize) {
    (index % num_buckets, index / num_buckets)
}
70
/// A handle to one [`NodeSlot`] kept behind an `Arc<RwLock<_>>`.
///
/// Cloning the handle clones the `Arc`, so clones share the same slot and lock.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeVec {
    data: Arc<RwLock<NodeSlot>>,
}
75
/// Contents of a single storage bucket: the nodes themselves plus the temporal
/// property columns stored next to them.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct NodeSlot {
    /// Nodes held by this bucket, indexed by their offset within the bucket.
    nodes: Vec<NodeStore>,
    /// Temporal property rows for this bucket.
    t_props_log: TColumns, // not the same size as nodes
}
81
/// Column-oriented log of temporal property rows.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct TColumns {
    /// One column per property id; the position in this vector is the property id.
    t_props_log: Vec<TPropColumn>,
    /// Number of rows pushed so far; `push` pads every column up to this length.
    num_rows: usize,
}
87
88impl TColumns {
89    pub fn push(
90        &mut self,
91        row: impl IntoIterator<Item = (usize, Prop)>,
92    ) -> Result<Option<usize>, TPropError> {
93        let id = self.num_rows;
94        let mut has_props = false;
95
96        for (prop_id, prop) in row {
97            match self.t_props_log.get_mut(prop_id) {
98                Some(col) => col.push(prop)?,
99                None => {
100                    let col: TPropColumn = TPropColumn::new(self.num_rows, prop);
101                    self.t_props_log
102                        .resize_with(prop_id + 1, || TPropColumn::Empty(id));
103                    self.t_props_log[prop_id] = col;
104                }
105            }
106            has_props = true;
107        }
108
109        if has_props {
110            self.num_rows += 1;
111            for col in self.t_props_log.iter_mut() {
112                col.grow(self.num_rows);
113            }
114            Ok(Some(id))
115        } else {
116            Ok(None)
117        }
118    }
119
120    pub(crate) fn get(&self, prop_id: usize) -> Option<&TPropColumn> {
121        self.t_props_log.get(prop_id)
122    }
123
124    pub fn len(&self) -> usize {
125        self.num_rows
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.num_rows == 0
130    }
131
132    pub fn iter(&self) -> impl Iterator<Item = &TPropColumn> {
133        self.t_props_log.iter()
134    }
135}
136
/// A single temporal property column.
///
/// `Empty(n)` is an untyped placeholder that only records a length `n`; every other
/// variant stores values of one property type in a [`LazyVec`].
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum TPropColumn {
    /// Untyped column that only tracks how many (null) entries it has.
    Empty(usize),
    Bool(LazyVec<bool>),
    U8(LazyVec<u8>),
    U16(LazyVec<u16>),
    U32(LazyVec<u32>),
    U64(LazyVec<u64>),
    I32(LazyVec<i32>),
    I64(LazyVec<i64>),
    F32(LazyVec<f32>),
    F64(LazyVec<f64>),
    Str(LazyVec<ArcStr>),
    #[cfg(feature = "arrow")]
    Array(LazyVec<PropArray>),
    List(LazyVec<Arc<Vec<Prop>>>),
    Map(LazyVec<Arc<FxHashMap<ArcStr, Prop>>>),
    NDTime(LazyVec<chrono::NaiveDateTime>),
    DTime(LazyVec<chrono::DateTime<chrono::Utc>>),
    Decimal(LazyVec<BigDecimal>),
}
158
/// Errors produced when writing a value into a [`TPropColumn`].
///
/// Each `IllegalSet*` variant (and `Decimal`) wraps the `IllegalSet` error of the
/// corresponding `LazyVec` element type; `IllegalPropType` reports a value whose type
/// does not match the column.
#[derive(Error, Debug)]
pub enum TPropColumnError {
    #[error(transparent)]
    IllegalSetBool(#[from] IllegalSet<bool>),
    #[error(transparent)]
    IllegalSetU8(#[from] IllegalSet<u8>),
    #[error(transparent)]
    IllegalSetU16(#[from] IllegalSet<u16>),
    #[error(transparent)]
    IllegalSetU32(#[from] IllegalSet<u32>),
    #[error(transparent)]
    IllegalSetU64(#[from] IllegalSet<u64>),
    #[error(transparent)]
    IllegalSetI32(#[from] IllegalSet<i32>),
    #[error(transparent)]
    IllegalSetI64(#[from] IllegalSet<i64>),
    #[error(transparent)]
    IllegalSetF32(#[from] IllegalSet<f32>),
    #[error(transparent)]
    IllegalSetF64(#[from] IllegalSet<f64>),
    #[error(transparent)]
    IllegalSetStr(#[from] IllegalSet<ArcStr>),
    #[cfg(feature = "arrow")]
    #[error(transparent)]
    IllegalSetArray(#[from] IllegalSet<PropArray>),
    #[error(transparent)]
    IllegalSetList(#[from] IllegalSet<Arc<Vec<Prop>>>),
    #[error(transparent)]
    IllegalSetMap(#[from] IllegalSet<Arc<FxHashMap<ArcStr, Prop>>>),
    #[error(transparent)]
    IllegalSetNDTime(#[from] IllegalSet<chrono::NaiveDateTime>),
    #[error(transparent)]
    IllegalSetDTime(#[from] IllegalSet<chrono::DateTime<chrono::Utc>>),
    #[error(transparent)]
    Decimal(#[from] IllegalSet<BigDecimal>),
    /// The value's property type does not match the column's type.
    #[error(transparent)]
    IllegalPropType(#[from] IllegalPropType),
}
197
198impl Default for TPropColumn {
199    fn default() -> Self {
200        TPropColumn::Empty(0)
201    }
202}
203
204impl TPropColumn {
205    pub(crate) fn new(idx: usize, prop: Prop) -> Self {
206        let mut col = TPropColumn::default();
207        col.set(idx, prop).unwrap();
208        col
209    }
210
211    pub(crate) fn dtype(&self) -> PropType {
212        match self {
213            TPropColumn::Empty(_) => PropType::Empty,
214            TPropColumn::Bool(_) => PropType::Bool,
215            TPropColumn::U8(_) => PropType::U8,
216            TPropColumn::U16(_) => PropType::U16,
217            TPropColumn::U32(_) => PropType::U32,
218            TPropColumn::U64(_) => PropType::U64,
219            TPropColumn::I32(_) => PropType::I32,
220            TPropColumn::I64(_) => PropType::I64,
221            TPropColumn::F32(_) => PropType::F32,
222            TPropColumn::F64(_) => PropType::F64,
223            TPropColumn::Str(_) => PropType::Str,
224            #[cfg(feature = "arrow")]
225            TPropColumn::Array(_) => PropType::Array(Box::new(PropType::Empty)),
226            TPropColumn::List(_) => PropType::List(Box::new(PropType::Empty)),
227            TPropColumn::Map(_) => PropType::Map(HashMap::new().into()),
228            TPropColumn::NDTime(_) => PropType::NDTime,
229            TPropColumn::DTime(_) => PropType::DTime,
230            TPropColumn::Decimal(_) => PropType::Decimal { scale: 0 },
231        }
232    }
233
234    pub(crate) fn grow(&mut self, new_len: usize) {
235        while self.len() < new_len {
236            self.push_null();
237        }
238    }
239
240    pub(crate) fn set(&mut self, index: usize, prop: Prop) -> Result<(), TPropColumnError> {
241        self.init_empty_col(&prop);
242        match (self, prop) {
243            (TPropColumn::Bool(col), Prop::Bool(v)) => col.set(index, v)?,
244            (TPropColumn::I64(col), Prop::I64(v)) => col.set(index, v)?,
245            (TPropColumn::U32(col), Prop::U32(v)) => col.set(index, v)?,
246            (TPropColumn::U64(col), Prop::U64(v)) => col.set(index, v)?,
247            (TPropColumn::F32(col), Prop::F32(v)) => col.set(index, v)?,
248            (TPropColumn::F64(col), Prop::F64(v)) => col.set(index, v)?,
249            (TPropColumn::Str(col), Prop::Str(v)) => col.set(index, v)?,
250            #[cfg(feature = "arrow")]
251            (TPropColumn::Array(col), Prop::Array(v)) => col.set(index, v)?,
252            (TPropColumn::U8(col), Prop::U8(v)) => col.set(index, v)?,
253            (TPropColumn::U16(col), Prop::U16(v)) => col.set(index, v)?,
254            (TPropColumn::I32(col), Prop::I32(v)) => col.set(index, v)?,
255            (TPropColumn::List(col), Prop::List(v)) => col.set(index, v)?,
256            (TPropColumn::Map(col), Prop::Map(v)) => col.set(index, v)?,
257            (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.set(index, v)?,
258            (TPropColumn::DTime(col), Prop::DTime(v)) => col.set(index, v)?,
259            (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.set(index, v)?,
260            (col, prop) => {
261                Err(IllegalPropType {
262                    expected: col.dtype(),
263                    actual: prop.dtype(),
264                })?;
265            }
266        }
267        Ok(())
268    }
269
270    pub(crate) fn push(&mut self, prop: Prop) -> Result<(), IllegalPropType> {
271        self.init_empty_col(&prop);
272        match (self, prop) {
273            (TPropColumn::Bool(col), Prop::Bool(v)) => col.push(Some(v)),
274            (TPropColumn::U8(col), Prop::U8(v)) => col.push(Some(v)),
275            (TPropColumn::I64(col), Prop::I64(v)) => col.push(Some(v)),
276            (TPropColumn::U32(col), Prop::U32(v)) => col.push(Some(v)),
277            (TPropColumn::U64(col), Prop::U64(v)) => col.push(Some(v)),
278            (TPropColumn::F32(col), Prop::F32(v)) => col.push(Some(v)),
279            (TPropColumn::F64(col), Prop::F64(v)) => col.push(Some(v)),
280            (TPropColumn::Str(col), Prop::Str(v)) => col.push(Some(v)),
281            #[cfg(feature = "arrow")]
282            (TPropColumn::Array(col), Prop::Array(v)) => col.push(Some(v)),
283            (TPropColumn::U16(col), Prop::U16(v)) => col.push(Some(v)),
284            (TPropColumn::I32(col), Prop::I32(v)) => col.push(Some(v)),
285            (TPropColumn::List(col), Prop::List(v)) => col.push(Some(v)),
286            (TPropColumn::Map(col), Prop::Map(v)) => col.push(Some(v)),
287            (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.push(Some(v)),
288            (TPropColumn::DTime(col), Prop::DTime(v)) => col.push(Some(v)),
289            (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.push(Some(v)),
290            (col, prop) => {
291                return Err(IllegalPropType {
292                    expected: col.dtype(),
293                    actual: prop.dtype(),
294                })
295            }
296        }
297        Ok(())
298    }
299
300    fn init_empty_col(&mut self, prop: &Prop) {
301        if let TPropColumn::Empty(len) = self {
302            match prop {
303                Prop::Bool(_) => *self = TPropColumn::Bool(LazyVec::with_len(*len)),
304                Prop::I64(_) => *self = TPropColumn::I64(LazyVec::with_len(*len)),
305                Prop::U32(_) => *self = TPropColumn::U32(LazyVec::with_len(*len)),
306                Prop::U64(_) => *self = TPropColumn::U64(LazyVec::with_len(*len)),
307                Prop::F32(_) => *self = TPropColumn::F32(LazyVec::with_len(*len)),
308                Prop::F64(_) => *self = TPropColumn::F64(LazyVec::with_len(*len)),
309                Prop::Str(_) => *self = TPropColumn::Str(LazyVec::with_len(*len)),
310                #[cfg(feature = "arrow")]
311                Prop::Array(_) => *self = TPropColumn::Array(LazyVec::with_len(*len)),
312                Prop::U8(_) => *self = TPropColumn::U8(LazyVec::with_len(*len)),
313                Prop::U16(_) => *self = TPropColumn::U16(LazyVec::with_len(*len)),
314                Prop::I32(_) => *self = TPropColumn::I32(LazyVec::with_len(*len)),
315                Prop::List(_) => *self = TPropColumn::List(LazyVec::with_len(*len)),
316                Prop::Map(_) => *self = TPropColumn::Map(LazyVec::with_len(*len)),
317                Prop::NDTime(_) => *self = TPropColumn::NDTime(LazyVec::with_len(*len)),
318                Prop::DTime(_) => *self = TPropColumn::DTime(LazyVec::with_len(*len)),
319                Prop::Decimal(_) => *self = TPropColumn::Decimal(LazyVec::with_len(*len)),
320            }
321        }
322    }
323
324    fn is_empty(&self) -> bool {
325        matches!(self, TPropColumn::Empty(_))
326    }
327
328    pub(crate) fn push_null(&mut self) {
329        match self {
330            TPropColumn::Bool(col) => col.push(None),
331            TPropColumn::I64(col) => col.push(None),
332            TPropColumn::U32(col) => col.push(None),
333            TPropColumn::U64(col) => col.push(None),
334            TPropColumn::F32(col) => col.push(None),
335            TPropColumn::F64(col) => col.push(None),
336            TPropColumn::Str(col) => col.push(None),
337            #[cfg(feature = "arrow")]
338            TPropColumn::Array(col) => col.push(None),
339            TPropColumn::U8(col) => col.push(None),
340            TPropColumn::U16(col) => col.push(None),
341            TPropColumn::I32(col) => col.push(None),
342            TPropColumn::List(col) => col.push(None),
343            TPropColumn::Map(col) => col.push(None),
344            TPropColumn::NDTime(col) => col.push(None),
345            TPropColumn::DTime(col) => col.push(None),
346            TPropColumn::Decimal(col) => col.push(None),
347            TPropColumn::Empty(count) => {
348                *count += 1;
349            }
350        }
351    }
352
353    pub fn get(&self, index: usize) -> Option<Prop> {
354        match self {
355            TPropColumn::Bool(col) => col.get_opt(index).map(|prop| (*prop).into()),
356            TPropColumn::I64(col) => col.get_opt(index).map(|prop| (*prop).into()),
357            TPropColumn::U32(col) => col.get_opt(index).map(|prop| (*prop).into()),
358            TPropColumn::U64(col) => col.get_opt(index).map(|prop| (*prop).into()),
359            TPropColumn::F32(col) => col.get_opt(index).map(|prop| (*prop).into()),
360            TPropColumn::F64(col) => col.get_opt(index).map(|prop| (*prop).into()),
361            TPropColumn::Str(col) => col.get_opt(index).map(|prop| prop.into()),
362            #[cfg(feature = "arrow")]
363            TPropColumn::Array(col) => col.get_opt(index).map(|prop| Prop::Array(prop.clone())),
364            TPropColumn::U8(col) => col.get_opt(index).map(|prop| (*prop).into()),
365            TPropColumn::U16(col) => col.get_opt(index).map(|prop| (*prop).into()),
366            TPropColumn::I32(col) => col.get_opt(index).map(|prop| (*prop).into()),
367            TPropColumn::List(col) => col.get_opt(index).map(|prop| Prop::List(prop.clone())),
368            TPropColumn::Map(col) => col.get_opt(index).map(|prop| Prop::Map(prop.clone())),
369            TPropColumn::NDTime(col) => col.get_opt(index).map(|prop| Prop::NDTime(*prop)),
370            TPropColumn::DTime(col) => col.get_opt(index).map(|prop| Prop::DTime(*prop)),
371            TPropColumn::Decimal(col) => col.get_opt(index).map(|prop| Prop::Decimal(prop.clone())),
372            TPropColumn::Empty(_) => None,
373        }
374    }
375
376    pub(crate) fn len(&self) -> usize {
377        match self {
378            TPropColumn::Bool(col) => col.len(),
379            TPropColumn::I64(col) => col.len(),
380            TPropColumn::U32(col) => col.len(),
381            TPropColumn::U64(col) => col.len(),
382            TPropColumn::F32(col) => col.len(),
383            TPropColumn::F64(col) => col.len(),
384            TPropColumn::Str(col) => col.len(),
385            #[cfg(feature = "arrow")]
386            TPropColumn::Array(col) => col.len(),
387            TPropColumn::U8(col) => col.len(),
388            TPropColumn::U16(col) => col.len(),
389            TPropColumn::I32(col) => col.len(),
390            TPropColumn::List(col) => col.len(),
391            TPropColumn::Map(col) => col.len(),
392            TPropColumn::NDTime(col) => col.len(),
393            TPropColumn::DTime(col) => col.len(),
394            TPropColumn::Decimal(col) => col.len(),
395            TPropColumn::Empty(count) => *count,
396        }
397    }
398}
399
400impl NodeSlot {
401    pub fn t_props_log(&self) -> &TColumns {
402        &self.t_props_log
403    }
404
405    pub fn t_props_log_mut(&mut self) -> &mut TColumns {
406        &mut self.t_props_log
407    }
408
409    pub fn iter(&self) -> impl Iterator<Item = NodePtr> {
410        self.nodes
411            .iter()
412            .map(|ns| NodePtr::new(ns, &self.t_props_log))
413    }
414
415    pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr> {
416        self.nodes
417            .par_iter()
418            .map(|ns| NodePtr::new(ns, &self.t_props_log))
419    }
420}
421
422impl Index<usize> for NodeSlot {
423    type Output = NodeStore;
424
425    fn index(&self, index: usize) -> &Self::Output {
426        &self.nodes[index]
427    }
428}
429
430impl IndexMut<usize> for NodeSlot {
431    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
432        &mut self.nodes[index]
433    }
434}
435
436impl Deref for NodeSlot {
437    type Target = Vec<NodeStore>;
438
439    fn deref(&self) -> &Self::Target {
440        &self.nodes
441    }
442}
443
444impl DerefMut for NodeSlot {
445    fn deref_mut(&mut self) -> &mut Self::Target {
446        &mut self.nodes
447    }
448}
449
450impl PartialEq for NodeVec {
451    fn eq(&self, other: &Self) -> bool {
452        let a = self.data.read();
453        let b = other.data.read();
454        a.deref() == b.deref()
455    }
456}
457
458impl Default for NodeVec {
459    fn default() -> Self {
460        Self::new()
461    }
462}
463
464impl NodeVec {
465    pub fn new() -> Self {
466        Self {
467            data: Arc::new(RwLock::new(Default::default())),
468        }
469    }
470
471    #[inline]
472    pub fn read_arc_lock(&self) -> ArcRwLockReadGuard<NodeSlot> {
473        RwLock::read_arc(&self.data)
474    }
475
476    #[inline]
477    pub fn write(&self) -> impl DerefMut<Target = NodeSlot> + '_ {
478        self.data.write()
479    }
480
481    #[inline]
482    pub fn read(&self) -> impl Deref<Target = NodeSlot> + '_ {
483        self.data.read()
484    }
485}
486
/// Node storage striped over a fixed set of independently locked [`NodeVec`] buckets.
#[derive(Serialize, Deserialize)]
pub struct NodeStorage {
    /// The buckets; a node with global index `i` lives in bucket `i % data.len()`.
    pub(crate) data: Box<[NodeVec]>,
    /// Global node count shared by all buckets.
    len: AtomicUsize,
}
492
493impl Debug for NodeStorage {
494    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
495        f.debug_struct("NodeStorage")
496            .field("len", &self.len())
497            .field("data", &self.read_lock().iter().collect_vec())
498            .finish()
499    }
500}
501
502impl PartialEq for NodeStorage {
503    fn eq(&self, other: &Self) -> bool {
504        self.data.eq(&other.data)
505    }
506}
507
/// A view of the node storage with every bucket read-locked.
#[derive(Debug)]
pub struct ReadLockedStorage {
    /// One read guard per bucket, in bucket order.
    pub(crate) locks: Vec<Arc<ArcRwLockReadGuard<NodeSlot>>>,
    /// Node count recorded when this view was created.
    len: usize,
}
513
514impl ReadLockedStorage {
515    fn resolve(&self, index: VID) -> (usize, usize) {
516        let index: usize = index.into();
517        let n = self.locks.len();
518        let bucket = index % n;
519        let offset = index / n;
520        (bucket, offset)
521    }
522
523    pub fn len(&self) -> usize {
524        self.len
525    }
526
527    pub fn is_empty(&self) -> bool {
528        self.len == 0
529    }
530
531    #[cfg(test)]
532    pub fn get(&self, index: VID) -> &NodeStore {
533        let (bucket, offset) = self.resolve(index);
534        let bucket = &self.locks[bucket];
535        &bucket[offset]
536    }
537
538    #[inline]
539    pub fn get_entry(&self, index: VID) -> NodePtr {
540        let (bucket, offset) = self.resolve(index);
541        let bucket = &self.locks[bucket];
542        NodePtr::new(&bucket[offset], &bucket.t_props_log)
543    }
544
545    pub fn iter(&self) -> impl Iterator<Item = NodePtr> + '_ {
546        self.locks.iter().flat_map(|v| v.iter())
547    }
548
549    pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr> + '_ {
550        self.locks.par_iter().flat_map(|v| v.par_iter())
551    }
552}
553
554impl NodeStorage {
555    pub fn count_with_filter<F: Fn(NodePtr<'_>) -> bool + Send + Sync>(&self, f: F) -> usize {
556        self.read_lock().par_iter().filter(|x| f(*x)).count()
557    }
558}
559
560impl NodeStorage {
561    #[inline]
562    fn resolve(&self, index: usize) -> (usize, usize) {
563        resolve(index, self.data.len())
564    }
565
566    #[inline]
567    pub fn read_lock(&self) -> ReadLockedStorage {
568        let guards = self
569            .data
570            .iter()
571            .map(|v| Arc::new(v.read_arc_lock()))
572            .collect();
573        ReadLockedStorage {
574            locks: guards,
575            len: self.len(),
576        }
577    }
578
579    pub fn write_lock(&self) -> WriteLockedNodes {
580        WriteLockedNodes {
581            guards: self.data.iter().map(|lock| lock.data.write()).collect(),
582            global_len: &self.len,
583        }
584    }
585
586    pub fn new(n_locks: usize) -> Self {
587        let data: Box<[NodeVec]> = (0..n_locks)
588            .map(|_| NodeVec::new())
589            .collect::<Vec<_>>()
590            .into();
591
592        Self {
593            data,
594            len: AtomicUsize::new(0),
595        }
596    }
597
598    pub fn push(&self, mut value: NodeStore) -> UninitialisedEntry<NodeStore, NodeSlot> {
599        let index = self.len.fetch_add(1, Ordering::Relaxed);
600        value.vid = VID(index);
601        let (bucket, offset) = self.resolve(index);
602        let guard = self.data[bucket].data.write();
603        UninitialisedEntry {
604            offset,
605            guard,
606            value,
607        }
608    }
609
610    pub fn set(&self, value: NodeStore) {
611        let VID(index) = value.vid;
612        self.len.fetch_max(index + 1, Ordering::Relaxed);
613        let (bucket, offset) = self.resolve(index);
614        let mut guard = self.data[bucket].data.write();
615        if guard.len() <= offset {
616            guard.resize_with(offset + 1, NodeStore::default)
617        }
618        guard[offset] = value
619    }
620
621    #[inline]
622    pub fn entry(&self, index: VID) -> NodeEntry<'_> {
623        let index = index.into();
624        let (bucket, offset) = self.resolve(index);
625        let guard = self.data[bucket].data.read_recursive();
626        NodeEntry { offset, guard }
627    }
628
629    pub fn entry_mut(&self, index: VID) -> EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>> {
630        let index = index.into();
631        let (bucket, offset) = self.resolve(index);
632        let guard = self.data[bucket].data.write();
633        EntryMut {
634            i: offset,
635            guard,
636            _pd: PhantomData,
637        }
638    }
639
640    pub fn prop_entry_mut(&self, index: VID) -> impl DerefMut<Target = TColumns> + '_ {
641        let index = index.into();
642        let (bucket, _) = self.resolve(index);
643        let lock = self.data[bucket].data.write();
644        RwLockWriteGuard::map(lock, |data| &mut data.t_props_log)
645    }
646
647    // This helps get the right locks when adding an edge
648    pub fn pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
649        let i = i.into();
650        let j = j.into();
651        let (bucket_i, offset_i) = self.resolve(i);
652        let (bucket_j, offset_j) = self.resolve(j);
653        // always acquire lock for smaller bucket first to avoid deadlock between two updates for the same pair of buckets
654        if bucket_i < bucket_j {
655            let guard_i = self.data[bucket_i].data.write();
656            let guard_j = self.data[bucket_j].data.write();
657            PairEntryMut::Different {
658                i: offset_i,
659                j: offset_j,
660                guard1: guard_i,
661                guard2: guard_j,
662            }
663        } else if bucket_i > bucket_j {
664            let guard_j = self.data[bucket_j].data.write();
665            let guard_i = self.data[bucket_i].data.write();
666            PairEntryMut::Different {
667                i: offset_i,
668                j: offset_j,
669                guard1: guard_i,
670                guard2: guard_j,
671            }
672        } else {
673            PairEntryMut::Same {
674                i: offset_i,
675                j: offset_j,
676                guard: self.data[bucket_i].data.write(),
677            }
678        }
679    }
680
681    #[inline]
682    pub fn len(&self) -> usize {
683        self.len.load(Ordering::SeqCst)
684    }
685
686    pub fn is_empty(&self) -> bool {
687        self.len() == 0
688    }
689
690    pub fn next_id(&self) -> VID {
691        VID(self.len.fetch_add(1, Ordering::Relaxed))
692    }
693}
694
/// Write guards over every bucket of a node storage.
pub struct WriteLockedNodes<'a> {
    /// One write guard per bucket, in bucket order.
    guards: Vec<RwLockWriteGuard<'a, NodeSlot>>,
    /// The storage's global node count, updated when shards grow.
    global_len: &'a AtomicUsize,
}
699
/// Mutable access to a single shard (bucket) of the node storage.
pub struct NodeShardWriter<'a, S> {
    /// The shard being written to.
    shard: S,
    /// Index of this shard among all shards.
    shard_id: usize,
    /// Total number of shards, used to map node ids to shards.
    num_shards: usize,
    /// The storage's global node count.
    global_len: &'a AtomicUsize,
}
706
707impl<'a, S> NodeShardWriter<'a, S>
708where
709    S: DerefMut<Target = NodeSlot>,
710{
711    #[inline]
712    fn resolve(&self, index: VID) -> Option<usize> {
713        let (shard_id, offset) = resolve(index.into(), self.num_shards);
714        (shard_id == self.shard_id).then_some(offset)
715    }
716
717    #[inline]
718    pub fn get_mut(&mut self, index: VID) -> Option<&mut NodeStore> {
719        self.resolve(index).map(|offset| &mut self.shard[offset])
720    }
721
722    #[inline]
723    pub fn get_mut_entry(&mut self, index: VID) -> Option<EntryMut<'_, &mut S>> {
724        self.resolve(index).map(|offset| EntryMut {
725            i: offset,
726            guard: &mut self.shard,
727            _pd: PhantomData,
728        })
729    }
730
731    #[inline]
732    pub fn get(&self, index: VID) -> Option<&NodeStore> {
733        self.resolve(index).map(|offset| &self.shard[offset])
734    }
735
736    #[inline]
737    pub fn t_prop_log_mut(&mut self) -> &mut TColumns {
738        &mut self.shard.t_props_log
739    }
740
741    pub fn set(&mut self, vid: VID, gid: GidRef) -> Option<EntryMut<'_, &mut S>> {
742        self.resolve(vid).map(|offset| {
743            if offset >= self.shard.len() {
744                self.shard.resize_with(offset + 1, NodeStore::default);
745                self.global_len
746                    .fetch_max(vid.index() + 1, Ordering::Relaxed);
747            }
748            self.shard[offset] = NodeStore::resolved(gid.to_owned(), vid);
749
750            EntryMut {
751                i: offset,
752                guard: &mut self.shard,
753                _pd: PhantomData,
754            }
755        })
756    }
757
758    pub fn shard_id(&self) -> usize {
759        self.shard_id
760    }
761
762    fn resize(&mut self, new_global_len: usize) {
763        let mut new_len = new_global_len / self.num_shards;
764        if self.shard_id < new_global_len % self.num_shards {
765            new_len += 1;
766        }
767        if new_len > self.shard.len() {
768            self.shard.resize_with(new_len, Default::default);
769            self.global_len.fetch_max(new_global_len, Ordering::Relaxed);
770        }
771    }
772}
773
774impl<'a> WriteLockedNodes<'a> {
775    pub fn par_iter_mut(
776        &mut self,
777    ) -> impl IndexedParallelIterator<Item = NodeShardWriter<&mut NodeSlot>> + '_ {
778        let num_shards = self.guards.len();
779        let global_len = self.global_len;
780        let shards: Vec<&mut NodeSlot> = self
781            .guards
782            .iter_mut()
783            .map(|guard| guard.deref_mut())
784            .collect();
785        shards
786            .into_par_iter()
787            .enumerate()
788            .map(move |(shard_id, shard)| NodeShardWriter {
789                shard,
790                shard_id,
791                num_shards,
792                global_len,
793            })
794    }
795
796    pub fn into_par_iter_mut(
797        self,
798    ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'a, RwLockWriteGuard<'a, NodeSlot>>> + 'a
799    {
800        let num_shards = self.guards.len();
801        let global_len = self.global_len;
802        self.guards
803            .into_par_iter()
804            .enumerate()
805            .map(move |(shard_id, shard)| NodeShardWriter {
806                shard,
807                shard_id,
808                num_shards,
809                global_len,
810            })
811    }
812
813    pub fn resize(&mut self, new_len: usize) {
814        self.par_iter_mut()
815            .for_each(|mut shard| shard.resize(new_len))
816    }
817
818    pub fn num_shards(&self) -> usize {
819        self.guards.len()
820    }
821}
822
/// Read access to a single node: a read guard over its bucket plus its offset inside it.
#[derive(Debug)]
pub struct NodeEntry<'a> {
    /// Position of the node inside the locked bucket.
    offset: usize,
    /// Read guard over the bucket that holds the node.
    guard: RwLockReadGuard<'a, NodeSlot>,
}
828
829impl NodeEntry<'_> {
830    #[inline]
831    pub fn as_ref(&self) -> NodePtr<'_> {
832        NodePtr::new(&self.guard[self.offset], &self.guard.t_props_log)
833    }
834}
835
/// Write access to two nodes (`i` and `j`) at the same time.
pub enum PairEntryMut<'a> {
    /// Both nodes live in the same bucket, so a single write guard covers them.
    Same {
        /// Offset of node `i` inside the bucket.
        i: usize,
        /// Offset of node `j` inside the bucket.
        j: usize,
        guard: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
    },
    /// The nodes live in different buckets, each with its own write guard.
    Different {
        /// Offset of node `i` inside `guard1`.
        i: usize,
        /// Offset of node `j` inside `guard2`.
        j: usize,
        /// Guard over the bucket holding node `i`.
        guard1: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
        /// Guard over the bucket holding node `j`.
        guard2: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
    },
}
849
850impl<'a> PairEntryMut<'a> {
851    pub(crate) fn get_i(&self) -> &NodeStore {
852        match self {
853            PairEntryMut::Same { i, guard, .. } => &guard[*i],
854            PairEntryMut::Different { i, guard1, .. } => &guard1[*i],
855        }
856    }
857    pub(crate) fn get_mut_i(&mut self) -> &mut NodeStore {
858        match self {
859            PairEntryMut::Same { i, guard, .. } => &mut guard[*i],
860            PairEntryMut::Different { i, guard1, .. } => &mut guard1[*i],
861        }
862    }
863
864    pub(crate) fn get_j(&self) -> &NodeStore {
865        match self {
866            PairEntryMut::Same { j, guard, .. } => &guard[*j],
867            PairEntryMut::Different { j, guard2, .. } => &guard2[*j],
868        }
869    }
870
871    pub(crate) fn get_mut_j(&mut self) -> &mut NodeStore {
872        match self {
873            PairEntryMut::Same { j, guard, .. } => &mut guard[*j],
874            PairEntryMut::Different { j, guard2, .. } => &mut guard2[*j],
875        }
876    }
877}
878
/// Mutable access to the node at offset `i` inside the slot reached through `guard`.
pub struct EntryMut<'a, NS: 'a> {
    /// Offset of the node inside the slot.
    i: usize,
    /// Guard (or mutable reference to a guard) giving access to the slot.
    guard: NS,
    /// Carries the lifetime `'a` without storing any data.
    _pd: PhantomData<&'a ()>,
}
884
885impl<'a, NS> EntryMut<'a, NS> {
886    pub fn to_mut(&mut self) -> EntryMut<'a, &mut NS> {
887        EntryMut {
888            i: self.i,
889            guard: &mut self.guard,
890            _pd: self._pd,
891        }
892    }
893}
894
895impl<'a, NS: DerefMut<Target = NodeSlot>> AsMut<NodeStore> for EntryMut<'a, NS> {
896    fn as_mut(&mut self) -> &mut NodeStore {
897        let slots = self.guard.deref_mut();
898        &mut slots[self.i]
899    }
900}
901
902impl<'a, NS: DerefMut<Target = NodeSlot> + 'a> EntryMut<'a, &'a mut NS> {
903    pub fn node_store_mut(&mut self) -> &mut NodeStore {
904        &mut self.guard[self.i]
905    }
906
907    pub fn t_props_log_mut(&mut self) -> &mut TColumns {
908        &mut self.guard.t_props_log
909    }
910}
911
912#[cfg(test)]
913mod test {
914    use super::{NodeStorage, TColumns};
915    use crate::entities::nodes::node_store::NodeStore;
916    use proptest::{arbitrary::any, prop_assert_eq, proptest};
917    use raphtory_api::core::entities::{properties::prop::Prop, GID, VID};
918    use rayon::prelude::*;
919    use std::borrow::Cow;
920
921    #[test]
922    fn tcolumns_append_1() {
923        let mut t_cols = TColumns::default();
924
925        t_cols.push([(1, Prop::U64(1))]).unwrap();
926
927        let col0 = t_cols.get(0).unwrap();
928        let col1 = t_cols.get(1).unwrap();
929
930        assert_eq!(col0.len(), 1);
931        assert_eq!(col1.len(), 1);
932    }
933
934    #[test]
935    fn tcolumns_append_3_rows() {
936        let mut t_cols = TColumns::default();
937
938        t_cols
939            .push([(1, Prop::U64(1)), (0, Prop::Str("a".into()))])
940            .unwrap();
941        t_cols
942            .push([(0, Prop::Str("c".into())), (2, Prop::I64(9))])
943            .unwrap();
944        t_cols
945            .push([(1, Prop::U64(1)), (3, Prop::Str("c".into()))])
946            .unwrap();
947
948        assert_eq!(t_cols.len(), 3);
949
950        for col_id in 0..4 {
951            let col = t_cols.get(col_id).unwrap();
952            assert_eq!(col.len(), 3);
953        }
954
955        let col0 = (0..3)
956            .map(|row| t_cols.get(0).and_then(|col| col.get(row)))
957            .collect::<Vec<_>>();
958        assert_eq!(
959            col0,
960            vec![
961                Some(Prop::Str("a".into())),
962                Some(Prop::Str("c".into())),
963                None
964            ]
965        );
966
967        let col1 = (0..3)
968            .map(|row| t_cols.get(1).and_then(|col| col.get(row)))
969            .collect::<Vec<_>>();
970        assert_eq!(col1, vec![Some(Prop::U64(1)), None, Some(Prop::U64(1))]);
971
972        let col2 = (0..3)
973            .map(|row| t_cols.get(2).and_then(|col| col.get(row)))
974            .collect::<Vec<_>>();
975        assert_eq!(col2, vec![None, Some(Prop::I64(9)), None]);
976
977        let col3 = (0..3)
978            .map(|row| t_cols.get(3).and_then(|col| col.get(row)))
979            .collect::<Vec<_>>();
980        assert_eq!(col3, vec![None, None, Some(Prop::Str("c".into()))]);
981    }
982
983    #[test]
984    fn tcolumns_append_2_columns_12_items() {
985        let mut t_cols = TColumns::default();
986
987        for value in 0..12 {
988            if value % 2 == 0 {
989                t_cols
990                    .push([
991                        (1, Prop::U64(value)),
992                        (0, Prop::Str(value.to_string().into())),
993                    ])
994                    .unwrap();
995            } else {
996                t_cols.push([(1, Prop::U64(value))]).unwrap();
997            }
998        }
999
1000        assert_eq!(t_cols.len(), 12);
1001
1002        let col0 = (0..12)
1003            .map(|row| t_cols.get(0).and_then(|col| col.get(row)))
1004            .collect::<Vec<_>>();
1005        assert_eq!(
1006            col0,
1007            vec![
1008                Some(Prop::Str("0".into())),
1009                None,
1010                Some(Prop::Str("2".into())),
1011                None,
1012                Some(Prop::Str("4".into())),
1013                None,
1014                Some(Prop::Str("6".into())),
1015                None,
1016                Some(Prop::Str("8".into())),
1017                None,
1018                Some(Prop::Str("10".into())),
1019                None
1020            ]
1021        );
1022
1023        let col1 = (0..12)
1024            .map(|row| t_cols.get(1).and_then(|col| col.get(row)))
1025            .collect::<Vec<_>>();
1026        assert_eq!(
1027            col1,
1028            vec![
1029                Some(Prop::U64(0)),
1030                Some(Prop::U64(1)),
1031                Some(Prop::U64(2)),
1032                Some(Prop::U64(3)),
1033                Some(Prop::U64(4)),
1034                Some(Prop::U64(5)),
1035                Some(Prop::U64(6)),
1036                Some(Prop::U64(7)),
1037                Some(Prop::U64(8)),
1038                Some(Prop::U64(9)),
1039                Some(Prop::U64(10)),
1040                Some(Prop::U64(11))
1041            ]
1042        );
1043    }
1044
1045    #[test]
1046    fn add_5_values_to_storage() {
1047        let storage = NodeStorage::new(2);
1048
1049        for i in 0..5 {
1050            storage.push(NodeStore::empty(i.into())).init();
1051        }
1052
1053        assert_eq!(storage.len(), 5);
1054
1055        for i in 0..5 {
1056            let entry = storage.entry(VID(i));
1057            assert_eq!(entry.as_ref().node().vid, VID(i));
1058        }
1059
1060        let items = storage.read_lock();
1061
1062        let actual = items
1063            .iter()
1064            .map(|s| s.node().vid.index())
1065            .collect::<Vec<_>>();
1066
1067        assert_eq!(actual, vec![0, 2, 4, 1, 3]);
1068    }
1069
1070    #[test]
1071    fn test_index_correctness() {
1072        let storage = NodeStorage::new(2);
1073
1074        for i in 0..5 {
1075            storage.push(NodeStore::empty(i.into())).init();
1076        }
1077        let locked = storage.read_lock();
1078        let actual: Vec<_> = (0..5)
1079            .map(|i| (i, locked.get(VID(i)).global_id.to_str()))
1080            .collect();
1081
1082        assert_eq!(
1083            actual,
1084            vec![
1085                (0usize, Cow::Borrowed("0")),
1086                (1, "1".into()),
1087                (2, "2".into()),
1088                (3, "3".into()),
1089                (4, "4".into())
1090            ]
1091        );
1092    }
1093
1094    #[test]
1095    fn test_entry() {
1096        let storage = NodeStorage::new(2);
1097
1098        for i in 0..5 {
1099            storage.push(NodeStore::empty(i.into())).init();
1100        }
1101
1102        for i in 0..5 {
1103            let entry = storage.entry(VID(i));
1104            assert_eq!(*entry.as_ref().node().global_id.to_str(), i.to_string());
1105        }
1106    }
1107
1108    #[test]
1109    fn concurrent_push() {
1110        proptest!(|(v in any::<Vec<u64>>())| {
1111            let storage = NodeStorage::new(16);
1112            let mut expected = v
1113                .into_par_iter()
1114                .map(|v| {
1115                    storage.push(NodeStore::empty(GID::U64(v))).init();
1116                    v
1117                })
1118                .collect::<Vec<_>>();
1119
1120            let locked = storage.read_lock();
1121            let mut actual: Vec<_> = locked
1122                .iter()
1123                .map(|n| n.node().global_id.as_u64().unwrap())
1124                .collect();
1125
1126            actual.sort();
1127            expected.sort();
1128            prop_assert_eq!(actual, expected)
1129        })
1130    }
1131}