import datetime as dt
from typing import Literal
import pandas as pd
import pytest
from frequenz.resampling import Closed, Label, Resampler, ResamplingFunction, resample
def test_resampler_resampling_function_average() -> None:
    """Expect ``ResamplingFunction.Average`` to report the mean of each 5 s window."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Average,
        max_age_in_intervals=1,
        start=epoch,
        closed=Closed.Left,
        label=Label.Right,
    )

    # Feed the values 1..10, one per second, beginning at the epoch.
    for offset, value in enumerate(range(1, 11)):
        resampler.push_sample(timestamp=epoch + offset * tick, value=value)

    # Windows are labelled by their right edge: mean(1..5) and mean(6..10).
    assert resampler.resample(epoch + 2 * window) == [
        (epoch + window, 3.0),
        (epoch + 2 * window, 8.0),
    ]
def test_resampler_resampling_function_sum() -> None:
    """Expect ``ResamplingFunction.Sum`` to total the samples of each 5 s window."""
    origin = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    second = dt.timedelta(seconds=1)
    samples = [(origin + n * second, n + 1) for n in range(10)]

    resampler = Resampler(
        dt.timedelta(seconds=5),
        ResamplingFunction.Sum,
        max_age_in_intervals=1,
        start=origin,
        closed=Closed.Left,
        label=Label.Right,
    )
    for when, reading in samples:
        resampler.push_sample(timestamp=when, value=reading)

    end = origin + 10 * second
    # 1+2+3+4+5 == 15 and 6+7+8+9+10 == 40, each labelled with the right edge.
    assert resampler.resample(end) == [
        (origin + 5 * second, 15.0),
        (end, 40.0),
    ]
def test_resampler_resampling_function_max() -> None:
    """Expect ``ResamplingFunction.Max`` to report the largest sample per window."""
    base = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    # Timestamps at 0 s .. 10 s; indices 5 and 10 are the two window labels.
    instants = [base + dt.timedelta(seconds=n) for n in range(11)]

    resampler = Resampler(
        dt.timedelta(seconds=5),
        ResamplingFunction.Max,
        max_age_in_intervals=1,
        start=base,
        closed=Closed.Left,
        label=Label.Right,
    )
    for n in range(10):
        resampler.push_sample(timestamp=instants[n], value=n + 1)

    outcome = resampler.resample(instants[10])

    assert outcome == [
        (instants[5], 5.0),
        (instants[10], 10.0),
    ]
def test_resampler_resampling_function_min() -> None:
    """Expect ``ResamplingFunction.Min`` to report the smallest sample per window."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Min,
        max_age_in_intervals=1,
        start=epoch,
        closed=Closed.Left,
        label=Label.Right,
    )

    # Values 1..10 at one-second spacing.
    for offset, value in enumerate(range(1, 11)):
        resampler.push_sample(timestamp=epoch + offset * tick, value=value)

    # min(1..5) == 1 and min(6..10) == 6, labelled with the right window edge.
    assert resampler.resample(epoch + 2 * window) == [
        (epoch + window, 1.0),
        (epoch + 2 * window, 6.0),
    ]
def test_resampler_resampling_function_first() -> None:
    """Expect ``ResamplingFunction.First`` to report the earliest sample per window."""
    origin = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    second = dt.timedelta(seconds=1)
    samples = [(origin + n * second, n + 1) for n in range(10)]

    resampler = Resampler(
        dt.timedelta(seconds=5),
        ResamplingFunction.First,
        max_age_in_intervals=1,
        start=origin,
        closed=Closed.Left,
        label=Label.Right,
    )
    for when, reading in samples:
        resampler.push_sample(timestamp=when, value=reading)

    end = origin + 10 * second
    # The first sample of each window is 1 (at 0 s) and 6 (at 5 s).
    assert resampler.resample(end) == [
        (origin + 5 * second, 1.0),
        (end, 6.0),
    ]
def test_resampler_resampling_function_last() -> None:
    """Expect ``ResamplingFunction.Last`` to report the latest sample per window."""
    base = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    # Timestamps at 0 s .. 10 s; indices 5 and 10 are the two window labels.
    instants = [base + dt.timedelta(seconds=n) for n in range(11)]

    resampler = Resampler(
        dt.timedelta(seconds=5),
        ResamplingFunction.Last,
        max_age_in_intervals=1,
        start=base,
        closed=Closed.Left,
        label=Label.Right,
    )
    for n in range(10):
        resampler.push_sample(timestamp=instants[n], value=n + 1)

    outcome = resampler.resample(instants[10])

    assert outcome == [
        (instants[5], 5.0),
        (instants[10], 10.0),
    ]
def test_resampler_resampling_function_coalesce() -> None:
    """Expect ``ResamplingFunction.Coalesce`` to skip a leading ``None`` sample.

    The sample at 5 s (the first one of the second window) is ``None``, and the
    expected value for that window is the next sample, 7.
    """
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Coalesce,
        max_age_in_intervals=1,
        start=epoch,
        closed=Closed.Left,
        label=Label.Right,
    )

    for n in range(10):
        # Only the sixth sample (index 5) is missing.
        reading = None if n == 5 else n + 1
        resampler.push_sample(timestamp=epoch + n * tick, value=reading)

    assert resampler.resample(epoch + 2 * window) == [
        (epoch + window, 1.0),
        (epoch + 2 * window, 7.0),
    ]
def test_resampler_resampling_function_count() -> None:
    """Expect ``ResamplingFunction.Count`` to report five samples in each window."""
    origin = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    second = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Count,
        max_age_in_intervals=1,
        start=origin,
        closed=Closed.Left,
        label=Label.Right,
    )

    # Ten samples at one-second spacing: five land in each 5 s window.
    for offset, value in enumerate(range(1, 11)):
        resampler.push_sample(timestamp=origin + offset * second, value=value)

    first_label = origin + window
    second_label = origin + 2 * window
    assert resampler.resample(second_label) == [
        (first_label, 5.0),
        (second_label, 5.0),
    ]
def test_resampling_none() -> None:
    """Expect windows that only received ``None`` samples to resample to ``None``."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Average,
        max_age_in_intervals=1,
        start=epoch,
        closed=Closed.Left,
        label=Label.Right,
    )

    # Every one of the ten samples carries no value.
    for moment in (epoch + n * tick for n in range(10)):
        resampler.push_sample(timestamp=moment, value=None)

    assert resampler.resample(epoch + 2 * window) == [
        (epoch + window, None),
        (epoch + 2 * window, None),
    ]
def test_enum_values() -> None:
    """Expect ``ResamplingFunction.values()`` to list the integers 0 through 7."""
    expected_values = list(range(8))
    assert ResamplingFunction.values() == expected_values
def test_enum_members() -> None:
    """Expect ``ResamplingFunction.members()`` to pair each name with its value."""
    # Names in the order of their expected integer values (0..7).
    names = (
        "Average",
        "Sum",
        "Max",
        "Min",
        "Last",
        "Count",
        "First",
        "Coalesce",
    )
    expected_members = [(name, number) for number, name in enumerate(names)]
    assert ResamplingFunction.members() == expected_members
def test_enum_str_repr() -> None:
    """Check the ``str()`` and ``repr()`` text of each ``ResamplingFunction`` member."""
    # (member, expected str(), expected repr())
    cases = [
        (
            ResamplingFunction.Average,
            "ResamplingFunction.Average",
            "<ResamplingFunction.Average: 0>",
        ),
        (
            ResamplingFunction.Sum,
            "ResamplingFunction.Sum",
            "<ResamplingFunction.Sum: 1>",
        ),
        (
            ResamplingFunction.Max,
            "ResamplingFunction.Max",
            "<ResamplingFunction.Max: 2>",
        ),
        (
            ResamplingFunction.Min,
            "ResamplingFunction.Min",
            "<ResamplingFunction.Min: 3>",
        ),
        (
            ResamplingFunction.Last,
            "ResamplingFunction.Last",
            "<ResamplingFunction.Last: 4>",
        ),
        (
            ResamplingFunction.Count,
            "ResamplingFunction.Count",
            "<ResamplingFunction.Count: 5>",
        ),
        (
            ResamplingFunction.First,
            "ResamplingFunction.First",
            "<ResamplingFunction.First: 6>",
        ),
        (
            ResamplingFunction.Coalesce,
            "ResamplingFunction.Coalesce",
            "<ResamplingFunction.Coalesce: 7>",
        ),
    ]
    for member, expected_str, expected_repr in cases:
        assert str(member) == expected_str
        assert repr(member) == expected_repr
def test_resampling_function_name_value() -> None:
    """Check the ``name`` and ``value`` attributes of each ``ResamplingFunction``."""
    # (member, expected .name, expected .value)
    cases = [
        (ResamplingFunction.Average, "Average", 0),
        (ResamplingFunction.Sum, "Sum", 1),
        (ResamplingFunction.Max, "Max", 2),
        (ResamplingFunction.Min, "Min", 3),
        (ResamplingFunction.Last, "Last", 4),
        (ResamplingFunction.Count, "Count", 5),
        (ResamplingFunction.First, "First", 6),
        (ResamplingFunction.Coalesce, "Coalesce", 7),
    ]
    for member, expected_name, expected_value in cases:
        assert member.name == expected_name
        assert member.value == expected_value
def test_resampling_function_init() -> None:
    """Expect ``ResamplingFunction(n)`` to return the member whose value is ``n``."""
    # Members listed in the order of their integer values 0..7.
    by_value = [
        ResamplingFunction.Average,
        ResamplingFunction.Sum,
        ResamplingFunction.Max,
        ResamplingFunction.Min,
        ResamplingFunction.Last,
        ResamplingFunction.Count,
        ResamplingFunction.First,
        ResamplingFunction.Coalesce,
    ]
    for raw, member in enumerate(by_value):
        assert ResamplingFunction(raw) == member
def test_resampler_label_left() -> None:
    """Expect ``Label.Left`` to stamp each window with its left edge.

    Twenty samples at 0.5 s spacing fill two 5 s windows, which the test expects
    to be labelled with the start time and the start time plus 5 s.
    """
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    half_second = dt.timedelta(seconds=0.5)
    window = dt.timedelta(seconds=5)

    resampler = Resampler(
        window,
        ResamplingFunction.Average,
        max_age_in_intervals=1,
        start=epoch,
        closed=Closed.Left,
        label=Label.Left,
    )

    # Values 1..20, one every half second.
    for offset, value in enumerate(range(1, 21)):
        resampler.push_sample(timestamp=epoch + offset * half_second, value=value)

    # mean(1..10) == 5.5 and mean(11..20) == 15.5.
    assert resampler.resample(epoch + 2 * window) == [
        (epoch, 5.5),
        (epoch + window, 15.5),
    ]
def test_resampler_last_timestamp() -> None:
    """Expect right-labelled windows when resampling up to the final window edge.

    Twenty samples at 0.5 s spacing fill two 5 s windows; resampling at the 10 s
    mark is expected to return both, labelled 5 s and 10 s after the start.
    """
    origin = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    half_second = dt.timedelta(seconds=0.5)
    samples = [(origin + n * half_second, n + 1) for n in range(20)]

    resampler = Resampler(
        dt.timedelta(seconds=5),
        ResamplingFunction.Average,
        max_age_in_intervals=1,
        start=origin,
        closed=Closed.Left,
        label=Label.Right,
    )
    for when, reading in samples:
        resampler.push_sample(timestamp=when, value=reading)

    midpoint = origin + 10 * half_second
    end = origin + 20 * half_second
    assert resampler.resample(end) == [
        (midpoint, 5.5),
        (end, 15.5),
    ]
def test_resample_function_basic() -> None:
    """Average ten one-second samples into two left-labelled 5 s buckets."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)
    # Values 1.0 .. 10.0 at 0 s .. 9 s.
    series = [(epoch + n * tick, float(n + 1)) for n in range(10)]

    buckets = resample(
        series,
        window,
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert len(buckets) == 2
    # mean(1..5) at the start, mean(6..10) five seconds later.
    assert buckets[0] == (epoch, 3.0)
    assert buckets[1] == (epoch + window, 8.0)
def test_resample_function_label_right() -> None:
    """Average ten one-second samples into two right-labelled 5 s buckets."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)
    # Values 1.0 .. 10.0 at 0 s .. 9 s.
    series = [(epoch + n * tick, float(n + 1)) for n in range(10)]

    buckets = resample(
        series,
        window,
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Right,
    )

    assert len(buckets) == 2
    # Same means as with left labels, but stamped with each bucket's right edge.
    assert buckets[0] == (epoch + window, 3.0)
    assert buckets[1] == (epoch + 2 * window, 8.0)
def test_resample_function_closed_right() -> None:
    """Expect samples on a bucket boundary to stay separate with ``Closed.Right``.

    Two samples sit exactly 5 s apart; with right-closed, right-labelled 5 s
    buckets the test expects each to be reported unchanged at its own timestamp.
    """
    first_instant = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    window = dt.timedelta(seconds=5)
    second_instant = first_instant + window
    series = [
        (first_instant, 10.0),
        (second_instant, 20.0),
    ]

    buckets = resample(
        series,
        window,
        ResamplingFunction.Sum,
        closed=Closed.Right,
        label=Label.Right,
    )

    assert buckets == [
        (first_instant, 10.0),
        (second_instant, 20.0),
    ]
@pytest.mark.parametrize(
    ("closed", "label", "pandas_closed", "pandas_label"),
    [
        (Closed.Left, Label.Left, "left", "left"),
        (Closed.Left, Label.Right, "left", "right"),
        (Closed.Right, Label.Left, "right", "left"),
        (Closed.Right, Label.Right, "right", "right"),
    ],
)
def test_resample_function_matches_pandas(
    closed: Closed,
    label: Label,
    pandas_closed: Literal["left", "right"],
    pandas_label: Literal["left", "right"],
) -> None:
    """Compare ``resample`` against ``pandas`` for every closed/label combination.

    Args:
        closed: Bucket side passed to ``resample``.
        label: Bucket label side passed to ``resample``.
        pandas_closed: The ``closed`` argument given to ``Series.resample``.
        pandas_label: The ``label`` argument given to ``Series.resample``.
    """
    origin = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    # Values 1.0 .. 10.0 at 0 s .. 9 s.
    samples = [(origin + dt.timedelta(seconds=n), float(n + 1)) for n in range(10)]

    ours = resample(
        samples,
        dt.timedelta(seconds=5),
        ResamplingFunction.Average,
        closed=closed,
        label=label,
    )

    # Build the reference result from the same samples with pandas.
    reference = pd.Series(
        [reading for _, reading in samples],
        index=pd.DatetimeIndex([when for when, _ in samples]),
        dtype="float64",
    )
    means = reference.resample(
        "5s", closed=pandas_closed, label=pandas_label
    ).mean()

    # Convert pandas timestamps/values to plain datetime/float pairs.
    expected = list(
        zip(
            (stamp.to_pydatetime() for stamp in means.index),
            map(float, means.to_list()),
            strict=True,
        )
    )
    assert ours == expected
def test_label_init_invalid_value() -> None:
    """Expect ``Label`` to reject an integer it has no member for."""
    unknown_value = 99
    with pytest.raises(ValueError, match="Invalid label"):
        Label(unknown_value)
def test_closed_init_invalid_value() -> None:
    """Expect ``Closed`` to reject an integer it has no member for."""
    unknown_value = 99
    with pytest.raises(ValueError, match="Invalid closed"):
        Closed(unknown_value)
def test_label_values_members() -> None:
    """Check the values and (name, value) pairs exposed by ``Label``."""
    expected_values = [0, 1]
    expected_members = [("Left", 0), ("Right", 1)]

    assert Label.values() == expected_values
    assert Label.members() == expected_members
def test_closed_values_members() -> None:
    """Check the values and (name, value) pairs exposed by ``Closed``."""
    expected_values = [0, 1]
    expected_members = [("Left", 0), ("Right", 1)]

    assert Closed.values() == expected_values
    assert Closed.members() == expected_members
def test_resample_function_empty_data() -> None:
    """Expect ``resample`` to return an empty list when given no samples."""
    no_samples: list[tuple[dt.datetime, float | None]] = []
    window = dt.timedelta(seconds=5)

    buckets = resample(
        no_samples,
        window,
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert buckets == []
def test_resample_function_with_none_values() -> None:
    """Expect ``None`` samples to be left out of each bucket's average."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)
    # The first sample of each bucket (indices 0 and 5) has no value.
    missing = (0, 5)
    series: list[tuple[dt.datetime, float | None]] = [
        (epoch + n * tick, None if n in missing else float(n + 1))
        for n in range(10)
    ]

    buckets = resample(
        series,
        window,
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert len(buckets) == 2
    # mean(2, 3, 4, 5) == 3.5 and mean(7, 8, 9, 10) == 8.5.
    assert buckets[0] == (epoch, 3.5)
    assert buckets[1] == (epoch + window, 8.5)
def test_resample_function_interval_with_only_none() -> None:
    """Expect a bucket holding only ``None`` samples to resample to ``None``."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)
    # The whole first bucket (indices 0..4) is empty; the second holds 6.0..10.0.
    series: list[tuple[dt.datetime, float | None]] = [
        (epoch + n * tick, None if n < 5 else float(n + 1)) for n in range(10)
    ]

    buckets = resample(
        series,
        window,
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert len(buckets) == 2
    assert buckets[0] == (epoch, None)
    assert buckets[1] == (epoch + window, 8.0)
def test_resample_function_sum() -> None:
    """Sum ten one-second samples into two left-labelled 5 s buckets."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    tick = dt.timedelta(seconds=1)
    window = dt.timedelta(seconds=5)
    # Values 1.0 .. 10.0 at 0 s .. 9 s.
    series = [(epoch + n * tick, float(n + 1)) for n in range(10)]

    totals = resample(
        series,
        window,
        ResamplingFunction.Sum,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert len(totals) == 2
    # 1+2+3+4+5 == 15 and 6+7+8+9+10 == 40.
    assert totals[0] == (epoch, 15.0)
    assert totals[1] == (epoch + window, 40.0)
def test_resample_function_min_max() -> None:
    """Check ``ResamplingFunction.Min`` and ``Max`` on two 5 s buckets.

    Ten samples (1.0 .. 10.0, one per second) are resampled twice with
    left-closed, left-labelled buckets. The bucket count is asserted as well as
    the individual entries, so that unexpected extra buckets fail the test, as
    in the other ``resample`` tests that use the same data.
    """
    start = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    step = dt.timedelta(seconds=1)
    data = [(start + i * step, float(i + 1)) for i in range(10)]

    min_result = resample(
        data,
        dt.timedelta(seconds=5),
        ResamplingFunction.Min,
        closed=Closed.Left,
        label=Label.Left,
    )
    max_result = resample(
        data,
        dt.timedelta(seconds=5),
        ResamplingFunction.Max,
        closed=Closed.Left,
        label=Label.Left,
    )

    # Exactly two buckets are expected: [0 s, 5 s) and [5 s, 10 s).
    assert len(min_result) == 2
    assert min_result[0] == (start, 1.0)
    assert min_result[1] == (start + 5 * step, 6.0)

    assert len(max_result) == 2
    assert max_result[0] == (start, 5.0)
    assert max_result[1] == (start + 5 * step, 10.0)
def test_resample_function_single_sample() -> None:
    """Expect a lone sample to come back as a single bucket with its own value."""
    instant = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    only_sample = (instant, 42.0)

    buckets = resample(
        [only_sample],
        dt.timedelta(seconds=5),
        ResamplingFunction.Average,
        closed=Closed.Left,
        label=Label.Left,
    )

    assert len(buckets) == 1
    assert buckets[0] == (instant, 42.0)
def test_resample_function_all_methods() -> None:
    """Smoke-test ``resample`` with every ``ResamplingFunction`` on one bucket.

    Five one-second samples fit into a single left-closed, left-labelled 5 s
    bucket. For each method only the bucket count and its label are checked,
    not the aggregated value. Each assertion names the method under test so a
    failure inside the loop can be attributed without re-running.
    """
    start = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    step = dt.timedelta(seconds=1)
    data = [(start + i * step, float(i + 1)) for i in range(5)]
    methods = [
        ResamplingFunction.Average,
        ResamplingFunction.Sum,
        ResamplingFunction.Min,
        ResamplingFunction.Max,
        ResamplingFunction.First,
        ResamplingFunction.Last,
        ResamplingFunction.Count,
        ResamplingFunction.Coalesce,
    ]

    for method in methods:
        result = resample(
            data,
            dt.timedelta(seconds=5),
            method,
            closed=Closed.Left,
            label=Label.Left,
        )
        assert len(result) == 1, f"{method!r}: expected 1 bucket, got {len(result)}"
        assert result[0][0] == start, f"{method!r}: unexpected bucket label"