pub trait Consumer: Input {
    type InputPorts: PortList;

    // Required methods
    fn consume<'life0, 'async_trait>(
        &'life0 mut self,
        stream: Self::InputStream,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>);
    fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>;
    fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>;

    // Provided methods
    fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
       where Self: Sized + Clone { ... }
    fn set_config(&mut self, config: ConsumerConfig<Self::Input>) { ... }
    fn config(&self) -> &ConsumerConfig<Self::Input> { ... }
    fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input> { ... }
    fn with_name(self, name: String) -> Self
       where Self: Sized { ... }
    fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction { ... }
    fn component_info(&self) -> ComponentInfo { ... }
    fn create_error_context(
        &self,
        item: Option<Self::Input>,
    ) -> ErrorContext<Self::Input> { ... }
}
Trait for components that consume data streams.
Consumers are the end point of a pipeline. They receive processed items and typically write them to a destination (file, database, console, etc.) or perform some final action.
§Example
use streamweave::prelude::*;
let mut consumer = VecConsumer::new();
let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
consumer.consume(stream).await;
§Implementations
Common consumer implementations include:
VecConsumer - Collects items into a vector
FileConsumer - Writes data to files
KafkaConsumer - Produces to Kafka topics
ConsoleConsumer - Prints items to console
Required Associated Types§
type InputPorts: PortList
The input port tuple type for this consumer.
This associated type specifies the port tuple that represents this consumer’s
inputs in the graph API. A single-input consumer conventionally uses a one-element
tuple containing its input type: (Self::Input,).
For multi-port consumers, override this type to specify a tuple with multiple
input types, e.g., (i32, String) for two inputs.
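The single-port and multi-port cases can be sketched as follows. This is a minimal illustration rather than the library's implementation: the `PortList` trait below is a hypothetical stand-in with an invented `LEN` constant, and `PortedConsumer`, `LogSink`, `JoinSink`, and `port_count` are names made up for this example.

```rust
// Hypothetical stand-in for the library's `PortList`: implemented for tuples of port types.
// The `LEN` constant is an assumption added for illustration only.
trait PortList {
    /// Number of ports in the tuple.
    const LEN: usize;
}

impl<A> PortList for (A,) {
    const LEN: usize = 1;
}

impl<A, B> PortList for (A, B) {
    const LEN: usize = 2;
}

// Hypothetical, simplified consumer trait that keeps only the associated types.
trait PortedConsumer {
    type Input;
    type InputPorts: PortList;
}

struct LogSink;

impl PortedConsumer for LogSink {
    type Input = String;
    // Single-port case: a one-element tuple of the input type.
    type InputPorts = (String,);
}

struct JoinSink;

impl PortedConsumer for JoinSink {
    type Input = i32;
    // Multi-port case: two inputs of different types.
    type InputPorts = (i32, String);
}

/// Reads the port count from the associated type at compile time.
fn port_count<C: PortedConsumer>() -> usize {
    <C::InputPorts as PortList>::LEN
}
```

Note the trailing comma in `(String,)`: without it, `(String)` is just `String` in parentheses, not a tuple.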
Required Methods§
fn consume<'life0, 'async_trait>(
    &'life0 mut self,
    stream: Self::InputStream,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Consumes a stream of items.
This method is called by the pipeline to process the final stream. The consumer should handle all items in the stream and perform the appropriate action (write to file, send to database, etc.).
§Arguments
stream - The stream of items to consume
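The `'life0` and `'async_trait` lifetimes in the signature have the shape that the `async_trait` macro generates for an `async fn consume(&mut self, stream: Self::InputStream)`: a boxed, `Send` future that borrows `self`. The sketch below writes that shape by hand using only the standard library. It is an illustration under stated assumptions, not the crate's code: `SketchConsumer`, `CollectingConsumer`, `block_on`, and `collect_demo` are hypothetical names, and a plain `Vec` stands in for `Self::InputStream`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified consumer trait; `Vec<Self::Item>` stands in for a stream.
trait SketchConsumer {
    type Item;

    // Hand-written equivalent of `async fn consume(&mut self, items: Vec<Self::Item>)`.
    fn consume<'a>(
        &'a mut self,
        items: Vec<Self::Item>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

/// Collects every received item, in arrival order.
struct CollectingConsumer {
    seen: Vec<i32>,
}

impl SketchConsumer for CollectingConsumer {
    type Item = i32;

    fn consume<'a>(
        &'a mut self,
        items: Vec<i32>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        // The future borrows `self` for `'a`, so nothing runs until it is polled.
        Box::pin(async move {
            for item in items {
                self.seen.push(item);
            }
        })
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor that polls in a loop. Adequate only for futures that
/// never wait on I/O or timers; real code would use an async runtime.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Drives one `consume` call to completion and returns what was collected.
fn collect_demo() -> Vec<i32> {
    let mut consumer = CollectingConsumer { seen: Vec::new() };
    block_on(consumer.consume(vec![1, 2, 3]));
    consumer.seen
}
```

Because the future's output is `()`, the caller gets no `Result` back from `consume`; any per-item failure has to be handled inside the consumer.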
§Example
use streamweave::prelude::*;
let mut consumer = VecConsumer::new();
let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
consumer.consume(stream).await;
fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>)
Internal implementation for setting configuration.
fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>
Internal implementation for getting configuration.
fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>
Internal implementation for getting mutable configuration.
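The three `*_impl` methods read as storage hooks: each implementor decides where its configuration lives, and the provided `set_config`, `config`, and `config_mut` methods presumably delegate to them. A minimal sketch of that pattern follows, assuming this delegation. `SketchConfig` is a hypothetical stand-in for `ConsumerConfig` (whose real fields are not shown on this page), and `Configurable`, `NullConsumer`, and `config_roundtrip` are invented for the example.

```rust
/// Hypothetical stand-in for `ConsumerConfig`; the `name` field is an assumption.
#[derive(Clone, Debug, Default, PartialEq)]
struct SketchConfig {
    name: String,
}

trait Configurable {
    // Required hooks: each implementor decides where the config is stored.
    fn set_config_impl(&mut self, config: SketchConfig);
    fn get_config_impl(&self) -> &SketchConfig;
    fn get_config_mut_impl(&mut self) -> &mut SketchConfig;

    // Provided methods: the public surface, written once in terms of the hooks.
    fn set_config(&mut self, config: SketchConfig) {
        self.set_config_impl(config);
    }
    fn config(&self) -> &SketchConfig {
        self.get_config_impl()
    }
    fn config_mut(&mut self) -> &mut SketchConfig {
        self.get_config_mut_impl()
    }
}

/// A consumer that stores its config in a plain field.
#[derive(Default)]
struct NullConsumer {
    config: SketchConfig,
}

impl Configurable for NullConsumer {
    fn set_config_impl(&mut self, config: SketchConfig) {
        self.config = config;
    }
    fn get_config_impl(&self) -> &SketchConfig {
        &self.config
    }
    fn get_config_mut_impl(&mut self) -> &mut SketchConfig {
        &mut self.config
    }
}

/// Sets a config, then edits it in place; returns the name after each step.
fn config_roundtrip() -> (String, String) {
    let mut consumer = NullConsumer::default();
    consumer.set_config(SketchConfig { name: "first".to_string() });
    let after_set = consumer.config().name.clone();
    consumer.config_mut().name.push_str("-edited");
    (after_set, consumer.config().name.clone())
}
```

With this split, an implementor writes only the three hooks, and callers use the provided methods without knowing how the configuration is stored.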
Provided Methods§
fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
Creates a new consumer instance with the given configuration.
§Arguments
config - The configuration to apply to the consumer.
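Since `with_config` takes `&self` and is bounded by `Self: Sized + Clone`, one plausible reading is that it clones the receiver and applies the configuration to the copy, leaving the original untouched. The sketch below shows that reading only; it is not taken from the library's source. `SketchConfig`, `WithConfig`, `NullConsumer`, and `with_config_demo` are hypothetical names.

```rust
/// Hypothetical stand-in for `ConsumerConfig`; the `name` field is an assumption.
#[derive(Clone)]
struct SketchConfig {
    name: String,
}

trait WithConfig {
    fn set_config(&mut self, config: SketchConfig);
    fn config(&self) -> &SketchConfig;

    // Takes `&self`, so the receiver is not modified; `Clone` supplies the copy.
    fn with_config(&self, config: SketchConfig) -> Self
    where
        Self: Sized + Clone,
    {
        let mut copy = self.clone();
        copy.set_config(config);
        copy
    }
}

#[derive(Clone)]
struct NullConsumer {
    config: SketchConfig,
}

impl WithConfig for NullConsumer {
    fn set_config(&mut self, config: SketchConfig) {
        self.config = config;
    }
    fn config(&self) -> &SketchConfig {
        &self.config
    }
}

/// Returns the config names of the original and of the reconfigured copy.
fn with_config_demo() -> (String, String) {
    let original = NullConsumer {
        config: SketchConfig { name: "original".to_string() },
    };
    let renamed = original.with_config(SketchConfig { name: "renamed".to_string() });
    (original.config().name.clone(), renamed.config().name.clone())
}
```

The `Sized + Clone` bound sits on the method rather than on the trait, so types that cannot be cloned can still implement the trait and simply lack this one method.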
fn set_config(&mut self, config: ConsumerConfig<Self::Input>)
fn config(&self) -> &ConsumerConfig<Self::Input>
Returns a reference to the consumer’s configuration.
fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input>
Returns a mutable reference to the consumer’s configuration.
fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction
fn component_info(&self) -> ComponentInfo
Returns information about this consumer component.