Skip to main content

hermes_server/
lib.rs

1//! `hermes-server` is the server-side crate for the Hermes gRPC event broker.
2//!
3//! It provides:
4//! - a broker engine with fanout and queue-group delivery
5//! - a gRPC service implementation (via `tonic`)
6//! - optional durable delivery with redelivery + garbage-collection loops
7//!
8//! # Quick start
9//!
10//! Run a server from a bound `TcpListener`:
11//!
12//! ```no_run
13//! use tokio::net::TcpListener;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//!     let listener = TcpListener::bind("127.0.0.1:4222").await?;
18//!     hermes_server::run(listener).await?;
19//!     Ok(())
20//! }
21//! ```
22//!
23//! Run with explicit configuration:
24//!
25//! ```no_run
26//! use tokio::net::TcpListener;
27//! use hermes_server::config::ServerConfig;
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//!     let listener = TcpListener::bind("127.0.0.1:4222").await?;
32//!     let mut cfg = ServerConfig::default();
33//!     cfg.subscriber_channel_capacity = 16_384;
34//!     cfg.store_path = Some("hermes.redb".into());
35//!     hermes_server::run_with_config(listener, cfg).await?;
36//!     Ok(())
37//! }
38//! ```
39//!
40//! # Configuration
41//!
42//! The server can be configured programmatically with [`config::ServerConfig`] and can
43//! also be initialized from environment variables through `ServerConfig::from_env()`.
44//!
45//! Useful env vars include:
46//! - `HERMES_LISTEN_ADDR`
47//! - `HERMES_CHANNEL_CAPACITY`
48//! - `HERMES_GRPC_OUTPUT_BUFFER`
49//! - `HERMES_STORE_PATH`
50//! - `HERMES_REDELIVERY_INTERVAL`
51//! - `HERMES_MAX_DELIVERY_ATTEMPTS`
52//! - `HERMES_RETENTION_SECS`
53//! - `HERMES_ACK_TIMEOUT`
54//! - `HERMES_MAX_IN_FLIGHT`
55//! - `HERMES_GC_INTERVAL`
56//! - `HERMES_REDELIVERY_BATCH_SIZE`
57//!
58//! # Notes
59//!
60//! - If `store_path` is `None`, the server runs in fire-and-forget mode (no durable
61//!   redelivery persistence).
62//! - If `store_path` is set, durable store-backed redelivery and GC loops are spawned.
63
64pub mod broker;
65pub mod config;
66pub mod grpc;
67pub mod redelivery;
68pub mod subscription;
69
70use std::sync::Arc;
71
72use hermes_proto::broker_server::BrokerServer;
73use hermes_store::RedbMessageStore;
74use tokio::net::TcpListener;
75use tonic::transport::Server;
76use tracing::info;
77
78/// Run the broker server on the given listener.
79/// Useful for integration tests that need a server on a random port.
80pub async fn run(listener: TcpListener) -> Result<(), Box<dyn std::error::Error>> {
81    run_with_config(listener, config::ServerConfig::default()).await
82}
83
84/// Run the broker server with a specific config.
85pub async fn run_with_config(
86    listener: TcpListener,
87    config: config::ServerConfig,
88) -> Result<(), Box<dyn std::error::Error>> {
89    let store: Option<Arc<dyn hermes_store::MessageStore>> =
90        if let Some(ref path) = config.store_path {
91            let store = RedbMessageStore::open(path)?;
92            info!(?path, "durable store opened");
93            Some(Arc::new(store))
94        } else {
95            None
96        };
97
98    let engine = if let Some(ref store) = store {
99        Arc::new(broker::BrokerEngine::with_store(
100            config.subscriber_channel_capacity,
101            store.clone(),
102        ))
103    } else {
104        Arc::new(broker::BrokerEngine::new(
105            config.subscriber_channel_capacity,
106        ))
107    };
108
109    // Spawn redelivery + GC loops if store is enabled.
110    if let Some(ref store) = store {
111        redelivery::spawn_redelivery_loop(
112            engine.clone(),
113            config.redelivery_interval_secs,
114            config.max_delivery_attempts,
115            config.redelivery_batch_size,
116        );
117        redelivery::spawn_gc_loop(
118            store.clone(),
119            config.retention_secs,
120            config.gc_interval_secs,
121        );
122    }
123
124    let service = grpc::BrokerService::new(engine, config);
125
126    let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
127
128    let reflection = tonic_reflection::server::Builder::configure()
129        .register_encoded_file_descriptor_set(hermes_proto::FILE_DESCRIPTOR_SET)
130        .build_v1()?;
131
132    Server::builder()
133        .add_service(reflection)
134        .add_service(BrokerServer::new(service))
135        .serve_with_incoming(incoming)
136        .await?;
137
138    Ok(())
139}