streamweave_window/transformers/
time_transformer.rs

1use crate::time_window_transformer::TimeWindowTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::collections::VecDeque;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7use tokio::time::interval;
8
9#[async_trait]
10impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for TimeWindowTransformer<T> {
11  type InputPorts = (T,);
12  type OutputPorts = (Vec<T>,);
13
14  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
15    let duration = self.duration;
16    Box::pin(async_stream::stream! {
17      let mut window: VecDeque<T> = VecDeque::new();
18      let mut input = input;
19      let mut interval = interval(duration);
20
21      // Skip the first tick (interval starts immediately)
22      interval.tick().await;
23
24      loop {
25        tokio::select! {
26          // Check if interval has ticked (time window expired)
27          _ = interval.tick() => {
28            // Time window expired, emit current window if it has items
29            if !window.is_empty() {
30              let window_vec: Vec<T> = window.iter().cloned().collect();
31              yield window_vec;
32              window.clear();
33            }
34          }
35          // Check for new items from input stream
36          item = input.next() => {
37            match item {
38              Some(item) => {
39                window.push_back(item);
40              }
41              None => {
42                // Stream ended, emit any remaining items as partial window
43                if !window.is_empty() {
44                  yield window.iter().cloned().collect::<Vec<_>>();
45                }
46                break;
47              }
48            }
49          }
50        }
51      }
52    })
53  }
54
55  fn set_config_impl(&mut self, config: TransformerConfig<T>) {
56    self.config = config;
57  }
58
59  fn get_config_impl(&self) -> &TransformerConfig<T> {
60    &self.config
61  }
62
63  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
64    &mut self.config
65  }
66
67  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
68    match self.config.error_strategy {
69      ErrorStrategy::Stop => ErrorAction::Stop,
70      ErrorStrategy::Skip => ErrorAction::Skip,
71      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
72      _ => ErrorAction::Stop,
73    }
74  }
75
76  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
77    ErrorContext {
78      timestamp: chrono::Utc::now(),
79      item,
80      component_name: self.component_info().name,
81      component_type: std::any::type_name::<Self>().to_string(),
82    }
83  }
84
85  fn component_info(&self) -> ComponentInfo {
86    ComponentInfo {
87      name: self
88        .config
89        .name
90        .clone()
91        .unwrap_or_else(|| "time_window_transformer".to_string()),
92      type_name: std::any::type_name::<Self>().to_string(),
93    }
94  }
95}
96
97#[cfg(test)]
98mod tests {
99  use super::*;
100  use futures::StreamExt;
101  use futures::stream;
102  use std::time::Duration;
103
104  #[tokio::test]
105  async fn test_time_window_basic() {
106    let mut transformer = TimeWindowTransformer::new(Duration::from_millis(100));
107    let input = stream::iter(vec![1, 2, 3, 4, 5].into_iter());
108    let boxed_input = Box::pin(input);
109
110    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
111
112    // Should have at least one window
113    assert!(!result.is_empty());
114    // Should have received all items
115    let total_items: usize = result.iter().map(|w| w.len()).sum();
116    assert_eq!(total_items, 5);
117  }
118
119  #[tokio::test]
120  async fn test_time_window_empty_input() {
121    let mut transformer = TimeWindowTransformer::new(Duration::from_millis(100));
122    let input = stream::iter(Vec::<i32>::new());
123    let boxed_input = Box::pin(input);
124
125    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
126
127    assert_eq!(result, Vec::<Vec<i32>>::new());
128  }
129
130  #[tokio::test]
131  async fn test_error_handling_strategies() {
132    let transformer = TimeWindowTransformer::new(Duration::from_secs(1))
133      .with_error_strategy(ErrorStrategy::<i32>::Skip)
134      .with_name("test_transformer".to_string());
135
136    let config = transformer.config();
137    assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
138    assert_eq!(config.name(), Some("test_transformer".to_string()));
139  }
140}