streamweave/consumer.rs
1use crate::input::Input;
2use crate::port::PortList;
3use async_trait::async_trait;
4use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
5
6/// Helper trait for providing default port types for Consumers.
7///
8/// This trait provides default implementations of the `InputPorts` associated type.
9/// All Consumers automatically get `InputPorts = (Self::Input,)` unless they
10/// explicitly override it.
11pub trait ConsumerPorts: Consumer
12where
13 Self::Input: std::fmt::Debug + Clone + Send + Sync,
14{
15 /// The default input port tuple type (single port with the consumer's input type).
16 type DefaultInputPorts: PortList;
17}
18
19/// Blanket implementation: all Consumers get default single-port input.
20impl<C> ConsumerPorts for C
21where
22 C: Consumer,
23 C::Input: std::fmt::Debug + Clone + Send + Sync,
24{
25 type DefaultInputPorts = (C::Input,);
26}
27
28/// Configuration for a consumer component.
29///
30/// This struct holds configuration options that control how a consumer
31/// behaves, including error handling strategy and component naming.
32#[derive(Debug, Clone)]
33pub struct ConsumerConfig<T: std::fmt::Debug + Clone + Send + Sync> {
34 /// The error handling strategy to use when processing items.
35 pub error_strategy: ErrorStrategy<T>,
36 /// The name of this consumer component.
37 pub name: String,
38}
39
40impl<T: std::fmt::Debug + Clone + Send + Sync> Default for ConsumerConfig<T> {
41 fn default() -> Self {
42 Self {
43 error_strategy: ErrorStrategy::Stop,
44 name: String::new(),
45 }
46 }
47}
48
49/// Trait for components that consume data streams.
50///
51/// Consumers are the end point of a pipeline. They receive processed items
52/// and typically write them to a destination (file, database, console, etc.)
53/// or perform some final action.
54///
55/// # Example
56///
57/// ```rust
58/// use streamweave::prelude::*;
59///
60/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
61/// let mut consumer = VecConsumer::new();
62/// let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
63/// consumer.consume(stream).await?;
64/// # Ok(())
65/// # }
66/// ```
67///
68/// # Implementations
69///
70/// Common consumer implementations include:
71/// - `VecConsumer` - Collects items into a vector
72/// - `FileConsumer` - Writes data to files
73/// - `KafkaConsumer` - Produces to Kafka topics
74/// - `ConsoleConsumer` - Prints items to console
75#[async_trait]
76pub trait Consumer: Input
77where
78 Self::Input: std::fmt::Debug + Clone + Send + Sync,
79{
80 /// The input port tuple type for this consumer.
81 ///
82 /// This associated type specifies the port tuple that represents this consumer's
83 /// inputs in the graph API. By default, consumers have a single input port
84 /// containing their input type: `(Self::Input,)`.
85 ///
86 /// For multi-port consumers, override this type to specify a tuple with multiple
87 /// input types, e.g., `(i32, String)` for two inputs.
88 type InputPorts: PortList;
89
90 /// Consumes a stream of items.
91 ///
92 /// This method is called by the pipeline to process the final stream.
93 /// The consumer should handle all items in the stream and perform the
94 /// appropriate action (write to file, send to database, etc.).
95 ///
96 /// # Arguments
97 ///
98 /// * `stream` - The stream of items to consume
99 ///
100 /// # Example
101 ///
102 /// ```rust
103 /// use streamweave::prelude::*;
104 ///
105 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
106 /// let mut consumer = VecConsumer::new();
107 /// let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
108 /// consumer.consume(stream).await?;
109 /// # Ok(())
110 /// # }
111 /// ```
112 async fn consume(&mut self, stream: Self::InputStream);
113
114 /// Creates a new consumer instance with the given configuration.
115 ///
116 /// # Arguments
117 ///
118 /// * `config` - The configuration to apply to the consumer.
119 #[must_use]
120 fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
121 where
122 Self: Sized + Clone,
123 {
124 let mut this = self.clone();
125 this.set_config(config);
126 this
127 }
128
129 /// Sets the configuration for this consumer.
130 ///
131 /// # Arguments
132 ///
133 /// * `config` - The configuration to set.
134 fn set_config(&mut self, config: ConsumerConfig<Self::Input>) {
135 self.set_config_impl(config);
136 }
137
138 /// Returns a reference to the consumer's configuration.
139 fn config(&self) -> &ConsumerConfig<Self::Input> {
140 self.get_config_impl()
141 }
142
143 /// Returns a mutable reference to the consumer's configuration.
144 fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input> {
145 self.get_config_mut_impl()
146 }
147
148 /// Sets the name for this consumer.
149 ///
150 /// # Arguments
151 ///
152 /// * `name` - The name to assign to this consumer.
153 #[must_use]
154 fn with_name(mut self, name: String) -> Self
155 where
156 Self: Sized,
157 {
158 self.config_mut().name = name.clone();
159 self
160 }
161
162 /// Handles an error according to the consumer's error strategy.
163 ///
164 /// # Arguments
165 ///
166 /// * `error` - The error that occurred.
167 ///
168 /// # Returns
169 ///
170 /// The action to take based on the error strategy.
171 fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction {
172 match &self.config().error_strategy {
173 ErrorStrategy::Stop => ErrorAction::Stop,
174 ErrorStrategy::Skip => ErrorAction::Skip,
175 ErrorStrategy::Retry(n) if error.retries < *n => ErrorAction::Retry,
176 ErrorStrategy::Custom(handler) => handler(error),
177 _ => ErrorAction::Stop,
178 }
179 }
180
181 /// Returns information about this consumer component.
182 fn component_info(&self) -> ComponentInfo {
183 ComponentInfo {
184 name: self.config().name.clone(),
185 type_name: std::any::type_name::<Self>().to_string(),
186 }
187 }
188
189 /// Creates an error context for the given item.
190 ///
191 /// # Arguments
192 ///
193 /// * `item` - The item that caused the error, if any.
194 ///
195 /// # Returns
196 ///
197 /// An error context containing information about when and where the error occurred.
198 fn create_error_context(&self, item: Option<Self::Input>) -> ErrorContext<Self::Input> {
199 ErrorContext {
200 timestamp: chrono::Utc::now(),
201 item,
202 component_name: self.component_info().name,
203 component_type: self.component_info().type_name,
204 }
205 }
206
207 /// Internal implementation for setting configuration.
208 fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>);
209 /// Internal implementation for getting configuration.
210 fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>;
211 /// Internal implementation for getting mutable configuration.
212 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>;
213}