raphtory_storage/graph/nodes/
node_additions.rs1use iter_enum::{DoubleEndedIterator, ExactSizeIterator, FusedIterator, Iterator};
2use raphtory_api::core::{
3 entities::ELID,
4 storage::timeindex::{TimeIndexEntry, TimeIndexOps},
5};
6use raphtory_core::{
7 entities::nodes::node_store::NodeTimestamps,
8 storage::timeindex::{TimeIndexWindow, TimeIndexWindowVariants},
9};
10use std::{iter, ops::Range};
11
12#[cfg(feature = "storage")]
13use {itertools::Itertools, pometry_storage::timestamps::LayerAdditions};
14
15#[derive(Clone, Debug)]
16pub enum NodeAdditions<'a> {
17 Mem(&'a NodeTimestamps),
18 Range(TimeIndexWindow<'a, TimeIndexEntry, NodeTimestamps>),
19 #[cfg(feature = "storage")]
20 Col(LayerAdditions<'a>),
21}
22
23#[derive(Iterator, DoubleEndedIterator, ExactSizeIterator, FusedIterator, Debug)]
24pub enum AdditionVariants<Mem, Range, #[cfg(feature = "storage")] Col> {
25 Mem(Mem),
26 Range(Range),
27 #[cfg(feature = "storage")]
28 Col(Col),
29}
30
31impl<'a> NodeAdditions<'a> {
32 #[inline]
33 pub fn prop_events(&self) -> impl Iterator<Item = TimeIndexEntry> + use<'a> {
34 match self {
35 NodeAdditions::Mem(index) => {
36 AdditionVariants::Mem(index.props_ts.iter().map(|(t, _)| *t))
37 }
38 NodeAdditions::Range(index) => AdditionVariants::Range(match index {
39 TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
40 TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
41 timeindex
42 .props_ts
43 .iter_window(range.clone())
44 .map(|(t, _)| *t),
45 ),
46 TimeIndexWindow::All(index) => {
47 TimeIndexWindowVariants::All(index.props_ts.iter().map(|(t, _)| *t))
48 }
49 }),
50 #[cfg(feature = "storage")]
51 NodeAdditions::Col(index) => {
52 AdditionVariants::Col(index.clone().prop_events().map(|t| t.into_iter()).kmerge())
53 }
54 }
55 }
56
57 #[inline]
58 pub fn prop_events_rev(&self) -> impl Iterator<Item = TimeIndexEntry> + use<'a> {
59 match self {
60 NodeAdditions::Mem(index) => {
61 AdditionVariants::Mem(index.props_ts.iter().map(|(t, _)| *t).rev())
62 }
63 NodeAdditions::Range(index) => AdditionVariants::Range(match index {
64 TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
65 TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
66 timeindex
67 .props_ts
68 .iter_window(range.clone())
69 .map(|(t, _)| *t)
70 .rev(),
71 ),
72 TimeIndexWindow::All(index) => {
73 TimeIndexWindowVariants::All(index.props_ts.iter().map(|(t, _)| *t).rev())
74 }
75 }),
76 #[cfg(feature = "storage")]
77 NodeAdditions::Col(index) => AdditionVariants::Col(
78 index
79 .clone()
80 .prop_events()
81 .map(|t| t.into_iter().rev())
82 .kmerge_by(|t1, t2| t1 >= t2),
83 ),
84 }
85 }
86
87 #[inline]
88 pub fn edge_events(&self) -> impl Iterator<Item = (TimeIndexEntry, ELID)> + use<'a> {
89 match self {
90 NodeAdditions::Mem(index) => {
91 AdditionVariants::Mem(index.edge_ts.iter().map(|(t, e)| (*t, *e)))
92 }
93 NodeAdditions::Range(index) => AdditionVariants::Range(match index {
94 TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
95 TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
96 timeindex
97 .edge_ts
98 .iter_window(range.clone())
99 .map(|(t, e)| (*t, *e)),
100 ),
101 TimeIndexWindow::All(index) => {
102 TimeIndexWindowVariants::All(index.edge_ts.iter().map(|(t, e)| (*t, *e)))
103 }
104 }),
105 #[cfg(feature = "storage")]
106 NodeAdditions::Col(index) => AdditionVariants::Col(index.edge_history()),
107 }
108 }
109
110 #[inline]
111 pub fn edge_events_rev(&self) -> impl Iterator<Item = (TimeIndexEntry, ELID)> + use<'a> {
112 match self {
113 NodeAdditions::Mem(index) => {
114 AdditionVariants::Mem(index.edge_ts.iter().map(|(t, e)| (*t, *e)).rev())
115 }
116 NodeAdditions::Range(index) => AdditionVariants::Range(match index {
117 TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
118 TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
119 timeindex
120 .edge_ts
121 .iter_window(range.clone())
122 .map(|(t, e)| (*t, *e))
123 .rev(),
124 ),
125 TimeIndexWindow::All(index) => {
126 TimeIndexWindowVariants::All(index.edge_ts.iter().map(|(t, e)| (*t, *e)).rev())
127 }
128 }),
129 #[cfg(feature = "storage")]
130 NodeAdditions::Col(index) => AdditionVariants::Col(index.edge_history_rev()),
131 }
132 }
133}
134
135impl<'b> TimeIndexOps<'b> for NodeAdditions<'b> {
136 type IndexType = TimeIndexEntry;
137 type RangeType = Self;
138
139 #[inline]
140 fn active(&self, w: Range<TimeIndexEntry>) -> bool {
141 match self {
142 NodeAdditions::Mem(index) => index.active(w),
143 NodeAdditions::Range(index) => index.active(w),
144 #[cfg(feature = "storage")]
145 NodeAdditions::Col(index) => index.iter().any(|index| index.active(w.clone())),
146 }
147 }
148
149 fn range(&self, w: Range<TimeIndexEntry>) -> Self {
150 match self {
151 NodeAdditions::Mem(index) => NodeAdditions::Range(index.range(w)),
152 NodeAdditions::Range(index) => NodeAdditions::Range(index.range(w)),
153 #[cfg(feature = "storage")]
154 NodeAdditions::Col(index) => NodeAdditions::Col(index.with_range(w)),
155 }
156 }
157
158 fn first(&self) -> Option<Self::IndexType> {
159 match self {
160 NodeAdditions::Mem(index) => index.first(),
161 NodeAdditions::Range(index) => index.first(),
162 #[cfg(feature = "storage")]
163 NodeAdditions::Col(index) => index.iter().flat_map(|index| index.first()).min(),
164 }
165 }
166
167 fn last(&self) -> Option<Self::IndexType> {
168 match self {
169 NodeAdditions::Mem(index) => index.last(),
170 NodeAdditions::Range(index) => index.last(),
171 #[cfg(feature = "storage")]
172 NodeAdditions::Col(index) => index.iter().flat_map(|index| index.last()).max(),
173 }
174 }
175
176 fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'b {
177 match self {
178 NodeAdditions::Mem(index) => AdditionVariants::Mem(index.iter()),
179 NodeAdditions::Range(index) => AdditionVariants::Range(index.iter()),
180 #[cfg(feature = "storage")]
181 NodeAdditions::Col(index) => {
182 AdditionVariants::Col(index.iter().map(|index| index.into_iter()).kmerge())
183 }
184 }
185 }
186
187 fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'b {
188 match self {
189 NodeAdditions::Mem(index) => AdditionVariants::Mem(index.iter_rev()),
190 NodeAdditions::Range(index) => AdditionVariants::Range(index.iter_rev()),
191 #[cfg(feature = "storage")]
192 NodeAdditions::Col(index) => AdditionVariants::Col(
193 index
194 .iter()
195 .map(|index| index.into_iter().rev())
196 .kmerge_by(|lt, rt| lt >= rt),
197 ),
198 }
199 }
200
201 fn len(&self) -> usize {
202 match self {
203 NodeAdditions::Mem(index) => index.len(),
204 NodeAdditions::Range(range) => range.len(),
205 #[cfg(feature = "storage")]
206 NodeAdditions::Col(col) => col.len(),
207 }
208 }
209}