Skip to main content

hermes_server/
lib.rs

1//! `hermes-server` is the server-side crate for the Hermes gRPC event broker.
2//!
3//! It provides:
4//! - a broker engine with fanout and queue-group delivery
5//! - a gRPC service implementation (via `tonic`)
6//! - optional durable delivery with redelivery + garbage-collection loops
7//!
8//! # Quick start
9//!
10//! Run a server from a bound `TcpListener`:
11//!
12//! ```no_run
13//! use tokio::net::TcpListener;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//!     let listener = TcpListener::bind("127.0.0.1:4222").await?;
18//!     hermes_server::run(listener).await?;
19//!     Ok(())
20//! }
21//! ```
22//!
23//! Run with explicit configuration:
24//!
25//! ```no_run
26//! use tokio::net::TcpListener;
27//! use hermes_server::config::ServerConfig;
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//!     let listener = TcpListener::bind("127.0.0.1:4222").await?;
32//!     let mut cfg = ServerConfig::default();
33//!     cfg.subscriber_channel_capacity = 16_384;
34//!     cfg.store_path = Some("hermes.redb".into());
35//!     hermes_server::run_with_config(listener, cfg).await?;
36//!     Ok(())
37//! }
38//! ```
39//!
40//! # Configuration
41//!
42//! The server can be configured programmatically with [`config::ServerConfig`] and can
43//! also be initialized from environment variables through `ServerConfig::from_env()`.
44//!
45//! Useful env vars include:
46//! - `HERMES_LISTEN_ADDR`
47//! - `HERMES_CHANNEL_CAPACITY`
48//! - `HERMES_GRPC_OUTPUT_BUFFER`
49//! - `HERMES_STORE_PATH`
50//! - `HERMES_REDELIVERY_INTERVAL`
51//! - `HERMES_MAX_DELIVERY_ATTEMPTS`
52//! - `HERMES_RETENTION_SECS`
53//! - `HERMES_ACK_TIMEOUT`
54//! - `HERMES_MAX_IN_FLIGHT`
55//! - `HERMES_GC_INTERVAL`
56//! - `HERMES_REDELIVERY_BATCH_SIZE`
57//!
58//! # Notes
59//!
60//! - If `store_path` is `None`, the server runs in fire-and-forget mode (no durable
61//!   redelivery persistence).
62//! - If `store_path` is set, durable store-backed redelivery and GC loops are spawned.
63
64pub mod broker;
65pub mod config;
66pub mod grpc;
67pub mod redelivery;
68pub mod subscription;
69
70use std::future::Future;
71use std::sync::Arc;
72
73use hermes_proto::broker_server::BrokerServer;
74use hermes_store::RedbMessageStore;
75use tokio::net::TcpListener;
76use tokio_util::sync::CancellationToken;
77use tonic::transport::Server;
78use tracing::info;
79
80/// Run the broker server on the given listener (runs until the process is killed).
81/// Useful for integration tests that need a server on a random port.
82pub async fn run(listener: TcpListener) -> Result<(), Box<dyn std::error::Error>> {
83    run_with_config(listener, config::ServerConfig::default()).await
84}
85
86/// Run the broker server with a specific config (runs until the process is killed).
87pub async fn run_with_config(
88    listener: TcpListener,
89    config: config::ServerConfig,
90) -> Result<(), Box<dyn std::error::Error>> {
91    // A future that never resolves — the server runs forever.
92    run_with_shutdown(listener, config, std::future::pending::<()>()).await
93}
94
95/// Run the broker server with graceful shutdown.
96///
97/// The server will stop accepting new connections when `shutdown` resolves,
98/// and will finish processing in-flight requests before returning.
99pub async fn run_with_shutdown(
100    listener: TcpListener,
101    config: config::ServerConfig,
102    shutdown: impl Future<Output = ()>,
103) -> Result<(), Box<dyn std::error::Error>> {
104    let store: Option<Arc<dyn hermes_store::MessageStore>> =
105        if let Some(ref path) = config.store_path {
106            let store = RedbMessageStore::open(path)?;
107            info!(?path, "durable store opened");
108            Some(Arc::new(store))
109        } else {
110            None
111        };
112
113    let engine = Arc::new(broker::BrokerEngine::new(
114        config.subscriber_channel_capacity,
115        store.clone(),
116    ));
117
118    // Token to cancel background loops on shutdown.
119    let cancel = CancellationToken::new();
120
121    // Spawn redelivery + GC loops if store is enabled.
122    if let Some(ref store) = store {
123        redelivery::spawn_redelivery_loop(
124            engine.clone(),
125            config.redelivery_interval_secs,
126            config.max_delivery_attempts,
127            config.redelivery_batch_size,
128            cancel.clone(),
129        );
130        redelivery::spawn_gc_loop(
131            store.clone(),
132            config.retention_secs,
133            config.gc_interval_secs,
134            cancel.clone(),
135        );
136    }
137
138    let service = grpc::BrokerService::new(engine, config);
139
140    let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
141
142    let reflection = tonic_reflection::server::Builder::configure()
143        .register_encoded_file_descriptor_set(hermes_proto::FILE_DESCRIPTOR_SET)
144        .build_v1()?;
145
146    Server::builder()
147        .add_service(reflection)
148        .add_service(BrokerServer::new(service))
149        .serve_with_incoming_shutdown(incoming, shutdown)
150        .await?;
151
152    // Stop background loops and let them release the store.
153    cancel.cancel();
154
155    Ok(())
156}