1use core::pin::Pin;
3#[cfg(feature = "tokio_runtime")]
4use tokio::sync::mpsc::{Receiver, Sender};
5use std::sync::{Arc, Weak};
8use futures::{
9 future::Future,
10 stream::Stream,
11 task::{Context, Poll},
12};
13use lapin::{message::Delivery, Channel, Consumer};
14use anyhow::Result;
15use crate::exchanges::DeclareExchange;
16
17use super::comms::*;
18
19type ConsumerCreator = Box<dyn RabbitDispatcher<Object = Consumer>>;
21
22pub type CreatorResult<T> = Pin<Box<dyn Future<Output = Result<T>> + Send>>;
23pub type Creator<T> = Pin<Box<dyn Fn(Arc<Channel>, Option<Arc<DeclareExchange>>) -> CreatorResult<T> + Send + Sync>>;
24
25pub type ChannelReceiver = Receiver<Weak<Channel>>;
26
27type NextFuture = Pin<
28 Box<
29 dyn Future<
30 Output = (
31 Delivery,
32 Data,
33 ),
34 > + Send,
35 >,
36>;
37
38struct Data {
39 consumer: Consumer,
41 channel: Weak<Channel>,
44 creator: ConsumerCreator,
46 channel_sender: ChannelSender,
48 channel_receiver: Receiver<Weak<Channel>>,
50 channel_requester: Arc<Sender<CommsMsg>>,
52}
53
54enum State {
55 Idle(Data),
57 Next {
59 next: NextFuture,
61 }
62}
63
64pub struct ConsumerWrapper {
69 state: Option<State>,
70}
71
72impl ConsumerWrapper {
73
74 pub(crate) async fn new(creator: ConsumerCreator) -> Self {
82 log::trace!("Getting channel requester");
83 let channel_requester = Comms::get_channel_comms();
84 let (channel_sender, mut channel_receiver) = Comms::create_channel_channel();
85 log::trace!("Creating the consumer using the creator");
86 let (consumer, channel) =
87 creator.start_dispatch(
88 None,
89 &channel_sender,
90 &mut channel_receiver,
91 &channel_requester).await;
92 log::trace!("Consumer wrapper created");
93 Self {
94 state: Some(State::Idle(Data { consumer, channel, creator, channel_sender, channel_receiver, channel_requester})),
95 }
96 }
97
98
99 async fn next_item(mut data: Data)
101 -> (Delivery, Data) {
102 loop {
103 use futures::stream::StreamExt;
104 log::trace!("Polling consumer");
105 match data.consumer.next().await {
106 Some(Ok(delivery)) => {
107 log::trace!("Got delivery");
108 return (delivery, data);
109 }
110 Some(Err(err)) => {
111 log::error!("Failed to consume a message: {}", err);
112 }
113 None => {
114 log::error!("Consumer has finished for some reason!");
115 }
116 }
117 log::warn!("Consumer is broken, waiting for a new connection");
118 let (consumer, channel) = data.creator.start_dispatch(Some(data.channel.clone()),
119 &data.channel_sender,
120 &mut data.channel_receiver,
121 &data.channel_requester).await;
122 data.consumer = consumer;
123 data.channel = channel;
124 }
125 }
126}
127
128impl Stream for ConsumerWrapper {
129 type Item = Delivery;
130 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
131 log::trace!("Poll next");
132 let this = Pin::into_inner(self);
133
134 loop {
135 match this.state.take() {
136 Some(State::Idle(data)) => {
137 this.state = Some(State::Next {
138 next: Box::pin(Self::next_item(data)),
139 });
140 }
141 Some(State::Next { mut next }) => {
142 let action = next.as_mut();
143 return match Future::poll(action, cx) {
144 Poll::Pending => {
145 this.state = Some(State::Next { next });
146 log::trace!("Pending");
147 Poll::Pending
148 }
149 Poll::Ready((delivery, data)) => {
150 this.state = Some(State::Idle(data));
151 log::trace!("Ready");
152 Poll::Ready(Some(delivery))
153 }
154 }
155 }
156 None => unreachable!(),
157 }
158 }
159 }
160}