1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp};
use async_trait::async_trait;
use futures::{Future, Stream};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Mode of stream consumption.
pub enum ConsumerMode {
/// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes messages from now on.
RealTime,
/// When the process restarts, it will resume the stream from the previous committed sequence.
Resumable,
/// You should assign a consumer group manually. The load-balancing mechanism is implementation-specific.
LoadBalanced,
}
impl Default for ConsumerMode {
fn default() -> Self {
Self::RealTime
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// Used to identify a group of consumers.
pub struct ConsumerGroup {
name: String,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// Used to identify a consumer within a group.
pub struct ConsumerId {
id: String,
}
/// Common options of a Consumer.
pub trait ConsumerOptions: Default + Clone + Send {
type Error: std::error::Error;
fn new(mode: ConsumerMode) -> Self;
/// Get currently set ConsumerMode
fn mode(&self) -> StreamResult<&ConsumerMode, Self::Error>;
/// Get currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`.
fn consumer_group(&self) -> StreamResult<&ConsumerGroup, Self::Error>;
/// Set consumer group for this consumer. Note the semantic is implementation-specific.
fn set_consumer_group(
&mut self,
group_id: ConsumerGroup,
) -> StreamResult<&mut Self, Self::Error>;
}
#[async_trait]
/// Common interface of consumers, to be implemented by all backends.
pub trait Consumer: Sized + Send + Sync {
type Error: std::error::Error;
type Message<'a>: Message
where
Self: 'a;
type NextFuture<'a>: Future<Output = StreamResult<Self::Message<'a>, Self::Error>>
where
Self: 'a;
type Stream<'a>: Stream<Item = StreamResult<Self::Message<'a>, Self::Error>>
where
Self: 'a;
/// Seek all streams to an arbitrary point in time. It will start consuming from the earliest message
/// with a timestamp later than `to`.
///
/// If the consumer is not already assigned, shard ZERO will be used.
async fn seek(&mut self, to: Timestamp) -> StreamResult<(), Self::Error>;
/// Rewind all streams to a particular sequence number.
///
/// If the consumer is not already assigned, shard ZERO will be used.
async fn rewind(&mut self, offset: SeqPos) -> StreamResult<(), Self::Error>;
/// Assign this consumer to a particular shard. Can be called multiple times to assign
/// to multiple shards. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
///
/// It will only take effect on the next [`Consumer::seek`] or [`Consumer::rewind`].
fn assign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
/// Unassign a shard. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
/// Returns error `StreamKeyEmpty` if all streams have been unassigned.
fn unassign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
/// Poll and receive one message: it awaits until there are new messages.
/// This method can be called from multiple threads.
fn next(&self) -> Self::NextFuture<'_>;
/// Returns an async stream. You cannot create multiple streams from the same consumer,
/// nor perform any operation while streaming.
fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a>;
}
impl ConsumerGroup {
pub fn new<S: Into<String>>(name: S) -> Self {
Self { name: name.into() }
}
pub fn name(&self) -> &str {
&self.name
}
}
impl ConsumerId {
pub fn new<S: Into<String>>(id: S) -> Self {
Self { id: id.into() }
}
pub fn id(&self) -> &str {
&self.id
}
}