rabbit_auto/
consumer.rs

//! A stream wrapper for a RabbitMQ consumer. This stream never fails and keeps consuming for as long as it is being used.
2use core::pin::Pin;
3#[cfg(feature = "tokio_runtime")]
4use tokio::sync::mpsc::{Receiver, Sender};
5// #[cfg(feature = "async_std_runtime")]
6// use async_std::sync::RwLock;
7use std::sync::{Arc, Weak};
8use futures::{
9    future::Future,
10    stream::Stream,
11    task::{Context, Poll},
12};
13use lapin::{message::Delivery, Channel, Consumer};
14use anyhow::Result;
15use crate::exchanges::DeclareExchange;
16
17use super::comms::*;
18
/// Boxed [`RabbitDispatcher`] trait object whose dispatched `Object` is a lapin [`Consumer`].
/// The wrapper calls `start_dispatch` on it to obtain a consumer (and the channel it lives on),
/// both initially and again whenever the current consumer breaks.
type ConsumerCreator = Box<dyn RabbitDispatcher<Object = Consumer>>;

/// Pinned, boxed, `Send` future resolving to an `anyhow::Result<T>`.
pub type CreatorResult<T> = Pin<Box<dyn Future<Output = Result<T>> + Send>>;
/// Pinned, boxed, `Send + Sync` callback that takes a shared [`Channel`] and an optional
/// shared [`DeclareExchange`] and returns a [`CreatorResult`] producing a `T`.
pub type Creator<T> = Pin<Box<dyn Fn(Arc<Channel>, Option<Arc<DeclareExchange>>) -> CreatorResult<T> + Send + Sync>>;

/// Receiving half of a channel over which weak references to a [`Channel`] are delivered.
pub type ChannelReceiver = Receiver<Weak<Channel>>;

/// Boxed `Send` future produced by `ConsumerWrapper::next_item`: it resolves to the next
/// [`Delivery`] together with the [`Data`] it consumed, so the state can be handed back to
/// the wrapper once the delivery is ready.
type NextFuture = Pin<
    Box<
        dyn Future<
            Output = (
                Delivery,
                Data,
            ),
        > + Send,
    >,
>;
37
/// Everything the wrapper needs to keep consuming and to rebuild the consumer when it breaks.
/// It is moved into the `next_item` future and handed back together with each delivery.
struct Data {
    /// The RabbitMQ consumer currently being polled.
    consumer: Consumer,
    /// Weak handle to the RabbitMQ channel returned together with the consumer. It is passed
    /// back to the creator when a replacement consumer is requested.
    /// NOTE(review): the previous comment said this field keeps the consumer alive, but a
    /// `Weak` does not keep the channel alive — presumably the strong reference is owned
    /// elsewhere; confirm.
    channel: Weak<Channel>,
    /// Creator of the consumer; dispatched again every time the consumer breaks.
    creator: ConsumerCreator,
    /// Handed to the creator on every dispatch; presumably this is what gets sent to the
    /// Comms when the channel is not ok — verify against `start_dispatch`.
    channel_sender: ChannelSender,
    /// A new channel is delivered here after one has been requested.
    channel_receiver: Receiver<Weak<Channel>>,
    /// The request for a new channel is made through this sender.
    channel_requester: Arc<Sender<CommsMsg>>,
}
53
/// Polling state of the wrapper: it either owns its [`Data`] directly, or has moved it into
/// an in-flight future that will return it together with the next delivery.
enum State {
    /// Not currently fetching anything; the next poll starts a fetch for the next item.
    Idle(Data),
    /// A fetch for the next item is in flight.
    Next {
        /// Future resolving to the next delivery and the `Data` that was moved into it.
        next: NextFuture,
    }
}
63
/// Consumer wrapper that handles errors in the connection. If RabbitMQ is disconnected, then
/// instead of finishing the stream the wrapper obtains a new consumer through its creator and
/// continues consuming as if nothing had happened.
///
/// NOTE(review): this doc used to say that the stream ends right away if the connection was
/// never established at least once. The `Stream` implementation in this file never yields
/// `None` and `new` cannot fail, so that statement looks stale — confirm against the
/// behaviour of `start_dispatch`.
pub struct ConsumerWrapper {
    /// Current polling state. It is only `None` transiently while `poll_next` is running.
    state: Option<State>,
}
71
72impl ConsumerWrapper {
73
74    /// Create a new consumer by providing a creator function. This function might be called many times,
75    /// as often as we loose connection to the rabbitmq.
76    // pub async fn new(creator: ConsumerCreator) -> Result<Self> {
77    //     let (creator, (channel, consumer)) = Self::connect(creator).await?;
78    //     log::debug!("Consumer wrapper created");
79    //     Ok(Self { state: Some(State::Idle { consumer, channel, creator }) })
80    // }
81    pub(crate) async fn new(creator: ConsumerCreator) -> Self {
82        log::trace!("Getting channel requester");
83        let channel_requester = Comms::get_channel_comms();
84        let (channel_sender, mut channel_receiver) = Comms::create_channel_channel();
85        log::trace!("Creating the consumer using the creator");
86        let (consumer, channel) =
87            creator.start_dispatch(
88                None,
89                &channel_sender,
90                &mut channel_receiver,
91                &channel_requester).await;
92        log::trace!("Consumer wrapper created");
93        Self {
94            state: Some(State::Idle(Data { consumer, channel, creator, channel_sender, channel_receiver, channel_requester})),
95        }
96    }
97
98
99    /// Gets the next item from the consumer. If the consumer is broken, then a new consumer is automatically created
100    async fn next_item(mut data: Data)
101        -> (Delivery, Data) {
102        loop {
103            use futures::stream::StreamExt;
104            log::trace!("Polling consumer");
105            match data.consumer.next().await {
106                Some(Ok(delivery)) => {
107                    log::trace!("Got delivery");
108                    return (delivery, data);
109                }
110                Some(Err(err)) => {
111                    log::error!("Failed to consume a message: {}", err);
112                }
113                None => {
114                    log::error!("Consumer has finished for some reason!");
115                }
116            }
117            log::warn!("Consumer is broken, waiting for a new connection");
118            let (consumer, channel) = data.creator.start_dispatch(Some(data.channel.clone()),
119                                                                  &data.channel_sender,
120                                                                  &mut data.channel_receiver,
121                                                                  &data.channel_requester).await;
122            data.consumer = consumer;
123            data.channel = channel;
124        }
125    }
126}
127
128impl Stream for ConsumerWrapper {
129    type Item = Delivery;
130    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
131        log::trace!("Poll next");
132        let this = Pin::into_inner(self);
133
134        loop {
135            match this.state.take() {
136                Some(State::Idle(data)) => {
137                    this.state = Some(State::Next {
138                        next: Box::pin(Self::next_item(data)),
139                    });
140                }
141                Some(State::Next { mut next }) => {
142                    let action = next.as_mut();
143                    return match Future::poll(action, cx) {
144                        Poll::Pending => {
145                            this.state = Some(State::Next { next });
146                            log::trace!("Pending");
147                            Poll::Pending
148                        }
149                        Poll::Ready((delivery, data)) => {
150                            this.state = Some(State::Idle(data));
151                            log::trace!("Ready");
152                            Poll::Ready(Some(delivery))
153                        }
154                    }
155                }
156                None => unreachable!(),
157            }
158        }
159    }
160}