Skip to main content

hermes_client/
durable_subscriber.rs

1use hermes_core::{Event, Subject};
2use hermes_proto::{
3    Ack, DurableClientMessage, DurableSubscribeRequest, Nack, broker_client::BrokerClient,
4    durable_client_message::Msg as ClientMsg, durable_server_message::Msg as ServerMsg,
5};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8use tonic::transport::Channel;
9use tracing::{debug, trace, warn};
10
11use crate::error::ClientError;
12
13/// A message received from a durable subscription.
14/// Must be explicitly acked or nacked.
15pub struct DurableMessage<E> {
16    pub event: E,
17    pub message_id: String,
18    pub attempt: u32,
19    ack_tx: mpsc::Sender<DurableClientMessage>,
20}
21
22impl<E> DurableMessage<E> {
23    /// Acknowledge this message. It won't be redelivered.
24    pub async fn ack(self) -> Result<(), ClientError> {
25        trace!(message_id = %self.message_id, "sending ack");
26        let msg = DurableClientMessage {
27            msg: Some(ClientMsg::Ack(Ack {
28                message_id: self.message_id,
29            })),
30        };
31        self.ack_tx
32            .send(msg)
33            .await
34            .map_err(|_| ClientError::ChannelClosed)
35    }
36
37    /// Negative acknowledge. If `requeue` is true, the message will be
38    /// redelivered immediately. If false, it goes to the dead-letter subject.
39    pub async fn nack(self, requeue: bool) -> Result<(), ClientError> {
40        trace!(message_id = %self.message_id, requeue, "sending nack");
41        let msg = DurableClientMessage {
42            msg: Some(ClientMsg::Nack(Nack {
43                message_id: self.message_id,
44                requeue,
45            })),
46        };
47        self.ack_tx
48            .send(msg)
49            .await
50            .map_err(|_| ClientError::ChannelClosed)
51    }
52}
53
54/// A durable subscriber that receives messages and sends ack/nack.
55pub struct DurableSubscriber<E> {
56    rx: mpsc::Receiver<Result<DurableMessage<E>, ClientError>>,
57}
58
59impl<E> DurableSubscriber<E> {
60    /// Receive the next message. Returns None if the stream is closed.
61    pub async fn next(&mut self) -> Option<Result<DurableMessage<E>, ClientError>> {
62        self.rx.recv().await
63    }
64}
65
66/// Create a durable subscriber for a typed Event.
67pub(crate) async fn subscribe_durable<E: Event>(
68    mut client: BrokerClient<Channel>,
69    consumer_name: &str,
70    subject: &Subject,
71    queue_groups: &[&str],
72    max_in_flight: u32,
73    ack_timeout_secs: u32,
74) -> Result<DurableSubscriber<E>, ClientError> {
75    let capacity = max_in_flight as usize;
76
77    // Single channel for all outgoing messages (subscribe + ack/nack).
78    let (outgoing_tx, outgoing_rx) = mpsc::channel::<DurableClientMessage>(capacity);
79
80    // Push subscribe request as the first message.
81    outgoing_tx
82        .send(DurableClientMessage {
83            msg: Some(ClientMsg::Subscribe(DurableSubscribeRequest {
84                subject: subject.to_bytes(),
85                consumer_name: consumer_name.to_string(),
86                queue_groups: queue_groups.iter().map(|s| s.to_string()).collect(),
87                max_in_flight,
88                ack_timeout_seconds: ack_timeout_secs,
89            })),
90        })
91        .await
92        .map_err(|_| ClientError::ChannelClosed)?;
93
94    // Open the bidi stream.
95    let response = client
96        .subscribe_durable(ReceiverStream::new(outgoing_rx))
97        .await?;
98    let mut server_stream = response.into_inner();
99
100    debug!(consumer_name, subject = %subject, max_in_flight, ack_timeout_secs, "durable subscription stream opened");
101
102    // Channel for decoded messages to the caller.
103    let (msg_tx, msg_rx) = mpsc::channel(capacity);
104
105    // The outgoing_tx is cloned into each DurableMessage for ack/nack.
106    let ack_sender = outgoing_tx;
107
108    tokio::spawn(async move {
109        while let Ok(Some(server_msg)) = server_stream.message().await {
110            let (envelope, attempt) = match server_msg.msg {
111                Some(ServerMsg::Envelope(env)) => (env, 1),
112                Some(ServerMsg::Redelivery(redel)) => match redel.envelope {
113                    Some(env) => (env, redel.attempt),
114                    None => continue,
115                },
116                None => continue,
117            };
118
119            let message_id = envelope.id.clone();
120            trace!(message_id = %message_id, attempt, "received durable message");
121            let decoded = match hermes_core::decode::<E>(&envelope.payload) {
122                Ok(e) => e,
123                Err(e) => {
124                    warn!(message_id = %message_id, "failed to decode durable message");
125                    let _ = msg_tx.send(Err(ClientError::Decode(e))).await;
126                    continue;
127                }
128            };
129
130            let durable_msg = DurableMessage {
131                event: decoded,
132                message_id,
133                attempt,
134                ack_tx: ack_sender.clone(),
135            };
136
137            if msg_tx.send(Ok(durable_msg)).await.is_err() {
138                break;
139            }
140        }
141        debug!("durable subscription stream closed");
142    });
143
144    Ok(DurableSubscriber { rx: msg_rx })
145}