1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp};
use async_trait::async_trait;
use futures::{Future, Stream};

/// Mode of stream consumption.
///
/// The default mode is [`ConsumerMode::RealTime`].
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum ConsumerMode {
    /// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes
    /// messages from now on.
    #[default]
    RealTime,
    /// When the process restarts, it will resume the stream from the previous committed sequence.
    Resumable,
    /// You should assign a consumer group manually. The load-balancing mechanism is
    /// implementation-specific.
    LoadBalanced,
}

/// Identifies a group of consumers by name.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ConsumerGroup {
    /// Name of the group.
    name: String,
}

/// Identifies a single consumer within a group.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ConsumerId {
    /// Identifier string of the consumer.
    id: String,
}

/// Common options of a Consumer.
///
/// Implementors must also be `Default`, `Clone` and `Send`.
pub trait ConsumerOptions: Default + Clone + Send {
    /// Error type carried by the `StreamResult`s returned from this trait's methods.
    type Error: std::error::Error;

    /// Create a set of options for the given [`ConsumerMode`].
    fn new(mode: ConsumerMode) -> Self;

    /// Get the currently set [`ConsumerMode`].
    fn mode(&self) -> StreamResult<&ConsumerMode, Self::Error>;

    /// Get the currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`.
    fn consumer_group(&self) -> StreamResult<&ConsumerGroup, Self::Error>;

    /// Set the consumer group for this consumer. Note that the semantics are
    /// implementation-specific.
    ///
    /// On success, the result carries `&mut Self`.
    fn set_consumer_group(
        &mut self,
        group_id: ConsumerGroup,
    ) -> StreamResult<&mut Self, Self::Error>;
}

#[async_trait]
/// Common interface of consumers, to be implemented by all backends.
///
/// Implementors must also be `Sized`, `Send` and `Sync`.
pub trait Consumer: Sized + Send + Sync {
    /// Error type carried by the `StreamResult`s returned from this trait's methods.
    type Error: std::error::Error;

    /// Message type produced by [`Consumer::next`] and [`Consumer::stream`].
    type Message<'a>: Message
    where
        Self: 'a;
    /// Future returned by [`Consumer::next`]; resolves to a single message or an error.
    type NextFuture<'a>: Future<Output = StreamResult<Self::Message<'a>, Self::Error>>
    where
        Self: 'a;
    /// Stream returned by [`Consumer::stream`]; each item is a message or an error.
    type Stream<'a>: Stream<Item = StreamResult<Self::Message<'a>, Self::Error>>
    where
        Self: 'a;

    /// Seek all streams to an arbitrary point in time. It will start consuming from the earliest message
    /// with a timestamp later than `to`.
    ///
    /// If the consumer is not already assigned, shard ZERO will be used.
    async fn seek(&mut self, to: Timestamp) -> StreamResult<(), Self::Error>;

    /// Rewind all streams to a particular sequence number.
    ///
    /// If the consumer is not already assigned, shard ZERO will be used.
    async fn rewind(&mut self, offset: SeqPos) -> StreamResult<(), Self::Error>;

    /// Assign this consumer to a particular shard, given as a `(StreamKey, ShardId)` pair.
    /// Can be called multiple times to assign to multiple shards.
    /// Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
    ///
    /// It will only take effect on the next [`Consumer::seek`] or [`Consumer::rewind`].
    fn assign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;

    /// Unassign a shard, given as a `(StreamKey, ShardId)` pair.
    /// Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
    /// Returns error `StreamKeyEmpty` if all streams have been unassigned.
    fn unassign(&mut self, ss: (StreamKey, ShardId)) -> StreamResult<(), Self::Error>;

    /// Poll and receive one message: it awaits until there are new messages.
    /// This method can be called from multiple threads.
    ///
    /// Takes `&self`; the returned future borrows the consumer for its lifetime.
    fn next(&self) -> Self::NextFuture<'_>;

    /// Returns an async stream. You cannot create multiple streams from the same consumer,
    /// nor perform any operation while streaming.
    ///
    /// The method takes `&mut self`, and the returned stream's lifetime `'a` is bounded by
    /// that exclusive borrow (`'b: 'a`).
    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a>;
}

impl ConsumerGroup {
    /// Build a consumer group from anything convertible into a `String`.
    pub fn new<S: Into<String>>(name: S) -> Self {
        let name: String = name.into();
        Self { name }
    }

    /// Borrow the group's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}

impl ConsumerId {
    /// Build a consumer id from anything convertible into a `String`.
    pub fn new<S: Into<String>>(id: S) -> Self {
        let id: String = id.into();
        Self { id }
    }

    /// Borrow the identifier string.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }
}