streamweave_window/transformers/
transformer.rs

1use crate::window_transformer::WindowTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::collections::VecDeque;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for WindowTransformer<T> {
10  type InputPorts = (T,);
11  type OutputPorts = (Vec<T>,);
12
13  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14    let size = self.size;
15    Box::pin(async_stream::stream! {
16      let mut window: VecDeque<T> = VecDeque::with_capacity(size);
17      let mut input = input;
18
19      while let Some(item) = input.next().await {
20        window.push_back(item);
21
22        if window.len() == size {
23          // Efficiently yield the current window as a Vec
24          let window_vec: Vec<T> = window.iter().cloned().collect();
25          yield window_vec;
26          // Slide the window by removing the oldest item
27          window.pop_front();
28        }
29      }
30
31      // Emit any remaining items as a partial window
32      if !window.is_empty() {
33        yield window.iter().cloned().collect::<Vec<_>>();
34      }
35    })
36  }
37
38  fn set_config_impl(&mut self, config: TransformerConfig<T>) {
39    self.config = config;
40  }
41
42  fn get_config_impl(&self) -> &TransformerConfig<T> {
43    &self.config
44  }
45
46  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
47    &mut self.config
48  }
49
50  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
51    match self.config.error_strategy {
52      ErrorStrategy::Stop => ErrorAction::Stop,
53      ErrorStrategy::Skip => ErrorAction::Skip,
54      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
55      _ => ErrorAction::Stop,
56    }
57  }
58
59  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
60    ErrorContext {
61      timestamp: chrono::Utc::now(),
62      item,
63      component_name: self.component_info().name,
64      component_type: std::any::type_name::<Self>().to_string(),
65    }
66  }
67
68  fn component_info(&self) -> ComponentInfo {
69    ComponentInfo {
70      name: self
71        .config
72        .name
73        .clone()
74        .unwrap_or_else(|| "window_transformer".to_string()),
75      type_name: std::any::type_name::<Self>().to_string(),
76    }
77  }
78}
79
80#[cfg(test)]
81mod tests {
82  use super::*;
83  use futures::StreamExt;
84  use futures::stream;
85
86  #[tokio::test]
87  async fn test_window_basic() {
88    let mut transformer = WindowTransformer::new(3);
89    let input = stream::iter(vec![1, 2, 3, 4, 5, 6, 7].into_iter());
90    let boxed_input = Box::pin(input);
91
92    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
93
94    // Sliding window of size 3: each window overlaps by 2 items
95    // The transformer also emits a partial window at the end if there are remaining items
96    assert_eq!(
97      result,
98      vec![
99        vec![1, 2, 3],
100        vec![2, 3, 4],
101        vec![3, 4, 5],
102        vec![4, 5, 6],
103        vec![5, 6, 7],
104        vec![6, 7] // Partial window at the end
105      ]
106    );
107  }
108
109  #[tokio::test]
110  async fn test_window_empty_input() {
111    let mut transformer = WindowTransformer::new(3);
112    let input = stream::iter(Vec::<i32>::new());
113    let boxed_input = Box::pin(input);
114
115    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
116
117    assert_eq!(result, Vec::<Vec<i32>>::new());
118  }
119
120  #[tokio::test]
121  async fn test_error_handling_strategies() {
122    let transformer = WindowTransformer::new(3)
123      .with_error_strategy(ErrorStrategy::<i32>::Skip)
124      .with_name("test_transformer".to_string());
125
126    let config = transformer.config();
127    assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
128    assert_eq!(config.name(), Some("test_transformer".to_string()));
129  }
130
131  #[tokio::test]
132  async fn test_window_size_one() {
133    let mut transformer = WindowTransformer::new(1);
134    let input = stream::iter(vec![1, 2, 3].into_iter());
135    let boxed_input = Box::pin(input);
136
137    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
138
139    assert_eq!(result, vec![vec![1], vec![2], vec![3]]);
140  }
141
142  #[tokio::test]
143  async fn test_window_larger_than_input() {
144    let mut transformer = WindowTransformer::new(10);
145    let input = stream::iter(vec![1, 2, 3].into_iter());
146    let boxed_input = Box::pin(input);
147
148    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
149
150    // Should emit partial window at the end
151    assert_eq!(result, vec![vec![1, 2, 3]]);
152  }
153
154  #[tokio::test]
155  async fn test_set_config_impl() {
156    let mut transformer = WindowTransformer::<i32>::new(3);
157    let new_config = TransformerConfig::<i32> {
158      name: Some("test_transformer".to_string()),
159      error_strategy: ErrorStrategy::<i32>::Skip,
160    };
161
162    transformer.set_config_impl(new_config.clone());
163    assert_eq!(transformer.get_config_impl().name, new_config.name);
164    assert_eq!(
165      transformer.get_config_impl().error_strategy,
166      new_config.error_strategy
167    );
168  }
169
170  #[tokio::test]
171  async fn test_get_config_mut_impl() {
172    let mut transformer = WindowTransformer::<i32>::new(3);
173    let config_mut = transformer.get_config_mut_impl();
174    config_mut.name = Some("mutated_name".to_string());
175    assert_eq!(
176      transformer.get_config_impl().name,
177      Some("mutated_name".to_string())
178    );
179  }
180
181  #[tokio::test]
182  async fn test_handle_error_stop() {
183    let transformer = WindowTransformer::new(3).with_error_strategy(ErrorStrategy::<i32>::Stop);
184    let error = StreamError::new(
185      Box::new(std::io::Error::other("test error")),
186      ErrorContext::default(),
187      ComponentInfo::default(),
188    );
189    assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
190  }
191
192  #[tokio::test]
193  async fn test_handle_error_skip() {
194    let transformer = WindowTransformer::new(3).with_error_strategy(ErrorStrategy::<i32>::Skip);
195    let error = StreamError::new(
196      Box::new(std::io::Error::other("test error")),
197      ErrorContext::default(),
198      ComponentInfo::default(),
199    );
200    assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
201  }
202
203  #[tokio::test]
204  async fn test_handle_error_retry_within_limit() {
205    let transformer = WindowTransformer::new(3).with_error_strategy(ErrorStrategy::<i32>::Retry(5));
206    let mut error = StreamError::new(
207      Box::new(std::io::Error::other("test error")),
208      ErrorContext::default(),
209      ComponentInfo::default(),
210    );
211    error.retries = 3;
212    assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
213  }
214
215  #[tokio::test]
216  async fn test_handle_error_retry_exceeds_limit() {
217    let transformer = WindowTransformer::new(3).with_error_strategy(ErrorStrategy::<i32>::Retry(5));
218    let mut error = StreamError::new(
219      Box::new(std::io::Error::other("test error")),
220      ErrorContext::default(),
221      ComponentInfo::default(),
222    );
223    error.retries = 5;
224    assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
225  }
226
227  #[tokio::test]
228  async fn test_create_error_context() {
229    let transformer = WindowTransformer::new(3).with_name("test_transformer".to_string());
230    let context = transformer.create_error_context(Some(42));
231    assert_eq!(context.item, Some(42));
232    assert_eq!(context.component_name, "test_transformer");
233    assert!(context.timestamp <= chrono::Utc::now());
234  }
235
236  #[tokio::test]
237  async fn test_create_error_context_no_item() {
238    let transformer = WindowTransformer::<i32>::new(3).with_name("test_transformer".to_string());
239    let context = transformer.create_error_context(None);
240    assert_eq!(context.item, None);
241    assert_eq!(context.component_name, "test_transformer");
242  }
243
244  #[tokio::test]
245  async fn test_component_info() {
246    let transformer = WindowTransformer::<i32>::new(3).with_name("test_transformer".to_string());
247    let info = transformer.component_info();
248    assert_eq!(info.name, "test_transformer");
249    assert_eq!(
250      info.type_name,
251      std::any::type_name::<WindowTransformer<i32>>()
252    );
253  }
254
255  #[tokio::test]
256  async fn test_component_info_default_name() {
257    let transformer = WindowTransformer::<i32>::new(3);
258    let info = transformer.component_info();
259    assert_eq!(info.name, "window_transformer");
260    assert_eq!(
261      info.type_name,
262      std::any::type_name::<WindowTransformer<i32>>()
263    );
264  }
265}