Skip to main content

rust_web_server/service_discovery/
mod.rs

1//! Dynamic backend pool with pluggable discovery sources.
2//!
3//! [`BackendPool`] maintains a thread-safe list of `"host:port"` addresses that
4//! can be refreshed on a background thread.  Discovery is delegated to a
5//! [`DiscoverySource`]:
6//!
7//! | Variant      | Description                                               |
8//! |------------- |---------------------------------------------------------- |
9//! | `Static`     | Fixed list — no polling required.                         |
10//! | `EnvPrefix`  | Scan `PREFIX_0`, `PREFIX_1`, … environment variables.     |
11//! | `File`       | Read one `host:port` per line from a file.                |
12//! | `Dns`        | A-record lookup — resolve hostname to all IPs.            |
13//!
14//! # Example
15//!
16//! ```rust,no_run
17//! use rust_web_server::service_discovery::BackendPool;
18//!
19//! // Fixed list — no background thread needed.
20//! let pool = BackendPool::r#static(vec!["10.0.0.1:8080".into(), "10.0.0.2:8080".into()]);
21//! println!("{:?}", pool.backends());
22//!
23//! // Env-var discovery, refreshed every 60 seconds.
24//! let pool = BackendPool::env_prefix("MY_SVC_BACKEND")
25//!     .poll_interval_secs(60);
26//! pool.start();
27//! println!("{:?}", pool.backends());
28//! ```
29
30#[cfg(test)]
31mod tests;
32
33use std::net::ToSocketAddrs;
34use std::sync::{Arc, RwLock};
35use std::time::Duration;
36
37// ── DiscoverySource ───────────────────────────────────────────────────────────
38
/// Controls how [`BackendPool`] discovers backend addresses.
///
/// Every variant describes one place a list of `"host:port"` strings can come
/// from.  A single lookup is performed by the private `resolve` method.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoverySource {
    /// Fixed list of `"host:port"` addresses — never refreshed.
    Static(Vec<String>),
    /// Scan environment variables `PREFIX_0`, `PREFIX_1`, … until one is absent.
    EnvPrefix(String),
    /// Read one `host:port` per line from a file.  Blank lines and lines starting
    /// with `#` are ignored.
    File(String),
    /// Resolve `hostname` with [`ToSocketAddrs`] and format every returned
    /// address as `ip:port`.  IPv6 results are written as `[ip]:port` so that
    /// the string stays unambiguous and can be parsed back.
    Dns { hostname: String, port: u16 },
}

impl DiscoverySource {
    /// Perform a single discovery cycle and return the current backend list.
    ///
    /// Failures are never propagated: an unreadable file or a failed lookup is
    /// reported on stderr and produces an empty list.
    fn resolve(&self) -> Vec<String> {
        match self {
            DiscoverySource::Static(backends) => backends.clone(),

            DiscoverySource::EnvPrefix(prefix) => {
                let mut backends = Vec::new();
                // The next index to probe always equals the number of values
                // collected so far; the first variable that is missing (or not
                // valid Unicode) ends the scan.
                while let Ok(value) = std::env::var(format!("{}_{}", prefix, backends.len())) {
                    backends.push(value);
                }
                backends
            }

            DiscoverySource::File(path) => match std::fs::read_to_string(path) {
                Ok(contents) => contents
                    .lines()
                    .map(str::trim)
                    .filter(|line| !line.is_empty() && !line.starts_with('#'))
                    .map(str::to_string)
                    .collect(),
                Err(e) => {
                    eprintln!("service_discovery: cannot read backend file {:?}: {}", path, e);
                    Vec::new()
                }
            },

            DiscoverySource::Dns { hostname, port } => {
                let addr_str = format!("{}:{}", hostname, port);
                match addr_str.to_socket_addrs() {
                    // `SocketAddr`'s `Display` wraps IPv6 addresses in brackets
                    // (`[::1]:8080`).  Joining `ip()` and `port()` by hand would
                    // emit `::1:8080`, which no socket-address parser accepts.
                    Ok(addrs) => addrs.map(|addr| addr.to_string()).collect(),
                    Err(e) => {
                        eprintln!("service_discovery: DNS lookup for {} failed: {}", addr_str, e);
                        Vec::new()
                    }
                }
            }
        }
    }
}
101
102// ── BackendPool ───────────────────────────────────────────────────────────────
103
104/// Thread-safe pool of backend addresses, optionally refreshed in the background.
105///
106/// Clone this type freely — all clones share the same underlying `RwLock<Vec>`.
107#[derive(Clone)]
108pub struct BackendPool {
109    backends: Arc<RwLock<Vec<String>>>,
110    source: Arc<DiscoverySource>,
111    poll_interval_secs: u64,
112}
113
114impl BackendPool {
115    fn new(source: DiscoverySource) -> Self {
116        Self {
117            backends: Arc::new(RwLock::new(Vec::new())),
118            source: Arc::new(source),
119            poll_interval_secs: 30,
120        }
121    }
122
123    /// Create a pool from a fixed list of backends.
124    ///
125    /// The list is available immediately; `start()` is a no-op.
126    pub fn r#static(backends: Vec<String>) -> Self {
127        let initial = backends.clone();
128        let pool = Self::new(DiscoverySource::Static(backends));
129        *pool.backends.write().unwrap() = initial;
130        pool
131    }
132
133    /// Create a pool whose backends are read from environment variables
134    /// `PREFIX_0`, `PREFIX_1`, … at startup and every `poll_interval_secs`.
135    pub fn env_prefix(prefix: impl Into<String>) -> Self {
136        Self::new(DiscoverySource::EnvPrefix(prefix.into()))
137    }
138
139    /// Create a pool whose backends are read from a file (one `host:port` per line).
140    pub fn file(path: impl Into<String>) -> Self {
141        Self::new(DiscoverySource::File(path.into()))
142    }
143
144    /// Create a pool whose backends are discovered via DNS A-record lookup.
145    pub fn dns(hostname: impl Into<String>, port: u16) -> Self {
146        Self::new(DiscoverySource::Dns { hostname: hostname.into(), port })
147    }
148
149    /// Override the background refresh interval (default: 30 seconds).
150    ///
151    /// Only meaningful for `File` and `Dns` sources.
152    pub fn poll_interval_secs(mut self, secs: u64) -> Self {
153        self.poll_interval_secs = secs;
154        self
155    }
156
157    /// Start the background refresh thread.
158    ///
159    /// For `Static` sources this is a no-op.  For all others, an immediate
160    /// `refresh()` is performed before spawning the background thread so that
161    /// the first `backends()` call returns a populated list.
162    pub fn start(&self) {
163        if matches!(self.source.as_ref(), DiscoverySource::Static(_)) {
164            return;
165        }
166        self.refresh();
167        let pool = self.clone();
168        let interval = Duration::from_secs(self.poll_interval_secs);
169        std::thread::spawn(move || loop {
170            std::thread::sleep(interval);
171            pool.refresh();
172        });
173    }
174
175    /// Return a snapshot of the current backend list.
176    pub fn backends(&self) -> Vec<String> {
177        self.backends.read().unwrap().clone()
178    }
179
180    /// Replace the current backend list with `backends`.
181    ///
182    /// Useful for testing or for external control planes that push updates.
183    pub fn update(&self, backends: Vec<String>) {
184        *self.backends.write().unwrap() = backends;
185    }
186
187    /// Perform one synchronous refresh cycle.
188    ///
189    /// Called automatically by `start()` and by the background thread.
190    pub fn refresh(&self) {
191        let resolved = self.source.resolve();
192        *self.backends.write().unwrap() = resolved;
193    }
194}