1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{lock_or_recover, ListenerControl, Writer};
19
/// Configuration for one TCP server interface.
///
/// `start` binds a listener on `listen_ip:listen_port`; every accepted client
/// is then announced as its own interface.
#[derive(Debug, Clone)]
pub struct TcpServerConfig {
    /// Interface name; used as the `[name]` prefix in this module's log lines.
    pub name: String,
    /// Host part of the bind address (formatted as `listen_ip:listen_port`).
    pub listen_ip: String,
    /// Port part of the bind address.
    pub listen_port: u16,
    /// Identifier of the server interface itself; `start` uses it to name the
    /// listener thread.
    pub interface_id: InterfaceId,
    /// Connection limit that `TcpServerRuntime::from_config` copies into a
    /// runtime value. The listener enforces the value stored in `runtime`,
    /// not this field.
    pub max_connections: Option<usize>,
    /// Ingress-control settings copied into the `InterfaceInfo` of every
    /// accepted client.
    pub ingress_control: IngressControlConfig,
    /// Shared runtime settings; the listener re-reads them for each accepted
    /// connection.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
}
31
/// Settings of a TCP server that are read by the listener while it is running.
#[derive(Debug, Clone)]
pub struct TcpServerRuntime {
    /// Maximum number of simultaneously connected clients; with `None` the
    /// listener performs no limit check.
    pub max_connections: Option<usize>,
}
36
37impl TcpServerRuntime {
38 pub fn from_config(config: &TcpServerConfig) -> Self {
39 Self {
40 max_connections: config.max_connections,
41 }
42 }
43}
44
/// Handle that pairs a server's shared runtime settings with the values
/// derived from its configuration.
#[derive(Debug, Clone)]
pub struct TcpServerRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// Shared, mutable runtime settings (the same `Arc` the configuration holds).
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
    /// Runtime settings as derived from the configuration via
    /// `TcpServerRuntime::from_config`.
    pub startup: TcpServerRuntime,
}
51
52impl Default for TcpServerConfig {
53 fn default() -> Self {
54 let mut config = TcpServerConfig {
55 name: String::new(),
56 listen_ip: "0.0.0.0".into(),
57 listen_port: 4242,
58 interface_id: InterfaceId(0),
59 max_connections: None,
60 ingress_control: IngressControlConfig::enabled(),
61 runtime: Arc::new(Mutex::new(TcpServerRuntime {
62 max_connections: None,
63 })),
64 };
65 let startup = TcpServerRuntime::from_config(&config);
66 config.runtime = Arc::new(Mutex::new(startup));
67 config
68 }
69}
70
/// Outgoing half of one client connection; sends HDLC-framed data over `stream`.
struct TcpServerWriter {
    /// Socket the frames are written to. The listener fills this with a
    /// `try_clone` of the accepted client stream.
    stream: TcpStream,
}
75
76impl Writer for TcpServerWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 self.stream.write_all(&hdlc::frame(data))
79 }
80}
81
82pub fn start(
88 config: TcpServerConfig,
89 tx: EventSender,
90 next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93 let listener = TcpListener::bind(&addr)?;
94 listener.set_nonblocking(true)?;
95
96 log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98 let name = config.name.clone();
99 let runtime = Arc::clone(&config.runtime);
100 let ingress_control = config.ingress_control;
101 let active_connections = Arc::new(AtomicUsize::new(0));
102 let control = ListenerControl::new();
103 let listener_control = control.clone();
104 thread::Builder::new()
105 .name(format!("tcp-server-{}", config.interface_id.0))
106 .spawn(move || {
107 listener_loop(
108 listener,
109 name,
110 tx,
111 next_id,
112 runtime,
113 ingress_control,
114 active_connections,
115 listener_control,
116 );
117 })?;
118
119 Ok(control)
120}
121
122fn listener_loop(
124 listener: TcpListener,
125 name: String,
126 tx: EventSender,
127 next_id: Arc<AtomicU64>,
128 runtime: Arc<Mutex<TcpServerRuntime>>,
129 ingress_control: IngressControlConfig,
130 active_connections: Arc<AtomicUsize>,
131 control: ListenerControl,
132) {
133 loop {
134 if control.should_stop() {
135 log::info!("[{}] listener stopping", name);
136 return;
137 }
138
139 let stream_result = listener.accept().map(|(stream, _)| stream);
140 let stream = match stream_result {
141 Ok(s) => s,
142 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143 thread::sleep(std::time::Duration::from_millis(50));
144 continue;
145 }
146 Err(e) => {
147 log::warn!("[{}] accept failed: {}", name, e);
148 continue;
149 }
150 };
151
152 let max_connections = lock_or_recover(&runtime, "tcp server runtime").max_connections;
153 if let Some(max) = max_connections {
154 if active_connections.load(Ordering::Relaxed) >= max {
155 let peer = stream.peer_addr().ok();
156 log::warn!(
157 "[{}] max connections ({}) reached, rejecting {:?}",
158 name,
159 max,
160 peer
161 );
162 drop(stream);
163 continue;
164 }
165 }
166
167 active_connections.fetch_add(1, Ordering::Relaxed);
168
169 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170 let peer_addr = stream.peer_addr().ok();
171
172 log::info!(
173 "[{}] client connected: {:?} → id {}",
174 name,
175 peer_addr,
176 client_id.0
177 );
178
179 if let Err(e) = stream.set_nodelay(true) {
181 log::warn!("[{}] set_nodelay failed: {}", name, e);
182 }
183
184 let writer_stream = match stream.try_clone() {
186 Ok(s) => s,
187 Err(e) => {
188 log::warn!("[{}] failed to clone stream: {}", name, e);
189 continue;
190 }
191 };
192
193 let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194 stream: writer_stream,
195 });
196
197 let info = InterfaceInfo {
198 id: client_id,
199 name: format!("TCPServerInterface/Client-{}", client_id.0),
200 mode: constants::MODE_FULL,
201 out_capable: true,
202 in_capable: true,
203 bitrate: None,
204 airtime_profile: None,
205 announce_rate_target: None,
206 announce_rate_grace: 0,
207 announce_rate_penalty: 0.0,
208 announce_cap: constants::ANNOUNCE_CAP,
209 is_local_client: false,
210 wants_tunnel: false,
211 tunnel_id: None,
212 mtu: 65535,
213 ia_freq: 0.0,
214 started: 0.0,
215 ingress_control,
216 };
217
218 if tx
220 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
221 .is_err()
222 {
223 return;
225 }
226
227 let client_tx = tx.clone();
229 let client_name = name.clone();
230 let client_active = active_connections.clone();
231 thread::Builder::new()
232 .name(format!("tcp-server-reader-{}", client_id.0))
233 .spawn(move || {
234 client_reader_loop(stream, client_id, client_name, client_tx, client_active);
235 })
236 .ok();
237 }
238}
239
240fn client_reader_loop(
242 mut stream: TcpStream,
243 id: InterfaceId,
244 name: String,
245 tx: EventSender,
246 active_connections: Arc<AtomicUsize>,
247) {
248 let mut decoder = hdlc::Decoder::new();
249 let mut buf = [0u8; 4096];
250
251 loop {
252 match stream.read(&mut buf) {
253 Ok(0) => {
254 log::info!("[{}] client {} disconnected", name, id.0);
255 active_connections.fetch_sub(1, Ordering::Relaxed);
256 let _ = tx.send(Event::InterfaceDown(id));
257 return;
258 }
259 Ok(n) => {
260 for frame in decoder.feed(&buf[..n]) {
261 if tx
262 .send(Event::Frame {
263 interface_id: id,
264 data: frame,
265 })
266 .is_err()
267 {
268 active_connections.fetch_sub(1, Ordering::Relaxed);
270 return;
271 }
272 }
273 }
274 Err(e) => {
275 log::warn!("[{}] client {} read error: {}", name, id.0, e);
276 active_connections.fetch_sub(1, Ordering::Relaxed);
277 let _ = tx.send(Event::InterfaceDown(id));
278 return;
279 }
280 }
281 }
282}
283
284use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
287use std::collections::HashMap;
288
/// Stateless `InterfaceFactory` for the `"TCPServerInterface"` interface type.
pub struct TcpServerFactory;
291
292impl InterfaceFactory for TcpServerFactory {
293 fn type_name(&self) -> &str {
294 "TCPServerInterface"
295 }
296
297 fn parse_config(
298 &self,
299 name: &str,
300 id: InterfaceId,
301 params: &HashMap<String, String>,
302 ) -> Result<Box<dyn InterfaceConfigData>, String> {
303 let listen_ip = params
304 .get("listen_ip")
305 .cloned()
306 .unwrap_or_else(|| "0.0.0.0".into());
307 let listen_port = params
308 .get("listen_port")
309 .and_then(|v| v.parse().ok())
310 .unwrap_or(4242);
311 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
312 let mut config = TcpServerConfig {
313 name: name.to_string(),
314 listen_ip,
315 listen_port,
316 interface_id: id,
317 max_connections,
318 ingress_control: IngressControlConfig::enabled(),
319 runtime: Arc::new(Mutex::new(TcpServerRuntime {
320 max_connections: None,
321 })),
322 };
323 let startup = TcpServerRuntime::from_config(&config);
324 config.runtime = Arc::new(Mutex::new(startup));
325 Ok(Box::new(config))
326 }
327
328 fn start(
329 &self,
330 config: Box<dyn InterfaceConfigData>,
331 ctx: StartContext,
332 ) -> io::Result<StartResult> {
333 let mut cfg = *config
334 .into_any()
335 .downcast::<TcpServerConfig>()
336 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
337 cfg.ingress_control = ctx.ingress_control;
338 let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
339 Ok(StartResult::Listener {
340 control: Some(control),
341 })
342 }
343}
344
345pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
346 TcpServerRuntimeConfigHandle {
347 interface_name: config.name.clone(),
348 runtime: Arc::clone(&config.runtime),
349 startup: TcpServerRuntime::from_config(config),
350 }
351}
352
// Integration tests: each test starts a real server on a loopback port,
// connects with plain `TcpStream` clients and inspects the events the server
// sends on the channel.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Obtains a loopback port number by binding to port 0 and reading back
    /// the assigned port; the temporary listener is dropped before returning.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Builds a loopback server configuration named "test-server" whose
    /// runtime settings are derived from the given `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        let mut config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            // Placeholder, replaced below by the runtime derived from `config`.
            runtime: Arc::new(Mutex::new(TcpServerRuntime {
                max_connections: None,
            })),
        };
        let startup = TcpServerRuntime::from_config(&config);
        config.runtime = Arc::new(Mutex::new(startup));
        config
    }

    /// A connecting client produces `InterfaceUp` carrying the first id taken
    /// from `next_id`, a writer and an `InterfaceInfo`.
    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1000));

        let config = make_server_config(port, 1, None);

        start(config, tx, next_id).unwrap();

        // Brief pause before connecting (presumably to let the listener
        // thread start polling).
        thread::sleep(Duration::from_millis(50));

        // Keep the client alive until the end of the test.
        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // The first event must announce the new client.
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    /// The `InterfaceInfo` of an accepted client carries exactly the
    /// ingress-control values set on the server configuration.
    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1100));

        // Use distinctive values for every field so each one can be checked.
        let mut config = make_server_config(port, 11, None);
        config.ingress_control = IngressControlConfig::disabled();
        config.ingress_control.max_held_announces = 17;
        config.ingress_control.burst_hold = 1.5;
        config.ingress_control.burst_freq_new = 2.5;
        config.ingress_control.burst_freq = 3.5;
        config.ingress_control.new_time = 4.5;
        config.ingress_control.burst_penalty = 5.5;
        config.ingress_control.held_release_interval = 6.5;

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(_, _, Some(info)) => {
                assert!(!info.ingress_control.enabled);
                assert_eq!(info.ingress_control.max_held_announces, 17);
                assert_eq!(info.ingress_control.burst_hold, 1.5);
                assert_eq!(info.ingress_control.burst_freq_new, 2.5);
                assert_eq!(info.ingress_control.burst_freq, 3.5);
                assert_eq!(info.ingress_control.new_time, 4.5);
                assert_eq!(info.ingress_control.burst_penalty, 5.5);
                assert_eq!(info.ingress_control.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    /// After `request_stop` no further event may arrive, whether or not a
    /// later connection attempt succeeds at the socket level.
    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1500));

        let config = make_server_config(port, 15, None);
        let control = start(config, tx, next_id).unwrap();

        thread::sleep(Duration::from_millis(50));
        control.request_stop();
        // Longer than the listener's 50 ms poll interval, so the stop request
        // has been observed by now.
        thread::sleep(Duration::from_millis(120));

        // The connect itself may succeed or fail; either outcome is accepted.
        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
        if let Ok(stream) = connect_result {
            drop(stream);
        }

        // A timeout and a closed channel both mean "no event was sent".
        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                other
            ),
        }
    }

    /// An HDLC frame written by the client arrives as `Event::Frame` with the
    /// unframed payload and the client's interface id.
    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(2000));

        let config = make_server_config(port, 2, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Discard the first event (the InterfaceUp for this client).
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send one framed payload of the bytes 0..32.
        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        // The server must deliver the payload without the framing.
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Data sent through the writer from `InterfaceUp` reaches the client as
    /// an HDLC frame.
    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(3000));

        let config = make_server_config(port, 3, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        // Bound the read below so a missing frame fails the test instead of
        // blocking forever.
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Take the writer out of the InterfaceUp event.
        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send the bytes 0..24 through the server-side writer.
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // Compare what a single read returns with the expected framed bytes.
        let mut buf = [0u8; 256];
        let n = client.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    /// Two simultaneous clients are announced with two different ids.
    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(4000));

        let config = make_server_config(port, 4, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Both clients stay connected until the end of the test.
        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Collect the ids from the two InterfaceUp events.
        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        // Each client must have received its own id.
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    /// Dropping the client socket produces `InterfaceDown` for its id.
    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(5000));

        let config = make_server_config(port, 5, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Discard the first event (the InterfaceUp for this client).
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Close the connection from the client side.
        drop(client);

        // The server must report the interface as down.
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    /// Starting the server on a free port succeeds.
    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(6000));

        let config = make_server_config(port, 6, None);

        // `unwrap` is the assertion: `start` must not return an error.
        start(config, tx, next_id).unwrap();
    }

    /// With a limit of two, a third simultaneous client is not announced.
    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7000));

        let config = make_server_config(port, 7, Some(2));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Fill both available slots.
        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Both of them must be announced.
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // The third connect succeeds at the socket level; the server is
        // expected to drop it without announcing it.
        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        // Allow at least one listener poll to handle the third connection.
        thread::sleep(Duration::from_millis(100));

        // No further event may arrive.
        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    /// With a limit of one, a new client is accepted again once the previous
    /// client has disconnected.
    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7100));

        let config = make_server_config(port, 71, Some(1));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Occupy the single slot.
        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Free the slot again.
        drop(client1);

        // Wait until the server has noticed the disconnect.
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        // A new client must now be accepted.
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    /// Lowering `max_connections` through the shared runtime handle takes
    /// effect for the next connection without restarting the server.
    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7200));

        // Keep a handle to the runtime settings before `config` is consumed.
        let config = make_server_config(port, 72, None);
        let runtime = Arc::clone(&config.runtime);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Without a limit the first client is accepted.
        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Set the limit to the number of clients already connected; the
        // block releases the lock before the next connect.
        {
            let mut runtime = runtime.lock().unwrap();
            runtime.max_connections = Some(1);
        }

        // The second client must not be announced.
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}