//! Source: `iridium_stomp/subscription.rs`.

1use crate::connection::ConnError;
2use crate::connection::Connection;
3use crate::frame::Frame;
4use futures::stream::Stream;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8
/// Options to configure a subscription.
///
/// `headers` are forwarded to the broker as-is when sending the SUBSCRIBE
/// frame and persisted locally so they can be re-sent on reconnect. This
/// allows broker-specific durable subscription extensions to be used (for
/// example ActiveMQ's durable subscription headers) while keeping the
/// library generic.
///
/// The `Default` value has no extra headers and no durable queue name.
/// Values compare field-by-field via `PartialEq`/`Eq`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SubscriptionOptions {
    /// Extra headers to include on the SUBSCRIBE frame. Empty by default.
    pub headers: Vec<(String, String)>,

    /// Optional named queue to subscribe to (convenience; typically you can
    /// just put this in the `destination` argument). Kept for clarity.
    /// `None` by default.
    pub durable_queue: Option<String>,
}
23
24/// A lightweight handle returned from `Connection::subscribe` that packages the
25/// subscription id, destination, and the receiving side of the subscription.
26///
27/// The `Subscription` provides convenience helpers for acknowledging or
28/// negative-acknowledging messages; these delegate to the underlying
29/// `Connection` handle.
30pub struct Subscription {
31    id: String,
32    destination: String,
33    receiver: mpsc::Receiver<Frame>,
34    conn: Connection,
35}
36
37impl Subscription {
38    pub(crate) fn new(
39        id: String,
40        destination: String,
41        receiver: mpsc::Receiver<Frame>,
42        conn: Connection,
43    ) -> Self {
44        Self {
45            id,
46            destination,
47            receiver,
48            conn,
49        }
50    }
51
52    /// Returns the local subscription id.
53    pub fn id(&self) -> &str {
54        &self.id
55    }
56
57    /// Returns the destination this subscription listens to.
58    pub fn destination(&self) -> &str {
59        &self.destination
60    }
61
62    /// Consume the `Subscription` and return the underlying receiver so the
63    /// caller can drive message handling directly.
64    pub fn into_receiver(self) -> mpsc::Receiver<Frame> {
65        self.receiver
66    }
67
68    /// Acknowledge a message by its `message-id` header. Delegates to
69    /// `Connection::ack` using the local subscription id.
70    pub async fn ack(&self, message_id: &str) -> Result<(), ConnError> {
71        self.conn.ack(&self.id, message_id).await
72    }
73
74    /// Negative-acknowledge a message by its `message-id` header.
75    pub async fn nack(&self, message_id: &str) -> Result<(), ConnError> {
76        self.conn.nack(&self.id, message_id).await
77    }
78
79    /// Consume the subscription and unsubscribe from the server.
80    ///
81    /// This is a convenience that calls `Connection::unsubscribe` with the
82    /// local subscription id and drops the receiver.
83    pub async fn unsubscribe(self) -> Result<(), ConnError> {
84        self.conn.unsubscribe(&self.id).await
85    }
86}
87
88impl Stream for Subscription {
89    type Item = Frame;
90
91    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92        // Safe to get a mutable reference because all fields of `Subscription`
93        // are `Unpin` (String, Receiver, Connection). We then delegate to the
94        // tokio mpsc receiver's `poll_recv` which returns `Poll<Option<T>>`.
95        let this = self.get_mut();
96        Pin::new(&mut this.receiver).poll_recv(cx)
97    }
98}