metalmq_client/consumer.rs
1use std::time::Duration;
2
3use crate::channel_api::Channel;
4use crate::client_error;
5use crate::message::DeliveredMessage;
6use crate::model;
7use crate::processor::{self, ClientRequest, ClientRequestSink, Param, WaitFor};
8use anyhow::Result;
9use metalmq_codec::frame;
10use tokio::sync::{mpsc, oneshot};
11
/// A signal arriving from the server while consuming a queue.
///
/// Values of this type are read from [`ConsumerHandler::signal_stream`] or via
/// [`ConsumerHandler::receive`].
#[derive(Debug)]
pub enum ConsumerSignal {
    /// A message was delivered to this consumer.
    Delivered(DeliveredMessage),
    /// The consumer was cancelled.
    Cancelled,
    /// The channel was closed; the fields describe the reason of the close.
    ChannelClosed {
        /// Numeric reply code of the close.
        reply_code: u16,
        /// Textual description belonging to the reply code.
        reply_text: String,
        /// NOTE(review): presumably the AMQP class id and method id packed into one value, i.e.
        /// the method which caused the close - confirm against the codec.
        class_method: u32,
    },
    /// The connection was closed; the fields describe the reason of the close.
    ConnectionClosed {
        /// Numeric reply code of the close.
        reply_code: u16,
        /// Textual description belonging to the reply code.
        reply_text: String,
        /// NOTE(review): presumably the AMQP class id and method id packed into one value, i.e.
        /// the method which caused the close - confirm against the codec.
        class_method: u32,
    },
}
28
/// Consumer API for `Basic.Consume`.
///
/// A `ConsumerHandler` is obtained by calling [`Channel::basic_consume`].
pub struct ConsumerHandler {
    /// The number of the channel on which messages are consumed. One client can have one
    /// consumer per channel.
    pub channel: model::ChannelNumber,
    /// Identifier of the consumer on the server.
    pub consumer_tag: String,
    /// Sink through which the handler hands its frames (ack, cancel) to the `processor` module.
    client_sink: ClientRequestSink,
    /// Stream of [`ConsumerSignal`] values. The consumer reads delivered messages from here and
    /// reacts to them (for example by acking), and it is also notified here about consumer
    /// cancellation and channel or connection close events.
    pub signal_stream: mpsc::UnboundedReceiver<ConsumerSignal>,
}
42
43/// After consuming started with `ConsumerHandler` one can ack, nack or reject messages.
44///
45/// ```no_run
46/// use metalmq_client::{Channel, ConsumerSignal, Exclusive, NoAck, NoLocal};
47///
48/// async fn consume(channel: Channel) {
49/// let mut handler = channel.basic_consume("queue", NoAck(false), Exclusive(false),
50/// NoLocal(false)).await.unwrap();
51///
52/// while let Some(signal) = handler.signal_stream.recv().await {
53/// match signal {
54/// ConsumerSignal::Delivered(m) => {
55/// handler.basic_ack(m.delivery_tag).await.unwrap();
56/// }
57/// ConsumerSignal::Cancelled | ConsumerSignal::ChannelClosed { .. } |
58/// ConsumerSignal::ConnectionClosed { .. } => {
59/// break;
60/// }
61/// }
62/// }
63/// }
64/// ```
65impl ConsumerHandler {
66 pub async fn receive(&mut self, timeout: Duration) -> Option<ConsumerSignal> {
67 let sleep = tokio::time::sleep(tokio::time::Duration::from(timeout));
68 tokio::pin!(sleep);
69
70 tokio::select! {
71 signal = self.signal_stream.recv() => {
72 signal
73 }
74 _ = &mut sleep => {
75 return None;
76 }
77 }
78 }
79
80 pub async fn basic_ack(&self, delivery_tag: u64) -> Result<()> {
81 processor::sync_send(
82 &self.client_sink,
83 frame::BasicAckArgs::default()
84 .delivery_tag(delivery_tag)
85 .multiple(false)
86 .frame(self.channel),
87 )
88 .await
89 }
90
91 //pub async fn basic_nack(&self, delivery_tag: u64, multiple: bool, requeue: bool) -> Result<()> {
92 // processor::send(&self.client_sink, frame::basic_nack(self.channel, delivery_tag, false)).await
93 //}
94 //
95 //pub async fn reject (delivery tag, requeue)
96
97 pub async fn basic_cancel(self) -> Result<()> {
98 let frame = frame::BasicCancelArgs::new(&self.consumer_tag).frame(self.channel);
99
100 processor::call(&self.client_sink, frame).await
101 }
102}
103
/// Specifies whether the consumer is exclusive: with `Exclusive(true)` no other client can
/// consume the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Exclusive(pub bool);
/// Specifies whether messages are delivered without acknowledgement. With `NoAck(true)` the
/// server does not expect acks from the client; with `NoAck(false)` the client needs to ack
/// the messages after delivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoAck(pub bool);
/// Specifies whether the server should withhold messages from the connection which published
/// them. With `NoLocal(true)` such messages are not sent back to the publishing connection;
/// with `NoLocal(false)` they are delivered like any other message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoLocal(pub bool);
110
111impl Channel {
112 // TODO consume should spawn a thread and on that thread the client can
113 // execute its callback. From that thread we can ack or reject the message,
114 // so the consumer channel won't be affected. Also in the consumer channel,
115 // the client.rs module can buffer the messages, so if the server support
116 // some kind of qos, it won't send more messages while the client has a
117 // lot of unacked messages.
118 //
119 // Because of the lifetimes it would be nice if we consume on a channel, we
120 // give up the ownership and move the channel inside the tokio thread. Why?
121 // Because inside the thread on the channel we need to send back acks or
122 // nacks and so on, so the thread uses the channel. But since we don't want
123 // to run into multithreading issue, we need to move the channel to the
124 // thread and forget that channel in the main code which consumes.
125
126 // TODO ConsumerTag should be an enum with a value or we can ask client to generate a ctag
127 /// See [`ConsumerHandler`]
128 pub async fn basic_consume<'a>(
129 &'a self,
130 queue_name: &'a str,
131 no_ack: NoAck,
132 exclusive: Exclusive,
133 no_local: NoLocal,
134 ) -> Result<ConsumerHandler> {
135 let consumer_tag = format!("metalmq-{}", rand::random::<u128>());
136
137 let frame = frame::BasicConsumeArgs::default()
138 .queue(queue_name)
139 .consumer_tag(&consumer_tag)
140 .no_ack(no_ack.0)
141 .exclusive(exclusive.0)
142 .no_local(no_local.0)
143 .frame(self.channel);
144
145 // Buffer of the incoming, delivered messages or other signals like
146 // consumer cancelled.
147 let (signal_sink, signal_stream) = mpsc::unbounded_channel::<ConsumerSignal>();
148
149 let handler = ConsumerHandler {
150 channel: self.channel,
151 consumer_tag,
152 client_sink: self.sink.clone(),
153 signal_stream,
154 };
155
156 let (tx, rx) = oneshot::channel();
157
158 self.sink
159 .send(ClientRequest {
160 param: Param::Consume(frame, signal_sink),
161 response: WaitFor::FrameResponse(tx),
162 })
163 .await?;
164
165 match rx.await {
166 Ok(response) => match response {
167 Ok(()) => Ok(handler),
168 Err(e) => Err(e),
169 },
170 Err(_) => client_error!(None, 501, "Channel recv error", 0),
171 }
172 }
173}