Skip to main content

mockforge_core/
server_utils.rs

1//! Common server utilities for MockForge
2
3use std::net::SocketAddr;
4
5/// Create a SocketAddr for server binding from host and port
6///
7/// # Arguments
8/// * `host` - Host address string (e.g., "127.0.0.1", "0.0.0.0", "example.com")
9/// * `port` - Port number
10///
11/// # Returns
12/// * `Ok(SocketAddr)` - Parsed socket address
13/// * `Err(String)` - Error message if parsing fails
14pub fn create_socket_addr(host: &str, port: u16) -> Result<SocketAddr, String> {
15    format!("{}:{}", host, port)
16        .parse()
17        .map_err(|e| format!("Invalid socket address {}:{}: {}", host, port, e))
18}
19
20/// Create a standard IPv4 localhost SocketAddr (127.0.0.1:port)
21///
22/// # Arguments
23/// * `port` - Port number to bind to
24pub fn localhost_socket_addr(port: u16) -> SocketAddr {
25    SocketAddr::from(([127, 0, 0, 1], port))
26}
27
28/// Create a standard IPv4 wildcard SocketAddr (0.0.0.0:port) to listen on all interfaces
29///
30/// # Arguments
31/// * `port` - Port number to bind to
32pub fn wildcard_socket_addr(port: u16) -> SocketAddr {
33    SocketAddr::from(([0, 0, 0, 0], port))
34}
35
36/// Server startup configuration for binding and listening
37#[derive(Debug, Clone)]
38pub struct ServerConfig {
39    /// Host address to bind to (e.g., "0.0.0.0" or "127.0.0.1")
40    pub host: String,
41    /// Port number to bind to
42    pub port: u16,
43    /// Type of server to start
44    pub server_type: ServerType,
45}
46
47/// Server type enumeration
48#[derive(Debug, Clone)]
49pub enum ServerType {
50    /// HTTP/REST server
51    HTTP,
52    /// WebSocket server
53    WebSocket,
54    /// gRPC server
55    GRPC,
56}
57
58impl ServerConfig {
59    /// Create a new server configuration
60    pub fn new(host: String, port: u16, server_type: ServerType) -> Self {
61        Self {
62            host,
63            port,
64            server_type,
65        }
66    }
67
68    /// Create HTTP server configuration
69    pub fn http(port: u16) -> Self {
70        Self::new("0.0.0.0".to_string(), port, ServerType::HTTP)
71    }
72
73    /// Create WebSocket server configuration
74    pub fn websocket(port: u16) -> Self {
75        Self::new("0.0.0.0".to_string(), port, ServerType::WebSocket)
76    }
77
78    /// Create gRPC server configuration
79    pub fn grpc(port: u16) -> Self {
80        Self::new("0.0.0.0".to_string(), port, ServerType::GRPC)
81    }
82
83    /// Get the socket address for this configuration
84    pub fn socket_addr(&self) -> Result<SocketAddr, String> {
85        create_socket_addr(&self.host, self.port)
86    }
87
88    /// Get a formatted server description
89    pub fn description(&self) -> String {
90        match self.server_type {
91            ServerType::HTTP => format!("HTTP server on {}:{}", self.host, self.port),
92            ServerType::WebSocket => format!("WebSocket server on {}:{}", self.host, self.port),
93            ServerType::GRPC => format!("gRPC server on {}:{}", self.host, self.port),
94        }
95    }
96}
97
98/// Common server traits for consistent startup behavior
99///
100/// This trait allows different server implementations (HTTP, WebSocket, gRPC)
101/// to be started using a unified interface.
102pub trait ServerStarter {
103    /// Get the server type
104    fn server_type(&self) -> ServerType;
105
106    /// Get the port this server will bind to
107    fn port(&self) -> u16;
108
109    /// Start the server (implementation-specific)
110    ///
111    /// Returns a future that resolves when the server is running or fails to start.
112    fn start_server(
113        self,
114    ) -> impl std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send;
115}
116
117/// Helper function to start any server that implements ServerStarter
118///
119/// Logs server startup information and handles server initialization.
120///
121/// # Arguments
122/// * `server` - Server instance implementing ServerStarter
123///
124/// # Returns
125/// Result indicating success or failure of server startup
126pub async fn start_server<S: ServerStarter>(
127    server: S,
128) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
129    let port = server.port();
130    let server_type = server.server_type();
131
132    match server_type {
133        ServerType::HTTP => tracing::info!("HTTP listening on port {}", port),
134        ServerType::WebSocket => tracing::info!("WebSocket listening on port {}", port),
135        ServerType::GRPC => tracing::info!("gRPC listening on port {}", port),
136    }
137
138    server.start_server().await
139}
140
141/// TCP accept loop module for protocol crates
142pub mod tcp_accept {
143    use std::future::Future;
144    use std::net::SocketAddr;
145    use std::sync::Arc;
146    use tokio::net::{TcpListener, TcpStream};
147
148    /// Run a TCP accept loop that spawns a task for each incoming connection.
149    ///
150    /// This encapsulates the common pattern used by Kafka, MQTT, AMQP, SMTP, FTP,
151    /// and TCP protocol crates: bind a `TcpListener`, accept connections in a loop,
152    /// and spawn a tokio task for each one.
153    ///
154    /// The handler receives the raw `TcpStream` and the peer's `SocketAddr`.
155    /// Connection errors are logged and do not terminate the loop.
156    ///
157    /// # Arguments
158    /// * `listener` - A bound `TcpListener` ready to accept connections
159    /// * `handler` - An `Arc`-wrapped handler that will be cloned for each connection.
160    ///   The handler is a function (or closure) that takes `(TcpStream, SocketAddr)`
161    ///   and returns a `Future` resolving to `anyhow::Result<()>`.
162    ///
163    /// # Returns
164    /// This function runs forever (until the listener errors fatally).
165    /// It returns `anyhow::Result<()>` only if the accept call itself fails
166    /// with a non-transient error.
167    ///
168    /// # Example
169    ///
170    /// ```rust,no_run
171    /// use std::sync::Arc;
172    /// use tokio::net::TcpListener;
173    /// use mockforge_core::server_utils::tcp_accept::run_tcp_accept_loop;
174    ///
175    /// async fn handle(stream: tokio::net::TcpStream, addr: std::net::SocketAddr) -> anyhow::Result<()> {
176    ///     tracing::info!("connection from {}", addr);
177    ///     Ok(())
178    /// }
179    ///
180    /// # async fn example() -> anyhow::Result<()> {
181    /// let listener = TcpListener::bind("0.0.0.0:9092").await?;
182    /// let handler = Arc::new(|stream, addr| handle(stream, addr));
183    /// run_tcp_accept_loop(listener, handler).await
184    /// # }
185    /// ```
186    pub async fn run_tcp_accept_loop<F, Fut>(
187        listener: TcpListener,
188        handler: Arc<F>,
189    ) -> anyhow::Result<()>
190    where
191        F: Fn(TcpStream, SocketAddr) -> Fut + Send + Sync + 'static,
192        Fut: Future<Output = anyhow::Result<()>> + Send,
193    {
194        loop {
195            match listener.accept().await {
196                Ok((stream, addr)) => {
197                    let handler = Arc::clone(&handler);
198                    tokio::spawn(async move {
199                        if let Err(e) = handler(stream, addr).await {
200                            tracing::error!("Connection error from {}: {}", addr, e);
201                        }
202                    });
203                }
204                Err(e) => {
205                    tracing::error!("Failed to accept TCP connection: {}", e);
206                }
207            }
208        }
209    }
210}
211
212/// Server health check utilities
213pub mod health {
214    use serde::{Deserialize, Serialize};
215
216    /// Server health status information
217    #[derive(Debug, Serialize, Deserialize)]
218    pub struct HealthStatus {
219        /// Health status string (e.g., "healthy", "unhealthy: reason")
220        pub status: String,
221        /// ISO 8601 timestamp of the health check
222        pub timestamp: String,
223        /// Server uptime in seconds
224        pub uptime_seconds: u64,
225        /// Server version string
226        pub version: String,
227    }
228
229    impl HealthStatus {
230        /// Create a healthy status response
231        pub fn healthy(uptime_seconds: u64, version: &str) -> Self {
232            Self {
233                status: "healthy".to_string(),
234                timestamp: chrono::Utc::now().to_rfc3339(),
235                uptime_seconds,
236                version: version.to_string(),
237            }
238        }
239
240        /// Create an unhealthy status response with a reason
241        pub fn unhealthy(reason: &str, uptime_seconds: u64, version: &str) -> Self {
242            Self {
243                status: format!("unhealthy: {}", reason),
244                timestamp: chrono::Utc::now().to_rfc3339(),
245                uptime_seconds,
246                version: version.to_string(),
247            }
248        }
249    }
250}
251
252/// Common error response utilities
253pub mod errors {
254    use axum::{http::StatusCode, Json};
255    use serde_json::json;
256
257    /// Create a standard JSON error response for HTTP handlers
258    ///
259    /// # Arguments
260    /// * `status` - HTTP status code (e.g., 400, 500)
261    /// * `message` - Error message
262    ///
263    /// # Returns
264    /// Tuple of (status_code, JSON response) for use with Axum handlers
265    pub fn json_error(status: StatusCode, message: &str) -> (StatusCode, Json<serde_json::Value>) {
266        let error_response = json!({
267            "error": {
268                "message": message,
269                "status_code": status.as_u16()
270            },
271            "timestamp": chrono::Utc::now().to_rfc3339()
272        });
273
274        (status, Json(error_response))
275    }
276
277    /// Create a standard JSON success response for HTTP handlers
278    ///
279    /// # Arguments
280    /// * `data` - Serializable data to include in the response
281    ///
282    /// # Returns
283    /// Tuple of (HTTP 200 OK, JSON response) for use with Axum handlers
284    pub fn json_success<T: serde::Serialize>(data: T) -> (StatusCode, Json<serde_json::Value>) {
285        let success_response = json!({
286            "success": true,
287            "data": data,
288            "timestamp": chrono::Utc::now().to_rfc3339()
289        });
290
291        (StatusCode::OK, Json(success_response))
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298
299    #[test]
300    fn test_create_socket_addr() {
301        let addr = create_socket_addr("127.0.0.1", 9080).unwrap();
302        assert_eq!(addr.to_string(), "127.0.0.1:9080");
303    }
304
305    #[test]
306    fn test_server_config() {
307        let config = ServerConfig::http(3000);
308        assert_eq!(config.port, 3000);
309        assert_eq!(config.host, "0.0.0.0");
310        matches!(config.server_type, ServerType::HTTP);
311    }
312
313    #[test]
314    fn test_localhost_socket_addr() {
315        let addr = localhost_socket_addr(9080);
316        assert_eq!(addr.to_string(), "127.0.0.1:9080");
317    }
318
319    #[test]
320    fn test_wildcard_socket_addr() {
321        let addr = wildcard_socket_addr(9080);
322        assert_eq!(addr.to_string(), "0.0.0.0:9080");
323    }
324
325    #[tokio::test]
326    async fn test_tcp_accept_loop_handles_connection() {
327        use std::sync::atomic::{AtomicU64, Ordering};
328        use std::sync::Arc;
329        use tokio::net::TcpListener;
330
331        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
332        let port = listener.local_addr().unwrap().port();
333
334        let connection_count = Arc::new(AtomicU64::new(0));
335        let count_clone = connection_count.clone();
336
337        let handler =
338            Arc::new(move |_stream: tokio::net::TcpStream, _addr: std::net::SocketAddr| {
339                let count = count_clone.clone();
340                async move {
341                    count.fetch_add(1, Ordering::Relaxed);
342                    Ok(())
343                }
344            });
345
346        // Spawn the accept loop in background
347        let loop_handle = tokio::spawn(tcp_accept::run_tcp_accept_loop(listener, handler));
348
349        // Connect a client
350        let _client = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap();
351
352        // Give the handler time to run
353        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
354
355        assert_eq!(connection_count.load(Ordering::Relaxed), 1);
356
357        loop_handle.abort();
358    }
359}