1use std::collections::VecDeque;
2
3use arrow::legacy::time_zone::Tz;
4use arrow::trusted_len::TrustedLen;
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_core::utils::_split_offsets;
8use polars_core::utils::flatten::flatten_par;
9use rayon::prelude::*;
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12use strum_macros::IntoStaticStr;
13
14use crate::prelude::*;
15
16#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
17#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
18#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
19#[strum(serialize_all = "snake_case")]
20pub enum ClosedWindow {
21 Left,
22 Right,
23 Both,
24 None,
25}
26
27#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
28#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
29#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
30#[strum(serialize_all = "snake_case")]
31pub enum Label {
32 Left,
33 Right,
34 DataPoint,
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
38#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
39#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
40#[strum(serialize_all = "snake_case")]
41#[derive(Default)]
42pub enum StartBy {
43 #[default]
44 WindowBound,
45 DataPoint,
46 Monday,
48 Tuesday,
49 Wednesday,
50 Thursday,
51 Friday,
52 Saturday,
53 Sunday,
54}
55
56impl StartBy {
57 pub fn weekday(&self) -> Option<u32> {
58 match self {
59 StartBy::Monday => Some(0),
60 StartBy::Tuesday => Some(1),
61 StartBy::Wednesday => Some(2),
62 StartBy::Thursday => Some(3),
63 StartBy::Friday => Some(4),
64 StartBy::Saturday => Some(5),
65 StartBy::Sunday => Some(6),
66 _ => None,
67 }
68 }
69}
70
71#[allow(clippy::too_many_arguments)]
72fn update_groups_and_bounds(
73 bounds_iter: BoundsIter<'_>,
74 mut start: usize,
75 time: &[i64],
76 closed_window: ClosedWindow,
77 include_lower_bound: bool,
78 include_upper_bound: bool,
79 lower_bound: &mut Vec<i64>,
80 upper_bound: &mut Vec<i64>,
81 groups: &mut Vec<[IdxSize; 2]>,
82) {
83 let mut iter = bounds_iter.into_iter();
84 let mut stride = 0;
85
86 'bounds: while let Some(bi) = iter.nth(stride) {
87 let mut has_member = false;
88 for &t in &time[start..time.len().saturating_sub(1)] {
90 if bi.is_future(t, closed_window) {
92 stride = iter.get_stride(t);
93 continue 'bounds;
94 }
95 if bi.is_member_entry(t, closed_window) {
96 has_member = true;
97 break;
98 }
99 start += 1;
101 }
102
103 stride = if has_member {
105 0
106 } else {
107 debug_assert!(start < time.len());
108 iter.get_stride(time[start])
109 };
110
111 let mut end = start;
113
114 if end == time.len() - 1 {
116 let t = time[end];
117 if bi.is_member(t, closed_window) {
118 if include_lower_bound {
119 lower_bound.push(bi.start);
120 }
121 if include_upper_bound {
122 upper_bound.push(bi.stop);
123 }
124 groups.push([end as IdxSize, 1])
125 }
126 continue;
127 }
128 for &t in &time[end..] {
129 if !bi.is_member_exit(t, closed_window) {
130 break;
131 }
132 end += 1;
133 }
134 let len = end - start;
135
136 if include_lower_bound {
137 lower_bound.push(bi.start);
138 }
139 if include_upper_bound {
140 upper_bound.push(bi.stop);
141 }
142 groups.push([start as IdxSize, len as IdxSize])
143 }
144}
145
146#[allow(clippy::too_many_arguments)]
158pub fn group_by_windows(
159 window: Window,
160 time: &[i64],
161 closed_window: ClosedWindow,
162 tu: TimeUnit,
163 tz: &Option<TimeZone>,
164 include_lower_bound: bool,
165 include_upper_bound: bool,
166 start_by: StartBy,
167) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
168 let start = time[0];
169 let boundary = if time.len() > 1 {
173 let stop = time[time.len() - 1] + 1;
175 Bounds::new_checked(start, stop)
176 } else {
177 let stop = start + 1;
178 Bounds::new_checked(start, stop)
179 };
180
181 let size = {
182 match tu {
183 TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
184 TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
185 TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
186 }
187 };
188 let size_lower = if include_lower_bound { size } else { 0 };
189 let size_upper = if include_upper_bound { size } else { 0 };
190 let mut lower_bound = Vec::with_capacity(size_lower);
191 let mut upper_bound = Vec::with_capacity(size_upper);
192
193 let mut groups = Vec::with_capacity(size);
194 let start_offset = 0;
195
196 match tz {
197 #[cfg(feature = "timezones")]
198 Some(tz) => {
199 update_groups_and_bounds(
200 window.get_overlapping_bounds_iter(
201 boundary,
202 closed_window,
203 tu,
204 tz.parse::<Tz>().ok().as_ref(),
205 start_by,
206 )?,
207 start_offset,
208 time,
209 closed_window,
210 include_lower_bound,
211 include_upper_bound,
212 &mut lower_bound,
213 &mut upper_bound,
214 &mut groups,
215 );
216 },
217 _ => {
218 update_groups_and_bounds(
219 window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
220 start_offset,
221 time,
222 closed_window,
223 include_lower_bound,
224 include_upper_bound,
225 &mut lower_bound,
226 &mut upper_bound,
227 &mut groups,
228 );
229 },
230 };
231
232 Ok((groups, lower_bound, upper_bound))
233}
234
235#[inline]
239#[allow(clippy::too_many_arguments)]
240pub(crate) fn group_by_values_iter_lookbehind(
241 period: Duration,
242 offset: Duration,
243 time: &[i64],
244 closed_window: ClosedWindow,
245 tu: TimeUnit,
246 tz: Option<Tz>,
247 start_offset: usize,
248 upper_bound: Option<usize>,
249) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
250 debug_assert!(offset.duration_ns() == period.duration_ns());
251 debug_assert!(offset.negative);
252 let add = match tu {
253 TimeUnit::Nanoseconds => Duration::add_ns,
254 TimeUnit::Microseconds => Duration::add_us,
255 TimeUnit::Milliseconds => Duration::add_ms,
256 };
257
258 let upper_bound = upper_bound.unwrap_or(time.len());
259 let mut start = if let Some(&t) = time.get(start_offset) {
261 let lower = add(&offset, t, tz.as_ref())?;
262 let upper = t;
267 let b = Bounds::new(lower, upper);
268 let slice = &time[..start_offset];
269 slice.partition_point(|v| !b.is_member(*v, closed_window))
270 } else {
271 0
272 };
273 let mut end = start;
274 let mut last = time[start_offset];
275 Ok(time[start_offset..upper_bound]
276 .iter()
277 .enumerate()
278 .map(move |(mut i, t)| {
279 if *t == last && i > 0 {
281 let len = end - start;
282 let offset = start as IdxSize;
283 return Ok((offset, len as IdxSize));
284 }
285 last = *t;
286 i += start_offset;
287
288 let lower = add(&offset, *t, tz.as_ref())?;
289 let upper = *t;
290
291 let b = Bounds::new(lower, upper);
292
293 for &t in unsafe { time.get_unchecked(start..i) } {
294 if b.is_member_entry(t, closed_window) {
295 break;
296 }
297 start += 1;
298 }
299
300 if b.is_member_exit(*t, closed_window) {
302 end = i;
303 } else {
304 end = std::cmp::max(end, start);
305 }
306 for &t in unsafe { time.get_unchecked(end..) } {
308 if !b.is_member_exit(t, closed_window) {
309 break;
310 }
311 end += 1;
312 }
313
314 let len = end - start;
315 let offset = start as IdxSize;
316
317 Ok((offset, len as IdxSize))
318 }))
319}
320
321pub(crate) fn group_by_values_iter_window_behind_t(
326 period: Duration,
327 offset: Duration,
328 time: &[i64],
329 closed_window: ClosedWindow,
330 tu: TimeUnit,
331 tz: Option<Tz>,
332) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
333 let add = match tu {
334 TimeUnit::Nanoseconds => Duration::add_ns,
335 TimeUnit::Microseconds => Duration::add_us,
336 TimeUnit::Milliseconds => Duration::add_ms,
337 };
338
339 let mut start = 0;
340 let mut end = start;
341 let mut last = time[0];
342 let mut started = false;
343 time.iter().map(move |lower| {
344 if *lower == last && started {
346 let len = end - start;
347 let offset = start as IdxSize;
348 return Ok((offset, len as IdxSize));
349 }
350 last = *lower;
351 started = true;
352 let lower = add(&offset, *lower, tz.as_ref())?;
353 let upper = add(&period, lower, tz.as_ref())?;
354
355 let b = Bounds::new(lower, upper);
356 if b.is_future(time[0], closed_window) {
357 Ok((0, 0))
358 } else {
359 for &t in &time[start..] {
360 if b.is_member_entry(t, closed_window) {
361 break;
362 }
363 start += 1;
364 }
365
366 end = std::cmp::max(start, end);
367 for &t in &time[end..] {
368 if !b.is_member_exit(t, closed_window) {
369 break;
370 }
371 end += 1;
372 }
373
374 let len = end - start;
375 let offset = start as IdxSize;
376
377 Ok((offset, len as IdxSize))
378 }
379 })
380}
381
382pub(crate) fn group_by_values_iter_partial_lookbehind(
386 period: Duration,
387 offset: Duration,
388 time: &[i64],
389 closed_window: ClosedWindow,
390 tu: TimeUnit,
391 tz: Option<Tz>,
392) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
393 let add = match tu {
394 TimeUnit::Nanoseconds => Duration::add_ns,
395 TimeUnit::Microseconds => Duration::add_us,
396 TimeUnit::Milliseconds => Duration::add_ms,
397 };
398
399 let mut start = 0;
400 let mut end = start;
401 let mut last = time[0];
402 time.iter().enumerate().map(move |(i, lower)| {
403 if *lower == last && i > 0 {
405 let len = end - start;
406 let offset = start as IdxSize;
407 return Ok((offset, len as IdxSize));
408 }
409 last = *lower;
410
411 let lower = add(&offset, *lower, tz.as_ref())?;
412 let upper = add(&period, lower, tz.as_ref())?;
413
414 let b = Bounds::new(lower, upper);
415
416 for &t in &time[start..] {
417 if b.is_member_entry(t, closed_window) || start == i {
418 break;
419 }
420 start += 1;
421 }
422
423 end = std::cmp::max(start, end);
424 for &t in &time[end..] {
425 if !b.is_member_exit(t, closed_window) {
426 break;
427 }
428 end += 1;
429 }
430
431 let len = end - start;
432 let offset = start as IdxSize;
433
434 Ok((offset, len as IdxSize))
435 })
436}
437
438#[allow(clippy::too_many_arguments)]
439pub(crate) fn group_by_values_iter_lookahead(
443 period: Duration,
444 offset: Duration,
445 time: &[i64],
446 closed_window: ClosedWindow,
447 tu: TimeUnit,
448 tz: Option<Tz>,
449 start_offset: usize,
450 upper_bound: Option<usize>,
451) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
452 let upper_bound = upper_bound.unwrap_or(time.len());
453
454 let add = match tu {
455 TimeUnit::Nanoseconds => Duration::add_ns,
456 TimeUnit::Microseconds => Duration::add_us,
457 TimeUnit::Milliseconds => Duration::add_ms,
458 };
459 let mut start = start_offset;
460 let mut end = start;
461
462 let mut last = time[start_offset];
463 let mut started = false;
464 time[start_offset..upper_bound].iter().map(move |lower| {
465 if *lower == last && started {
467 let len = end - start;
468 let offset = start as IdxSize;
469 return Ok((offset, len as IdxSize));
470 }
471 started = true;
472 last = *lower;
473
474 let lower = add(&offset, *lower, tz.as_ref())?;
475 let upper = add(&period, lower, tz.as_ref())?;
476
477 let b = Bounds::new(lower, upper);
478
479 for &t in &time[start..] {
480 if b.is_member_entry(t, closed_window) {
481 break;
482 }
483 start += 1;
484 }
485
486 end = std::cmp::max(start, end);
487 for &t in &time[end..] {
488 if !b.is_member_exit(t, closed_window) {
489 break;
490 }
491 end += 1;
492 }
493
494 let len = end - start;
495 let offset = start as IdxSize;
496
497 Ok((offset, len as IdxSize))
498 })
499}
500
501#[cfg(feature = "rolling_window_by")]
502#[inline]
503pub(crate) fn group_by_values_iter(
504 period: Duration,
505 time: &[i64],
506 closed_window: ClosedWindow,
507 tu: TimeUnit,
508 tz: Option<Tz>,
509) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
510 let mut offset = period;
511 offset.negative = true;
512 group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0, None)
514}
515
516fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
519 let is_valid = |window: &[(usize, usize)]| -> bool {
520 debug_assert_eq!(window.len(), 2);
521 let left_block_end = window[0].0 + window[0].1.saturating_sub(1);
522 let right_block_start = window[1].0;
523 time[left_block_end] != time[right_block_start]
524 };
525
526 if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(is_valid) {
527 return;
528 }
529
530 let mut new = vec![];
531 for window in thread_offsets.windows(2) {
532 let this_block_is_valid = is_valid(window);
533 if this_block_is_valid {
534 new.push(window[0])
536 }
537 }
538 if thread_offsets.len().is_multiple_of(2) {
540 let window = &thread_offsets[thread_offsets.len() - 2..];
541 if is_valid(window) {
542 new.push(thread_offsets[thread_offsets.len() - 1])
543 }
544 }
545 if new.len() <= 1 {
547 new = vec![(0, time.len())];
548 } else {
549 let mut previous_start = time.len();
550 for window in new.iter_mut().rev() {
551 window.1 = previous_start - window.0;
552 previous_start = window.0;
553 }
554 new[0].0 = 0;
555 new[0].1 = new[1].0;
556 debug_assert_eq!(new.iter().map(|w| w.1).sum::<usize>(), time.len());
557 prune_splits_on_duplicates(time, &mut new)
559 }
560 std::mem::swap(thread_offsets, &mut new);
561}
562
563#[allow(clippy::too_many_arguments)]
564fn group_by_values_iter_lookbehind_collected(
565 period: Duration,
566 offset: Duration,
567 time: &[i64],
568 closed_window: ClosedWindow,
569 tu: TimeUnit,
570 tz: Option<Tz>,
571 start_offset: usize,
572 upper_bound: Option<usize>,
573) -> PolarsResult<Vec<[IdxSize; 2]>> {
574 let iter = group_by_values_iter_lookbehind(
575 period,
576 offset,
577 time,
578 closed_window,
579 tu,
580 tz,
581 start_offset,
582 upper_bound,
583 )?;
584 iter.map(|result| result.map(|(offset, len)| [offset, len]))
585 .collect::<PolarsResult<Vec<_>>>()
586}
587
588#[allow(clippy::too_many_arguments)]
589pub(crate) fn group_by_values_iter_lookahead_collected(
590 period: Duration,
591 offset: Duration,
592 time: &[i64],
593 closed_window: ClosedWindow,
594 tu: TimeUnit,
595 tz: Option<Tz>,
596 start_offset: usize,
597 upper_bound: Option<usize>,
598) -> PolarsResult<Vec<[IdxSize; 2]>> {
599 let iter = group_by_values_iter_lookahead(
600 period,
601 offset,
602 time,
603 closed_window,
604 tu,
605 tz,
606 start_offset,
607 upper_bound,
608 );
609 iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
610 .collect::<PolarsResult<Vec<_>>>()
611}
612
613pub fn group_by_values(
621 period: Duration,
622 offset: Duration,
623 time: &[i64],
624 closed_window: ClosedWindow,
625 tu: TimeUnit,
626 tz: Option<Tz>,
627) -> PolarsResult<GroupsSlice> {
628 if time.is_empty() {
629 return Ok(GroupsSlice::from(vec![]));
630 }
631
632 let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
633 prune_splits_on_duplicates(time, &mut thread_offsets);
635
636 let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
638
639 if offset.negative && !offset.is_zero() {
641 if offset.duration_ns() == period.duration_ns() {
643 if !run_parallel {
647 let vecs = group_by_values_iter_lookbehind_collected(
648 period,
649 offset,
650 time,
651 closed_window,
652 tu,
653 tz,
654 0,
655 None,
656 )?;
657 return Ok(GroupsSlice::from(vecs));
658 }
659
660 POOL.install(|| {
661 let vals = thread_offsets
662 .par_iter()
663 .copied()
664 .map(|(base_offset, len)| {
665 let upper_bound = base_offset + len;
666 group_by_values_iter_lookbehind_collected(
667 period,
668 offset,
669 time,
670 closed_window,
671 tu,
672 tz,
673 base_offset,
674 Some(upper_bound),
675 )
676 })
677 .collect::<PolarsResult<Vec<_>>>()?;
678 Ok(flatten_par(&vals))
679 })
680 } else if ((offset.duration_ns() >= period.duration_ns())
681 && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
682 || ((offset.duration_ns() > period.duration_ns())
683 && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
684 {
685 let iter =
689 group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
690 iter.map(|result| result.map(|(offset, len)| [offset, len]))
691 .collect::<PolarsResult<_>>()
692 }
693 else {
700 let iter = group_by_values_iter_partial_lookbehind(
701 period,
702 offset,
703 time,
704 closed_window,
705 tu,
706 tz,
707 );
708 iter.map(|result| result.map(|(offset, len)| [offset, len]))
709 .collect::<PolarsResult<_>>()
710 }
711 } else if !offset.is_zero()
712 || closed_window == ClosedWindow::Right
713 || closed_window == ClosedWindow::None
714 {
715 if !run_parallel {
720 let vecs = group_by_values_iter_lookahead_collected(
721 period,
722 offset,
723 time,
724 closed_window,
725 tu,
726 tz,
727 0,
728 None,
729 )?;
730 return Ok(GroupsSlice::from(vecs));
731 }
732
733 POOL.install(|| {
734 let vals = thread_offsets
735 .par_iter()
736 .copied()
737 .map(|(base_offset, len)| {
738 let lower_bound = base_offset;
739 let upper_bound = base_offset + len;
740 group_by_values_iter_lookahead_collected(
741 period,
742 offset,
743 time,
744 closed_window,
745 tu,
746 tz,
747 lower_bound,
748 Some(upper_bound),
749 )
750 })
751 .collect::<PolarsResult<Vec<_>>>()?;
752 Ok(flatten_par(&vals))
753 })
754 } else {
755 if !run_parallel {
756 let vecs = group_by_values_iter_lookahead_collected(
757 period,
758 offset,
759 time,
760 closed_window,
761 tu,
762 tz,
763 0,
764 None,
765 )?;
766 return Ok(GroupsSlice::from(vecs));
767 }
768
769 POOL.install(|| {
774 let vals = thread_offsets
775 .par_iter()
776 .copied()
777 .map(|(base_offset, len)| {
778 let lower_bound = base_offset;
779 let upper_bound = base_offset + len;
780 group_by_values_iter_lookahead_collected(
781 period,
782 offset,
783 time,
784 closed_window,
785 tu,
786 tz,
787 lower_bound,
788 Some(upper_bound),
789 )
790 })
791 .collect::<PolarsResult<Vec<_>>>()?;
792 Ok(flatten_par(&vals))
793 })
794 }
795}
796
797pub struct RollingWindower {
798 period: Duration,
799 offset: Duration,
800 closed: ClosedWindow,
801
802 add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
803 tz: Option<Tz>,
804
805 start: IdxSize,
806 end: IdxSize,
807 length: IdxSize,
808
809 active: VecDeque<ActiveWindow>,
810}
811
812struct ActiveWindow {
813 start: i64,
814 end: i64,
815}
816
817impl ActiveWindow {
818 #[inline(always)]
819 fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
820 (t > self.start)
821 | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))
822 }
823
824 #[inline(always)]
825 fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
826 (t < self.end)
827 | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))
828 }
829}
830
831fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
832 let mut y = 0;
833 while y < l.len() && (n >= l[y].len() || l[y].is_empty()) {
834 n -= l[y].len();
835 y += 1;
836 }
837 assert!(n == 0 || y < l.len());
838 (n, y)
839}
840fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
841 *x += 1;
842 while *y < l.len() && *x == l[*y].len() {
843 *y += 1;
844 *x = 0;
845 }
846}
847
848impl RollingWindower {
849 pub fn new(
850 period: Duration,
851 offset: Duration,
852 closed: ClosedWindow,
853 tu: TimeUnit,
854 tz: Option<Tz>,
855 ) -> Self {
856 Self {
857 period,
858 offset,
859 closed,
860
861 add: match tu {
862 TimeUnit::Nanoseconds => Duration::add_ns,
863 TimeUnit::Microseconds => Duration::add_us,
864 TimeUnit::Milliseconds => Duration::add_ms,
865 },
866 tz,
867
868 start: 0,
869 end: 0,
870 length: 0,
871
872 active: Default::default(),
873 }
874 }
875
876 pub fn insert(
880 &mut self,
881 time: &[&[i64]],
882 windows: &mut Vec<[IdxSize; 2]>,
883 ) -> PolarsResult<IdxSize> {
884 let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
885 let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
887
888 let time_start = self.start;
889 let mut i = self.length;
890 while i_y < time.len() {
891 let t = time[i_y][i_x];
892 let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
893 let window_end = if self.offset == -self.period {
896 t
897 } else {
898 (self.add)(&self.period, window_start, self.tz.as_ref())?
899 };
900
901 self.active.push_back(ActiveWindow {
902 start: window_start,
903 end: window_end,
904 });
905
906 while let Some(w) = self.active.front() {
907 if w.below_upper_bound(t, self.closed) {
908 break;
909 }
910
911 let w = self.active.pop_front().unwrap();
912 while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
913 increment_2d(&mut s_x, &mut s_y, time);
914 self.start += 1;
915 }
916 while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
917 increment_2d(&mut e_x, &mut e_y, time);
918 self.end += 1;
919 }
920 windows.push([self.start, self.end - self.start]);
921 }
922
923 increment_2d(&mut i_x, &mut i_y, time);
924 i += 1;
925 }
926
927 self.length = i;
928 Ok(self.start - time_start)
929 }
930
931 pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
933 assert_eq!(
934 time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
935 self.length - self.start
936 );
937
938 let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
939 let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
940
941 windows.extend(self.active.drain(..).map(|w| {
942 while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
943 increment_2d(&mut s_x, &mut s_y, time);
944 self.start += 1;
945 }
946 while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
947 increment_2d(&mut e_x, &mut e_y, time);
948 self.end += 1;
949 }
950 [self.start, self.end - self.start]
951 }));
952
953 self.start = 0;
954 self.end = 0;
955 self.length = 0;
956 }
957}
958
959#[cfg(test)]
960mod test {
961 use super::*;
962
963 #[test]
964 fn test_prune_duplicates() {
965 let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
968 let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];
969 prune_splits_on_duplicates(time, &mut splits);
970 assert_eq!(splits, &[(0, 6), (6, 2), (8, 3)]);
971 }
972}