hermes_client/
durable_subscriber.rs1use hermes_core::{Event, Subject};
2use hermes_proto::{
3 Ack, DurableClientMessage, DurableSubscribeRequest, Nack, broker_client::BrokerClient,
4 durable_client_message::Msg as ClientMsg, durable_server_message::Msg as ServerMsg,
5};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8use tonic::transport::Channel;
9use tracing::{debug, trace, warn};
10
11use crate::error::ClientError;
12
13pub struct DurableMessage<E> {
16 pub event: E,
17 pub message_id: String,
18 pub attempt: u32,
19 ack_tx: mpsc::Sender<DurableClientMessage>,
20}
21
22impl<E> DurableMessage<E> {
23 pub async fn ack(self) -> Result<(), ClientError> {
25 trace!(message_id = %self.message_id, "sending ack");
26 let msg = DurableClientMessage {
27 msg: Some(ClientMsg::Ack(Ack {
28 message_id: self.message_id,
29 })),
30 };
31 self.ack_tx
32 .send(msg)
33 .await
34 .map_err(|_| ClientError::ChannelClosed)
35 }
36
37 pub async fn nack(self, requeue: bool) -> Result<(), ClientError> {
40 trace!(message_id = %self.message_id, requeue, "sending nack");
41 let msg = DurableClientMessage {
42 msg: Some(ClientMsg::Nack(Nack {
43 message_id: self.message_id,
44 requeue,
45 })),
46 };
47 self.ack_tx
48 .send(msg)
49 .await
50 .map_err(|_| ClientError::ChannelClosed)
51 }
52}
53
54pub struct DurableSubscriber<E> {
56 rx: mpsc::Receiver<Result<DurableMessage<E>, ClientError>>,
57}
58
59impl<E> DurableSubscriber<E> {
60 pub async fn next(&mut self) -> Option<Result<DurableMessage<E>, ClientError>> {
62 self.rx.recv().await
63 }
64}
65
66pub(crate) async fn subscribe_durable<E: Event>(
68 mut client: BrokerClient<Channel>,
69 consumer_name: &str,
70 subject: &Subject,
71 queue_groups: &[&str],
72 max_in_flight: u32,
73 ack_timeout_secs: u32,
74) -> Result<DurableSubscriber<E>, ClientError> {
75 let capacity = max_in_flight as usize;
76
77 let (outgoing_tx, outgoing_rx) = mpsc::channel::<DurableClientMessage>(capacity);
79
80 outgoing_tx
82 .send(DurableClientMessage {
83 msg: Some(ClientMsg::Subscribe(DurableSubscribeRequest {
84 subject: subject.to_bytes(),
85 consumer_name: consumer_name.to_string(),
86 queue_groups: queue_groups.iter().map(|s| s.to_string()).collect(),
87 max_in_flight,
88 ack_timeout_seconds: ack_timeout_secs,
89 })),
90 })
91 .await
92 .map_err(|_| ClientError::ChannelClosed)?;
93
94 let response = client
96 .subscribe_durable(ReceiverStream::new(outgoing_rx))
97 .await?;
98 let mut server_stream = response.into_inner();
99
100 debug!(consumer_name, subject = %subject, max_in_flight, ack_timeout_secs, "durable subscription stream opened");
101
102 let (msg_tx, msg_rx) = mpsc::channel(capacity);
104
105 let ack_sender = outgoing_tx;
107
108 tokio::spawn(async move {
109 while let Ok(Some(server_msg)) = server_stream.message().await {
110 let (envelope, attempt) = match server_msg.msg {
111 Some(ServerMsg::Envelope(env)) => (env, 1),
112 Some(ServerMsg::Redelivery(redel)) => match redel.envelope {
113 Some(env) => (env, redel.attempt),
114 None => continue,
115 },
116 None => continue,
117 };
118
119 let message_id = envelope.id.clone();
120 trace!(message_id = %message_id, attempt, "received durable message");
121 let decoded = match hermes_core::decode::<E>(&envelope.payload) {
122 Ok(e) => e,
123 Err(e) => {
124 warn!(message_id = %message_id, "failed to decode durable message");
125 let _ = msg_tx.send(Err(ClientError::Decode(e))).await;
126 continue;
127 }
128 };
129
130 let durable_msg = DurableMessage {
131 event: decoded,
132 message_id,
133 attempt,
134 ack_tx: ack_sender.clone(),
135 };
136
137 if msg_tx.send(Ok(durable_msg)).await.is_err() {
138 break;
139 }
140 }
141 debug!("durable subscription stream closed");
142 });
143
144 Ok(DurableSubscriber { rx: msg_rx })
145}