sea_streamer_types/
consumer.rs

1use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp};
2use futures::{Future, Stream};
3
4#[derive(Debug, Copy, Clone, PartialEq, Eq)]
5/// Mode of stream consumption.
6pub enum ConsumerMode {
7    /// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes messages from now on.
8    RealTime,
9    /// When the process restarts, it will resume the stream from the previous committed sequence.
10    Resumable,
11    /// You should assign a consumer group manually. The load-balancing mechanism is implementation-specific.
12    LoadBalanced,
13}
14
15impl Default for ConsumerMode {
16    fn default() -> Self {
17        Self::RealTime
18    }
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
22/// Used to identify a group of consumers.
23pub struct ConsumerGroup {
24    name: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
28/// Used to identify a consumer within a group.
29pub struct ConsumerId {
30    id: String,
31}
32
33/// Common options of a Consumer.
34pub trait ConsumerOptions: Default + Clone + Send {
35    type Error: std::error::Error;
36
37    fn new(mode: ConsumerMode) -> Self;
38
39    /// Get currently set ConsumerMode
40    fn mode(&self) -> StreamResult<&ConsumerMode, Self::Error>;
41
42    /// Get currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`.
43    fn consumer_group(&self) -> StreamResult<&ConsumerGroup, Self::Error>;
44
45    /// Set consumer group for this consumer. Note the semantic is implementation-specific.
46    fn set_consumer_group(
47        &mut self,
48        group_id: ConsumerGroup,
49    ) -> StreamResult<&mut Self, Self::Error>;
50}
51
52/// Common interface of consumers, to be implemented by all backends.
53pub trait Consumer: Sized + Send + Sync {
54    type Error: std::error::Error;
55
56    type Message<'a>: Message
57    where
58        Self: 'a;
59    type NextFuture<'a>: Future<Output = StreamResult<Self::Message<'a>, Self::Error>>
60    where
61        Self: 'a;
62    type Stream<'a>: Stream<Item = StreamResult<Self::Message<'a>, Self::Error>>
63    where
64        Self: 'a;
65
66    /// Seek all streams to an arbitrary point in time. It will start consuming from the earliest message
67    /// with a timestamp later than `to`.
68    ///
69    /// If the consumer is not already assigned, shard ZERO will be used.
70    fn seek(&mut self, to: Timestamp)
71        -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
72
73    /// Rewind all streams to a particular sequence number.
74    ///
75    /// If the consumer is not already assigned, shard ZERO will be used.
76    fn rewind(
77        &mut self,
78        offset: SeqPos,
79    ) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
80
81    /// Assign this consumer to a particular shard. Can be called multiple times to assign
82    /// to multiple shards. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
83    ///
84    /// It will only take effect on the next [`Consumer::seek`] or [`Consumer::rewind`].
85    fn assign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
86
87    /// Unassign a shard. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
88    /// Returns error `StreamKeyEmpty` if all streams have been unassigned.
89    fn unassign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
90
91    /// Poll and receive one message: it awaits until there are new messages.
92    /// This method can be called from multiple threads.
93    fn next(&self) -> Self::NextFuture<'_>;
94
95    /// Returns an async stream. You cannot create multiple streams from the same consumer,
96    /// nor perform any operation while streaming.
97    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a>;
98}
99
100impl ConsumerGroup {
101    pub fn new<S: Into<String>>(name: S) -> Self {
102        Self { name: name.into() }
103    }
104
105    pub fn name(&self) -> &str {
106        &self.name
107    }
108}
109
110impl ConsumerId {
111    pub fn new<S: Into<String>>(id: S) -> Self {
112        Self { id: id.into() }
113    }
114
115    pub fn id(&self) -> &str {
116        &self.id
117    }
118}