1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{ListenerControl, Writer};
19
20#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23 pub name: String,
24 pub listen_ip: String,
25 pub listen_port: u16,
26 pub interface_id: InterfaceId,
27 pub max_connections: Option<usize>,
28 pub ingress_control: IngressControlConfig,
29 pub runtime: Arc<Mutex<TcpServerRuntime>>,
30}
31
/// Settings of a TCP server that can be adjusted while it is running.
#[derive(Clone, Debug)]
pub struct TcpServerRuntime {
    /// Maximum number of simultaneously connected clients; `None` disables
    /// the limit.
    pub max_connections: Option<usize>,
}
36
37impl TcpServerRuntime {
38 pub fn from_config(config: &TcpServerConfig) -> Self {
39 Self {
40 max_connections: config.max_connections,
41 }
42 }
43}
44
45#[derive(Debug, Clone)]
46pub struct TcpServerRuntimeConfigHandle {
47 pub interface_name: String,
48 pub runtime: Arc<Mutex<TcpServerRuntime>>,
49 pub startup: TcpServerRuntime,
50}
51
52impl Default for TcpServerConfig {
53 fn default() -> Self {
54 let mut config = TcpServerConfig {
55 name: String::new(),
56 listen_ip: "0.0.0.0".into(),
57 listen_port: 4242,
58 interface_id: InterfaceId(0),
59 max_connections: None,
60 ingress_control: IngressControlConfig::enabled(),
61 runtime: Arc::new(Mutex::new(TcpServerRuntime {
62 max_connections: None,
63 })),
64 };
65 let startup = TcpServerRuntime::from_config(&config);
66 config.runtime = Arc::new(Mutex::new(startup));
67 config
68 }
69}
70
/// Outgoing half of a client connection.
struct TcpServerWriter {
    /// Socket handle that framed data is written to.
    stream: TcpStream,
}
75
76impl Writer for TcpServerWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 self.stream.write_all(&hdlc::frame(data))
79 }
80}
81
82pub fn start(
88 config: TcpServerConfig,
89 tx: EventSender,
90 next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93 let listener = TcpListener::bind(&addr)?;
94 listener.set_nonblocking(true)?;
95
96 log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98 let name = config.name.clone();
99 let runtime = Arc::clone(&config.runtime);
100 let ingress_control = config.ingress_control;
101 let active_connections = Arc::new(AtomicUsize::new(0));
102 let control = ListenerControl::new();
103 let listener_control = control.clone();
104 thread::Builder::new()
105 .name(format!("tcp-server-{}", config.interface_id.0))
106 .spawn(move || {
107 listener_loop(
108 listener,
109 name,
110 tx,
111 next_id,
112 runtime,
113 ingress_control,
114 active_connections,
115 listener_control,
116 );
117 })?;
118
119 Ok(control)
120}
121
122fn listener_loop(
124 listener: TcpListener,
125 name: String,
126 tx: EventSender,
127 next_id: Arc<AtomicU64>,
128 runtime: Arc<Mutex<TcpServerRuntime>>,
129 ingress_control: IngressControlConfig,
130 active_connections: Arc<AtomicUsize>,
131 control: ListenerControl,
132) {
133 loop {
134 if control.should_stop() {
135 log::info!("[{}] listener stopping", name);
136 return;
137 }
138
139 let stream_result = listener.accept().map(|(stream, _)| stream);
140 let stream = match stream_result {
141 Ok(s) => s,
142 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143 thread::sleep(std::time::Duration::from_millis(50));
144 continue;
145 }
146 Err(e) => {
147 log::warn!("[{}] accept failed: {}", name, e);
148 continue;
149 }
150 };
151
152 let max_connections = runtime.lock().unwrap().max_connections;
153 if let Some(max) = max_connections {
154 if active_connections.load(Ordering::Relaxed) >= max {
155 let peer = stream.peer_addr().ok();
156 log::warn!(
157 "[{}] max connections ({}) reached, rejecting {:?}",
158 name,
159 max,
160 peer
161 );
162 drop(stream);
163 continue;
164 }
165 }
166
167 active_connections.fetch_add(1, Ordering::Relaxed);
168
169 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170 let peer_addr = stream.peer_addr().ok();
171
172 log::info!(
173 "[{}] client connected: {:?} → id {}",
174 name,
175 peer_addr,
176 client_id.0
177 );
178
179 if let Err(e) = stream.set_nodelay(true) {
181 log::warn!("[{}] set_nodelay failed: {}", name, e);
182 }
183
184 let writer_stream = match stream.try_clone() {
186 Ok(s) => s,
187 Err(e) => {
188 log::warn!("[{}] failed to clone stream: {}", name, e);
189 continue;
190 }
191 };
192
193 let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194 stream: writer_stream,
195 });
196
197 let info = InterfaceInfo {
198 id: client_id,
199 name: format!("TCPServerInterface/Client-{}", client_id.0),
200 mode: constants::MODE_FULL,
201 out_capable: true,
202 in_capable: true,
203 bitrate: None,
204 announce_rate_target: None,
205 announce_rate_grace: 0,
206 announce_rate_penalty: 0.0,
207 announce_cap: constants::ANNOUNCE_CAP,
208 is_local_client: false,
209 wants_tunnel: false,
210 tunnel_id: None,
211 mtu: 65535,
212 ia_freq: 0.0,
213 started: 0.0,
214 ingress_control,
215 };
216
217 if tx
219 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
220 .is_err()
221 {
222 return;
224 }
225
226 let client_tx = tx.clone();
228 let client_name = name.clone();
229 let client_active = active_connections.clone();
230 thread::Builder::new()
231 .name(format!("tcp-server-reader-{}", client_id.0))
232 .spawn(move || {
233 client_reader_loop(stream, client_id, client_name, client_tx, client_active);
234 })
235 .ok();
236 }
237}
238
239fn client_reader_loop(
241 mut stream: TcpStream,
242 id: InterfaceId,
243 name: String,
244 tx: EventSender,
245 active_connections: Arc<AtomicUsize>,
246) {
247 let mut decoder = hdlc::Decoder::new();
248 let mut buf = [0u8; 4096];
249
250 loop {
251 match stream.read(&mut buf) {
252 Ok(0) => {
253 log::info!("[{}] client {} disconnected", name, id.0);
254 active_connections.fetch_sub(1, Ordering::Relaxed);
255 let _ = tx.send(Event::InterfaceDown(id));
256 return;
257 }
258 Ok(n) => {
259 for frame in decoder.feed(&buf[..n]) {
260 if tx
261 .send(Event::Frame {
262 interface_id: id,
263 data: frame,
264 })
265 .is_err()
266 {
267 active_connections.fetch_sub(1, Ordering::Relaxed);
269 return;
270 }
271 }
272 }
273 Err(e) => {
274 log::warn!("[{}] client {} read error: {}", name, id.0, e);
275 active_connections.fetch_sub(1, Ordering::Relaxed);
276 let _ = tx.send(Event::InterfaceDown(id));
277 return;
278 }
279 }
280 }
281}
282
283use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
286use std::collections::HashMap;
287
/// Stateless factory that parses the configuration of TCP server interfaces
/// and starts them.
pub struct TcpServerFactory;
290
291impl InterfaceFactory for TcpServerFactory {
292 fn type_name(&self) -> &str {
293 "TCPServerInterface"
294 }
295
296 fn parse_config(
297 &self,
298 name: &str,
299 id: InterfaceId,
300 params: &HashMap<String, String>,
301 ) -> Result<Box<dyn InterfaceConfigData>, String> {
302 let listen_ip = params
303 .get("listen_ip")
304 .cloned()
305 .unwrap_or_else(|| "0.0.0.0".into());
306 let listen_port = params
307 .get("listen_port")
308 .and_then(|v| v.parse().ok())
309 .unwrap_or(4242);
310 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
311 let mut config = TcpServerConfig {
312 name: name.to_string(),
313 listen_ip,
314 listen_port,
315 interface_id: id,
316 max_connections,
317 ingress_control: IngressControlConfig::enabled(),
318 runtime: Arc::new(Mutex::new(TcpServerRuntime {
319 max_connections: None,
320 })),
321 };
322 let startup = TcpServerRuntime::from_config(&config);
323 config.runtime = Arc::new(Mutex::new(startup));
324 Ok(Box::new(config))
325 }
326
327 fn start(
328 &self,
329 config: Box<dyn InterfaceConfigData>,
330 ctx: StartContext,
331 ) -> io::Result<StartResult> {
332 let mut cfg = *config
333 .into_any()
334 .downcast::<TcpServerConfig>()
335 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
336 cfg.ingress_control = ctx.ingress_control;
337 let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
338 Ok(StartResult::Listener {
339 control: Some(control),
340 })
341 }
342}
343
344pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
345 TcpServerRuntimeConfigHandle {
346 interface_name: config.name.clone(),
347 runtime: Arc::clone(&config.runtime),
348 startup: TcpServerRuntime::from_config(config),
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port by binding to port 0.
    ///
    /// The probe listener is closed again before the port is returned.
    fn find_free_port() -> u16 {
        let probe = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = probe.local_addr().unwrap();
        addr.port()
    }

    /// Server configuration bound to the loopback interface, with runtime
    /// settings that mirror `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        let runtime = TcpServerRuntime { max_connections };
        TcpServerConfig {
            name: String::from("test-server"),
            listen_ip: String::from("127.0.0.1"),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            runtime: Arc::new(Mutex::new(runtime)),
        }
    }

    /// Loopback address of the server under test.
    fn server_addr(port: u16) -> String {
        format!("127.0.0.1:{}", port)
    }

    /// Connects a client to the server under test, panicking on failure.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(server_addr(port)).unwrap()
    }

    /// Short pause after `start` before the first client connects.
    fn wait_for_listener() {
        thread::sleep(Duration::from_millis(50));
    }

    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(1000));

        start(make_server_config(port, 1, None), sender, id_source).unwrap();
        wait_for_listener();

        let _client = connect(port);

        // The first client must be announced with the first dynamic id.
        match events.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(1100));

        // Distinct values in every field make a partial copy detectable.
        let mut ingress = IngressControlConfig::disabled();
        ingress.max_held_announces = 17;
        ingress.burst_hold = 1.5;
        ingress.burst_freq_new = 2.5;
        ingress.burst_freq = 3.5;
        ingress.new_time = 4.5;
        ingress.burst_penalty = 5.5;
        ingress.held_release_interval = 6.5;

        let mut config = make_server_config(port, 11, None);
        config.ingress_control = ingress;

        start(config, sender, id_source).unwrap();
        wait_for_listener();

        let _client = connect(port);

        match events.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(_, _, Some(info)) => {
                let seen = info.ingress_control;
                assert!(!seen.enabled);
                assert_eq!(seen.max_held_announces, 17);
                assert_eq!(seen.burst_hold, 1.5);
                assert_eq!(seen.burst_freq_new, 2.5);
                assert_eq!(seen.burst_freq, 3.5);
                assert_eq!(seen.new_time, 4.5);
                assert_eq!(seen.burst_penalty, 5.5);
                assert_eq!(seen.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(1500));

        let control = start(make_server_config(port, 15, None), sender, id_source).unwrap();

        wait_for_listener();
        control.request_stop();
        // Longer than one poll interval, so the loop has seen the request.
        thread::sleep(Duration::from_millis(120));

        // The connect may or may not succeed; either way the socket is
        // closed again immediately.
        drop(TcpStream::connect(server_addr(port)).ok());

        let outcome = events.recv_timeout(Duration::from_millis(200));
        assert!(
            matches!(
                outcome,
                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected)
            ),
            "expected no InterfaceUp after listener stop, got {:?}",
            outcome
        );
    }

    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(2000));

        start(make_server_config(port, 2, None), sender, id_source).unwrap();
        wait_for_listener();

        let mut client = connect(port);

        // Skip the InterfaceUp announcement.
        let _ = events.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        match events.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(3000));

        start(make_server_config(port, 3, None), sender, id_source).unwrap();
        wait_for_listener();

        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let mut writer = match events.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // The client must receive exactly the HDLC-framed payload.
        let mut received = [0u8; 256];
        let len = client.read(&mut received).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&received[..len], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(4000));

        start(make_server_config(port, 4, None), sender, id_source).unwrap();
        wait_for_listener();

        let _first = connect(port);
        let _second = connect(port);

        let ids: Vec<InterfaceId> = (0..2)
            .map(|_| match events.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            })
            .collect();

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(5000));

        start(make_server_config(port, 5, None), sender, id_source).unwrap();
        wait_for_listener();

        let client = connect(port);

        // Skip the InterfaceUp announcement.
        let _ = events.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = events.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (sender, _events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(6000));

        // Starting on a free port must succeed.
        start(make_server_config(port, 6, None), sender, id_source).unwrap();
    }

    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(7000));

        start(make_server_config(port, 7, Some(2)), sender, id_source).unwrap();
        wait_for_listener();

        let _first = connect(port);
        let _second = connect(port);

        // Both connections within the limit are announced.
        for _ in 0..2 {
            let event = events.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        let third = connect(port);
        third
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        thread::sleep(Duration::from_millis(100));

        let outcome = events.recv_timeout(Duration::from_millis(500));
        assert!(
            outcome.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            outcome
        );
    }

    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(7100));

        start(make_server_config(port, 71, Some(1)), sender, id_source).unwrap();
        wait_for_listener();

        let first = connect(port);
        let up = events.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(up, Event::InterfaceUp(_, _, _)));

        drop(first);

        let down = events.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(down, Event::InterfaceDown(_)));

        // The freed slot must be usable by the next client.
        let _second = connect(port);
        let event = events.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (sender, events) = crate::event::channel();
        let id_source = Arc::new(AtomicU64::new(7200));

        let config = make_server_config(port, 72, None);
        let live_settings = Arc::clone(&config.runtime);

        start(config, sender, id_source).unwrap();
        wait_for_listener();

        let _first = connect(port);
        let up = events.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(up, Event::InterfaceUp(_, _, _)));

        // Lower the limit while the server is running.
        live_settings.lock().unwrap().max_connections = Some(1);

        let _second = connect(port);
        let outcome = events.recv_timeout(Duration::from_millis(400));
        assert!(
            outcome.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            outcome
        );
    }
}