Trait sea_streamer::Streamer
source · pub trait Streamer: Sized {
type Error: Error;
type Producer: Producer<Error = Self::Error>;
type Consumer: Consumer<Error = Self::Error>;
type ConnectOptions: ConnectOptions;
type ConsumerOptions: ConsumerOptions;
type ProducerOptions: ProducerOptions;
// Required methods
fn connect(
streamer: StreamerUri,
options: Self::ConnectOptions
) -> impl Future<Output = Result<Self, StreamErr<Self::Error>>> + Send;
fn disconnect(
self
) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send;
fn create_generic_producer(
&self,
options: Self::ProducerOptions
) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send;
fn create_consumer(
&self,
streams: &[StreamKey],
options: Self::ConsumerOptions
) -> impl Future<Output = Result<Self::Consumer, StreamErr<Self::Error>>> + Send;
// Provided method
fn create_producer(
&self,
stream: StreamKey,
options: Self::ProducerOptions
) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send { ... }
}
Expand description
Common interface of streamer clients.
Required Associated Types§
type Error: Error
type Producer: Producer<Error = Self::Error>
type Consumer: Consumer<Error = Self::Error>
type ConnectOptions: ConnectOptions
type ConsumerOptions: ConsumerOptions
type ProducerOptions: ProducerOptions
Required Methods§
sourcefn connect(
streamer: StreamerUri,
options: Self::ConnectOptions
) -> impl Future<Output = Result<Self, StreamErr<Self::Error>>> + Send
fn connect( streamer: StreamerUri, options: Self::ConnectOptions ) -> impl Future<Output = Result<Self, StreamErr<Self::Error>>> + Send
Establish a connection to the streaming server.
sourcefn disconnect(
self
) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send
fn disconnect( self ) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send
Flush and disconnect from the streaming server.
sourcefn create_generic_producer(
&self,
options: Self::ProducerOptions
) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send
fn create_generic_producer( &self, options: Self::ProducerOptions ) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send
Create a producer that can stream to any stream key.
sourcefn create_consumer(
&self,
streams: &[StreamKey],
options: Self::ConsumerOptions
) -> impl Future<Output = Result<Self::Consumer, StreamErr<Self::Error>>> + Send
fn create_consumer( &self, streams: &[StreamKey], options: Self::ConsumerOptions ) -> impl Future<Output = Result<Self::Consumer, StreamErr<Self::Error>>> + Send
Create a consumer subscribing to the specified streams.
Provided Methods§
sourcefn create_producer(
&self,
stream: StreamKey,
options: Self::ProducerOptions
) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send
fn create_producer( &self, stream: StreamKey, options: Self::ProducerOptions ) -> impl Future<Output = Result<Self::Producer, StreamErr<Self::Error>>> + Send
Create a producer that streams to the specified stream.
Object Safety§
Implementations on Foreign Types§
source§impl Streamer for KafkaStreamer
impl Streamer for KafkaStreamer
source§async fn disconnect(self) -> Result<(), StreamErr<KafkaError>>
async fn disconnect(self) -> Result<(), StreamErr<KafkaError>>
It will flush all producers
source§async fn create_consumer(
&self,
streams: &[StreamKey],
options: <KafkaStreamer as Streamer>::ConsumerOptions
) -> Result<<KafkaStreamer as Streamer>::Consumer, StreamErr<KafkaError>>
async fn create_consumer( &self, streams: &[StreamKey], options: <KafkaStreamer as Streamer>::ConsumerOptions ) -> Result<<KafkaStreamer as Streamer>::Consumer, StreamErr<KafkaError>>
If ConsumerMode is RealTime, auto commit will be disabled and it always stream from latest. group_id
must not be set.
If ConsumerMode is Resumable, it will use a group id unique to this host:
on a physical machine, it will use the mac address. Inside a docker container, it will use the container id.
So when the process restarts, it will resume from last committed offset. group_id
must not be set.
If ConsumerMode is LoadBalanced, shards will be divided-and-assigned by the broker to consumers sharing the same group_id
.
group_id
must already be set.
If you need to override the HOST ID, you can set the ENV var HOST_ID
.
type Error = KafkaError
type Producer = KafkaProducer
type Consumer = KafkaConsumer
type ConnectOptions = KafkaConnectOptions
type ConsumerOptions = KafkaConsumerOptions
type ProducerOptions = KafkaProducerOptions
async fn connect( uri: StreamerUri, options: <KafkaStreamer as Streamer>::ConnectOptions ) -> Result<KafkaStreamer, StreamErr<KafkaError>>
async fn create_generic_producer( &self, options: <KafkaStreamer as Streamer>::ProducerOptions ) -> Result<<KafkaStreamer as Streamer>::Producer, StreamErr<KafkaError>>
source§impl Streamer for StdioStreamer
impl Streamer for StdioStreamer
source§async fn connect(
_: StreamerUri,
options: <StdioStreamer as Streamer>::ConnectOptions
) -> Result<StdioStreamer, StreamErr<StdioErr>>
async fn connect( _: StreamerUri, options: <StdioStreamer as Streamer>::ConnectOptions ) -> Result<StdioStreamer, StreamErr<StdioErr>>
Nothing will happen until you create a producer/consumer
source§async fn disconnect(self) -> Result<(), StreamErr<StdioErr>>
async fn disconnect(self) -> Result<(), StreamErr<StdioErr>>
Call this method if you want to exit gracefully. This waits asynchronously until all pending messages are sent.
The side effects is global: all existing consumers and producers will become unusable, until you connect again.
source§async fn create_consumer(
&self,
streams: &[StreamKey],
options: <StdioStreamer as Streamer>::ConsumerOptions
) -> Result<<StdioStreamer as Streamer>::Consumer, StreamErr<StdioErr>>
async fn create_consumer( &self, streams: &[StreamKey], options: <StdioStreamer as Streamer>::ConsumerOptions ) -> Result<<StdioStreamer as Streamer>::Consumer, StreamErr<StdioErr>>
A background thread will be spawned to read stdin dedicatedly. It is safe to spawn multiple consumers.