1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23 pub name: String,
24 pub listen_ip: String,
25 pub listen_port: u16,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30 fn default() -> Self {
31 TcpServerConfig {
32 name: String::new(),
33 listen_ip: "0.0.0.0".into(),
34 listen_port: 4242,
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
40struct TcpServerWriter {
42 stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stream.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(
57 config: TcpServerConfig,
58 tx: EventSender,
59 next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62 let listener = TcpListener::bind(&addr)?;
63
64 log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66 let name = config.name.clone();
67 thread::Builder::new()
68 .name(format!("tcp-server-{}", config.interface_id.0))
69 .spawn(move || {
70 listener_loop(listener, name, tx, next_id);
71 })?;
72
73 Ok(())
74}
75
76fn listener_loop(
78 listener: TcpListener,
79 name: String,
80 tx: EventSender,
81 next_id: Arc<AtomicU64>,
82) {
83 for stream_result in listener.incoming() {
84 let stream = match stream_result {
85 Ok(s) => s,
86 Err(e) => {
87 log::warn!("[{}] accept failed: {}", name, e);
88 continue;
89 }
90 };
91
92 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93 let peer_addr = stream.peer_addr().ok();
94
95 log::info!(
96 "[{}] client connected: {:?} → id {}",
97 name,
98 peer_addr,
99 client_id.0
100 );
101
102 if let Err(e) = stream.set_nodelay(true) {
104 log::warn!("[{}] set_nodelay failed: {}", name, e);
105 }
106
107 let writer_stream = match stream.try_clone() {
109 Ok(s) => s,
110 Err(e) => {
111 log::warn!("[{}] failed to clone stream: {}", name, e);
112 continue;
113 }
114 };
115
116 let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118 let info = InterfaceInfo {
119 id: client_id,
120 name: format!("TCPServerInterface/Client-{}", client_id.0),
121 mode: constants::MODE_FULL,
122 out_capable: true,
123 in_capable: true,
124 bitrate: None,
125 announce_rate_target: None,
126 announce_rate_grace: 0,
127 announce_rate_penalty: 0.0,
128 announce_cap: constants::ANNOUNCE_CAP,
129 is_local_client: false,
130 wants_tunnel: false,
131 tunnel_id: None,
132 mtu: 65535,
133 ia_freq: 0.0,
134 started: 0.0,
135 ingress_control: true,
136 };
137
138 if tx
140 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
141 .is_err()
142 {
143 return;
145 }
146
147 let client_tx = tx.clone();
149 let client_name = name.clone();
150 thread::Builder::new()
151 .name(format!("tcp-server-reader-{}", client_id.0))
152 .spawn(move || {
153 client_reader_loop(stream, client_id, client_name, client_tx);
154 })
155 .ok();
156 }
157}
158
159fn client_reader_loop(
161 mut stream: TcpStream,
162 id: InterfaceId,
163 name: String,
164 tx: EventSender,
165) {
166 let mut decoder = hdlc::Decoder::new();
167 let mut buf = [0u8; 4096];
168
169 loop {
170 match stream.read(&mut buf) {
171 Ok(0) => {
172 log::info!("[{}] client {} disconnected", name, id.0);
173 let _ = tx.send(Event::InterfaceDown(id));
174 return;
175 }
176 Ok(n) => {
177 for frame in decoder.feed(&buf[..n]) {
178 if tx
179 .send(Event::Frame {
180 interface_id: id,
181 data: frame,
182 })
183 .is_err()
184 {
185 return;
187 }
188 }
189 }
190 Err(e) => {
191 log::warn!("[{}] client {} read error: {}", name, id.0, e);
192 let _ = tx.send(Event::InterfaceDown(id));
193 return;
194 }
195 }
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202 use std::net::TcpStream;
203 use std::sync::mpsc;
204 use std::time::Duration;
205
206 fn find_free_port() -> u16 {
207 TcpListener::bind("127.0.0.1:0")
208 .unwrap()
209 .local_addr()
210 .unwrap()
211 .port()
212 }
213
214 #[test]
215 fn accept_connection() {
216 let port = find_free_port();
217 let (tx, rx) = mpsc::channel();
218 let next_id = Arc::new(AtomicU64::new(1000));
219
220 let config = TcpServerConfig {
221 name: "test-server".into(),
222 listen_ip: "127.0.0.1".into(),
223 listen_port: port,
224 interface_id: InterfaceId(1),
225 };
226
227 start(config, tx, next_id).unwrap();
228
229 thread::sleep(Duration::from_millis(50));
231
232 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
234
235 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
237 match event {
238 Event::InterfaceUp(id, writer, info) => {
239 assert_eq!(id, InterfaceId(1000));
240 assert!(writer.is_some());
241 assert!(info.is_some());
242 }
243 other => panic!("expected InterfaceUp, got {:?}", other),
244 }
245 }
246
247 #[test]
248 fn receive_frame_from_client() {
249 let port = find_free_port();
250 let (tx, rx) = mpsc::channel();
251 let next_id = Arc::new(AtomicU64::new(2000));
252
253 let config = TcpServerConfig {
254 name: "test-server".into(),
255 listen_ip: "127.0.0.1".into(),
256 listen_port: port,
257 interface_id: InterfaceId(2),
258 };
259
260 start(config, tx, next_id).unwrap();
261 thread::sleep(Duration::from_millis(50));
262
263 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
264
265 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
267
268 let payload: Vec<u8> = (0..32).collect();
270 let framed = hdlc::frame(&payload);
271 client.write_all(&framed).unwrap();
272
273 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
275 match event {
276 Event::Frame { interface_id, data } => {
277 assert_eq!(interface_id, InterfaceId(2000));
278 assert_eq!(data, payload);
279 }
280 other => panic!("expected Frame, got {:?}", other),
281 }
282 }
283
284 #[test]
285 fn send_frame_to_client() {
286 let port = find_free_port();
287 let (tx, rx) = mpsc::channel();
288 let next_id = Arc::new(AtomicU64::new(3000));
289
290 let config = TcpServerConfig {
291 name: "test-server".into(),
292 listen_ip: "127.0.0.1".into(),
293 listen_port: port,
294 interface_id: InterfaceId(3),
295 };
296
297 start(config, tx, next_id).unwrap();
298 thread::sleep(Duration::from_millis(50));
299
300 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
301 client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
302
303 let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
305 let mut writer = match event {
306 Event::InterfaceUp(_, Some(w), _) => w,
307 other => panic!("expected InterfaceUp with writer, got {:?}", other),
308 };
309
310 let payload: Vec<u8> = (0..24).collect();
312 writer.send_frame(&payload).unwrap();
313
314 let mut buf = [0u8; 256];
316 let n = client.read(&mut buf).unwrap();
317 let expected = hdlc::frame(&payload);
318 assert_eq!(&buf[..n], &expected[..]);
319 }
320
321 #[test]
322 fn multiple_clients() {
323 let port = find_free_port();
324 let (tx, rx) = mpsc::channel();
325 let next_id = Arc::new(AtomicU64::new(4000));
326
327 let config = TcpServerConfig {
328 name: "test-server".into(),
329 listen_ip: "127.0.0.1".into(),
330 listen_port: port,
331 interface_id: InterfaceId(4),
332 };
333
334 start(config, tx, next_id).unwrap();
335 thread::sleep(Duration::from_millis(50));
336
337 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
339 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
340
341 let mut ids = Vec::new();
343 for _ in 0..2 {
344 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
345 match event {
346 Event::InterfaceUp(id, _, _) => ids.push(id),
347 other => panic!("expected InterfaceUp, got {:?}", other),
348 }
349 }
350
351 assert_eq!(ids.len(), 2);
353 assert_ne!(ids[0], ids[1]);
354 }
355
356 #[test]
357 fn client_disconnect() {
358 let port = find_free_port();
359 let (tx, rx) = mpsc::channel();
360 let next_id = Arc::new(AtomicU64::new(5000));
361
362 let config = TcpServerConfig {
363 name: "test-server".into(),
364 listen_ip: "127.0.0.1".into(),
365 listen_port: port,
366 interface_id: InterfaceId(5),
367 };
368
369 start(config, tx, next_id).unwrap();
370 thread::sleep(Duration::from_millis(50));
371
372 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
373
374 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
376
377 drop(client);
379
380 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
382 assert!(
383 matches!(event, Event::InterfaceDown(InterfaceId(5000))),
384 "expected InterfaceDown(5000), got {:?}",
385 event
386 );
387 }
388
389 #[test]
390 fn server_bind_port() {
391 let port = find_free_port();
392 let (tx, _rx) = mpsc::channel();
393 let next_id = Arc::new(AtomicU64::new(6000));
394
395 let config = TcpServerConfig {
396 name: "test-server".into(),
397 listen_ip: "127.0.0.1".into(),
398 listen_port: port,
399 interface_id: InterfaceId(6),
400 };
401
402 start(config, tx, next_id).unwrap();
404 }
405}