sea_streamer_file/consumer/
future.rs1use super::{FileConsumer, NextFuture};
3use crate::FileResult;
4use sea_streamer_types::{export::futures::Stream, Consumer, SharedMessage};
5use std::{fmt::Debug, future::Future};
6
7pub struct StreamFuture<'a> {
8    con: &'a FileConsumer,
9    fut: NextFuture<'a>,
10}
11
12impl<'a> Debug for StreamFuture<'a> {
13    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14        f.debug_struct("StreamFuture").finish()
15    }
16}
17
18impl<'a> StreamFuture<'a> {
19    pub fn new(con: &'a FileConsumer) -> Self {
20        let fut = con.next();
21        Self { con, fut }
22    }
23}
24
25impl<'a> Stream for StreamFuture<'a> {
26    type Item = FileResult<SharedMessage>;
27
28    fn poll_next(
29        mut self: std::pin::Pin<&mut Self>,
30        cx: &mut std::task::Context<'_>,
31    ) -> std::task::Poll<Option<Self::Item>> {
32        use std::task::Poll::{Pending, Ready};
33        match std::pin::Pin::new(&mut self.fut).poll(cx) {
34            Ready(res) => {
35                self.fut = self.con.next();
36                Ready(Some(res))
37            }
38            Pending => Pending,
39        }
40    }
41}