Skip to main content

raphtory/db/api/view/
time.rs

1use crate::{
2    core::{storage::timeindex::AsTime, utils::time::Interval},
3    db::api::view::{
4        internal::{GraphTimeSemanticsOps, InternalFilter, InternalMaterialize},
5        time::internal::InternalTimeOps,
6    },
7};
8use raphtory_api::{
9    core::{
10        storage::timeindex::EventTime,
11        utils::time::{IntoTime, ParseTimeError},
12    },
13    GraphType,
14};
15use raphtory_core::utils::time::{AlignmentUnit, IntervalSize};
16use std::{
17    cmp::{max, min},
18    marker::PhantomData,
19};
20
21pub(crate) mod internal {
22    use crate::{
23        db::{api::view::internal::InternalFilter, graph::views::window_graph::WindowedGraph},
24        prelude::{GraphViewOps, TimeOps},
25    };
26    use raphtory_api::core::storage::timeindex::{AsTime, EventTime};
27    use raphtory_storage::core_ops::CoreGraphOps;
28    use std::cmp::{max, min};
29
30    pub trait InternalTimeOps<'graph> {
31        type InternalWindowedView: TimeOps<'graph> + 'graph;
32        fn timeline_start(&self) -> Option<EventTime>;
33        fn timeline_end(&self) -> Option<EventTime>;
34        fn latest_t(&self) -> Option<i64>;
35        fn internal_window(
36            &self,
37            start: Option<EventTime>,
38            end: Option<EventTime>,
39        ) -> Self::InternalWindowedView;
40    }
41    impl<'graph, E: InternalFilter<'graph> + 'graph> InternalTimeOps<'graph> for E {
42        type InternalWindowedView = E::Filtered<WindowedGraph<E::Graph>>;
43
44        fn timeline_start(&self) -> Option<EventTime> {
45            self.start()
46                .or_else(|| self.base_graph().core_graph().earliest_time())
47        }
48
49        fn timeline_end(&self) -> Option<EventTime> {
50            self.end().or_else(|| {
51                self.base_graph()
52                    .core_graph()
53                    .latest_time()
54                    .map(|v| EventTime::from(v.0.saturating_add(1)))
55            })
56        }
57
58        fn latest_t(&self) -> Option<i64> {
59            self.base_graph().latest_time().map(|t| t.t())
60        }
61
62        fn internal_window(
63            &self,
64            start: Option<EventTime>,
65            end: Option<EventTime>,
66        ) -> Self::InternalWindowedView {
67            let base_start = self.base_graph().start();
68            let base_end = self.base_graph().end();
69            let actual_start = match (base_start, start) {
70                (Some(base), Some(start)) => Some(max(base, start)),
71                (None, v) => v,
72                (v, None) => v,
73            };
74            let actual_end = match (base_end, end) {
75                (Some(base), Some(end)) => Some(min(base, end)),
76                (None, v) => v,
77                (v, None) => v,
78            };
79            let actual_end = match (actual_end, actual_start) {
80                (Some(end), Some(start)) => Some(max(end, start)),
81                _ => actual_end,
82            };
83            self.apply_filter(WindowedGraph::new(
84                self.base_graph().clone(),
85                actual_start,
86                actual_end,
87            ))
88        }
89    }
90}
91
92/// Trait defining time query operations
93pub trait TimeOps<'graph>:
94    InternalTimeOps<'graph, InternalWindowedView = Self::WindowedViewType>
95{
96    type WindowedViewType: TimeOps<'graph> + 'graph;
97    /// Return the time entry of the start of the view or None if the view start is unbounded.
98    fn start(&self) -> Option<EventTime>;
99
100    /// Return the time entry of the view or None if the view end is unbounded.
101    fn end(&self) -> Option<EventTime>;
102
103    /// set the start of the window to the larger of `start` and `self.start()`
104    fn shrink_start<T: IntoTime>(&self, start: T) -> Self::WindowedViewType;
105
106    /// set the end of the window to the smaller of `end` and `self.end()`
107    fn shrink_end<T: IntoTime>(&self, end: T) -> Self::WindowedViewType;
108
109    /// shrink both the start and end of the window (same as calling `shrink_start` followed by `shrink_end` but more efficient)
110    fn shrink_window<T: IntoTime>(&self, start: T, end: T) -> Self::WindowedViewType;
111
112    /// Return the size of the window covered by this view or None if the window is unbounded
113    fn window_size(&self) -> Option<u64>;
114
115    /// Create a view including all events between `start` (inclusive) and `end` (exclusive)
116    fn window<T1: IntoTime, T2: IntoTime>(&self, start: T1, end: T2) -> Self::WindowedViewType;
117
118    /// Create a view that only includes events at `time`
119    fn at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType;
120
121    /// Create a view that only includes events at the latest time
122    fn latest(&self) -> Self::WindowedViewType;
123
124    /// Create a view including all events that have not been explicitly deleted at `time`
125    ///
126    /// This is equivalent to `before(time + 1)` for `EventGraph`s and `at(time)` for `PersitentGraph`s
127    fn snapshot_at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType;
128
129    /// Create a view including all events that have not been explicitly deleted at the latest time
130    ///
131    /// This is equivalent to a no-op for `EventGraph`s and `latest()` for `PersitentGraph`s
132    fn snapshot_latest(&self) -> Self::WindowedViewType;
133
134    /// Create a view that only includes events after `start` (exclusive)
135    fn after<T: IntoTime>(&self, start: T) -> Self::WindowedViewType;
136
137    /// Create a view that only includes events before `end` (exclusive)
138    fn before<T: IntoTime>(&self, end: T) -> Self::WindowedViewType;
139
140    /// Creates a `WindowSet` with the given `step` size
141    /// using an expanding window. The last window may fall partially outside the range of the data/view.
142    ///
143    /// An expanding window is a window that grows by `step` size at each iteration.
144    ///
145    /// The window will be aligned with the smallest unit of time passed. For example, if the interval
146    /// is "1 month and 1 day", the first window will begin at the start of the day of the first time event.
147    fn expanding<I>(&self, step: I) -> Result<WindowSet<'graph, Self>, ParseTimeError>
148    where
149        Self: Sized + Clone + 'graph,
150        I: TryInto<Interval> + Clone,
151        ParseTimeError: From<<I as TryInto<Interval>>::Error>;
152
153    /// Creates a `WindowSet` with the given `step` size using an expanding window, where the windows are aligned
154    /// with the `alignment_unit` passed. The last window may fall partially outside the range of the data/view.
155    ///
156    /// An expanding window is a window that grows by `step` size at each iteration.
157    ///
158    /// Note that `alignment_unit = AlignmentUnit::Unaligned` achieves unaligned behaviour.
159    fn expanding_aligned<I>(
160        &self,
161        step: I,
162        alignment_unit: AlignmentUnit,
163    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
164    where
165        Self: Sized + Clone + 'graph,
166        I: TryInto<Interval>,
167        ParseTimeError: From<<I as TryInto<Interval>>::Error>;
168
169    /// Creates a `WindowSet` with the given `window` size and optional `step`
170    /// using a rolling window. The last window may fall partially outside the range of the data/view.
171    /// Note that passing a `step` larger than `window` can lead to some entries appearing before
172    /// the start of the first window and/or after the end of the last window (i.e. not included in any window)
173    ///
174    /// A rolling window is a window that moves forward by `step` size at each iteration.
175    ///
176    /// The window will be aligned with the smallest unit of time passed. For example, if the interval
177    /// is "1 month and 1 day", the first window will begin at the start of the day of the first time event.
178    fn rolling<I>(
179        &self,
180        window: I,
181        step: Option<I>,
182    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
183    where
184        Self: Sized + Clone + 'graph,
185        I: TryInto<Interval> + Clone,
186        ParseTimeError: From<<I as TryInto<Interval>>::Error>;
187
188    /// Creates a `WindowSet` with the given `window` size and optional `step` using a rolling window, where the windows
189    /// are aligned with the `alignment_unit` passed. The last window may fall partially outside the range of the data/view.
190    /// Note that, depending on the `alignment_unit`, passing a `step` larger than `window` can lead to some entries
191    /// appearing before the start of the first window and/or after the end of the last window (i.e. not included in any window)
192    ///
193    /// A rolling window is a window that moves forward by `step` size at each iteration.
194    ///
195    /// Note that `alignment_unit = AlignmentUnit::Unaligned` achieves unaligned behaviour.
196    fn rolling_aligned<I>(
197        &self,
198        window: I,
199        step: Option<I>,
200        alignment_unit: AlignmentUnit,
201    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
202    where
203        Self: Sized + Clone + 'graph,
204        I: TryInto<Interval>,
205        ParseTimeError: From<<I as TryInto<Interval>>::Error>;
206}
207
208impl<'graph, V: InternalFilter<'graph> + 'graph + InternalTimeOps<'graph>> TimeOps<'graph> for V {
209    type WindowedViewType = V::InternalWindowedView;
210
211    fn start(&self) -> Option<EventTime> {
212        self.base_graph().view_start()
213    }
214
215    fn end(&self) -> Option<EventTime> {
216        self.base_graph().view_end()
217    }
218
219    fn shrink_start<T: IntoTime>(&self, start: T) -> Self::WindowedViewType {
220        let start = Some(max(
221            start.into_time(),
222            self.start().unwrap_or(EventTime::MIN),
223        ));
224        self.internal_window(start, self.end())
225    }
226
227    fn shrink_end<T: IntoTime>(&self, end: T) -> Self::WindowedViewType {
228        let end = Some(min(end.into_time(), self.end().unwrap_or(EventTime::MAX)));
229        self.internal_window(self.start(), end)
230    }
231
232    fn shrink_window<T: IntoTime>(&self, start: T, end: T) -> Self::WindowedViewType {
233        let start = max(start.into_time(), self.start().unwrap_or(EventTime::MIN));
234        let end = min(end.into_time(), self.end().unwrap_or(EventTime::MAX));
235        self.internal_window(Some(start), Some(end))
236    }
237
238    fn window_size(&self) -> Option<u64> {
239        match (self.start(), self.end()) {
240            (Some(start), Some(end)) => Some((end.t() - start.t()) as u64),
241            _ => None,
242        }
243    }
244
245    fn window<T1: IntoTime, T2: IntoTime>(&self, start: T1, end: T2) -> Self::WindowedViewType {
246        self.internal_window(Some(start.into_time()), Some(end.into_time()))
247    }
248
249    fn at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType {
250        let start = time.into_time();
251        self.internal_window(
252            Some(EventTime::start(start.t())),
253            Some(EventTime::start(start.t().saturating_add(1))),
254        )
255    }
256
257    fn latest(&self) -> Self::WindowedViewType {
258        let time = self.latest_t();
259        self.internal_window(
260            time.map(EventTime::start),
261            time.map(|t| EventTime::start(t.saturating_add(1))),
262        )
263    }
264
265    fn snapshot_at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType {
266        match self.base_graph().graph_type() {
267            GraphType::EventGraph => self.before(time.into_time().t().saturating_add(1)),
268            GraphType::PersistentGraph => self.at(time),
269        }
270    }
271
272    fn snapshot_latest(&self) -> Self::WindowedViewType {
273        match self.latest_t() {
274            Some(latest) => self.snapshot_at(latest),
275            None => self.snapshot_at(i64::MIN),
276        }
277    }
278
279    fn after<T: IntoTime>(&self, start: T) -> Self::WindowedViewType {
280        let start_time = start.into_time();
281        let start = EventTime::start(start_time.t().saturating_add(1));
282        self.internal_window(Some(start), None)
283    }
284
285    fn before<T: IntoTime>(&self, end: T) -> Self::WindowedViewType {
286        let end = EventTime::start(end.into_time().t());
287        self.internal_window(None, Some(end))
288    }
289
290    fn expanding<I>(&self, step: I) -> Result<WindowSet<'graph, Self>, ParseTimeError>
291    where
292        Self: Sized + Clone + 'graph,
293        I: TryInto<Interval> + Clone,
294        ParseTimeError: From<<I as TryInto<Interval>>::Error>,
295    {
296        // step is usually a number or a small string so performance impact of cloning should be minimal
297        let alignment_unit = step
298            .clone()
299            .try_into()?
300            .alignment_unit
301            .unwrap_or(AlignmentUnit::Unaligned);
302        // Align the timestamp to the smallest unit.
303        // If there is None (the Interval is discrete), no alignment is done
304        self.expanding_aligned(step, alignment_unit)
305    }
306
307    fn expanding_aligned<I>(
308        &self,
309        step: I,
310        alignment_unit: AlignmentUnit,
311    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
312    where
313        Self: Sized + Clone + 'graph,
314        I: TryInto<Interval>,
315        ParseTimeError: From<<I as TryInto<Interval>>::Error>,
316    {
317        let parent = self.clone();
318        match (self.timeline_start(), self.timeline_end()) {
319            (Some(start), Some(end)) => {
320                let step: Interval = step.try_into()?;
321                let start_time = alignment_unit.align_timestamp(start.t());
322                WindowSet::new(parent, start_time, end.t(), step, None)
323            }
324            _ => WindowSet::empty(parent),
325        }
326    }
327
328    fn rolling<I>(
329        &self,
330        window: I,
331        step: Option<I>,
332    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
333    where
334        Self: Sized + Clone + 'graph,
335        I: TryInto<Interval> + Clone,
336        ParseTimeError: From<<I as TryInto<Interval>>::Error>,
337    {
338        // step and window are usually numbers or small strings so performance impact of cloning should be minimal
339        let alignment_unit = match &step {
340            Some(s) => s
341                .clone()
342                .try_into()?
343                .alignment_unit
344                .unwrap_or(AlignmentUnit::Unaligned),
345            None => window
346                .clone()
347                .try_into()?
348                .alignment_unit
349                .unwrap_or(AlignmentUnit::Unaligned),
350        };
351        // Align the timestamp to the smallest unit in step.
352        // If there is None (i.e. the Interval is discrete), no alignment is done.
353        self.rolling_aligned(window, step, alignment_unit)
354    }
355
356    fn rolling_aligned<I>(
357        &self,
358        window: I,
359        step: Option<I>,
360        alignment_unit: AlignmentUnit,
361    ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
362    where
363        Self: Sized + Clone + 'graph,
364        I: TryInto<Interval>,
365        ParseTimeError: From<<I as TryInto<Interval>>::Error>,
366    {
367        let parent = self.clone();
368        match (self.timeline_start(), self.timeline_end()) {
369            (Some(start), Some(end)) => {
370                let window: Interval = window.try_into()?;
371                let step: Interval = match step {
372                    Some(step) => step.try_into()?,
373                    None => window,
374                };
375                let start_time = alignment_unit.align_timestamp(start.t());
376                WindowSet::new(parent, start_time, end.t(), step, Some(window))
377            }
378            _ => WindowSet::empty(parent),
379        }
380    }
381}
382
383#[derive(Clone)]
384pub struct WindowSet<'graph, T> {
385    view: T,
386    start: i64,
387    counter: u32, // u32 because months from Temporal intervals are u32 (due to chrono months being u32)
388    end: i64,
389    step: Interval,
390    window: Option<Interval>,
391    _marker: PhantomData<&'graph T>,
392}
393
394impl<'graph, T: TimeOps<'graph> + Clone + 'graph> WindowSet<'graph, T> {
395    fn new(
396        view: T,
397        start: i64,
398        end: i64,
399        step: Interval,
400        window: Option<Interval>,
401    ) -> Result<Self, ParseTimeError> {
402        match step.size {
403            IntervalSize::Discrete(v) => {
404                if v == 0 {
405                    return Err(ParseTimeError::ZeroSizeStep);
406                }
407            }
408            IntervalSize::Temporal { millis, months } => {
409                if millis == 0 && months == 0 {
410                    return Err(ParseTimeError::ZeroSizeStep);
411                }
412            }
413        };
414        Ok(Self {
415            view,
416            start,
417            counter: 1,
418            end,
419            step,
420            window,
421            _marker: PhantomData,
422        })
423    }
424
425    fn empty(view: T) -> Result<Self, ParseTimeError> {
426        // timeline_start is greater than end, so no windows to return, even with end inclusive
427        WindowSet::new(view, 1, 0, Default::default(), None)
428    }
429
430    // TODO: make this optionally public only for the development feature flag
431    pub fn temporal(&self) -> bool {
432        self.step.alignment_unit.is_some()
433            || match self.window {
434                Some(window) => window.alignment_unit.is_some(),
435                None => false,
436            }
437    }
438
439    /// Returns the time index of this window set
440    pub fn time_index(&self, center: bool) -> TimeIndex<'graph, T> {
441        TimeIndex {
442            windowset: self.clone(),
443            center,
444        }
445    }
446}
447
448pub struct TimeIndex<'graph, T> {
449    windowset: WindowSet<'graph, T>,
450    center: bool,
451}
452
453impl<'graph, T: TimeOps<'graph> + Clone + 'graph> Iterator for TimeIndex<'graph, T> {
454    type Item = i64;
455
456    fn next(&mut self) -> Option<Self::Item> {
457        let center = self.center;
458        self.windowset.next().map(move |view| {
459            if center {
460                view.start().unwrap().t()
461                    + ((view.end().unwrap().t() - view.start().unwrap().t()) / 2)
462            } else {
463                view.end().unwrap().t() - 1
464            }
465        })
466    }
467}
468
469impl<'graph, T: TimeOps<'graph> + Clone + 'graph> Iterator for WindowSet<'graph, T> {
470    type Item = T::WindowedViewType;
471    fn next(&mut self) -> Option<Self::Item> {
472        let window_end = self.start + (self.counter * self.step);
473
474        if window_end < self.end + self.step {
475            let window_start = self.window.map(|w| window_end - w);
476            if let Some(start) = window_start {
477                // this is required because if we have steps > window size you can end up overstepping
478                // the end by so much in the final window that there is no data inside
479                if start >= self.end {
480                    // this is >= because the end passed through is already +1
481                    return None;
482                }
483            }
484            let window = self.view.internal_window(
485                window_start.map(EventTime::start),
486                Some(EventTime::start(window_end)),
487            );
488            self.counter += 1;
489            Some(window)
490        } else {
491            None
492        }
493    }
494    fn size_hint(&self) -> (usize, Option<usize>) {
495        let len = self.len();
496        (len, Some(len))
497    }
498}
499impl<'graph, T: TimeOps<'graph> + Clone + 'graph> ExactSizeIterator for WindowSet<'graph, T> {
500    // unfortunately because Interval can change size, there is no nice divide option
501    fn len(&self) -> usize {
502        let mut window_end = self.start + (self.counter * self.step);
503        let mut count = 0;
504        while window_end < self.end + self.step {
505            let window_start = self.window.map(|w| window_end - w);
506            if let Some(start) = window_start {
507                if start >= self.end {
508                    break;
509                }
510            }
511            count += 1;
512            window_end = window_end + self.step;
513        }
514        count
515    }
516}