Skip to main content

raphtory_storage/graph/edges/
edges.rs

1use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges};
2use crate::graph::{
3    edges::{edge_ref::EdgeStorageRef, edge_storage_ops::EdgeStorageOps},
4    variants::storage_variants3::StorageVariants3,
5};
6use raphtory_api::core::entities::{LayerIds, EID};
7use raphtory_core::storage::raw_edges::LockedEdges;
8use rayon::iter::ParallelIterator;
9use std::sync::Arc;
10
11#[cfg(feature = "storage")]
12use crate::disk::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef};
13use crate::graph::variants::storage_variants2::StorageVariants2;
14
15pub enum EdgesStorage {
16    Mem(Arc<LockedEdges>),
17    #[cfg(feature = "storage")]
18    Disk(DiskEdges),
19}
20
21impl EdgesStorage {
22    #[inline]
23    pub fn as_ref(&self) -> EdgesStorageRef<'_> {
24        match self {
25            EdgesStorage::Mem(storage) => EdgesStorageRef::Mem(storage),
26            #[cfg(feature = "storage")]
27            EdgesStorage::Disk(storage) => EdgesStorageRef::Disk(storage.as_ref()),
28        }
29    }
30
31    pub fn edge(&self, eid: EID) -> EdgeStorageRef<'_> {
32        match self {
33            EdgesStorage::Mem(storage) => EdgeStorageRef::Mem(storage.get_mem(eid)),
34            #[cfg(feature = "storage")]
35            EdgesStorage::Disk(storage) => EdgeStorageRef::Disk(storage.get(eid)),
36        }
37    }
38
39    pub fn iter<'a>(
40        &'a self,
41        layers: &'a LayerIds,
42    ) -> impl Iterator<Item = EdgeStorageRef<'a>> + Send + Sync + 'a {
43        match self {
44            EdgesStorage::Mem(storage) => {
45                StorageVariants2::Mem((0..storage.len()).map(EID).filter_map(|e| {
46                    let edge = storage.try_get_mem(e)?;
47                    edge.has_layer(layers).then_some(EdgeStorageRef::Mem(edge))
48                }))
49            }
50            #[cfg(feature = "storage")]
51            EdgesStorage::Disk(storage) => {
52                StorageVariants2::Disk(storage.as_ref().iter(layers).map(EdgeStorageRef::Disk))
53            }
54        }
55    }
56
57    pub fn par_iter<'a>(
58        &'a self,
59        layers: &'a LayerIds,
60    ) -> impl ParallelIterator<Item = EdgeStorageRef<'a>> + Sync + 'a {
61        match self {
62            EdgesStorage::Mem(storage) => StorageVariants2::Mem(
63                storage
64                    .par_iter()
65                    .filter(|e| e.has_layer(layers))
66                    .map(EdgeStorageRef::Mem),
67            ),
68            #[cfg(feature = "storage")]
69            EdgesStorage::Disk(storage) => {
70                StorageVariants2::Disk(storage.as_ref().par_iter(layers).map(EdgeStorageRef::Disk))
71            }
72        }
73    }
74}
75
76#[derive(Debug, Copy, Clone)]
77pub enum EdgesStorageRef<'a> {
78    Mem(&'a LockedEdges),
79    Unlocked(UnlockedEdges<'a>),
80    #[cfg(feature = "storage")]
81    Disk(DiskEdgesRef<'a>),
82}
83
84impl<'a> EdgesStorageRef<'a> {
85    pub fn iter(
86        self,
87        layers: &'a LayerIds,
88    ) -> impl Iterator<Item = EdgeStorageEntry<'a>> + Send + Sync + 'a {
89        match self {
90            EdgesStorageRef::Mem(storage) => StorageVariants3::Mem(
91                storage
92                    .iter()
93                    .filter(move |e| e.has_layer(layers))
94                    .map(EdgeStorageEntry::Mem),
95            ),
96            EdgesStorageRef::Unlocked(edges) => StorageVariants3::Unlocked(
97                edges
98                    .iter()
99                    .filter(move |e| e.as_mem_edge().has_layer(layers))
100                    .map(EdgeStorageEntry::Unlocked),
101            ),
102            #[cfg(feature = "storage")]
103            EdgesStorageRef::Disk(storage) => {
104                StorageVariants3::Disk(storage.iter(layers).map(EdgeStorageEntry::Disk))
105            }
106        }
107    }
108
109    pub fn par_iter(
110        self,
111        layers: &LayerIds,
112    ) -> impl ParallelIterator<Item = EdgeStorageEntry<'a>> + use<'a, '_> {
113        match self {
114            EdgesStorageRef::Mem(storage) => StorageVariants3::Mem(
115                storage
116                    .par_iter()
117                    .filter(move |e| e.has_layer(layers))
118                    .map(EdgeStorageEntry::Mem),
119            ),
120            EdgesStorageRef::Unlocked(edges) => StorageVariants3::Unlocked(
121                edges
122                    .par_iter()
123                    .filter(move |e| e.as_mem_edge().has_layer(layers))
124                    .map(EdgeStorageEntry::Unlocked),
125            ),
126            #[cfg(feature = "storage")]
127            EdgesStorageRef::Disk(storage) => {
128                StorageVariants3::Disk(storage.par_iter(layers).map(EdgeStorageEntry::Disk))
129            }
130        }
131    }
132
133    #[inline]
134    pub fn count(self, layers: &LayerIds) -> usize {
135        match self {
136            EdgesStorageRef::Mem(storage) => match layers {
137                LayerIds::None => 0,
138                LayerIds::All => storage.len(),
139                _ => storage.par_iter().filter(|e| e.has_layer(layers)).count(),
140            },
141            EdgesStorageRef::Unlocked(edges) => match layers {
142                LayerIds::None => 0,
143                LayerIds::All => edges.len(),
144                _ => edges
145                    .par_iter()
146                    .filter(|e| e.as_mem_edge().has_layer(layers))
147                    .count(),
148            },
149            #[cfg(feature = "storage")]
150            EdgesStorageRef::Disk(storage) => storage.count(layers),
151        }
152    }
153
154    #[inline]
155    pub fn edge(self, edge: EID) -> EdgeStorageEntry<'a> {
156        match self {
157            EdgesStorageRef::Mem(storage) => EdgeStorageEntry::Mem(storage.get_mem(edge)),
158            EdgesStorageRef::Unlocked(storage) => {
159                EdgeStorageEntry::Unlocked(storage.0.edge_entry(edge))
160            }
161            #[cfg(feature = "storage")]
162            EdgesStorageRef::Disk(storage) => EdgeStorageEntry::Disk(storage.edge(edge)),
163        }
164    }
165
166    #[inline]
167    pub fn len(&self) -> usize {
168        match self {
169            EdgesStorageRef::Mem(storage) => storage.len(),
170            EdgesStorageRef::Unlocked(storage) => storage.len(),
171            #[cfg(feature = "storage")]
172            EdgesStorageRef::Disk(storage) => storage.len(),
173        }
174    }
175}