1use crate::{TcpConfig, TcpSpecRegistry};
4use mockforge_core::Result;
5use std::net::SocketAddr;
6use std::sync::Arc;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::net::{TcpListener, TcpStream};
9use tokio::time::{sleep, timeout, Duration};
10use tracing::{debug, error, info, warn};
11
12pub struct TcpServer {
14 config: TcpConfig,
15 spec_registry: Arc<TcpSpecRegistry>,
16}
17
18impl TcpServer {
19 pub fn new(config: TcpConfig, spec_registry: Arc<TcpSpecRegistry>) -> Result<Self> {
21 Ok(Self {
22 config,
23 spec_registry,
24 })
25 }
26
27 pub async fn start(&self) -> Result<()> {
29 let addr = format!("{}:{}", self.config.host, self.config.port);
30 let listener = TcpListener::bind(&addr).await?;
31
32 info!("TCP server listening on {}", addr);
33
34 loop {
35 match listener.accept().await {
36 Ok((stream, peer_addr)) => {
37 debug!("New TCP connection from {}", peer_addr);
38
39 let registry = self.spec_registry.clone();
40 let config = self.config.clone();
41
42 tokio::spawn(async move {
43 if let Err(e) =
44 handle_tcp_connection(stream, peer_addr, registry, config).await
45 {
46 error!("TCP connection error from {}: {}", peer_addr, e);
47 }
48 });
49 }
50 Err(e) => {
51 error!("Failed to accept TCP connection: {}", e);
52 }
53 }
54 }
55 }
56}
57
58async fn handle_tcp_connection(
60 mut stream: TcpStream,
61 peer_addr: SocketAddr,
62 registry: Arc<TcpSpecRegistry>,
63 config: TcpConfig,
64) -> Result<()> {
65 debug!("Handling TCP connection from {}", peer_addr);
66
67 let mut buffer = vec![0u8; config.read_buffer_size];
68 let mut accumulated_data = Vec::new();
69
70 loop {
71 let read_timeout = Duration::from_secs(config.timeout_secs);
73
74 match timeout(read_timeout, stream.read(&mut buffer)).await {
75 Ok(Ok(0)) => {
76 debug!("TCP connection closed by client: {}", peer_addr);
78 break;
79 }
80 Ok(Ok(n)) => {
81 let received_data = &buffer[..n];
82 accumulated_data.extend_from_slice(received_data);
83
84 debug!("Received {} bytes from {}", n, peer_addr);
85
86 let response_data =
88 if let Some(fixture) = registry.find_matching_fixture(&accumulated_data) {
89 debug!("Found matching fixture: {}", fixture.identifier);
90
91 if fixture.response.delay_ms > 0 {
93 sleep(Duration::from_millis(fixture.response.delay_ms)).await;
94 }
95
96 generate_response_data(&fixture.response)?
98 } else if config.echo_mode {
99 debug!("No fixture match, echoing data back");
101 accumulated_data.clone()
102 } else {
103 warn!("No fixture match and echo mode disabled, closing connection");
105 break;
106 };
107
108 if !response_data.is_empty() {
110 if let Err(e) = stream.write_all(&response_data).await {
111 error!("Failed to write response to {}: {}", peer_addr, e);
112 break;
113 }
114
115 if let Err(e) = stream.flush().await {
116 error!("Failed to flush response to {}: {}", peer_addr, e);
117 break;
118 }
119 }
120
121 if let Some(fixture) = registry.find_matching_fixture(&accumulated_data) {
123 if fixture.response.close_after_response {
124 debug!("Closing connection after response as configured");
125 break;
126 }
127
128 if !fixture.response.keep_alive {
129 debug!("Closing connection (keep_alive=false)");
130 break;
131 }
132 } else if !config.echo_mode {
133 break;
135 }
136
137 if let Some(ref delimiter) = config.delimiter {
139 if accumulated_data.ends_with(delimiter) {
140 debug!("Received complete message (matched delimiter), resetting buffer");
141 accumulated_data.clear();
142 }
143 } else {
144 accumulated_data.clear();
146 }
147 }
148 Ok(Err(e)) => {
149 error!("TCP read error from {}: {}", peer_addr, e);
150 break;
151 }
152 Err(_) => {
153 warn!("TCP read timeout from {}", peer_addr);
154 break;
155 }
156 }
157 }
158
159 debug!("TCP connection handler finished for {}", peer_addr);
160 Ok(())
161}
162
163fn generate_response_data(response: &crate::fixtures::TcpResponse) -> Result<Vec<u8>> {
165 match response.encoding.as_str() {
166 "hex" => hex::decode(&response.data)
167 .map_err(|e| mockforge_core::Error::generic(format!("Invalid hex data: {}", e))),
168 "base64" => base64::decode(&response.data)
169 .map_err(|e| mockforge_core::Error::generic(format!("Invalid base64 data: {}", e))),
170 "text" => Ok(response.data.as_bytes().to_vec()),
171 "file" => {
172 let file_path = response.file_path.as_ref().ok_or_else(|| {
173 mockforge_core::Error::generic("file_path not specified for file encoding")
174 })?;
175
176 std::fs::read(file_path).map_err(|e| {
177 mockforge_core::Error::generic(format!(
178 "Failed to read file {:?}: {}",
179 file_path, e
180 ))
181 })
182 }
183 _ => Err(mockforge_core::Error::generic(format!(
184 "Unknown encoding: {}. Supported: hex, base64, text, file",
185 response.encoding
186 ))),
187 }
188}