raphtory_storage/graph/edges/
edges.rs1use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges};
2use crate::graph::{
3 edges::{edge_ref::EdgeStorageRef, edge_storage_ops::EdgeStorageOps},
4 variants::storage_variants3::StorageVariants3,
5};
6use raphtory_api::core::entities::{LayerIds, EID};
7use raphtory_core::storage::raw_edges::LockedEdges;
8use rayon::iter::ParallelIterator;
9use std::sync::Arc;
10
11#[cfg(feature = "storage")]
12use crate::disk::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef};
13use crate::graph::variants::storage_variants2::StorageVariants2;
14
15pub enum EdgesStorage {
16 Mem(Arc<LockedEdges>),
17 #[cfg(feature = "storage")]
18 Disk(DiskEdges),
19}
20
21impl EdgesStorage {
22 #[inline]
23 pub fn as_ref(&self) -> EdgesStorageRef<'_> {
24 match self {
25 EdgesStorage::Mem(storage) => EdgesStorageRef::Mem(storage),
26 #[cfg(feature = "storage")]
27 EdgesStorage::Disk(storage) => EdgesStorageRef::Disk(storage.as_ref()),
28 }
29 }
30
31 pub fn edge(&self, eid: EID) -> EdgeStorageRef<'_> {
32 match self {
33 EdgesStorage::Mem(storage) => EdgeStorageRef::Mem(storage.get_mem(eid)),
34 #[cfg(feature = "storage")]
35 EdgesStorage::Disk(storage) => EdgeStorageRef::Disk(storage.get(eid)),
36 }
37 }
38
39 pub fn iter<'a>(
40 &'a self,
41 layers: &'a LayerIds,
42 ) -> impl Iterator<Item = EdgeStorageRef<'a>> + Send + Sync + 'a {
43 match self {
44 EdgesStorage::Mem(storage) => {
45 StorageVariants2::Mem((0..storage.len()).map(EID).filter_map(|e| {
46 let edge = storage.try_get_mem(e)?;
47 edge.has_layer(layers).then_some(EdgeStorageRef::Mem(edge))
48 }))
49 }
50 #[cfg(feature = "storage")]
51 EdgesStorage::Disk(storage) => {
52 StorageVariants2::Disk(storage.as_ref().iter(layers).map(EdgeStorageRef::Disk))
53 }
54 }
55 }
56
57 pub fn par_iter<'a>(
58 &'a self,
59 layers: &'a LayerIds,
60 ) -> impl ParallelIterator<Item = EdgeStorageRef<'a>> + Sync + 'a {
61 match self {
62 EdgesStorage::Mem(storage) => StorageVariants2::Mem(
63 storage
64 .par_iter()
65 .filter(|e| e.has_layer(layers))
66 .map(EdgeStorageRef::Mem),
67 ),
68 #[cfg(feature = "storage")]
69 EdgesStorage::Disk(storage) => {
70 StorageVariants2::Disk(storage.as_ref().par_iter(layers).map(EdgeStorageRef::Disk))
71 }
72 }
73 }
74}
75
76#[derive(Debug, Copy, Clone)]
77pub enum EdgesStorageRef<'a> {
78 Mem(&'a LockedEdges),
79 Unlocked(UnlockedEdges<'a>),
80 #[cfg(feature = "storage")]
81 Disk(DiskEdgesRef<'a>),
82}
83
84impl<'a> EdgesStorageRef<'a> {
85 pub fn iter(
86 self,
87 layers: &'a LayerIds,
88 ) -> impl Iterator<Item = EdgeStorageEntry<'a>> + Send + Sync + 'a {
89 match self {
90 EdgesStorageRef::Mem(storage) => StorageVariants3::Mem(
91 storage
92 .iter()
93 .filter(move |e| e.has_layer(layers))
94 .map(EdgeStorageEntry::Mem),
95 ),
96 EdgesStorageRef::Unlocked(edges) => StorageVariants3::Unlocked(
97 edges
98 .iter()
99 .filter(move |e| e.as_mem_edge().has_layer(layers))
100 .map(EdgeStorageEntry::Unlocked),
101 ),
102 #[cfg(feature = "storage")]
103 EdgesStorageRef::Disk(storage) => {
104 StorageVariants3::Disk(storage.iter(layers).map(EdgeStorageEntry::Disk))
105 }
106 }
107 }
108
109 pub fn par_iter(
110 self,
111 layers: &LayerIds,
112 ) -> impl ParallelIterator<Item = EdgeStorageEntry<'a>> + use<'a, '_> {
113 match self {
114 EdgesStorageRef::Mem(storage) => StorageVariants3::Mem(
115 storage
116 .par_iter()
117 .filter(move |e| e.has_layer(layers))
118 .map(EdgeStorageEntry::Mem),
119 ),
120 EdgesStorageRef::Unlocked(edges) => StorageVariants3::Unlocked(
121 edges
122 .par_iter()
123 .filter(move |e| e.as_mem_edge().has_layer(layers))
124 .map(EdgeStorageEntry::Unlocked),
125 ),
126 #[cfg(feature = "storage")]
127 EdgesStorageRef::Disk(storage) => {
128 StorageVariants3::Disk(storage.par_iter(layers).map(EdgeStorageEntry::Disk))
129 }
130 }
131 }
132
133 #[inline]
134 pub fn count(self, layers: &LayerIds) -> usize {
135 match self {
136 EdgesStorageRef::Mem(storage) => match layers {
137 LayerIds::None => 0,
138 LayerIds::All => storage.len(),
139 _ => storage.par_iter().filter(|e| e.has_layer(layers)).count(),
140 },
141 EdgesStorageRef::Unlocked(edges) => match layers {
142 LayerIds::None => 0,
143 LayerIds::All => edges.len(),
144 _ => edges
145 .par_iter()
146 .filter(|e| e.as_mem_edge().has_layer(layers))
147 .count(),
148 },
149 #[cfg(feature = "storage")]
150 EdgesStorageRef::Disk(storage) => storage.count(layers),
151 }
152 }
153
154 #[inline]
155 pub fn edge(self, edge: EID) -> EdgeStorageEntry<'a> {
156 match self {
157 EdgesStorageRef::Mem(storage) => EdgeStorageEntry::Mem(storage.get_mem(edge)),
158 EdgesStorageRef::Unlocked(storage) => {
159 EdgeStorageEntry::Unlocked(storage.0.edge_entry(edge))
160 }
161 #[cfg(feature = "storage")]
162 EdgesStorageRef::Disk(storage) => EdgeStorageEntry::Disk(storage.edge(edge)),
163 }
164 }
165
166 #[inline]
167 pub fn len(&self) -> usize {
168 match self {
169 EdgesStorageRef::Mem(storage) => storage.len(),
170 EdgesStorageRef::Unlocked(storage) => storage.len(),
171 #[cfg(feature = "storage")]
172 EdgesStorageRef::Disk(storage) => storage.len(),
173 }
174 }
175}