streamweave_transformers/timeout/timeout_transformer.rs
1use streamweave::TransformerConfig;
2use streamweave_error::ErrorStrategy;
3use tokio::time::Duration;
4
5/// A transformer that applies a timeout to stream processing.
6///
7/// This transformer ensures that items are processed within a specified time limit,
8/// failing if the timeout is exceeded.
9#[derive(Clone)]
10pub struct TimeoutTransformer<T: std::fmt::Debug + Clone + Send + Sync + 'static> {
11 /// The maximum duration allowed for processing each item.
12 pub duration: Duration,
13 /// Configuration for the transformer, including error handling strategy.
14 pub config: TransformerConfig<T>,
15 /// Phantom data to track the type parameter.
16 pub _phantom: std::marker::PhantomData<T>,
17}
18
19impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> TimeoutTransformer<T> {
20 /// Creates a new `TimeoutTransformer` with the given duration.
21 ///
22 /// # Arguments
23 ///
24 /// * `duration` - The maximum duration allowed for processing each item.
25 pub fn new(duration: Duration) -> Self {
26 Self {
27 duration,
28 config: TransformerConfig::<T>::default(),
29 _phantom: std::marker::PhantomData,
30 }
31 }
32
33 /// Sets the error handling strategy for this transformer.
34 ///
35 /// # Arguments
36 ///
37 /// * `strategy` - The error handling strategy to use.
38 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
39 self.config.error_strategy = strategy;
40 self
41 }
42
43 /// Sets the name for this transformer.
44 ///
45 /// # Arguments
46 ///
47 /// * `name` - The name to assign to this transformer.
48 pub fn with_name(mut self, name: String) -> Self {
49 self.config.name = Some(name);
50 self
51 }
52}