1use crate::{
2 entities::{
3 nodes::node_store::NodeStore,
4 properties::{props::TPropError, tprop::IllegalPropType},
5 },
6 storage::lazy_vec::IllegalSet,
7};
8use bigdecimal::BigDecimal;
9use itertools::Itertools;
10use lazy_vec::LazyVec;
11use lock_api;
12use node_entry::NodePtr;
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14#[cfg(feature = "arrow")]
15use raphtory_api::core::entities::properties::prop::PropArray;
16use raphtory_api::core::{
17 entities::{
18 properties::prop::{Prop, PropType},
19 GidRef, VID,
20 },
21 storage::arc_str::ArcStr,
22};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25use serde::{Deserialize, Serialize};
26use std::{
27 collections::HashMap,
28 fmt::{Debug, Formatter},
29 marker::PhantomData,
30 ops::{Deref, DerefMut, Index, IndexMut},
31 sync::{
32 atomic::{AtomicUsize, Ordering},
33 Arc,
34 },
35};
36use thiserror::Error;
37
38pub mod lazy_vec;
39pub mod locked_view;
40pub mod node_entry;
41pub mod raw_edges;
42pub mod timeindex;
43
/// Owned read guard over a `parking_lot` lock that lives inside an `Arc`.
type ArcRwLockReadGuard<T> = lock_api::ArcRwLockReadGuard<parking_lot::RawRwLock, T>;
/// A reserved position in a write-locked container whose value has not been stored yet.
///
/// The value is only written when [`UninitialisedEntry::init`] is called, which is why the type
/// is marked `#[must_use]`.
#[must_use]
pub struct UninitialisedEntry<'a, T, TS> {
    /// Index inside the locked container at which `value` will be stored.
    offset: usize,
    /// Write guard over the container that will receive `value`.
    guard: RwLockWriteGuard<'a, TS>,
    /// The value waiting to be written by `init`.
    value: T,
}
51
52impl<'a, T: Default, TS: DerefMut<Target = Vec<T>>> UninitialisedEntry<'a, T, TS> {
53 pub fn init(mut self) {
54 if self.offset >= self.guard.len() {
55 self.guard.resize_with(self.offset + 1, Default::default);
56 }
57 self.guard[self.offset] = self.value;
58 }
59 pub fn value(&self) -> &T {
60 &self.value
61 }
62}
63
/// Splits a global `index` into `(bucket, offset)` for `num_buckets` buckets.
///
/// The bucket is `index % num_buckets` and the offset inside that bucket is
/// `index / num_buckets`.
///
/// # Panics
/// Panics if `num_buckets` is zero (division by zero).
#[inline]
fn resolve(index: usize, num_buckets: usize) -> (usize, usize) {
    (index % num_buckets, index / num_buckets)
}
70
/// Handle to a single [`NodeSlot`] protected by a read-write lock.
///
/// Cloning the handle clones the `Arc`, so clones refer to the same slot and lock.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeVec {
    /// The shared, lock-protected slot.
    data: Arc<RwLock<NodeSlot>>,
}
75
/// Contents of one storage bucket: its nodes plus the property columns handed out with them.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct NodeSlot {
    /// Nodes stored in this bucket, addressed by their offset within the bucket.
    nodes: Vec<NodeStore>,
    /// Property columns passed alongside each node when a `NodePtr` is built.
    t_props_log: TColumns,
}
81
/// Column-oriented table of property values: one [`TPropColumn`] per property id.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct TColumns {
    /// Columns indexed by property id.
    t_props_log: Vec<TPropColumn>,
    /// Number of rows that have been appended through `push`.
    num_rows: usize,
}
87
88impl TColumns {
89 pub fn push(
90 &mut self,
91 row: impl IntoIterator<Item = (usize, Prop)>,
92 ) -> Result<Option<usize>, TPropError> {
93 let id = self.num_rows;
94 let mut has_props = false;
95
96 for (prop_id, prop) in row {
97 match self.t_props_log.get_mut(prop_id) {
98 Some(col) => col.push(prop)?,
99 None => {
100 let col: TPropColumn = TPropColumn::new(self.num_rows, prop);
101 self.t_props_log
102 .resize_with(prop_id + 1, || TPropColumn::Empty(id));
103 self.t_props_log[prop_id] = col;
104 }
105 }
106 has_props = true;
107 }
108
109 if has_props {
110 self.num_rows += 1;
111 for col in self.t_props_log.iter_mut() {
112 col.grow(self.num_rows);
113 }
114 Ok(Some(id))
115 } else {
116 Ok(None)
117 }
118 }
119
120 pub(crate) fn get(&self, prop_id: usize) -> Option<&TPropColumn> {
121 self.t_props_log.get(prop_id)
122 }
123
124 pub fn len(&self) -> usize {
125 self.num_rows
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.num_rows == 0
130 }
131
132 pub fn iter(&self) -> impl Iterator<Item = &TPropColumn> {
133 self.t_props_log.iter()
134 }
135}
136
/// A single property column, specialised by the type of the values it stores.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum TPropColumn {
    /// Column without a value type yet; only the number of (empty) rows is tracked.
    Empty(usize),
    /// Column of `bool` values.
    Bool(LazyVec<bool>),
    /// Column of `u8` values.
    U8(LazyVec<u8>),
    /// Column of `u16` values.
    U16(LazyVec<u16>),
    /// Column of `u32` values.
    U32(LazyVec<u32>),
    /// Column of `u64` values.
    U64(LazyVec<u64>),
    /// Column of `i32` values.
    I32(LazyVec<i32>),
    /// Column of `i64` values.
    I64(LazyVec<i64>),
    /// Column of `f32` values.
    F32(LazyVec<f32>),
    /// Column of `f64` values.
    F64(LazyVec<f64>),
    /// Column of string values.
    Str(LazyVec<ArcStr>),
    /// Column of array values (only with the `arrow` feature).
    #[cfg(feature = "arrow")]
    Array(LazyVec<PropArray>),
    /// Column of list values.
    List(LazyVec<Arc<Vec<Prop>>>),
    /// Column of map values.
    Map(LazyVec<Arc<FxHashMap<ArcStr, Prop>>>),
    /// Column of naive (timezone-less) date-times.
    NDTime(LazyVec<chrono::NaiveDateTime>),
    /// Column of UTC date-times.
    DTime(LazyVec<chrono::DateTime<chrono::Utc>>),
    /// Column of arbitrary-precision decimals.
    Decimal(LazyVec<BigDecimal>),
}
158
/// Errors produced when writing into a [`TPropColumn`].
///
/// Every variant is `transparent`: it forwards to the wrapped error. There is one
/// `IllegalSet` wrapper per column value type, plus [`IllegalPropType`] for a value whose type
/// does not match the column.
#[derive(Error, Debug)]
pub enum TPropColumnError {
    #[error(transparent)]
    IllegalSetBool(#[from] IllegalSet<bool>),
    #[error(transparent)]
    IllegalSetU8(#[from] IllegalSet<u8>),
    #[error(transparent)]
    IllegalSetU16(#[from] IllegalSet<u16>),
    #[error(transparent)]
    IllegalSetU32(#[from] IllegalSet<u32>),
    #[error(transparent)]
    IllegalSetU64(#[from] IllegalSet<u64>),
    #[error(transparent)]
    IllegalSetI32(#[from] IllegalSet<i32>),
    #[error(transparent)]
    IllegalSetI64(#[from] IllegalSet<i64>),
    #[error(transparent)]
    IllegalSetF32(#[from] IllegalSet<f32>),
    #[error(transparent)]
    IllegalSetF64(#[from] IllegalSet<f64>),
    #[error(transparent)]
    IllegalSetStr(#[from] IllegalSet<ArcStr>),
    #[cfg(feature = "arrow")]
    #[error(transparent)]
    IllegalSetArray(#[from] IllegalSet<PropArray>),
    #[error(transparent)]
    IllegalSetList(#[from] IllegalSet<Arc<Vec<Prop>>>),
    #[error(transparent)]
    IllegalSetMap(#[from] IllegalSet<Arc<FxHashMap<ArcStr, Prop>>>),
    #[error(transparent)]
    IllegalSetNDTime(#[from] IllegalSet<chrono::NaiveDateTime>),
    #[error(transparent)]
    IllegalSetDTime(#[from] IllegalSet<chrono::DateTime<chrono::Utc>>),
    /// Illegal set on a decimal column.
    #[error(transparent)]
    Decimal(#[from] IllegalSet<BigDecimal>),
    /// The value's type does not match the column's type.
    #[error(transparent)]
    IllegalPropType(#[from] IllegalPropType),
}
197
198impl Default for TPropColumn {
199 fn default() -> Self {
200 TPropColumn::Empty(0)
201 }
202}
203
204impl TPropColumn {
205 pub(crate) fn new(idx: usize, prop: Prop) -> Self {
206 let mut col = TPropColumn::default();
207 col.set(idx, prop).unwrap();
208 col
209 }
210
211 pub(crate) fn dtype(&self) -> PropType {
212 match self {
213 TPropColumn::Empty(_) => PropType::Empty,
214 TPropColumn::Bool(_) => PropType::Bool,
215 TPropColumn::U8(_) => PropType::U8,
216 TPropColumn::U16(_) => PropType::U16,
217 TPropColumn::U32(_) => PropType::U32,
218 TPropColumn::U64(_) => PropType::U64,
219 TPropColumn::I32(_) => PropType::I32,
220 TPropColumn::I64(_) => PropType::I64,
221 TPropColumn::F32(_) => PropType::F32,
222 TPropColumn::F64(_) => PropType::F64,
223 TPropColumn::Str(_) => PropType::Str,
224 #[cfg(feature = "arrow")]
225 TPropColumn::Array(_) => PropType::Array(Box::new(PropType::Empty)),
226 TPropColumn::List(_) => PropType::List(Box::new(PropType::Empty)),
227 TPropColumn::Map(_) => PropType::Map(HashMap::new().into()),
228 TPropColumn::NDTime(_) => PropType::NDTime,
229 TPropColumn::DTime(_) => PropType::DTime,
230 TPropColumn::Decimal(_) => PropType::Decimal { scale: 0 },
231 }
232 }
233
234 pub(crate) fn grow(&mut self, new_len: usize) {
235 while self.len() < new_len {
236 self.push_null();
237 }
238 }
239
240 pub(crate) fn set(&mut self, index: usize, prop: Prop) -> Result<(), TPropColumnError> {
241 self.init_empty_col(&prop);
242 match (self, prop) {
243 (TPropColumn::Bool(col), Prop::Bool(v)) => col.set(index, v)?,
244 (TPropColumn::I64(col), Prop::I64(v)) => col.set(index, v)?,
245 (TPropColumn::U32(col), Prop::U32(v)) => col.set(index, v)?,
246 (TPropColumn::U64(col), Prop::U64(v)) => col.set(index, v)?,
247 (TPropColumn::F32(col), Prop::F32(v)) => col.set(index, v)?,
248 (TPropColumn::F64(col), Prop::F64(v)) => col.set(index, v)?,
249 (TPropColumn::Str(col), Prop::Str(v)) => col.set(index, v)?,
250 #[cfg(feature = "arrow")]
251 (TPropColumn::Array(col), Prop::Array(v)) => col.set(index, v)?,
252 (TPropColumn::U8(col), Prop::U8(v)) => col.set(index, v)?,
253 (TPropColumn::U16(col), Prop::U16(v)) => col.set(index, v)?,
254 (TPropColumn::I32(col), Prop::I32(v)) => col.set(index, v)?,
255 (TPropColumn::List(col), Prop::List(v)) => col.set(index, v)?,
256 (TPropColumn::Map(col), Prop::Map(v)) => col.set(index, v)?,
257 (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.set(index, v)?,
258 (TPropColumn::DTime(col), Prop::DTime(v)) => col.set(index, v)?,
259 (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.set(index, v)?,
260 (col, prop) => {
261 Err(IllegalPropType {
262 expected: col.dtype(),
263 actual: prop.dtype(),
264 })?;
265 }
266 }
267 Ok(())
268 }
269
270 pub(crate) fn push(&mut self, prop: Prop) -> Result<(), IllegalPropType> {
271 self.init_empty_col(&prop);
272 match (self, prop) {
273 (TPropColumn::Bool(col), Prop::Bool(v)) => col.push(Some(v)),
274 (TPropColumn::U8(col), Prop::U8(v)) => col.push(Some(v)),
275 (TPropColumn::I64(col), Prop::I64(v)) => col.push(Some(v)),
276 (TPropColumn::U32(col), Prop::U32(v)) => col.push(Some(v)),
277 (TPropColumn::U64(col), Prop::U64(v)) => col.push(Some(v)),
278 (TPropColumn::F32(col), Prop::F32(v)) => col.push(Some(v)),
279 (TPropColumn::F64(col), Prop::F64(v)) => col.push(Some(v)),
280 (TPropColumn::Str(col), Prop::Str(v)) => col.push(Some(v)),
281 #[cfg(feature = "arrow")]
282 (TPropColumn::Array(col), Prop::Array(v)) => col.push(Some(v)),
283 (TPropColumn::U16(col), Prop::U16(v)) => col.push(Some(v)),
284 (TPropColumn::I32(col), Prop::I32(v)) => col.push(Some(v)),
285 (TPropColumn::List(col), Prop::List(v)) => col.push(Some(v)),
286 (TPropColumn::Map(col), Prop::Map(v)) => col.push(Some(v)),
287 (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.push(Some(v)),
288 (TPropColumn::DTime(col), Prop::DTime(v)) => col.push(Some(v)),
289 (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.push(Some(v)),
290 (col, prop) => {
291 return Err(IllegalPropType {
292 expected: col.dtype(),
293 actual: prop.dtype(),
294 })
295 }
296 }
297 Ok(())
298 }
299
300 fn init_empty_col(&mut self, prop: &Prop) {
301 if let TPropColumn::Empty(len) = self {
302 match prop {
303 Prop::Bool(_) => *self = TPropColumn::Bool(LazyVec::with_len(*len)),
304 Prop::I64(_) => *self = TPropColumn::I64(LazyVec::with_len(*len)),
305 Prop::U32(_) => *self = TPropColumn::U32(LazyVec::with_len(*len)),
306 Prop::U64(_) => *self = TPropColumn::U64(LazyVec::with_len(*len)),
307 Prop::F32(_) => *self = TPropColumn::F32(LazyVec::with_len(*len)),
308 Prop::F64(_) => *self = TPropColumn::F64(LazyVec::with_len(*len)),
309 Prop::Str(_) => *self = TPropColumn::Str(LazyVec::with_len(*len)),
310 #[cfg(feature = "arrow")]
311 Prop::Array(_) => *self = TPropColumn::Array(LazyVec::with_len(*len)),
312 Prop::U8(_) => *self = TPropColumn::U8(LazyVec::with_len(*len)),
313 Prop::U16(_) => *self = TPropColumn::U16(LazyVec::with_len(*len)),
314 Prop::I32(_) => *self = TPropColumn::I32(LazyVec::with_len(*len)),
315 Prop::List(_) => *self = TPropColumn::List(LazyVec::with_len(*len)),
316 Prop::Map(_) => *self = TPropColumn::Map(LazyVec::with_len(*len)),
317 Prop::NDTime(_) => *self = TPropColumn::NDTime(LazyVec::with_len(*len)),
318 Prop::DTime(_) => *self = TPropColumn::DTime(LazyVec::with_len(*len)),
319 Prop::Decimal(_) => *self = TPropColumn::Decimal(LazyVec::with_len(*len)),
320 }
321 }
322 }
323
324 fn is_empty(&self) -> bool {
325 matches!(self, TPropColumn::Empty(_))
326 }
327
328 pub(crate) fn push_null(&mut self) {
329 match self {
330 TPropColumn::Bool(col) => col.push(None),
331 TPropColumn::I64(col) => col.push(None),
332 TPropColumn::U32(col) => col.push(None),
333 TPropColumn::U64(col) => col.push(None),
334 TPropColumn::F32(col) => col.push(None),
335 TPropColumn::F64(col) => col.push(None),
336 TPropColumn::Str(col) => col.push(None),
337 #[cfg(feature = "arrow")]
338 TPropColumn::Array(col) => col.push(None),
339 TPropColumn::U8(col) => col.push(None),
340 TPropColumn::U16(col) => col.push(None),
341 TPropColumn::I32(col) => col.push(None),
342 TPropColumn::List(col) => col.push(None),
343 TPropColumn::Map(col) => col.push(None),
344 TPropColumn::NDTime(col) => col.push(None),
345 TPropColumn::DTime(col) => col.push(None),
346 TPropColumn::Decimal(col) => col.push(None),
347 TPropColumn::Empty(count) => {
348 *count += 1;
349 }
350 }
351 }
352
353 pub fn get(&self, index: usize) -> Option<Prop> {
354 match self {
355 TPropColumn::Bool(col) => col.get_opt(index).map(|prop| (*prop).into()),
356 TPropColumn::I64(col) => col.get_opt(index).map(|prop| (*prop).into()),
357 TPropColumn::U32(col) => col.get_opt(index).map(|prop| (*prop).into()),
358 TPropColumn::U64(col) => col.get_opt(index).map(|prop| (*prop).into()),
359 TPropColumn::F32(col) => col.get_opt(index).map(|prop| (*prop).into()),
360 TPropColumn::F64(col) => col.get_opt(index).map(|prop| (*prop).into()),
361 TPropColumn::Str(col) => col.get_opt(index).map(|prop| prop.into()),
362 #[cfg(feature = "arrow")]
363 TPropColumn::Array(col) => col.get_opt(index).map(|prop| Prop::Array(prop.clone())),
364 TPropColumn::U8(col) => col.get_opt(index).map(|prop| (*prop).into()),
365 TPropColumn::U16(col) => col.get_opt(index).map(|prop| (*prop).into()),
366 TPropColumn::I32(col) => col.get_opt(index).map(|prop| (*prop).into()),
367 TPropColumn::List(col) => col.get_opt(index).map(|prop| Prop::List(prop.clone())),
368 TPropColumn::Map(col) => col.get_opt(index).map(|prop| Prop::Map(prop.clone())),
369 TPropColumn::NDTime(col) => col.get_opt(index).map(|prop| Prop::NDTime(*prop)),
370 TPropColumn::DTime(col) => col.get_opt(index).map(|prop| Prop::DTime(*prop)),
371 TPropColumn::Decimal(col) => col.get_opt(index).map(|prop| Prop::Decimal(prop.clone())),
372 TPropColumn::Empty(_) => None,
373 }
374 }
375
376 pub(crate) fn len(&self) -> usize {
377 match self {
378 TPropColumn::Bool(col) => col.len(),
379 TPropColumn::I64(col) => col.len(),
380 TPropColumn::U32(col) => col.len(),
381 TPropColumn::U64(col) => col.len(),
382 TPropColumn::F32(col) => col.len(),
383 TPropColumn::F64(col) => col.len(),
384 TPropColumn::Str(col) => col.len(),
385 #[cfg(feature = "arrow")]
386 TPropColumn::Array(col) => col.len(),
387 TPropColumn::U8(col) => col.len(),
388 TPropColumn::U16(col) => col.len(),
389 TPropColumn::I32(col) => col.len(),
390 TPropColumn::List(col) => col.len(),
391 TPropColumn::Map(col) => col.len(),
392 TPropColumn::NDTime(col) => col.len(),
393 TPropColumn::DTime(col) => col.len(),
394 TPropColumn::Decimal(col) => col.len(),
395 TPropColumn::Empty(count) => *count,
396 }
397 }
398}
399
400impl NodeSlot {
401 pub fn t_props_log(&self) -> &TColumns {
402 &self.t_props_log
403 }
404
405 pub fn t_props_log_mut(&mut self) -> &mut TColumns {
406 &mut self.t_props_log
407 }
408
409 pub fn iter(&self) -> impl Iterator<Item = NodePtr> {
410 self.nodes
411 .iter()
412 .map(|ns| NodePtr::new(ns, &self.t_props_log))
413 }
414
415 pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr> {
416 self.nodes
417 .par_iter()
418 .map(|ns| NodePtr::new(ns, &self.t_props_log))
419 }
420}
421
422impl Index<usize> for NodeSlot {
423 type Output = NodeStore;
424
425 fn index(&self, index: usize) -> &Self::Output {
426 &self.nodes[index]
427 }
428}
429
430impl IndexMut<usize> for NodeSlot {
431 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
432 &mut self.nodes[index]
433 }
434}
435
436impl Deref for NodeSlot {
437 type Target = Vec<NodeStore>;
438
439 fn deref(&self) -> &Self::Target {
440 &self.nodes
441 }
442}
443
444impl DerefMut for NodeSlot {
445 fn deref_mut(&mut self) -> &mut Self::Target {
446 &mut self.nodes
447 }
448}
449
450impl PartialEq for NodeVec {
451 fn eq(&self, other: &Self) -> bool {
452 let a = self.data.read();
453 let b = other.data.read();
454 a.deref() == b.deref()
455 }
456}
457
458impl Default for NodeVec {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464impl NodeVec {
465 pub fn new() -> Self {
466 Self {
467 data: Arc::new(RwLock::new(Default::default())),
468 }
469 }
470
471 #[inline]
472 pub fn read_arc_lock(&self) -> ArcRwLockReadGuard<NodeSlot> {
473 RwLock::read_arc(&self.data)
474 }
475
476 #[inline]
477 pub fn write(&self) -> impl DerefMut<Target = NodeSlot> + '_ {
478 self.data.write()
479 }
480
481 #[inline]
482 pub fn read(&self) -> impl Deref<Target = NodeSlot> + '_ {
483 self.data.read()
484 }
485}
486
/// Node storage split over a fixed number of independently locked buckets.
///
/// A node with global index `i` lives in bucket `i % data.len()` at offset `i / data.len()`.
#[derive(Serialize, Deserialize)]
pub struct NodeStorage {
    /// The buckets; their number is fixed at construction.
    pub(crate) data: Box<[NodeVec]>,
    /// Global node counter: incremented by `push` / `next_id` and raised by `set`.
    len: AtomicUsize,
}
492
493impl Debug for NodeStorage {
494 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
495 f.debug_struct("NodeStorage")
496 .field("len", &self.len())
497 .field("data", &self.read_lock().iter().collect_vec())
498 .finish()
499 }
500}
501
502impl PartialEq for NodeStorage {
503 fn eq(&self, other: &Self) -> bool {
504 self.data.eq(&other.data)
505 }
506}
507
/// A view over node storage that holds a read lock on every bucket.
#[derive(Debug)]
pub struct ReadLockedStorage {
    /// One read guard per bucket, in bucket order.
    pub(crate) locks: Vec<Arc<ArcRwLockReadGuard<NodeSlot>>>,
    /// Node count recorded when the view was created.
    len: usize,
}
513
514impl ReadLockedStorage {
515 fn resolve(&self, index: VID) -> (usize, usize) {
516 let index: usize = index.into();
517 let n = self.locks.len();
518 let bucket = index % n;
519 let offset = index / n;
520 (bucket, offset)
521 }
522
523 pub fn len(&self) -> usize {
524 self.len
525 }
526
527 pub fn is_empty(&self) -> bool {
528 self.len == 0
529 }
530
531 #[cfg(test)]
532 pub fn get(&self, index: VID) -> &NodeStore {
533 let (bucket, offset) = self.resolve(index);
534 let bucket = &self.locks[bucket];
535 &bucket[offset]
536 }
537
538 #[inline]
539 pub fn get_entry(&self, index: VID) -> NodePtr {
540 let (bucket, offset) = self.resolve(index);
541 let bucket = &self.locks[bucket];
542 NodePtr::new(&bucket[offset], &bucket.t_props_log)
543 }
544
545 pub fn iter(&self) -> impl Iterator<Item = NodePtr> + '_ {
546 self.locks.iter().flat_map(|v| v.iter())
547 }
548
549 pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr> + '_ {
550 self.locks.par_iter().flat_map(|v| v.par_iter())
551 }
552}
553
554impl NodeStorage {
555 pub fn count_with_filter<F: Fn(NodePtr<'_>) -> bool + Send + Sync>(&self, f: F) -> usize {
556 self.read_lock().par_iter().filter(|x| f(*x)).count()
557 }
558}
559
560impl NodeStorage {
561 #[inline]
562 fn resolve(&self, index: usize) -> (usize, usize) {
563 resolve(index, self.data.len())
564 }
565
566 #[inline]
567 pub fn read_lock(&self) -> ReadLockedStorage {
568 let guards = self
569 .data
570 .iter()
571 .map(|v| Arc::new(v.read_arc_lock()))
572 .collect();
573 ReadLockedStorage {
574 locks: guards,
575 len: self.len(),
576 }
577 }
578
579 pub fn write_lock(&self) -> WriteLockedNodes {
580 WriteLockedNodes {
581 guards: self.data.iter().map(|lock| lock.data.write()).collect(),
582 global_len: &self.len,
583 }
584 }
585
586 pub fn new(n_locks: usize) -> Self {
587 let data: Box<[NodeVec]> = (0..n_locks)
588 .map(|_| NodeVec::new())
589 .collect::<Vec<_>>()
590 .into();
591
592 Self {
593 data,
594 len: AtomicUsize::new(0),
595 }
596 }
597
598 pub fn push(&self, mut value: NodeStore) -> UninitialisedEntry<NodeStore, NodeSlot> {
599 let index = self.len.fetch_add(1, Ordering::Relaxed);
600 value.vid = VID(index);
601 let (bucket, offset) = self.resolve(index);
602 let guard = self.data[bucket].data.write();
603 UninitialisedEntry {
604 offset,
605 guard,
606 value,
607 }
608 }
609
610 pub fn set(&self, value: NodeStore) {
611 let VID(index) = value.vid;
612 self.len.fetch_max(index + 1, Ordering::Relaxed);
613 let (bucket, offset) = self.resolve(index);
614 let mut guard = self.data[bucket].data.write();
615 if guard.len() <= offset {
616 guard.resize_with(offset + 1, NodeStore::default)
617 }
618 guard[offset] = value
619 }
620
621 #[inline]
622 pub fn entry(&self, index: VID) -> NodeEntry<'_> {
623 let index = index.into();
624 let (bucket, offset) = self.resolve(index);
625 let guard = self.data[bucket].data.read_recursive();
626 NodeEntry { offset, guard }
627 }
628
629 pub fn entry_mut(&self, index: VID) -> EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>> {
630 let index = index.into();
631 let (bucket, offset) = self.resolve(index);
632 let guard = self.data[bucket].data.write();
633 EntryMut {
634 i: offset,
635 guard,
636 _pd: PhantomData,
637 }
638 }
639
640 pub fn prop_entry_mut(&self, index: VID) -> impl DerefMut<Target = TColumns> + '_ {
641 let index = index.into();
642 let (bucket, _) = self.resolve(index);
643 let lock = self.data[bucket].data.write();
644 RwLockWriteGuard::map(lock, |data| &mut data.t_props_log)
645 }
646
647 pub fn pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
649 let i = i.into();
650 let j = j.into();
651 let (bucket_i, offset_i) = self.resolve(i);
652 let (bucket_j, offset_j) = self.resolve(j);
653 if bucket_i < bucket_j {
655 let guard_i = self.data[bucket_i].data.write();
656 let guard_j = self.data[bucket_j].data.write();
657 PairEntryMut::Different {
658 i: offset_i,
659 j: offset_j,
660 guard1: guard_i,
661 guard2: guard_j,
662 }
663 } else if bucket_i > bucket_j {
664 let guard_j = self.data[bucket_j].data.write();
665 let guard_i = self.data[bucket_i].data.write();
666 PairEntryMut::Different {
667 i: offset_i,
668 j: offset_j,
669 guard1: guard_i,
670 guard2: guard_j,
671 }
672 } else {
673 PairEntryMut::Same {
674 i: offset_i,
675 j: offset_j,
676 guard: self.data[bucket_i].data.write(),
677 }
678 }
679 }
680
681 #[inline]
682 pub fn len(&self) -> usize {
683 self.len.load(Ordering::SeqCst)
684 }
685
686 pub fn is_empty(&self) -> bool {
687 self.len() == 0
688 }
689
690 pub fn next_id(&self) -> VID {
691 VID(self.len.fetch_add(1, Ordering::Relaxed))
692 }
693}
694
/// Write guards over every bucket of a node storage.
pub struct WriteLockedNodes<'a> {
    /// One write guard per bucket, in bucket order.
    guards: Vec<RwLockWriteGuard<'a, NodeSlot>>,
    /// The storage's global node counter.
    global_len: &'a AtomicUsize,
}
699
/// Mutable access to a single bucket ("shard") of a node storage.
pub struct NodeShardWriter<'a, S> {
    /// Mutable handle to the bucket's `NodeSlot`.
    shard: S,
    /// Index of this bucket among all buckets.
    shard_id: usize,
    /// Total number of buckets, used to map node ids to buckets.
    num_shards: usize,
    /// The storage's global node counter.
    global_len: &'a AtomicUsize,
}
706
707impl<'a, S> NodeShardWriter<'a, S>
708where
709 S: DerefMut<Target = NodeSlot>,
710{
711 #[inline]
712 fn resolve(&self, index: VID) -> Option<usize> {
713 let (shard_id, offset) = resolve(index.into(), self.num_shards);
714 (shard_id == self.shard_id).then_some(offset)
715 }
716
717 #[inline]
718 pub fn get_mut(&mut self, index: VID) -> Option<&mut NodeStore> {
719 self.resolve(index).map(|offset| &mut self.shard[offset])
720 }
721
722 #[inline]
723 pub fn get_mut_entry(&mut self, index: VID) -> Option<EntryMut<'_, &mut S>> {
724 self.resolve(index).map(|offset| EntryMut {
725 i: offset,
726 guard: &mut self.shard,
727 _pd: PhantomData,
728 })
729 }
730
731 #[inline]
732 pub fn get(&self, index: VID) -> Option<&NodeStore> {
733 self.resolve(index).map(|offset| &self.shard[offset])
734 }
735
736 #[inline]
737 pub fn t_prop_log_mut(&mut self) -> &mut TColumns {
738 &mut self.shard.t_props_log
739 }
740
741 pub fn set(&mut self, vid: VID, gid: GidRef) -> Option<EntryMut<'_, &mut S>> {
742 self.resolve(vid).map(|offset| {
743 if offset >= self.shard.len() {
744 self.shard.resize_with(offset + 1, NodeStore::default);
745 self.global_len
746 .fetch_max(vid.index() + 1, Ordering::Relaxed);
747 }
748 self.shard[offset] = NodeStore::resolved(gid.to_owned(), vid);
749
750 EntryMut {
751 i: offset,
752 guard: &mut self.shard,
753 _pd: PhantomData,
754 }
755 })
756 }
757
758 pub fn shard_id(&self) -> usize {
759 self.shard_id
760 }
761
762 fn resize(&mut self, new_global_len: usize) {
763 let mut new_len = new_global_len / self.num_shards;
764 if self.shard_id < new_global_len % self.num_shards {
765 new_len += 1;
766 }
767 if new_len > self.shard.len() {
768 self.shard.resize_with(new_len, Default::default);
769 self.global_len.fetch_max(new_global_len, Ordering::Relaxed);
770 }
771 }
772}
773
774impl<'a> WriteLockedNodes<'a> {
775 pub fn par_iter_mut(
776 &mut self,
777 ) -> impl IndexedParallelIterator<Item = NodeShardWriter<&mut NodeSlot>> + '_ {
778 let num_shards = self.guards.len();
779 let global_len = self.global_len;
780 let shards: Vec<&mut NodeSlot> = self
781 .guards
782 .iter_mut()
783 .map(|guard| guard.deref_mut())
784 .collect();
785 shards
786 .into_par_iter()
787 .enumerate()
788 .map(move |(shard_id, shard)| NodeShardWriter {
789 shard,
790 shard_id,
791 num_shards,
792 global_len,
793 })
794 }
795
796 pub fn into_par_iter_mut(
797 self,
798 ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'a, RwLockWriteGuard<'a, NodeSlot>>> + 'a
799 {
800 let num_shards = self.guards.len();
801 let global_len = self.global_len;
802 self.guards
803 .into_par_iter()
804 .enumerate()
805 .map(move |(shard_id, shard)| NodeShardWriter {
806 shard,
807 shard_id,
808 num_shards,
809 global_len,
810 })
811 }
812
813 pub fn resize(&mut self, new_len: usize) {
814 self.par_iter_mut()
815 .for_each(|mut shard| shard.resize(new_len))
816 }
817
818 pub fn num_shards(&self) -> usize {
819 self.guards.len()
820 }
821}
822
/// Read-locked access to a single node.
#[derive(Debug)]
pub struct NodeEntry<'a> {
    /// Offset of the node inside the locked bucket.
    offset: usize,
    /// Read guard over the bucket that contains the node.
    guard: RwLockReadGuard<'a, NodeSlot>,
}
828
829impl NodeEntry<'_> {
830 #[inline]
831 pub fn as_ref(&self) -> NodePtr<'_> {
832 NodePtr::new(&self.guard[self.offset], &self.guard.t_props_log)
833 }
834}
835
/// Write-locked access to two nodes `i` and `j` at once.
pub enum PairEntryMut<'a> {
    /// Both nodes live in the same bucket, so a single guard covers them.
    Same {
        /// Offset of node `i` inside the bucket.
        i: usize,
        /// Offset of node `j` inside the bucket.
        j: usize,
        /// Write guard over the shared bucket.
        guard: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
    },
    /// The nodes live in different buckets, each with its own guard.
    Different {
        /// Offset of node `i` inside the bucket guarded by `guard1`.
        i: usize,
        /// Offset of node `j` inside the bucket guarded by `guard2`.
        j: usize,
        /// Write guard over the bucket containing node `i`.
        guard1: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
        /// Write guard over the bucket containing node `j`.
        guard2: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
    },
}
849
850impl<'a> PairEntryMut<'a> {
851 pub(crate) fn get_i(&self) -> &NodeStore {
852 match self {
853 PairEntryMut::Same { i, guard, .. } => &guard[*i],
854 PairEntryMut::Different { i, guard1, .. } => &guard1[*i],
855 }
856 }
857 pub(crate) fn get_mut_i(&mut self) -> &mut NodeStore {
858 match self {
859 PairEntryMut::Same { i, guard, .. } => &mut guard[*i],
860 PairEntryMut::Different { i, guard1, .. } => &mut guard1[*i],
861 }
862 }
863
864 pub(crate) fn get_j(&self) -> &NodeStore {
865 match self {
866 PairEntryMut::Same { j, guard, .. } => &guard[*j],
867 PairEntryMut::Different { j, guard2, .. } => &guard2[*j],
868 }
869 }
870
871 pub(crate) fn get_mut_j(&mut self) -> &mut NodeStore {
872 match self {
873 PairEntryMut::Same { j, guard, .. } => &mut guard[*j],
874 PairEntryMut::Different { j, guard2, .. } => &mut guard2[*j],
875 }
876 }
877}
878
/// Mutable access to one node through some handle `NS` to its bucket.
pub struct EntryMut<'a, NS: 'a> {
    /// Offset of the node inside the bucket.
    i: usize,
    /// Handle (e.g. a write guard or a mutable reference to one) giving access to the bucket.
    guard: NS,
    /// Ties the entry to the lifetime `'a` without storing a reference.
    _pd: PhantomData<&'a ()>,
}
884
885impl<'a, NS> EntryMut<'a, NS> {
886 pub fn to_mut(&mut self) -> EntryMut<'a, &mut NS> {
887 EntryMut {
888 i: self.i,
889 guard: &mut self.guard,
890 _pd: self._pd,
891 }
892 }
893}
894
895impl<'a, NS: DerefMut<Target = NodeSlot>> AsMut<NodeStore> for EntryMut<'a, NS> {
896 fn as_mut(&mut self) -> &mut NodeStore {
897 let slots = self.guard.deref_mut();
898 &mut slots[self.i]
899 }
900}
901
902impl<'a, NS: DerefMut<Target = NodeSlot> + 'a> EntryMut<'a, &'a mut NS> {
903 pub fn node_store_mut(&mut self) -> &mut NodeStore {
904 &mut self.guard[self.i]
905 }
906
907 pub fn t_props_log_mut(&mut self) -> &mut TColumns {
908 &mut self.guard.t_props_log
909 }
910}
911
#[cfg(test)]
mod test {
    use super::{NodeStorage, TColumns};
    use crate::entities::nodes::node_store::NodeStore;
    use proptest::{arbitrary::any, prop_assert_eq, proptest};
    use raphtory_api::core::entities::{properties::prop::Prop, GID, VID};
    use rayon::prelude::*;
    use std::borrow::Cow;

    /// Reads rows `0..rows` of column `col_id`; missing cells come back as `None`.
    fn read_column(t_cols: &TColumns, col_id: usize, rows: usize) -> Vec<Option<Prop>> {
        (0..rows)
            .map(|row| t_cols.get(col_id).and_then(|col| col.get(row)))
            .collect::<Vec<_>>()
    }

    /// Two-bucket storage holding five nodes whose global ids are `0..5`.
    fn storage_with_five_nodes() -> NodeStorage {
        let storage = NodeStorage::new(2);

        for i in 0..5 {
            storage.push(NodeStore::empty(i.into())).init();
        }

        storage
    }

    #[test]
    fn tcolumns_append_1() {
        let mut t_cols = TColumns::default();

        t_cols.push([(1, Prop::U64(1))]).unwrap();

        // Column 0 was never written, but it must still be padded to the row count.
        for col_id in 0..2 {
            let col = t_cols.get(col_id).unwrap();
            assert_eq!(col.len(), 1);
        }
    }

    #[test]
    fn tcolumns_append_3_rows() {
        let mut t_cols = TColumns::default();

        let rows = [
            [(1, Prop::U64(1)), (0, Prop::Str("a".into()))],
            [(0, Prop::Str("c".into())), (2, Prop::I64(9))],
            [(1, Prop::U64(1)), (3, Prop::Str("c".into()))],
        ];
        for row in rows {
            t_cols.push(row).unwrap();
        }

        assert_eq!(t_cols.len(), 3);

        for col_id in 0..4 {
            let col = t_cols.get(col_id).unwrap();
            assert_eq!(col.len(), 3);
        }

        assert_eq!(
            read_column(&t_cols, 0, 3),
            vec![
                Some(Prop::Str("a".into())),
                Some(Prop::Str("c".into())),
                None
            ]
        );
        assert_eq!(
            read_column(&t_cols, 1, 3),
            vec![Some(Prop::U64(1)), None, Some(Prop::U64(1))]
        );
        assert_eq!(
            read_column(&t_cols, 2, 3),
            vec![None, Some(Prop::I64(9)), None]
        );
        assert_eq!(
            read_column(&t_cols, 3, 3),
            vec![None, None, Some(Prop::Str("c".into()))]
        );
    }

    #[test]
    fn tcolumns_append_2_columns_12_items() {
        let mut t_cols = TColumns::default();

        // Column 1 gets a value on every row, column 0 only on even rows.
        for value in 0..12u64 {
            let mut row = vec![(1, Prop::U64(value))];
            if value % 2 == 0 {
                row.push((0, Prop::Str(value.to_string().into())));
            }
            t_cols.push(row).unwrap();
        }

        assert_eq!(t_cols.len(), 12);

        let expected_col0 = (0..12u64)
            .map(|value| (value % 2 == 0).then(|| Prop::Str(value.to_string().into())))
            .collect::<Vec<_>>();
        assert_eq!(read_column(&t_cols, 0, 12), expected_col0);

        let expected_col1 = (0..12u64)
            .map(|value| Some(Prop::U64(value)))
            .collect::<Vec<_>>();
        assert_eq!(read_column(&t_cols, 1, 12), expected_col1);
    }

    #[test]
    fn add_5_values_to_storage() {
        let storage = storage_with_five_nodes();

        assert_eq!(storage.len(), 5);

        for i in 0..5 {
            let entry = storage.entry(VID(i));
            assert_eq!(entry.as_ref().node().vid, VID(i));
        }

        let items = storage.read_lock();

        // Iteration goes bucket by bucket: even ids live in bucket 0, odd ids in bucket 1.
        let actual = items
            .iter()
            .map(|s| s.node().vid.index())
            .collect::<Vec<_>>();

        assert_eq!(actual, vec![0, 2, 4, 1, 3]);
    }

    #[test]
    fn test_index_correctness() {
        let storage = storage_with_five_nodes();

        let locked = storage.read_lock();
        let actual: Vec<_> = (0..5)
            .map(|i| (i, locked.get(VID(i)).global_id.to_str()))
            .collect();

        assert_eq!(
            actual,
            vec![
                (0usize, Cow::Borrowed("0")),
                (1, "1".into()),
                (2, "2".into()),
                (3, "3".into()),
                (4, "4".into())
            ]
        );
    }

    #[test]
    fn test_entry() {
        let storage = storage_with_five_nodes();

        for i in 0..5 {
            let entry = storage.entry(VID(i));
            assert_eq!(*entry.as_ref().node().global_id.to_str(), i.to_string());
        }
    }

    #[test]
    fn concurrent_push() {
        proptest!(|(v in any::<Vec<u64>>())| {
            let storage = NodeStorage::new(16);
            // Push every value from a parallel iterator and remember what was pushed.
            let mut expected = v
                .into_par_iter()
                .map(|gid| {
                    storage.push(NodeStore::empty(GID::U64(gid))).init();
                    gid
                })
                .collect::<Vec<_>>();

            let locked = storage.read_lock();
            let mut actual: Vec<_> = locked
                .iter()
                .map(|n| n.node().global_id.as_u64().unwrap())
                .collect();

            // Storage order depends on scheduling, so compare as sorted multisets.
            actual.sort();
            expected.sort();
            prop_assert_eq!(actual, expected)
        })
    }
}