1use futures_util::stream::StreamExt;
4
5#[derive(Clone)]
10pub struct ProducerBuilder {
11    conn: crate::Connection,
12    ex: String,
13    queue: String,
14    kind: lapin::ExchangeKind,
15    ex_opts: lapin::options::ExchangeDeclareOptions,
16    queue_opts: lapin::options::QueueDeclareOptions,
17    bind_opts: lapin::options::QueueBindOptions,
18    field_table: lapin::types::FieldTable,
19    tx_props: lapin::BasicProperties,
20    tx_opts: lapin::options::BasicPublishOptions,
21    rx_opts: lapin::options::BasicConsumeOptions,
22    ack_opts: lapin::options::BasicAckOptions,
23    rej_opts: lapin::options::BasicRejectOptions,
24    nack_opts: lapin::options::BasicNackOptions,
25    peeker: Box<dyn crate::MessagePeek + Send + Sync>,
26}
27
28impl ProducerBuilder {
29    pub fn new(conn: crate::Connection) -> Self {
30        Self {
31            conn,
32            ex: String::from(crate::DEFAULT_EXCHANGE),
33            queue: String::from(crate::DEFAULT_QUEUE),
34            kind: lapin::ExchangeKind::Direct,
35            ex_opts: lapin::options::ExchangeDeclareOptions::default(),
36            queue_opts: lapin::options::QueueDeclareOptions::default(),
37            bind_opts: lapin::options::QueueBindOptions::default(),
38            field_table: lapin::types::FieldTable::default(),
39            tx_props: lapin::BasicProperties::default(),
40            tx_opts: lapin::options::BasicPublishOptions::default(),
41            rx_opts: lapin::options::BasicConsumeOptions::default(),
42            ack_opts: lapin::options::BasicAckOptions::default(),
43            rej_opts: lapin::options::BasicRejectOptions::default(),
44            nack_opts: lapin::options::BasicNackOptions::default(),
45            peeker: Box::new(crate::message::NoopPeeker {}),
46        }
47    }
48    pub fn exchange(&mut self, exchange: &str) -> &mut Self {
50        self.ex = exchange.to_string();
51        self
52    }
53    pub fn queue(&mut self, queue: &str) -> &mut Self {
55        self.queue = queue.to_string();
56        self
57    }
58    pub fn with_peeker(&mut self, peeker: Box<dyn crate::MessagePeek + Send + Sync>) -> &mut Self {
62        self.peeker = peeker;
63        self
64    }
65    pub async fn build(&self) -> crate::Result<Producer> {
66        let tx = self.conn.channel().await?;
67        let queue_opts = lapin::options::QueueDeclareOptions {
68            exclusive: true,
69            auto_delete: true,
70            ..self.queue_opts.clone()
71        };
72        let opts = crate::client::QueueOptions {
73            kind: self.kind.clone(),
74            ex_opts: self.ex_opts.clone(),
75            ex_field: self.field_table.clone(),
76            queue_opts,
77            queue_field: self.field_table.clone(),
78            bind_opts: self.bind_opts.clone(),
79            bind_field: self.field_table.clone(),
80        };
81        let (rx, q) = self
82            .conn
83            .queue(&self.ex, crate::EPHEMERAL_QUEUE, opts)
84            .await?;
85        let consume = rx
86            .basic_consume(
87                q.name().as_str(),
88                "producer",
89                self.rx_opts.clone(),
90                self.field_table.clone(),
91            )
92            .await
93            .map_err(crate::Error::from)?;
94        Ok(Producer {
95            tx,
96            rx,
97            consume,
98            ex: self.ex.clone(),
99            queue: self.queue.clone(),
100            tx_props: self.tx_props.clone(),
101            rx_props: self.tx_props.clone().with_reply_to(q.name().clone()),
102            tx_opts: self.tx_opts.clone(),
103            ack_opts: self.ack_opts.clone(),
104            rej_opts: self.rej_opts.clone(),
105            nack_opts: self.nack_opts.clone(),
106            peeker: self.peeker.clone(),
107        })
108    }
109}
110
111pub struct Producer {
115    tx: lapin::Channel,
116    rx: lapin::Channel,
117    consume: lapin::Consumer,
118    ex: String,
119    queue: String,
120    tx_props: lapin::BasicProperties,
121    rx_props: lapin::BasicProperties,
122    tx_opts: lapin::options::BasicPublishOptions,
123    ack_opts: lapin::options::BasicAckOptions,
124    rej_opts: lapin::options::BasicRejectOptions,
125    nack_opts: lapin::options::BasicNackOptions,
126    peeker: Box<dyn crate::MessagePeek + Send>,
127}
128
129impl Producer {
130    pub fn with_peeker(&mut self, peeker: Box<dyn crate::MessagePeek + Send + Sync>) -> &mut Self {
134        self.peeker = peeker;
135        self
136    }
137    pub async fn publish(&mut self, msg: Vec<u8>) -> crate::Result<()> {
138        self.tx
139            .basic_publish(
140                &self.ex,
141                &self.queue,
142                self.tx_opts.clone(),
143                msg,
144                self.tx_props.clone(),
145            )
146            .await
147            .map_err(crate::Error::from)?;
148        Ok(())
149    }
150    pub async fn rpc(&mut self, msg: Vec<u8>) -> crate::Result<Vec<u8>> {
151        self.tx
152            .basic_publish(
153                &self.ex,
154                &self.queue,
155                self.tx_opts.clone(),
156                msg,
157                self.rx_props.clone(),
158            )
159            .await
160            .map_err(crate::Error::from)?;
161        if let Some(msg) = self.consume.next().await {
162            match msg {
163                Ok(msg) => return self.recv(&crate::Message::new(msg)).await,
164                Err(err) => return Err(crate::Error::from(err)),
165            }
166        }
167        Ok(vec![])
168    }
169    async fn recv(&mut self, msg: &crate::Message) -> crate::Result<Vec<u8>> {
170        match self.peeker.peek(msg).await {
171            Ok(()) => {
172                self.rx
173                    .basic_ack(msg.delivery_tag(), self.ack_opts.clone())
174                    .await
175                    .map_err(crate::Error::from)?;
176                Ok(msg.data().to_vec())
177            }
178            Err(crate::MessageError::Drop) => Ok(vec![]),
179            Err(crate::MessageError::Reject) => {
180                self.rx
181                    .basic_reject(msg.delivery_tag(), self.rej_opts.clone())
182                    .await
183                    .map_err(crate::Error::from)?;
184                Ok(vec![])
185            }
186            Err(crate::MessageError::Nack) => {
187                self.rx
188                    .basic_nack(msg.delivery_tag(), self.nack_opts.clone())
189                    .await
190                    .map_err(crate::Error::from)?;
191                Ok(vec![])
192            }
193        }
194    }
195}