1use std::sync::Arc;
2
3use hermes_core::Subject;
4use hermes_proto::{
5 DurableClientMessage, DurableServerMessage, EventEnvelope, PublishAck, PublishDurableAck,
6 SubscribeRequest, broker_server::Broker, durable_client_message::Msg as ClientMsg,
7};
8use tokio::select;
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tonic::{Request, Response, Status, Streaming};
12use tracing::{debug, warn};
13
14use crate::broker::BrokerEngine;
15use crate::config::ServerConfig;
16use crate::subscription::SubscriptionReceiver;
17
18pub struct BrokerService {
19 engine: Arc<BrokerEngine>,
20 config: ServerConfig,
21}
22
23impl BrokerService {
24 pub fn new(engine: Arc<BrokerEngine>, config: ServerConfig) -> Self {
25 Self { engine, config }
26 }
27}
28
29#[tonic::async_trait]
30impl Broker for BrokerService {
31 async fn publish(
34 &self,
35 request: Request<Streaming<EventEnvelope>>,
36 ) -> Result<Response<PublishAck>, Status> {
37 let mut stream = request.into_inner();
38 let mut accepted: u64 = 0;
39
40 while let Some(envelope) = stream.message().await? {
41 if envelope.subject.is_empty() {
42 return Err(Status::invalid_argument("subject must not be empty"));
43 }
44 if envelope.id.is_empty() {
45 return Err(Status::invalid_argument("id must not be empty"));
46 }
47
48 let _delivered = self.engine.publish(&envelope);
49 accepted = accepted
50 .checked_add(1)
51 .ok_or_else(|| Status::resource_exhausted("too many messages in single stream"))?;
52 }
53
54 debug!(accepted, "publish stream completed");
55 Ok(Response::new(PublishAck { accepted }))
56 }
57
58 type SubscribeStream = ReceiverStream<Result<EventEnvelope, Status>>;
59
60 async fn subscribe(
61 &self,
62 request: Request<SubscribeRequest>,
63 ) -> Result<Response<Self::SubscribeStream>, Status> {
64 let req = request.into_inner();
65
66 if req.subject.is_empty() {
67 return Err(Status::invalid_argument("subject must not be empty"));
68 }
69
70 let subject = Subject::from_bytes(&req.subject)
71 .map_err(|e| Status::invalid_argument(format!("invalid subject: {e}")))?;
72
73 let (id, receiver) = self.engine.subscribe(subject.clone(), req.queue_groups);
74
75 let (tx_out, rx_out) = tokio::sync::mpsc::channel(self.config.grpc_output_buffer);
76 let engine = self.engine.clone();
77
78 tokio::spawn(async move {
79 match receiver {
80 SubscriptionReceiver::Fanout(mut rx) => loop {
81 select! {
82 result = rx.recv() => {
83 match result {
84 Ok(arc_env) => {
85 let env = Arc::unwrap_or_clone(arc_env);
86 if tx_out.send(Ok(env)).await.is_err() {
87 break;
88 }
89 }
90 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
91 warn!(subject = %subject, "subscriber lagged, missed {n} messages");
92 continue;
93 }
94 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
95 }
96 }
97 _ = tx_out.closed() => break,
98 }
99 },
100 SubscriptionReceiver::QueueGroup(mut rx) => loop {
101 select! {
102 msg = rx.recv() => {
103 match msg {
104 Some(arc_env) => {
105 let env = Arc::unwrap_or_clone(arc_env);
106 if tx_out.send(Ok(env)).await.is_err() {
107 break;
108 }
109 }
110 None => break,
111 }
112 }
113 _ = tx_out.closed() => break,
114 }
115 },
116 }
117 engine.unsubscribe(&subject, id);
118 debug!(subject = %subject, %id, "subscriber stream ended");
119 });
120
121 Ok(Response::new(ReceiverStream::new(rx_out)))
122 }
123
124 async fn publish_durable(
127 &self,
128 request: Request<Streaming<EventEnvelope>>,
129 ) -> Result<Response<PublishDurableAck>, Status> {
130 if self.engine.store().is_none() {
131 return Err(Status::failed_precondition(
132 "durable mode not enabled: configure HERMES_STORE_PATH",
133 ));
134 }
135
136 let mut stream = request.into_inner();
137 let mut accepted: u64 = 0;
138 let mut persisted: u64 = 0;
139
140 while let Some(envelope) = stream.message().await? {
141 if envelope.subject.is_empty() {
142 return Err(Status::invalid_argument("subject must not be empty"));
143 }
144 if envelope.id.is_empty() {
145 return Err(Status::invalid_argument("id must not be empty"));
146 }
147
148 match self.engine.publish_durable(&envelope) {
149 Ok(_delivered) => {
150 persisted = persisted.checked_add(1).ok_or_else(|| {
151 Status::resource_exhausted("too many messages in single stream")
152 })?;
153 }
154 Err(e) => {
155 warn!(id = envelope.id, "publish_durable failed: {e}");
156 return Err(Status::internal(format!("persist failed: {e}")));
157 }
158 }
159
160 accepted = accepted
161 .checked_add(1)
162 .ok_or_else(|| Status::resource_exhausted("too many messages in single stream"))?;
163 }
164
165 debug!(accepted, persisted, "publish_durable stream completed");
166 Ok(Response::new(PublishDurableAck {
167 accepted,
168 persisted,
169 }))
170 }
171
172 type SubscribeDurableStream = ReceiverStream<Result<DurableServerMessage, Status>>;
173
174 async fn subscribe_durable(
175 &self,
176 request: Request<Streaming<DurableClientMessage>>,
177 ) -> Result<Response<Self::SubscribeDurableStream>, Status> {
178 if self.engine.store().is_none() {
179 return Err(Status::failed_precondition(
180 "durable mode not enabled: configure HERMES_STORE_PATH",
181 ));
182 }
183
184 let mut client_stream = request.into_inner();
185
186 let first = client_stream
188 .message()
189 .await?
190 .ok_or_else(|| Status::invalid_argument("expected DurableSubscribeRequest"))?;
191
192 let sub_req = match first.msg {
193 Some(ClientMsg::Subscribe(req)) => req,
194 _ => {
195 return Err(Status::invalid_argument(
196 "first message must be DurableSubscribeRequest",
197 ));
198 }
199 };
200
201 if sub_req.subject.is_empty() {
202 return Err(Status::invalid_argument("subject must not be empty"));
203 }
204 if sub_req.consumer_name.is_empty() {
205 return Err(Status::invalid_argument("consumer_name must not be empty"));
206 }
207
208 let _subject = Subject::from_bytes(&sub_req.subject)
210 .map_err(|e| Status::invalid_argument(format!("invalid subject: {e}")))?;
211
212 let max_in_flight = if sub_req.max_in_flight == 0 {
213 self.config.default_max_in_flight
214 } else {
215 sub_req.max_in_flight
216 };
217
218 let ack_timeout = if sub_req.ack_timeout_seconds == 0 {
219 self.config.default_ack_timeout_secs
220 } else {
221 sub_req.ack_timeout_seconds
222 };
223
224 let consumer_name = sub_req.consumer_name.clone();
225 let (connection_id, rx) = self
226 .engine
227 .subscribe_durable(
228 consumer_name.clone(),
229 sub_req.subject.clone(),
230 sub_req.queue_groups,
231 max_in_flight,
232 ack_timeout,
233 )
234 .map_err(|e| Status::internal(format!("subscribe_durable failed: {e}")))?;
235
236 let (tx_out, rx_out) = tokio::sync::mpsc::channel(max_in_flight as usize);
237 let engine = self.engine.clone();
238 let consumer_name_for_ack = consumer_name.clone();
239
240 let cancel = CancellationToken::new();
242
243 let tx_out_clone = tx_out.clone();
245 let cancel_outbound = cancel.clone();
246 tokio::spawn(async move {
247 let mut rx = rx;
248 loop {
249 select! {
250 msg = rx.recv() => {
251 match msg {
252 Some(msg) => {
253 if tx_out_clone.send(Ok(msg)).await.is_err() {
254 break;
255 }
256 }
257 None => break,
258 }
259 }
260 () = cancel_outbound.cancelled() => break,
261 }
262 }
263 cancel_outbound.cancel();
264 });
265
266 let engine_ack = self.engine.clone();
268 let cancel_inbound = cancel.clone();
269 tokio::spawn(async move {
270 loop {
271 select! {
272 msg = client_stream.message() => {
273 match msg {
274 Ok(Some(msg)) => match msg.msg {
275 Some(ClientMsg::Ack(ack)) => {
276 if let Err(e) =
277 engine_ack.ack_message(&ack.message_id, &consumer_name_for_ack)
278 {
279 warn!(
280 message_id = ack.message_id,
281 consumer = consumer_name_for_ack,
282 "ack failed: {e}"
283 );
284 }
285 }
286 Some(ClientMsg::Nack(nack)) => {
287 if let Err(e) = engine_ack.nack_message(
288 &nack.message_id,
289 &consumer_name_for_ack,
290 nack.requeue,
291 ) {
292 warn!(
293 message_id = nack.message_id,
294 consumer = consumer_name_for_ack,
295 "nack failed: {e}"
296 );
297 }
298 }
299 Some(ClientMsg::Subscribe(_)) => {
300 warn!(
301 consumer = consumer_name_for_ack,
302 "unexpected subscribe message after initial"
303 );
304 }
305 None => {}
306 },
307 _ => break,
308 }
309 }
310 () = cancel_inbound.cancelled() => break,
311 }
312 }
313
314 cancel_inbound.cancel();
316 engine.unsubscribe_durable(&consumer_name, connection_id);
317 debug!(
318 consumer = consumer_name,
319 connection_id, "durable subscriber disconnected"
320 );
321 });
322
323 Ok(Response::new(ReceiverStream::new(rx_out)))
324 }
325}