1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
macro_rules! define_window_by_count_impl {
($($bounds:tt)*) => {
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt::Debug;
use core::mem::take;
use fluxion_core::fluxion_mutex::Mutex;
use fluxion_core::{Fluxion, StreamItem};
use futures::{future::ready, Stream, StreamExt};
/// Extension trait providing the [`window_by_count`](WindowByCountExt::window_by_count) operator.
///
/// A blanket implementation in this macro covers every `'static` stream of
/// [`StreamItem<T>`] where `T` implements [`Fluxion`] and `T::Inner` / `T::Timestamp`
/// satisfy the bounds listed below (plus the bounds supplied to the macro).
pub trait WindowByCountExt<T>: Stream<Item = StreamItem<T>> + Sized
where
T: Fluxion,
T::Inner: Clone + Debug + Ord + Unpin + 'static + $($bounds)*,
T::Timestamp: Debug + Ord + Copy + 'static + $($bounds)*,
{
/// Groups consecutive items into fixed-size windows (batches).
///
/// Collects items into vectors of size `n`. When `n` items have been collected,
/// emits a `Vec<T::Inner>` with the timestamp of the last item in the window.
/// On stream completion, any remaining items are emitted as a partial window
/// stamped with the timestamp of the last buffered item; if nothing is buffered,
/// nothing extra is emitted.
///
/// # Type Parameters
///
/// - `Out`: The output wrapper type (must implement `Fluxion` with `Inner = Vec<T::Inner>`).
///   Its timestamp type must be constructible from `T::Timestamp` via `From`.
///
/// # Arguments
///
/// * `n` - The window size. Must be at least 1.
///
/// # Error handling
///
/// A [`StreamItem::Error`] from the source is forwarded downstream as-is. The items
/// buffered for the window in progress at that moment are discarded, so the next
/// window starts from the first value that follows the error.
///
/// # Panics
///
/// Panics when called with `n == 0` (the check runs at call time, before the
/// returned stream is polled).
fn window_by_count<Out>(self, n: usize) -> impl Stream<Item = StreamItem<Out>> + $($bounds)*
where
Out: Fluxion<Inner = Vec<T::Inner>>,
Out::Inner: Clone + Debug + Ord + Unpin + 'static + $($bounds)*,
Out::Timestamp: From<T::Timestamp> + Debug + Ord + Copy + 'static + $($bounds)*;
}
/// Blanket implementation of [`WindowByCountExt`] for every `'static` stream of
/// [`StreamItem<T>`] that satisfies the macro-supplied bounds.
impl<S, T> WindowByCountExt<T> for S
where
    S: Stream<Item = StreamItem<T>> + 'static + $($bounds)*,
    T: Fluxion,
    T::Inner: Clone + Debug + Ord + Unpin + 'static + $($bounds)*,
    T::Timestamp: Debug + Ord + Copy + 'static + $($bounds)*,
{
    // Batches values into windows of `n` items.
    //
    // * A full window is emitted as soon as its `n`-th value arrives, stamped with
    //   that value's timestamp.
    // * An error is forwarded immediately and discards the window in progress.
    // * When the source completes, a non-empty remainder is emitted as a final,
    //   shorter window stamped with the timestamp of its last value.
    //
    // Panics at call time if `n == 0`.
    fn window_by_count<Out>(self, n: usize) -> impl Stream<Item = StreamItem<Out>> + $($bounds)*
    where
        Out: Fluxion<Inner = Vec<T::Inner>>,
        Out::Inner: Clone + Debug + Ord + Unpin + 'static + $($bounds)*,
        Out::Timestamp: From<T::Timestamp> + Debug + Ord + Copy + 'static + $($bounds)*,
    {
        assert!(n >= 1, "window_by_count: window size must be at least 1");

        // Shared by the accumulating stage and the completion flush:
        // (values of the window in progress, timestamp of the newest buffered value).
        // Invariant: the timestamp is `Some` exactly when the buffer is non-empty.
        let state = Arc::new(Mutex::new((Vec::with_capacity(n), None::<T::Timestamp>)));

        // `filter_map` swallows values until a window fills up. All work happens
        // synchronously inside the closure, so the closure can use its own handle
        // to the shared state directly instead of cloning the `Arc` for every item.
        let accumulator = Arc::clone(&state);
        let main_stream = self.filter_map(move |item| {
            ready(match item {
                StreamItem::Value(value) => {
                    let timestamp = value.timestamp();
                    let inner = value.into_inner();
                    let mut guard = accumulator.lock();
                    let (buffer, last_ts) = &mut *guard;
                    buffer.push(inner);
                    if buffer.len() >= n {
                        // Window complete: hand the buffer over and start a fresh one.
                        // The value just pushed is the last of the window, so its
                        // timestamp is the window's timestamp.
                        let window = take(buffer);
                        *buffer = Vec::with_capacity(n);
                        *last_ts = None;
                        Some(StreamItem::Value(Out::with_timestamp(window, timestamp.into())))
                    } else {
                        // Remember the newest timestamp for a possible partial flush.
                        *last_ts = Some(timestamp);
                        None
                    }
                }
                StreamItem::Error(e) => {
                    // Drop the window in progress and propagate the error.
                    let mut guard = accumulator.lock();
                    let (buffer, last_ts) = &mut *guard;
                    buffer.clear();
                    *last_ts = None;
                    Some(StreamItem::Error(e))
                }
            })
        });

        // `once` is lazy, so this future only runs after `main_stream` is exhausted;
        // it yields the leftover partial window, or nothing when the buffer is empty.
        let flush_stream = futures::stream::once(async move {
            let mut guard = state.lock();
            let (buffer, last_ts) = &mut *guard;
            if buffer.is_empty() {
                return None;
            }
            let window = take(buffer);
            let ts = last_ts
                .take()
                .expect("timestamp must exist for partial window");
            Some(StreamItem::Value(Out::with_timestamp(window, ts.into())))
        })
        .filter_map(ready);

        Box::pin(main_stream.chain(flush_stream))
    }
}
};
}