metalmq_client/
consumer.rs

1use std::time::Duration;
2
3use crate::channel_api::Channel;
4use crate::client_error;
5use crate::message::DeliveredMessage;
6use crate::model;
7use crate::processor::{self, ClientRequest, ClientRequestSink, Param, WaitFor};
8use anyhow::Result;
9use metalmq_codec::frame;
10use tokio::sync::{mpsc, oneshot};
11
12/// A signal arriving from the server during consuming a queue.
13#[derive(Debug)]
14pub enum ConsumerSignal {
15    Delivered(DeliveredMessage),
16    Cancelled,
17    ChannelClosed {
18        reply_code: u16,
19        reply_text: String,
20        class_method: u32,
21    },
22    ConnectionClosed {
23        reply_code: u16,
24        reply_text: String,
25        class_method: u32,
26    },
27}
28
29/// Consumer API for `Basic.Consume`.
30///
31/// `ConsumerHandler` can be get by invoking [`Channel::basic_consume`].
32pub struct ConsumerHandler {
33    /// The channel number we are consuming messages. One client can have one consumer per channel.
34    pub channel: model::ChannelNumber,
35    /// Identifier of the consumer in server.
36    pub consumer_tag: String,
37    client_sink: ClientRequestSink,
38    /// From this signal stream the consumer gets the messages as [`ConsumerSignal`] values and can
39    /// handle them by acking messages or handling channel or connection close events.
40    pub signal_stream: mpsc::UnboundedReceiver<ConsumerSignal>,
41}
42
43/// After consuming started with `ConsumerHandler` one can ack, nack or reject messages.
44///
45/// ```no_run
46/// use metalmq_client::{Channel, ConsumerSignal, Exclusive, NoAck, NoLocal};
47///
48/// async fn consume(channel: Channel) {
49///     let mut handler = channel.basic_consume("queue", NoAck(false), Exclusive(false),
50///         NoLocal(false)).await.unwrap();
51///
52///     while let Some(signal) = handler.signal_stream.recv().await {
53///         match signal {
54///             ConsumerSignal::Delivered(m) => {
55///                 handler.basic_ack(m.delivery_tag).await.unwrap();
56///             }
57///             ConsumerSignal::Cancelled | ConsumerSignal::ChannelClosed { .. } |
58///                 ConsumerSignal::ConnectionClosed { .. } => {
59///                 break;
60///             }
61///         }
62///     }
63/// }
64/// ```
65impl ConsumerHandler {
66    pub async fn receive(&mut self, timeout: Duration) -> Option<ConsumerSignal> {
67        let sleep = tokio::time::sleep(tokio::time::Duration::from(timeout));
68        tokio::pin!(sleep);
69
70        tokio::select! {
71            signal = self.signal_stream.recv() => {
72                signal
73            }
74            _ = &mut sleep => {
75                return None;
76            }
77        }
78    }
79
80    pub async fn basic_ack(&self, delivery_tag: u64) -> Result<()> {
81        processor::sync_send(
82            &self.client_sink,
83            frame::BasicAckArgs::default()
84                .delivery_tag(delivery_tag)
85                .multiple(false)
86                .frame(self.channel),
87        )
88        .await
89    }
90
91    //pub async fn basic_nack(&self, delivery_tag: u64, multiple: bool, requeue: bool) -> Result<()> {
92    //    processor::send(&self.client_sink, frame::basic_nack(self.channel, delivery_tag, false)).await
93    //}
94    //
95    //pub async fn reject (delivery tag, requeue)
96
97    pub async fn basic_cancel(self) -> Result<()> {
98        let frame = frame::BasicCancelArgs::new(&self.consumer_tag).frame(self.channel);
99
100        processor::call(&self.client_sink, frame).await
101    }
102}
103
/// Whether the consumer requests exclusive access to the queue.
///
/// `Exclusive(true)` asks that no other client can consume the queue while this consumer is
/// active; `Exclusive(false)` allows the queue to be shared with other consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Exclusive(pub bool);
/// Whether messages are delivered without requiring acknowledgement.
///
/// `NoAck(true)` tells the server not to expect acks for delivered messages; `NoAck(false)`
/// means the client needs to ack messages after delivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoAck(pub bool);
/// Whether the server should refrain from delivering a connection's own messages back to it.
///
/// `NoLocal(true)` asks the server not to send messages to the same connection which published
/// them; `NoLocal(false)` places no such restriction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoLocal(pub bool);
110
111impl Channel {
112    // TODO consume should spawn a thread and on that thread the client can
113    // execute its callback. From that thread we can ack or reject the message,
114    // so the consumer channel won't be affected. Also in the consumer channel,
115    // the client.rs module can buffer the messages, so if the server support
116    // some kind of qos, it won't send more messages while the client has a
117    // lot of unacked messages.
118    //
119    // Because of the lifetimes it would be nice if we consume on a channel, we
120    // give up the ownership and move the channel inside the tokio thread. Why?
121    // Because inside the thread on the channel we need to send back acks or
122    // nacks and so on, so the thread uses the channel. But since we don't want
123    // to run into multithreading issue, we need to move the channel to the
124    // thread and forget that channel in the main code which consumes.
125
126    // TODO ConsumerTag should be an enum with a value or we can ask client to generate a ctag
127    /// See [`ConsumerHandler`]
128    pub async fn basic_consume<'a>(
129        &'a self,
130        queue_name: &'a str,
131        no_ack: NoAck,
132        exclusive: Exclusive,
133        no_local: NoLocal,
134    ) -> Result<ConsumerHandler> {
135        let consumer_tag = format!("metalmq-{}", rand::random::<u128>());
136
137        let frame = frame::BasicConsumeArgs::default()
138            .queue(queue_name)
139            .consumer_tag(&consumer_tag)
140            .no_ack(no_ack.0)
141            .exclusive(exclusive.0)
142            .no_local(no_local.0)
143            .frame(self.channel);
144
145        // Buffer of the incoming, delivered messages or other signals like
146        // consumer cancelled.
147        let (signal_sink, signal_stream) = mpsc::unbounded_channel::<ConsumerSignal>();
148
149        let handler = ConsumerHandler {
150            channel: self.channel,
151            consumer_tag,
152            client_sink: self.sink.clone(),
153            signal_stream,
154        };
155
156        let (tx, rx) = oneshot::channel();
157
158        self.sink
159            .send(ClientRequest {
160                param: Param::Consume(frame, signal_sink),
161                response: WaitFor::FrameResponse(tx),
162            })
163            .await?;
164
165        match rx.await {
166            Ok(response) => match response {
167                Ok(()) => Ok(handler),
168                Err(e) => Err(e),
169            },
170            Err(_) => client_error!(None, 501, "Channel recv error", 0),
171        }
172    }
173}