hermes_client/
durable_subscriber.rs1use hermes_core::{Event, Subject};
2use hermes_proto::{
3 Ack, DurableClientMessage, DurableSubscribeRequest, Nack, broker_client::BrokerClient,
4 durable_client_message::Msg as ClientMsg, durable_server_message::Msg as ServerMsg,
5};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8use tonic::transport::Channel;
9
10use crate::error::ClientError;
11
12pub struct DurableMessage<E> {
15 pub event: E,
16 pub message_id: String,
17 pub attempt: u32,
18 ack_tx: mpsc::Sender<DurableClientMessage>,
19}
20
21impl<E> DurableMessage<E> {
22 pub async fn ack(self) -> Result<(), ClientError> {
24 let msg = DurableClientMessage {
25 msg: Some(ClientMsg::Ack(Ack {
26 message_id: self.message_id,
27 })),
28 };
29 self.ack_tx
30 .send(msg)
31 .await
32 .map_err(|_| ClientError::ChannelClosed)
33 }
34
35 pub async fn nack(self, requeue: bool) -> Result<(), ClientError> {
38 let msg = DurableClientMessage {
39 msg: Some(ClientMsg::Nack(Nack {
40 message_id: self.message_id,
41 requeue,
42 })),
43 };
44 self.ack_tx
45 .send(msg)
46 .await
47 .map_err(|_| ClientError::ChannelClosed)
48 }
49}
50
51pub struct DurableSubscriber<E> {
53 rx: mpsc::Receiver<Result<DurableMessage<E>, ClientError>>,
54}
55
56impl<E> DurableSubscriber<E> {
57 pub async fn next(&mut self) -> Option<Result<DurableMessage<E>, ClientError>> {
59 self.rx.recv().await
60 }
61}
62
63pub(crate) async fn subscribe_durable<E: Event>(
65 mut client: BrokerClient<Channel>,
66 consumer_name: &str,
67 subject: &Subject,
68 queue_groups: &[&str],
69 max_in_flight: u32,
70 ack_timeout_secs: u32,
71) -> Result<DurableSubscriber<E>, ClientError> {
72 let capacity = max_in_flight as usize;
73
74 let (outgoing_tx, outgoing_rx) = mpsc::channel::<DurableClientMessage>(capacity);
76
77 outgoing_tx
79 .send(DurableClientMessage {
80 msg: Some(ClientMsg::Subscribe(DurableSubscribeRequest {
81 subject: subject.to_json(),
82 consumer_name: consumer_name.to_string(),
83 queue_groups: queue_groups.iter().map(|s| s.to_string()).collect(),
84 max_in_flight,
85 ack_timeout_seconds: ack_timeout_secs,
86 })),
87 })
88 .await
89 .map_err(|_| ClientError::ChannelClosed)?;
90
91 let response = client
93 .subscribe_durable(ReceiverStream::new(outgoing_rx))
94 .await?;
95 let mut server_stream = response.into_inner();
96
97 let (msg_tx, msg_rx) = mpsc::channel(capacity);
99
100 let ack_sender = outgoing_tx;
102
103 tokio::spawn(async move {
104 while let Ok(Some(server_msg)) = server_stream.message().await {
105 let (envelope, attempt) = match server_msg.msg {
106 Some(ServerMsg::Envelope(env)) => (env, 1),
107 Some(ServerMsg::Redelivery(redel)) => match redel.envelope {
108 Some(env) => (env, redel.attempt),
109 None => continue,
110 },
111 None => continue,
112 };
113
114 let message_id = envelope.id.clone();
115 let decoded = match hermes_core::decode::<E>(&envelope.payload) {
116 Ok(e) => e,
117 Err(e) => {
118 let _ = msg_tx.send(Err(ClientError::Decode(e))).await;
119 continue;
120 }
121 };
122
123 let durable_msg = DurableMessage {
124 event: decoded,
125 message_id,
126 attempt,
127 ack_tx: ack_sender.clone(),
128 };
129
130 if msg_tx.send(Ok(durable_msg)).await.is_err() {
131 break;
132 }
133 }
134 });
135
136 Ok(DurableSubscriber { rx: msg_rx })
137}