sea_streamer_redis/consumer/
future.rs1use super::RedisConsumer;
2use crate::{consumer::cluster::CtrlMsg, RedisErr, RedisResult};
3use flume::r#async::RecvFut;
4use sea_streamer_types::{
5 export::futures::{FutureExt, Stream},
6 Consumer, SharedMessage, StreamErr,
7};
8use std::{fmt::Debug, future::Future};
9
10pub struct NextFuture<'a> {
11 pub(super) con: &'a RedisConsumer,
12 pub(super) fut: RecvFut<'a, RedisResult<SharedMessage>>,
13 pub(super) read: bool,
14}
15
16impl<'a> Debug for NextFuture<'a> {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.debug_struct("NextFuture").finish()
19 }
20}
21
22impl<'a> Future for NextFuture<'a> {
23 type Output = RedisResult<SharedMessage>;
24
25 fn poll(
26 mut self: std::pin::Pin<&mut Self>,
27 cx: &mut std::task::Context<'_>,
28 ) -> std::task::Poll<Self::Output> {
29 use std::task::Poll::{Pending, Ready};
30 if !self.read && !self.con.config.pre_fetch {
31 self.read = true;
32 self.con.handle.try_send(CtrlMsg::Read).ok();
33 }
34 match self.fut.poll_unpin(cx) {
35 Ready(res) => match res {
36 Ok(Ok(msg)) => {
37 if self.con.config.auto_ack && self.con.auto_ack(msg.header()).is_err() {
38 return Ready(Err(StreamErr::Backend(RedisErr::ConsumerDied)));
39 }
40 self.read = false;
41 Ready(Ok(msg))
42 }
43 Ok(Err(err)) => Ready(Err(err)),
44 Err(_) => Ready(Err(StreamErr::Backend(RedisErr::ConsumerDied))),
45 },
46 Pending => Pending,
47 }
48 }
49}
50
51impl<'a> Drop for NextFuture<'a> {
52 fn drop(&mut self) {
53 if self.read {
54 self.con.handle.try_send(CtrlMsg::Unread).ok();
55 }
56 }
57}
58
59pub struct StreamFuture<'a> {
60 con: &'a RedisConsumer,
61 fut: NextFuture<'a>,
62}
63
64impl<'a> Debug for StreamFuture<'a> {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("StreamFuture").finish()
67 }
68}
69
70impl<'a> StreamFuture<'a> {
71 pub fn new(con: &'a RedisConsumer) -> Self {
72 let fut = con.next();
73 Self { con, fut }
74 }
75}
76
77impl<'a> Stream for StreamFuture<'a> {
78 type Item = RedisResult<SharedMessage>;
79
80 fn poll_next(
81 mut self: std::pin::Pin<&mut Self>,
82 cx: &mut std::task::Context<'_>,
83 ) -> std::task::Poll<Option<Self::Item>> {
84 use std::task::Poll::{Pending, Ready};
85 match std::pin::Pin::new(&mut self.fut).poll(cx) {
86 Ready(res) => {
87 self.fut = self.con.next();
88 Ready(Some(res))
89 }
90 Pending => Pending,
91 }
92 }
93}