Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{lock_or_recover, ListenerControl, Writer};
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27    pub max_connections: Option<usize>,
28    pub ingress_control: IngressControlConfig,
29    pub runtime: Arc<Mutex<TcpServerRuntime>>,
30}
31
32#[derive(Debug, Clone)]
33pub struct TcpServerRuntime {
34    pub max_connections: Option<usize>,
35}
36
37impl TcpServerRuntime {
38    pub fn from_config(config: &TcpServerConfig) -> Self {
39        Self {
40            max_connections: config.max_connections,
41        }
42    }
43}
44
45#[derive(Debug, Clone)]
46pub struct TcpServerRuntimeConfigHandle {
47    pub interface_name: String,
48    pub runtime: Arc<Mutex<TcpServerRuntime>>,
49    pub startup: TcpServerRuntime,
50}
51
52impl Default for TcpServerConfig {
53    fn default() -> Self {
54        let mut config = TcpServerConfig {
55            name: String::new(),
56            listen_ip: "0.0.0.0".into(),
57            listen_port: 4242,
58            interface_id: InterfaceId(0),
59            max_connections: None,
60            ingress_control: IngressControlConfig::enabled(),
61            runtime: Arc::new(Mutex::new(TcpServerRuntime {
62                max_connections: None,
63            })),
64        };
65        let startup = TcpServerRuntime::from_config(&config);
66        config.runtime = Arc::new(Mutex::new(startup));
67        config
68    }
69}
70
71/// Writer that sends HDLC-framed data over a TCP stream.
72struct TcpServerWriter {
73    stream: TcpStream,
74}
75
76impl Writer for TcpServerWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        self.stream.write_all(&hdlc::frame(data))
79    }
80}
81
82/// Start a TCP server. Spawns a listener thread that accepts connections
83/// and per-client reader threads. Returns immediately.
84///
85/// `next_id` is shared with the node for allocating unique InterfaceIds
86/// for each connected client.
87pub fn start(
88    config: TcpServerConfig,
89    tx: EventSender,
90    next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93    let listener = TcpListener::bind(&addr)?;
94    listener.set_nonblocking(true)?;
95
96    log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98    let name = config.name.clone();
99    let runtime = Arc::clone(&config.runtime);
100    let ingress_control = config.ingress_control;
101    let active_connections = Arc::new(AtomicUsize::new(0));
102    let control = ListenerControl::new();
103    let listener_control = control.clone();
104    thread::Builder::new()
105        .name(format!("tcp-server-{}", config.interface_id.0))
106        .spawn(move || {
107            listener_loop(
108                listener,
109                name,
110                tx,
111                next_id,
112                runtime,
113                ingress_control,
114                active_connections,
115                listener_control,
116            );
117        })?;
118
119    Ok(control)
120}
121
122/// Listener thread: accepts connections and spawns reader threads.
123fn listener_loop(
124    listener: TcpListener,
125    name: String,
126    tx: EventSender,
127    next_id: Arc<AtomicU64>,
128    runtime: Arc<Mutex<TcpServerRuntime>>,
129    ingress_control: IngressControlConfig,
130    active_connections: Arc<AtomicUsize>,
131    control: ListenerControl,
132) {
133    loop {
134        if control.should_stop() {
135            log::info!("[{}] listener stopping", name);
136            return;
137        }
138
139        let stream_result = listener.accept().map(|(stream, _)| stream);
140        let stream = match stream_result {
141            Ok(s) => s,
142            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143                thread::sleep(std::time::Duration::from_millis(50));
144                continue;
145            }
146            Err(e) => {
147                log::warn!("[{}] accept failed: {}", name, e);
148                continue;
149            }
150        };
151
152        let max_connections = lock_or_recover(&runtime, "tcp server runtime").max_connections;
153        if let Some(max) = max_connections {
154            if active_connections.load(Ordering::Relaxed) >= max {
155                let peer = stream.peer_addr().ok();
156                log::warn!(
157                    "[{}] max connections ({}) reached, rejecting {:?}",
158                    name,
159                    max,
160                    peer
161                );
162                drop(stream);
163                continue;
164            }
165        }
166
167        active_connections.fetch_add(1, Ordering::Relaxed);
168
169        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170        let peer_addr = stream.peer_addr().ok();
171
172        log::info!(
173            "[{}] client connected: {:?} → id {}",
174            name,
175            peer_addr,
176            client_id.0
177        );
178
179        // Set TCP_NODELAY on the client socket
180        if let Err(e) = stream.set_nodelay(true) {
181            log::warn!("[{}] set_nodelay failed: {}", name, e);
182        }
183
184        // Clone stream for writer
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("[{}] failed to clone stream: {}", name, e);
189                continue;
190            }
191        };
192
193        let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194            stream: writer_stream,
195        });
196
197        let info = InterfaceInfo {
198            id: client_id,
199            name: format!("TCPServerInterface/Client-{}", client_id.0),
200            mode: constants::MODE_FULL,
201            out_capable: true,
202            in_capable: true,
203            bitrate: None,
204            airtime_profile: None,
205            announce_rate_target: None,
206            announce_rate_grace: 0,
207            announce_rate_penalty: 0.0,
208            announce_cap: constants::ANNOUNCE_CAP,
209            is_local_client: false,
210            wants_tunnel: false,
211            tunnel_id: None,
212            mtu: 65535,
213            ia_freq: 0.0,
214            ip_freq: 0.0,
215            op_freq: 0.0,
216            op_samples: 0,
217            started: 0.0,
218            ingress_control,
219        };
220
221        // Send InterfaceUp with InterfaceInfo for dynamic registration
222        if tx
223            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
224            .is_err()
225        {
226            // Driver shut down
227            return;
228        }
229
230        // Spawn reader thread for this client
231        let client_tx = tx.clone();
232        let client_name = name.clone();
233        let client_active = active_connections.clone();
234        thread::Builder::new()
235            .name(format!("tcp-server-reader-{}", client_id.0))
236            .spawn(move || {
237                client_reader_loop(stream, client_id, client_name, client_tx, client_active);
238            })
239            .ok();
240    }
241}
242
243/// Per-client reader thread: reads HDLC frames, sends to driver.
244fn client_reader_loop(
245    mut stream: TcpStream,
246    id: InterfaceId,
247    name: String,
248    tx: EventSender,
249    active_connections: Arc<AtomicUsize>,
250) {
251    let mut decoder = hdlc::Decoder::new();
252    let mut buf = [0u8; 4096];
253
254    loop {
255        match stream.read(&mut buf) {
256            Ok(0) => {
257                log::info!("[{}] client {} disconnected", name, id.0);
258                active_connections.fetch_sub(1, Ordering::Relaxed);
259                let _ = tx.send(Event::InterfaceDown(id));
260                return;
261            }
262            Ok(n) => {
263                for frame in decoder.feed(&buf[..n]) {
264                    if tx
265                        .send(Event::Frame {
266                            interface_id: id,
267                            data: frame,
268                            rssi: None,
269                            snr: None,
270                        })
271                        .is_err()
272                    {
273                        // Driver shut down
274                        active_connections.fetch_sub(1, Ordering::Relaxed);
275                        return;
276                    }
277                }
278            }
279            Err(e) => {
280                log::warn!("[{}] client {} read error: {}", name, id.0, e);
281                active_connections.fetch_sub(1, Ordering::Relaxed);
282                let _ = tx.send(Event::InterfaceDown(id));
283                return;
284            }
285        }
286    }
287}
288
289// --- Factory implementation ---
290
291use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
292use std::collections::HashMap;
293
294/// Factory for `TCPServerInterface`.
295pub struct TcpServerFactory;
296
297impl InterfaceFactory for TcpServerFactory {
298    fn type_name(&self) -> &str {
299        "TCPServerInterface"
300    }
301
302    fn parse_config(
303        &self,
304        name: &str,
305        id: InterfaceId,
306        params: &HashMap<String, String>,
307    ) -> Result<Box<dyn InterfaceConfigData>, String> {
308        let listen_ip = params
309            .get("listen_ip")
310            .cloned()
311            .unwrap_or_else(|| "0.0.0.0".into());
312        let listen_port = params
313            .get("listen_port")
314            .and_then(|v| v.parse().ok())
315            .unwrap_or(4242);
316        let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
317        let mut config = TcpServerConfig {
318            name: name.to_string(),
319            listen_ip,
320            listen_port,
321            interface_id: id,
322            max_connections,
323            ingress_control: IngressControlConfig::enabled(),
324            runtime: Arc::new(Mutex::new(TcpServerRuntime {
325                max_connections: None,
326            })),
327        };
328        let startup = TcpServerRuntime::from_config(&config);
329        config.runtime = Arc::new(Mutex::new(startup));
330        Ok(Box::new(config))
331    }
332
333    fn start(
334        &self,
335        config: Box<dyn InterfaceConfigData>,
336        ctx: StartContext,
337    ) -> io::Result<StartResult> {
338        let mut cfg = *config
339            .into_any()
340            .downcast::<TcpServerConfig>()
341            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
342        cfg.ingress_control = ctx.ingress_control;
343        let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
344        Ok(StartResult::Listener {
345            control: Some(control),
346        })
347    }
348}
349
350pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
351    TcpServerRuntimeConfigHandle {
352        interface_name: config.name.clone(),
353        runtime: Arc::clone(&config.runtime),
354        startup: TcpServerRuntime::from_config(config),
355    }
356}
357
#[cfg(test)]
mod tests {
    //! Integration-style tests that run a real server on a loopback port
    //! and observe the events it sends to the driver channel.

    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Asks the OS for a loopback port that is currently unused.
    ///
    /// The probing listener is closed before the port is returned, so
    /// another process could in principle claim it in between.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Builds a loopback server configuration whose runtime settings mirror
    /// `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            runtime: Arc::new(Mutex::new(TcpServerRuntime { max_connections })),
        }
    }

    /// Starts a server whose first client receives id `first_client_id` and
    /// gives the listener thread a moment to begin polling.
    fn start_server(
        config: TcpServerConfig,
        tx: EventSender,
        first_client_id: u64,
    ) -> ListenerControl {
        let next_id = Arc::new(AtomicU64::new(first_client_id));
        let control = start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));
        control
    }

    /// Opens a client connection to the server under test.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 1, None), tx, 1000);

        let _client = connect(port);

        // Should receive InterfaceUp with InterfaceInfo (dynamic)
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let mut config = make_server_config(port, 11, None);
        config.ingress_control = IngressControlConfig::disabled();
        config.ingress_control.max_held_announces = 17;
        config.ingress_control.burst_hold = 1.5;
        config.ingress_control.burst_freq_new = 2.5;
        config.ingress_control.burst_freq = 3.5;
        config.ingress_control.new_time = 4.5;
        config.ingress_control.burst_penalty = 5.5;
        config.ingress_control.held_release_interval = 6.5;

        let _control = start_server(config, tx, 1100);
        let _client = connect(port);

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(_, _, Some(info)) => {
                assert!(!info.ingress_control.enabled);
                assert_eq!(info.ingress_control.max_held_announces, 17);
                assert_eq!(info.ingress_control.burst_hold, 1.5);
                assert_eq!(info.ingress_control.burst_freq_new, 2.5);
                assert_eq!(info.ingress_control.burst_freq, 3.5);
                assert_eq!(info.ingress_control.new_time, 4.5);
                assert_eq!(info.ingress_control.burst_penalty, 5.5);
                assert_eq!(info.ingress_control.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let control = start_server(make_server_config(port, 15, None), tx, 1500);

        control.request_stop();
        // Longer than two poll intervals, so the listener has seen the flag.
        thread::sleep(Duration::from_millis(120));

        // The connect may or may not succeed at TCP level; either is fine.
        if let Ok(stream) = TcpStream::connect(format!("127.0.0.1:{}", port)) {
            drop(stream);
        }

        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                other
            ),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 2, None), tx, 2000);

        let mut client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send an HDLC frame (>= 19 bytes)
        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        // Should receive Frame event
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame {
                interface_id,
                data,
                rssi: _,
                snr: _,
            } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 3, None), tx, 3000);

        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Get the writer from InterfaceUp
        let mut writer = match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send a frame via writer
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // Read from client side. TCP may deliver the frame in several
        // pieces, so read exactly as many bytes as the frame is long
        // instead of relying on a single `read` call.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 4, None), tx, 4000);

        // Connect two clients
        let _client1 = connect(port);
        let _client2 = connect(port);

        // Collect InterfaceUp events
        let mut ids = Vec::new();
        for _ in 0..2 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        // Should have unique IDs
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 5, None), tx, 5000);

        let client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Disconnect
        drop(client);

        // Should receive InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(6000));

        // Should not error
        start(make_server_config(port, 6, None), tx, next_id).unwrap();
    }

    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 7, Some(2)), tx, 7000);

        // Connect two clients (at limit)
        let _client1 = connect(port);
        let _client2 = connect(port);

        // Drain both InterfaceUp events
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // Third connection should be accepted at TCP level but immediately dropped by server
        let client3 = connect(port);
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        // Give server time to reject
        thread::sleep(Duration::from_millis(100));

        // Should NOT receive a third InterfaceUp
        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let _control = start_server(make_server_config(port, 71, Some(1)), tx, 7100);

        // Connect first client
        let client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Disconnect first client
        drop(client1);

        // Wait for InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        // Now a new connection should be accepted
        let _client2 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = make_server_config(port, 72, None);
        let runtime = Arc::clone(&config.runtime);
        let _control = start_server(config, tx, 7200);

        let _client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Lower the limit while the server is running; the guard is a
        // temporary, so the lock is released before the next connect.
        runtime.lock().unwrap().max_connections = Some(1);

        let _client2 = connect(port);
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}
724            "expected no InterfaceUp after lowering max_connections, got {:?}",
725            result
726        );
727    }
728}