sea_streamer_redis/consumer/
future.rs

1use super::RedisConsumer;
2use crate::{consumer::cluster::CtrlMsg, RedisErr, RedisResult};
3use flume::r#async::RecvFut;
4use sea_streamer_types::{
5    export::futures::{FutureExt, Stream},
6    Consumer, SharedMessage, StreamErr,
7};
8use std::{fmt::Debug, future::Future};
9
10pub struct NextFuture<'a> {
11    pub(super) con: &'a RedisConsumer,
12    pub(super) fut: RecvFut<'a, RedisResult<SharedMessage>>,
13    pub(super) read: bool,
14}
15
16impl<'a> Debug for NextFuture<'a> {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        f.debug_struct("NextFuture").finish()
19    }
20}
21
22impl<'a> Future for NextFuture<'a> {
23    type Output = RedisResult<SharedMessage>;
24
25    fn poll(
26        mut self: std::pin::Pin<&mut Self>,
27        cx: &mut std::task::Context<'_>,
28    ) -> std::task::Poll<Self::Output> {
29        use std::task::Poll::{Pending, Ready};
30        if !self.read && !self.con.config.pre_fetch {
31            self.read = true;
32            self.con.handle.try_send(CtrlMsg::Read).ok();
33        }
34        match self.fut.poll_unpin(cx) {
35            Ready(res) => match res {
36                Ok(Ok(msg)) => {
37                    if self.con.config.auto_ack && self.con.auto_ack(msg.header()).is_err() {
38                        return Ready(Err(StreamErr::Backend(RedisErr::ConsumerDied)));
39                    }
40                    self.read = false;
41                    Ready(Ok(msg))
42                }
43                Ok(Err(err)) => Ready(Err(err)),
44                Err(_) => Ready(Err(StreamErr::Backend(RedisErr::ConsumerDied))),
45            },
46            Pending => Pending,
47        }
48    }
49}
50
51impl<'a> Drop for NextFuture<'a> {
52    fn drop(&mut self) {
53        if self.read {
54            self.con.handle.try_send(CtrlMsg::Unread).ok();
55        }
56    }
57}
58
59pub struct StreamFuture<'a> {
60    con: &'a RedisConsumer,
61    fut: NextFuture<'a>,
62}
63
64impl<'a> Debug for StreamFuture<'a> {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("StreamFuture").finish()
67    }
68}
69
70impl<'a> StreamFuture<'a> {
71    pub fn new(con: &'a RedisConsumer) -> Self {
72        let fut = con.next();
73        Self { con, fut }
74    }
75}
76
77impl<'a> Stream for StreamFuture<'a> {
78    type Item = RedisResult<SharedMessage>;
79
80    fn poll_next(
81        mut self: std::pin::Pin<&mut Self>,
82        cx: &mut std::task::Context<'_>,
83    ) -> std::task::Poll<Option<Self::Item>> {
84        use std::task::Poll::{Pending, Ready};
85        match std::pin::Pin::new(&mut self.fut).poll(cx) {
86            Ready(res) => {
87                self.fut = self.con.next();
88                Ready(Some(res))
89            }
90            Pending => Pending,
91        }
92    }
93}