Skip to main content

hermes_client/
publisher.rs

1use hermes_core::{Event, Subject};
2use hermes_proto::{EventEnvelope, PublishAck};
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tracing::{debug, trace};
6use uuid::Uuid;
7
8use crate::error::ClientError;
9
10/// High-throughput publisher that keeps a single gRPC client-stream open.
11///
12/// All messages sent via [`send`](BatchPublisher::send) or
13/// [`send_raw`](BatchPublisher::send_raw) flow through the same HTTP/2 stream,
14/// avoiding the per-message round-trip overhead of individual publish calls.
15pub struct BatchPublisher {
16    sender: mpsc::Sender<EventEnvelope>,
17    handle: JoinHandle<Result<PublishAck, ClientError>>,
18}
19
20impl BatchPublisher {
21    pub(crate) fn new(
22        sender: mpsc::Sender<EventEnvelope>,
23        handle: JoinHandle<Result<PublishAck, ClientError>>,
24    ) -> Self {
25        Self { sender, handle }
26    }
27
28    /// Send a typed event through the batch stream.
29    pub async fn send<E: Event>(&self, event: &E) -> Result<(), ClientError> {
30        let envelope = make_envelope(event)?;
31        trace!(id = %envelope.id, "batch: sending event");
32        self.sender
33            .send(envelope)
34            .await
35            .map_err(|_| ClientError::ChannelClosed)
36    }
37
38    /// Send a raw envelope (explicit subject + payload) through the batch stream.
39    pub async fn send_raw(&self, subject: &Subject, payload: Vec<u8>) -> Result<(), ClientError> {
40        let envelope = EventEnvelope {
41            id: Uuid::now_v7().to_string(),
42            subject: subject.to_bytes(),
43            payload,
44            headers: Default::default(),
45            timestamp_nanos: now_nanos(),
46        };
47        self.sender
48            .send(envelope)
49            .await
50            .map_err(|_| ClientError::ChannelClosed)
51    }
52
53    /// Close the stream and wait for the server ack.
54    pub async fn flush(self) -> Result<PublishAck, ClientError> {
55        debug!("batch publisher: flushing stream");
56        drop(self.sender);
57        self.handle.await.unwrap_or(Err(ClientError::ChannelClosed))
58    }
59}
60
61fn now_nanos() -> i64 {
62    i64::try_from(
63        std::time::SystemTime::now()
64            .duration_since(std::time::UNIX_EPOCH)
65            .unwrap_or_default()
66            .as_nanos(),
67    )
68    .unwrap_or(i64::MAX)
69}
70
71pub(crate) fn make_envelope<E: Event>(event: &E) -> Result<EventEnvelope, ClientError> {
72    let payload = hermes_core::encode(event)?;
73    Ok(EventEnvelope {
74        id: Uuid::now_v7().to_string(),
75        subject: event.subject().to_bytes(),
76        payload,
77        headers: Default::default(),
78        timestamp_nanos: i64::try_from(
79            std::time::SystemTime::now()
80                .duration_since(std::time::UNIX_EPOCH)
81                .unwrap_or_default()
82                .as_nanos(),
83        )
84        .unwrap_or(i64::MAX),
85    })
86}