sea_streamer_types/consumer.rs
1use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp};
2use futures::{Future, Stream};
3
4#[derive(Debug, Copy, Clone, PartialEq, Eq)]
5/// Mode of stream consumption.
6pub enum ConsumerMode {
7 /// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes messages from now on.
8 RealTime,
9 /// When the process restarts, it will resume the stream from the previous committed sequence.
10 Resumable,
11 /// You should assign a consumer group manually. The load-balancing mechanism is implementation-specific.
12 LoadBalanced,
13}
14
15impl Default for ConsumerMode {
16 fn default() -> Self {
17 Self::RealTime
18 }
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
22/// Used to identify a group of consumers.
23pub struct ConsumerGroup {
24 name: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
28/// Used to identify a consumer within a group.
29pub struct ConsumerId {
30 id: String,
31}
32
33/// Common options of a Consumer.
34pub trait ConsumerOptions: Default + Clone + Send {
35 type Error: std::error::Error;
36
37 fn new(mode: ConsumerMode) -> Self;
38
39 /// Get currently set ConsumerMode
40 fn mode(&self) -> StreamResult<&ConsumerMode, Self::Error>;
41
42 /// Get currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`.
43 fn consumer_group(&self) -> StreamResult<&ConsumerGroup, Self::Error>;
44
45 /// Set consumer group for this consumer. Note the semantic is implementation-specific.
46 fn set_consumer_group(
47 &mut self,
48 group_id: ConsumerGroup,
49 ) -> StreamResult<&mut Self, Self::Error>;
50}
51
52/// Common interface of consumers, to be implemented by all backends.
53pub trait Consumer: Sized + Send + Sync {
54 type Error: std::error::Error;
55
56 type Message<'a>: Message
57 where
58 Self: 'a;
59 type NextFuture<'a>: Future<Output = StreamResult<Self::Message<'a>, Self::Error>>
60 where
61 Self: 'a;
62 type Stream<'a>: Stream<Item = StreamResult<Self::Message<'a>, Self::Error>>
63 where
64 Self: 'a;
65
66 /// Seek all streams to an arbitrary point in time. It will start consuming from the earliest message
67 /// with a timestamp later than `to`.
68 ///
69 /// If the consumer is not already assigned, shard ZERO will be used.
70 fn seek(&mut self, to: Timestamp)
71 -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
72
73 /// Rewind all streams to a particular sequence number.
74 ///
75 /// If the consumer is not already assigned, shard ZERO will be used.
76 fn rewind(
77 &mut self,
78 offset: SeqPos,
79 ) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
80
81 /// Assign this consumer to a particular shard. Can be called multiple times to assign
82 /// to multiple shards. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
83 ///
84 /// It will only take effect on the next [`Consumer::seek`] or [`Consumer::rewind`].
85 fn assign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
86
87 /// Unassign a shard. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
88 /// Returns error `StreamKeyEmpty` if all streams have been unassigned.
89 fn unassign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;
90
91 /// Poll and receive one message: it awaits until there are new messages.
92 /// This method can be called from multiple threads.
93 fn next(&self) -> Self::NextFuture<'_>;
94
95 /// Returns an async stream. You cannot create multiple streams from the same consumer,
96 /// nor perform any operation while streaming.
97 fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a>;
98}
99
100impl ConsumerGroup {
101 pub fn new<S: Into<String>>(name: S) -> Self {
102 Self { name: name.into() }
103 }
104
105 pub fn name(&self) -> &str {
106 &self.name
107 }
108}
109
110impl ConsumerId {
111 pub fn new<S: Into<String>>(id: S) -> Self {
112 Self { id: id.into() }
113 }
114
115 pub fn id(&self) -> &str {
116 &self.id
117 }
118}