1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{ListenerControl, Writer};
19
/// Configuration for one TCP server interface.
#[derive(Debug, Clone)]
pub struct TcpServerConfig {
    /// Interface name; used as the `[name]` prefix of this module's log messages.
    pub name: String,
    /// Address part of the `ip:port` string the listener binds to.
    pub listen_ip: String,
    /// Port part of the `ip:port` string the listener binds to.
    pub listen_port: u16,
    /// Id of the listening interface; used to name the listener thread.
    pub interface_id: InterfaceId,
    /// Connection limit as configured. `TcpServerRuntime::from_config` copies it
    /// into the runtime settings; the listener itself reads `runtime`, not this field.
    pub max_connections: Option<usize>,
    /// Shared runtime settings consulted by the listener for every accepted connection.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
}
30
/// Settings of a TCP server that are kept behind a shared mutex and re-read by
/// the listener each time a connection is accepted.
#[derive(Debug, Clone)]
pub struct TcpServerRuntime {
    /// Maximum number of simultaneously active client connections; `None` means
    /// the listener performs no limit check.
    pub max_connections: Option<usize>,
}
35
36impl TcpServerRuntime {
37 pub fn from_config(config: &TcpServerConfig) -> Self {
38 Self {
39 max_connections: config.max_connections,
40 }
41 }
42}
43
/// Handle pairing a server's shared runtime settings with a snapshot of the
/// settings derived from its configuration.
#[derive(Debug, Clone)]
pub struct TcpServerRuntimeConfigHandle {
    /// Name of the interface these settings belong to.
    pub interface_name: String,
    /// Shared, mutable runtime settings (the same `Arc` the config holds when
    /// built via `runtime_handle_from_config`).
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
    /// Settings computed from the configuration when the handle was created.
    pub startup: TcpServerRuntime,
}
50
51impl Default for TcpServerConfig {
52 fn default() -> Self {
53 let mut config = TcpServerConfig {
54 name: String::new(),
55 listen_ip: "0.0.0.0".into(),
56 listen_port: 4242,
57 interface_id: InterfaceId(0),
58 max_connections: None,
59 runtime: Arc::new(Mutex::new(TcpServerRuntime {
60 max_connections: None,
61 })),
62 };
63 let startup = TcpServerRuntime::from_config(&config);
64 config.runtime = Arc::new(Mutex::new(startup));
65 config
66 }
67}
68
/// `Writer` implementation that sends HDLC-framed data to one connected client.
struct TcpServerWriter {
    /// Socket handle the frames are written to.
    stream: TcpStream,
}
73
74impl Writer for TcpServerWriter {
75 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
76 self.stream.write_all(&hdlc::frame(data))
77 }
78}
79
80pub fn start(
86 config: TcpServerConfig,
87 tx: EventSender,
88 next_id: Arc<AtomicU64>,
89) -> io::Result<ListenerControl> {
90 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
91 let listener = TcpListener::bind(&addr)?;
92 listener.set_nonblocking(true)?;
93
94 log::info!("[{}] TCP server listening on {}", config.name, addr);
95
96 let name = config.name.clone();
97 let runtime = Arc::clone(&config.runtime);
98 let active_connections = Arc::new(AtomicUsize::new(0));
99 let control = ListenerControl::new();
100 let listener_control = control.clone();
101 thread::Builder::new()
102 .name(format!("tcp-server-{}", config.interface_id.0))
103 .spawn(move || {
104 listener_loop(
105 listener,
106 name,
107 tx,
108 next_id,
109 runtime,
110 active_connections,
111 listener_control,
112 );
113 })?;
114
115 Ok(control)
116}
117
118fn listener_loop(
120 listener: TcpListener,
121 name: String,
122 tx: EventSender,
123 next_id: Arc<AtomicU64>,
124 runtime: Arc<Mutex<TcpServerRuntime>>,
125 active_connections: Arc<AtomicUsize>,
126 control: ListenerControl,
127) {
128 loop {
129 if control.should_stop() {
130 log::info!("[{}] listener stopping", name);
131 return;
132 }
133
134 let stream_result = listener.accept().map(|(stream, _)| stream);
135 let stream = match stream_result {
136 Ok(s) => s,
137 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
138 thread::sleep(std::time::Duration::from_millis(50));
139 continue;
140 }
141 Err(e) => {
142 log::warn!("[{}] accept failed: {}", name, e);
143 continue;
144 }
145 };
146
147 let max_connections = runtime.lock().unwrap().max_connections;
148 if let Some(max) = max_connections {
149 if active_connections.load(Ordering::Relaxed) >= max {
150 let peer = stream.peer_addr().ok();
151 log::warn!(
152 "[{}] max connections ({}) reached, rejecting {:?}",
153 name,
154 max,
155 peer
156 );
157 drop(stream);
158 continue;
159 }
160 }
161
162 active_connections.fetch_add(1, Ordering::Relaxed);
163
164 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
165 let peer_addr = stream.peer_addr().ok();
166
167 log::info!(
168 "[{}] client connected: {:?} → id {}",
169 name,
170 peer_addr,
171 client_id.0
172 );
173
174 if let Err(e) = stream.set_nodelay(true) {
176 log::warn!("[{}] set_nodelay failed: {}", name, e);
177 }
178
179 let writer_stream = match stream.try_clone() {
181 Ok(s) => s,
182 Err(e) => {
183 log::warn!("[{}] failed to clone stream: {}", name, e);
184 continue;
185 }
186 };
187
188 let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
189 stream: writer_stream,
190 });
191
192 let info = InterfaceInfo {
193 id: client_id,
194 name: format!("TCPServerInterface/Client-{}", client_id.0),
195 mode: constants::MODE_FULL,
196 out_capable: true,
197 in_capable: true,
198 bitrate: None,
199 announce_rate_target: None,
200 announce_rate_grace: 0,
201 announce_rate_penalty: 0.0,
202 announce_cap: constants::ANNOUNCE_CAP,
203 is_local_client: false,
204 wants_tunnel: false,
205 tunnel_id: None,
206 mtu: 65535,
207 ia_freq: 0.0,
208 started: 0.0,
209 ingress_control: true,
210 };
211
212 if tx
214 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
215 .is_err()
216 {
217 return;
219 }
220
221 let client_tx = tx.clone();
223 let client_name = name.clone();
224 let client_active = active_connections.clone();
225 thread::Builder::new()
226 .name(format!("tcp-server-reader-{}", client_id.0))
227 .spawn(move || {
228 client_reader_loop(stream, client_id, client_name, client_tx, client_active);
229 })
230 .ok();
231 }
232}
233
234fn client_reader_loop(
236 mut stream: TcpStream,
237 id: InterfaceId,
238 name: String,
239 tx: EventSender,
240 active_connections: Arc<AtomicUsize>,
241) {
242 let mut decoder = hdlc::Decoder::new();
243 let mut buf = [0u8; 4096];
244
245 loop {
246 match stream.read(&mut buf) {
247 Ok(0) => {
248 log::info!("[{}] client {} disconnected", name, id.0);
249 active_connections.fetch_sub(1, Ordering::Relaxed);
250 let _ = tx.send(Event::InterfaceDown(id));
251 return;
252 }
253 Ok(n) => {
254 for frame in decoder.feed(&buf[..n]) {
255 if tx
256 .send(Event::Frame {
257 interface_id: id,
258 data: frame,
259 })
260 .is_err()
261 {
262 active_connections.fetch_sub(1, Ordering::Relaxed);
264 return;
265 }
266 }
267 }
268 Err(e) => {
269 log::warn!("[{}] client {} read error: {}", name, id.0, e);
270 active_connections.fetch_sub(1, Ordering::Relaxed);
271 let _ = tx.send(Event::InterfaceDown(id));
272 return;
273 }
274 }
275 }
276}
277
278use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
281use std::collections::HashMap;
282
/// Factory that parses configuration for, and starts, `TCPServerInterface` listeners.
pub struct TcpServerFactory;
285
286impl InterfaceFactory for TcpServerFactory {
287 fn type_name(&self) -> &str {
288 "TCPServerInterface"
289 }
290
291 fn parse_config(
292 &self,
293 name: &str,
294 id: InterfaceId,
295 params: &HashMap<String, String>,
296 ) -> Result<Box<dyn InterfaceConfigData>, String> {
297 let listen_ip = params
298 .get("listen_ip")
299 .cloned()
300 .unwrap_or_else(|| "0.0.0.0".into());
301 let listen_port = params
302 .get("listen_port")
303 .and_then(|v| v.parse().ok())
304 .unwrap_or(4242);
305 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
306 let mut config = TcpServerConfig {
307 name: name.to_string(),
308 listen_ip,
309 listen_port,
310 interface_id: id,
311 max_connections,
312 runtime: Arc::new(Mutex::new(TcpServerRuntime {
313 max_connections: None,
314 })),
315 };
316 let startup = TcpServerRuntime::from_config(&config);
317 config.runtime = Arc::new(Mutex::new(startup));
318 Ok(Box::new(config))
319 }
320
321 fn start(
322 &self,
323 config: Box<dyn InterfaceConfigData>,
324 ctx: StartContext,
325 ) -> io::Result<StartResult> {
326 let cfg = *config
327 .into_any()
328 .downcast::<TcpServerConfig>()
329 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
330 let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
331 Ok(StartResult::Listener {
332 control: Some(control),
333 })
334 }
335}
336
337pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
338 TcpServerRuntimeConfigHandle {
339 interface_name: config.name.clone(),
340 runtime: Arc::clone(&config.runtime),
341 startup: TcpServerRuntime::from_config(config),
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use std::net::TcpStream;
349 use std::sync::mpsc::RecvTimeoutError;
350 use std::time::Duration;
351
352 fn find_free_port() -> u16 {
353 TcpListener::bind("127.0.0.1:0")
354 .unwrap()
355 .local_addr()
356 .unwrap()
357 .port()
358 }
359
360 fn make_server_config(
361 port: u16,
362 interface_id: u64,
363 max_connections: Option<usize>,
364 ) -> TcpServerConfig {
365 let mut config = TcpServerConfig {
366 name: "test-server".into(),
367 listen_ip: "127.0.0.1".into(),
368 listen_port: port,
369 interface_id: InterfaceId(interface_id),
370 max_connections,
371 runtime: Arc::new(Mutex::new(TcpServerRuntime {
372 max_connections: None,
373 })),
374 };
375 let startup = TcpServerRuntime::from_config(&config);
376 config.runtime = Arc::new(Mutex::new(startup));
377 config
378 }
379
380 #[test]
381 fn accept_connection() {
382 let port = find_free_port();
383 let (tx, rx) = crate::event::channel();
384 let next_id = Arc::new(AtomicU64::new(1000));
385
386 let config = make_server_config(port, 1, None);
387
388 start(config, tx, next_id).unwrap();
389
390 thread::sleep(Duration::from_millis(50));
392
393 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
395
396 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
398 match event {
399 Event::InterfaceUp(id, writer, info) => {
400 assert_eq!(id, InterfaceId(1000));
401 assert!(writer.is_some());
402 assert!(info.is_some());
403 }
404 other => panic!("expected InterfaceUp, got {:?}", other),
405 }
406 }
407
408 #[test]
409 fn listener_stop_prevents_new_accepts() {
410 let port = find_free_port();
411 let (tx, rx) = crate::event::channel();
412 let next_id = Arc::new(AtomicU64::new(1500));
413
414 let config = make_server_config(port, 15, None);
415 let control = start(config, tx, next_id).unwrap();
416
417 thread::sleep(Duration::from_millis(50));
418 control.request_stop();
419 thread::sleep(Duration::from_millis(120));
420
421 let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
422 if let Ok(stream) = connect_result {
423 drop(stream);
424 }
425
426 match rx.recv_timeout(Duration::from_millis(200)) {
427 Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
428 other => panic!(
429 "expected no InterfaceUp after listener stop, got {:?}",
430 other
431 ),
432 }
433 }
434
435 #[test]
436 fn receive_frame_from_client() {
437 let port = find_free_port();
438 let (tx, rx) = crate::event::channel();
439 let next_id = Arc::new(AtomicU64::new(2000));
440
441 let config = make_server_config(port, 2, None);
442
443 start(config, tx, next_id).unwrap();
444 thread::sleep(Duration::from_millis(50));
445
446 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
447
448 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
450
451 let payload: Vec<u8> = (0..32).collect();
453 let framed = hdlc::frame(&payload);
454 client.write_all(&framed).unwrap();
455
456 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
458 match event {
459 Event::Frame { interface_id, data } => {
460 assert_eq!(interface_id, InterfaceId(2000));
461 assert_eq!(data, payload);
462 }
463 other => panic!("expected Frame, got {:?}", other),
464 }
465 }
466
467 #[test]
468 fn send_frame_to_client() {
469 let port = find_free_port();
470 let (tx, rx) = crate::event::channel();
471 let next_id = Arc::new(AtomicU64::new(3000));
472
473 let config = make_server_config(port, 3, None);
474
475 start(config, tx, next_id).unwrap();
476 thread::sleep(Duration::from_millis(50));
477
478 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
479 client
480 .set_read_timeout(Some(Duration::from_secs(2)))
481 .unwrap();
482
483 let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
485 let mut writer = match event {
486 Event::InterfaceUp(_, Some(w), _) => w,
487 other => panic!("expected InterfaceUp with writer, got {:?}", other),
488 };
489
490 let payload: Vec<u8> = (0..24).collect();
492 writer.send_frame(&payload).unwrap();
493
494 let mut buf = [0u8; 256];
496 let n = client.read(&mut buf).unwrap();
497 let expected = hdlc::frame(&payload);
498 assert_eq!(&buf[..n], &expected[..]);
499 }
500
501 #[test]
502 fn multiple_clients() {
503 let port = find_free_port();
504 let (tx, rx) = crate::event::channel();
505 let next_id = Arc::new(AtomicU64::new(4000));
506
507 let config = make_server_config(port, 4, None);
508
509 start(config, tx, next_id).unwrap();
510 thread::sleep(Duration::from_millis(50));
511
512 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
514 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
515
516 let mut ids = Vec::new();
518 for _ in 0..2 {
519 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
520 match event {
521 Event::InterfaceUp(id, _, _) => ids.push(id),
522 other => panic!("expected InterfaceUp, got {:?}", other),
523 }
524 }
525
526 assert_eq!(ids.len(), 2);
528 assert_ne!(ids[0], ids[1]);
529 }
530
531 #[test]
532 fn client_disconnect() {
533 let port = find_free_port();
534 let (tx, rx) = crate::event::channel();
535 let next_id = Arc::new(AtomicU64::new(5000));
536
537 let config = make_server_config(port, 5, None);
538
539 start(config, tx, next_id).unwrap();
540 thread::sleep(Duration::from_millis(50));
541
542 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
543
544 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
546
547 drop(client);
549
550 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
552 assert!(
553 matches!(event, Event::InterfaceDown(InterfaceId(5000))),
554 "expected InterfaceDown(5000), got {:?}",
555 event
556 );
557 }
558
559 #[test]
560 fn server_bind_port() {
561 let port = find_free_port();
562 let (tx, _rx) = crate::event::channel();
563 let next_id = Arc::new(AtomicU64::new(6000));
564
565 let config = make_server_config(port, 6, None);
566
567 start(config, tx, next_id).unwrap();
569 }
570
571 #[test]
572 fn max_connections_rejects_excess() {
573 let port = find_free_port();
574 let (tx, rx) = crate::event::channel();
575 let next_id = Arc::new(AtomicU64::new(7000));
576
577 let config = make_server_config(port, 7, Some(2));
578
579 start(config, tx, next_id).unwrap();
580 thread::sleep(Duration::from_millis(50));
581
582 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
584 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
585
586 for _ in 0..2 {
588 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
589 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
590 }
591
592 let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
594 client3
595 .set_read_timeout(Some(Duration::from_millis(500)))
596 .unwrap();
597
598 thread::sleep(Duration::from_millis(100));
600
601 let result = rx.recv_timeout(Duration::from_millis(500));
603 assert!(
604 result.is_err(),
605 "expected no InterfaceUp for rejected connection, got {:?}",
606 result
607 );
608 }
609
610 #[test]
611 fn max_connections_allows_after_disconnect() {
612 let port = find_free_port();
613 let (tx, rx) = crate::event::channel();
614 let next_id = Arc::new(AtomicU64::new(7100));
615
616 let config = make_server_config(port, 71, Some(1));
617
618 start(config, tx, next_id).unwrap();
619 thread::sleep(Duration::from_millis(50));
620
621 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
623 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
624 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
625
626 drop(client1);
628
629 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
631 assert!(matches!(event, Event::InterfaceDown(_)));
632
633 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
635 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
636 assert!(
637 matches!(event, Event::InterfaceUp(_, _, _)),
638 "expected InterfaceUp after slot freed, got {:?}",
639 event
640 );
641 }
642
643 #[test]
644 fn runtime_max_connections_updates_live() {
645 let port = find_free_port();
646 let (tx, rx) = crate::event::channel();
647 let next_id = Arc::new(AtomicU64::new(7200));
648
649 let config = make_server_config(port, 72, None);
650 let runtime = Arc::clone(&config.runtime);
651
652 start(config, tx, next_id).unwrap();
653 thread::sleep(Duration::from_millis(50));
654
655 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
656 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
657 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
658
659 {
660 let mut runtime = runtime.lock().unwrap();
661 runtime.max_connections = Some(1);
662 }
663
664 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
665 let result = rx.recv_timeout(Duration::from_millis(400));
666 assert!(
667 result.is_err(),
668 "expected no InterfaceUp after lowering max_connections, got {:?}",
669 result
670 );
671 }
672}