async_mq/
consume.rs

1// SPDX-License-Identifier: Apache-2.0 AND MIT
2//! `ConsumerBuilder` and `Consumer` structs
3use futures::stream::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7/// A [non-consuming] [Consumer] builder.
8///
9/// [Consumer]: struct.Consumer.html
10/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
11#[derive(Clone)]
12pub struct ConsumerBuilder {
13    conn: crate::Connection,
14    ex: String,
15    queue: String,
16    kind: lapin::ExchangeKind,
17    ex_opts: lapin::options::ExchangeDeclareOptions,
18    queue_opts: lapin::options::QueueDeclareOptions,
19    bind_opts: lapin::options::QueueBindOptions,
20    field_table: lapin::types::FieldTable,
21    tx_props: lapin::BasicProperties,
22    tx_opts: lapin::options::BasicPublishOptions,
23    rx_opts: lapin::options::BasicConsumeOptions,
24    ack_opts: lapin::options::BasicAckOptions,
25    rej_opts: lapin::options::BasicRejectOptions,
26    processor: Box<dyn crate::MessageProcess + Send + Sync>,
27}
28
29impl ConsumerBuilder {
30    pub fn new(conn: crate::Connection) -> Self {
31        Self {
32            conn,
33            ex: String::from(crate::DEFAULT_EXCHANGE),
34            queue: String::from(crate::DEFAULT_QUEUE),
35            kind: lapin::ExchangeKind::Direct,
36            ex_opts: lapin::options::ExchangeDeclareOptions::default(),
37            queue_opts: lapin::options::QueueDeclareOptions::default(),
38            bind_opts: lapin::options::QueueBindOptions::default(),
39            field_table: lapin::types::FieldTable::default(),
40            tx_props: lapin::BasicProperties::default(),
41            tx_opts: lapin::options::BasicPublishOptions::default(),
42            rx_opts: lapin::options::BasicConsumeOptions::default(),
43            ack_opts: lapin::options::BasicAckOptions::default(),
44            rej_opts: lapin::options::BasicRejectOptions::default(),
45            processor: Box::new(crate::message::EchoProcessor {}),
46        }
47    }
48    /// Specify the exchange name.
49    pub fn exchange(&mut self, exchange: &str) -> &mut Self {
50        self.ex = exchange.to_string();
51        self
52    }
53    /// Specify the queue name.
54    pub fn queue(&mut self, queue: &str) -> &mut Self {
55        self.queue = queue.to_string();
56        self
57    }
58    /// Use the provided [MessageProcess] trait object.
59    ///
60    /// [MessageProcess]: ../message/trait.MessageProcess.html
61    pub fn with_processor(
62        &mut self,
63        processor: Box<dyn crate::MessageProcess + Send + Sync>,
64    ) -> &mut Self {
65        self.processor = processor;
66        self
67    }
68    pub async fn build(&self) -> crate::Result<Consumer> {
69        let opts = crate::client::QueueOptions {
70            kind: self.kind.clone(),
71            ex_opts: self.ex_opts.clone(),
72            ex_field: self.field_table.clone(),
73            queue_opts: self.queue_opts.clone(),
74            queue_field: self.field_table.clone(),
75            bind_opts: self.bind_opts.clone(),
76            bind_field: self.field_table.clone(),
77        };
78        let (ch, q) = self.conn.queue(&self.ex, &self.queue, opts).await?;
79        let consume = ch
80            .clone()
81            .basic_consume(
82                q.name().as_str(),
83                "consumer",
84                self.rx_opts.clone(),
85                self.field_table.clone(),
86            )
87            .await
88            .map_err(crate::Error::from)?;
89        Ok(Consumer {
90            ch,
91            consume,
92            ex: self.ex.clone(),
93            tx_props: self.tx_props.clone(),
94            tx_opts: self.tx_opts.clone(),
95            ack_opts: self.ack_opts.clone(),
96            rej_opts: self.rej_opts.clone(),
97            processor: self.processor.clone(),
98        })
99    }
100}
101
102/// A zero-cost [lapin::Consumer] abstruction type.
103///
104/// [lapin::Consumer]: https://docs.rs/lapin/latest/lapin/struct.Consumer.html
105pub struct Consumer {
106    ch: lapin::Channel,
107    consume: lapin::Consumer,
108    ex: String,
109    tx_props: lapin::BasicProperties,
110    tx_opts: lapin::options::BasicPublishOptions,
111    ack_opts: lapin::options::BasicAckOptions,
112    rej_opts: lapin::options::BasicRejectOptions,
113    processor: Box<dyn crate::MessageProcess + Send + Sync>,
114}
115
116impl Consumer {
117    /// Use the provided [MessageProcess] trait object.
118    ///
119    /// [MessageProcess]: ../message/trait.MessageProcess.html
120    pub fn with_processor(
121        &mut self,
122        processor: Box<dyn crate::MessageProcess + Send + Sync>,
123    ) -> &mut Self {
124        self.processor = processor;
125        self
126    }
127    pub async fn run(&mut self) -> crate::Result<()> {
128        while let Some(msg) = self.consume.next().await {
129            match msg {
130                Ok(msg) => {
131                    let req = &crate::Message::new(msg);
132                    match self.processor.process(req).await {
133                        Ok(resp) => self.response(req, &resp).await?,
134                        Err(_err) => self.reject(req).await?,
135                    }
136                }
137                Err(err) => return Err(crate::Error::from(err)),
138            }
139        }
140        Ok(())
141    }
142    pub async fn response(&mut self, req: &crate::Message, resp: &[u8]) -> crate::Result<()> {
143        if let Some(reply_to) = req.reply_to() {
144            self.send(reply_to, resp).await?;
145        }
146        self.ch
147            .basic_ack(req.delivery_tag(), self.ack_opts.clone())
148            .await
149            .map_err(crate::Error::from)?;
150        Ok(())
151    }
152    pub async fn reject(&mut self, req: &crate::Message) -> crate::Result<()> {
153        self.ch
154            .basic_reject(req.delivery_tag(), self.rej_opts.clone())
155            .await
156            .map_err(crate::Error::from)?;
157        Ok(())
158    }
159    async fn send(&mut self, routing_key: &str, msg: &[u8]) -> crate::Result<()> {
160        self.ch
161            .basic_publish(
162                &self.ex,
163                &routing_key,
164                self.tx_opts.clone(),
165                msg.to_vec(),
166                self.tx_props.clone(),
167            )
168            .await
169            .map_err(crate::Error::from)?;
170        Ok(())
171    }
172}
173
174impl Stream for Consumer {
175    type Item = Result<crate::Message, crate::Error>;
176    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177        let c = &mut self.consume;
178        let c = Pin::new(c);
179        match c.poll_next(cx) {
180            Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(Ok(crate::Message::new(msg)))),
181            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
182            Poll::Ready(None) => Poll::Ready(None),
183            Poll::Pending => Poll::Pending,
184        }
185    }
186}