hermes_server/lib.rs
1//! `hermes-server` is the server-side crate for the Hermes gRPC event broker.
2//!
3//! It provides:
4//! - a broker engine with fanout and queue-group delivery
5//! - a gRPC service implementation (via `tonic`)
6//! - optional durable delivery with redelivery + garbage-collection loops
7//!
8//! # Quick start
9//!
10//! Run a server from a bound `TcpListener`:
11//!
12//! ```no_run
13//! use tokio::net::TcpListener;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//! let listener = TcpListener::bind("127.0.0.1:4222").await?;
18//! hermes_server::run(listener).await?;
19//! Ok(())
20//! }
21//! ```
22//!
23//! Run with explicit configuration:
24//!
25//! ```no_run
26//! use tokio::net::TcpListener;
27//! use hermes_server::config::ServerConfig;
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let listener = TcpListener::bind("127.0.0.1:4222").await?;
32//! let mut cfg = ServerConfig::default();
33//! cfg.subscriber_channel_capacity = 16_384;
34//! cfg.store_path = Some("hermes.redb".into());
35//! hermes_server::run_with_config(listener, cfg).await?;
36//! Ok(())
37//! }
38//! ```
39//!
40//! # Configuration
41//!
42//! The server can be configured programmatically with [`config::ServerConfig`] and can
43//! also be initialized from environment variables through `ServerConfig::from_env()`.
44//!
45//! Useful env vars include:
46//! - `HERMES_LISTEN_ADDR`
47//! - `HERMES_CHANNEL_CAPACITY`
48//! - `HERMES_GRPC_OUTPUT_BUFFER`
49//! - `HERMES_STORE_PATH`
50//! - `HERMES_REDELIVERY_INTERVAL`
51//! - `HERMES_MAX_DELIVERY_ATTEMPTS`
52//! - `HERMES_RETENTION_SECS`
53//! - `HERMES_ACK_TIMEOUT`
54//! - `HERMES_MAX_IN_FLIGHT`
55//! - `HERMES_GC_INTERVAL`
56//! - `HERMES_REDELIVERY_BATCH_SIZE`
57//!
58//! # Notes
59//!
60//! - If `store_path` is `None`, the server runs in fire-and-forget mode (no durable
61//! redelivery persistence).
62//! - If `store_path` is set, durable store-backed redelivery and GC loops are spawned.
63
64pub mod broker;
65pub mod config;
66pub mod grpc;
67pub mod redelivery;
68pub mod subscription;
69
70use std::future::Future;
71use std::sync::Arc;
72
73use hermes_proto::broker_server::BrokerServer;
74use hermes_store::RedbMessageStore;
75use tokio::net::TcpListener;
76use tokio_util::sync::CancellationToken;
77use tonic::transport::Server;
78use tracing::info;
79
80/// Run the broker server on the given listener (runs until the process is killed).
81/// Useful for integration tests that need a server on a random port.
82pub async fn run(listener: TcpListener) -> Result<(), Box<dyn std::error::Error>> {
83 run_with_config(listener, config::ServerConfig::default()).await
84}
85
86/// Run the broker server with a specific config (runs until the process is killed).
87pub async fn run_with_config(
88 listener: TcpListener,
89 config: config::ServerConfig,
90) -> Result<(), Box<dyn std::error::Error>> {
91 // A future that never resolves — the server runs forever.
92 run_with_shutdown(listener, config, std::future::pending::<()>()).await
93}
94
95/// Run the broker server with graceful shutdown.
96///
97/// The server will stop accepting new connections when `shutdown` resolves,
98/// and will finish processing in-flight requests before returning.
99pub async fn run_with_shutdown(
100 listener: TcpListener,
101 config: config::ServerConfig,
102 shutdown: impl Future<Output = ()>,
103) -> Result<(), Box<dyn std::error::Error>> {
104 let store: Option<Arc<dyn hermes_store::MessageStore>> =
105 if let Some(ref path) = config.store_path {
106 let store = RedbMessageStore::open(path)?;
107 info!(?path, "durable store opened");
108 Some(Arc::new(store))
109 } else {
110 None
111 };
112
113 let engine = Arc::new(broker::BrokerEngine::new(
114 config.subscriber_channel_capacity,
115 store.clone(),
116 ));
117
118 // Token to cancel background loops on shutdown.
119 let cancel = CancellationToken::new();
120
121 // Spawn redelivery + GC loops if store is enabled.
122 if let Some(ref store) = store {
123 redelivery::spawn_redelivery_loop(
124 engine.clone(),
125 config.redelivery_interval_secs,
126 config.max_delivery_attempts,
127 config.redelivery_batch_size,
128 cancel.clone(),
129 );
130 redelivery::spawn_gc_loop(
131 store.clone(),
132 config.retention_secs,
133 config.gc_interval_secs,
134 cancel.clone(),
135 );
136 }
137
138 let service = grpc::BrokerService::new(engine, config);
139
140 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
141
142 let reflection = tonic_reflection::server::Builder::configure()
143 .register_encoded_file_descriptor_set(hermes_proto::FILE_DESCRIPTOR_SET)
144 .build_v1()?;
145
146 Server::builder()
147 .add_service(reflection)
148 .add_service(BrokerServer::new(service))
149 .serve_with_incoming_shutdown(incoming, shutdown)
150 .await?;
151
152 // Stop background loops and let them release the store.
153 cancel.cancel();
154
155 Ok(())
156}