1use futures::stream::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7#[derive(Clone)]
12pub struct ConsumerBuilder {
13 conn: crate::Connection,
14 ex: String,
15 queue: String,
16 kind: lapin::ExchangeKind,
17 ex_opts: lapin::options::ExchangeDeclareOptions,
18 queue_opts: lapin::options::QueueDeclareOptions,
19 bind_opts: lapin::options::QueueBindOptions,
20 field_table: lapin::types::FieldTable,
21 tx_props: lapin::BasicProperties,
22 tx_opts: lapin::options::BasicPublishOptions,
23 rx_opts: lapin::options::BasicConsumeOptions,
24 ack_opts: lapin::options::BasicAckOptions,
25 rej_opts: lapin::options::BasicRejectOptions,
26 processor: Box<dyn crate::MessageProcess + Send + Sync>,
27}
28
29impl ConsumerBuilder {
30 pub fn new(conn: crate::Connection) -> Self {
31 Self {
32 conn,
33 ex: String::from(crate::DEFAULT_EXCHANGE),
34 queue: String::from(crate::DEFAULT_QUEUE),
35 kind: lapin::ExchangeKind::Direct,
36 ex_opts: lapin::options::ExchangeDeclareOptions::default(),
37 queue_opts: lapin::options::QueueDeclareOptions::default(),
38 bind_opts: lapin::options::QueueBindOptions::default(),
39 field_table: lapin::types::FieldTable::default(),
40 tx_props: lapin::BasicProperties::default(),
41 tx_opts: lapin::options::BasicPublishOptions::default(),
42 rx_opts: lapin::options::BasicConsumeOptions::default(),
43 ack_opts: lapin::options::BasicAckOptions::default(),
44 rej_opts: lapin::options::BasicRejectOptions::default(),
45 processor: Box::new(crate::message::EchoProcessor {}),
46 }
47 }
48 pub fn exchange(&mut self, exchange: &str) -> &mut Self {
50 self.ex = exchange.to_string();
51 self
52 }
53 pub fn queue(&mut self, queue: &str) -> &mut Self {
55 self.queue = queue.to_string();
56 self
57 }
58 pub fn with_processor(
62 &mut self,
63 processor: Box<dyn crate::MessageProcess + Send + Sync>,
64 ) -> &mut Self {
65 self.processor = processor;
66 self
67 }
68 pub async fn build(&self) -> crate::Result<Consumer> {
69 let opts = crate::client::QueueOptions {
70 kind: self.kind.clone(),
71 ex_opts: self.ex_opts.clone(),
72 ex_field: self.field_table.clone(),
73 queue_opts: self.queue_opts.clone(),
74 queue_field: self.field_table.clone(),
75 bind_opts: self.bind_opts.clone(),
76 bind_field: self.field_table.clone(),
77 };
78 let (ch, q) = self.conn.queue(&self.ex, &self.queue, opts).await?;
79 let consume = ch
80 .clone()
81 .basic_consume(
82 q.name().as_str(),
83 "consumer",
84 self.rx_opts.clone(),
85 self.field_table.clone(),
86 )
87 .await
88 .map_err(crate::Error::from)?;
89 Ok(Consumer {
90 ch,
91 consume,
92 ex: self.ex.clone(),
93 tx_props: self.tx_props.clone(),
94 tx_opts: self.tx_opts.clone(),
95 ack_opts: self.ack_opts.clone(),
96 rej_opts: self.rej_opts.clone(),
97 processor: self.processor.clone(),
98 })
99 }
100}
101
102pub struct Consumer {
106 ch: lapin::Channel,
107 consume: lapin::Consumer,
108 ex: String,
109 tx_props: lapin::BasicProperties,
110 tx_opts: lapin::options::BasicPublishOptions,
111 ack_opts: lapin::options::BasicAckOptions,
112 rej_opts: lapin::options::BasicRejectOptions,
113 processor: Box<dyn crate::MessageProcess + Send + Sync>,
114}
115
116impl Consumer {
117 pub fn with_processor(
121 &mut self,
122 processor: Box<dyn crate::MessageProcess + Send + Sync>,
123 ) -> &mut Self {
124 self.processor = processor;
125 self
126 }
127 pub async fn run(&mut self) -> crate::Result<()> {
128 while let Some(msg) = self.consume.next().await {
129 match msg {
130 Ok(msg) => {
131 let req = &crate::Message::new(msg);
132 match self.processor.process(req).await {
133 Ok(resp) => self.response(req, &resp).await?,
134 Err(_err) => self.reject(req).await?,
135 }
136 }
137 Err(err) => return Err(crate::Error::from(err)),
138 }
139 }
140 Ok(())
141 }
142 pub async fn response(&mut self, req: &crate::Message, resp: &[u8]) -> crate::Result<()> {
143 if let Some(reply_to) = req.reply_to() {
144 self.send(reply_to, resp).await?;
145 }
146 self.ch
147 .basic_ack(req.delivery_tag(), self.ack_opts.clone())
148 .await
149 .map_err(crate::Error::from)?;
150 Ok(())
151 }
152 pub async fn reject(&mut self, req: &crate::Message) -> crate::Result<()> {
153 self.ch
154 .basic_reject(req.delivery_tag(), self.rej_opts.clone())
155 .await
156 .map_err(crate::Error::from)?;
157 Ok(())
158 }
159 async fn send(&mut self, routing_key: &str, msg: &[u8]) -> crate::Result<()> {
160 self.ch
161 .basic_publish(
162 &self.ex,
163 &routing_key,
164 self.tx_opts.clone(),
165 msg.to_vec(),
166 self.tx_props.clone(),
167 )
168 .await
169 .map_err(crate::Error::from)?;
170 Ok(())
171 }
172}
173
174impl Stream for Consumer {
175 type Item = Result<crate::Message, crate::Error>;
176 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177 let c = &mut self.consume;
178 let c = Pin::new(c);
179 match c.poll_next(cx) {
180 Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(Ok(crate::Message::new(msg)))),
181 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
182 Poll::Ready(None) => Poll::Ready(None),
183 Poll::Pending => Poll::Pending,
184 }
185 }
186}