1use crate::{
2 entities::{
3 nodes::node_store::NodeStore,
4 properties::{props::TPropError, tprop::IllegalPropType},
5 },
6 loop_lock_write,
7 storage::lazy_vec::IllegalSet,
8};
9use bigdecimal::BigDecimal;
10use itertools::Itertools;
11use lazy_vec::LazyVec;
12use lock_api;
13use node_entry::NodePtr;
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15#[cfg(feature = "arrow")]
16use raphtory_api::core::entities::properties::prop::PropArray;
17use raphtory_api::core::{
18 entities::{
19 properties::prop::{Prop, PropType},
20 GidRef, VID,
21 },
22 storage::arc_str::ArcStr,
23};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26use serde::{Deserialize, Serialize};
27use std::{
28 collections::HashMap,
29 fmt::{Debug, Formatter},
30 marker::PhantomData,
31 ops::{Deref, DerefMut, Index, IndexMut},
32 sync::{
33 atomic::{AtomicUsize, Ordering},
34 Arc,
35 },
36};
37use thiserror::Error;
38
39pub mod lazy_vec;
40pub mod locked_view;
41pub mod node_entry;
42pub mod raw_edges;
43pub mod timeindex;
44
45type ArcRwLockReadGuard<T> = lock_api::ArcRwLockReadGuard<parking_lot::RawRwLock, T>;
46#[must_use]
47pub struct UninitialisedEntry<'a, T, TS> {
48 offset: usize,
49 guard: RwLockWriteGuard<'a, TS>,
50 value: T,
51}
52
53impl<'a, T: Default, TS: DerefMut<Target = Vec<T>>> UninitialisedEntry<'a, T, TS> {
54 pub fn init(mut self) {
55 if self.offset >= self.guard.len() {
56 self.guard.resize_with(self.offset + 1, Default::default);
57 }
58 self.guard[self.offset] = self.value;
59 }
60 pub fn value(&self) -> &T {
61 &self.value
62 }
63}
64
65#[inline]
66fn resolve(index: usize, num_buckets: usize) -> (usize, usize) {
67 let bucket = index % num_buckets;
68 let offset = index / num_buckets;
69 (bucket, offset)
70}
71
72#[derive(Debug, Serialize, Deserialize, Clone)]
73pub struct NodeVec {
74 data: Arc<RwLock<NodeSlot>>,
75}
76
77#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
78pub struct NodeSlot {
79 nodes: Vec<NodeStore>,
80 t_props_log: TColumns, }
82
83#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
84pub struct TColumns {
85 t_props_log: Vec<TPropColumn>,
86 num_rows: usize,
87}
88
89impl TColumns {
90 pub fn push(
91 &mut self,
92 row: impl IntoIterator<Item = (usize, Prop)>,
93 ) -> Result<Option<usize>, TPropError> {
94 let id = self.num_rows;
95 let mut has_props = false;
96
97 for (prop_id, prop) in row {
98 match self.t_props_log.get_mut(prop_id) {
99 Some(col) => col.push(prop)?,
100 None => {
101 let col: TPropColumn = TPropColumn::new(self.num_rows, prop);
102 self.t_props_log
103 .resize_with(prop_id + 1, || TPropColumn::Empty(id));
104 self.t_props_log[prop_id] = col;
105 }
106 }
107 has_props = true;
108 }
109
110 if has_props {
111 self.num_rows += 1;
112 for col in self.t_props_log.iter_mut() {
113 col.grow(self.num_rows);
114 }
115 Ok(Some(id))
116 } else {
117 Ok(None)
118 }
119 }
120
121 pub(crate) fn get(&self, prop_id: usize) -> Option<&TPropColumn> {
122 self.t_props_log.get(prop_id)
123 }
124
125 pub fn len(&self) -> usize {
126 self.num_rows
127 }
128
129 pub fn is_empty(&self) -> bool {
130 self.num_rows == 0
131 }
132
133 pub fn iter(&self) -> impl Iterator<Item = &TPropColumn> {
134 self.t_props_log.iter()
135 }
136}
137
138#[derive(Debug, Serialize, Deserialize, PartialEq)]
139pub enum TPropColumn {
140 Empty(usize),
141 Bool(LazyVec<bool>),
142 U8(LazyVec<u8>),
143 U16(LazyVec<u16>),
144 U32(LazyVec<u32>),
145 U64(LazyVec<u64>),
146 I32(LazyVec<i32>),
147 I64(LazyVec<i64>),
148 F32(LazyVec<f32>),
149 F64(LazyVec<f64>),
150 Str(LazyVec<ArcStr>),
151 #[cfg(feature = "arrow")]
152 Array(LazyVec<PropArray>),
153 List(LazyVec<Arc<Vec<Prop>>>),
154 Map(LazyVec<Arc<FxHashMap<ArcStr, Prop>>>),
155 NDTime(LazyVec<chrono::NaiveDateTime>),
156 DTime(LazyVec<chrono::DateTime<chrono::Utc>>),
157 Decimal(LazyVec<BigDecimal>),
158}
159
160#[derive(Error, Debug)]
161pub enum TPropColumnError {
162 #[error(transparent)]
163 IllegalSetBool(#[from] IllegalSet<bool>),
164 #[error(transparent)]
165 IllegalSetU8(#[from] IllegalSet<u8>),
166 #[error(transparent)]
167 IllegalSetU16(#[from] IllegalSet<u16>),
168 #[error(transparent)]
169 IllegalSetU32(#[from] IllegalSet<u32>),
170 #[error(transparent)]
171 IllegalSetU64(#[from] IllegalSet<u64>),
172 #[error(transparent)]
173 IllegalSetI32(#[from] IllegalSet<i32>),
174 #[error(transparent)]
175 IllegalSetI64(#[from] IllegalSet<i64>),
176 #[error(transparent)]
177 IllegalSetF32(#[from] IllegalSet<f32>),
178 #[error(transparent)]
179 IllegalSetF64(#[from] IllegalSet<f64>),
180 #[error(transparent)]
181 IllegalSetStr(#[from] IllegalSet<ArcStr>),
182 #[cfg(feature = "arrow")]
183 #[error(transparent)]
184 IllegalSetArray(#[from] IllegalSet<PropArray>),
185 #[error(transparent)]
186 IllegalSetList(#[from] IllegalSet<Arc<Vec<Prop>>>),
187 #[error(transparent)]
188 IllegalSetMap(#[from] IllegalSet<Arc<FxHashMap<ArcStr, Prop>>>),
189 #[error(transparent)]
190 IllegalSetNDTime(#[from] IllegalSet<chrono::NaiveDateTime>),
191 #[error(transparent)]
192 IllegalSetDTime(#[from] IllegalSet<chrono::DateTime<chrono::Utc>>),
193 #[error(transparent)]
194 Decimal(#[from] IllegalSet<BigDecimal>),
195 #[error(transparent)]
196 IllegalPropType(#[from] IllegalPropType),
197}
198
199impl Default for TPropColumn {
200 fn default() -> Self {
201 TPropColumn::Empty(0)
202 }
203}
204
205impl TPropColumn {
206 pub(crate) fn new(idx: usize, prop: Prop) -> Self {
207 let mut col = TPropColumn::default();
208 col.set(idx, prop).unwrap();
209 col
210 }
211
212 pub(crate) fn dtype(&self) -> PropType {
213 match self {
214 TPropColumn::Empty(_) => PropType::Empty,
215 TPropColumn::Bool(_) => PropType::Bool,
216 TPropColumn::U8(_) => PropType::U8,
217 TPropColumn::U16(_) => PropType::U16,
218 TPropColumn::U32(_) => PropType::U32,
219 TPropColumn::U64(_) => PropType::U64,
220 TPropColumn::I32(_) => PropType::I32,
221 TPropColumn::I64(_) => PropType::I64,
222 TPropColumn::F32(_) => PropType::F32,
223 TPropColumn::F64(_) => PropType::F64,
224 TPropColumn::Str(_) => PropType::Str,
225 #[cfg(feature = "arrow")]
226 TPropColumn::Array(_) => PropType::Array(Box::new(PropType::Empty)),
227 TPropColumn::List(_) => PropType::List(Box::new(PropType::Empty)),
228 TPropColumn::Map(_) => PropType::Map(HashMap::new().into()),
229 TPropColumn::NDTime(_) => PropType::NDTime,
230 TPropColumn::DTime(_) => PropType::DTime,
231 TPropColumn::Decimal(_) => PropType::Decimal { scale: 0 },
232 }
233 }
234
235 pub(crate) fn grow(&mut self, new_len: usize) {
236 while self.len() < new_len {
237 self.push_null();
238 }
239 }
240
241 pub(crate) fn set(&mut self, index: usize, prop: Prop) -> Result<(), TPropColumnError> {
242 self.init_empty_col(&prop);
243 match (self, prop) {
244 (TPropColumn::Bool(col), Prop::Bool(v)) => col.set(index, v)?,
245 (TPropColumn::I64(col), Prop::I64(v)) => col.set(index, v)?,
246 (TPropColumn::U32(col), Prop::U32(v)) => col.set(index, v)?,
247 (TPropColumn::U64(col), Prop::U64(v)) => col.set(index, v)?,
248 (TPropColumn::F32(col), Prop::F32(v)) => col.set(index, v)?,
249 (TPropColumn::F64(col), Prop::F64(v)) => col.set(index, v)?,
250 (TPropColumn::Str(col), Prop::Str(v)) => col.set(index, v)?,
251 #[cfg(feature = "arrow")]
252 (TPropColumn::Array(col), Prop::Array(v)) => col.set(index, v)?,
253 (TPropColumn::U8(col), Prop::U8(v)) => col.set(index, v)?,
254 (TPropColumn::U16(col), Prop::U16(v)) => col.set(index, v)?,
255 (TPropColumn::I32(col), Prop::I32(v)) => col.set(index, v)?,
256 (TPropColumn::List(col), Prop::List(v)) => col.set(index, v)?,
257 (TPropColumn::Map(col), Prop::Map(v)) => col.set(index, v)?,
258 (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.set(index, v)?,
259 (TPropColumn::DTime(col), Prop::DTime(v)) => col.set(index, v)?,
260 (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.set(index, v)?,
261 (col, prop) => {
262 Err(IllegalPropType {
263 expected: col.dtype(),
264 actual: prop.dtype(),
265 })?;
266 }
267 }
268 Ok(())
269 }
270
271 pub(crate) fn push(&mut self, prop: Prop) -> Result<(), IllegalPropType> {
272 self.init_empty_col(&prop);
273 match (self, prop) {
274 (TPropColumn::Bool(col), Prop::Bool(v)) => col.push(Some(v)),
275 (TPropColumn::U8(col), Prop::U8(v)) => col.push(Some(v)),
276 (TPropColumn::I64(col), Prop::I64(v)) => col.push(Some(v)),
277 (TPropColumn::U32(col), Prop::U32(v)) => col.push(Some(v)),
278 (TPropColumn::U64(col), Prop::U64(v)) => col.push(Some(v)),
279 (TPropColumn::F32(col), Prop::F32(v)) => col.push(Some(v)),
280 (TPropColumn::F64(col), Prop::F64(v)) => col.push(Some(v)),
281 (TPropColumn::Str(col), Prop::Str(v)) => col.push(Some(v)),
282 #[cfg(feature = "arrow")]
283 (TPropColumn::Array(col), Prop::Array(v)) => col.push(Some(v)),
284 (TPropColumn::U16(col), Prop::U16(v)) => col.push(Some(v)),
285 (TPropColumn::I32(col), Prop::I32(v)) => col.push(Some(v)),
286 (TPropColumn::List(col), Prop::List(v)) => col.push(Some(v)),
287 (TPropColumn::Map(col), Prop::Map(v)) => col.push(Some(v)),
288 (TPropColumn::NDTime(col), Prop::NDTime(v)) => col.push(Some(v)),
289 (TPropColumn::DTime(col), Prop::DTime(v)) => col.push(Some(v)),
290 (TPropColumn::Decimal(col), Prop::Decimal(v)) => col.push(Some(v)),
291 (col, prop) => {
292 return Err(IllegalPropType {
293 expected: col.dtype(),
294 actual: prop.dtype(),
295 })
296 }
297 }
298 Ok(())
299 }
300
301 fn init_empty_col(&mut self, prop: &Prop) {
302 if let TPropColumn::Empty(len) = self {
303 match prop {
304 Prop::Bool(_) => *self = TPropColumn::Bool(LazyVec::with_len(*len)),
305 Prop::I64(_) => *self = TPropColumn::I64(LazyVec::with_len(*len)),
306 Prop::U32(_) => *self = TPropColumn::U32(LazyVec::with_len(*len)),
307 Prop::U64(_) => *self = TPropColumn::U64(LazyVec::with_len(*len)),
308 Prop::F32(_) => *self = TPropColumn::F32(LazyVec::with_len(*len)),
309 Prop::F64(_) => *self = TPropColumn::F64(LazyVec::with_len(*len)),
310 Prop::Str(_) => *self = TPropColumn::Str(LazyVec::with_len(*len)),
311 #[cfg(feature = "arrow")]
312 Prop::Array(_) => *self = TPropColumn::Array(LazyVec::with_len(*len)),
313 Prop::U8(_) => *self = TPropColumn::U8(LazyVec::with_len(*len)),
314 Prop::U16(_) => *self = TPropColumn::U16(LazyVec::with_len(*len)),
315 Prop::I32(_) => *self = TPropColumn::I32(LazyVec::with_len(*len)),
316 Prop::List(_) => *self = TPropColumn::List(LazyVec::with_len(*len)),
317 Prop::Map(_) => *self = TPropColumn::Map(LazyVec::with_len(*len)),
318 Prop::NDTime(_) => *self = TPropColumn::NDTime(LazyVec::with_len(*len)),
319 Prop::DTime(_) => *self = TPropColumn::DTime(LazyVec::with_len(*len)),
320 Prop::Decimal(_) => *self = TPropColumn::Decimal(LazyVec::with_len(*len)),
321 }
322 }
323 }
324
325 fn is_empty(&self) -> bool {
326 matches!(self, TPropColumn::Empty(_))
327 }
328
329 pub(crate) fn push_null(&mut self) {
330 match self {
331 TPropColumn::Bool(col) => col.push(None),
332 TPropColumn::I64(col) => col.push(None),
333 TPropColumn::U32(col) => col.push(None),
334 TPropColumn::U64(col) => col.push(None),
335 TPropColumn::F32(col) => col.push(None),
336 TPropColumn::F64(col) => col.push(None),
337 TPropColumn::Str(col) => col.push(None),
338 #[cfg(feature = "arrow")]
339 TPropColumn::Array(col) => col.push(None),
340 TPropColumn::U8(col) => col.push(None),
341 TPropColumn::U16(col) => col.push(None),
342 TPropColumn::I32(col) => col.push(None),
343 TPropColumn::List(col) => col.push(None),
344 TPropColumn::Map(col) => col.push(None),
345 TPropColumn::NDTime(col) => col.push(None),
346 TPropColumn::DTime(col) => col.push(None),
347 TPropColumn::Decimal(col) => col.push(None),
348 TPropColumn::Empty(count) => {
349 *count += 1;
350 }
351 }
352 }
353
354 pub fn get(&self, index: usize) -> Option<Prop> {
355 match self {
356 TPropColumn::Bool(col) => col.get_opt(index).map(|prop| (*prop).into()),
357 TPropColumn::I64(col) => col.get_opt(index).map(|prop| (*prop).into()),
358 TPropColumn::U32(col) => col.get_opt(index).map(|prop| (*prop).into()),
359 TPropColumn::U64(col) => col.get_opt(index).map(|prop| (*prop).into()),
360 TPropColumn::F32(col) => col.get_opt(index).map(|prop| (*prop).into()),
361 TPropColumn::F64(col) => col.get_opt(index).map(|prop| (*prop).into()),
362 TPropColumn::Str(col) => col.get_opt(index).map(|prop| prop.into()),
363 #[cfg(feature = "arrow")]
364 TPropColumn::Array(col) => col.get_opt(index).map(|prop| Prop::Array(prop.clone())),
365 TPropColumn::U8(col) => col.get_opt(index).map(|prop| (*prop).into()),
366 TPropColumn::U16(col) => col.get_opt(index).map(|prop| (*prop).into()),
367 TPropColumn::I32(col) => col.get_opt(index).map(|prop| (*prop).into()),
368 TPropColumn::List(col) => col.get_opt(index).map(|prop| Prop::List(prop.clone())),
369 TPropColumn::Map(col) => col.get_opt(index).map(|prop| Prop::Map(prop.clone())),
370 TPropColumn::NDTime(col) => col.get_opt(index).map(|prop| Prop::NDTime(*prop)),
371 TPropColumn::DTime(col) => col.get_opt(index).map(|prop| Prop::DTime(*prop)),
372 TPropColumn::Decimal(col) => col.get_opt(index).map(|prop| Prop::Decimal(prop.clone())),
373 TPropColumn::Empty(_) => None,
374 }
375 }
376
377 pub(crate) fn len(&self) -> usize {
378 match self {
379 TPropColumn::Bool(col) => col.len(),
380 TPropColumn::I64(col) => col.len(),
381 TPropColumn::U32(col) => col.len(),
382 TPropColumn::U64(col) => col.len(),
383 TPropColumn::F32(col) => col.len(),
384 TPropColumn::F64(col) => col.len(),
385 TPropColumn::Str(col) => col.len(),
386 #[cfg(feature = "arrow")]
387 TPropColumn::Array(col) => col.len(),
388 TPropColumn::U8(col) => col.len(),
389 TPropColumn::U16(col) => col.len(),
390 TPropColumn::I32(col) => col.len(),
391 TPropColumn::List(col) => col.len(),
392 TPropColumn::Map(col) => col.len(),
393 TPropColumn::NDTime(col) => col.len(),
394 TPropColumn::DTime(col) => col.len(),
395 TPropColumn::Decimal(col) => col.len(),
396 TPropColumn::Empty(count) => *count,
397 }
398 }
399}
400
401impl NodeSlot {
402 pub fn t_props_log(&self) -> &TColumns {
403 &self.t_props_log
404 }
405
406 pub fn t_props_log_mut(&mut self) -> &mut TColumns {
407 &mut self.t_props_log
408 }
409
410 pub fn iter(&self) -> impl Iterator<Item = NodePtr<'_>> {
411 self.nodes
412 .iter()
413 .filter(|v| v.is_initialised())
414 .map(|ns| NodePtr::new(ns, &self.t_props_log))
415 }
416
417 pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr<'_>> {
418 self.nodes
419 .par_iter()
420 .filter(|v| v.is_initialised())
421 .map(|ns| NodePtr::new(ns, &self.t_props_log))
422 }
423}
424
425impl Index<usize> for NodeSlot {
426 type Output = NodeStore;
427
428 fn index(&self, index: usize) -> &Self::Output {
429 &self.nodes[index]
430 }
431}
432
433impl IndexMut<usize> for NodeSlot {
434 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
435 &mut self.nodes[index]
436 }
437}
438
439impl Deref for NodeSlot {
440 type Target = Vec<NodeStore>;
441
442 fn deref(&self) -> &Self::Target {
443 &self.nodes
444 }
445}
446
447impl DerefMut for NodeSlot {
448 fn deref_mut(&mut self) -> &mut Self::Target {
449 &mut self.nodes
450 }
451}
452
453impl PartialEq for NodeVec {
454 fn eq(&self, other: &Self) -> bool {
455 let a = self.data.read_recursive();
456 let b = other.data.read_recursive();
457 a.deref() == b.deref()
458 }
459}
460
461impl Default for NodeVec {
462 fn default() -> Self {
463 Self::new()
464 }
465}
466
467impl NodeVec {
468 pub fn new() -> Self {
469 Self {
470 data: Arc::new(RwLock::new(Default::default())),
471 }
472 }
473
474 #[inline]
475 pub fn read_arc_lock(&self) -> ArcRwLockReadGuard<NodeSlot> {
476 RwLock::read_arc_recursive(&self.data)
477 }
478
479 #[inline]
480 pub fn write(&self) -> impl DerefMut<Target = NodeSlot> + '_ {
481 loop_lock_write(&self.data)
482 }
483
484 #[inline]
485 pub fn read(&self) -> impl Deref<Target = NodeSlot> + '_ {
486 self.data.read_recursive()
487 }
488}
489
490#[derive(Serialize, Deserialize)]
491pub struct NodeStorage {
492 pub(crate) data: Box<[NodeVec]>,
493 len: AtomicUsize,
494}
495
496impl Debug for NodeStorage {
497 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
498 f.debug_struct("NodeStorage")
499 .field("len", &self.len())
500 .field("data", &self.read_lock().iter().collect_vec())
501 .finish()
502 }
503}
504
505impl PartialEq for NodeStorage {
506 fn eq(&self, other: &Self) -> bool {
507 self.data.eq(&other.data)
508 }
509}
510
511#[derive(Debug)]
512pub struct ReadLockedStorage {
513 pub(crate) locks: Vec<Arc<ArcRwLockReadGuard<NodeSlot>>>,
514 len: usize,
515}
516
517impl ReadLockedStorage {
518 fn resolve(&self, index: VID) -> (usize, usize) {
519 let index: usize = index.into();
520 let n = self.locks.len();
521 let bucket = index % n;
522 let offset = index / n;
523 (bucket, offset)
524 }
525
526 pub fn len(&self) -> usize {
527 self.len
528 }
529
530 pub fn is_empty(&self) -> bool {
531 self.len == 0
532 }
533
534 #[cfg(test)]
535 pub fn get(&self, index: VID) -> &NodeStore {
536 let (bucket, offset) = self.resolve(index);
537 let bucket = &self.locks[bucket];
538 &bucket[offset]
539 }
540
541 #[inline]
542 pub fn get_entry(&self, index: VID) -> NodePtr<'_> {
543 let (bucket, offset) = self.resolve(index);
544 let bucket = &self.locks[bucket];
545 NodePtr::new(&bucket[offset], &bucket.t_props_log)
546 }
547
548 #[inline]
549 pub fn try_get_entry(&self, index: VID) -> Option<NodePtr<'_>> {
550 let (bucket, offset) = self.resolve(index);
551 let bucket = self.locks.get(bucket)?;
552 let node = bucket.get(offset)?;
553 if node.is_initialised() {
554 Some(NodePtr::new(node, &bucket.t_props_log))
555 } else {
556 None
557 }
558 }
559
560 pub fn iter(&self) -> impl Iterator<Item = NodePtr<'_>> + '_ {
561 self.locks.iter().flat_map(|v| v.iter())
562 }
563
564 pub fn par_iter(&self) -> impl ParallelIterator<Item = NodePtr<'_>> + '_ {
565 self.locks.par_iter().flat_map(|v| v.par_iter())
566 }
567}
568
569impl NodeStorage {
570 pub fn count_with_filter<F: Fn(NodePtr<'_>) -> bool + Send + Sync>(&self, f: F) -> usize {
571 self.read_lock().par_iter().filter(|x| f(*x)).count()
572 }
573}
574
575impl NodeStorage {
576 #[inline]
577 fn resolve(&self, index: usize) -> (usize, usize) {
578 resolve(index, self.data.len())
579 }
580
581 #[inline]
582 pub fn read_lock(&self) -> ReadLockedStorage {
583 let guards = self
584 .data
585 .iter()
586 .map(|v| Arc::new(v.read_arc_lock()))
587 .collect();
588 ReadLockedStorage {
589 locks: guards,
590 len: self.len(),
591 }
592 }
593
594 pub fn write_lock(&self) -> WriteLockedNodes<'_> {
595 WriteLockedNodes {
596 guards: self.data.iter().map(|lock| lock.data.write()).collect(),
597 global_len: &self.len,
598 }
599 }
600
601 pub fn new(n_locks: usize) -> Self {
602 let data: Box<[NodeVec]> = (0..n_locks)
603 .map(|_| NodeVec::new())
604 .collect::<Vec<_>>()
605 .into();
606
607 Self {
608 data,
609 len: AtomicUsize::new(0),
610 }
611 }
612
613 pub fn push(&self, mut value: NodeStore) -> UninitialisedEntry<'_, NodeStore, NodeSlot> {
614 let index = self.len.fetch_add(1, Ordering::Relaxed);
615 value.vid = VID(index);
616 let (bucket, offset) = self.resolve(index);
617 let guard = loop_lock_write(&self.data[bucket].data);
618 UninitialisedEntry {
619 offset,
620 guard,
621 value,
622 }
623 }
624
625 pub fn set(&self, value: NodeStore) {
626 let VID(index) = value.vid;
627 self.len.fetch_max(index + 1, Ordering::Relaxed);
628 let (bucket, offset) = self.resolve(index);
629 let mut guard = loop_lock_write(&self.data[bucket].data);
630 if guard.len() <= offset {
631 guard.resize_with(offset + 1, NodeStore::default)
632 }
633 guard[offset] = value
634 }
635
636 #[inline]
637 pub fn entry(&self, index: VID) -> NodeEntry<'_> {
638 let index = index.into();
639 let (bucket, offset) = self.resolve(index);
640 let guard = self.data[bucket].data.read_recursive();
641 NodeEntry { offset, guard }
642 }
643
644 pub fn try_entry(&self, index: VID) -> Option<NodeEntry<'_>> {
646 let (bucket, offset) = self.resolve(index.index());
647 let guard = self.data.get(bucket)?.data.read_recursive();
648 if guard.get(offset)?.is_initialised() {
649 Some(NodeEntry { offset, guard })
650 } else {
651 None
652 }
653 }
654
655 pub fn entry_mut(&self, index: VID) -> EntryMut<'_, RwLockWriteGuard<'_, NodeSlot>> {
656 let index = index.into();
657 let (bucket, offset) = self.resolve(index);
658 let guard = loop_lock_write(&self.data[bucket].data);
659 EntryMut {
660 i: offset,
661 guard,
662 _pd: PhantomData,
663 }
664 }
665
666 pub fn prop_entry_mut(&self, index: VID) -> impl DerefMut<Target = TColumns> + '_ {
667 let index = index.into();
668 let (bucket, _) = self.resolve(index);
669 let lock = loop_lock_write(&self.data[bucket].data);
670 RwLockWriteGuard::map(lock, |data| &mut data.t_props_log)
671 }
672
673 #[deprecated(note = "use loop_pair_entry_mut instead")]
675 pub fn pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
676 let i = i.into();
677 let j = j.into();
678 let (bucket_i, offset_i) = self.resolve(i);
679 let (bucket_j, offset_j) = self.resolve(j);
680 if bucket_i < bucket_j {
682 let guard_i = self.data[bucket_i].data.write();
683 let guard_j = self.data[bucket_j].data.write();
684 PairEntryMut::Different {
685 i: offset_i,
686 j: offset_j,
687 guard1: guard_i,
688 guard2: guard_j,
689 }
690 } else if bucket_i > bucket_j {
691 let guard_j = self.data[bucket_j].data.write();
692 let guard_i = self.data[bucket_i].data.write();
693 PairEntryMut::Different {
694 i: offset_i,
695 j: offset_j,
696 guard1: guard_i,
697 guard2: guard_j,
698 }
699 } else {
700 PairEntryMut::Same {
701 i: offset_i,
702 j: offset_j,
703 guard: self.data[bucket_i].data.write(),
704 }
705 }
706 }
707
708 pub fn loop_pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_> {
709 let i = i.into();
710 let j = j.into();
711 let (bucket_i, offset_i) = self.resolve(i);
712 let (bucket_j, offset_j) = self.resolve(j);
713 loop {
714 if bucket_i < bucket_j {
715 let guard_i = self.data[bucket_i].data.try_write();
716 let guard_j = self.data[bucket_j].data.try_write();
717 let maybe_guards =
718 guard_i
719 .zip(guard_j)
720 .map(|(guard_i, guard_j)| PairEntryMut::Different {
721 i: offset_i,
722 j: offset_j,
723 guard1: guard_i,
724 guard2: guard_j,
725 });
726 if let Some(guards) = maybe_guards {
727 return guards;
728 }
729 } else if bucket_i > bucket_j {
730 let guard_j = self.data[bucket_j].data.try_write();
731 let guard_i = self.data[bucket_i].data.try_write();
732 let maybe_guards =
733 guard_i
734 .zip(guard_j)
735 .map(|(guard_i, guard_j)| PairEntryMut::Different {
736 i: offset_i,
737 j: offset_j,
738 guard1: guard_i,
739 guard2: guard_j,
740 });
741 if let Some(guards) = maybe_guards {
742 return guards;
743 }
744 } else {
745 let maybe_guard = self.data[bucket_i].data.try_write();
746 if let Some(guard) = maybe_guard {
747 return PairEntryMut::Same {
748 i: offset_i,
749 j: offset_j,
750 guard,
751 };
752 }
753 }
754 }
755 }
756
757 #[inline]
758 pub fn len(&self) -> usize {
759 self.len.load(Ordering::SeqCst)
760 }
761
762 pub fn is_empty(&self) -> bool {
763 self.len() == 0
764 }
765
766 pub fn next_id(&self) -> VID {
767 VID(self.len.fetch_add(1, Ordering::Relaxed))
768 }
769}
770
771pub struct WriteLockedNodes<'a> {
772 guards: Vec<RwLockWriteGuard<'a, NodeSlot>>,
773 global_len: &'a AtomicUsize,
774}
775
776pub struct NodeShardWriter<'a, S> {
777 shard: S,
778 shard_id: usize,
779 num_shards: usize,
780 global_len: &'a AtomicUsize,
781}
782
783impl<'a, S> NodeShardWriter<'a, S>
784where
785 S: DerefMut<Target = NodeSlot>,
786{
787 #[inline]
788 fn resolve(&self, index: VID) -> Option<usize> {
789 let (shard_id, offset) = resolve(index.into(), self.num_shards);
790 (shard_id == self.shard_id).then_some(offset)
791 }
792
793 #[inline]
794 pub fn get_mut(&mut self, index: VID) -> Option<&mut NodeStore> {
795 self.resolve(index).map(|offset| &mut self.shard[offset])
796 }
797
798 #[inline]
799 pub fn get_mut_entry(&mut self, index: VID) -> Option<EntryMut<'_, &mut S>> {
800 self.resolve(index).map(|offset| EntryMut {
801 i: offset,
802 guard: &mut self.shard,
803 _pd: PhantomData,
804 })
805 }
806
807 #[inline]
808 pub fn get(&self, index: VID) -> Option<&NodeStore> {
809 self.resolve(index).map(|offset| &self.shard[offset])
810 }
811
812 #[inline]
813 pub fn t_prop_log_mut(&mut self) -> &mut TColumns {
814 &mut self.shard.t_props_log
815 }
816
817 pub fn set(&mut self, vid: VID, gid: GidRef) -> Option<EntryMut<'_, &mut S>> {
818 self.resolve(vid).map(|offset| {
819 if offset >= self.shard.len() {
820 self.shard.resize_with(offset + 1, NodeStore::default);
821 self.global_len
822 .fetch_max(vid.index() + 1, Ordering::Relaxed);
823 }
824 self.shard[offset] = NodeStore::resolved(gid.to_owned(), vid);
825
826 EntryMut {
827 i: offset,
828 guard: &mut self.shard,
829 _pd: PhantomData,
830 }
831 })
832 }
833
834 pub fn shard_id(&self) -> usize {
835 self.shard_id
836 }
837
838 fn resize(&mut self, new_global_len: usize) {
839 let mut new_len = new_global_len / self.num_shards;
840 if self.shard_id < new_global_len % self.num_shards {
841 new_len += 1;
842 }
843 if new_len > self.shard.len() {
844 self.shard.resize_with(new_len, Default::default);
845 self.global_len.fetch_max(new_global_len, Ordering::Relaxed);
846 }
847 }
848}
849
850impl<'a> WriteLockedNodes<'a> {
851 pub fn par_iter_mut(
852 &mut self,
853 ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'_, &mut NodeSlot>> + '_ {
854 let num_shards = self.guards.len();
855 let global_len = self.global_len;
856 let shards: Vec<&mut NodeSlot> = self
857 .guards
858 .iter_mut()
859 .map(|guard| guard.deref_mut())
860 .collect();
861 shards
862 .into_par_iter()
863 .enumerate()
864 .map(move |(shard_id, shard)| NodeShardWriter {
865 shard,
866 shard_id,
867 num_shards,
868 global_len,
869 })
870 }
871
872 pub fn into_par_iter_mut(
873 self,
874 ) -> impl IndexedParallelIterator<Item = NodeShardWriter<'a, RwLockWriteGuard<'a, NodeSlot>>> + 'a
875 {
876 let num_shards = self.guards.len();
877 let global_len = self.global_len;
878 self.guards
879 .into_par_iter()
880 .enumerate()
881 .map(move |(shard_id, shard)| NodeShardWriter {
882 shard,
883 shard_id,
884 num_shards,
885 global_len,
886 })
887 }
888
889 pub fn resize(&mut self, new_len: usize) {
890 self.par_iter_mut()
891 .for_each(|mut shard| shard.resize(new_len))
892 }
893
894 pub fn num_shards(&self) -> usize {
895 self.guards.len()
896 }
897}
898
899#[derive(Debug)]
900pub struct NodeEntry<'a> {
901 offset: usize,
902 guard: RwLockReadGuard<'a, NodeSlot>,
903}
904
905impl NodeEntry<'_> {
906 #[inline]
907 pub fn as_ref(&self) -> NodePtr<'_> {
908 NodePtr::new(&self.guard[self.offset], &self.guard.t_props_log)
909 }
910}
911
912pub enum PairEntryMut<'a> {
913 Same {
914 i: usize,
915 j: usize,
916 guard: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
917 },
918 Different {
919 i: usize,
920 j: usize,
921 guard1: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
922 guard2: parking_lot::RwLockWriteGuard<'a, NodeSlot>,
923 },
924}
925
926impl<'a> PairEntryMut<'a> {
927 pub(crate) fn get_i(&self) -> &NodeStore {
928 match self {
929 PairEntryMut::Same { i, guard, .. } => &guard[*i],
930 PairEntryMut::Different { i, guard1, .. } => &guard1[*i],
931 }
932 }
933 pub(crate) fn get_mut_i(&mut self) -> &mut NodeStore {
934 match self {
935 PairEntryMut::Same { i, guard, .. } => &mut guard[*i],
936 PairEntryMut::Different { i, guard1, .. } => &mut guard1[*i],
937 }
938 }
939
940 pub(crate) fn get_j(&self) -> &NodeStore {
941 match self {
942 PairEntryMut::Same { j, guard, .. } => &guard[*j],
943 PairEntryMut::Different { j, guard2, .. } => &guard2[*j],
944 }
945 }
946
947 pub(crate) fn get_mut_j(&mut self) -> &mut NodeStore {
948 match self {
949 PairEntryMut::Same { j, guard, .. } => &mut guard[*j],
950 PairEntryMut::Different { j, guard2, .. } => &mut guard2[*j],
951 }
952 }
953}
954
955pub struct EntryMut<'a, NS: 'a> {
956 i: usize,
957 guard: NS,
958 _pd: PhantomData<&'a ()>,
959}
960
961impl<'a, NS> EntryMut<'a, NS> {
962 pub fn to_mut(&mut self) -> EntryMut<'a, &mut NS> {
963 EntryMut {
964 i: self.i,
965 guard: &mut self.guard,
966 _pd: self._pd,
967 }
968 }
969}
970
971impl<'a, NS: DerefMut<Target = NodeSlot>> AsMut<NodeStore> for EntryMut<'a, NS> {
972 fn as_mut(&mut self) -> &mut NodeStore {
973 let slots = self.guard.deref_mut();
974 &mut slots[self.i]
975 }
976}
977
978impl<'a, NS: DerefMut<Target = NodeSlot> + 'a> EntryMut<'a, &'a mut NS> {
979 pub fn node_store_mut(&mut self) -> &mut NodeStore {
980 &mut self.guard[self.i]
981 }
982
983 pub fn t_props_log_mut(&mut self) -> &mut TColumns {
984 &mut self.guard.t_props_log
985 }
986}
987
988#[cfg(test)]
989mod test {
990 use super::{NodeStorage, TColumns};
991 use crate::entities::nodes::node_store::NodeStore;
992 use proptest::{arbitrary::any, prop_assert_eq, proptest};
993 use raphtory_api::core::entities::{properties::prop::Prop, GID, VID};
994 use rayon::prelude::*;
995 use std::borrow::Cow;
996
997 #[test]
998 fn tcolumns_append_1() {
999 let mut t_cols = TColumns::default();
1000
1001 t_cols.push([(1, Prop::U64(1))]).unwrap();
1002
1003 let col0 = t_cols.get(0).unwrap();
1004 let col1 = t_cols.get(1).unwrap();
1005
1006 assert_eq!(col0.len(), 1);
1007 assert_eq!(col1.len(), 1);
1008 }
1009
1010 #[test]
1011 fn tcolumns_append_3_rows() {
1012 let mut t_cols = TColumns::default();
1013
1014 t_cols
1015 .push([(1, Prop::U64(1)), (0, Prop::Str("a".into()))])
1016 .unwrap();
1017 t_cols
1018 .push([(0, Prop::Str("c".into())), (2, Prop::I64(9))])
1019 .unwrap();
1020 t_cols
1021 .push([(1, Prop::U64(1)), (3, Prop::Str("c".into()))])
1022 .unwrap();
1023
1024 assert_eq!(t_cols.len(), 3);
1025
1026 for col_id in 0..4 {
1027 let col = t_cols.get(col_id).unwrap();
1028 assert_eq!(col.len(), 3);
1029 }
1030
1031 let col0 = (0..3)
1032 .map(|row| t_cols.get(0).and_then(|col| col.get(row)))
1033 .collect::<Vec<_>>();
1034 assert_eq!(
1035 col0,
1036 vec![
1037 Some(Prop::Str("a".into())),
1038 Some(Prop::Str("c".into())),
1039 None
1040 ]
1041 );
1042
1043 let col1 = (0..3)
1044 .map(|row| t_cols.get(1).and_then(|col| col.get(row)))
1045 .collect::<Vec<_>>();
1046 assert_eq!(col1, vec![Some(Prop::U64(1)), None, Some(Prop::U64(1))]);
1047
1048 let col2 = (0..3)
1049 .map(|row| t_cols.get(2).and_then(|col| col.get(row)))
1050 .collect::<Vec<_>>();
1051 assert_eq!(col2, vec![None, Some(Prop::I64(9)), None]);
1052
1053 let col3 = (0..3)
1054 .map(|row| t_cols.get(3).and_then(|col| col.get(row)))
1055 .collect::<Vec<_>>();
1056 assert_eq!(col3, vec![None, None, Some(Prop::Str("c".into()))]);
1057 }
1058
1059 #[test]
1060 fn tcolumns_append_2_columns_12_items() {
1061 let mut t_cols = TColumns::default();
1062
1063 for value in 0..12 {
1064 if value % 2 == 0 {
1065 t_cols
1066 .push([
1067 (1, Prop::U64(value)),
1068 (0, Prop::Str(value.to_string().into())),
1069 ])
1070 .unwrap();
1071 } else {
1072 t_cols.push([(1, Prop::U64(value))]).unwrap();
1073 }
1074 }
1075
1076 assert_eq!(t_cols.len(), 12);
1077
1078 let col0 = (0..12)
1079 .map(|row| t_cols.get(0).and_then(|col| col.get(row)))
1080 .collect::<Vec<_>>();
1081 assert_eq!(
1082 col0,
1083 vec![
1084 Some(Prop::Str("0".into())),
1085 None,
1086 Some(Prop::Str("2".into())),
1087 None,
1088 Some(Prop::Str("4".into())),
1089 None,
1090 Some(Prop::Str("6".into())),
1091 None,
1092 Some(Prop::Str("8".into())),
1093 None,
1094 Some(Prop::Str("10".into())),
1095 None
1096 ]
1097 );
1098
1099 let col1 = (0..12)
1100 .map(|row| t_cols.get(1).and_then(|col| col.get(row)))
1101 .collect::<Vec<_>>();
1102 assert_eq!(
1103 col1,
1104 vec![
1105 Some(Prop::U64(0)),
1106 Some(Prop::U64(1)),
1107 Some(Prop::U64(2)),
1108 Some(Prop::U64(3)),
1109 Some(Prop::U64(4)),
1110 Some(Prop::U64(5)),
1111 Some(Prop::U64(6)),
1112 Some(Prop::U64(7)),
1113 Some(Prop::U64(8)),
1114 Some(Prop::U64(9)),
1115 Some(Prop::U64(10)),
1116 Some(Prop::U64(11))
1117 ]
1118 );
1119 }
1120
1121 #[test]
1122 fn add_5_values_to_storage() {
1123 let storage = NodeStorage::new(2);
1124
1125 for i in 0..5 {
1126 storage.push(NodeStore::empty(i.into())).init();
1127 }
1128
1129 assert_eq!(storage.len(), 5);
1130
1131 for i in 0..5 {
1132 let entry = storage.entry(VID(i));
1133 assert_eq!(entry.as_ref().node().vid, VID(i));
1134 }
1135
1136 let items = storage.read_lock();
1137
1138 let actual = items
1139 .iter()
1140 .map(|s| s.node().vid.index())
1141 .collect::<Vec<_>>();
1142
1143 assert_eq!(actual, vec![0, 2, 4, 1, 3]);
1144 }
1145
1146 #[test]
1147 fn test_index_correctness() {
1148 let storage = NodeStorage::new(2);
1149
1150 for i in 0..5 {
1151 storage.push(NodeStore::empty(i.into())).init();
1152 }
1153 let locked = storage.read_lock();
1154 let actual: Vec<_> = (0..5)
1155 .map(|i| (i, locked.get(VID(i)).global_id.to_str()))
1156 .collect();
1157
1158 assert_eq!(
1159 actual,
1160 vec![
1161 (0usize, Cow::Borrowed("0")),
1162 (1, "1".into()),
1163 (2, "2".into()),
1164 (3, "3".into()),
1165 (4, "4".into())
1166 ]
1167 );
1168 }
1169
1170 #[test]
1171 fn test_entry() {
1172 let storage = NodeStorage::new(2);
1173
1174 for i in 0..5 {
1175 storage.push(NodeStore::empty(i.into())).init();
1176 }
1177
1178 for i in 0..5 {
1179 let entry = storage.entry(VID(i));
1180 assert_eq!(*entry.as_ref().node().global_id.to_str(), i.to_string());
1181 }
1182 }
1183
1184 #[test]
1185 fn concurrent_push() {
1186 proptest!(|(v in any::<Vec<u64>>())| {
1187 let storage = NodeStorage::new(16);
1188 let mut expected = v
1189 .into_par_iter()
1190 .map(|v| {
1191 storage.push(NodeStore::empty(GID::U64(v))).init();
1192 v
1193 })
1194 .collect::<Vec<_>>();
1195
1196 let locked = storage.read_lock();
1197 let mut actual: Vec<_> = locked
1198 .iter()
1199 .map(|n| n.node().global_id.as_u64().unwrap())
1200 .collect();
1201
1202 actual.sort();
1203 expected.sort();
1204 prop_assert_eq!(actual, expected)
1205 })
1206 }
1207}