Skip to main content

hermes_server/
config.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3
4use tracing::info;
5
6pub struct ServerConfig {
7    pub listen_addr: SocketAddr,
8    /// Capacity of the broadcast (fanout) and mpsc (queue-group) channels
9    /// inside the broker engine. This is the main buffer between publishers
10    /// and subscribers.
11    pub subscriber_channel_capacity: usize,
12    /// Capacity of the gRPC output mpsc channel per subscriber.
13    /// This is a small decoupling buffer between the forwarding task and the
14    /// gRPC write. Keep it small — back-pressure should be handled by the
15    /// broadcast/mpsc channel above.
16    pub grpc_output_buffer: usize,
17    /// Path to the redb store file. None = fire-and-forget only, no durable mode.
18    pub store_path: Option<PathBuf>,
19    /// How often the redelivery loop runs (seconds).
20    pub redelivery_interval_secs: u64,
21    /// Max delivery attempts before dead-lettering.
22    pub max_delivery_attempts: u32,
23    /// How long to keep acked messages before GC (seconds).
24    pub retention_secs: u64,
25    /// Default ack timeout for durable subscriptions (seconds).
26    pub default_ack_timeout_secs: u32,
27    /// Default max in-flight messages for durable subscriptions.
28    pub default_max_in_flight: u32,
29    /// How often the GC loop runs (seconds).
30    pub gc_interval_secs: u64,
31    /// Max expired messages processed per consumer per redelivery cycle.
32    pub redelivery_batch_size: u32,
33}
34
35impl Default for ServerConfig {
36    fn default() -> Self {
37        Self {
38            listen_addr: "0.0.0.0:4222".parse().unwrap(),
39            subscriber_channel_capacity: 8192,
40            grpc_output_buffer: 1024,
41            store_path: None,
42            redelivery_interval_secs: 5,
43            max_delivery_attempts: 5,
44            retention_secs: 3600,
45            default_ack_timeout_secs: 30,
46            default_max_in_flight: 32,
47            gc_interval_secs: 60,
48            redelivery_batch_size: 100,
49        }
50    }
51}
52
53impl ServerConfig {
54    pub fn from_env() -> Self {
55        let mut config = Self::default();
56
57        if let Ok(addr) = std::env::var("HERMES_LISTEN_ADDR")
58            && let Ok(parsed) = addr.parse()
59        {
60            config.listen_addr = parsed;
61        }
62        if let Ok(cap) = std::env::var("HERMES_CHANNEL_CAPACITY")
63            && let Ok(parsed) = cap.parse()
64        {
65            config.subscriber_channel_capacity = parsed;
66        }
67        if let Ok(v) = std::env::var("HERMES_GRPC_OUTPUT_BUFFER")
68            && let Ok(parsed) = v.parse()
69        {
70            config.grpc_output_buffer = parsed;
71        }
72        if let Ok(path) = std::env::var("HERMES_STORE_PATH") {
73            config.store_path = Some(PathBuf::from(path));
74        }
75        if let Ok(v) = std::env::var("HERMES_REDELIVERY_INTERVAL")
76            && let Ok(parsed) = v.parse()
77        {
78            config.redelivery_interval_secs = parsed;
79        }
80        if let Ok(v) = std::env::var("HERMES_MAX_DELIVERY_ATTEMPTS")
81            && let Ok(parsed) = v.parse()
82        {
83            config.max_delivery_attempts = parsed;
84        }
85        if let Ok(v) = std::env::var("HERMES_RETENTION_SECS")
86            && let Ok(parsed) = v.parse()
87        {
88            config.retention_secs = parsed;
89        }
90        if let Ok(v) = std::env::var("HERMES_ACK_TIMEOUT")
91            && let Ok(parsed) = v.parse()
92        {
93            config.default_ack_timeout_secs = parsed;
94        }
95        if let Ok(v) = std::env::var("HERMES_MAX_IN_FLIGHT")
96            && let Ok(parsed) = v.parse()
97        {
98            config.default_max_in_flight = parsed;
99        }
100        if let Ok(v) = std::env::var("HERMES_GC_INTERVAL")
101            && let Ok(parsed) = v.parse()
102        {
103            config.gc_interval_secs = parsed;
104        }
105        if let Ok(v) = std::env::var("HERMES_REDELIVERY_BATCH_SIZE")
106            && let Ok(parsed) = v.parse()
107        {
108            config.redelivery_batch_size = parsed;
109        }
110
111        info!(
112            listen_addr = %config.listen_addr,
113            channel_capacity = config.subscriber_channel_capacity,
114            grpc_output_buffer = config.grpc_output_buffer,
115            store_path = ?config.store_path,
116            redelivery_interval_secs = config.redelivery_interval_secs,
117            max_delivery_attempts = config.max_delivery_attempts,
118            retention_secs = config.retention_secs,
119            default_ack_timeout_secs = config.default_ack_timeout_secs,
120            default_max_in_flight = config.default_max_in_flight,
121            gc_interval_secs = config.gc_interval_secs,
122            redelivery_batch_size = config.redelivery_batch_size,
123            "config loaded"
124        );
125
126        config
127    }
128}