hermes-client
Asynchronous Rust client to publish and consume events through the Hermes gRPC broker.
Installation
Add the crate to your Cargo.toml:
[dependencies]
hermes-broker-client = "0.3"
hermes-broker-core = "0.3"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
Quickstart — fire-and-forget pub/sub
Fanout rule: subscribers only receive messages published after they
connect. Always start the subscriber before publishing.
use futures::StreamExt;
use hermes_client::HermesClient;
use hermes_core::Event;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
#[event(subject = "chat.message")]
struct ChatMessage {
user: String,
text: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = HermesClient::connect("http://127.0.0.1:4222").await?;
let sub_client = client.clone();
let handle = tokio::spawn(async move {
let mut stream = sub_client.subscribe::<ChatMessage>(&[]).await.unwrap();
while let Some(msg) = stream.next().await {
println!("received: {:?}", msg.unwrap());
}
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
for i in 0..5 {
client
.publish(&ChatMessage {
user: "alice".into(),
text: format!("Hello #{i}!"),
})
.await?;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
handle.abort();
Ok(())
}
Queue groups — load-balanced consumption
When multiple subscribers join the same queue group, each message is
delivered to exactly one subscriber in the group (round-robin).
use futures::StreamExt;
use hermes_client::HermesClient;
use hermes_core::Event;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct Task {
id: u32,
payload: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = HermesClient::connect("http://127.0.0.1:4222").await?;
for worker_id in 0..3 {
let c = client.clone();
tokio::spawn(async move {
let mut stream = c.subscribe::<Task>(&["workers"]).await.unwrap();
while let Some(Ok(task)) = stream.next().await {
println!("Worker {worker_id} got task #{}: {}", task.id, task.payload);
}
});
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
for i in 0..10 {
client
.publish(&Task { id: i, payload: format!("job-{i}") })
.await?;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
}
Custom subjects with publish_on / subscribe_on
When #[derive(Event)] auto-subjects are too static, use custom subjects
to route messages dynamically — for example, with an entity ID in the subject.
Building a Subject
Subject uses a fluent builder:
use hermes_core::Subject;
let subject = Subject::new().str("order").int(42).str("status");
let pattern = Subject::new().str("order").any().str("status");
let catch_all = Subject::new().str("order").rest();
let s = Subject::from("order.42.status");
Segment constructors
| Constructor |
Produces |
Display |
.str("x") |
Segment::Str("x") |
x |
.int(42) |
Segment::Int(42) |
42 |
.any() |
Segment::Any |
* |
.rest() |
Segment::Rest |
> |
.segment(seg) |
any Segment value |
— |
For standalone use: Segment::s("x"), Segment::i(42), Segment::any(), Segment::rest().
subscribe_on — exemples pratiques
use hermes_client::HermesClient;
use hermes_core::{Event, Subject};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct JobLogs { line: String }
async fn examples(client: &HermesClient) -> Result<(), Box<dyn std::error::Error>> {
let job_id = "abc-123";
let mut stream = client.subscribe_on::<JobLogs>(
&Subject::new().str("job").str(job_id).str("logs"), &[],
).await?;
let mut stream = client.subscribe_on::<JobLogs>(
&Subject::new().str("job").any().str("logs"), &[],
).await?;
let prefix = "build";
let mut stream = client.subscribe_on::<JobLogs>(
&Subject::new().str("job").str(prefix).rest(), &[],
).await?;
Ok(())
}
Event subjects with derive(Event)
By default, #[derive(Event)] generates a subject from module_path + type_name.
Override it explicitly with #[event(subject = "...")]:
use hermes_core::Event;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
#[event(subject = "orders.created")]
struct OrderCreated {
order_id: String,
}
Event groups — subscribe to multiple event types
event_group! merges several event types into a single stream, automatically
dispatched by subject:
use futures::StreamExt;
use hermes_client::HermesClient;
use hermes_core::{event_group, Event};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct OrderCreated { order_id: String }
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct OrderShipped { order_id: String, tracking_id: String }
event_group!(OrderEvents = [OrderCreated, OrderShipped]);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = HermesClient::connect("http://127.0.0.1:4222").await?;
let sub = client.clone();
tokio::spawn(async move {
let mut stream = sub.subscribe_group::<OrderEvents>(&[]).await.unwrap();
while let Some(Ok(event)) = stream.next().await {
match event {
OrderEvents::OrderCreated(e) => println!("created: {}", e.order_id),
OrderEvents::OrderShipped(e) => println!("shipped: {}", e.tracking_id),
}
}
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
client.publish(&OrderCreated { order_id: "ORD-1".into() }).await?;
client.publish(&OrderShipped { order_id: "ORD-1".into(), tracking_id: "TRK-42".into() }).await?;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
Ok(())
}
BatchPublisher — high-throughput publishing
BatchPublisher keeps a single gRPC client-stream open. All messages flow
through the same HTTP/2 stream, avoiding per-message round-trip overhead.
use hermes_client::HermesClient;
use hermes_core::{Event, Subject};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct Metric { name: String, value: f64 }
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = HermesClient::connect("http://127.0.0.1:4222").await?;
let batch = client.batch_publisher();
for i in 0..10_000 {
batch.send(&Metric { name: format!("cpu.{i}"), value: 0.42 }).await?;
}
let subject = Subject::new().str("metrics").str("raw");
batch.send_raw(&subject, b"raw payload".to_vec()).await?;
let ack = batch.flush().await?;
println!("server accepted {} messages", ack.accepted);
Ok(())
}
Durable subscriptions — at-least-once delivery
Durable mode persists messages server-side. Consumers ack or nack
each message; un-acked messages are automatically redelivered.
Requires the server to be configured with a store path (HERMES_STORE_PATH).
use hermes_client::HermesClient;
use hermes_core::Event;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct Payment { id: String, amount: f64 }
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = HermesClient::connect("http://127.0.0.1:4222").await?;
client.publish_durable(&Payment { id: "PAY-1".into(), amount: 99.99 }).await?;
let mut sub = client
.subscribe_durable::<Payment>("payment-processor", &[], 10, 30)
.await?;
while let Some(msg) = sub.next().await {
let msg = msg?;
println!("processing payment: {:?}", msg.event);
msg.ack().await?; }
Ok(())
}
| Parameter |
Description |
consumer_name |
Unique name for this consumer. Reusing the name resumes from the last acked offset. |
queue_groups |
Empty for fanout, or group names for load-balanced delivery. |
max_in_flight |
Maximum un-acked messages before the broker pauses delivery. |
ack_timeout_secs |
Seconds before un-acked messages are redelivered. |
API reference
| Method |
Description |
HermesClient::connect(uri) |
Connect to a Hermes broker |
publish(&event) |
Fire-and-forget publish (auto subject) |
publish_on(&event, &subject) |
Fire-and-forget publish on a custom subject |
publish_durable(&event) |
Durable publish (persisted before ack) |
subscribe::<E>(queue_groups) |
Typed subscription (auto subject) |
subscribe_on::<E>(&subject, queue_groups) |
Typed subscription on a custom subject (supports wildcards) |
subscribe_group::<G>(queue_groups) |
Event-group subscription (multiple types) |
subscribe_durable::<E>(name, groups, max, timeout) |
Durable subscription with ack/nack |
batch_publisher() |
High-throughput buffered publisher |
publish_raw(&subject, payload) |
Untyped raw publish |
subscribe_raw(&subject, groups) |
Untyped raw subscription |
Notes
- Address must include a scheme, e.g.
http://127.0.0.1:4222.
- Durable features require the server to be configured with a store path.
- See docs.rs for complete API documentation.