1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{lock_or_recover, ListenerControl, Writer};
19
/// Configuration for a TCP server interface.
#[derive(Debug, Clone)]
pub struct TcpServerConfig {
    /// Interface name; used as the `[name]` prefix in this module's log messages.
    pub name: String,
    /// Address to listen on; joined with `listen_port` as `"ip:port"` when binding.
    pub listen_ip: String,
    /// TCP port to listen on.
    pub listen_port: u16,
    /// Identifier of the server interface itself. It appears in the listener
    /// thread's name; accepted clients receive their ids from a separate counter.
    pub interface_id: InterfaceId,
    /// Connection limit at configuration time (`None` means no limit). It seeds
    /// `TcpServerRuntime::from_config`; the accept loop enforces the value held
    /// in `runtime`, not this field.
    pub max_connections: Option<usize>,
    /// Ingress-control settings copied into the `InterfaceInfo` of every accepted client.
    pub ingress_control: IngressControlConfig,
    /// Shared, runtime-adjustable settings that the listener reads for each
    /// accepted connection.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
}
31
/// Settings of a running TCP server that can be changed while it is listening.
#[derive(Debug, Clone)]
pub struct TcpServerRuntime {
    /// Maximum number of simultaneously connected clients; `None` disables the
    /// limit. The accept loop reads this for every accepted connection.
    pub max_connections: Option<usize>,
}
36
37impl TcpServerRuntime {
38 pub fn from_config(config: &TcpServerConfig) -> Self {
39 Self {
40 max_connections: config.max_connections,
41 }
42 }
43}
44
/// Handle onto the runtime settings of one TCP server interface.
#[derive(Debug, Clone)]
pub struct TcpServerRuntimeConfigHandle {
    /// Name of the interface the handle belongs to.
    pub interface_name: String,
    /// The live settings, shared with the interface's configuration.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
    /// Snapshot of the settings derived from the configuration when the handle
    /// was created.
    pub startup: TcpServerRuntime,
}
51
52impl Default for TcpServerConfig {
53 fn default() -> Self {
54 let mut config = TcpServerConfig {
55 name: String::new(),
56 listen_ip: "0.0.0.0".into(),
57 listen_port: 4242,
58 interface_id: InterfaceId(0),
59 max_connections: None,
60 ingress_control: IngressControlConfig::enabled(),
61 runtime: Arc::new(Mutex::new(TcpServerRuntime {
62 max_connections: None,
63 })),
64 };
65 let startup = TcpServerRuntime::from_config(&config);
66 config.runtime = Arc::new(Mutex::new(startup));
67 config
68 }
69}
70
/// Outbound half of a client connection. The accept loop hands it to the event
/// consumer as the `Writer` carried by `Event::InterfaceUp`.
struct TcpServerWriter {
    /// Clone of the accepted client's stream, used only for writing.
    stream: TcpStream,
}
75
76impl Writer for TcpServerWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 self.stream.write_all(&hdlc::frame(data))
79 }
80}
81
82pub fn start(
88 config: TcpServerConfig,
89 tx: EventSender,
90 next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93 let listener = TcpListener::bind(&addr)?;
94 listener.set_nonblocking(true)?;
95
96 log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98 let name = config.name.clone();
99 let runtime = Arc::clone(&config.runtime);
100 let ingress_control = config.ingress_control;
101 let active_connections = Arc::new(AtomicUsize::new(0));
102 let control = ListenerControl::new();
103 let listener_control = control.clone();
104 thread::Builder::new()
105 .name(format!("tcp-server-{}", config.interface_id.0))
106 .spawn(move || {
107 listener_loop(
108 listener,
109 name,
110 tx,
111 next_id,
112 runtime,
113 ingress_control,
114 active_connections,
115 listener_control,
116 );
117 })?;
118
119 Ok(control)
120}
121
122fn listener_loop(
124 listener: TcpListener,
125 name: String,
126 tx: EventSender,
127 next_id: Arc<AtomicU64>,
128 runtime: Arc<Mutex<TcpServerRuntime>>,
129 ingress_control: IngressControlConfig,
130 active_connections: Arc<AtomicUsize>,
131 control: ListenerControl,
132) {
133 loop {
134 if control.should_stop() {
135 log::info!("[{}] listener stopping", name);
136 return;
137 }
138
139 let stream_result = listener.accept().map(|(stream, _)| stream);
140 let stream = match stream_result {
141 Ok(s) => s,
142 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143 thread::sleep(std::time::Duration::from_millis(50));
144 continue;
145 }
146 Err(e) => {
147 log::warn!("[{}] accept failed: {}", name, e);
148 continue;
149 }
150 };
151
152 let max_connections = lock_or_recover(&runtime, "tcp server runtime").max_connections;
153 if let Some(max) = max_connections {
154 if active_connections.load(Ordering::Relaxed) >= max {
155 let peer = stream.peer_addr().ok();
156 log::warn!(
157 "[{}] max connections ({}) reached, rejecting {:?}",
158 name,
159 max,
160 peer
161 );
162 drop(stream);
163 continue;
164 }
165 }
166
167 active_connections.fetch_add(1, Ordering::Relaxed);
168
169 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170 let peer_addr = stream.peer_addr().ok();
171
172 log::info!(
173 "[{}] client connected: {:?} → id {}",
174 name,
175 peer_addr,
176 client_id.0
177 );
178
179 if let Err(e) = stream.set_nodelay(true) {
181 log::warn!("[{}] set_nodelay failed: {}", name, e);
182 }
183
184 let writer_stream = match stream.try_clone() {
186 Ok(s) => s,
187 Err(e) => {
188 log::warn!("[{}] failed to clone stream: {}", name, e);
189 continue;
190 }
191 };
192
193 let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194 stream: writer_stream,
195 });
196
197 let info = InterfaceInfo {
198 id: client_id,
199 name: format!("TCPServerInterface/Client-{}", client_id.0),
200 mode: constants::MODE_FULL,
201 out_capable: true,
202 in_capable: true,
203 bitrate: None,
204 airtime_profile: None,
205 announce_rate_target: None,
206 announce_rate_grace: 0,
207 announce_rate_penalty: 0.0,
208 announce_cap: constants::ANNOUNCE_CAP,
209 is_local_client: false,
210 wants_tunnel: false,
211 tunnel_id: None,
212 mtu: 65535,
213 ia_freq: 0.0,
214 ip_freq: 0.0,
215 op_freq: 0.0,
216 op_samples: 0,
217 started: 0.0,
218 ingress_control,
219 };
220
221 if tx
223 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
224 .is_err()
225 {
226 return;
228 }
229
230 let client_tx = tx.clone();
232 let client_name = name.clone();
233 let client_active = active_connections.clone();
234 thread::Builder::new()
235 .name(format!("tcp-server-reader-{}", client_id.0))
236 .spawn(move || {
237 client_reader_loop(stream, client_id, client_name, client_tx, client_active);
238 })
239 .ok();
240 }
241}
242
243fn client_reader_loop(
245 mut stream: TcpStream,
246 id: InterfaceId,
247 name: String,
248 tx: EventSender,
249 active_connections: Arc<AtomicUsize>,
250) {
251 let mut decoder = hdlc::Decoder::new();
252 let mut buf = [0u8; 4096];
253
254 loop {
255 match stream.read(&mut buf) {
256 Ok(0) => {
257 log::info!("[{}] client {} disconnected", name, id.0);
258 active_connections.fetch_sub(1, Ordering::Relaxed);
259 let _ = tx.send(Event::InterfaceDown(id));
260 return;
261 }
262 Ok(n) => {
263 for frame in decoder.feed(&buf[..n]) {
264 if tx
265 .send(Event::Frame {
266 interface_id: id,
267 data: frame,
268 rssi: None,
269 snr: None,
270 })
271 .is_err()
272 {
273 active_connections.fetch_sub(1, Ordering::Relaxed);
275 return;
276 }
277 }
278 }
279 Err(e) => {
280 log::warn!("[{}] client {} read error: {}", name, id.0, e);
281 active_connections.fetch_sub(1, Ordering::Relaxed);
282 let _ = tx.send(Event::InterfaceDown(id));
283 return;
284 }
285 }
286 }
287}
288
289use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
292use std::collections::HashMap;
293
/// `InterfaceFactory` for the `"TCPServerInterface"` interface type.
pub struct TcpServerFactory;
296
297impl InterfaceFactory for TcpServerFactory {
298 fn type_name(&self) -> &str {
299 "TCPServerInterface"
300 }
301
302 fn parse_config(
303 &self,
304 name: &str,
305 id: InterfaceId,
306 params: &HashMap<String, String>,
307 ) -> Result<Box<dyn InterfaceConfigData>, String> {
308 let listen_ip = params
309 .get("listen_ip")
310 .cloned()
311 .unwrap_or_else(|| "0.0.0.0".into());
312 let listen_port = params
313 .get("listen_port")
314 .and_then(|v| v.parse().ok())
315 .unwrap_or(4242);
316 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
317 let mut config = TcpServerConfig {
318 name: name.to_string(),
319 listen_ip,
320 listen_port,
321 interface_id: id,
322 max_connections,
323 ingress_control: IngressControlConfig::enabled(),
324 runtime: Arc::new(Mutex::new(TcpServerRuntime {
325 max_connections: None,
326 })),
327 };
328 let startup = TcpServerRuntime::from_config(&config);
329 config.runtime = Arc::new(Mutex::new(startup));
330 Ok(Box::new(config))
331 }
332
333 fn start(
334 &self,
335 config: Box<dyn InterfaceConfigData>,
336 ctx: StartContext,
337 ) -> io::Result<StartResult> {
338 let mut cfg = *config
339 .into_any()
340 .downcast::<TcpServerConfig>()
341 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
342 cfg.ingress_control = ctx.ingress_control;
343 let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
344 Ok(StartResult::Listener {
345 control: Some(control),
346 })
347 }
348}
349
350pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
351 TcpServerRuntimeConfigHandle {
352 interface_name: config.name.clone(),
353 runtime: Arc::clone(&config.runtime),
354 startup: TcpServerRuntime::from_config(config),
355 }
356}
357
// Integration-style tests: each one starts a real listener on a loopback port
// and talks to it through a `TcpStream`, observing the events the server emits.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port by binding to port 0, then
    /// releases it again by dropping the listener.
    ///
    /// NOTE: nothing prevents another process from taking the port before the
    /// test binds it.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Builds a loopback server config whose runtime state is initialised from
    /// the given `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        let mut config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            // Placeholder; replaced below with the state derived from the config.
            runtime: Arc::new(Mutex::new(TcpServerRuntime {
                max_connections: None,
            })),
        };
        let startup = TcpServerRuntime::from_config(&config);
        config.runtime = Arc::new(Mutex::new(startup));
        config
    }

    /// A connecting client produces `InterfaceUp` with the first id from the
    /// counter, a writer, and interface info.
    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1000));

        let config = make_server_config(port, 1, None);

        start(config, tx, next_id).unwrap();

        // Presumably gives the listener thread time to start polling.
        thread::sleep(Duration::from_millis(50));

        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    /// The ingress-control settings of the server config are carried over
    /// unchanged into the `InterfaceInfo` of an accepted client.
    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1100));

        // Distinct values per field so a mix-up between fields would be detected.
        let mut config = make_server_config(port, 11, None);
        config.ingress_control = IngressControlConfig::disabled();
        config.ingress_control.max_held_announces = 17;
        config.ingress_control.burst_hold = 1.5;
        config.ingress_control.burst_freq_new = 2.5;
        config.ingress_control.burst_freq = 3.5;
        config.ingress_control.new_time = 4.5;
        config.ingress_control.burst_penalty = 5.5;
        config.ingress_control.held_release_interval = 6.5;

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(_, _, Some(info)) => {
                assert!(!info.ingress_control.enabled);
                assert_eq!(info.ingress_control.max_held_announces, 17);
                assert_eq!(info.ingress_control.burst_hold, 1.5);
                assert_eq!(info.ingress_control.burst_freq_new, 2.5);
                assert_eq!(info.ingress_control.burst_freq, 3.5);
                assert_eq!(info.ingress_control.new_time, 4.5);
                assert_eq!(info.ingress_control.burst_penalty, 5.5);
                assert_eq!(info.ingress_control.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    /// After a stop request no further clients are announced.
    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1500));

        let config = make_server_config(port, 15, None);
        let control = start(config, tx, next_id).unwrap();

        thread::sleep(Duration::from_millis(50));
        control.request_stop();
        // Longer than the listener's 50 ms poll sleep, so it has a chance to
        // observe the stop request.
        thread::sleep(Duration::from_millis(120));

        // The connect itself may succeed or fail; only the absence of an event matters.
        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
        if let Ok(stream) = connect_result {
            drop(stream);
        }

        // `Disconnected` is also accepted: the sender was moved into the
        // listener thread and is dropped once that thread returns.
        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                other
            ),
        }
    }

    /// An HDLC frame written by the client arrives as `Event::Frame` carrying
    /// the decoded payload and the client's interface id.
    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(2000));

        let config = make_server_config(port, 2, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Consume the first event (the InterfaceUp for this client).
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame {
                interface_id,
                data,
                rssi: _,
                snr: _,
            } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Data sent through the writer from `InterfaceUp` reaches the client as
    /// an HDLC frame.
    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(3000));

        let config = make_server_config(port, 3, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        // Bound the read below so a missing frame fails the test instead of hanging.
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // NOTE(review): assumes the whole frame arrives in a single read.
        let mut buf = [0u8; 256];
        let n = client.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    /// Two clients each get their own `InterfaceUp` with distinct ids.
    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(4000));

        let config = make_server_config(port, 4, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    /// Closing the client socket produces `InterfaceDown` for that client's id.
    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(5000));

        let config = make_server_config(port, 5, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Consume the first event (the InterfaceUp for this client).
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    /// Starting the server on a free port succeeds.
    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(6000));

        let config = make_server_config(port, 6, None);

        start(config, tx, next_id).unwrap();
    }

    /// With a limit of two, a third client is not announced.
    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7000));

        let config = make_server_config(port, 7, Some(2));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // The TCP connect can still succeed; the server drops the socket after accepting it.
        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        // A read timeout is configured, but this test never reads from client3.
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        thread::sleep(Duration::from_millis(100));

        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    /// A slot freed by a disconnect can be used by the next client.
    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7100));

        let config = make_server_config(port, 71, Some(1));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        drop(client1);

        // The reader releases the slot before it sends InterfaceDown, so the
        // slot is free once this event has been received.
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    /// Lowering the limit through the shared runtime state takes effect on a
    /// server that is already running.
    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7200));

        let config = make_server_config(port, 72, None);
        // Keep a handle on the shared state before the config moves into `start`.
        let runtime = Arc::clone(&config.runtime);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // One client is already connected, so a limit of 1 leaves no free slot.
        {
            let mut runtime = runtime.lock().unwrap();
            runtime.max_connections = Some(1);
        }

        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}