Trait Consumer 

pub trait Consumer: Input
where
    Self::Input: Debug + Clone + Send + Sync,
{
    type InputPorts: PortList;

    // Required methods
    fn consume<'life0, 'async_trait>(
        &'life0 mut self,
        stream: Self::InputStream,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait;
    fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>);
    fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>;
    fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>;

    // Provided methods
    fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
    where
        Self: Sized + Clone { ... }
    fn set_config(&mut self, config: ConsumerConfig<Self::Input>) { ... }
    fn config(&self) -> &ConsumerConfig<Self::Input> { ... }
    fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input> { ... }
    fn with_name(self, name: String) -> Self
    where
        Self: Sized { ... }
    fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction { ... }
    fn component_info(&self) -> ComponentInfo { ... }
    fn create_error_context(
        &self,
        item: Option<Self::Input>,
    ) -> ErrorContext<Self::Input> { ... }
}

Trait for components that consume data streams.

Consumers are the end point of a pipeline. They receive processed items and typically write them to a destination (file, database, console, etc.) or perform some final action.

§Example

use streamweave::prelude::*;

let mut consumer = VecConsumer::new();
let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
// `consume` resolves to `()`, so there is no `Result` to propagate with `?`.
consumer.consume(stream).await;

§Implementations

Common consumer implementations include:

  • VecConsumer - Collects items into a vector
  • FileConsumer - Writes data to files
  • KafkaConsumer - Publishes items to Kafka topics
  • ConsoleConsumer - Prints items to console

Required Associated Types§

type InputPorts: PortList

The input port tuple type for this consumer.

This associated type specifies the port tuple that represents this consumer’s inputs in the graph API. By default, consumers have a single input port containing their input type: (Self::Input,).

For multi-port consumers, override this type to specify a tuple with multiple input types, e.g., (i32, String) for two inputs.
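
The single-port and multi-port cases described above can be sketched with plain tuples. This is a minimal illustration only: `PortListSketch`, `ConsumerSketch`, `SingleInput`, `TwoInputs`, and `port_count` are hypothetical stand-ins, and the `PORT_COUNT` constant is an assumption added so the difference is observable. The real `PortList` trait may expose something different.

```rust
// Hypothetical stand-in for `PortList`: a marker trait over tuples that
// also reports how many ports the tuple describes (an assumption).
trait PortListSketch {
    const PORT_COUNT: usize;
}

impl<A> PortListSketch for (A,) {
    const PORT_COUNT: usize = 1;
}

impl<A, B> PortListSketch for (A, B) {
    const PORT_COUNT: usize = 2;
}

// Hypothetical, reduced version of the consumer trait: only the two
// associated types that matter for this example.
trait ConsumerSketch {
    type Input;
    type InputPorts: PortListSketch;
}

// Single input port holding the consumer's input type: `(Self::Input,)`.
struct SingleInput;

impl ConsumerSketch for SingleInput {
    type Input = i32;
    type InputPorts = (i32,);
}

// Two input ports with different item types.
struct TwoInputs;

impl ConsumerSketch for TwoInputs {
    type Input = i32;
    type InputPorts = (i32, String);
}

// Reads the port count from the associated type at compile time.
fn port_count<C: ConsumerSketch>() -> usize {
    <C::InputPorts as PortListSketch>::PORT_COUNT
}
```

Here `port_count::<SingleInput>()` yields 1 and `port_count::<TwoInputs>()` yields 2, because the count is taken from the tuple type alone.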

Required Methods§

fn consume<'life0, 'async_trait>( &'life0 mut self, stream: Self::InputStream, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consumes a stream of items.

This method is called by the pipeline to process the final stream. The consumer should handle all items in the stream and perform the appropriate action (write to file, send to database, etc.).

§Arguments
  • stream - The stream of items to consume

§Example

use streamweave::prelude::*;

let mut consumer = VecConsumer::new();
let stream = futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
// `consume` resolves to `()`, so there is no `Result` to propagate with `?`.
consumer.consume(stream).await;
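
The `'life0` and `'async_trait` lifetimes in the signature are what the `async_trait` macro generates for an `async fn consume(&mut self, ...)`. To show what implementing a method of that shape involves, here is a std-only sketch. It is not the library's implementation: `SketchConsumer`, `CollectConsumer`, `block_on`, `NoopWake`, and `collect_demo` are hypothetical names, and a boxed iterator stands in for `Self::InputStream` so the example needs no external crates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the real trait: same boxed-future return
// shape as `consume`, but fed by an iterator instead of a stream.
trait SketchConsumer {
    type Input;

    fn consume<'a>(
        &'a mut self,
        items: Box<dyn Iterator<Item = Self::Input> + Send + 'a>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

// Hypothetical consumer that collects every item into a vector.
struct CollectConsumer {
    items: Vec<i32>,
}

impl SketchConsumer for CollectConsumer {
    type Input = i32;

    fn consume<'a>(
        &'a mut self,
        items: Box<dyn Iterator<Item = i32> + Send + 'a>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        // The future borrows `self` for `'a` and drains the input.
        Box::pin(async move {
            for item in items {
                self.items.push(item);
            }
        })
    }
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny polling loop standing in for a real async runtime.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Drives the consumer to completion and returns what it collected.
fn collect_demo() -> Vec<i32> {
    let mut consumer = CollectConsumer { items: Vec::new() };
    block_on(consumer.consume(Box::new(vec![1, 2, 3].into_iter())));
    consumer.items
}
```

Calling `collect_demo()` returns the three input items in order. In real code an async runtime would drive the future instead of the polling loop shown here.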

fn set_config_impl(&mut self, config: ConsumerConfig<Self::Input>)

Internal implementation for setting configuration.

fn get_config_impl(&self) -> &ConsumerConfig<Self::Input>

Internal implementation for getting configuration.

fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<Self::Input>

Internal implementation for getting mutable configuration.

Provided Methods§

fn with_config(&self, config: ConsumerConfig<Self::Input>) -> Self
where Self: Sized + Clone,

Creates a new consumer instance with the given configuration.

§Arguments
  • config - The configuration to apply to the consumer.

fn set_config(&mut self, config: ConsumerConfig<Self::Input>)

Sets the configuration for this consumer.

§Arguments
  • config - The configuration to set.

fn config(&self) -> &ConsumerConfig<Self::Input>

Returns a reference to the consumer’s configuration.

fn config_mut(&mut self) -> &mut ConsumerConfig<Self::Input>

Returns a mutable reference to the consumer’s configuration.

fn with_name(self, name: String) -> Self
where Self: Sized,

Sets the name for this consumer.

§Arguments
  • name - The name to assign to this consumer.
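
The split between the required `*_impl` methods and the provided configuration methods suggests a delegation pattern: implementors supply storage access once, and the convenience methods are written in terms of it. The sketch below illustrates that pattern under assumptions. `ConfigSketch` (with a single `name` field), `ConfigurableSketch`, and `PrintConsumer` are hypothetical, and the default bodies are a guess at how such methods are commonly written, not the library's source.

```rust
// Hypothetical stand-in for `ConsumerConfig`; the real type's fields are
// not shown on this page.
#[derive(Debug, Clone, Default, PartialEq)]
struct ConfigSketch {
    name: String,
}

trait ConfigurableSketch {
    // Required: each implementor decides where the configuration lives.
    fn set_config_impl(&mut self, config: ConfigSketch);
    fn get_config_impl(&self) -> &ConfigSketch;
    fn get_config_mut_impl(&mut self) -> &mut ConfigSketch;

    // Provided: thin wrappers written once in terms of the hooks above.
    fn set_config(&mut self, config: ConfigSketch) {
        self.set_config_impl(config);
    }

    fn config(&self) -> &ConfigSketch {
        self.get_config_impl()
    }

    fn config_mut(&mut self) -> &mut ConfigSketch {
        self.get_config_mut_impl()
    }

    // Borrows `self`, so it needs `Clone` to build the new instance.
    fn with_config(&self, config: ConfigSketch) -> Self
    where
        Self: Sized + Clone,
    {
        let mut copy = self.clone();
        copy.set_config(config);
        copy
    }

    // Takes `self` by value, so it can be chained builder-style.
    fn with_name(mut self, name: String) -> Self
    where
        Self: Sized,
    {
        self.config_mut().name = name;
        self
    }
}

// Hypothetical consumer that stores its configuration in a field.
#[derive(Clone, Default)]
struct PrintConsumer {
    config: ConfigSketch,
}

impl ConfigurableSketch for PrintConsumer {
    fn set_config_impl(&mut self, config: ConfigSketch) {
        self.config = config;
    }

    fn get_config_impl(&self) -> &ConfigSketch {
        &self.config
    }

    fn get_config_mut_impl(&mut self) -> &mut ConfigSketch {
        &mut self.config
    }
}
```

In this sketch, `PrintConsumer::default().with_name("stdout".to_string())` returns the same consumer with its name set, while `with_config` leaves the original untouched and returns a reconfigured clone. That difference matches the `self` versus `&self` receivers in the two signatures.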

fn handle_error(&self, error: &StreamError<Self::Input>) -> ErrorAction

Handles an error according to the consumer’s error strategy.

§Arguments
  • error - The error that occurred.
§Returns

The action to take based on the error strategy.
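
As a rough illustration of mapping an error strategy to an action, the sketch below uses three hypothetical strategies. The names `StrategySketch`, `ActionSketch`, and `ErrorSketch`, the variants, and the `retries` counter are all assumptions made for the example. The actual `ErrorAction` and `StreamError` definitions are not shown on this page and may differ.

```rust
// Hypothetical strategy a consumer might be configured with.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StrategySketch {
    Stop,
    Skip,
    // Retry up to the given number of attempts, then give up.
    Retry(usize),
}

// Hypothetical action reported back to the pipeline.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ActionSketch {
    Stop,
    Skip,
    Retry,
}

// Hypothetical error carrying how many retries were already attempted.
struct ErrorSketch {
    retries: usize,
}

// Chooses an action from the strategy and the error's retry count.
fn handle_error(strategy: StrategySketch, error: &ErrorSketch) -> ActionSketch {
    match strategy {
        StrategySketch::Stop => ActionSketch::Stop,
        StrategySketch::Skip => ActionSketch::Skip,
        StrategySketch::Retry(max) if error.retries < max => ActionSketch::Retry,
        // Retry budget exhausted: fall back to stopping.
        StrategySketch::Retry(_) => ActionSketch::Stop,
    }
}
```

With `StrategySketch::Retry(3)`, an error that has been retried once maps to `ActionSketch::Retry`, and one that has been retried three times maps to `ActionSketch::Stop`.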

fn component_info(&self) -> ComponentInfo

Returns information about this consumer component.

fn create_error_context( &self, item: Option<Self::Input>, ) -> ErrorContext<Self::Input>

Creates an error context for the given item.

§Arguments
  • item - The item that caused the error, if any.
§Returns

An error context containing information about when and where the error occurred.
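
A context that records "when and where" could look like the sketch below. It is an assumption, not the library's `ErrorContext`: the type `ErrorContextSketch`, its fields `timestamp`, `item`, and `component_name`, and the `NamedConsumer` type are hypothetical and chosen only to mirror the description above.

```rust
use std::time::SystemTime;

// Hypothetical error context: when the error happened, which item was
// involved (if any), and which component reported it.
#[derive(Debug, Clone)]
struct ErrorContextSketch<T> {
    timestamp: SystemTime,
    item: Option<T>,
    component_name: String,
}

// Hypothetical consumer that only knows its own name.
struct NamedConsumer {
    name: String,
}

impl NamedConsumer {
    // Captures the current time and the consumer's name alongside the item.
    fn create_error_context<T>(&self, item: Option<T>) -> ErrorContextSketch<T> {
        ErrorContextSketch {
            timestamp: SystemTime::now(),
            item,
            component_name: self.name.clone(),
        }
    }
}
```

Passing `None` covers errors that are not tied to a specific item, such as a failure to open the destination before any item arrives.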

Implementors§