hermes_server/lib.rs
1//! `hermes-server` is the server-side crate for the Hermes gRPC event broker.
2//!
3//! It provides:
4//! - a broker engine with fanout and queue-group delivery
5//! - a gRPC service implementation (via `tonic`)
6//! - optional durable delivery with redelivery + garbage-collection loops
7//!
8//! # Quick start
9//!
10//! Run a server from a bound `TcpListener`:
11//!
12//! ```no_run
13//! use tokio::net::TcpListener;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//! let listener = TcpListener::bind("127.0.0.1:4222").await?;
18//! hermes_server::run(listener).await?;
19//! Ok(())
20//! }
21//! ```
22//!
23//! Run with explicit configuration:
24//!
25//! ```no_run
26//! use tokio::net::TcpListener;
27//! use hermes_server::config::ServerConfig;
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let listener = TcpListener::bind("127.0.0.1:4222").await?;
32//! let mut cfg = ServerConfig::default();
33//! cfg.subscriber_channel_capacity = 16_384;
34//! cfg.store_path = Some("hermes.redb".into());
35//! hermes_server::run_with_config(listener, cfg).await?;
36//! Ok(())
37//! }
38//! ```
39//!
40//! # Configuration
41//!
42//! The server can be configured programmatically with [`config::ServerConfig`] and can
43//! also be initialized from environment variables through `ServerConfig::from_env()`.
44//!
45//! Useful env vars include:
46//! - `HERMES_LISTEN_ADDR`
47//! - `HERMES_CHANNEL_CAPACITY`
48//! - `HERMES_GRPC_OUTPUT_BUFFER`
49//! - `HERMES_STORE_PATH`
50//! - `HERMES_REDELIVERY_INTERVAL`
51//! - `HERMES_MAX_DELIVERY_ATTEMPTS`
52//! - `HERMES_RETENTION_SECS`
53//! - `HERMES_ACK_TIMEOUT`
54//! - `HERMES_MAX_IN_FLIGHT`
55//! - `HERMES_GC_INTERVAL`
56//! - `HERMES_REDELIVERY_BATCH_SIZE`
57//!
58//! # Notes
59//!
60//! - If `store_path` is `None`, the server runs in fire-and-forget mode (no durable
61//! redelivery persistence).
62//! - If `store_path` is set, durable store-backed redelivery and GC loops are spawned.
63
64pub mod broker;
65pub mod config;
66pub mod grpc;
67pub mod redelivery;
68pub mod subscription;
69
70use std::sync::Arc;
71
72use hermes_proto::broker_server::BrokerServer;
73use hermes_store::RedbMessageStore;
74use tokio::net::TcpListener;
75use tonic::transport::Server;
76use tracing::info;
77
78/// Run the broker server on the given listener.
79/// Useful for integration tests that need a server on a random port.
80pub async fn run(listener: TcpListener) -> Result<(), Box<dyn std::error::Error>> {
81 run_with_config(listener, config::ServerConfig::default()).await
82}
83
84/// Run the broker server with a specific config.
85pub async fn run_with_config(
86 listener: TcpListener,
87 config: config::ServerConfig,
88) -> Result<(), Box<dyn std::error::Error>> {
89 let store: Option<Arc<dyn hermes_store::MessageStore>> =
90 if let Some(ref path) = config.store_path {
91 let store = RedbMessageStore::open(path)?;
92 info!(?path, "durable store opened");
93 Some(Arc::new(store))
94 } else {
95 None
96 };
97
98 let engine = if let Some(ref store) = store {
99 Arc::new(broker::BrokerEngine::with_store(
100 config.subscriber_channel_capacity,
101 store.clone(),
102 ))
103 } else {
104 Arc::new(broker::BrokerEngine::new(
105 config.subscriber_channel_capacity,
106 ))
107 };
108
109 // Spawn redelivery + GC loops if store is enabled.
110 if let Some(ref store) = store {
111 redelivery::spawn_redelivery_loop(
112 engine.clone(),
113 config.redelivery_interval_secs,
114 config.max_delivery_attempts,
115 config.redelivery_batch_size,
116 );
117 redelivery::spawn_gc_loop(
118 store.clone(),
119 config.retention_secs,
120 config.gc_interval_secs,
121 );
122 }
123
124 let service = grpc::BrokerService::new(engine, config);
125
126 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
127
128 let reflection = tonic_reflection::server::Builder::configure()
129 .register_encoded_file_descriptor_set(hermes_proto::FILE_DESCRIPTOR_SET)
130 .build_v1()?;
131
132 Server::builder()
133 .add_service(reflection)
134 .add_service(BrokerServer::new(service))
135 .serve_with_incoming(incoming)
136 .await?;
137
138 Ok(())
139}