raphtory_storage/graph/nodes/
node_additions.rs

1use iter_enum::{DoubleEndedIterator, ExactSizeIterator, FusedIterator, Iterator};
2use raphtory_api::core::{
3    entities::ELID,
4    storage::timeindex::{TimeIndexEntry, TimeIndexOps},
5};
6use raphtory_core::{
7    entities::nodes::node_store::NodeTimestamps,
8    storage::timeindex::{TimeIndexWindow, TimeIndexWindowVariants},
9};
10use std::{iter, ops::Range};
11
12#[cfg(feature = "storage")]
13use {itertools::Itertools, pometry_storage::timestamps::LayerAdditions};
14
15#[derive(Clone, Debug)]
16pub enum NodeAdditions<'a> {
17    Mem(&'a NodeTimestamps),
18    Range(TimeIndexWindow<'a, TimeIndexEntry, NodeTimestamps>),
19    #[cfg(feature = "storage")]
20    Col(LayerAdditions<'a>),
21}
22
23#[derive(Iterator, DoubleEndedIterator, ExactSizeIterator, FusedIterator, Debug)]
24pub enum AdditionVariants<Mem, Range, #[cfg(feature = "storage")] Col> {
25    Mem(Mem),
26    Range(Range),
27    #[cfg(feature = "storage")]
28    Col(Col),
29}
30
31impl<'a> NodeAdditions<'a> {
32    #[inline]
33    pub fn prop_events(&self) -> impl Iterator<Item = TimeIndexEntry> + use<'a> {
34        match self {
35            NodeAdditions::Mem(index) => {
36                AdditionVariants::Mem(index.props_ts.iter().map(|(t, _)| *t))
37            }
38            NodeAdditions::Range(index) => AdditionVariants::Range(match index {
39                TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
40                TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
41                    timeindex
42                        .props_ts
43                        .iter_window(range.clone())
44                        .map(|(t, _)| *t),
45                ),
46                TimeIndexWindow::All(index) => {
47                    TimeIndexWindowVariants::All(index.props_ts.iter().map(|(t, _)| *t))
48                }
49            }),
50            #[cfg(feature = "storage")]
51            NodeAdditions::Col(index) => {
52                AdditionVariants::Col(index.clone().prop_events().map(|t| t.into_iter()).kmerge())
53            }
54        }
55    }
56
57    #[inline]
58    pub fn prop_events_rev(&self) -> impl Iterator<Item = TimeIndexEntry> + use<'a> {
59        match self {
60            NodeAdditions::Mem(index) => {
61                AdditionVariants::Mem(index.props_ts.iter().map(|(t, _)| *t).rev())
62            }
63            NodeAdditions::Range(index) => AdditionVariants::Range(match index {
64                TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
65                TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
66                    timeindex
67                        .props_ts
68                        .iter_window(range.clone())
69                        .map(|(t, _)| *t)
70                        .rev(),
71                ),
72                TimeIndexWindow::All(index) => {
73                    TimeIndexWindowVariants::All(index.props_ts.iter().map(|(t, _)| *t).rev())
74                }
75            }),
76            #[cfg(feature = "storage")]
77            NodeAdditions::Col(index) => AdditionVariants::Col(
78                index
79                    .clone()
80                    .prop_events()
81                    .map(|t| t.into_iter().rev())
82                    .kmerge_by(|t1, t2| t1 >= t2),
83            ),
84        }
85    }
86
87    #[inline]
88    pub fn edge_events(&self) -> impl Iterator<Item = (TimeIndexEntry, ELID)> + use<'a> {
89        match self {
90            NodeAdditions::Mem(index) => {
91                AdditionVariants::Mem(index.edge_ts.iter().map(|(t, e)| (*t, *e)))
92            }
93            NodeAdditions::Range(index) => AdditionVariants::Range(match index {
94                TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
95                TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
96                    timeindex
97                        .edge_ts
98                        .iter_window(range.clone())
99                        .map(|(t, e)| (*t, *e)),
100                ),
101                TimeIndexWindow::All(index) => {
102                    TimeIndexWindowVariants::All(index.edge_ts.iter().map(|(t, e)| (*t, *e)))
103                }
104            }),
105            #[cfg(feature = "storage")]
106            NodeAdditions::Col(index) => AdditionVariants::Col(index.edge_history()),
107        }
108    }
109
110    #[inline]
111    pub fn edge_events_rev(&self) -> impl Iterator<Item = (TimeIndexEntry, ELID)> + use<'a> {
112        match self {
113            NodeAdditions::Mem(index) => {
114                AdditionVariants::Mem(index.edge_ts.iter().map(|(t, e)| (*t, *e)).rev())
115            }
116            NodeAdditions::Range(index) => AdditionVariants::Range(match index {
117                TimeIndexWindow::Empty => TimeIndexWindowVariants::Empty(iter::empty()),
118                TimeIndexWindow::Range { timeindex, range } => TimeIndexWindowVariants::Range(
119                    timeindex
120                        .edge_ts
121                        .iter_window(range.clone())
122                        .map(|(t, e)| (*t, *e))
123                        .rev(),
124                ),
125                TimeIndexWindow::All(index) => {
126                    TimeIndexWindowVariants::All(index.edge_ts.iter().map(|(t, e)| (*t, *e)).rev())
127                }
128            }),
129            #[cfg(feature = "storage")]
130            NodeAdditions::Col(index) => AdditionVariants::Col(index.edge_history_rev()),
131        }
132    }
133}
134
135impl<'b> TimeIndexOps<'b> for NodeAdditions<'b> {
136    type IndexType = TimeIndexEntry;
137    type RangeType = Self;
138
139    #[inline]
140    fn active(&self, w: Range<TimeIndexEntry>) -> bool {
141        match self {
142            NodeAdditions::Mem(index) => index.active(w),
143            NodeAdditions::Range(index) => index.active(w),
144            #[cfg(feature = "storage")]
145            NodeAdditions::Col(index) => index.iter().any(|index| index.active(w.clone())),
146        }
147    }
148
149    fn range(&self, w: Range<TimeIndexEntry>) -> Self {
150        match self {
151            NodeAdditions::Mem(index) => NodeAdditions::Range(index.range(w)),
152            NodeAdditions::Range(index) => NodeAdditions::Range(index.range(w)),
153            #[cfg(feature = "storage")]
154            NodeAdditions::Col(index) => NodeAdditions::Col(index.with_range(w)),
155        }
156    }
157
158    fn first(&self) -> Option<Self::IndexType> {
159        match self {
160            NodeAdditions::Mem(index) => index.first(),
161            NodeAdditions::Range(index) => index.first(),
162            #[cfg(feature = "storage")]
163            NodeAdditions::Col(index) => index.iter().flat_map(|index| index.first()).min(),
164        }
165    }
166
167    fn last(&self) -> Option<Self::IndexType> {
168        match self {
169            NodeAdditions::Mem(index) => index.last(),
170            NodeAdditions::Range(index) => index.last(),
171            #[cfg(feature = "storage")]
172            NodeAdditions::Col(index) => index.iter().flat_map(|index| index.last()).max(),
173        }
174    }
175
176    fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'b {
177        match self {
178            NodeAdditions::Mem(index) => AdditionVariants::Mem(index.iter()),
179            NodeAdditions::Range(index) => AdditionVariants::Range(index.iter()),
180            #[cfg(feature = "storage")]
181            NodeAdditions::Col(index) => {
182                AdditionVariants::Col(index.iter().map(|index| index.into_iter()).kmerge())
183            }
184        }
185    }
186
187    fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'b {
188        match self {
189            NodeAdditions::Mem(index) => AdditionVariants::Mem(index.iter_rev()),
190            NodeAdditions::Range(index) => AdditionVariants::Range(index.iter_rev()),
191            #[cfg(feature = "storage")]
192            NodeAdditions::Col(index) => AdditionVariants::Col(
193                index
194                    .iter()
195                    .map(|index| index.into_iter().rev())
196                    .kmerge_by(|lt, rt| lt >= rt),
197            ),
198        }
199    }
200
201    fn len(&self) -> usize {
202        match self {
203            NodeAdditions::Mem(index) => index.len(),
204            NodeAdditions::Range(range) => range.len(),
205            #[cfg(feature = "storage")]
206            NodeAdditions::Col(col) => col.len(),
207        }
208    }
209}