streamweave_window/transformers/
time_transformer.rs1use crate::time_window_transformer::TimeWindowTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::collections::VecDeque;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7use tokio::time::interval;
8
9#[async_trait]
10impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for TimeWindowTransformer<T> {
11 type InputPorts = (T,);
12 type OutputPorts = (Vec<T>,);
13
14 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
15 let duration = self.duration;
16 Box::pin(async_stream::stream! {
17 let mut window: VecDeque<T> = VecDeque::new();
18 let mut input = input;
19 let mut interval = interval(duration);
20
21 interval.tick().await;
23
24 loop {
25 tokio::select! {
26 _ = interval.tick() => {
28 if !window.is_empty() {
30 let window_vec: Vec<T> = window.iter().cloned().collect();
31 yield window_vec;
32 window.clear();
33 }
34 }
35 item = input.next() => {
37 match item {
38 Some(item) => {
39 window.push_back(item);
40 }
41 None => {
42 if !window.is_empty() {
44 yield window.iter().cloned().collect::<Vec<_>>();
45 }
46 break;
47 }
48 }
49 }
50 }
51 }
52 })
53 }
54
55 fn set_config_impl(&mut self, config: TransformerConfig<T>) {
56 self.config = config;
57 }
58
59 fn get_config_impl(&self) -> &TransformerConfig<T> {
60 &self.config
61 }
62
63 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
64 &mut self.config
65 }
66
67 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
68 match self.config.error_strategy {
69 ErrorStrategy::Stop => ErrorAction::Stop,
70 ErrorStrategy::Skip => ErrorAction::Skip,
71 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
72 _ => ErrorAction::Stop,
73 }
74 }
75
76 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
77 ErrorContext {
78 timestamp: chrono::Utc::now(),
79 item,
80 component_name: self.component_info().name,
81 component_type: std::any::type_name::<Self>().to_string(),
82 }
83 }
84
85 fn component_info(&self) -> ComponentInfo {
86 ComponentInfo {
87 name: self
88 .config
89 .name
90 .clone()
91 .unwrap_or_else(|| "time_window_transformer".to_string()),
92 type_name: std::any::type_name::<Self>().to_string(),
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use futures::stream;
    use std::time::Duration;

    /// Every input item must appear in some emitted window.
    #[tokio::test]
    async fn test_time_window_basic() {
        let source = Box::pin(stream::iter(vec![1, 2, 3, 4, 5]));
        let mut transformer = TimeWindowTransformer::new(Duration::from_millis(100));

        let windows: Vec<Vec<i32>> = transformer.transform(source).collect().await;

        assert!(!windows.is_empty());
        // Window boundaries depend on timing, so only the total count is checked.
        let total_items = windows.iter().map(Vec::len).sum::<usize>();
        assert_eq!(total_items, 5);
    }

    /// An empty input stream must produce no windows at all.
    #[tokio::test]
    async fn test_time_window_empty_input() {
        let source = Box::pin(stream::iter(Vec::<i32>::new()));
        let mut transformer = TimeWindowTransformer::new(Duration::from_millis(100));

        let windows: Vec<Vec<i32>> = transformer.transform(source).collect().await;

        let expected: Vec<Vec<i32>> = Vec::new();
        assert_eq!(windows, expected);
    }

    /// The builder methods must store the error strategy and name in the config.
    #[tokio::test]
    async fn test_error_handling_strategies() {
        let window_len = Duration::from_secs(1);
        let transformer = TimeWindowTransformer::new(window_len)
            .with_error_strategy(ErrorStrategy::<i32>::Skip)
            .with_name("test_transformer".to_string());

        let cfg = transformer.config();
        assert_eq!(cfg.error_strategy(), ErrorStrategy::<i32>::Skip);
        assert_eq!(cfg.name(), Some("test_transformer".to_string()));
    }
}