1use crate::{
2 core::{storage::timeindex::AsTime, utils::time::Interval},
3 db::api::view::{
4 internal::{GraphTimeSemanticsOps, InternalFilter, InternalMaterialize},
5 time::internal::InternalTimeOps,
6 },
7};
8use raphtory_api::{
9 core::{
10 storage::timeindex::EventTime,
11 utils::time::{IntoTime, ParseTimeError},
12 },
13 GraphType,
14};
15use raphtory_core::utils::time::{AlignmentUnit, IntervalSize};
16use std::{
17 cmp::{max, min},
18 marker::PhantomData,
19};
20
21pub(crate) mod internal {
22 use crate::{
23 db::{api::view::internal::InternalFilter, graph::views::window_graph::WindowedGraph},
24 prelude::{GraphViewOps, TimeOps},
25 };
26 use raphtory_api::core::storage::timeindex::{AsTime, EventTime};
27 use raphtory_storage::core_ops::CoreGraphOps;
28 use std::cmp::{max, min};
29
30 pub trait InternalTimeOps<'graph> {
31 type InternalWindowedView: TimeOps<'graph> + 'graph;
32 fn timeline_start(&self) -> Option<EventTime>;
33 fn timeline_end(&self) -> Option<EventTime>;
34 fn latest_t(&self) -> Option<i64>;
35 fn internal_window(
36 &self,
37 start: Option<EventTime>,
38 end: Option<EventTime>,
39 ) -> Self::InternalWindowedView;
40 }
41 impl<'graph, E: InternalFilter<'graph> + 'graph> InternalTimeOps<'graph> for E {
42 type InternalWindowedView = E::Filtered<WindowedGraph<E::Graph>>;
43
44 fn timeline_start(&self) -> Option<EventTime> {
45 self.start()
46 .or_else(|| self.base_graph().core_graph().earliest_time())
47 }
48
49 fn timeline_end(&self) -> Option<EventTime> {
50 self.end().or_else(|| {
51 self.base_graph()
52 .core_graph()
53 .latest_time()
54 .map(|v| EventTime::from(v.0.saturating_add(1)))
55 })
56 }
57
58 fn latest_t(&self) -> Option<i64> {
59 self.base_graph().latest_time().map(|t| t.t())
60 }
61
62 fn internal_window(
63 &self,
64 start: Option<EventTime>,
65 end: Option<EventTime>,
66 ) -> Self::InternalWindowedView {
67 let base_start = self.base_graph().start();
68 let base_end = self.base_graph().end();
69 let actual_start = match (base_start, start) {
70 (Some(base), Some(start)) => Some(max(base, start)),
71 (None, v) => v,
72 (v, None) => v,
73 };
74 let actual_end = match (base_end, end) {
75 (Some(base), Some(end)) => Some(min(base, end)),
76 (None, v) => v,
77 (v, None) => v,
78 };
79 let actual_end = match (actual_end, actual_start) {
80 (Some(end), Some(start)) => Some(max(end, start)),
81 _ => actual_end,
82 };
83 self.apply_filter(WindowedGraph::new(
84 self.base_graph().clone(),
85 actual_start,
86 actual_end,
87 ))
88 }
89 }
90}
91
92pub trait TimeOps<'graph>:
94 InternalTimeOps<'graph, InternalWindowedView = Self::WindowedViewType>
95{
96 type WindowedViewType: TimeOps<'graph> + 'graph;
97 fn start(&self) -> Option<EventTime>;
99
100 fn end(&self) -> Option<EventTime>;
102
103 fn shrink_start<T: IntoTime>(&self, start: T) -> Self::WindowedViewType;
105
106 fn shrink_end<T: IntoTime>(&self, end: T) -> Self::WindowedViewType;
108
109 fn shrink_window<T: IntoTime>(&self, start: T, end: T) -> Self::WindowedViewType;
111
112 fn window_size(&self) -> Option<u64>;
114
115 fn window<T1: IntoTime, T2: IntoTime>(&self, start: T1, end: T2) -> Self::WindowedViewType;
117
118 fn at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType;
120
121 fn latest(&self) -> Self::WindowedViewType;
123
124 fn snapshot_at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType;
128
129 fn snapshot_latest(&self) -> Self::WindowedViewType;
133
134 fn after<T: IntoTime>(&self, start: T) -> Self::WindowedViewType;
136
137 fn before<T: IntoTime>(&self, end: T) -> Self::WindowedViewType;
139
140 fn expanding<I>(&self, step: I) -> Result<WindowSet<'graph, Self>, ParseTimeError>
148 where
149 Self: Sized + Clone + 'graph,
150 I: TryInto<Interval> + Clone,
151 ParseTimeError: From<<I as TryInto<Interval>>::Error>;
152
153 fn expanding_aligned<I>(
160 &self,
161 step: I,
162 alignment_unit: AlignmentUnit,
163 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
164 where
165 Self: Sized + Clone + 'graph,
166 I: TryInto<Interval>,
167 ParseTimeError: From<<I as TryInto<Interval>>::Error>;
168
169 fn rolling<I>(
179 &self,
180 window: I,
181 step: Option<I>,
182 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
183 where
184 Self: Sized + Clone + 'graph,
185 I: TryInto<Interval> + Clone,
186 ParseTimeError: From<<I as TryInto<Interval>>::Error>;
187
188 fn rolling_aligned<I>(
197 &self,
198 window: I,
199 step: Option<I>,
200 alignment_unit: AlignmentUnit,
201 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
202 where
203 Self: Sized + Clone + 'graph,
204 I: TryInto<Interval>,
205 ParseTimeError: From<<I as TryInto<Interval>>::Error>;
206}
207
208impl<'graph, V: InternalFilter<'graph> + 'graph + InternalTimeOps<'graph>> TimeOps<'graph> for V {
209 type WindowedViewType = V::InternalWindowedView;
210
211 fn start(&self) -> Option<EventTime> {
212 self.base_graph().view_start()
213 }
214
215 fn end(&self) -> Option<EventTime> {
216 self.base_graph().view_end()
217 }
218
219 fn shrink_start<T: IntoTime>(&self, start: T) -> Self::WindowedViewType {
220 let start = Some(max(
221 start.into_time(),
222 self.start().unwrap_or(EventTime::MIN),
223 ));
224 self.internal_window(start, self.end())
225 }
226
227 fn shrink_end<T: IntoTime>(&self, end: T) -> Self::WindowedViewType {
228 let end = Some(min(end.into_time(), self.end().unwrap_or(EventTime::MAX)));
229 self.internal_window(self.start(), end)
230 }
231
232 fn shrink_window<T: IntoTime>(&self, start: T, end: T) -> Self::WindowedViewType {
233 let start = max(start.into_time(), self.start().unwrap_or(EventTime::MIN));
234 let end = min(end.into_time(), self.end().unwrap_or(EventTime::MAX));
235 self.internal_window(Some(start), Some(end))
236 }
237
238 fn window_size(&self) -> Option<u64> {
239 match (self.start(), self.end()) {
240 (Some(start), Some(end)) => Some((end.t() - start.t()) as u64),
241 _ => None,
242 }
243 }
244
245 fn window<T1: IntoTime, T2: IntoTime>(&self, start: T1, end: T2) -> Self::WindowedViewType {
246 self.internal_window(Some(start.into_time()), Some(end.into_time()))
247 }
248
249 fn at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType {
250 let start = time.into_time();
251 self.internal_window(
252 Some(EventTime::start(start.t())),
253 Some(EventTime::start(start.t().saturating_add(1))),
254 )
255 }
256
257 fn latest(&self) -> Self::WindowedViewType {
258 let time = self.latest_t();
259 self.internal_window(
260 time.map(EventTime::start),
261 time.map(|t| EventTime::start(t.saturating_add(1))),
262 )
263 }
264
265 fn snapshot_at<T: IntoTime>(&self, time: T) -> Self::WindowedViewType {
266 match self.base_graph().graph_type() {
267 GraphType::EventGraph => self.before(time.into_time().t().saturating_add(1)),
268 GraphType::PersistentGraph => self.at(time),
269 }
270 }
271
272 fn snapshot_latest(&self) -> Self::WindowedViewType {
273 match self.latest_t() {
274 Some(latest) => self.snapshot_at(latest),
275 None => self.snapshot_at(i64::MIN),
276 }
277 }
278
279 fn after<T: IntoTime>(&self, start: T) -> Self::WindowedViewType {
280 let start_time = start.into_time();
281 let start = EventTime::start(start_time.t().saturating_add(1));
282 self.internal_window(Some(start), None)
283 }
284
285 fn before<T: IntoTime>(&self, end: T) -> Self::WindowedViewType {
286 let end = EventTime::start(end.into_time().t());
287 self.internal_window(None, Some(end))
288 }
289
290 fn expanding<I>(&self, step: I) -> Result<WindowSet<'graph, Self>, ParseTimeError>
291 where
292 Self: Sized + Clone + 'graph,
293 I: TryInto<Interval> + Clone,
294 ParseTimeError: From<<I as TryInto<Interval>>::Error>,
295 {
296 let alignment_unit = step
298 .clone()
299 .try_into()?
300 .alignment_unit
301 .unwrap_or(AlignmentUnit::Unaligned);
302 self.expanding_aligned(step, alignment_unit)
305 }
306
307 fn expanding_aligned<I>(
308 &self,
309 step: I,
310 alignment_unit: AlignmentUnit,
311 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
312 where
313 Self: Sized + Clone + 'graph,
314 I: TryInto<Interval>,
315 ParseTimeError: From<<I as TryInto<Interval>>::Error>,
316 {
317 let parent = self.clone();
318 match (self.timeline_start(), self.timeline_end()) {
319 (Some(start), Some(end)) => {
320 let step: Interval = step.try_into()?;
321 let start_time = alignment_unit.align_timestamp(start.t());
322 WindowSet::new(parent, start_time, end.t(), step, None)
323 }
324 _ => WindowSet::empty(parent),
325 }
326 }
327
328 fn rolling<I>(
329 &self,
330 window: I,
331 step: Option<I>,
332 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
333 where
334 Self: Sized + Clone + 'graph,
335 I: TryInto<Interval> + Clone,
336 ParseTimeError: From<<I as TryInto<Interval>>::Error>,
337 {
338 let alignment_unit = match &step {
340 Some(s) => s
341 .clone()
342 .try_into()?
343 .alignment_unit
344 .unwrap_or(AlignmentUnit::Unaligned),
345 None => window
346 .clone()
347 .try_into()?
348 .alignment_unit
349 .unwrap_or(AlignmentUnit::Unaligned),
350 };
351 self.rolling_aligned(window, step, alignment_unit)
354 }
355
356 fn rolling_aligned<I>(
357 &self,
358 window: I,
359 step: Option<I>,
360 alignment_unit: AlignmentUnit,
361 ) -> Result<WindowSet<'graph, Self>, ParseTimeError>
362 where
363 Self: Sized + Clone + 'graph,
364 I: TryInto<Interval>,
365 ParseTimeError: From<<I as TryInto<Interval>>::Error>,
366 {
367 let parent = self.clone();
368 match (self.timeline_start(), self.timeline_end()) {
369 (Some(start), Some(end)) => {
370 let window: Interval = window.try_into()?;
371 let step: Interval = match step {
372 Some(step) => step.try_into()?,
373 None => window,
374 };
375 let start_time = alignment_unit.align_timestamp(start.t());
376 WindowSet::new(parent, start_time, end.t(), step, Some(window))
377 }
378 _ => WindowSet::empty(parent),
379 }
380 }
381}
382
383#[derive(Clone)]
384pub struct WindowSet<'graph, T> {
385 view: T,
386 start: i64,
387 counter: u32, end: i64,
389 step: Interval,
390 window: Option<Interval>,
391 _marker: PhantomData<&'graph T>,
392}
393
394impl<'graph, T: TimeOps<'graph> + Clone + 'graph> WindowSet<'graph, T> {
395 fn new(
396 view: T,
397 start: i64,
398 end: i64,
399 step: Interval,
400 window: Option<Interval>,
401 ) -> Result<Self, ParseTimeError> {
402 match step.size {
403 IntervalSize::Discrete(v) => {
404 if v == 0 {
405 return Err(ParseTimeError::ZeroSizeStep);
406 }
407 }
408 IntervalSize::Temporal { millis, months } => {
409 if millis == 0 && months == 0 {
410 return Err(ParseTimeError::ZeroSizeStep);
411 }
412 }
413 };
414 Ok(Self {
415 view,
416 start,
417 counter: 1,
418 end,
419 step,
420 window,
421 _marker: PhantomData,
422 })
423 }
424
425 fn empty(view: T) -> Result<Self, ParseTimeError> {
426 WindowSet::new(view, 1, 0, Default::default(), None)
428 }
429
430 pub fn temporal(&self) -> bool {
432 self.step.alignment_unit.is_some()
433 || match self.window {
434 Some(window) => window.alignment_unit.is_some(),
435 None => false,
436 }
437 }
438
439 pub fn time_index(&self, center: bool) -> TimeIndex<'graph, T> {
441 TimeIndex {
442 windowset: self.clone(),
443 center,
444 }
445 }
446}
447
448pub struct TimeIndex<'graph, T> {
449 windowset: WindowSet<'graph, T>,
450 center: bool,
451}
452
453impl<'graph, T: TimeOps<'graph> + Clone + 'graph> Iterator for TimeIndex<'graph, T> {
454 type Item = i64;
455
456 fn next(&mut self) -> Option<Self::Item> {
457 let center = self.center;
458 self.windowset.next().map(move |view| {
459 if center {
460 view.start().unwrap().t()
461 + ((view.end().unwrap().t() - view.start().unwrap().t()) / 2)
462 } else {
463 view.end().unwrap().t() - 1
464 }
465 })
466 }
467}
468
469impl<'graph, T: TimeOps<'graph> + Clone + 'graph> Iterator for WindowSet<'graph, T> {
470 type Item = T::WindowedViewType;
471 fn next(&mut self) -> Option<Self::Item> {
472 let window_end = self.start + (self.counter * self.step);
473
474 if window_end < self.end + self.step {
475 let window_start = self.window.map(|w| window_end - w);
476 if let Some(start) = window_start {
477 if start >= self.end {
480 return None;
482 }
483 }
484 let window = self.view.internal_window(
485 window_start.map(EventTime::start),
486 Some(EventTime::start(window_end)),
487 );
488 self.counter += 1;
489 Some(window)
490 } else {
491 None
492 }
493 }
494 fn size_hint(&self) -> (usize, Option<usize>) {
495 let len = self.len();
496 (len, Some(len))
497 }
498}
499impl<'graph, T: TimeOps<'graph> + Clone + 'graph> ExactSizeIterator for WindowSet<'graph, T> {
500 fn len(&self) -> usize {
502 let mut window_end = self.start + (self.counter * self.step);
503 let mut count = 0;
504 while window_end < self.end + self.step {
505 let window_start = self.window.map(|w| window_end - w);
506 if let Some(start) = window_start {
507 if start >= self.end {
508 break;
509 }
510 }
511 count += 1;
512 window_end = window_end + self.step;
513 }
514 count
515 }
516}