rust_web_server/service_discovery/mod.rs
1//! Dynamic backend pool with pluggable discovery sources.
2//!
3//! [`BackendPool`] maintains a thread-safe list of `"host:port"` addresses that
4//! can be refreshed on a background thread. Discovery is delegated to a
5//! [`DiscoverySource`]:
6//!
7//! | Variant | Description |
8//! |------------- |---------------------------------------------------------- |
9//! | `Static` | Fixed list — no polling required. |
10//! | `EnvPrefix` | Scan `PREFIX_0`, `PREFIX_1`, … environment variables. |
11//! | `File` | Read one `host:port` per line from a file. |
//! | `Dns` | System resolver lookup — resolve hostname to all of its IPs (IPv4 and IPv6). |
13//!
14//! # Example
15//!
16//! ```rust,no_run
17//! use rust_web_server::service_discovery::BackendPool;
18//!
19//! // Fixed list — no background thread needed.
20//! let pool = BackendPool::r#static(vec!["10.0.0.1:8080".into(), "10.0.0.2:8080".into()]);
21//! println!("{:?}", pool.backends());
22//!
23//! // Env-var discovery, refreshed every 60 seconds.
24//! let pool = BackendPool::env_prefix("MY_SVC_BACKEND")
25//! .poll_interval_secs(60);
26//! pool.start();
27//! println!("{:?}", pool.backends());
28//! ```
29
30#[cfg(test)]
31mod tests;
32
33use std::net::ToSocketAddrs;
34use std::sync::{Arc, RwLock};
35use std::time::Duration;
36
37// ── DiscoverySource ───────────────────────────────────────────────────────────
38
/// Controls how [`BackendPool`] discovers backend addresses.
#[derive(Debug, Clone)]
pub enum DiscoverySource {
    /// Fixed list of `"host:port"` addresses — never refreshed.
    Static(Vec<String>),
    /// Scan environment variables `PREFIX_0`, `PREFIX_1`, … until one is absent.
    ///
    /// Values are trimmed; a variable that is set but blank is skipped without
    /// ending the scan.
    EnvPrefix(String),
    /// Read one `host:port` per line from a file. Blank lines and lines starting
    /// with `#` are ignored.
    File(String),
    /// Resolve `hostname` through the system resolver and format every returned
    /// address as a socket address: `ip:port` for IPv4, `[ip]:port` for IPv6.
    Dns { hostname: String, port: u16 },
}

impl DiscoverySource {
    /// Perform a single discovery cycle and return the current backend list.
    ///
    /// Failures are never propagated: an unreadable file or a failed lookup is
    /// reported on stderr and yields an empty list.
    fn resolve(&self) -> Vec<String> {
        match self {
            DiscoverySource::Static(list) => list.clone(),
            DiscoverySource::EnvPrefix(prefix) => Self::resolve_env(prefix),
            DiscoverySource::File(path) => Self::resolve_file(path),
            DiscoverySource::Dns { hostname, port } => Self::resolve_dns(hostname, *port),
        }
    }

    /// Collect `PREFIX_0`, `PREFIX_1`, … until the first variable that cannot be read.
    fn resolve_env(prefix: &str) -> Vec<String> {
        let mut backends = Vec::new();
        let mut index = 0usize;
        // `env::var` fails for absent *and* non-Unicode variables; either ends the scan.
        while let Ok(raw) = std::env::var(format!("{}_{}", prefix, index)) {
            // Same normalisation as the file source: surrounding whitespace is
            // noise and an empty entry can never be a usable backend.
            let value = raw.trim();
            if !value.is_empty() {
                backends.push(value.to_string());
            }
            index += 1;
        }
        backends
    }

    /// Parse a backend file: one address per line, `#` comments and blank lines skipped.
    fn resolve_file(path: &str) -> Vec<String> {
        match std::fs::read_to_string(path) {
            Ok(contents) => contents
                .lines()
                .map(str::trim)
                .filter(|line| !line.is_empty() && !line.starts_with('#'))
                .map(str::to_string)
                .collect(),
            Err(e) => {
                eprintln!("service_discovery: cannot read backend file {:?}: {}", path, e);
                Vec::new()
            }
        }
    }

    /// Resolve `hostname` and render every address in connectable `host:port` form.
    fn resolve_dns(hostname: &str, port: u16) -> Vec<String> {
        // Accept a bracketed IPv6 literal ("[::1]") as well as the bare form.
        let host = hostname
            .strip_prefix('[')
            .and_then(|inner| inner.strip_suffix(']'))
            .unwrap_or(hostname);

        // The `(host, port)` form parses IP literals directly and only falls
        // back to the resolver for real host names.
        match (host, port).to_socket_addrs() {
            // `SocketAddr`'s `Display` brackets IPv6 addresses; joining ip and
            // port by hand would produce the unparseable "::1:8080".
            Ok(addrs) => addrs.map(|addr| addr.to_string()).collect(),
            Err(e) => {
                eprintln!(
                    "service_discovery: DNS lookup for {}:{} failed: {}",
                    hostname, port, e
                );
                Vec::new()
            }
        }
    }
}
101
102// ── BackendPool ───────────────────────────────────────────────────────────────
103
104/// Thread-safe pool of backend addresses, optionally refreshed in the background.
105///
106/// Clone this type freely — all clones share the same underlying `RwLock<Vec>`.
107#[derive(Clone)]
108pub struct BackendPool {
109 backends: Arc<RwLock<Vec<String>>>,
110 source: Arc<DiscoverySource>,
111 poll_interval_secs: u64,
112}
113
114impl BackendPool {
115 fn new(source: DiscoverySource) -> Self {
116 Self {
117 backends: Arc::new(RwLock::new(Vec::new())),
118 source: Arc::new(source),
119 poll_interval_secs: 30,
120 }
121 }
122
123 /// Create a pool from a fixed list of backends.
124 ///
125 /// The list is available immediately; `start()` is a no-op.
126 pub fn r#static(backends: Vec<String>) -> Self {
127 let initial = backends.clone();
128 let pool = Self::new(DiscoverySource::Static(backends));
129 *pool.backends.write().unwrap() = initial;
130 pool
131 }
132
133 /// Create a pool whose backends are read from environment variables
134 /// `PREFIX_0`, `PREFIX_1`, … at startup and every `poll_interval_secs`.
135 pub fn env_prefix(prefix: impl Into<String>) -> Self {
136 Self::new(DiscoverySource::EnvPrefix(prefix.into()))
137 }
138
139 /// Create a pool whose backends are read from a file (one `host:port` per line).
140 pub fn file(path: impl Into<String>) -> Self {
141 Self::new(DiscoverySource::File(path.into()))
142 }
143
144 /// Create a pool whose backends are discovered via DNS A-record lookup.
145 pub fn dns(hostname: impl Into<String>, port: u16) -> Self {
146 Self::new(DiscoverySource::Dns { hostname: hostname.into(), port })
147 }
148
149 /// Override the background refresh interval (default: 30 seconds).
150 ///
151 /// Only meaningful for `File` and `Dns` sources.
152 pub fn poll_interval_secs(mut self, secs: u64) -> Self {
153 self.poll_interval_secs = secs;
154 self
155 }
156
157 /// Start the background refresh thread.
158 ///
159 /// For `Static` sources this is a no-op. For all others, an immediate
160 /// `refresh()` is performed before spawning the background thread so that
161 /// the first `backends()` call returns a populated list.
162 pub fn start(&self) {
163 if matches!(self.source.as_ref(), DiscoverySource::Static(_)) {
164 return;
165 }
166 self.refresh();
167 let pool = self.clone();
168 let interval = Duration::from_secs(self.poll_interval_secs);
169 std::thread::spawn(move || loop {
170 std::thread::sleep(interval);
171 pool.refresh();
172 });
173 }
174
175 /// Return a snapshot of the current backend list.
176 pub fn backends(&self) -> Vec<String> {
177 self.backends.read().unwrap().clone()
178 }
179
180 /// Replace the current backend list with `backends`.
181 ///
182 /// Useful for testing or for external control planes that push updates.
183 pub fn update(&self, backends: Vec<String>) {
184 *self.backends.write().unwrap() = backends;
185 }
186
187 /// Perform one synchronous refresh cycle.
188 ///
189 /// Called automatically by `start()` and by the background thread.
190 pub fn refresh(&self) {
191 let resolved = self.source.resolve();
192 *self.backends.write().unwrap() = resolved;
193 }
194}