hermes_client/
publisher.rs1use hermes_core::{Event, Subject};
2use hermes_proto::{EventEnvelope, PublishAck};
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tracing::{debug, trace};
6use uuid::Uuid;
7
8use crate::error::ClientError;
9
10pub struct BatchPublisher {
16 sender: mpsc::Sender<EventEnvelope>,
17 handle: JoinHandle<Result<PublishAck, ClientError>>,
18}
19
20impl BatchPublisher {
21 pub(crate) fn new(
22 sender: mpsc::Sender<EventEnvelope>,
23 handle: JoinHandle<Result<PublishAck, ClientError>>,
24 ) -> Self {
25 Self { sender, handle }
26 }
27
28 pub async fn send<E: Event>(&self, event: &E) -> Result<(), ClientError> {
30 let envelope = make_envelope(event)?;
31 trace!(id = %envelope.id, "batch: sending event");
32 self.sender
33 .send(envelope)
34 .await
35 .map_err(|_| ClientError::ChannelClosed)
36 }
37
38 pub async fn send_raw(&self, subject: &Subject, payload: Vec<u8>) -> Result<(), ClientError> {
40 let envelope = EventEnvelope {
41 id: Uuid::now_v7().to_string(),
42 subject: subject.to_bytes(),
43 payload,
44 headers: Default::default(),
45 timestamp_nanos: now_nanos(),
46 };
47 self.sender
48 .send(envelope)
49 .await
50 .map_err(|_| ClientError::ChannelClosed)
51 }
52
53 pub async fn flush(self) -> Result<PublishAck, ClientError> {
55 debug!("batch publisher: flushing stream");
56 drop(self.sender);
57 self.handle.await.unwrap_or(Err(ClientError::ChannelClosed))
58 }
59}
60
61fn now_nanos() -> i64 {
62 i64::try_from(
63 std::time::SystemTime::now()
64 .duration_since(std::time::UNIX_EPOCH)
65 .unwrap_or_default()
66 .as_nanos(),
67 )
68 .unwrap_or(i64::MAX)
69}
70
71pub(crate) fn make_envelope<E: Event>(event: &E) -> Result<EventEnvelope, ClientError> {
72 let payload = hermes_core::encode(event)?;
73 Ok(EventEnvelope {
74 id: Uuid::now_v7().to_string(),
75 subject: event.subject().to_bytes(),
76 payload,
77 headers: Default::default(),
78 timestamp_nanos: i64::try_from(
79 std::time::SystemTime::now()
80 .duration_since(std::time::UNIX_EPOCH)
81 .unwrap_or_default()
82 .as_nanos(),
83 )
84 .unwrap_or(i64::MAX),
85 })
86}