streamweave_transformers/timeout/
transformer.rs

1use super::timeout_transformer::TimeoutTransformer;
2use async_stream;
3use async_trait::async_trait;
4use futures::StreamExt;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for TimeoutTransformer<T> {
10  type InputPorts = (T,);
11  type OutputPorts = (T,);
12
13  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14    let duration = self.duration;
15    Box::pin(async_stream::stream! {
16      let mut input = input;
17
18      loop {
19        match tokio::time::timeout(duration, input.next()).await {
20          Ok(Some(item)) => yield item,
21          Ok(None) => break,
22          Err(_) => {
23            // On timeout, stop the stream
24            break;
25          }
26        }
27      }
28    })
29  }
30
31  fn set_config_impl(&mut self, config: TransformerConfig<T>) {
32    self.config = config;
33  }
34
35  fn get_config_impl(&self) -> &TransformerConfig<T> {
36    &self.config
37  }
38
39  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
40    &mut self.config
41  }
42
43  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
44    match self.config.error_strategy {
45      ErrorStrategy::Stop => ErrorAction::Stop,
46      ErrorStrategy::Skip => ErrorAction::Skip,
47      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
48      _ => ErrorAction::Stop,
49    }
50  }
51
52  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
53    ErrorContext {
54      timestamp: chrono::Utc::now(),
55      item,
56      component_name: self.component_info().name,
57      component_type: std::any::type_name::<Self>().to_string(),
58    }
59  }
60
61  fn component_info(&self) -> ComponentInfo {
62    ComponentInfo {
63      name: self
64        .config
65        .name
66        .clone()
67        .unwrap_or_else(|| "timeout_transformer".to_string()),
68      type_name: std::any::type_name::<Self>().to_string(),
69    }
70  }
71}
72
73#[cfg(test)]
74mod tests {
75  use super::*;
76  use futures::StreamExt;
77  use futures::stream;
78  use std::time::Duration;
79  use tokio::time::sleep;
80
81  #[tokio::test]
82  async fn test_timeout_basic() {
83    let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
84    let input = stream::iter(vec![1, 2, 3].into_iter());
85    let boxed_input = Box::pin(input);
86
87    let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
88
89    assert_eq!(result, vec![1, 2, 3]);
90  }
91
92  #[tokio::test]
93  async fn test_timeout_empty_input() {
94    let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
95    let input = stream::iter(Vec::<i32>::new());
96    let boxed_input = Box::pin(input);
97
98    let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
99
100    assert_eq!(result, Vec::<i32>::new());
101  }
102
103  #[tokio::test]
104  async fn test_timeout_actual_timeout() {
105    let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(50));
106    let input = stream::iter(vec![1, 2, 3].into_iter()).then(|x| async move {
107      sleep(Duration::from_millis(100)).await;
108      x
109    });
110    let boxed_input = Box::pin(input);
111
112    let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
113
114    assert!(result.is_empty());
115  }
116
117  #[tokio::test]
118  async fn test_error_handling_strategies() {
119    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
120      .with_error_strategy(ErrorStrategy::<i32>::Skip)
121      .with_name("test_transformer".to_string());
122
123    let config = transformer.config();
124    assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
125    assert_eq!(config.name(), Some("test_transformer".to_string()));
126  }
127
128  #[tokio::test]
129  async fn test_set_config_impl() {
130    let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
131    let new_config = TransformerConfig::<i32> {
132      name: Some("test_transformer".to_string()),
133      error_strategy: ErrorStrategy::<i32>::Skip,
134    };
135
136    transformer.set_config_impl(new_config.clone());
137    assert_eq!(transformer.get_config_impl().name, new_config.name);
138    assert_eq!(
139      transformer.get_config_impl().error_strategy,
140      new_config.error_strategy
141    );
142  }
143
144  #[tokio::test]
145  async fn test_get_config_mut_impl() {
146    let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
147    let config_mut = transformer.get_config_mut_impl();
148    config_mut.name = Some("mutated_name".to_string());
149    assert_eq!(
150      transformer.get_config_impl().name,
151      Some("mutated_name".to_string())
152    );
153  }
154
155  #[tokio::test]
156  async fn test_handle_error_stop() {
157    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
158      .with_error_strategy(ErrorStrategy::<i32>::Stop);
159    let error = StreamError::new(
160      Box::new(std::io::Error::other("test error")),
161      ErrorContext::default(),
162      ComponentInfo::default(),
163    );
164    assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
165  }
166
167  #[tokio::test]
168  async fn test_handle_error_skip() {
169    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
170      .with_error_strategy(ErrorStrategy::<i32>::Skip);
171    let error = StreamError::new(
172      Box::new(std::io::Error::other("test error")),
173      ErrorContext::default(),
174      ComponentInfo::default(),
175    );
176    assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
177  }
178
179  #[tokio::test]
180  async fn test_handle_error_retry_within_limit() {
181    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
182      .with_error_strategy(ErrorStrategy::<i32>::Retry(5));
183    let mut error = StreamError::new(
184      Box::new(std::io::Error::other("test error")),
185      ErrorContext::default(),
186      ComponentInfo::default(),
187    );
188    error.retries = 3;
189    assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
190  }
191
192  #[tokio::test]
193  async fn test_handle_error_retry_exceeds_limit() {
194    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
195      .with_error_strategy(ErrorStrategy::<i32>::Retry(5));
196    let mut error = StreamError::new(
197      Box::new(std::io::Error::other("test error")),
198      ErrorContext::default(),
199      ComponentInfo::default(),
200    );
201    error.retries = 5;
202    assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
203  }
204
205  #[tokio::test]
206  async fn test_create_error_context() {
207    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
208      .with_name("test_transformer".to_string());
209    let context = transformer.create_error_context(Some(42));
210    assert_eq!(context.item, Some(42));
211    assert_eq!(context.component_name, "test_transformer");
212    assert!(context.timestamp <= chrono::Utc::now());
213  }
214
215  #[tokio::test]
216  async fn test_create_error_context_no_item() {
217    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
218      .with_name("test_transformer".to_string());
219    let context = transformer.create_error_context(None);
220    assert_eq!(context.item, None);
221    assert_eq!(context.component_name, "test_transformer");
222  }
223
224  #[tokio::test]
225  async fn test_component_info() {
226    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
227      .with_name("test_transformer".to_string());
228    let info = transformer.component_info();
229    assert_eq!(info.name, "test_transformer");
230    assert_eq!(
231      info.type_name,
232      std::any::type_name::<TimeoutTransformer<i32>>()
233    );
234  }
235
236  #[tokio::test]
237  async fn test_component_info_default_name() {
238    let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
239    let info = transformer.component_info();
240    assert_eq!(info.name, "timeout_transformer");
241    assert_eq!(
242      info.type_name,
243      std::any::type_name::<TimeoutTransformer<i32>>()
244    );
245  }
246}