streamweave_transformers/timeout/
transformer.rs1use super::timeout_transformer::TimeoutTransformer;
2use async_stream;
3use async_trait::async_trait;
4use futures::StreamExt;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for TimeoutTransformer<T> {
10 type InputPorts = (T,);
11 type OutputPorts = (T,);
12
13 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14 let duration = self.duration;
15 Box::pin(async_stream::stream! {
16 let mut input = input;
17
18 loop {
19 match tokio::time::timeout(duration, input.next()).await {
20 Ok(Some(item)) => yield item,
21 Ok(None) => break,
22 Err(_) => {
23 break;
25 }
26 }
27 }
28 })
29 }
30
31 fn set_config_impl(&mut self, config: TransformerConfig<T>) {
32 self.config = config;
33 }
34
35 fn get_config_impl(&self) -> &TransformerConfig<T> {
36 &self.config
37 }
38
39 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
40 &mut self.config
41 }
42
43 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
44 match self.config.error_strategy {
45 ErrorStrategy::Stop => ErrorAction::Stop,
46 ErrorStrategy::Skip => ErrorAction::Skip,
47 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
48 _ => ErrorAction::Stop,
49 }
50 }
51
52 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
53 ErrorContext {
54 timestamp: chrono::Utc::now(),
55 item,
56 component_name: self.component_info().name,
57 component_type: std::any::type_name::<Self>().to_string(),
58 }
59 }
60
61 fn component_info(&self) -> ComponentInfo {
62 ComponentInfo {
63 name: self
64 .config
65 .name
66 .clone()
67 .unwrap_or_else(|| "timeout_transformer".to_string()),
68 type_name: std::any::type_name::<Self>().to_string(),
69 }
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76 use futures::StreamExt;
77 use futures::stream;
78 use std::time::Duration;
79 use tokio::time::sleep;
80
81 #[tokio::test]
82 async fn test_timeout_basic() {
83 let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
84 let input = stream::iter(vec![1, 2, 3].into_iter());
85 let boxed_input = Box::pin(input);
86
87 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
88
89 assert_eq!(result, vec![1, 2, 3]);
90 }
91
92 #[tokio::test]
93 async fn test_timeout_empty_input() {
94 let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
95 let input = stream::iter(Vec::<i32>::new());
96 let boxed_input = Box::pin(input);
97
98 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
99
100 assert_eq!(result, Vec::<i32>::new());
101 }
102
103 #[tokio::test]
104 async fn test_timeout_actual_timeout() {
105 let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(50));
106 let input = stream::iter(vec![1, 2, 3].into_iter()).then(|x| async move {
107 sleep(Duration::from_millis(100)).await;
108 x
109 });
110 let boxed_input = Box::pin(input);
111
112 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
113
114 assert!(result.is_empty());
115 }
116
117 #[tokio::test]
118 async fn test_error_handling_strategies() {
119 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
120 .with_error_strategy(ErrorStrategy::<i32>::Skip)
121 .with_name("test_transformer".to_string());
122
123 let config = transformer.config();
124 assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
125 assert_eq!(config.name(), Some("test_transformer".to_string()));
126 }
127
128 #[tokio::test]
129 async fn test_set_config_impl() {
130 let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
131 let new_config = TransformerConfig::<i32> {
132 name: Some("test_transformer".to_string()),
133 error_strategy: ErrorStrategy::<i32>::Skip,
134 };
135
136 transformer.set_config_impl(new_config.clone());
137 assert_eq!(transformer.get_config_impl().name, new_config.name);
138 assert_eq!(
139 transformer.get_config_impl().error_strategy,
140 new_config.error_strategy
141 );
142 }
143
144 #[tokio::test]
145 async fn test_get_config_mut_impl() {
146 let mut transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
147 let config_mut = transformer.get_config_mut_impl();
148 config_mut.name = Some("mutated_name".to_string());
149 assert_eq!(
150 transformer.get_config_impl().name,
151 Some("mutated_name".to_string())
152 );
153 }
154
155 #[tokio::test]
156 async fn test_handle_error_stop() {
157 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
158 .with_error_strategy(ErrorStrategy::<i32>::Stop);
159 let error = StreamError::new(
160 Box::new(std::io::Error::other("test error")),
161 ErrorContext::default(),
162 ComponentInfo::default(),
163 );
164 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
165 }
166
167 #[tokio::test]
168 async fn test_handle_error_skip() {
169 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
170 .with_error_strategy(ErrorStrategy::<i32>::Skip);
171 let error = StreamError::new(
172 Box::new(std::io::Error::other("test error")),
173 ErrorContext::default(),
174 ComponentInfo::default(),
175 );
176 assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
177 }
178
179 #[tokio::test]
180 async fn test_handle_error_retry_within_limit() {
181 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
182 .with_error_strategy(ErrorStrategy::<i32>::Retry(5));
183 let mut error = StreamError::new(
184 Box::new(std::io::Error::other("test error")),
185 ErrorContext::default(),
186 ComponentInfo::default(),
187 );
188 error.retries = 3;
189 assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
190 }
191
192 #[tokio::test]
193 async fn test_handle_error_retry_exceeds_limit() {
194 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
195 .with_error_strategy(ErrorStrategy::<i32>::Retry(5));
196 let mut error = StreamError::new(
197 Box::new(std::io::Error::other("test error")),
198 ErrorContext::default(),
199 ComponentInfo::default(),
200 );
201 error.retries = 5;
202 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
203 }
204
205 #[tokio::test]
206 async fn test_create_error_context() {
207 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
208 .with_name("test_transformer".to_string());
209 let context = transformer.create_error_context(Some(42));
210 assert_eq!(context.item, Some(42));
211 assert_eq!(context.component_name, "test_transformer");
212 assert!(context.timestamp <= chrono::Utc::now());
213 }
214
215 #[tokio::test]
216 async fn test_create_error_context_no_item() {
217 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
218 .with_name("test_transformer".to_string());
219 let context = transformer.create_error_context(None);
220 assert_eq!(context.item, None);
221 assert_eq!(context.component_name, "test_transformer");
222 }
223
224 #[tokio::test]
225 async fn test_component_info() {
226 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100))
227 .with_name("test_transformer".to_string());
228 let info = transformer.component_info();
229 assert_eq!(info.name, "test_transformer");
230 assert_eq!(
231 info.type_name,
232 std::any::type_name::<TimeoutTransformer<i32>>()
233 );
234 }
235
236 #[tokio::test]
237 async fn test_component_info_default_name() {
238 let transformer = TimeoutTransformer::<i32>::new(Duration::from_millis(100));
239 let info = transformer.component_info();
240 assert_eq!(info.name, "timeout_transformer");
241 assert_eq!(
242 info.type_name,
243 std::any::type_name::<TimeoutTransformer<i32>>()
244 );
245 }
246}