streamweave_window/transformers/
transformer.rs1use crate::window_transformer::WindowTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::collections::VecDeque;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for WindowTransformer<T> {
10 type InputPorts = (T,);
11 type OutputPorts = (Vec<T>,);
12
13 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14 let size = self.size;
15 Box::pin(async_stream::stream! {
16 let mut window: VecDeque<T> = VecDeque::with_capacity(size);
17 let mut input = input;
18
19 while let Some(item) = input.next().await {
20 window.push_back(item);
21
22 if window.len() == size {
23 let window_vec: Vec<T> = window.iter().cloned().collect();
25 yield window_vec;
26 window.pop_front();
28 }
29 }
30
31 if !window.is_empty() {
33 yield window.iter().cloned().collect::<Vec<_>>();
34 }
35 })
36 }
37
38 fn set_config_impl(&mut self, config: TransformerConfig<T>) {
39 self.config = config;
40 }
41
42 fn get_config_impl(&self) -> &TransformerConfig<T> {
43 &self.config
44 }
45
46 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
47 &mut self.config
48 }
49
50 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
51 match self.config.error_strategy {
52 ErrorStrategy::Stop => ErrorAction::Stop,
53 ErrorStrategy::Skip => ErrorAction::Skip,
54 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
55 _ => ErrorAction::Stop,
56 }
57 }
58
59 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
60 ErrorContext {
61 timestamp: chrono::Utc::now(),
62 item,
63 component_name: self.component_info().name,
64 component_type: std::any::type_name::<Self>().to_string(),
65 }
66 }
67
68 fn component_info(&self) -> ComponentInfo {
69 ComponentInfo {
70 name: self
71 .config
72 .name
73 .clone()
74 .unwrap_or_else(|| "window_transformer".to_string()),
75 type_name: std::any::type_name::<Self>().to_string(),
76 }
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use futures::stream;

    /// Feeds `items` through a fresh transformer with the given window size
    /// and collects every emitted window.
    async fn windows_of(size: usize, items: Vec<i32>) -> Vec<Vec<i32>> {
        let mut transformer = WindowTransformer::new(size);
        let boxed_input = Box::pin(stream::iter(items));
        transformer.transform(boxed_input).collect().await
    }

    /// Builds the stream error shared by the `handle_error` tests.
    fn sample_error() -> StreamError<i32> {
        StreamError::new(
            Box::new(std::io::Error::other("test error")),
            ErrorContext::default(),
            ComponentInfo::default(),
        )
    }

    /// Creates a size-3 transformer configured with `strategy`.
    fn transformer_with(strategy: ErrorStrategy<i32>) -> WindowTransformer<i32> {
        WindowTransformer::new(3).with_error_strategy(strategy)
    }

    /// Creates a size-3 transformer named `"test_transformer"`.
    fn named_transformer() -> WindowTransformer<i32> {
        WindowTransformer::<i32>::new(3).with_name("test_transformer".to_string())
    }

    // --- windowing behaviour -------------------------------------------------

    #[tokio::test]
    async fn test_window_basic() {
        let result = windows_of(3, vec![1, 2, 3, 4, 5, 6, 7]).await;

        // Five full windows followed by the leftover tail.
        let expected = vec![
            vec![1, 2, 3],
            vec![2, 3, 4],
            vec![3, 4, 5],
            vec![4, 5, 6],
            vec![5, 6, 7],
            vec![6, 7],
        ];
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn test_window_empty_input() {
        let result = windows_of(3, Vec::<i32>::new()).await;

        assert_eq!(result, Vec::<Vec<i32>>::new());
    }

    #[tokio::test]
    async fn test_window_size_one() {
        let result = windows_of(1, vec![1, 2, 3]).await;

        assert_eq!(result, vec![vec![1], vec![2], vec![3]]);
    }

    #[tokio::test]
    async fn test_window_larger_than_input() {
        let result = windows_of(10, vec![1, 2, 3]).await;

        // The window never fills, so only the final flush is emitted.
        assert_eq!(result, vec![vec![1, 2, 3]]);
    }

    // --- configuration -------------------------------------------------------

    #[tokio::test]
    async fn test_error_handling_strategies() {
        let transformer =
            transformer_with(ErrorStrategy::<i32>::Skip).with_name("test_transformer".to_string());

        let config = transformer.config();
        assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
        assert_eq!(config.name(), Some("test_transformer".to_string()));
    }

    #[tokio::test]
    async fn test_set_config_impl() {
        let replacement = TransformerConfig::<i32> {
            name: Some("test_transformer".to_string()),
            error_strategy: ErrorStrategy::<i32>::Skip,
        };
        let mut transformer = WindowTransformer::<i32>::new(3);

        transformer.set_config_impl(replacement.clone());

        let stored = transformer.get_config_impl();
        assert_eq!(stored.name, replacement.name);
        assert_eq!(stored.error_strategy, replacement.error_strategy);
    }

    #[tokio::test]
    async fn test_get_config_mut_impl() {
        let mut transformer = WindowTransformer::<i32>::new(3);

        transformer.get_config_mut_impl().name = Some("mutated_name".to_string());

        assert_eq!(
            transformer.get_config_impl().name,
            Some("mutated_name".to_string())
        );
    }

    // --- error handling ------------------------------------------------------

    #[tokio::test]
    async fn test_handle_error_stop() {
        let transformer = transformer_with(ErrorStrategy::<i32>::Stop);
        let error = sample_error();

        assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
    }

    #[tokio::test]
    async fn test_handle_error_skip() {
        let transformer = transformer_with(ErrorStrategy::<i32>::Skip);
        let error = sample_error();

        assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
    }

    #[tokio::test]
    async fn test_handle_error_retry_within_limit() {
        let transformer = transformer_with(ErrorStrategy::<i32>::Retry(5));
        let mut error = sample_error();
        error.retries = 3;

        assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
    }

    #[tokio::test]
    async fn test_handle_error_retry_exceeds_limit() {
        let transformer = transformer_with(ErrorStrategy::<i32>::Retry(5));
        let mut error = sample_error();
        // Equal to the limit counts as exhausted.
        error.retries = 5;

        assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
    }

    // --- error context and component info ------------------------------------

    #[tokio::test]
    async fn test_create_error_context() {
        let context = named_transformer().create_error_context(Some(42));

        assert_eq!(context.item, Some(42));
        assert_eq!(context.component_name, "test_transformer");
        assert!(context.timestamp <= chrono::Utc::now());
    }

    #[tokio::test]
    async fn test_create_error_context_no_item() {
        let context = named_transformer().create_error_context(None);

        assert_eq!(context.item, None);
        assert_eq!(context.component_name, "test_transformer");
    }

    #[tokio::test]
    async fn test_component_info() {
        let info = named_transformer().component_info();

        assert_eq!(info.name, "test_transformer");
        assert_eq!(
            info.type_name,
            std::any::type_name::<WindowTransformer<i32>>()
        );
    }

    #[tokio::test]
    async fn test_component_info_default_name() {
        let info = WindowTransformer::<i32>::new(3).component_info();

        assert_eq!(info.name, "window_transformer");
        assert_eq!(
            info.type_name,
            std::any::type_name::<WindowTransformer<i32>>()
        );
    }
}