Skip to main content

hermes_server/
grpc.rs

1use std::sync::Arc;
2
3use hermes_core::Subject;
4use hermes_proto::{
5    DurableClientMessage, DurableServerMessage, EventEnvelope, PublishAck, PublishDurableAck,
6    SubscribeRequest, broker_server::Broker, durable_client_message::Msg as ClientMsg,
7};
8use tokio::select;
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tonic::{Request, Response, Status, Streaming};
12use tracing::{debug, warn};
13
14use crate::broker::BrokerEngine;
15use crate::config::ServerConfig;
16use crate::subscription::SubscriptionReceiver;
17
18pub struct BrokerService {
19    engine: Arc<BrokerEngine>,
20    config: ServerConfig,
21}
22
23impl BrokerService {
24    pub fn new(engine: Arc<BrokerEngine>, config: ServerConfig) -> Self {
25        Self { engine, config }
26    }
27}
28
29#[tonic::async_trait]
30impl Broker for BrokerService {
31    // --- Fire-and-forget ---
32
33    async fn publish(
34        &self,
35        request: Request<Streaming<EventEnvelope>>,
36    ) -> Result<Response<PublishAck>, Status> {
37        let mut stream = request.into_inner();
38        let mut accepted: u64 = 0;
39
40        while let Some(envelope) = stream.message().await? {
41            if envelope.subject.is_empty() {
42                return Err(Status::invalid_argument("subject must not be empty"));
43            }
44            if envelope.id.is_empty() {
45                return Err(Status::invalid_argument("id must not be empty"));
46            }
47
48            let _delivered = self.engine.publish(&envelope);
49            accepted = accepted
50                .checked_add(1)
51                .ok_or_else(|| Status::resource_exhausted("too many messages in single stream"))?;
52        }
53
54        debug!(accepted, "publish stream completed");
55        Ok(Response::new(PublishAck { accepted }))
56    }
57
58    type SubscribeStream = ReceiverStream<Result<EventEnvelope, Status>>;
59
60    async fn subscribe(
61        &self,
62        request: Request<SubscribeRequest>,
63    ) -> Result<Response<Self::SubscribeStream>, Status> {
64        let req = request.into_inner();
65
66        if req.subject.is_empty() {
67            return Err(Status::invalid_argument("subject must not be empty"));
68        }
69
70        let subject = Subject::from_bytes(&req.subject)
71            .map_err(|e| Status::invalid_argument(format!("invalid subject: {e}")))?;
72
73        let (id, receiver) = self.engine.subscribe(subject.clone(), req.queue_groups);
74
75        let (tx_out, rx_out) = tokio::sync::mpsc::channel(self.config.grpc_output_buffer);
76        let engine = self.engine.clone();
77
78        tokio::spawn(async move {
79            match receiver {
80                SubscriptionReceiver::Fanout(mut rx) => loop {
81                    select! {
82                        result = rx.recv() => {
83                            match result {
84                                Ok(arc_env) => {
85                                    let env = Arc::unwrap_or_clone(arc_env);
86                                    if tx_out.send(Ok(env)).await.is_err() {
87                                        break;
88                                    }
89                                }
90                                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
91                                    warn!(subject = %subject, "subscriber lagged, missed {n} messages");
92                                    continue;
93                                }
94                                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
95                            }
96                        }
97                        _ = tx_out.closed() => break,
98                    }
99                },
100                SubscriptionReceiver::QueueGroup(mut rx) => loop {
101                    select! {
102                        msg = rx.recv() => {
103                            match msg {
104                                Some(arc_env) => {
105                                    let env = Arc::unwrap_or_clone(arc_env);
106                                    if tx_out.send(Ok(env)).await.is_err() {
107                                        break;
108                                    }
109                                }
110                                None => break,
111                            }
112                        }
113                        _ = tx_out.closed() => break,
114                    }
115                },
116            }
117            engine.unsubscribe(&subject, id);
118            debug!(subject = %subject, %id, "subscriber stream ended");
119        });
120
121        Ok(Response::new(ReceiverStream::new(rx_out)))
122    }
123
124    // --- Durable ---
125
126    async fn publish_durable(
127        &self,
128        request: Request<Streaming<EventEnvelope>>,
129    ) -> Result<Response<PublishDurableAck>, Status> {
130        if self.engine.store().is_none() {
131            return Err(Status::failed_precondition(
132                "durable mode not enabled: configure HERMES_STORE_PATH",
133            ));
134        }
135
136        let mut stream = request.into_inner();
137        let mut accepted: u64 = 0;
138        let mut persisted: u64 = 0;
139
140        while let Some(envelope) = stream.message().await? {
141            if envelope.subject.is_empty() {
142                return Err(Status::invalid_argument("subject must not be empty"));
143            }
144            if envelope.id.is_empty() {
145                return Err(Status::invalid_argument("id must not be empty"));
146            }
147
148            match self.engine.publish_durable(&envelope) {
149                Ok(_delivered) => {
150                    persisted = persisted.checked_add(1).ok_or_else(|| {
151                        Status::resource_exhausted("too many messages in single stream")
152                    })?;
153                }
154                Err(e) => {
155                    warn!(id = envelope.id, "publish_durable failed: {e}");
156                    return Err(Status::internal(format!("persist failed: {e}")));
157                }
158            }
159
160            accepted = accepted
161                .checked_add(1)
162                .ok_or_else(|| Status::resource_exhausted("too many messages in single stream"))?;
163        }
164
165        debug!(accepted, persisted, "publish_durable stream completed");
166        Ok(Response::new(PublishDurableAck {
167            accepted,
168            persisted,
169        }))
170    }
171
172    type SubscribeDurableStream = ReceiverStream<Result<DurableServerMessage, Status>>;
173
174    async fn subscribe_durable(
175        &self,
176        request: Request<Streaming<DurableClientMessage>>,
177    ) -> Result<Response<Self::SubscribeDurableStream>, Status> {
178        if self.engine.store().is_none() {
179            return Err(Status::failed_precondition(
180                "durable mode not enabled: configure HERMES_STORE_PATH",
181            ));
182        }
183
184        let mut client_stream = request.into_inner();
185
186        // First message must be a DurableSubscribeRequest.
187        let first = client_stream
188            .message()
189            .await?
190            .ok_or_else(|| Status::invalid_argument("expected DurableSubscribeRequest"))?;
191
192        let sub_req = match first.msg {
193            Some(ClientMsg::Subscribe(req)) => req,
194            _ => {
195                return Err(Status::invalid_argument(
196                    "first message must be DurableSubscribeRequest",
197                ));
198            }
199        };
200
201        if sub_req.subject.is_empty() {
202            return Err(Status::invalid_argument("subject must not be empty"));
203        }
204        if sub_req.consumer_name.is_empty() {
205            return Err(Status::invalid_argument("consumer_name must not be empty"));
206        }
207
208        // Validate subject is well-formed
209        let _subject = Subject::from_bytes(&sub_req.subject)
210            .map_err(|e| Status::invalid_argument(format!("invalid subject: {e}")))?;
211
212        let max_in_flight = if sub_req.max_in_flight == 0 {
213            self.config.default_max_in_flight
214        } else {
215            sub_req.max_in_flight
216        };
217
218        let ack_timeout = if sub_req.ack_timeout_seconds == 0 {
219            self.config.default_ack_timeout_secs
220        } else {
221            sub_req.ack_timeout_seconds
222        };
223
224        let consumer_name = sub_req.consumer_name.clone();
225        let (connection_id, rx) = self
226            .engine
227            .subscribe_durable(
228                consumer_name.clone(),
229                sub_req.subject.clone(),
230                sub_req.queue_groups,
231                max_in_flight,
232                ack_timeout,
233            )
234            .map_err(|e| Status::internal(format!("subscribe_durable failed: {e}")))?;
235
236        let (tx_out, rx_out) = tokio::sync::mpsc::channel(max_in_flight as usize);
237        let engine = self.engine.clone();
238        let consumer_name_for_ack = consumer_name.clone();
239
240        // Shared token to cancel both tasks when either side terminates.
241        let cancel = CancellationToken::new();
242
243        // Task: pipe messages from engine to gRPC output stream.
244        let tx_out_clone = tx_out.clone();
245        let cancel_outbound = cancel.clone();
246        tokio::spawn(async move {
247            let mut rx = rx;
248            loop {
249                select! {
250                    msg = rx.recv() => {
251                        match msg {
252                            Some(msg) => {
253                                if tx_out_clone.send(Ok(msg)).await.is_err() {
254                                    break;
255                                }
256                            }
257                            None => break,
258                        }
259                    }
260                    () = cancel_outbound.cancelled() => break,
261                }
262            }
263            cancel_outbound.cancel();
264        });
265
266        // Task: read ack/nack from client stream.
267        let engine_ack = self.engine.clone();
268        let cancel_inbound = cancel.clone();
269        tokio::spawn(async move {
270            loop {
271                select! {
272                    msg = client_stream.message() => {
273                        match msg {
274                            Ok(Some(msg)) => match msg.msg {
275                                Some(ClientMsg::Ack(ack)) => {
276                                    if let Err(e) =
277                                        engine_ack.ack_message(&ack.message_id, &consumer_name_for_ack)
278                                    {
279                                        warn!(
280                                            message_id = ack.message_id,
281                                            consumer = consumer_name_for_ack,
282                                            "ack failed: {e}"
283                                        );
284                                    }
285                                }
286                                Some(ClientMsg::Nack(nack)) => {
287                                    if let Err(e) = engine_ack.nack_message(
288                                        &nack.message_id,
289                                        &consumer_name_for_ack,
290                                        nack.requeue,
291                                    ) {
292                                        warn!(
293                                            message_id = nack.message_id,
294                                            consumer = consumer_name_for_ack,
295                                            "nack failed: {e}"
296                                        );
297                                    }
298                                }
299                                Some(ClientMsg::Subscribe(_)) => {
300                                    warn!(
301                                        consumer = consumer_name_for_ack,
302                                        "unexpected subscribe message after initial"
303                                    );
304                                }
305                                None => {}
306                            },
307                            _ => break,
308                        }
309                    }
310                    () = cancel_inbound.cancelled() => break,
311                }
312            }
313
314            // Client disconnected — messages in-flight stay "delivered" and will be redelivered.
315            cancel_inbound.cancel();
316            engine.unsubscribe_durable(&consumer_name, connection_id);
317            debug!(
318                consumer = consumer_name,
319                connection_id, "durable subscriber disconnected"
320            );
321        });
322
323        Ok(Response::new(ReceiverStream::new(rx_out)))
324    }
325}