use std::collections::VecDeque;
use arrow::legacy::time_zone::Tz;
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use arrow::trusted_len::TrustedLen;
use chrono::NaiveDateTime;
#[cfg(feature = "timezones")]
use chrono::TimeZone as _;
use now::DateTimeNow;
use polars_core::prelude::*;
use polars_core::runtime::RAYON;
use polars_core::utils::_split_offsets;
use polars_core::utils::flatten::flatten_par;
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;
use crate::prelude::*;
/// Selects which edges of a window's time interval are inclusive.
///
/// The bound helpers in this file treat a value equal to the lower edge as inside the
/// window for `Left`/`Both`, and a value equal to the upper edge as inside the window for
/// `Right`/`Both`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
/// Lower edge inclusive, upper edge exclusive.
Left,
/// Lower edge exclusive, upper edge inclusive.
Right,
/// Both edges inclusive.
Both,
/// Both edges exclusive.
None,
}
/// Labelling choice for a window.
///
/// NOTE(review): this enum is not referenced in the visible part of the file; it
/// presumably selects which value (lower edge, upper edge, or first data point) is used
/// to label each window — confirm against the callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
Left,
Right,
DataPoint,
}
/// Strategy for choosing the lower bound of the first dynamic window.
///
/// `GroupByDynamicWindower::start_lower_bound` interprets the variants as documented on
/// each of them below.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
#[derive(Default)]
pub enum StartBy {
/// Use the start of the earliest bounds reported by `Window::get_earliest_bounds_*`
/// for the first time value. This is the default.
#[default]
WindowBound,
/// Use the first time value itself as the first lower bound.
DataPoint,
// The weekday variants start from the beginning of the week that contains the first
// time value, moved forward by `weekday()` days and then by the configured offset.
Monday,
Tuesday,
Wednesday,
Thursday,
Friday,
Saturday,
Sunday,
}
impl StartBy {
    /// Returns the zero-based weekday index (`Monday == 0` … `Sunday == 6`) for the
    /// weekday variants, and `None` for `WindowBound` and `DataPoint`.
    pub fn weekday(&self) -> Option<u32> {
        let day = match self {
            // These two strategies are not tied to a day of the week.
            StartBy::WindowBound | StartBy::DataPoint => return None,
            StartBy::Monday => 0,
            StartBy::Tuesday => 1,
            StartBy::Wednesday => 2,
            StartBy::Thursday => 3,
            StartBy::Friday => 4,
            StartBy::Saturday => 5,
            StartBy::Sunday => 6,
        };
        Some(day)
    }
}
/// Walks the windows produced by `bounds_iter` over the `time` values and appends one
/// `[first, len]` slice to `groups` per emitted window.
///
/// `start` is the index in `time` from which the search for the first window begins.
/// When `include_lower_bound` / `include_upper_bound` are set, the window's `start` /
/// `stop` is pushed to `lower_bound` / `upper_bound` alongside every emitted group.
#[allow(clippy::too_many_arguments)]
fn update_groups_and_bounds(
    bounds_iter: BoundsIter<'_>,
    mut start: usize,
    time: &[i64],
    closed_window: ClosedWindow,
    include_lower_bound: bool,
    include_upper_bound: bool,
    lower_bound: &mut Vec<i64>,
    upper_bound: &mut Vec<i64>,
    groups: &mut Vec<[IdxSize; 2]>,
) {
    // Records a single window: optional bounds first, then the group slice.
    let mut emit = |lo: i64, hi: i64, first: usize, len: usize| {
        if include_lower_bound {
            lower_bound.push(lo);
        }
        if include_upper_bound {
            upper_bound.push(hi);
        }
        groups.push([first as IdxSize, len as IdxSize]);
    };

    // The entry scan deliberately stops before the last value; the last value is
    // handled separately below.
    let scan_stop = time.len().saturating_sub(1);
    let mut windows = bounds_iter.into_iter();
    let mut skip = 0;

    'bounds: while let Some(window) = windows.nth(skip) {
        // Advance `start` to the first value that enters this window.
        let mut entered = false;
        for &t in &time[start..scan_stop] {
            if window.is_future(t, closed_window) {
                // No value can fall in this window any more; ask the iterator how far
                // to jump and try again from there.
                skip = windows.get_stride(t);
                continue 'bounds;
            }
            if window.is_member_entry(t, closed_window) {
                entered = true;
                break;
            }
            start += 1;
        }

        // Without a member, fast-forward the window iterator relative to the value we
        // stopped at.
        skip = if entered {
            0
        } else {
            debug_assert!(start < time.len());
            windows.get_stride(time[start])
        };

        // Only the last value is left: it forms a group of one, or nothing at all.
        if start == time.len() - 1 {
            if window.is_member(time[start], closed_window) {
                emit(window.start, window.stop, start, 1);
            }
            continue;
        }

        // Count the consecutive values that have not yet left the window.
        let members = time[start..]
            .iter()
            .take_while(|&&t| window.is_member_exit(t, closed_window))
            .count();
        emit(window.start, window.stop, start, members);
    }
}
#[allow(clippy::too_many_arguments)]
pub fn group_by_windows(
window: Window,
time: &[i64],
closed_window: ClosedWindow,
tu: TimeUnit,
tz: &Option<TimeZone>,
include_lower_bound: bool,
include_upper_bound: bool,
start_by: StartBy,
) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
let start = time[0];
let boundary = if time.len() > 1 {
let stop = time[time.len() - 1] + 1;
Bounds::new_checked(start, stop)
} else {
let stop = start + 1;
Bounds::new_checked(start, stop)
};
let size = {
match tu {
TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
}
};
let size_lower = if include_lower_bound { size } else { 0 };
let size_upper = if include_upper_bound { size } else { 0 };
let mut lower_bound = Vec::with_capacity(size_lower);
let mut upper_bound = Vec::with_capacity(size_upper);
let mut groups = Vec::with_capacity(size);
let start_offset = 0;
match tz {
#[cfg(feature = "timezones")]
Some(tz) => {
update_groups_and_bounds(
window.get_overlapping_bounds_iter(
boundary,
closed_window,
tu,
tz.parse::<Tz>().ok().as_ref(),
start_by,
)?,
start_offset,
time,
closed_window,
include_lower_bound,
include_upper_bound,
&mut lower_bound,
&mut upper_bound,
&mut groups,
);
},
_ => {
update_groups_and_bounds(
window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
start_offset,
time,
closed_window,
include_lower_bound,
include_upper_bound,
&mut lower_bound,
&mut upper_bound,
&mut groups,
);
},
};
Ok((groups, lower_bound, upper_bound))
}
/// Iterator over the rolling groups for a window that ends exactly at each value
/// (`offset == -period`).
///
/// Yields one `(first, len)` pair for every value in `time[start_offset..upper_bound]`
/// (`upper_bound` defaults to `time.len()`). Indices refer to the full `time` slice, so
/// a window may reach back before `start_offset`.
///
/// An empty `time` slice (with `start_offset == 0`) yields an empty iterator.
///
/// # Errors
///
/// Returns an error, either up front or per item, when adding `offset` to a time value
/// fails.
#[inline]
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookbehind(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    debug_assert!(offset.duration_ns() == period.duration_ns());
    debug_assert!(offset.negative);
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let upper_bound = upper_bound.unwrap_or(time.len());

    // The window of the first value may start before `start_offset`; binary-search the
    // preceding values for its first member.
    let mut start = if let Some(&t) = time.get(start_offset) {
        let lower = add(&offset, t, tz.as_ref())?;
        // `offset == -period`, so the upper edge is `t` itself.
        let b = Bounds::new(lower, t);
        time[..start_offset].partition_point(|v| !b.is_member(*v, closed_window))
    } else {
        0
    };
    let mut end = start;
    // Previously processed value; `None` until the first item has been handled.
    let mut last: Option<i64> = None;

    Ok(time[start_offset..upper_bound]
        .iter()
        .enumerate()
        .map(move |(local_idx, t)| {
            // Equal time values share the window computed for the first of them.
            if last == Some(*t) {
                return Ok((start as IdxSize, (end - start) as IdxSize));
            }
            last = Some(*t);
            let i = local_idx + start_offset;

            let lower = add(&offset, *t, tz.as_ref())?;
            let upper = *t;
            let b = Bounds::new(lower, upper);

            // SAFETY: `start <= i` (it starts at or below `start_offset` and is only
            // advanced inside this `start..i` scan) and `i < time.len()`.
            for &v in unsafe { time.get_unchecked(start..i) } {
                if b.is_member_entry(v, closed_window) {
                    break;
                }
                start += 1;
            }

            // If the value itself is still inside the window, everything before it is too.
            if b.is_member_exit(*t, closed_window) {
                end = i;
            } else {
                end = std::cmp::max(end, start);
            }
            // SAFETY: `end <= time.len()`; it is only advanced once per element of the
            // remaining slice. The scan also consumes duplicates of `t`.
            for &v in unsafe { time.get_unchecked(end..) } {
                if !b.is_member_exit(v, closed_window) {
                    break;
                }
                end += 1;
            }

            Ok((start as IdxSize, (end - start) as IdxSize))
        }))
}
/// Iterator over the rolling groups for a window `[t + offset, t + offset + period]`
/// computed for each value `t` in `time`.
///
/// Yields one `(first, len)` pair per value. A window that lies entirely before the
/// first time value yields `(0, 0)`.
///
/// An empty `time` slice yields an empty iterator.
///
/// # Errors
///
/// Each item is an error when adding `offset` or `period` fails.
pub(crate) fn group_by_values_iter_window_behind_t(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let mut start = 0;
    let mut end = start;
    // Previously processed value; `None` until the first item has been handled.
    let mut last: Option<i64> = None;
    time.iter().map(move |&t| {
        // Equal time values share the window computed for the first of them.
        if last == Some(t) {
            return Ok((start as IdxSize, (end - start) as IdxSize));
        }
        last = Some(t);

        let lower = add(&offset, t, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;
        let b = Bounds::new(lower, upper);

        if b.is_future(time[0], closed_window) {
            // The whole window precedes the data.
            Ok((0, 0))
        } else {
            // Move `start` to the first value that enters the window.
            for &v in &time[start..] {
                if b.is_member_entry(v, closed_window) {
                    break;
                }
                start += 1;
            }

            // Move `end` past the last value that is still inside the window.
            end = std::cmp::max(start, end);
            for &v in &time[end..] {
                if !b.is_member_exit(v, closed_window) {
                    break;
                }
                end += 1;
            }

            Ok((start as IdxSize, (end - start) as IdxSize))
        }
    })
}
/// Iterator over the rolling groups for a window `[t + offset, t + offset + period]`
/// computed for each value `t` in `time`, where the lower-edge scan never advances past
/// the index of `t` itself.
///
/// Yields one `(first, len)` pair per value in `time`.
///
/// An empty `time` slice yields an empty iterator.
///
/// # Errors
///
/// Each item is an error when adding `offset` or `period` fails.
pub(crate) fn group_by_values_iter_partial_lookbehind(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let mut start = 0;
    let mut end = start;
    // Previously processed value; `None` until the first item has been handled.
    let mut last: Option<i64> = None;
    time.iter().enumerate().map(move |(i, &t)| {
        // Equal time values share the window computed for the first of them.
        if last == Some(t) {
            return Ok((start as IdxSize, (end - start) as IdxSize));
        }
        last = Some(t);

        let lower = add(&offset, t, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;
        let b = Bounds::new(lower, upper);

        // Move `start` to the first value that enters the window, but never beyond the
        // current index.
        for &v in &time[start..] {
            if b.is_member_entry(v, closed_window) || start == i {
                break;
            }
            start += 1;
        }

        // Move `end` past the last value that is still inside the window.
        end = std::cmp::max(start, end);
        for &v in &time[end..] {
            if !b.is_member_exit(v, closed_window) {
                break;
            }
            end += 1;
        }

        Ok((start as IdxSize, (end - start) as IdxSize))
    })
}
/// Iterator over the rolling groups for a window `[t + offset, t + offset + period]`
/// computed for each value `t` in `time[start_offset..upper_bound]`, scanning forward
/// from `start_offset`.
///
/// `upper_bound` defaults to `time.len()`. Yields one `(first, len)` pair per value;
/// indices refer to the full `time` slice.
///
/// An empty `time` slice (with `start_offset == 0`) yields an empty iterator.
///
/// # Errors
///
/// Each item is an error when adding `offset` or `period` fails.
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookahead(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let upper_bound = upper_bound.unwrap_or(time.len());

    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };
    let mut start = start_offset;
    let mut end = start;
    // Previously processed value; `None` until the first item has been handled.
    let mut last: Option<i64> = None;
    time[start_offset..upper_bound].iter().map(move |&t| {
        // Equal time values share the window computed for the first of them.
        if last == Some(t) {
            return Ok((start as IdxSize, (end - start) as IdxSize));
        }
        last = Some(t);

        let lower = add(&offset, t, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;
        let b = Bounds::new(lower, upper);

        // Move `start` to the first value that enters the window.
        for &v in &time[start..] {
            if b.is_member_entry(v, closed_window) {
                break;
            }
            start += 1;
        }

        // Move `end` past the last value that is still inside the window.
        end = std::cmp::max(start, end);
        for &v in &time[end..] {
            if !b.is_member_exit(v, closed_window) {
                break;
            }
            end += 1;
        }

        Ok((start as IdxSize, (end - start) as IdxSize))
    })
}
/// Rolling groups for a window of length `period` that ends at each time value.
///
/// Delegates to [`group_by_values_iter_lookbehind`] over the whole `time` slice, using
/// `period` with its `negative` flag set as the offset.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    let lookbehind = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    group_by_values_iter_lookbehind(period, lookbehind, time, closed_window, tu, tz, 0, None)
}
/// Rewrites `thread_offsets` (a list of `(start, len)` blocks covering `time`) so that
/// no block boundary separates two equal time values.
///
/// Blocks whose right-hand boundary splits a run of duplicates are dropped and the
/// remaining blocks are stretched to cover all of `time` again. If at most one block
/// survives, the result is the single block `(0, time.len())`.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    // A boundary is fine when the last value of the left block differs from the first
    // value of the right block.
    let boundary_ok = |pair: &[(usize, usize)]| -> bool {
        debug_assert_eq!(pair.len(), 2);
        let (left_start, left_len) = pair[0];
        let (right_start, _) = pair[1];
        time[left_start + left_len.saturating_sub(1)] != time[right_start]
    };

    if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(boundary_ok) {
        return;
    }

    let n_blocks = thread_offsets.len();

    // Keep every left block whose boundary with its right neighbour is fine.
    let mut kept: Vec<(usize, usize)> = thread_offsets
        .windows(2)
        .filter(|pair| boundary_ok(*pair))
        .map(|pair| pair[0])
        .collect();

    // The final block is only considered when the number of blocks is even.
    if n_blocks.is_multiple_of(2) {
        let tail = &thread_offsets[n_blocks - 2..];
        if boundary_ok(tail) {
            kept.push(thread_offsets[n_blocks - 1]);
        }
    }

    if kept.len() <= 1 {
        kept = vec![(0, time.len())];
    } else {
        // Recompute the lengths back to front so the blocks are contiguous again.
        let mut next_start = time.len();
        for block in kept.iter_mut().rev() {
            block.1 = next_start - block.0;
            next_start = block.0;
        }
        // The first block always begins at 0 and runs up to the second block.
        let second_start = kept[1].0;
        kept[0] = (0, second_start);
        debug_assert_eq!(kept.iter().map(|b| b.1).sum::<usize>(), time.len());
        // Stretching blocks may have created new bad boundaries; check again.
        prune_splits_on_duplicates(time, &mut kept)
    }
    *thread_offsets = kept;
}
/// Runs [`group_by_values_iter_lookbehind`] and collects its `(first, len)` items into a
/// vector of `[first, len]` arrays, stopping at the first error.
#[allow(clippy::too_many_arguments)]
fn group_by_values_iter_lookbehind_collected(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
    group_by_values_iter_lookbehind(
        period,
        offset,
        time,
        closed_window,
        tu,
        tz,
        start_offset,
        upper_bound,
    )?
    .map(|item| item.map(|(first, len)| [first, len]))
    .collect::<PolarsResult<Vec<_>>>()
}
/// Runs [`group_by_values_iter_lookahead`] and collects its `(first, len)` items into a
/// vector of `[first, len]` arrays, stopping at the first error.
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookahead_collected(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
    group_by_values_iter_lookahead(
        period,
        offset,
        time,
        closed_window,
        tu,
        tz,
        start_offset,
        upper_bound,
    )
    .map(|item| item.map(|(first, len)| [first, len]))
    .collect::<PolarsResult<Vec<_>>>()
}
pub fn group_by_values(
period: Duration,
offset: Duration,
time: &[i64],
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
) -> PolarsResult<GroupsSlice> {
if time.is_empty() {
return Ok(GroupsSlice::from(vec![]));
}
let mut thread_offsets = _split_offsets(time.len(), RAYON.current_num_threads());
prune_splits_on_duplicates(time, &mut thread_offsets);
let run_parallel = !RAYON.current_thread_has_pending_tasks().unwrap_or(false);
if offset.negative && !offset.is_zero() {
if offset.duration_ns() == period.duration_ns() {
if !run_parallel {
let vecs = group_by_values_iter_lookbehind_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}
RAYON.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
group_by_values_iter_lookbehind_collected(
period,
offset,
time,
closed_window,
tu,
tz,
base_offset,
Some(upper_bound),
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
} else if ((offset.duration_ns() >= period.duration_ns())
&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
|| ((offset.duration_ns() > period.duration_ns())
&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
{
let iter =
group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
else {
let iter = group_by_values_iter_partial_lookbehind(
period,
offset,
time,
closed_window,
tu,
tz,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
} else if !offset.is_zero()
|| closed_window == ClosedWindow::Right
|| closed_window == ClosedWindow::None
{
if !run_parallel {
let vecs = group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}
RAYON.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
lower_bound,
Some(upper_bound),
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
} else {
if !run_parallel {
let vecs = group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}
RAYON.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
lower_bound,
Some(upper_bound),
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
}
}
/// Incremental state for computing rolling windows over time values that are fed in
/// through `insert` and flushed with `finalize`.
pub struct RollingWindower {
/// Added to a window's lower edge to obtain its upper edge.
period: Duration,
/// Added to a time value to obtain the lower edge of that value's window.
offset: Duration,
/// Which window edges are inclusive.
closed: ClosedWindow,
/// Duration-addition routine matching the time unit passed to `new`.
add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
/// Time zone handed to `add`.
tz: Option<Tz>,
/// Running count of time values the lower-edge cursor has moved past.
start: IdxSize,
/// Running count of time values the upper-edge cursor has moved past.
end: IdxSize,
/// Total number of time values consumed by `insert` so far.
length: IdxSize,
/// Windows that have been opened but not yet emitted, oldest first.
active: VecDeque<ActiveWindow>,
}
/// Time interval of a rolling window that has been opened but not yet emitted.
struct ActiveWindow {
    start: i64,
    end: i64,
}

impl ActiveWindow {
    /// Whether `t` lies at or after the lower edge, honouring `closed` for equality.
    #[inline(always)]
    fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
        let lower_inclusive = matches!(closed, ClosedWindow::Left | ClosedWindow::Both);
        // Non-short-circuiting operators, as in the rest of this file's bound checks.
        (lower_inclusive & (t == self.start)) | (t > self.start)
    }

    /// Whether `t` lies at or before the upper edge, honouring `closed` for equality.
    #[inline(always)]
    fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
        let upper_inclusive = matches!(closed, ClosedWindow::Right | ClosedWindow::Both);
        (upper_inclusive & (t == self.end)) | (t < self.end)
    }
}
/// Converts a flat element offset `n` into a position `(x, y)` in the list of slices
/// `l`, where `y` is the slice index and `x` the index inside that slice.
///
/// Empty slices are skipped. An offset equal to the total number of elements maps to
/// `(0, l.len())`.
///
/// # Panics
///
/// Panics if `n` exceeds the total number of elements in `l`.
fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
    let mut y = 0;
    for row in l {
        // Stop at the first row that actually contains offset `n`.
        if n < row.len() {
            break;
        }
        n -= row.len();
        y += 1;
    }
    assert!(n == 0 || y < l.len());
    (n, y)
}
/// Advances the position `(x, y)` in the list of slices `l` by one element, rolling over
/// to the start of the next non-empty slice (or to `(0, l.len())`) when the end of the
/// current slice is reached.
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    // After a roll-over `*x` is 0, so this also steps over empty slices.
    while l.get(*y).is_some_and(|row| row.len() == *x) {
        *y += 1;
        *x = 0;
    }
}
impl RollingWindower {
pub fn new(
period: Duration,
offset: Duration,
closed: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
) -> Self {
Self {
period,
offset,
closed,
add: match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Milliseconds => Duration::add_ms,
},
tz,
start: 0,
end: 0,
length: 0,
active: Default::default(),
}
}
pub fn insert(
&mut self,
time: &[&[i64]],
windows: &mut Vec<[IdxSize; 2]>,
) -> PolarsResult<IdxSize> {
let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
let time_start = self.start;
let mut i = self.length;
while i_y < time.len() {
let t = time[i_y][i_x];
let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
let window_end = if self.offset == -self.period {
t
} else {
(self.add)(&self.period, window_start, self.tz.as_ref())?
};
self.active.push_back(ActiveWindow {
start: window_start,
end: window_end,
});
while let Some(w) = self.active.front() {
if w.below_upper_bound(t, self.closed) {
break;
}
let w = self.active.pop_front().unwrap();
while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
increment_2d(&mut s_x, &mut s_y, time);
self.start += 1;
}
while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
increment_2d(&mut e_x, &mut e_y, time);
self.end += 1;
}
windows.push([self.start, self.end - self.start]);
}
increment_2d(&mut i_x, &mut i_y, time);
i += 1;
}
self.length = i;
Ok(self.start - time_start)
}
pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
assert_eq!(
time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
self.length - self.start
);
let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
windows.extend(self.active.drain(..).map(|w| {
while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
increment_2d(&mut s_x, &mut s_y, time);
self.start += 1;
}
while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
increment_2d(&mut e_x, &mut e_y, time);
self.end += 1;
}
[self.start, self.end - self.start]
}));
self.start = 0;
self.end = 0;
self.length = 0;
}
pub fn reset(&mut self) {
self.active.clear();
self.start = 0;
self.end = 0;
self.length = 0;
}
}
/// A dynamic window that has been opened by `GroupByDynamicWindower::insert` but not yet
/// emitted.
#[derive(Debug)]
struct ActiveDynWindow {
/// Number of time values seen when the window was opened, i.e. the index of its first row.
start: IdxSize,
/// Lower edge of the window.
lower_bound: i64,
/// Upper edge of the window.
upper_bound: i64,
}
/// Whether `t` lies at or after the lower bound `lb`; equality only counts when
/// `closed` is `Left` or `Both`.
#[inline(always)]
fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {
    let lower_inclusive = matches!(closed, ClosedWindow::Left | ClosedWindow::Both);
    (lower_inclusive & (t == lb)) | (t > lb)
}
/// Whether `t` lies at or before the upper bound `ub`; equality only counts when
/// `closed` is `Right` or `Both`.
#[inline(always)]
fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {
    let upper_inclusive = matches!(closed, ClosedWindow::Right | ClosedWindow::Both);
    (upper_inclusive & (t == ub)) | (t < ub)
}
/// Incremental state for computing dynamic group-by windows over time values that are
/// fed in through `insert` and flushed with `finalize`.
pub struct GroupByDynamicWindower {
/// Added to a window's lower bound to obtain its upper bound.
period: Duration,
/// Offset used when determining the first window's lower bound.
offset: Duration,
/// Step between the lower bounds of consecutive windows.
every: Duration,
/// Which window edges are inclusive.
closed: ClosedWindow,
/// Strategy for choosing the first window's lower bound.
start_by: StartBy,
/// Duration-addition routine matching `tu`.
add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
/// `Duration::nte_duration_{ns,us,ms}` matching `tu`.
nte: fn(&Duration) -> i64,
/// Time unit of the time values.
tu: TimeUnit,
/// Time zone handed to the duration arithmetic.
tz: Option<Tz>,
/// Whether emitted windows also report their lower bound.
include_lower_bound: bool,
/// Whether emitted windows also report their upper bound.
include_upper_bound: bool,
/// Number of time values consumed so far.
num_seen: IdxSize,
/// Lower bound from which the search for the next window to open starts.
next_lower_bound: i64,
/// Windows that have been opened but not yet emitted, oldest first.
active: VecDeque<ActiveDynWindow>,
}
impl GroupByDynamicWindower {
#[expect(clippy::too_many_arguments)]
pub fn new(
period: Duration,
offset: Duration,
every: Duration,
start_by: StartBy,
closed: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
include_lower_bound: bool,
include_upper_bound: bool,
) -> Self {
Self {
period,
offset,
every,
closed,
start_by,
add: match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Milliseconds => Duration::add_ms,
},
nte: match tu {
TimeUnit::Nanoseconds => Duration::nte_duration_ns,
TimeUnit::Microseconds => Duration::nte_duration_us,
TimeUnit::Milliseconds => Duration::nte_duration_ms,
},
tu,
tz,
include_lower_bound,
include_upper_bound,
num_seen: 0,
next_lower_bound: 0,
active: Default::default(),
}
}
pub fn find_first_window_around(
&self,
mut lower_bound: i64,
target: i64,
) -> PolarsResult<Result<(i64, i64), i64>> {
let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
while !is_below_upper_bound(target, upper_bound, self.closed) {
let gap = target - lower_bound;
let nth = match self.tu {
TimeUnit::Nanoseconds
if gap > self.every.nte_duration_ns() + self.period.nte_duration_ns() =>
{
((gap - self.period.nte_duration_ns()) as usize)
/ (self.every.nte_duration_ns() as usize)
},
TimeUnit::Microseconds
if gap > self.every.nte_duration_us() + self.period.nte_duration_us() =>
{
((gap - self.period.nte_duration_us()) as usize)
/ (self.every.nte_duration_us() as usize)
},
TimeUnit::Milliseconds
if gap > self.every.nte_duration_ms() + self.period.nte_duration_ms() =>
{
((gap - self.period.nte_duration_ms()) as usize)
/ (self.every.nte_duration_ms() as usize)
},
_ => 1,
};
let nth: i64 = nth.try_into().unwrap();
lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;
upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
}
if is_above_lower_bound(target, lower_bound, self.closed) {
Ok(Ok((lower_bound, upper_bound)))
} else {
Ok(Err(lower_bound))
}
}
fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {
match self.start_by {
StartBy::DataPoint => Ok(first),
StartBy::WindowBound => {
let get_earliest_bounds = match self.tu {
TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,
TimeUnit::Microseconds => Window::get_earliest_bounds_us,
TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,
};
Ok((get_earliest_bounds)(
&Window::new(self.every, self.period, self.offset),
first,
self.closed,
self.tz.as_ref(),
)?
.start)
},
_ => {
{
#[allow(clippy::type_complexity)]
let (from, to): (
fn(i64) -> NaiveDateTime,
fn(NaiveDateTime) -> i64,
) = match self.tu {
TimeUnit::Nanoseconds => {
(timestamp_ns_to_datetime, datetime_to_timestamp_ns)
},
TimeUnit::Microseconds => {
(timestamp_us_to_datetime, datetime_to_timestamp_us)
},
TimeUnit::Milliseconds => {
(timestamp_ms_to_datetime, datetime_to_timestamp_ms)
},
};
let dt = from(first);
match self.tz.as_ref() {
#[cfg(feature = "timezones")]
Some(tz) => {
let dt = tz.from_utc_datetime(&dt);
let dt = dt.beginning_of_week();
let dt = dt.naive_utc();
let start = to(dt);
let start = (self.add)(
&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
start,
self.tz.as_ref(),
)?;
let start = (self.add)(&self.offset, start, self.tz.as_ref())?;
Ok(ensure_t_in_or_in_front_of_window(
self.every,
first,
self.add,
self.nte,
self.period,
start,
self.closed,
self.tz.as_ref(),
)?
.start)
},
_ => {
let tz = chrono::Utc;
let dt = dt.and_local_timezone(tz).unwrap();
let dt = dt.beginning_of_week();
let dt = dt.naive_utc();
let start = to(dt);
let start = (self.add)(
&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
start,
None,
)
.unwrap();
let start = (self.add)(&self.offset, start, None).unwrap();
Ok(ensure_t_in_or_in_front_of_window(
self.every,
first,
self.add,
self.nte,
self.period,
start,
self.closed,
None,
)?
.start)
},
}
}
},
}
}
pub fn insert(
&mut self,
time: &[i64],
windows: &mut Vec<[IdxSize; 2]>,
lower_bound: &mut Vec<i64>,
upper_bound: &mut Vec<i64>,
) -> PolarsResult<()> {
if time.is_empty() {
return Ok(());
}
if self.num_seen == 0 {
debug_assert!(self.active.is_empty());
self.next_lower_bound = self.start_lower_bound(time[0])?;
}
for &t in time {
while let Some(w) = self.active.front()
&& !is_below_upper_bound(t, w.upper_bound, self.closed)
{
let w = self.active.pop_front().unwrap();
windows.push([w.start, self.num_seen - w.start]);
if self.include_lower_bound {
lower_bound.push(w.lower_bound);
}
if self.include_upper_bound {
upper_bound.push(w.upper_bound);
}
}
while is_above_lower_bound(t, self.next_lower_bound, self.closed) {
match self.find_first_window_around(self.next_lower_bound, t)? {
Ok((lower_bound, upper_bound)) => {
self.next_lower_bound =
(self.add)(&self.every, lower_bound, self.tz.as_ref())?;
self.active.push_back(ActiveDynWindow {
start: self.num_seen,
lower_bound,
upper_bound,
});
},
Err(lower_bound) => {
self.next_lower_bound = lower_bound;
break;
},
}
}
self.num_seen += 1
}
Ok(())
}
pub fn lowest_needed_index(&self) -> IdxSize {
self.active.front().map_or(self.num_seen, |w| w.start)
}
pub fn finalize(
&mut self,
windows: &mut Vec<[IdxSize; 2]>,
lower_bound: &mut Vec<i64>,
upper_bound: &mut Vec<i64>,
) {
for w in self.active.drain(..) {
windows.push([w.start, self.num_seen - w.start]);
if self.include_lower_bound {
lower_bound.push(w.lower_bound);
}
if self.include_upper_bound {
upper_bound.push(w.upper_bound);
}
}
self.next_lower_bound = 0;
self.num_seen = 0;
}
pub fn num_seen(&self) -> IdxSize {
self.num_seen
}
pub fn time_unit(&self) -> TimeUnit {
self.tu
}
}
#[cfg(test)]
mod test {
use super::*;
// The boundary between the first two splits falls between two equal time values
// (`time[1] == time[2] == 1`), so the first two splits must be merged into one.
#[test]
fn test_prune_duplicates() {
let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];
prune_splits_on_duplicates(time, &mut splits);
assert_eq!(splits, &[(0, 6), (6, 2), (8, 3)]);
}
}