1use futures_util::stream::StreamExt;
4
5#[derive(Clone)]
10pub struct ProducerBuilder {
11 conn: crate::Connection,
12 ex: String,
13 queue: String,
14 kind: lapin::ExchangeKind,
15 ex_opts: lapin::options::ExchangeDeclareOptions,
16 queue_opts: lapin::options::QueueDeclareOptions,
17 bind_opts: lapin::options::QueueBindOptions,
18 field_table: lapin::types::FieldTable,
19 tx_props: lapin::BasicProperties,
20 tx_opts: lapin::options::BasicPublishOptions,
21 rx_opts: lapin::options::BasicConsumeOptions,
22 ack_opts: lapin::options::BasicAckOptions,
23 rej_opts: lapin::options::BasicRejectOptions,
24 nack_opts: lapin::options::BasicNackOptions,
25 peeker: Box<dyn crate::MessagePeek + Send + Sync>,
26}
27
28impl ProducerBuilder {
29 pub fn new(conn: crate::Connection) -> Self {
30 Self {
31 conn,
32 ex: String::from(crate::DEFAULT_EXCHANGE),
33 queue: String::from(crate::DEFAULT_QUEUE),
34 kind: lapin::ExchangeKind::Direct,
35 ex_opts: lapin::options::ExchangeDeclareOptions::default(),
36 queue_opts: lapin::options::QueueDeclareOptions::default(),
37 bind_opts: lapin::options::QueueBindOptions::default(),
38 field_table: lapin::types::FieldTable::default(),
39 tx_props: lapin::BasicProperties::default(),
40 tx_opts: lapin::options::BasicPublishOptions::default(),
41 rx_opts: lapin::options::BasicConsumeOptions::default(),
42 ack_opts: lapin::options::BasicAckOptions::default(),
43 rej_opts: lapin::options::BasicRejectOptions::default(),
44 nack_opts: lapin::options::BasicNackOptions::default(),
45 peeker: Box::new(crate::message::NoopPeeker {}),
46 }
47 }
48 pub fn exchange(&mut self, exchange: &str) -> &mut Self {
50 self.ex = exchange.to_string();
51 self
52 }
53 pub fn queue(&mut self, queue: &str) -> &mut Self {
55 self.queue = queue.to_string();
56 self
57 }
58 pub fn with_peeker(&mut self, peeker: Box<dyn crate::MessagePeek + Send + Sync>) -> &mut Self {
62 self.peeker = peeker;
63 self
64 }
65 pub async fn build(&self) -> crate::Result<Producer> {
66 let tx = self.conn.channel().await?;
67 let queue_opts = lapin::options::QueueDeclareOptions {
68 exclusive: true,
69 auto_delete: true,
70 ..self.queue_opts.clone()
71 };
72 let opts = crate::client::QueueOptions {
73 kind: self.kind.clone(),
74 ex_opts: self.ex_opts.clone(),
75 ex_field: self.field_table.clone(),
76 queue_opts,
77 queue_field: self.field_table.clone(),
78 bind_opts: self.bind_opts.clone(),
79 bind_field: self.field_table.clone(),
80 };
81 let (rx, q) = self
82 .conn
83 .queue(&self.ex, crate::EPHEMERAL_QUEUE, opts)
84 .await?;
85 let consume = rx
86 .basic_consume(
87 q.name().as_str(),
88 "producer",
89 self.rx_opts.clone(),
90 self.field_table.clone(),
91 )
92 .await
93 .map_err(crate::Error::from)?;
94 Ok(Producer {
95 tx,
96 rx,
97 consume,
98 ex: self.ex.clone(),
99 queue: self.queue.clone(),
100 tx_props: self.tx_props.clone(),
101 rx_props: self.tx_props.clone().with_reply_to(q.name().clone()),
102 tx_opts: self.tx_opts.clone(),
103 ack_opts: self.ack_opts.clone(),
104 rej_opts: self.rej_opts.clone(),
105 nack_opts: self.nack_opts.clone(),
106 peeker: self.peeker.clone(),
107 })
108 }
109}
110
111pub struct Producer {
115 tx: lapin::Channel,
116 rx: lapin::Channel,
117 consume: lapin::Consumer,
118 ex: String,
119 queue: String,
120 tx_props: lapin::BasicProperties,
121 rx_props: lapin::BasicProperties,
122 tx_opts: lapin::options::BasicPublishOptions,
123 ack_opts: lapin::options::BasicAckOptions,
124 rej_opts: lapin::options::BasicRejectOptions,
125 nack_opts: lapin::options::BasicNackOptions,
126 peeker: Box<dyn crate::MessagePeek + Send>,
127}
128
129impl Producer {
130 pub fn with_peeker(&mut self, peeker: Box<dyn crate::MessagePeek + Send + Sync>) -> &mut Self {
134 self.peeker = peeker;
135 self
136 }
137 pub async fn publish(&mut self, msg: Vec<u8>) -> crate::Result<()> {
138 self.tx
139 .basic_publish(
140 &self.ex,
141 &self.queue,
142 self.tx_opts.clone(),
143 msg,
144 self.tx_props.clone(),
145 )
146 .await
147 .map_err(crate::Error::from)?;
148 Ok(())
149 }
150 pub async fn rpc(&mut self, msg: Vec<u8>) -> crate::Result<Vec<u8>> {
151 self.tx
152 .basic_publish(
153 &self.ex,
154 &self.queue,
155 self.tx_opts.clone(),
156 msg,
157 self.rx_props.clone(),
158 )
159 .await
160 .map_err(crate::Error::from)?;
161 if let Some(msg) = self.consume.next().await {
162 match msg {
163 Ok(msg) => return self.recv(&crate::Message::new(msg)).await,
164 Err(err) => return Err(crate::Error::from(err)),
165 }
166 }
167 Ok(vec![])
168 }
169 async fn recv(&mut self, msg: &crate::Message) -> crate::Result<Vec<u8>> {
170 match self.peeker.peek(msg).await {
171 Ok(()) => {
172 self.rx
173 .basic_ack(msg.delivery_tag(), self.ack_opts.clone())
174 .await
175 .map_err(crate::Error::from)?;
176 Ok(msg.data().to_vec())
177 }
178 Err(crate::MessageError::Drop) => Ok(vec![]),
179 Err(crate::MessageError::Reject) => {
180 self.rx
181 .basic_reject(msg.delivery_tag(), self.rej_opts.clone())
182 .await
183 .map_err(crate::Error::from)?;
184 Ok(vec![])
185 }
186 Err(crate::MessageError::Nack) => {
187 self.rx
188 .basic_nack(msg.delivery_tag(), self.nack_opts.clone())
189 .await
190 .map_err(crate::Error::from)?;
191 Ok(vec![])
192 }
193 }
194 }
195}