/// Common interface of streamer clients.
///
/// A streamer is connected with [`Streamer::connect`], can then create
/// producers and consumers, and is shut down with [`Streamer::disconnect`].
///
/// NOTE(review): every method returns a pinned, boxed, `Send` future bound by
/// an `'async_trait` lifetime; this looks like the expansion produced by the
/// `async_trait` macro, i.e. the methods are presumably written as `async fn`
/// in the original source — confirm against the trait definition.
pub trait Streamer: Sized {
    /// Backend error type; the producer and the consumer must report the same
    /// error type, and it is wrapped in `StreamErr` by every method here.
    type Error: Error;
    /// Producer type returned by `create_generic_producer` and `create_producer`.
    type Producer: Producer<Error = Self::Error>;
    /// Consumer type returned by `create_consumer`.
    type Consumer: Consumer<Error = Self::Error>;
    /// Options accepted by `connect`.
    type ConnectOptions: ConnectOptions;
    /// Options accepted by `create_consumer`.
    type ConsumerOptions: ConsumerOptions;
    /// Options accepted by `create_generic_producer` and `create_producer`.
    type ProducerOptions: ProducerOptions;

    // Required methods
    /// Establish a connection to the streaming server.
    ///
    /// Takes the URI of the streamer and the connect options; resolves to the
    /// connected streamer, or to a `StreamErr` on failure.
    fn connect<'async_trait>(
        streamer: StreamerUri,
        options: Self::ConnectOptions
    ) -> Pin<Box<dyn Future<Output = Result<Self, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where Self: 'async_trait;
    /// Flush and disconnect from the streaming server.
    ///
    /// Takes `self` by value, so the streamer cannot be used afterwards.
    fn disconnect<'async_trait>(
        self
    ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where Self: 'async_trait;
    /// Create a producer that can stream to any stream key.
    fn create_generic_producer<'life0, 'async_trait>(
        &'life0 self,
        options: Self::ProducerOptions
    ) -> Pin<Box<dyn Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    /// Create a consumer subscribing to the specified streams.
    ///
    /// `streams` is borrowed for as long as the returned future lives.
    fn create_consumer<'life0, 'life1, 'async_trait>(
        &'life0 self,
        streams: &'life1 [StreamKey],
        options: Self::ConsumerOptions
    ) -> Pin<Box<dyn Future<Output = Result<Self::Consumer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: Sync + 'async_trait;

    // Provided method
    /// Create a producer that streams to the specified stream.
    ///
    /// This method has a default implementation (body elided here) and
    /// additionally requires `Self: Sync`.
    fn create_producer<'life0, 'async_trait>(
        &'life0 self,
        stream: StreamKey,
        options: Self::ProducerOptions
    ) -> Pin<Box<dyn Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where 'life0: 'async_trait,
             Self: Sync + 'async_trait { ... }
}
Expand description

Common interface of streamer clients.

Required Associated Types§

Required Methods§

source

fn connect<'async_trait>( streamer: StreamerUri, options: Self::ConnectOptions ) -> Pin<Box<dyn Future<Output = Result<Self, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where Self: 'async_trait,

Establish a connection to the streaming server.

source

fn disconnect<'async_trait>( self ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where Self: 'async_trait,

Flush and disconnect from the streaming server.

source

fn create_generic_producer<'life0, 'async_trait>( &'life0 self, options: Self::ProducerOptions ) -> Pin<Box<dyn Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, Self: 'async_trait,

Create a producer that can stream to any stream key.

source

fn create_consumer<'life0, 'life1, 'async_trait>( &'life0 self, streams: &'life1 [StreamKey], options: Self::ConsumerOptions ) -> Pin<Box<dyn Future<Output = Result<Self::Consumer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Create a consumer subscribing to the specified streams.

Provided Methods§

source

fn create_producer<'life0, 'async_trait>( &'life0 self, stream: StreamKey, options: Self::ProducerOptions ) -> Pin<Box<dyn Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, Self: Sync + 'async_trait,

Create a producer that streams to the specified stream.

Implementations on Foreign Types§

source§

impl Streamer for KafkaStreamer

source§

fn disconnect<'async_trait>( self ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where KafkaStreamer: 'async_trait,

It will flush all producers.

source§

fn create_consumer<'life0, 'life1, 'async_trait>( &'life0 self, streams: &'life1 [StreamKey], options: <KafkaStreamer as Streamer>::ConsumerOptions ) -> Pin<Box<dyn Future<Output = Result<<KafkaStreamer as Streamer>::Consumer, StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, 'life1: 'async_trait, KafkaStreamer: 'async_trait,

If ConsumerMode is RealTime, auto commit will be disabled and it will always stream from the latest. group_id must not be set.

If ConsumerMode is Resumable, it will use a group ID unique to this host: on a physical machine, it will use the MAC address; inside a Docker container, it will use the container ID. So when the process restarts, it will resume from the last committed offset. group_id must not be set.

If ConsumerMode is LoadBalanced, shards will be divided and assigned by the broker among consumers sharing the same group_id. group_id must already be set.

If you need to override the host ID, you can set the environment variable HOST_ID.

§

type Error = KafkaError

§

type Producer = KafkaProducer

§

type Consumer = KafkaConsumer

§

type ConnectOptions = KafkaConnectOptions

§

type ConsumerOptions = KafkaConsumerOptions

§

type ProducerOptions = KafkaProducerOptions

source§

fn connect<'async_trait>( uri: StreamerUri, options: <KafkaStreamer as Streamer>::ConnectOptions ) -> Pin<Box<dyn Future<Output = Result<KafkaStreamer, StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where KafkaStreamer: 'async_trait,

source§

fn create_generic_producer<'life0, 'async_trait>( &'life0 self, options: <KafkaStreamer as Streamer>::ProducerOptions ) -> Pin<Box<dyn Future<Output = Result<<KafkaStreamer as Streamer>::Producer, StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, KafkaStreamer: 'async_trait,

source§

impl Streamer for StdioStreamer

source§

fn connect<'async_trait>( __arg0: StreamerUri, options: <StdioStreamer as Streamer>::ConnectOptions ) -> Pin<Box<dyn Future<Output = Result<StdioStreamer, StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where StdioStreamer: 'async_trait,

Nothing will happen until you create a producer or consumer.

source§

fn disconnect<'async_trait>( self ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where StdioStreamer: 'async_trait,

Call this method if you want to exit gracefully. This waits asynchronously until all pending messages are sent.

The side effect is global: all existing consumers and producers will become unusable until you connect again.

source§

fn create_consumer<'life0, 'life1, 'async_trait>( &'life0 self, streams: &'life1 [StreamKey], options: <StdioStreamer as Streamer>::ConsumerOptions ) -> Pin<Box<dyn Future<Output = Result<<StdioStreamer as Streamer>::Consumer, StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, 'life1: 'async_trait, StdioStreamer: 'async_trait,

A dedicated background thread will be spawned to read stdin. It is safe to spawn multiple consumers.

§

type Error = StdioErr

§

type Producer = StdioProducer

§

type Consumer = StdioConsumer

§

type ConnectOptions = StdioConnectOptions

§

type ConsumerOptions = StdioConsumerOptions

§

type ProducerOptions = StdioProducerOptions

source§

fn create_generic_producer<'life0, 'async_trait>( &'life0 self, __arg1: <StdioStreamer as Streamer>::ProducerOptions ) -> Pin<Box<dyn Future<Output = Result<<StdioStreamer as Streamer>::Producer, StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, StdioStreamer: 'async_trait,

Implementors§