streamweave/
consumer.rs

1use crate::input::Input;
2use crate::port::PortList;
3use async_trait::async_trait;
4use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
5
6/// Helper trait for providing default port types for Consumers.
7///
8/// This trait provides default implementations of the `InputPorts` associated type.
9/// All Consumers automatically get `InputPorts = (Self::Input,)` unless they
10/// explicitly override it.
11pub trait ConsumerPorts: Consumer
12where
13  Self::Input: std::fmt::Debug + Clone + Send + Sync,
14{
15  /// The default input port tuple type (single port with the consumer's input type).
16  type DefaultInputPorts: PortList;
17}
18
19/// Blanket implementation: all Consumers get default single-port input.
20impl<C> ConsumerPorts for C
21where
22  C: Consumer,
23  C::Input: std::fmt::Debug + Clone + Send + Sync,
24{
25  type DefaultInputPorts = (C::Input,);
26}
27
28/// Configuration for a consumer component.
29///
30/// This struct holds configuration options that control how a consumer
31/// behaves, including error handling strategy and component naming.
32#[derive(Debug, Clone)]
33pub struct ConsumerConfig<T: std::fmt::Debug + Clone + Send + Sync> {
34  /// The error handling strategy to use when processing items.
35  pub error_strategy: ErrorStrategy<T>,
36  /// The name of this consumer component.
37  pub name: String,
38}
39
40impl<T: std::fmt::Debug + Clone + Send + Sync> Default for ConsumerConfig<T> {
41  fn default() -> Self {
42    Self {
43      error_strategy: ErrorStrategy::Stop,
44      name: String::new(),
45    }
46  }
47}
48
49/// Trait for components that consume data streams.
50///
51/// Consumers are the end point of a pipeline. They receive processed items
52/// and typically write them to a destination (file, database, console, etc.)
53/// or perform some final action.
54///
55/// # Example
56///
57/// ```rust
58/// use streamweave::prelude::*;
59///
60/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
61/// let mut consumer = VecConsumer::new();
62/// let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
63/// consumer.consume(stream).await?;
64/// # Ok(())
65/// # }
66/// ```
67///
68/// # Implementations
69///
70/// Common consumer implementations include:
71/// - `VecConsumer` - Collects items into a vector
72/// - `FileConsumer` - Writes data to files
73/// - `KafkaConsumer` - Produces to Kafka topics
74/// - `ConsoleConsumer` - Prints items to console
75#[async_trait]
76pub trait Consumer: Input
77where
78  Self::Input: std::fmt::Debug + Clone + Send + Sync,
79{
80  /// The input port tuple type for this consumer.
81  ///
82  /// This associated type specifies the port tuple that represents this consumer's
83  /// inputs in the graph API. By default, consumers have a single input port
84  /// containing their input type: `(Self::Input,)`.
85  ///
86  /// For multi-port consumers, override this type to specify a tuple with multiple
87  /// input types, e.g., `(i32, String)` for two inputs.
88  type InputPorts: PortList;
89
90  /// Consumes a stream of items.
91  ///
92  /// This method is called by the pipeline to process the final stream.
93  /// The consumer should handle all items in the stream and perform the
94  /// appropriate action (write to file, send to database, etc.).
95  ///
96  /// # Arguments
97  ///
98  /// * `stream` - The stream of items to consume
99  ///
100  /// # Example
101  ///
102  /// ```rust
103  /// use streamweave::prelude::*;
104  ///
105  /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
106  /// let mut consumer = VecConsumer::new();
107  /// let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
108  /// consumer.consume(stream).await?;
109  /// # Ok(())
110  /// # }
111  /// ```
112  async fn consume(&mut self, stream: Self::InputStream);
113
114  /// Creates a new consumer instance with the given configuration.
115  ///
116  /// # Arguments
117  ///
118  /// * `config` - The configuration to apply to the consumer.
119  #[must_use]
120  fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
121  where
122    Self: Sized + Clone,
123  {
124    let mut this = self.clone();
125    this.set_config(config);
126    this
127  }
128
129  /// Sets the configuration for this consumer.
130  ///
131  /// # Arguments
132  ///
133  /// * `config` - The configuration to set.
134  fn set_config(&mut self, config: ConsumerConfig<Self::Input>) {
135    self.set_config_impl(config);
136  }
137
138  /// Returns a reference to the consumer's configuration.
139  fn config(&self) -> &ConsumerConfig<Self::Input> {
140    self.get_config_impl()
141  }
142
143  /// Returns a mutable reference to the consumer's configuration.
144  fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input> {
145    self.get_config_mut_impl()
146  }
147
148  /// Sets the name for this consumer.
149  ///
150  /// # Arguments
151  ///
152  /// * `name` - The name to assign to this consumer.
153  #[must_use]
154  fn with_name(mut self, name: String) -> Self
155  where
156    Self: Sized,
157  {
158    self.config_mut().name = name.clone();
159    self
160  }
161
162  /// Handles an error according to the consumer's error strategy.
163  ///
164  /// # Arguments
165  ///
166  /// * `error` - The error that occurred.
167  ///
168  /// # Returns
169  ///
170  /// The action to take based on the error strategy.
171  fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction {
172    match &self.config().error_strategy {
173      ErrorStrategy::Stop => ErrorAction::Stop,
174      ErrorStrategy::Skip => ErrorAction::Skip,
175      ErrorStrategy::Retry(n) if error.retries < *n => ErrorAction::Retry,
176      ErrorStrategy::Custom(handler) => handler(error),
177      _ => ErrorAction::Stop,
178    }
179  }
180
181  /// Returns information about this consumer component.
182  fn component_info(&self) -> ComponentInfo {
183    ComponentInfo {
184      name: self.config().name.clone(),
185      type_name: std::any::type_name::<Self>().to_string(),
186    }
187  }
188
189  /// Creates an error context for the given item.
190  ///
191  /// # Arguments
192  ///
193  /// * `item` - The item that caused the error, if any.
194  ///
195  /// # Returns
196  ///
197  /// An error context containing information about when and where the error occurred.
198  fn create_error_context(&self, item: Option<Self::Input>) -> ErrorContext<Self::Input> {
199    ErrorContext {
200      timestamp: chrono::Utc::now(),
201      item,
202      component_name: self.component_info().name,
203      component_type: self.component_info().type_name,
204    }
205  }
206
207  /// Internal implementation for setting configuration.
208  fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>);
209  /// Internal implementation for getting configuration.
210  fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>;
211  /// Internal implementation for getting mutable configuration.
212  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>;
213}