nntp_proxy/router/mod.rs
1//! Backend server selection and load balancing
2//!
3//! This module handles selecting backend servers using round-robin
4//! with simple load tracking for monitoring.
5//!
6//! # Overview
7//!
8//! The `BackendSelector` provides thread-safe backend selection for routing
9//! NNTP commands across multiple backend servers. It uses a lock-free
10//! round-robin algorithm with atomic operations for concurrent access.
11//!
12//! # Usage
13//!
14//! ```no_run
15//! use nntp_proxy::router::BackendSelector;
16//! use nntp_proxy::types::{BackendId, ClientId};
17//! # use nntp_proxy::pool::DeadpoolConnectionProvider;
18//!
19//! let mut selector = BackendSelector::new();
20//! # let provider = DeadpoolConnectionProvider::new(
21//! # "localhost".to_string(), 119, "test".to_string(), 10, None, None
22//! # );
23//! selector.add_backend(BackendId::from_index(0), "server1".to_string(), provider);
24//!
25//! // Route a command
26//! let client_id = ClientId::new();
27//! let backend_id = selector.route_command_sync(client_id, "LIST").unwrap();
28//!
29//! // After command completes
30//! selector.complete_command_sync(backend_id);
31//! ```
32
33use anyhow::Result;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use tracing::{debug, info};
37
38use crate::pool::DeadpoolConnectionProvider;
39use crate::types::{BackendId, ClientId};
40
41/// Backend connection information
42#[derive(Debug, Clone)]
43struct BackendInfo {
44 /// Backend identifier
45 id: BackendId,
46 /// Server name for logging
47 name: String,
48 /// Connection provider for this backend
49 provider: DeadpoolConnectionProvider,
50 /// Number of pending requests on this backend (for load balancing)
51 pending_count: Arc<AtomicUsize>,
52 /// Number of connections in stateful mode (for hybrid routing reservation)
53 stateful_count: Arc<AtomicUsize>,
54}
55
56/// Selects backend servers using round-robin with load tracking
57///
58/// # Thread Safety
59///
60/// This struct is designed for concurrent access across multiple threads.
61/// The round-robin counter and pending counts use atomic operations for
62/// lock-free performance.
63///
64/// # Load Balancing
65///
66/// - **Strategy**: Round-robin rotation through available backends
67/// - **Tracking**: Atomic counters track pending commands per backend
68/// - **Monitoring**: Load statistics available via `backend_load()`
69///
70/// # Examples
71///
72/// ```no_run
73/// # use nntp_proxy::router::BackendSelector;
74/// # use nntp_proxy::types::{BackendId, ClientId};
75/// # use nntp_proxy::pool::DeadpoolConnectionProvider;
76/// let mut selector = BackendSelector::new();
77///
78/// # let provider = DeadpoolConnectionProvider::new(
79/// # "localhost".to_string(), 119, "test".to_string(), 10, None, None
80/// # );
81/// selector.add_backend(
82/// BackendId::from_index(0),
83/// "backend-1".to_string(),
84/// provider,
85/// );
86///
87/// // Route commands
88/// let backend = selector.route_command_sync(ClientId::new(), "LIST")?;
89/// # Ok::<(), anyhow::Error>(())
90/// ```
91#[derive(Debug)]
92pub struct BackendSelector {
93 /// Backend connection providers
94 backends: Vec<BackendInfo>,
95 /// Current backend index for round-robin selection
96 current_backend: AtomicUsize,
97}
98
99impl Default for BackendSelector {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl BackendSelector {
106 /// Create a new backend selector
107 #[must_use]
108 pub fn new() -> Self {
109 Self {
110 // Pre-allocate for typical number of backend servers (most setups have 2-8)
111 backends: Vec::with_capacity(4),
112 current_backend: AtomicUsize::new(0),
113 }
114 }
115
116 /// Add a backend server to the router
117 pub fn add_backend(
118 &mut self,
119 backend_id: BackendId,
120 name: String,
121 provider: DeadpoolConnectionProvider,
122 ) {
123 info!("Added backend {:?} ({})", backend_id, name);
124 self.backends.push(BackendInfo {
125 id: backend_id,
126 name,
127 provider,
128 pending_count: Arc::new(AtomicUsize::new(0)),
129 stateful_count: Arc::new(AtomicUsize::new(0)),
130 });
131 }
132
133 /// Select the next backend using round-robin strategy
134 fn select_backend(&self) -> Option<&BackendInfo> {
135 if self.backends.is_empty() {
136 return None;
137 }
138
139 let index = self.current_backend.fetch_add(1, Ordering::Relaxed) % self.backends.len();
140 Some(&self.backends[index])
141 }
142
143 /// Select a backend for the given command using round-robin
144 /// Returns the backend ID to use for this command
145 pub fn route_command_sync(&self, _client_id: ClientId, _command: &str) -> Result<BackendId> {
146 let backend = self.select_backend().ok_or_else(|| {
147 anyhow::anyhow!(
148 "No backends available for routing (total backends: {})",
149 self.backends.len()
150 )
151 })?;
152
153 // Increment pending count for load tracking
154 backend.pending_count.fetch_add(1, Ordering::Relaxed);
155
156 debug!(
157 "Selected backend {:?} ({}) for command",
158 backend.id, backend.name
159 );
160
161 Ok(backend.id)
162 }
163
164 /// Mark a command as complete, decrementing the pending count
165 pub fn complete_command_sync(&self, backend_id: BackendId) {
166 if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
167 backend.pending_count.fetch_sub(1, Ordering::Relaxed);
168 }
169 }
170
171 /// Get the connection provider for a backend
172 #[must_use]
173 pub fn get_backend_provider(
174 &self,
175 backend_id: BackendId,
176 ) -> Option<&DeadpoolConnectionProvider> {
177 self.backends
178 .iter()
179 .find(|b| b.id == backend_id)
180 .map(|b| &b.provider)
181 }
182
183 /// Get the number of backends
184 #[must_use]
185 #[inline]
186 pub fn backend_count(&self) -> usize {
187 self.backends.len()
188 }
189
190 /// Get backend load (pending requests) for monitoring
191 #[must_use]
192 pub fn backend_load(&self, backend_id: BackendId) -> Option<usize> {
193 self.backends
194 .iter()
195 .find(|b| b.id == backend_id)
196 .map(|b| b.pending_count.load(Ordering::Relaxed))
197 }
198
199 /// Try to acquire a stateful connection slot for hybrid mode
200 /// Returns true if acquisition succeeded (within max_connections-1 limit)
201 /// Returns false if all stateful slots are taken (need to keep 1 for PCR)
202 pub fn try_acquire_stateful(&self, backend_id: BackendId) -> bool {
203 if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
204 // Get max connections from the provider's pool
205 let max_connections = backend.provider.max_size();
206
207 // Reserve 1 connection for per-command routing
208 let max_stateful = max_connections.saturating_sub(1);
209
210 // Try to increment if we haven't hit the limit
211 let mut current = backend.stateful_count.load(Ordering::Acquire);
212 loop {
213 if current >= max_stateful {
214 debug!(
215 "Backend {:?} ({}) stateful limit reached: {}/{}",
216 backend_id, backend.name, current, max_stateful
217 );
218 return false;
219 }
220
221 match backend.stateful_count.compare_exchange_weak(
222 current,
223 current + 1,
224 Ordering::AcqRel,
225 Ordering::Acquire,
226 ) {
227 Ok(_) => {
228 debug!(
229 "Backend {:?} ({}) acquired stateful slot: {}/{}",
230 backend_id,
231 backend.name,
232 current + 1,
233 max_stateful
234 );
235 return true;
236 }
237 Err(actual) => current = actual,
238 }
239 }
240 }
241 false
242 }
243
244 /// Release a stateful connection slot
245 pub fn release_stateful(&self, backend_id: BackendId) {
246 if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
247 // Atomically decrement if greater than zero, avoiding underflow and spurious logs
248 let result = backend.stateful_count.fetch_update(
249 Ordering::AcqRel,
250 Ordering::Acquire,
251 |current| {
252 if current == 0 {
253 None
254 } else {
255 Some(current - 1)
256 }
257 },
258 );
259 match result {
260 Ok(prev) => {
261 debug!(
262 "Backend {:?} ({}) released stateful slot: {}/{}",
263 backend_id,
264 backend.name,
265 prev - 1,
266 backend.provider.max_size().saturating_sub(1)
267 );
268 }
269 Err(0) => {
270 debug!(
271 "Backend {:?} ({}) release_stateful called when count already 0",
272 backend_id, backend.name
273 );
274 }
275 Err(other) => unreachable!(
276 "Unexpected error in fetch_update: got Err({other}), expected only Err(0)"
277 ),
278 }
279 }
280 }
281
282 /// Get the number of stateful connections for a backend
283 #[must_use]
284 pub fn stateful_count(&self, backend_id: BackendId) -> Option<usize> {
285 self.backends
286 .iter()
287 .find(|b| b.id == backend_id)
288 .map(|b| b.stateful_count.load(Ordering::Relaxed))
289 }
290}
291
292#[cfg(test)]
293mod tests;