nntp_proxy/router/
mod.rs

1//! Backend server selection and load balancing
2//!
3//! This module handles selecting backend servers using round-robin
4//! with simple load tracking for monitoring.
5//!
6//! # Overview
7//!
8//! The `BackendSelector` provides thread-safe backend selection for routing
9//! NNTP commands across multiple backend servers. It uses a lock-free
10//! round-robin algorithm with atomic operations for concurrent access.
11//!
12//! # Usage
13//!
14//! ```no_run
15//! use nntp_proxy::router::BackendSelector;
16//! use nntp_proxy::types::{BackendId, ClientId};
17//! # use nntp_proxy::pool::DeadpoolConnectionProvider;
18//!
19//! let mut selector = BackendSelector::new();
20//! # let provider = DeadpoolConnectionProvider::new(
21//! #     "localhost".to_string(), 119, "test".to_string(), 10, None, None
22//! # );
23//! selector.add_backend(BackendId::from_index(0), "server1".to_string(), provider);
24//!
25//! // Route a command
26//! let client_id = ClientId::new();
27//! let backend_id = selector.route_command_sync(client_id, "LIST").unwrap();
28//!
29//! // After command completes
30//! selector.complete_command_sync(backend_id);
31//! ```
32
33use anyhow::Result;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use tracing::{debug, info};
37
38use crate::pool::DeadpoolConnectionProvider;
39use crate::types::{BackendId, ClientId};
40
/// Backend connection information
///
/// One entry is stored per registered backend. Both counters are wrapped in
/// `Arc`, so cloning a `BackendInfo` (the struct derives `Clone`) produces a
/// handle to the *same* counters rather than independent copies.
#[derive(Debug, Clone)]
struct BackendInfo {
    /// Backend identifier; the key used by the selector's by-id lookups
    id: BackendId,
    /// Server name for logging
    name: String,
    /// Connection provider for this backend
    provider: DeadpoolConnectionProvider,
    /// Number of pending requests on this backend (load tracking).
    /// Incremented by `route_command_sync`, decremented by `complete_command_sync`.
    pending_count: Arc<AtomicUsize>,
    /// Number of connections in stateful mode (for hybrid routing reservation).
    /// Adjusted by `try_acquire_stateful` and `release_stateful`.
    stateful_count: Arc<AtomicUsize>,
}
55
/// Selects backend servers using round-robin with load tracking
///
/// # Thread Safety
///
/// This struct is designed for concurrent access across multiple threads.
/// The round-robin counter and pending counts use atomic operations for
/// lock-free performance. Routing and bookkeeping methods take `&self`;
/// only registering a backend (`add_backend`) requires `&mut self`.
///
/// # Load Balancing
///
/// - **Strategy**: Round-robin rotation through available backends
/// - **Tracking**: Atomic counters track pending commands per backend
/// - **Monitoring**: Load statistics available via `backend_load()`
///
/// The pending counts are recorded for monitoring only: backend selection
/// is driven purely by the round-robin counter and does not consult them.
///
/// # Examples
///
/// ```no_run
/// # use nntp_proxy::router::BackendSelector;
/// # use nntp_proxy::types::{BackendId, ClientId};
/// # use nntp_proxy::pool::DeadpoolConnectionProvider;
/// let mut selector = BackendSelector::new();
///
/// # let provider = DeadpoolConnectionProvider::new(
/// #     "localhost".to_string(), 119, "test".to_string(), 10, None, None
/// # );
/// selector.add_backend(
///     BackendId::from_index(0),
///     "backend-1".to_string(),
///     provider,
/// );
///
/// // Route commands
/// let backend = selector.route_command_sync(ClientId::new(), "LIST")?;
/// # Ok::<(), anyhow::Error>(())
/// ```
#[derive(Debug)]
pub struct BackendSelector {
    /// Backend connection providers, in registration order
    backends: Vec<BackendInfo>,
    /// Monotonically incremented counter; taken modulo `backends.len()` to
    /// pick the next backend for round-robin selection
    current_backend: AtomicUsize,
}
98
99impl Default for BackendSelector {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl BackendSelector {
106    /// Create a new backend selector
107    #[must_use]
108    pub fn new() -> Self {
109        Self {
110            // Pre-allocate for typical number of backend servers (most setups have 2-8)
111            backends: Vec::with_capacity(4),
112            current_backend: AtomicUsize::new(0),
113        }
114    }
115
116    /// Add a backend server to the router
117    pub fn add_backend(
118        &mut self,
119        backend_id: BackendId,
120        name: String,
121        provider: DeadpoolConnectionProvider,
122    ) {
123        info!("Added backend {:?} ({})", backend_id, name);
124        self.backends.push(BackendInfo {
125            id: backend_id,
126            name,
127            provider,
128            pending_count: Arc::new(AtomicUsize::new(0)),
129            stateful_count: Arc::new(AtomicUsize::new(0)),
130        });
131    }
132
133    /// Select the next backend using round-robin strategy
134    fn select_backend(&self) -> Option<&BackendInfo> {
135        if self.backends.is_empty() {
136            return None;
137        }
138
139        let index = self.current_backend.fetch_add(1, Ordering::Relaxed) % self.backends.len();
140        Some(&self.backends[index])
141    }
142
143    /// Select a backend for the given command using round-robin
144    /// Returns the backend ID to use for this command
145    pub fn route_command_sync(&self, _client_id: ClientId, _command: &str) -> Result<BackendId> {
146        let backend = self.select_backend().ok_or_else(|| {
147            anyhow::anyhow!(
148                "No backends available for routing (total backends: {})",
149                self.backends.len()
150            )
151        })?;
152
153        // Increment pending count for load tracking
154        backend.pending_count.fetch_add(1, Ordering::Relaxed);
155
156        debug!(
157            "Selected backend {:?} ({}) for command",
158            backend.id, backend.name
159        );
160
161        Ok(backend.id)
162    }
163
164    /// Mark a command as complete, decrementing the pending count
165    pub fn complete_command_sync(&self, backend_id: BackendId) {
166        if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
167            backend.pending_count.fetch_sub(1, Ordering::Relaxed);
168        }
169    }
170
171    /// Get the connection provider for a backend
172    #[must_use]
173    pub fn get_backend_provider(
174        &self,
175        backend_id: BackendId,
176    ) -> Option<&DeadpoolConnectionProvider> {
177        self.backends
178            .iter()
179            .find(|b| b.id == backend_id)
180            .map(|b| &b.provider)
181    }
182
183    /// Get the number of backends
184    #[must_use]
185    #[inline]
186    pub fn backend_count(&self) -> usize {
187        self.backends.len()
188    }
189
190    /// Get backend load (pending requests) for monitoring
191    #[must_use]
192    pub fn backend_load(&self, backend_id: BackendId) -> Option<usize> {
193        self.backends
194            .iter()
195            .find(|b| b.id == backend_id)
196            .map(|b| b.pending_count.load(Ordering::Relaxed))
197    }
198
199    /// Try to acquire a stateful connection slot for hybrid mode
200    /// Returns true if acquisition succeeded (within max_connections-1 limit)
201    /// Returns false if all stateful slots are taken (need to keep 1 for PCR)
202    pub fn try_acquire_stateful(&self, backend_id: BackendId) -> bool {
203        if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
204            // Get max connections from the provider's pool
205            let max_connections = backend.provider.max_size();
206
207            // Reserve 1 connection for per-command routing
208            let max_stateful = max_connections.saturating_sub(1);
209
210            // Try to increment if we haven't hit the limit
211            let mut current = backend.stateful_count.load(Ordering::Acquire);
212            loop {
213                if current >= max_stateful {
214                    debug!(
215                        "Backend {:?} ({}) stateful limit reached: {}/{}",
216                        backend_id, backend.name, current, max_stateful
217                    );
218                    return false;
219                }
220
221                match backend.stateful_count.compare_exchange_weak(
222                    current,
223                    current + 1,
224                    Ordering::AcqRel,
225                    Ordering::Acquire,
226                ) {
227                    Ok(_) => {
228                        debug!(
229                            "Backend {:?} ({}) acquired stateful slot: {}/{}",
230                            backend_id,
231                            backend.name,
232                            current + 1,
233                            max_stateful
234                        );
235                        return true;
236                    }
237                    Err(actual) => current = actual,
238                }
239            }
240        }
241        false
242    }
243
244    /// Release a stateful connection slot
245    pub fn release_stateful(&self, backend_id: BackendId) {
246        if let Some(backend) = self.backends.iter().find(|b| b.id == backend_id) {
247            // Atomically decrement if greater than zero, avoiding underflow and spurious logs
248            let result = backend.stateful_count.fetch_update(
249                Ordering::AcqRel,
250                Ordering::Acquire,
251                |current| {
252                    if current == 0 {
253                        None
254                    } else {
255                        Some(current - 1)
256                    }
257                },
258            );
259            match result {
260                Ok(prev) => {
261                    debug!(
262                        "Backend {:?} ({}) released stateful slot: {}/{}",
263                        backend_id,
264                        backend.name,
265                        prev - 1,
266                        backend.provider.max_size().saturating_sub(1)
267                    );
268                }
269                Err(0) => {
270                    debug!(
271                        "Backend {:?} ({}) release_stateful called when count already 0",
272                        backend_id, backend.name
273                    );
274                }
275                Err(other) => unreachable!(
276                    "Unexpected error in fetch_update: got Err({other}), expected only Err(0)"
277                ),
278            }
279        }
280    }
281
282    /// Get the number of stateful connections for a backend
283    #[must_use]
284    pub fn stateful_count(&self, backend_id: BackendId) -> Option<usize> {
285        self.backends
286            .iter()
287            .find(|b| b.id == backend_id)
288            .map(|b| b.stateful_count.load(Ordering::Relaxed))
289    }
290}
291
292#[cfg(test)]
293mod tests;