sea_streamer_file/consumer/
mod.rs1mod future;
2mod group;
3
4pub use future::StreamFuture as FileMessageStream;
5
6use flume::{r#async::RecvFut, Receiver, Sender, TrySendError};
7use sea_streamer_types::{
8    export::futures::{Future, FutureExt},
9    Consumer as ConsumerTrait, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp,
10};
11
12use crate::{is_pulse, FileErr, FileId, FileResult, SeekTarget};
13pub(crate) use group::new_consumer;
14use group::Sid;
15
16pub use self::group::query_streamer;
17use self::group::{preseek_consumer, remove_consumer};
18
19pub struct FileConsumer {
20    file_id: FileId,
21    sid: Sid,
22    receiver: Receiver<Result<SharedMessage, FileErr>>,
23    ctrl: Sender<CtrlMsg>,
24}
25
26impl std::fmt::Debug for FileConsumer {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        write!(f, "FileConsumer {{ sid: {} }}", self.sid)
29    }
30}
31
32enum CtrlMsg {
33    Read,
34    Seek(SeekTarget),
35}
36
37pub enum NextFuture<'a> {
38    Future(RecvFut<'a, Result<SharedMessage, FileErr>>),
39    Error(Option<StreamErr<FileErr>>),
40}
41
42pub type FileMessage = SharedMessage;
43
44impl FileConsumer {
45    fn new(
46        file_id: FileId,
47        sid: Sid,
48        receiver: Receiver<Result<SharedMessage, FileErr>>,
49        ctrl: Sender<CtrlMsg>,
50    ) -> Self {
51        Self {
52            file_id,
53            sid,
54            receiver,
55            ctrl,
56        }
57    }
58}
59
60impl Drop for FileConsumer {
61    fn drop(&mut self) {
62        remove_consumer(self.sid);
63    }
64}
65
66impl ConsumerTrait for FileConsumer {
67    type Error = FileErr;
68    type Message<'a> = SharedMessage;
69    type NextFuture<'a> = NextFuture<'a>;
70    type Stream<'a> = FileMessageStream<'a>;
71
72    async fn seek(&mut self, ts: Timestamp) -> FileResult<()> {
76        self.seek_to(SeekTarget::Timestamp(ts))
77            .await
78            .map_err(StreamErr::Backend)
79    }
80
81    async fn rewind(&mut self, to: SeqPos) -> FileResult<()> {
85        self.seek_to(match to {
86            SeqPos::Beginning => SeekTarget::Beginning,
87            SeqPos::End => SeekTarget::End,
88            SeqPos::At(at) => SeekTarget::SeqNo(at),
89        })
90        .await
91        .map_err(StreamErr::Backend)
92    }
93
94    fn assign(&mut self, _: (StreamKey, ShardId)) -> FileResult<()> {
96        Err(StreamErr::Unsupported(
97            "Cannot manually assign shards".to_owned(),
98        ))
99    }
100
101    fn unassign(&mut self, _: (StreamKey, ShardId)) -> FileResult<()> {
103        Err(StreamErr::Unsupported(
104            "Cannot manually assign shards".to_owned(),
105        ))
106    }
107
108    fn next(&self) -> Self::NextFuture<'_> {
111        if let Err(TrySendError::Disconnected(_)) = self.ctrl.try_send(CtrlMsg::Read) {
112            NextFuture::Error(Some(StreamErr::Backend(FileErr::StreamEnded)))
115        } else {
116            NextFuture::Future(self.receiver.recv_async())
117        }
118    }
119
120    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
121        FileMessageStream::new(self)
122    }
123}
124
125impl FileConsumer {
126    pub fn file_id(&self) -> &FileId {
128        &self.file_id
129    }
130
131    pub async fn seek_to(&mut self, target: SeekTarget) -> Result<(), FileErr> {
135        preseek_consumer(&self.file_id, self.sid).await?;
137        self.ctrl
139            .send_async(CtrlMsg::Seek(target))
140            .await
141            .map_err(|_| FileErr::TaskDead("FileConsumer seek"))?;
142        loop {
144            match self.receiver.recv_async().await {
145                Ok(Ok(msg)) => {
146                    if is_pulse(&msg) {
147                        break;
148                    }
149                }
150                Ok(Err(e)) => return Err(e),
151                Err(_) => return Err(FileErr::TaskDead("FileConsumer seek")),
152            }
153        }
154        Ok(())
155    }
156}
157
158impl<'a> Future for NextFuture<'a> {
159    type Output = FileResult<SharedMessage>;
160
161    fn poll(
162        self: std::pin::Pin<&mut Self>,
163        cx: &mut std::task::Context<'_>,
164    ) -> std::task::Poll<Self::Output> {
165        use std::task::Poll::{Pending, Ready};
166        match std::pin::Pin::into_inner(self) {
167            Self::Error(e) => Ready(Err(e.take().unwrap())),
168            Self::Future(future) => match future.poll_unpin(cx) {
169                Ready(res) => match res {
170                    Ok(Ok(m)) => Ready(Ok(m)),
171                    Ok(Err(e)) => Ready(Err(StreamErr::Backend(e))),
172                    Err(_) => Ready(Err(StreamErr::Backend(FileErr::StreamEnded))),
173                },
174                Pending => Pending,
175            },
176        }
177    }
178}
179
180#[cfg(test)]
181mod test {
182    use super::*;
183
184    fn only_send_sync<C: ConsumerTrait + Send + Sync>(_: C) {}
185
186    #[test]
187    fn consumer_is_send_sync() {
188        #[allow(dead_code)]
189        fn ensure_send_sync(c: FileConsumer) {
190            only_send_sync(c);
191        }
192    }
193}