streamweave_transformers/timeout/
timeout_transformer.rs

1use streamweave::TransformerConfig;
2use streamweave_error::ErrorStrategy;
3use tokio::time::Duration;
4
5/// A transformer that applies a timeout to stream processing.
6///
7/// This transformer ensures that items are processed within a specified time limit,
8/// failing if the timeout is exceeded.
9#[derive(Clone)]
10pub struct TimeoutTransformer<T: std::fmt::Debug + Clone + Send + Sync + 'static> {
11  /// The maximum duration allowed for processing each item.
12  pub duration: Duration,
13  /// Configuration for the transformer, including error handling strategy.
14  pub config: TransformerConfig<T>,
15  /// Phantom data to track the type parameter.
16  pub _phantom: std::marker::PhantomData<T>,
17}
18
19impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> TimeoutTransformer<T> {
20  /// Creates a new `TimeoutTransformer` with the given duration.
21  ///
22  /// # Arguments
23  ///
24  /// * `duration` - The maximum duration allowed for processing each item.
25  pub fn new(duration: Duration) -> Self {
26    Self {
27      duration,
28      config: TransformerConfig::<T>::default(),
29      _phantom: std::marker::PhantomData,
30    }
31  }
32
33  /// Sets the error handling strategy for this transformer.
34  ///
35  /// # Arguments
36  ///
37  /// * `strategy` - The error handling strategy to use.
38  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
39    self.config.error_strategy = strategy;
40    self
41  }
42
43  /// Sets the name for this transformer.
44  ///
45  /// # Arguments
46  ///
47  /// * `name` - The name to assign to this transformer.
48  pub fn with_name(mut self, name: String) -> Self {
49    self.config.name = Some(name);
50    self
51  }
52}