Skip to main content

hermes_client/
durable_subscriber.rs

1use hermes_core::{Event, Subject};
2use hermes_proto::{
3    Ack, DurableClientMessage, DurableSubscribeRequest, Nack, broker_client::BrokerClient,
4    durable_client_message::Msg as ClientMsg, durable_server_message::Msg as ServerMsg,
5};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8use tonic::transport::Channel;
9
10use crate::error::ClientError;
11
12/// A message received from a durable subscription.
13/// Must be explicitly acked or nacked.
14pub struct DurableMessage<E> {
15    pub event: E,
16    pub message_id: String,
17    pub attempt: u32,
18    ack_tx: mpsc::Sender<DurableClientMessage>,
19}
20
21impl<E> DurableMessage<E> {
22    /// Acknowledge this message. It won't be redelivered.
23    pub async fn ack(self) -> Result<(), ClientError> {
24        let msg = DurableClientMessage {
25            msg: Some(ClientMsg::Ack(Ack {
26                message_id: self.message_id,
27            })),
28        };
29        self.ack_tx
30            .send(msg)
31            .await
32            .map_err(|_| ClientError::ChannelClosed)
33    }
34
35    /// Negative acknowledge. If `requeue` is true, the message will be
36    /// redelivered immediately. If false, it goes to the dead-letter subject.
37    pub async fn nack(self, requeue: bool) -> Result<(), ClientError> {
38        let msg = DurableClientMessage {
39            msg: Some(ClientMsg::Nack(Nack {
40                message_id: self.message_id,
41                requeue,
42            })),
43        };
44        self.ack_tx
45            .send(msg)
46            .await
47            .map_err(|_| ClientError::ChannelClosed)
48    }
49}
50
51/// A durable subscriber that receives messages and sends ack/nack.
52pub struct DurableSubscriber<E> {
53    rx: mpsc::Receiver<Result<DurableMessage<E>, ClientError>>,
54}
55
56impl<E> DurableSubscriber<E> {
57    /// Receive the next message. Returns None if the stream is closed.
58    pub async fn next(&mut self) -> Option<Result<DurableMessage<E>, ClientError>> {
59        self.rx.recv().await
60    }
61}
62
63/// Create a durable subscriber for a typed Event.
64pub(crate) async fn subscribe_durable<E: Event>(
65    mut client: BrokerClient<Channel>,
66    consumer_name: &str,
67    subject: &Subject,
68    queue_groups: &[&str],
69    max_in_flight: u32,
70    ack_timeout_secs: u32,
71) -> Result<DurableSubscriber<E>, ClientError> {
72    let capacity = max_in_flight as usize;
73
74    // Single channel for all outgoing messages (subscribe + ack/nack).
75    let (outgoing_tx, outgoing_rx) = mpsc::channel::<DurableClientMessage>(capacity);
76
77    // Push subscribe request as the first message.
78    outgoing_tx
79        .send(DurableClientMessage {
80            msg: Some(ClientMsg::Subscribe(DurableSubscribeRequest {
81                subject: subject.to_json(),
82                consumer_name: consumer_name.to_string(),
83                queue_groups: queue_groups.iter().map(|s| s.to_string()).collect(),
84                max_in_flight,
85                ack_timeout_seconds: ack_timeout_secs,
86            })),
87        })
88        .await
89        .map_err(|_| ClientError::ChannelClosed)?;
90
91    // Open the bidi stream.
92    let response = client
93        .subscribe_durable(ReceiverStream::new(outgoing_rx))
94        .await?;
95    let mut server_stream = response.into_inner();
96
97    // Channel for decoded messages to the caller.
98    let (msg_tx, msg_rx) = mpsc::channel(capacity);
99
100    // The outgoing_tx is cloned into each DurableMessage for ack/nack.
101    let ack_sender = outgoing_tx;
102
103    tokio::spawn(async move {
104        while let Ok(Some(server_msg)) = server_stream.message().await {
105            let (envelope, attempt) = match server_msg.msg {
106                Some(ServerMsg::Envelope(env)) => (env, 1),
107                Some(ServerMsg::Redelivery(redel)) => match redel.envelope {
108                    Some(env) => (env, redel.attempt),
109                    None => continue,
110                },
111                None => continue,
112            };
113
114            let message_id = envelope.id.clone();
115            let decoded = match hermes_core::decode::<E>(&envelope.payload) {
116                Ok(e) => e,
117                Err(e) => {
118                    let _ = msg_tx.send(Err(ClientError::Decode(e))).await;
119                    continue;
120                }
121            };
122
123            let durable_msg = DurableMessage {
124                event: decoded,
125                message_id,
126                attempt,
127                ack_tx: ack_sender.clone(),
128            };
129
130            if msg_tx.send(Ok(durable_msg)).await.is_err() {
131                break;
132            }
133        }
134    });
135
136    Ok(DurableSubscriber { rx: msg_rx })
137}