iridium_stomp/subscription.rs
1use crate::connection::ConnError;
2use crate::connection::Connection;
3use crate::frame::Frame;
4use futures::stream::Stream;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8
/// Configuration applied when creating a subscription.
///
/// The `headers` are passed to the broker unchanged on the SUBSCRIBE frame
/// and are also kept locally so that they can be sent again after a
/// reconnect. This lets callers use broker-specific durable-subscription
/// extensions (ActiveMQ's durable subscription headers, for example)
/// without the library having to know about them.
#[derive(Clone, Debug, Default)]
pub struct SubscriptionOptions {
    /// Additional headers to attach to the SUBSCRIBE frame.
    pub headers: Vec<(String, String)>,

    /// Name of a queue to subscribe to, if any. This is a convenience kept
    /// for clarity; the queue can usually be given through the
    /// `destination` argument instead.
    pub durable_queue: Option<String>,
}
23
24/// A lightweight handle returned from `Connection::subscribe` that packages the
25/// subscription id, destination, and the receiving side of the subscription.
26///
27/// The `Subscription` provides convenience helpers for acknowledging or
28/// negative-acknowledging messages; these delegate to the underlying
29/// `Connection` handle.
30pub struct Subscription {
31 id: String,
32 destination: String,
33 receiver: mpsc::Receiver<Frame>,
34 conn: Connection,
35}
36
37impl Subscription {
38 pub(crate) fn new(
39 id: String,
40 destination: String,
41 receiver: mpsc::Receiver<Frame>,
42 conn: Connection,
43 ) -> Self {
44 Self {
45 id,
46 destination,
47 receiver,
48 conn,
49 }
50 }
51
52 /// Returns the local subscription id.
53 pub fn id(&self) -> &str {
54 &self.id
55 }
56
57 /// Returns the destination this subscription listens to.
58 pub fn destination(&self) -> &str {
59 &self.destination
60 }
61
62 /// Consume the `Subscription` and return the underlying receiver so the
63 /// caller can drive message handling directly.
64 pub fn into_receiver(self) -> mpsc::Receiver<Frame> {
65 self.receiver
66 }
67
68 /// Acknowledge a message by its `message-id` header. Delegates to
69 /// `Connection::ack` using the local subscription id.
70 pub async fn ack(&self, message_id: &str) -> Result<(), ConnError> {
71 self.conn.ack(&self.id, message_id).await
72 }
73
74 /// Negative-acknowledge a message by its `message-id` header.
75 pub async fn nack(&self, message_id: &str) -> Result<(), ConnError> {
76 self.conn.nack(&self.id, message_id).await
77 }
78
79 /// Consume the subscription and unsubscribe from the server.
80 ///
81 /// This is a convenience that calls `Connection::unsubscribe` with the
82 /// local subscription id and drops the receiver.
83 pub async fn unsubscribe(self) -> Result<(), ConnError> {
84 self.conn.unsubscribe(&self.id).await
85 }
86}
87
88impl Stream for Subscription {
89 type Item = Frame;
90
91 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92 // Safe to get a mutable reference because all fields of `Subscription`
93 // are `Unpin` (String, Receiver, Connection). We then delegate to the
94 // tokio mpsc receiver's `poll_recv` which returns `Poll<Option<T>>`.
95 let this = self.get_mut();
96 Pin::new(&mut this.receiver).poll_recv(cx)
97 }
98}