Skip to main content

raphtory_api/core/storage/
timeindex.rs

1use chrono::{DateTime, Utc};
2use itertools::Itertools;
3use serde::{Deserialize, Serialize};
4use std::{fmt, ops::Range};
5
/// Failure raised when a raw timestamp cannot be turned into a
/// `chrono::DateTime<Utc>`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TimeError {
    /// Carries the offending timestamp, which lies outside the range that
    /// `chrono::DateTime<Utc>` is able to represent.
    OutOfRange(i64),
}
12
13impl fmt::Display for TimeError {
14    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15        let min = DateTime::<Utc>::MIN_UTC.timestamp_millis();
16        let max = DateTime::<Utc>::MAX_UTC.timestamp_millis();
17        match self {
18            TimeError::OutOfRange(timestamp) => {
19                write!(f, "Timestamp '{}' is out of range for DateTime conversion. Valid range is from {} to {}", timestamp, min, max)
20            }
21        }
22    }
23}
24
25impl std::error::Error for TimeError {}
26
27#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash)]
28pub struct EventTime(pub i64, pub usize);
29
30impl PartialEq<i64> for EventTime {
31    fn eq(&self, other: &i64) -> bool {
32        self.0 == *other
33    }
34}
35
36impl fmt::Display for EventTime {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "EventTime(timestamp={}, event_id={})", self.0, self.1)
39    }
40}
41
42pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static {
43    fn t(&self) -> i64;
44
45    /// Tries to convert the timestamp into a UTC DateTime.
46    fn dt(&self) -> Result<DateTime<Utc>, TimeError> {
47        let t = self.t();
48        DateTime::from_timestamp_millis(t).ok_or(TimeError::OutOfRange(t))
49    }
50
51    fn range(w: Range<i64>) -> Range<Self>;
52
53    fn i(&self) -> usize {
54        0
55    }
56
57    fn new(t: i64, s: usize) -> Self;
58}
59
60pub trait TimeIndexLike<'a>: TimeIndexOps<'a> {
61    fn range_iter(
62        self,
63        w: Range<Self::IndexType>,
64    ) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
65
66    fn range_iter_rev(
67        self,
68        w: Range<Self::IndexType>,
69    ) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
70
71    fn range_count(&self, w: Range<Self::IndexType>) -> usize;
72
73    fn first_range(&self, w: Range<Self::IndexType>) -> Option<Self::IndexType> {
74        self.clone().range_iter(w).next()
75    }
76
77    fn last_range(&self, w: Range<Self::IndexType>) -> Option<Self::IndexType> {
78        self.clone().range_iter_rev(w).next()
79    }
80}
81
82pub trait TimeIndexOps<'a>: Sized + Clone + Send + Sync + 'a {
83    type IndexType: AsTime;
84    type RangeType: TimeIndexOps<'a, IndexType = Self::IndexType> + 'a;
85
86    fn active(&self, w: Range<Self::IndexType>) -> bool;
87
88    #[inline]
89    fn active_t(&self, w: Range<i64>) -> bool {
90        self.active(<Self::IndexType as AsTime>::range(w))
91    }
92
93    fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType;
94
95    fn range_t(&self, w: Range<i64>) -> Self::RangeType {
96        self.range(<Self::IndexType as AsTime>::range(w))
97    }
98
99    fn first_t(&self) -> Option<i64> {
100        self.first().map(|ti| ti.t())
101    }
102
103    fn first(&self) -> Option<Self::IndexType> {
104        self.clone().iter().next()
105    }
106
107    fn last_t(&self) -> Option<i64> {
108        self.last().map(|ti| ti.t())
109    }
110
111    fn last(&self) -> Option<Self::IndexType> {
112        self.clone().iter_rev().next()
113    }
114
115    fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
116
117    fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a;
118
119    fn iter_t(self) -> impl Iterator<Item = i64> + Send + Sync + 'a {
120        self.iter().map(|time| time.t())
121    }
122
123    fn iter_rev_t(self) -> impl Iterator<Item = i64> + Send + Sync + 'a {
124        self.iter_rev().map(|time| time.t())
125    }
126
127    fn len(&self) -> usize;
128
129    fn is_empty(&self) -> bool {
130        self.clone().iter().next().is_none()
131    }
132
133    fn merge<R: TimeIndexOps<'a, IndexType = Self::IndexType>>(
134        self,
135        other: R,
136    ) -> MergedTimeIndex<Self, R> {
137        MergedTimeIndex(self, other)
138    }
139}
140
141impl<'a, T: TimeIndexOps<'a> + Clone> TimeIndexOps<'a> for &'a T {
142    type IndexType = T::IndexType;
143    type RangeType = T::RangeType;
144
145    fn active(&self, w: Range<Self::IndexType>) -> bool {
146        T::active(*self, w)
147    }
148
149    fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType {
150        T::range(*self, w)
151    }
152
153    fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
154        T::iter(self.clone())
155    }
156
157    fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
158        T::iter_rev(self.clone())
159    }
160
161    fn len(&self) -> usize {
162        T::len(*self)
163    }
164}
165
/// Pair of two values (field `0` and field `1`) that are meant to be treated
/// as a single combined time index.
#[derive(Clone, Copy, Debug)]
pub struct MergedTimeIndex<L, R>(pub L, pub R);
168
169impl<'a, L: TimeIndexOps<'a>, R: TimeIndexOps<'a, IndexType = L::IndexType>> TimeIndexOps<'a>
170    for MergedTimeIndex<L, R>
171{
172    type IndexType = L::IndexType;
173    type RangeType = MergedTimeIndex<L::RangeType, R::RangeType>;
174
175    fn active(&self, w: Range<Self::IndexType>) -> bool {
176        self.0.active(w.clone()) || self.1.active(w.clone())
177    }
178
179    fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType {
180        MergedTimeIndex(self.0.range(w.clone()), self.1.range(w.clone()))
181    }
182
183    fn first(&self) -> Option<Self::IndexType> {
184        self.0.first().into_iter().chain(self.1.first()).min()
185    }
186
187    fn last(&self) -> Option<Self::IndexType> {
188        self.0.last().into_iter().chain(self.1.last()).max()
189    }
190
191    fn iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
192        self.0.iter().merge(self.1.iter())
193    }
194
195    fn iter_rev(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync + 'a {
196        self.0.iter_rev().merge_by(self.1.iter_rev(), |l, r| l >= r)
197    }
198
199    fn len(&self) -> usize {
200        self.0.len() + self.1.len()
201    }
202
203    fn is_empty(&self) -> bool {
204        self.0.is_empty() && self.1.is_empty()
205    }
206}
207
208impl From<i64> for EventTime {
209    fn from(value: i64) -> Self {
210        Self::start(value)
211    }
212}
213
214impl EventTime {
215    pub const MIN: EventTime = EventTime(i64::MIN, 0);
216
217    pub const MAX: EventTime = EventTime(i64::MAX, usize::MAX);
218    pub fn new(t: i64, s: usize) -> Self {
219        Self(t, s)
220    }
221
222    /// Sets the event id of the EventTime.
223    /// Note that this mutates the EventTime in place rather than create and return a new one.
224    pub fn set_event_id(mut self, i: usize) -> Self {
225        self.1 = i;
226        self
227    }
228
229    pub fn as_tuple(&self) -> (i64, usize) {
230        (self.0, self.1)
231    }
232
233    pub fn start(t: i64) -> Self {
234        Self(t, 0)
235    }
236
237    pub fn next(&self) -> Self {
238        if self.1 < usize::MAX {
239            Self(self.0, self.1 + 1)
240        } else if self.0 < i64::MAX {
241            Self(self.0 + 1, 0)
242        } else {
243            *self
244        }
245    }
246
247    pub fn previous(&self) -> Self {
248        if self.1 > 0 {
249            Self(self.0, self.1 - 1)
250        } else if self.0 > i64::MIN {
251            Self(self.0 - 1, 0)
252        } else {
253            *self
254        }
255    }
256
257    pub fn end(t: i64) -> Self {
258        Self(t, usize::MAX)
259    }
260}
261
262impl AsTime for i64 {
263    fn t(&self) -> i64 {
264        *self
265    }
266
267    fn range(w: Range<i64>) -> Range<Self> {
268        w
269    }
270
271    fn new(t: i64, _s: usize) -> Self {
272        t
273    }
274}
275
276impl AsTime for EventTime {
277    fn t(&self) -> i64 {
278        self.0
279    }
280    fn range(w: Range<i64>) -> Range<Self> {
281        Self::start(w.start)..Self::start(w.end)
282    }
283
284    fn i(&self) -> usize {
285        self.1
286    }
287
288    fn new(t: i64, s: usize) -> Self {
289        Self(t, s)
290    }
291}