raphtory_api/core/storage/
timeindex.rs1use chrono::{DateTime, Utc};
2use itertools::Itertools;
3use serde::{Deserialize, Serialize};
4use std::{fmt, ops::Range};
5
/// Error raised when a timestamp cannot be converted into a `DateTime<Utc>`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TimeError {
    /// The carried timestamp is outside the range the conversion accepts.
    OutOfRange(i64),
}
12
13impl fmt::Display for TimeError {
14 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15 let min = DateTime::<Utc>::MIN_UTC.timestamp_millis();
16 let max = DateTime::<Utc>::MAX_UTC.timestamp_millis();
17 match self {
18 TimeError::OutOfRange(timestamp) => {
19 write!(f, "Timestamp '{}' is out of range for DateTime conversion. Valid range is from {} to {}", timestamp, min, max)
20 }
21 }
22 }
23}
24
25impl std::error::Error for TimeError {}
26
27#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash)]
28pub struct EventTime(pub i64, pub usize);
29
30impl PartialEq<i64> for EventTime {
31 fn eq(&self, other: &i64) -> bool {
32 self.0 == *other
33 }
34}
35
36impl fmt::Display for EventTime {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "EventTime(timestamp={}, event_id={})", self.0, self.1)
39 }
40}
41
42pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static {
43 fn t(&self) -> i64;
44
45 fn dt(&self) -> Result<DateTime<Utc>, TimeError> {
47 let t = self.t();
48 DateTime::from_timestamp_millis(t).ok_or(TimeError::OutOfRange(t))
49 }
50
51 fn range(w: Range<i64>) -> Range<Self>;
52
53 fn i(&self) -> usize {
54 0
55 }
56
57 fn new(t: i64, s: usize) -> Self;
58}
59
60pub trait TimeIndexLike<'a>: TimeIndexOps<'a> {
61 fn range_iter(
62 self,
63 w: Range<Self::IndexType>,
64 ) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
65
66 fn range_iter_rev(
67 self,
68 w: Range<Self::IndexType>,
69 ) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
70
71 fn range_count(&self, w: Range<Self::IndexType>) -> usize;
72
73 fn first_range(&self, w: Range<Self::IndexType>) -> Option<Self::IndexType> {
74 self.clone().range_iter(w).next()
75 }
76
77 fn last_range(&self, w: Range<Self::IndexType>) -> Option<Self::IndexType> {
78 self.clone().range_iter_rev(w).next()
79 }
80}
81
82pub trait TimeIndexOps<'a>: Sized + Clone + Send + Sync + 'a {
83 type IndexType: AsTime;
84 type RangeType: TimeIndexOps<'a, IndexType = Self::IndexType> + 'a;
85
86 fn active(&self, w: Range<Self::IndexType>) -> bool;
87
88 #[inline]
89 fn active_t(&self, w: Range<i64>) -> bool {
90 self.active(<Self::IndexType as AsTime>::range(w))
91 }
92
93 fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType;
94
95 fn range_t(&self, w: Range<i64>) -> Self::RangeType {
96 self.range(<Self::IndexType as AsTime>::range(w))
97 }
98
99 fn first_t(&self) -> Option<i64> {
100 self.first().map(|ti| ti.t())
101 }
102
103 fn first(&self) -> Option<Self::IndexType> {
104 self.clone().iter().next()
105 }
106
107 fn last_t(&self) -> Option<i64> {
108 self.last().map(|ti| ti.t())
109 }
110
111 fn last(&self) -> Option<Self::IndexType> {
112 self.clone().iter_rev().next()
113 }
114
115 fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
116
117 fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
118
119 fn iter_t(self) -> impl Iterator<Item = i64> + Send + Sync + 'a {
120 self.iter().map(|time| time.t())
121 }
122
123 fn iter_rev_t(self) -> impl Iterator<Item = i64> + Send + Sync + 'a {
124 self.iter_rev().map(|time| time.t())
125 }
126
127 fn len(&self) -> usize;
128
129 fn is_empty(&self) -> bool {
130 self.clone().iter().next().is_none()
131 }
132
133 fn merge<R: TimeIndexOps<'a, IndexType = Self::IndexType>>(
134 self,
135 other: R,
136 ) -> MergedTimeIndex<Self, R> {
137 MergedTimeIndex(self, other)
138 }
139}
140
141impl<'a, T: TimeIndexOps<'a> + Clone> TimeIndexOps<'a> for &'a T {
142 type IndexType = T::IndexType;
143 type RangeType = T::RangeType;
144
145 fn active(&self, w: Range<Self::IndexType>) -> bool {
146 T::active(*self, w)
147 }
148
149 fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType {
150 T::range(*self, w)
151 }
152
153 fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
154 T::iter(self.clone())
155 }
156
157 fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
158 T::iter_rev(self.clone())
159 }
160
161 fn len(&self) -> usize {
162 T::len(*self)
163 }
164}
165
/// Pair of two indexes (`0` = left, `1` = right) that is treated as a single index.
#[derive(Clone, Copy, Debug)]
pub struct MergedTimeIndex<L, R>(pub L, pub R);
168
169impl<'a, L: TimeIndexOps<'a>, R: TimeIndexOps<'a, IndexType = L::IndexType>> TimeIndexOps<'a>
170 for MergedTimeIndex<L, R>
171{
172 type IndexType = L::IndexType;
173 type RangeType = MergedTimeIndex<L::RangeType, R::RangeType>;
174
175 fn active(&self, w: Range<Self::IndexType>) -> bool {
176 self.0.active(w.clone()) || self.1.active(w.clone())
177 }
178
179 fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType {
180 MergedTimeIndex(self.0.range(w.clone()), self.1.range(w.clone()))
181 }
182
183 fn first(&self) -> Option<Self::IndexType> {
184 self.0.first().into_iter().chain(self.1.first()).min()
185 }
186
187 fn last(&self) -> Option<Self::IndexType> {
188 self.0.last().into_iter().chain(self.1.last()).max()
189 }
190
191 fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
192 self.0.iter().merge(self.1.iter())
193 }
194
195 fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
196 self.0.iter_rev().merge_by(self.1.iter_rev(), |l, r| l >= r)
197 }
198
199 fn len(&self) -> usize {
200 self.0.len() + self.1.len()
201 }
202
203 fn is_empty(&self) -> bool {
204 self.0.is_empty() && self.1.is_empty()
205 }
206}
207
208impl From<i64> for EventTime {
209 fn from(value: i64) -> Self {
210 Self::start(value)
211 }
212}
213
214impl EventTime {
215 pub const MIN: EventTime = EventTime(i64::MIN, 0);
216
217 pub const MAX: EventTime = EventTime(i64::MAX, usize::MAX);
218 pub fn new(t: i64, s: usize) -> Self {
219 Self(t, s)
220 }
221
222 pub fn set_event_id(mut self, i: usize) -> Self {
225 self.1 = i;
226 self
227 }
228
229 pub fn as_tuple(&self) -> (i64, usize) {
230 (self.0, self.1)
231 }
232
233 pub fn start(t: i64) -> Self {
234 Self(t, 0)
235 }
236
237 pub fn next(&self) -> Self {
238 if self.1 < usize::MAX {
239 Self(self.0, self.1 + 1)
240 } else if self.0 < i64::MAX {
241 Self(self.0 + 1, 0)
242 } else {
243 *self
244 }
245 }
246
247 pub fn previous(&self) -> Self {
248 if self.1 > 0 {
249 Self(self.0, self.1 - 1)
250 } else if self.0 > i64::MIN {
251 Self(self.0 - 1, 0)
252 } else {
253 *self
254 }
255 }
256
257 pub fn end(t: i64) -> Self {
258 Self(t, usize::MAX)
259 }
260}
261
262impl AsTime for i64 {
263 fn t(&self) -> i64 {
264 *self
265 }
266
267 fn range(w: Range<i64>) -> Range<Self> {
268 w
269 }
270
271 fn new(t: i64, _s: usize) -> Self {
272 t
273 }
274}
275
276impl AsTime for EventTime {
277 fn t(&self) -> i64 {
278 self.0
279 }
280 fn range(w: Range<i64>) -> Range<Self> {
281 Self::start(w.start)..Self::start(w.end)
282 }
283
284 fn i(&self) -> usize {
285 self.1
286 }
287
288 fn new(t: i64, s: usize) -> Self {
289 Self(t, s)
290 }
291}