polars_time/windows/
group_by.rs

1use std::collections::VecDeque;
2
3use arrow::legacy::time_zone::Tz;
4use arrow::trusted_len::TrustedLen;
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_core::utils::_split_offsets;
8use polars_core::utils::flatten::flatten_par;
9use rayon::prelude::*;
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12use strum_macros::IntoStaticStr;
13
14use crate::prelude::*;
15
// Which endpoints of a window count as members when testing a timestamp against it.
// As implemented by `ActiveWindow`'s bound checks in this file:
//   `Left`  -> lower bound included, upper excluded: `[start, end)`
//   `Right` -> lower bound excluded, upper included: `(start, end]`
//   `Both`  -> both included:                        `[start, end]`
//   `None`  -> neither included:                     `(start, end)`
// `IntoStaticStr` with `serialize_all = "snake_case"` renders the variants as
// "left" / "right" / "both" / "none".
// NOTE(review): plain `//` comments are used because `///` docs would presumably also feed the
// `dsl-schema` (schemars) descriptions; confirm before converting.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
    Left,
    Right,
    Both,
    None,
}
26
// Which timestamp labels the group produced for a window.
// NOTE(review): not referenced in this file. Presumably `Left` / `Right` select the window's
// lower / upper bound and `DataPoint` a value from the data; confirm at the call sites.
// `IntoStaticStr` renders the variants as "left" / "right" / "data_point".
// Plain `//` comments keep the (presumed) `dsl-schema` descriptions unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
    Left,
    Right,
    DataPoint,
}
36
// Strategy for choosing where the first window starts. It is forwarded to
// `Window::get_overlapping_bounds_iter` by `group_by_windows`.
// NOTE(review): the exact semantics live in that method; confirm there.
// `WindowBound` is the `Default`. The weekday variants map to indices through
// `StartBy::weekday` (Monday = 0 ..= Sunday = 6); all other variants map to `None`.
// Plain `//` comments keep the (presumed) `dsl-schema` descriptions unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
#[derive(Default)]
pub enum StartBy {
    #[default]
    WindowBound,
    DataPoint,
    /// only useful if periods are weekly
    Monday,
    Tuesday,
    Wednesday,
    Thursday,
    Friday,
    Saturday,
    Sunday,
}
55
56impl StartBy {
57    pub fn weekday(&self) -> Option<u32> {
58        match self {
59            StartBy::Monday => Some(0),
60            StartBy::Tuesday => Some(1),
61            StartBy::Wednesday => Some(2),
62            StartBy::Thursday => Some(3),
63            StartBy::Friday => Some(4),
64            StartBy::Saturday => Some(5),
65            StartBy::Sunday => Some(6),
66            _ => None,
67        }
68    }
69}
70
/// Assigns the values of `time` to the windows yielded by `bounds_iter`, appending one
/// `[first_index, len]` entry to `groups` per window that is not skipped. Alongside each group,
/// the window's `start` / `stop` is pushed to `lower_bound` / `upper_bound` when
/// `include_lower_bound` / `include_upper_bound` is set.
///
/// `time` is only scanned forward from `start`, so it is presumably sorted ascending (confirm
/// with callers). It must be non-empty: `time.len() - 1` below underflows otherwise.
///
/// Per window `bi`:
/// - values that are neither `is_future` nor `is_member_entry` for `bi` are dropped
///   (`start` advances past them);
/// - if a value is `is_future` for `bi`, no group is emitted for `bi` and `get_stride` decides
///   how many windows `iter.nth(stride)` skips. This fast-forwards over sparse data instead of
///   visiting every empty window;
/// - if the scan reaches the last value of `time`, a group of length 1 is emitted only when that
///   value `is_member` of `bi`;
/// - otherwise the group is `[start, len]`, where `len` counts the consecutive values accepted
///   by `is_member_exit` (it may be 0).
#[allow(clippy::too_many_arguments)]
fn update_groups_and_bounds(
    bounds_iter: BoundsIter<'_>,
    mut start: usize,
    time: &[i64],
    closed_window: ClosedWindow,
    include_lower_bound: bool,
    include_upper_bound: bool,
    lower_bound: &mut Vec<i64>,
    upper_bound: &mut Vec<i64>,
    groups: &mut Vec<[IdxSize; 2]>,
) {
    let mut iter = bounds_iter.into_iter();
    // Number of windows to skip before the next one is taken (`0` = take the very next window).
    let mut stride = 0;

    'bounds: while let Some(bi) = iter.nth(stride) {
        let mut has_member = false;
        // find starting point of window
        // (the last value is excluded here; it is handled separately below)
        for &t in &time[start..time.len().saturating_sub(1)] {
            // the window is behind the time values.
            if bi.is_future(t, closed_window) {
                stride = iter.get_stride(t);
                continue 'bounds;
            }
            if bi.is_member_entry(t, closed_window) {
                has_member = true;
                break;
            }
            // element drops out of the window
            start += 1;
        }

        // update stride so we can fast-forward in case of sparse data
        stride = if has_member {
            0
        } else {
            debug_assert!(start < time.len());
            iter.get_stride(time[start])
        };

        // find members of this window
        let mut end = start;

        // last value isn't always added
        // (this branch is reached exactly when no member was found among the earlier values)
        if end == time.len() - 1 {
            let t = time[end];
            if bi.is_member(t, closed_window) {
                if include_lower_bound {
                    lower_bound.push(bi.start);
                }
                if include_upper_bound {
                    upper_bound.push(bi.stop);
                }
                groups.push([end as IdxSize, 1])
            }
            continue;
        }
        for &t in &time[end..] {
            if !bi.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }
        let len = end - start;

        if include_lower_bound {
            lower_bound.push(bi.start);
        }
        if include_upper_bound {
            upper_bound.push(bi.stop);
        }
        groups.push([start as IdxSize, len as IdxSize])
    }
}
145
146/// Window boundaries are created based on the given `Window`, which is defined by:
147/// - every
148/// - period
149/// - offset
150///
151/// And every window boundary we search for the values that fit that window by the given
152/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper
153/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of
154/// that group.
155///
156/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.
157#[allow(clippy::too_many_arguments)]
158pub fn group_by_windows(
159    window: Window,
160    time: &[i64],
161    closed_window: ClosedWindow,
162    tu: TimeUnit,
163    tz: &Option<TimeZone>,
164    include_lower_bound: bool,
165    include_upper_bound: bool,
166    start_by: StartBy,
167) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
168    let start = time[0];
169    // the boundary we define here is not yet correct. It doesn't take 'period' into account
170    // and it doesn't have the proper starting point. This boundary is used as a proxy to find
171    // the proper 'boundary' in  'window.get_overlapping_bounds_iter'.
172    let boundary = if time.len() > 1 {
173        // +1 because left or closed boundary could match the next window if it is on the boundary
174        let stop = time[time.len() - 1] + 1;
175        Bounds::new_checked(start, stop)
176    } else {
177        let stop = start + 1;
178        Bounds::new_checked(start, stop)
179    };
180
181    let size = {
182        match tu {
183            TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
184            TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
185            TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
186        }
187    };
188    let size_lower = if include_lower_bound { size } else { 0 };
189    let size_upper = if include_upper_bound { size } else { 0 };
190    let mut lower_bound = Vec::with_capacity(size_lower);
191    let mut upper_bound = Vec::with_capacity(size_upper);
192
193    let mut groups = Vec::with_capacity(size);
194    let start_offset = 0;
195
196    match tz {
197        #[cfg(feature = "timezones")]
198        Some(tz) => {
199            update_groups_and_bounds(
200                window.get_overlapping_bounds_iter(
201                    boundary,
202                    closed_window,
203                    tu,
204                    tz.parse::<Tz>().ok().as_ref(),
205                    start_by,
206                )?,
207                start_offset,
208                time,
209                closed_window,
210                include_lower_bound,
211                include_upper_bound,
212                &mut lower_bound,
213                &mut upper_bound,
214                &mut groups,
215            );
216        },
217        _ => {
218            update_groups_and_bounds(
219                window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
220                start_offset,
221                time,
222                closed_window,
223                include_lower_bound,
224                include_upper_bound,
225                &mut lower_bound,
226                &mut upper_bound,
227                &mut groups,
228            );
229        },
230    };
231
232    Ok((groups, lower_bound, upper_bound))
233}
234
235// t is right at the end of the window
236// ------t---
237// [------]
238#[inline]
239#[allow(clippy::too_many_arguments)]
240pub(crate) fn group_by_values_iter_lookbehind(
241    period: Duration,
242    offset: Duration,
243    time: &[i64],
244    closed_window: ClosedWindow,
245    tu: TimeUnit,
246    tz: Option<Tz>,
247    start_offset: usize,
248    upper_bound: Option<usize>,
249) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
250    debug_assert!(offset.duration_ns() == period.duration_ns());
251    debug_assert!(offset.negative);
252    let add = match tu {
253        TimeUnit::Nanoseconds => Duration::add_ns,
254        TimeUnit::Microseconds => Duration::add_us,
255        TimeUnit::Milliseconds => Duration::add_ms,
256    };
257
258    let upper_bound = upper_bound.unwrap_or(time.len());
259    // Use binary search to find the initial start as that is behind.
260    let mut start = if let Some(&t) = time.get(start_offset) {
261        let lower = add(&offset, t, tz.as_ref())?;
262        // We have `period == -offset`, so `t + offset + period` is equal to `t`,
263        // and `upper` is trivially equal to `t` itself. Using the trivial calculation,
264        // instead of `upper = lower + period`, avoids issues around
265        // `t - 1mo + 1mo` not round-tripping.
266        let upper = t;
267        let b = Bounds::new(lower, upper);
268        let slice = &time[..start_offset];
269        slice.partition_point(|v| !b.is_member(*v, closed_window))
270    } else {
271        0
272    };
273    let mut end = start;
274    let mut last = time[start_offset];
275    Ok(time[start_offset..upper_bound]
276        .iter()
277        .enumerate()
278        .map(move |(mut i, t)| {
279            // Fast path for duplicates.
280            if *t == last && i > 0 {
281                let len = end - start;
282                let offset = start as IdxSize;
283                return Ok((offset, len as IdxSize));
284            }
285            last = *t;
286            i += start_offset;
287
288            let lower = add(&offset, *t, tz.as_ref())?;
289            let upper = *t;
290
291            let b = Bounds::new(lower, upper);
292
293            for &t in unsafe { time.get_unchecked(start..i) } {
294                if b.is_member_entry(t, closed_window) {
295                    break;
296                }
297                start += 1;
298            }
299
300            // faster path, check if `i` is member.
301            if b.is_member_exit(*t, closed_window) {
302                end = i;
303            } else {
304                end = std::cmp::max(end, start);
305            }
306            // we still must loop to consume duplicates
307            for &t in unsafe { time.get_unchecked(end..) } {
308                if !b.is_member_exit(t, closed_window) {
309                    break;
310                }
311                end += 1;
312            }
313
314            let len = end - start;
315            let offset = start as IdxSize;
316
317            Ok((offset, len as IdxSize))
318        }))
319}
320
321// this one is correct for all lookbehind/lookaheads, but is slower
322// window is completely behind t and t itself is not a member
323// ---------------t---
324//  [---]
325pub(crate) fn group_by_values_iter_window_behind_t(
326    period: Duration,
327    offset: Duration,
328    time: &[i64],
329    closed_window: ClosedWindow,
330    tu: TimeUnit,
331    tz: Option<Tz>,
332) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
333    let add = match tu {
334        TimeUnit::Nanoseconds => Duration::add_ns,
335        TimeUnit::Microseconds => Duration::add_us,
336        TimeUnit::Milliseconds => Duration::add_ms,
337    };
338
339    let mut start = 0;
340    let mut end = start;
341    let mut last = time[0];
342    let mut started = false;
343    time.iter().map(move |lower| {
344        // Fast path for duplicates.
345        if *lower == last && started {
346            let len = end - start;
347            let offset = start as IdxSize;
348            return Ok((offset, len as IdxSize));
349        }
350        last = *lower;
351        started = true;
352        let lower = add(&offset, *lower, tz.as_ref())?;
353        let upper = add(&period, lower, tz.as_ref())?;
354
355        let b = Bounds::new(lower, upper);
356        if b.is_future(time[0], closed_window) {
357            Ok((0, 0))
358        } else {
359            for &t in &time[start..] {
360                if b.is_member_entry(t, closed_window) {
361                    break;
362                }
363                start += 1;
364            }
365
366            end = std::cmp::max(start, end);
367            for &t in &time[end..] {
368                if !b.is_member_exit(t, closed_window) {
369                    break;
370                }
371                end += 1;
372            }
373
374            let len = end - start;
375            let offset = start as IdxSize;
376
377            Ok((offset, len as IdxSize))
378        }
379    })
380}
381
382// window is with -1 periods of t
383// ----t---
384//  [---]
385pub(crate) fn group_by_values_iter_partial_lookbehind(
386    period: Duration,
387    offset: Duration,
388    time: &[i64],
389    closed_window: ClosedWindow,
390    tu: TimeUnit,
391    tz: Option<Tz>,
392) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
393    let add = match tu {
394        TimeUnit::Nanoseconds => Duration::add_ns,
395        TimeUnit::Microseconds => Duration::add_us,
396        TimeUnit::Milliseconds => Duration::add_ms,
397    };
398
399    let mut start = 0;
400    let mut end = start;
401    let mut last = time[0];
402    time.iter().enumerate().map(move |(i, lower)| {
403        // Fast path for duplicates.
404        if *lower == last && i > 0 {
405            let len = end - start;
406            let offset = start as IdxSize;
407            return Ok((offset, len as IdxSize));
408        }
409        last = *lower;
410
411        let lower = add(&offset, *lower, tz.as_ref())?;
412        let upper = add(&period, lower, tz.as_ref())?;
413
414        let b = Bounds::new(lower, upper);
415
416        for &t in &time[start..] {
417            if b.is_member_entry(t, closed_window) || start == i {
418                break;
419            }
420            start += 1;
421        }
422
423        end = std::cmp::max(start, end);
424        for &t in &time[end..] {
425            if !b.is_member_exit(t, closed_window) {
426                break;
427            }
428            end += 1;
429        }
430
431        let len = end - start;
432        let offset = start as IdxSize;
433
434        Ok((offset, len as IdxSize))
435    })
436}
437
438#[allow(clippy::too_many_arguments)]
439// window is completely ahead of t and t itself is not a member
440// --t-----------
441//        [---]
442pub(crate) fn group_by_values_iter_lookahead(
443    period: Duration,
444    offset: Duration,
445    time: &[i64],
446    closed_window: ClosedWindow,
447    tu: TimeUnit,
448    tz: Option<Tz>,
449    start_offset: usize,
450    upper_bound: Option<usize>,
451) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
452    let upper_bound = upper_bound.unwrap_or(time.len());
453
454    let add = match tu {
455        TimeUnit::Nanoseconds => Duration::add_ns,
456        TimeUnit::Microseconds => Duration::add_us,
457        TimeUnit::Milliseconds => Duration::add_ms,
458    };
459    let mut start = start_offset;
460    let mut end = start;
461
462    let mut last = time[start_offset];
463    let mut started = false;
464    time[start_offset..upper_bound].iter().map(move |lower| {
465        // Fast path for duplicates.
466        if *lower == last && started {
467            let len = end - start;
468            let offset = start as IdxSize;
469            return Ok((offset, len as IdxSize));
470        }
471        started = true;
472        last = *lower;
473
474        let lower = add(&offset, *lower, tz.as_ref())?;
475        let upper = add(&period, lower, tz.as_ref())?;
476
477        let b = Bounds::new(lower, upper);
478
479        for &t in &time[start..] {
480            if b.is_member_entry(t, closed_window) {
481                break;
482            }
483            start += 1;
484        }
485
486        end = std::cmp::max(start, end);
487        for &t in &time[end..] {
488            if !b.is_member_exit(t, closed_window) {
489                break;
490            }
491            end += 1;
492        }
493
494        let len = end - start;
495        let offset = start as IdxSize;
496
497        Ok((offset, len as IdxSize))
498    })
499}
500
/// Rolling windows of length `period` that end at each timestamp `t`, i.e. `[t - period, t]`,
/// with endpoint membership decided by `closed_window`.
///
/// This is a thin wrapper around `group_by_values_iter_lookbehind` over the whole of `time`,
/// using the backwards-mirrored `period` as offset.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // Same magnitude as `period`, pointing backwards: `t` sits at the right endpoint.
    let lookbehind_offset = {
        let mut mirrored = period;
        mirrored.negative = true;
        mirrored
    };
    let (whole_start, whole_end) = (0, None);
    group_by_values_iter_lookbehind(
        period,
        lookbehind_offset,
        time,
        closed_window,
        tu,
        tz,
        whole_start,
        whole_end,
    )
}
515
/// Checks whether any thread partition boundary splits a run of duplicate values in `time`.
/// If so, partitions are merged until none does.
///
/// `thread_offsets` holds consecutive `(start, len)` partitions of `time`. A boundary is valid
/// when the last value of the left partition differs from the first value of the right one.
/// A partition whose right-hand boundary is invalid is dropped, and a neighbour absorbs its
/// range. This repeats until every boundary is valid. If at most one partition would remain,
/// `thread_offsets` becomes the single partition `(0, time.len())`.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    let is_valid = |window: &[(usize, usize)]| -> bool {
        debug_assert_eq!(window.len(), 2);
        let left_block_end = window[0].0 + window[0].1.saturating_sub(1);
        let right_block_start = window[1].0;
        time[left_block_end] != time[right_block_start]
    };

    if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(is_valid) {
        return;
    }

    // Only keep the left block of every valid boundary. Dropped blocks are absorbed by their
    // neighbours when the lengths are recomputed below.
    let mut new: Vec<(usize, usize)> = thread_offsets
        .windows(2)
        .filter(|&window| is_valid(window))
        .map(|window| window[0])
        .collect();
    // The last block has no right-hand boundary, so keep it whenever its left-hand boundary is
    // valid. This must hold for any number of blocks. Checking it only for an even block count
    // merged a perfectly valid last block away whenever the count was odd.
    let last_pair = &thread_offsets[thread_offsets.len() - 2..];
    if is_valid(last_pair) {
        new.push(thread_offsets[thread_offsets.len() - 1]);
    }
    // We pruned invalid blocks, now we must correct the lengths.
    if new.len() <= 1 {
        new = vec![(0, time.len())];
    } else {
        // Every kept block extends up to the start of the next kept block, and the last one
        // extends to the end of `time`...
        let mut previous_start = time.len();
        for window in new.iter_mut().rev() {
            window.1 = previous_start - window.0;
            previous_start = window.0;
        }
        // ...and the first kept block is extended back to the start of `time`.
        new[0].0 = 0;
        new[0].1 = new[1].0;
        debug_assert_eq!(new.iter().map(|w| w.1).sum::<usize>(), time.len());
        // Merged blocks can still end inside a run of duplicates, so check again.
        prune_splits_on_duplicates(time, &mut new)
    }
    std::mem::swap(thread_offsets, &mut new);
}
562
563#[allow(clippy::too_many_arguments)]
564fn group_by_values_iter_lookbehind_collected(
565    period: Duration,
566    offset: Duration,
567    time: &[i64],
568    closed_window: ClosedWindow,
569    tu: TimeUnit,
570    tz: Option<Tz>,
571    start_offset: usize,
572    upper_bound: Option<usize>,
573) -> PolarsResult<Vec<[IdxSize; 2]>> {
574    let iter = group_by_values_iter_lookbehind(
575        period,
576        offset,
577        time,
578        closed_window,
579        tu,
580        tz,
581        start_offset,
582        upper_bound,
583    )?;
584    iter.map(|result| result.map(|(offset, len)| [offset, len]))
585        .collect::<PolarsResult<Vec<_>>>()
586}
587
588#[allow(clippy::too_many_arguments)]
589pub(crate) fn group_by_values_iter_lookahead_collected(
590    period: Duration,
591    offset: Duration,
592    time: &[i64],
593    closed_window: ClosedWindow,
594    tu: TimeUnit,
595    tz: Option<Tz>,
596    start_offset: usize,
597    upper_bound: Option<usize>,
598) -> PolarsResult<Vec<[IdxSize; 2]>> {
599    let iter = group_by_values_iter_lookahead(
600        period,
601        offset,
602        time,
603        closed_window,
604        tu,
605        tz,
606        start_offset,
607        upper_bound,
608    );
609    iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
610        .collect::<PolarsResult<Vec<_>>>()
611}
612
613/// Different from `group_by_windows`, where define window buckets and search which values fit that
614/// pre-defined bucket.
615///
616/// This function defines every window based on the:
617///     - timestamp (lower bound)
618///     - timestamp + period (upper bound)
619/// where timestamps are the individual values in the array `time`
620pub fn group_by_values(
621    period: Duration,
622    offset: Duration,
623    time: &[i64],
624    closed_window: ClosedWindow,
625    tu: TimeUnit,
626    tz: Option<Tz>,
627) -> PolarsResult<GroupsSlice> {
628    if time.is_empty() {
629        return Ok(GroupsSlice::from(vec![]));
630    }
631
632    let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
633    // there are duplicates in the splits, so we opt for a single partition
634    prune_splits_on_duplicates(time, &mut thread_offsets);
635
636    // If we start from within parallel work we will do this single threaded.
637    let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
638
639    // we have a (partial) lookbehind window
640    if offset.negative && !offset.is_zero() {
641        // lookbehind
642        if offset.duration_ns() == period.duration_ns() {
643            // t is right at the end of the window
644            // ------t---
645            // [------]
646            if !run_parallel {
647                let vecs = group_by_values_iter_lookbehind_collected(
648                    period,
649                    offset,
650                    time,
651                    closed_window,
652                    tu,
653                    tz,
654                    0,
655                    None,
656                )?;
657                return Ok(GroupsSlice::from(vecs));
658            }
659
660            POOL.install(|| {
661                let vals = thread_offsets
662                    .par_iter()
663                    .copied()
664                    .map(|(base_offset, len)| {
665                        let upper_bound = base_offset + len;
666                        group_by_values_iter_lookbehind_collected(
667                            period,
668                            offset,
669                            time,
670                            closed_window,
671                            tu,
672                            tz,
673                            base_offset,
674                            Some(upper_bound),
675                        )
676                    })
677                    .collect::<PolarsResult<Vec<_>>>()?;
678                Ok(flatten_par(&vals))
679            })
680        } else if ((offset.duration_ns() >= period.duration_ns())
681            && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
682            || ((offset.duration_ns() > period.duration_ns())
683                && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
684        {
685            // window is completely behind t and t itself is not a member
686            // ---------------t---
687            //  [---]
688            let iter =
689                group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
690            iter.map(|result| result.map(|(offset, len)| [offset, len]))
691                .collect::<PolarsResult<_>>()
692        }
693        // partial lookbehind
694        // this one is still single threaded
695        // can make it parallel later, its a bit more complicated because the boundaries are unknown
696        // window is with -1 periods of t
697        // ----t---
698        //  [---]
699        else {
700            let iter = group_by_values_iter_partial_lookbehind(
701                period,
702                offset,
703                time,
704                closed_window,
705                tu,
706                tz,
707            );
708            iter.map(|result| result.map(|(offset, len)| [offset, len]))
709                .collect::<PolarsResult<_>>()
710        }
711    } else if !offset.is_zero()
712        || closed_window == ClosedWindow::Right
713        || closed_window == ClosedWindow::None
714    {
715        // window is completely ahead of t and t itself is not a member
716        // --t-----------
717        //        [---]
718
719        if !run_parallel {
720            let vecs = group_by_values_iter_lookahead_collected(
721                period,
722                offset,
723                time,
724                closed_window,
725                tu,
726                tz,
727                0,
728                None,
729            )?;
730            return Ok(GroupsSlice::from(vecs));
731        }
732
733        POOL.install(|| {
734            let vals = thread_offsets
735                .par_iter()
736                .copied()
737                .map(|(base_offset, len)| {
738                    let lower_bound = base_offset;
739                    let upper_bound = base_offset + len;
740                    group_by_values_iter_lookahead_collected(
741                        period,
742                        offset,
743                        time,
744                        closed_window,
745                        tu,
746                        tz,
747                        lower_bound,
748                        Some(upper_bound),
749                    )
750                })
751                .collect::<PolarsResult<Vec<_>>>()?;
752            Ok(flatten_par(&vals))
753        })
754    } else {
755        if !run_parallel {
756            let vecs = group_by_values_iter_lookahead_collected(
757                period,
758                offset,
759                time,
760                closed_window,
761                tu,
762                tz,
763                0,
764                None,
765            )?;
766            return Ok(GroupsSlice::from(vecs));
767        }
768
769        // Offset is 0 and window is closed on the left:
770        // it must be that the window starts at t and t is a member
771        // --t-----------
772        //  [---]
773        POOL.install(|| {
774            let vals = thread_offsets
775                .par_iter()
776                .copied()
777                .map(|(base_offset, len)| {
778                    let lower_bound = base_offset;
779                    let upper_bound = base_offset + len;
780                    group_by_values_iter_lookahead_collected(
781                        period,
782                        offset,
783                        time,
784                        closed_window,
785                        tu,
786                        tz,
787                        lower_bound,
788                        Some(upper_bound),
789                    )
790                })
791                .collect::<PolarsResult<Vec<_>>>()?;
792            Ok(flatten_par(&vals))
793        })
794    }
795}
796
/// Incremental rolling-window grouping over timestamps that arrive in batches.
///
/// Every inserted value `t` opens the window `[t + offset, t + offset + period]`, with endpoint
/// membership per `closed`. Once a later value is no longer below a window's upper bound, that
/// window's group is emitted as `[start, len]`. Indices count from the first value inserted
/// since construction or the last `finalize`.
pub struct RollingWindower {
    period: Duration,
    offset: Duration,
    closed: ClosedWindow,

    /// `Duration::add_ns` / `add_us` / `add_ms`, chosen from the time unit in `new`.
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    tz: Option<Tz>,

    /// Global index of the first value still referenced by a pending window. The `time` slices
    /// passed to `insert` / `finalize` begin at this index.
    start: IdxSize,
    /// Global index one past the last value counted into the most recently emitted group.
    end: IdxSize,
    /// Total number of values inserted so far.
    length: IdxSize,

    /// Windows opened by inserted values whose groups have not been emitted yet, oldest first.
    active: VecDeque<ActiveWindow>,
}
811
812struct ActiveWindow {
813    start: i64,
814    end: i64,
815}
816
817impl ActiveWindow {
818    #[inline(always)]
819    fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
820        (t > self.start)
821            | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))
822    }
823
824    #[inline(always)]
825    fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
826        (t < self.end)
827            | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))
828    }
829}
830
/// Locates the element `n` positions into the concatenation of the rows of `l`.
///
/// Returns `(x, y)`: row `y`, offset `x` within that row. Empty rows are skipped. A position
/// that lands exactly on a row end resolves to the start of the next non-empty row, and
/// `n == total length` resolves to `(0, l.len())`.
///
/// # Panics
/// If `n` exceeds the total number of elements.
fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
    let mut y = 0;
    for row in l {
        // An empty row always satisfies `n >= row.len()`, so it is skipped as well.
        if n < row.len() {
            break;
        }
        n -= row.len();
        y += 1;
    }
    assert!(n == 0 || y < l.len());
    (n, y)
}
/// Advances the 2-D cursor `(x, y)` over the rows of `l` by one element.
///
/// When a row is exhausted, the cursor rolls over to the start of the next non-empty row, or
/// to `(0, l.len())` once all rows are consumed.
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    while let Some(row) = l.get(*y) {
        if *x != row.len() {
            break;
        }
        *x = 0;
        *y += 1;
    }
}
847
impl RollingWindower {
    /// Creates an empty windower.
    ///
    /// `tu` only selects which of `Duration::add_ns` / `add_us` / `add_ms` is used for the
    /// window arithmetic; `tz` is passed to that function.
    pub fn new(
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        tu: TimeUnit,
        tz: Option<Tz>,
    ) -> Self {
        Self {
            period,
            offset,
            closed,

            add: match tu {
                TimeUnit::Nanoseconds => Duration::add_ns,
                TimeUnit::Microseconds => Duration::add_us,
                TimeUnit::Milliseconds => Duration::add_ms,
            },
            tz,

            start: 0,
            end: 0,
            length: 0,

            active: Default::default(),
        }
    }

    /// Insert new values into the windower.
    ///
    /// `time` must contain every value from global index `self.start` onwards: first the
    /// `length - start` values that were passed before and are still retained, then the new
    /// values.
    ///
    /// Each new value `t` opens the window `[t + offset, t + offset + period]`; its end is
    /// exactly `t` when `offset == -period`. Every window completed by a new value is appended
    /// to `windows` as `[start, len]` in global indices. Only the front of `active` is checked
    /// for completion, which presumably relies on `time` being sorted ascending (confirm with
    /// callers).
    ///
    /// Returns by how many positions `start` advanced, i.e. how many leading values of `time`
    /// are no longer referenced by any window. NOTE(review): presumably the caller drops them
    /// before the next call; confirm at the call site.
    ///
    /// # Errors
    /// Propagates errors from the `Duration` addition.
    pub fn insert(
        &mut self,
        time: &[&[i64]],
        windows: &mut Vec<[IdxSize; 2]>,
    ) -> PolarsResult<IdxSize> {
        // Cursor (column, row) of the first value not inserted before.
        let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
        // Cursors of the global indices `self.start` and `self.end` within `time`.
        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); // skip over empty lists
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        let time_start = self.start;
        // Global index of the value currently being inserted.
        let mut i = self.length;
        while i_y < time.len() {
            let t = time[i_y][i_x];
            let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
            // For datetime arithmetic, it does *NOT* hold 0 + a - a == 0. Therefore, we make sure
            // that if `offset` and `period` are inverses we keep the `t`.
            let window_end = if self.offset == -self.period {
                t
            } else {
                (self.add)(&self.period, window_start, self.tz.as_ref())?
            };

            self.active.push_back(ActiveWindow {
                start: window_start,
                end: window_end,
            });

            // The oldest window is complete once `t` is no longer below its upper bound.
            while let Some(w) = self.active.front() {
                if w.below_upper_bound(t, self.closed) {
                    break;
                }

                let w = self.active.pop_front().unwrap();
                // Drop values below the window's lower bound, never moving past the current value.
                while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                    increment_2d(&mut s_x, &mut s_y, time);
                    self.start += 1;
                }
                // Take in values below the window's upper bound, excluding the current value.
                while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
                    increment_2d(&mut e_x, &mut e_y, time);
                    self.end += 1;
                }
                windows.push([self.start, self.end - self.start]);
            }

            increment_2d(&mut i_x, &mut i_y, time);
            i += 1;
        }

        self.length = i;
        Ok(self.start - time_start)
    }

    /// Process all remaining items and signal that no more items are coming.
    ///
    /// `time` must hold exactly the `length - start` values still retained. Every still-active
    /// window is emitted into `windows` as `[start, len]` (`active` is drained). Afterwards
    /// `start`, `end` and `length` are reset to 0.
    ///
    /// # Panics
    /// If the total length of `time` differs from `length - start`.
    pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
        assert_eq!(
            time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
            self.length - self.start
        );

        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        // Same start/end advancement as in `insert`, but bounded by all inserted values.
        windows.extend(self.active.drain(..).map(|w| {
            while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                increment_2d(&mut s_x, &mut s_y, time);
                self.start += 1;
            }
            while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
                increment_2d(&mut e_x, &mut e_y, time);
                self.end += 1;
            }
            [self.start, self.end - self.start]
        }));

        self.start = 0;
        self.end = 0;
        self.length = 0;
    }
}
958
#[cfg(test)]
mod test {
    use super::*;

    /// Partition boundaries that split a run of equal timestamps must be merged away.
    #[test]
    fn test_prune_duplicates() {
        //                     |--|------------|----|---------|
        //                     0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
        let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        // The boundary between index 1 and 2 splits the run of `1`s...
        let mut offsets = vec![(0, 2), (2, 4), (6, 2), (8, 3)];
        prune_splits_on_duplicates(time, &mut offsets);
        // ...so the first two partitions are merged, and the other boundaries are kept.
        let expected = &[(0, 6), (6, 2), (8, 3)];
        assert_eq!(offsets, expected);
    }
}