1use core::pin::Pin;
3#[cfg(feature = "tokio_runtime")]
4use tokio::sync::mpsc::{Receiver, Sender};
5use std::sync::{Arc, Weak};
8use futures::{
9    future::Future,
10    stream::Stream,
11    task::{Context, Poll},
12};
13use lapin::{message::Delivery, Channel, Consumer};
14use anyhow::Result;
15use crate::exchanges::DeclareExchange;
16
17use super::comms::*;
18
/// Boxed dispatcher whose dispatched `Object` is a lapin [`Consumer`].
///
/// `ConsumerWrapper` calls `start_dispatch` on it to build the first consumer
/// and again every time the current consumer fails or ends.
type ConsumerCreator = Box<dyn RabbitDispatcher<Object = Consumer>>;
21
/// Pinned, boxed, `Send` future that resolves to a `Result<T>`
/// (`anyhow::Result`, as imported by this file).
pub type CreatorResult<T> = Pin<Box<dyn Future<Output = Result<T>> + Send>>;
23pub type Creator<T> = Pin<Box<dyn Fn(Arc<Channel>, Option<Arc<DeclareExchange>>) -> CreatorResult<T> + Send + Sync>>;
24
/// Receiving half of a channel whose messages are weak references to a lapin
/// [`Channel`].
pub type ChannelReceiver = Receiver<Weak<Channel>>;
26
27type NextFuture = Pin<
28    Box<
29        dyn Future<
30            Output = (
31                Delivery,
32                Data,
33            ),
34        > + Send,
35    >,
36>;
37
38struct Data {
39    consumer: Consumer,
41    channel: Weak<Channel>,
44    creator: ConsumerCreator,
46    channel_sender: ChannelSender,
48    channel_receiver: Receiver<Weak<Channel>>,
50    channel_requester: Arc<Sender<CommsMsg>>,
52}
53
/// Polling state of a `ConsumerWrapper`.
enum State {
    /// No future is in flight; the wrapper owns its `Data` directly.
    Idle(Data),
    /// A "next item" future is in flight and currently owns the `Data`.
    Next {
        /// Future that resolves to the next delivery plus the `Data` it was
        /// given.
        next: NextFuture,
    }
}
63
/// Stream of deliveries that recreates its underlying consumer whenever the
/// consumer reports an error or ends.
pub struct ConsumerWrapper {
    /// Current polling state. It is an `Option` only so that `poll_next` can
    /// take it by value; it is put back before `poll_next` returns.
    state: Option<State>,
}
71
72impl ConsumerWrapper {
73
74    pub(crate) async fn new(creator: ConsumerCreator) -> Self {
82        log::trace!("Getting channel requester");
83        let channel_requester = Comms::get_channel_comms();
84        let (channel_sender, mut channel_receiver) = Comms::create_channel_channel();
85        log::trace!("Creating the consumer using the creator");
86        let (consumer, channel) =
87            creator.start_dispatch(
88                None,
89                &channel_sender,
90                &mut channel_receiver,
91                &channel_requester).await;
92        log::trace!("Consumer wrapper created");
93        Self {
94            state: Some(State::Idle(Data { consumer, channel, creator, channel_sender, channel_receiver, channel_requester})),
95        }
96    }
97
98
99    async fn next_item(mut data: Data)
101        -> (Delivery, Data) {
102        loop {
103            use futures::stream::StreamExt;
104            log::trace!("Polling consumer");
105            match data.consumer.next().await {
106                Some(Ok(delivery)) => {
107                    log::trace!("Got delivery");
108                    return (delivery, data);
109                }
110                Some(Err(err)) => {
111                    log::error!("Failed to consume a message: {}", err);
112                }
113                None => {
114                    log::error!("Consumer has finished for some reason!");
115                }
116            }
117            log::warn!("Consumer is broken, waiting for a new connection");
118            let (consumer, channel) = data.creator.start_dispatch(Some(data.channel.clone()),
119                                                                  &data.channel_sender,
120                                                                  &mut data.channel_receiver,
121                                                                  &data.channel_requester).await;
122            data.consumer = consumer;
123            data.channel = channel;
124        }
125    }
126}
127
128impl Stream for ConsumerWrapper {
129    type Item = Delivery;
130    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
131        log::trace!("Poll next");
132        let this = Pin::into_inner(self);
133
134        loop {
135            match this.state.take() {
136                Some(State::Idle(data)) => {
137                    this.state = Some(State::Next {
138                        next: Box::pin(Self::next_item(data)),
139                    });
140                }
141                Some(State::Next { mut next }) => {
142                    let action = next.as_mut();
143                    return match Future::poll(action, cx) {
144                        Poll::Pending => {
145                            this.state = Some(State::Next { next });
146                            log::trace!("Pending");
147                            Poll::Pending
148                        }
149                        Poll::Ready((delivery, data)) => {
150                            this.state = Some(State::Idle(data));
151                            log::trace!("Ready");
152                            Poll::Ready(Some(delivery))
153                        }
154                    }
155                }
156                None => unreachable!(),
157            }
158        }
159    }
160}