hermes_broker_client/lib.rs
1//! Client library for the Hermes message broker.
2//!
3//! Provides [`Publisher`] for fire-and-forget message publishing and [`Subscriber`]
4//! for receiving messages over a gRPC stream.
5//!
6//! # Quick start
7//!
8//! ```rust,no_run
9//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
10//! use hermes_broker_client::{connect, Publisher, Subscriber};
11//!
12//! // Plaintext (dev / testing)
13//! let channel = connect("http://[::1]:50051", None).await?;
14//!
15//! // mTLS (production)
16//! // let tls = hermes_broker_client::TlsConfig {
17//! // ca_cert: std::fs::read("ca.pem")?,
18//! // client_cert: std::fs::read("client.pem")?,
19//! // client_key: std::fs::read("client-key.pem")?,
20//! // };
21//! // let channel = connect("https://[::1]:50051", Some(tls)).await?;
22//!
23//! // Publish
24//! let publisher = Publisher::new(channel.clone());
25//! publisher.publish("orders.eu.created", &b"hello"[..]).await?;
26//!
27//! // Subscribe
28//! let mut subscriber = Subscriber::new(channel).await?;
29//! subscriber.subscribe("orders.>", None).await?;
30//! while let Some(msg) = subscriber.recv().await {
31//! println!("{}: {:?}", msg.subject, msg.payload);
32//! }
33//! # Ok(())
34//! # }
35//! ```
36
37mod publisher;
38mod subscriber;
39
40pub use publisher::Publisher;
41pub use subscriber::Subscriber;
42
43use hermes_proto::broker_client::BrokerClient;
44use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
45use tracing::{debug, info};
46
47/// TLS configuration for mTLS connections.
48pub struct TlsConfig {
49 /// PEM-encoded CA certificate (to verify the server).
50 pub ca_cert: Vec<u8>,
51 /// PEM-encoded client certificate.
52 pub client_cert: Vec<u8>,
53 /// PEM-encoded client private key.
54 pub client_key: Vec<u8>,
55}
56
57/// Connect to a hermes broker and return a channel for creating publishers/subscribers.
58///
59/// Pass `None` for plaintext (dev/test) or `Some(TlsConfig)` for mTLS.
60pub async fn connect(
61 addr: &str,
62 tls: Option<TlsConfig>,
63) -> Result<Channel, tonic::transport::Error> {
64 debug!(addr, tls = tls.is_some(), "connecting to hermes broker");
65 let mut endpoint = Channel::from_shared(addr.to_string())
66 .expect("invalid address")
67 .connect_timeout(std::time::Duration::from_secs(5))
68 .timeout(std::time::Duration::from_secs(10))
69 .keep_alive_timeout(std::time::Duration::from_secs(5))
70 .http2_keep_alive_interval(std::time::Duration::from_secs(10));
71
72 if let Some(tls) = tls {
73 let tls_config = ClientTlsConfig::new()
74 .ca_certificate(Certificate::from_pem(tls.ca_cert))
75 .identity(Identity::from_pem(tls.client_cert, tls.client_key))
76 .domain_name("localhost");
77 endpoint = endpoint.tls_config(tls_config)?;
78 }
79
80 let channel = endpoint.connect().await?;
81 info!(addr, "connected to hermes broker");
82 Ok(channel)
83}
84
85/// Create a raw gRPC client (useful for advanced usage).
86pub fn raw_client(channel: Channel) -> BrokerClient<Channel> {
87 BrokerClient::new(channel)
88}