mockforge_tcp/
server.rs

1//! TCP server implementation
2
3use crate::{TcpConfig, TcpSpecRegistry};
4use mockforge_core::Result;
5use std::net::SocketAddr;
6use std::sync::Arc;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::net::{TcpListener, TcpStream};
9use tokio::time::{sleep, timeout, Duration};
10use tracing::{debug, error, info, warn};
11
12/// TCP server
13pub struct TcpServer {
14    config: TcpConfig,
15    spec_registry: Arc<TcpSpecRegistry>,
16}
17
18impl TcpServer {
19    /// Create a new TCP server
20    pub fn new(config: TcpConfig, spec_registry: Arc<TcpSpecRegistry>) -> Result<Self> {
21        Ok(Self {
22            config,
23            spec_registry,
24        })
25    }
26
27    /// Start the TCP server
28    pub async fn start(&self) -> Result<()> {
29        let addr = format!("{}:{}", self.config.host, self.config.port);
30        let listener = TcpListener::bind(&addr).await?;
31
32        info!("TCP server listening on {}", addr);
33
34        loop {
35            match listener.accept().await {
36                Ok((stream, peer_addr)) => {
37                    debug!("New TCP connection from {}", peer_addr);
38
39                    let registry = self.spec_registry.clone();
40                    let config = self.config.clone();
41
42                    tokio::spawn(async move {
43                        if let Err(e) =
44                            handle_tcp_connection(stream, peer_addr, registry, config).await
45                        {
46                            error!("TCP connection error from {}: {}", peer_addr, e);
47                        }
48                    });
49                }
50                Err(e) => {
51                    error!("Failed to accept TCP connection: {}", e);
52                }
53            }
54        }
55    }
56}
57
58/// Handle a single TCP connection
59async fn handle_tcp_connection(
60    mut stream: TcpStream,
61    peer_addr: SocketAddr,
62    registry: Arc<TcpSpecRegistry>,
63    config: TcpConfig,
64) -> Result<()> {
65    debug!("Handling TCP connection from {}", peer_addr);
66
67    let mut buffer = vec![0u8; config.read_buffer_size];
68    let mut accumulated_data = Vec::new();
69
70    loop {
71        // Set read timeout
72        let read_timeout = Duration::from_secs(config.timeout_secs);
73
74        match timeout(read_timeout, stream.read(&mut buffer)).await {
75            Ok(Ok(0)) => {
76                // Connection closed by client
77                debug!("TCP connection closed by client: {}", peer_addr);
78                break;
79            }
80            Ok(Ok(n)) => {
81                let received_data = &buffer[..n];
82                accumulated_data.extend_from_slice(received_data);
83
84                debug!("Received {} bytes from {}", n, peer_addr);
85
86                // Try to find matching fixture
87                let response_data =
88                    if let Some(fixture) = registry.find_matching_fixture(&accumulated_data) {
89                        debug!("Found matching fixture: {}", fixture.identifier);
90
91                        // Apply delay if configured
92                        if fixture.response.delay_ms > 0 {
93                            sleep(Duration::from_millis(fixture.response.delay_ms)).await;
94                        }
95
96                        // Generate response data
97                        generate_response_data(&fixture.response)?
98                    } else if config.echo_mode {
99                        // Echo mode: echo back received data
100                        debug!("No fixture match, echoing data back");
101                        accumulated_data.clone()
102                    } else {
103                        // No match and echo mode disabled - close connection
104                        warn!("No fixture match and echo mode disabled, closing connection");
105                        break;
106                    };
107
108                // Send response
109                if !response_data.is_empty() {
110                    if let Err(e) = stream.write_all(&response_data).await {
111                        error!("Failed to write response to {}: {}", peer_addr, e);
112                        break;
113                    }
114
115                    if let Err(e) = stream.flush().await {
116                        error!("Failed to flush response to {}: {}", peer_addr, e);
117                        break;
118                    }
119                }
120
121                // Check if we should close after response
122                if let Some(fixture) = registry.find_matching_fixture(&accumulated_data) {
123                    if fixture.response.close_after_response {
124                        debug!("Closing connection after response as configured");
125                        break;
126                    }
127
128                    if !fixture.response.keep_alive {
129                        debug!("Closing connection (keep_alive=false)");
130                        break;
131                    }
132                } else if !config.echo_mode {
133                    // Close if echo mode disabled and no fixture matched
134                    break;
135                }
136
137                // If delimiter is configured, check if we've received complete message
138                if let Some(ref delimiter) = config.delimiter {
139                    if accumulated_data.ends_with(delimiter) {
140                        debug!("Received complete message (matched delimiter), resetting buffer");
141                        accumulated_data.clear();
142                    }
143                } else {
144                    // Stream mode: reset buffer for next read
145                    accumulated_data.clear();
146                }
147            }
148            Ok(Err(e)) => {
149                error!("TCP read error from {}: {}", peer_addr, e);
150                break;
151            }
152            Err(_) => {
153                warn!("TCP read timeout from {}", peer_addr);
154                break;
155            }
156        }
157    }
158
159    debug!("TCP connection handler finished for {}", peer_addr);
160    Ok(())
161}
162
163/// Generate response data from fixture configuration
164fn generate_response_data(response: &crate::fixtures::TcpResponse) -> Result<Vec<u8>> {
165    match response.encoding.as_str() {
166        "hex" => hex::decode(&response.data)
167            .map_err(|e| mockforge_core::Error::generic(format!("Invalid hex data: {}", e))),
168        "base64" => base64::decode(&response.data)
169            .map_err(|e| mockforge_core::Error::generic(format!("Invalid base64 data: {}", e))),
170        "text" => Ok(response.data.as_bytes().to_vec()),
171        "file" => {
172            let file_path = response.file_path.as_ref().ok_or_else(|| {
173                mockforge_core::Error::generic("file_path not specified for file encoding")
174            })?;
175
176            std::fs::read(file_path).map_err(|e| {
177                mockforge_core::Error::generic(format!(
178                    "Failed to read file {:?}: {}",
179                    file_path, e
180                ))
181            })
182        }
183        _ => Err(mockforge_core::Error::generic(format!(
184            "Unknown encoding: {}. Supported: hex, base64, text, file",
185            response.encoding
186        ))),
187    }
188}