Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{lock_or_recover, ListenerControl, Writer};
19
/// Configuration for a TCP server interface.
///
/// Passed by value to [`start`], which binds the listening socket and spawns
/// the listener thread.
#[derive(Debug, Clone)]
pub struct TcpServerConfig {
    /// Interface name; used as the `[name]` prefix of this module's log lines.
    pub name: String,
    /// Host part of the listen address; combined with `listen_port` to form
    /// the address that is bound.
    pub listen_ip: String,
    /// TCP port to listen on.
    pub listen_port: u16,
    /// Id of the server interface itself. Within this file it is only used to
    /// name the listener thread; connected clients get separate ids.
    pub interface_id: InterfaceId,
    /// Startup connection limit, copied into [`TcpServerRuntime`] by
    /// `TcpServerRuntime::from_config`. `None` means no limit is enforced.
    pub max_connections: Option<usize>,
    /// Ingress-control settings copied into the `InterfaceInfo` of every
    /// accepted client.
    pub ingress_control: IngressControlConfig,
    /// Shared runtime state; the listener re-reads it for each accepted
    /// connection, so changes made through this handle apply to later accepts.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
}
31
32#[derive(Debug, Clone)]
33pub struct TcpServerRuntime {
34    pub max_connections: Option<usize>,
35}
36
37impl TcpServerRuntime {
38    pub fn from_config(config: &TcpServerConfig) -> Self {
39        Self {
40            max_connections: config.max_connections,
41        }
42    }
43}
44
/// Handle bundling a server's shared runtime state with its startup values.
///
/// Built by `runtime_handle_from_config`.
// NOTE(review): presumably used by runtime-configuration code elsewhere to
// inspect/modify/reset live settings — confirm against the callers.
#[derive(Debug, Clone)]
pub struct TcpServerRuntimeConfigHandle {
    /// Copy of the owning interface's configured name.
    pub interface_name: String,
    /// The same `Arc` the configuration holds, so both refer to one
    /// `TcpServerRuntime`.
    pub runtime: Arc<Mutex<TcpServerRuntime>>,
    /// Snapshot of the runtime values derived from the startup configuration.
    pub startup: TcpServerRuntime,
}
51
52impl Default for TcpServerConfig {
53    fn default() -> Self {
54        let mut config = TcpServerConfig {
55            name: String::new(),
56            listen_ip: "0.0.0.0".into(),
57            listen_port: 4242,
58            interface_id: InterfaceId(0),
59            max_connections: None,
60            ingress_control: IngressControlConfig::enabled(),
61            runtime: Arc::new(Mutex::new(TcpServerRuntime {
62                max_connections: None,
63            })),
64        };
65        let startup = TcpServerRuntime::from_config(&config);
66        config.runtime = Arc::new(Mutex::new(startup));
67        config
68    }
69}
70
71/// Writer that sends HDLC-framed data over a TCP stream.
72struct TcpServerWriter {
73    stream: TcpStream,
74}
75
76impl Writer for TcpServerWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        self.stream.write_all(&hdlc::frame(data))
79    }
80}
81
82/// Start a TCP server. Spawns a listener thread that accepts connections
83/// and per-client reader threads. Returns immediately.
84///
85/// `next_id` is shared with the node for allocating unique InterfaceIds
86/// for each connected client.
87pub fn start(
88    config: TcpServerConfig,
89    tx: EventSender,
90    next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93    let listener = TcpListener::bind(&addr)?;
94    listener.set_nonblocking(true)?;
95
96    log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98    let name = config.name.clone();
99    let runtime = Arc::clone(&config.runtime);
100    let ingress_control = config.ingress_control;
101    let active_connections = Arc::new(AtomicUsize::new(0));
102    let control = ListenerControl::new();
103    let listener_control = control.clone();
104    thread::Builder::new()
105        .name(format!("tcp-server-{}", config.interface_id.0))
106        .spawn(move || {
107            listener_loop(
108                listener,
109                name,
110                tx,
111                next_id,
112                runtime,
113                ingress_control,
114                active_connections,
115                listener_control,
116            );
117        })?;
118
119    Ok(control)
120}
121
122/// Listener thread: accepts connections and spawns reader threads.
123fn listener_loop(
124    listener: TcpListener,
125    name: String,
126    tx: EventSender,
127    next_id: Arc<AtomicU64>,
128    runtime: Arc<Mutex<TcpServerRuntime>>,
129    ingress_control: IngressControlConfig,
130    active_connections: Arc<AtomicUsize>,
131    control: ListenerControl,
132) {
133    loop {
134        if control.should_stop() {
135            log::info!("[{}] listener stopping", name);
136            return;
137        }
138
139        let stream_result = listener.accept().map(|(stream, _)| stream);
140        let stream = match stream_result {
141            Ok(s) => s,
142            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143                thread::sleep(std::time::Duration::from_millis(50));
144                continue;
145            }
146            Err(e) => {
147                log::warn!("[{}] accept failed: {}", name, e);
148                continue;
149            }
150        };
151
152        let max_connections = lock_or_recover(&runtime, "tcp server runtime").max_connections;
153        if let Some(max) = max_connections {
154            if active_connections.load(Ordering::Relaxed) >= max {
155                let peer = stream.peer_addr().ok();
156                log::warn!(
157                    "[{}] max connections ({}) reached, rejecting {:?}",
158                    name,
159                    max,
160                    peer
161                );
162                drop(stream);
163                continue;
164            }
165        }
166
167        active_connections.fetch_add(1, Ordering::Relaxed);
168
169        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170        let peer_addr = stream.peer_addr().ok();
171
172        log::info!(
173            "[{}] client connected: {:?} → id {}",
174            name,
175            peer_addr,
176            client_id.0
177        );
178
179        // Set TCP_NODELAY on the client socket
180        if let Err(e) = stream.set_nodelay(true) {
181            log::warn!("[{}] set_nodelay failed: {}", name, e);
182        }
183
184        // Clone stream for writer
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("[{}] failed to clone stream: {}", name, e);
189                continue;
190            }
191        };
192
193        let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194            stream: writer_stream,
195        });
196
197        let info = InterfaceInfo {
198            id: client_id,
199            name: format!("TCPServerInterface/Client-{}", client_id.0),
200            mode: constants::MODE_FULL,
201            out_capable: true,
202            in_capable: true,
203            bitrate: None,
204            airtime_profile: None,
205            announce_rate_target: None,
206            announce_rate_grace: 0,
207            announce_rate_penalty: 0.0,
208            announce_cap: constants::ANNOUNCE_CAP,
209            is_local_client: false,
210            wants_tunnel: false,
211            tunnel_id: None,
212            mtu: 65535,
213            ia_freq: 0.0,
214            started: 0.0,
215            ingress_control,
216        };
217
218        // Send InterfaceUp with InterfaceInfo for dynamic registration
219        if tx
220            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
221            .is_err()
222        {
223            // Driver shut down
224            return;
225        }
226
227        // Spawn reader thread for this client
228        let client_tx = tx.clone();
229        let client_name = name.clone();
230        let client_active = active_connections.clone();
231        thread::Builder::new()
232            .name(format!("tcp-server-reader-{}", client_id.0))
233            .spawn(move || {
234                client_reader_loop(stream, client_id, client_name, client_tx, client_active);
235            })
236            .ok();
237    }
238}
239
240/// Per-client reader thread: reads HDLC frames, sends to driver.
241fn client_reader_loop(
242    mut stream: TcpStream,
243    id: InterfaceId,
244    name: String,
245    tx: EventSender,
246    active_connections: Arc<AtomicUsize>,
247) {
248    let mut decoder = hdlc::Decoder::new();
249    let mut buf = [0u8; 4096];
250
251    loop {
252        match stream.read(&mut buf) {
253            Ok(0) => {
254                log::info!("[{}] client {} disconnected", name, id.0);
255                active_connections.fetch_sub(1, Ordering::Relaxed);
256                let _ = tx.send(Event::InterfaceDown(id));
257                return;
258            }
259            Ok(n) => {
260                for frame in decoder.feed(&buf[..n]) {
261                    if tx
262                        .send(Event::Frame {
263                            interface_id: id,
264                            data: frame,
265                        })
266                        .is_err()
267                    {
268                        // Driver shut down
269                        active_connections.fetch_sub(1, Ordering::Relaxed);
270                        return;
271                    }
272                }
273            }
274            Err(e) => {
275                log::warn!("[{}] client {} read error: {}", name, id.0, e);
276                active_connections.fetch_sub(1, Ordering::Relaxed);
277                let _ = tx.send(Event::InterfaceDown(id));
278                return;
279            }
280        }
281    }
282}
283
284// --- Factory implementation ---
285
286use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
287use std::collections::HashMap;
288
289/// Factory for `TCPServerInterface`.
290pub struct TcpServerFactory;
291
292impl InterfaceFactory for TcpServerFactory {
293    fn type_name(&self) -> &str {
294        "TCPServerInterface"
295    }
296
297    fn parse_config(
298        &self,
299        name: &str,
300        id: InterfaceId,
301        params: &HashMap<String, String>,
302    ) -> Result<Box<dyn InterfaceConfigData>, String> {
303        let listen_ip = params
304            .get("listen_ip")
305            .cloned()
306            .unwrap_or_else(|| "0.0.0.0".into());
307        let listen_port = params
308            .get("listen_port")
309            .and_then(|v| v.parse().ok())
310            .unwrap_or(4242);
311        let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
312        let mut config = TcpServerConfig {
313            name: name.to_string(),
314            listen_ip,
315            listen_port,
316            interface_id: id,
317            max_connections,
318            ingress_control: IngressControlConfig::enabled(),
319            runtime: Arc::new(Mutex::new(TcpServerRuntime {
320                max_connections: None,
321            })),
322        };
323        let startup = TcpServerRuntime::from_config(&config);
324        config.runtime = Arc::new(Mutex::new(startup));
325        Ok(Box::new(config))
326    }
327
328    fn start(
329        &self,
330        config: Box<dyn InterfaceConfigData>,
331        ctx: StartContext,
332    ) -> io::Result<StartResult> {
333        let mut cfg = *config
334            .into_any()
335            .downcast::<TcpServerConfig>()
336            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
337        cfg.ingress_control = ctx.ingress_control;
338        let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
339        Ok(StartResult::Listener {
340            control: Some(control),
341        })
342    }
343}
344
345pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
346    TcpServerRuntimeConfigHandle {
347        interface_name: config.name.clone(),
348        runtime: Arc::clone(&config.runtime),
349        startup: TcpServerRuntime::from_config(config),
350    }
351}
352
// Integration-style tests: each one starts a real server on a loopback port,
// connects with plain `TcpStream` clients and inspects the events the server
// sends on the channel. Every test uses its own starting value for `next_id`
// so the expected client ids are unambiguous.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Ask the OS for an unused loopback port by binding to port 0.
    ///
    /// The temporary listener is dropped before returning, so the port is
    /// only probably free when the caller binds it.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Build a loopback server config named `test-server` whose runtime state
    /// is derived from the given `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        let mut config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            runtime: Arc::new(Mutex::new(TcpServerRuntime {
                max_connections: None,
            })),
        };
        let startup = TcpServerRuntime::from_config(&config);
        config.runtime = Arc::new(Mutex::new(startup));
        config
    }

    /// A connecting client produces `InterfaceUp` with the first id taken
    /// from `next_id`, a writer and an `InterfaceInfo`.
    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1000));

        let config = make_server_config(port, 1, None);

        start(config, tx, next_id).unwrap();

        // Give server time to start listening
        thread::sleep(Duration::from_millis(50));

        // Connect a client
        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Should receive InterfaceUp with InterfaceInfo (dynamic)
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    /// The server's ingress-control settings are copied unchanged into the
    /// `InterfaceInfo` of an accepted client.
    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1100));

        // Use distinctive values for every field so a partial copy is detected.
        let mut config = make_server_config(port, 11, None);
        config.ingress_control = IngressControlConfig::disabled();
        config.ingress_control.max_held_announces = 17;
        config.ingress_control.burst_hold = 1.5;
        config.ingress_control.burst_freq_new = 2.5;
        config.ingress_control.burst_freq = 3.5;
        config.ingress_control.new_time = 4.5;
        config.ingress_control.burst_penalty = 5.5;
        config.ingress_control.held_release_interval = 6.5;

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(_, _, Some(info)) => {
                assert!(!info.ingress_control.enabled);
                assert_eq!(info.ingress_control.max_held_announces, 17);
                assert_eq!(info.ingress_control.burst_hold, 1.5);
                assert_eq!(info.ingress_control.burst_freq_new, 2.5);
                assert_eq!(info.ingress_control.burst_freq, 3.5);
                assert_eq!(info.ingress_control.new_time, 4.5);
                assert_eq!(info.ingress_control.burst_penalty, 5.5);
                assert_eq!(info.ingress_control.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    /// After `request_stop`, a connection attempt yields no event.
    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(1500));

        let config = make_server_config(port, 15, None);
        let control = start(config, tx, next_id).unwrap();

        thread::sleep(Duration::from_millis(50));
        control.request_stop();
        // Wait longer than the listener's 50 ms poll sleep so it sees the flag.
        thread::sleep(Duration::from_millis(120));

        // The connect itself may succeed or fail; either is acceptable here.
        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
        if let Ok(stream) = connect_result {
            drop(stream);
        }

        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                other
            ),
        }
    }

    /// An HDLC frame written by the client arrives as `Event::Frame` with the
    /// client's id and the unframed payload.
    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(2000));

        let config = make_server_config(port, 2, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send an HDLC frame (>= 19 bytes)
        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        // Should receive Frame event
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Data sent through the writer from `InterfaceUp` reaches the client as
    /// an HDLC frame.
    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(3000));

        let config = make_server_config(port, 3, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        // Bound the read below so a missing frame fails the test instead of hanging.
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Get the writer from InterfaceUp
        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send a frame via writer
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // Read from client side
        let mut buf = [0u8; 256];
        let n = client.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    /// Two simultaneous clients each get `InterfaceUp` with distinct ids.
    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(4000));

        let config = make_server_config(port, 4, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Connect two clients
        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Collect InterfaceUp events
        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        // Should have unique IDs
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    /// Closing the client socket produces `InterfaceDown` for that client's id.
    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(5000));

        let config = make_server_config(port, 5, None);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Disconnect
        drop(client);

        // Should receive InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    /// Starting a server on a free port succeeds.
    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(6000));

        let config = make_server_config(port, 6, None);

        // Should not error
        start(config, tx, next_id).unwrap();
    }

    /// With a limit of two, a third simultaneous client gets no `InterfaceUp`.
    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7000));

        let config = make_server_config(port, 7, Some(2));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Connect two clients (at limit)
        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Drain both InterfaceUp events
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // Third connection should be accepted at TCP level but immediately dropped by server
        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        // Give server time to reject
        thread::sleep(Duration::from_millis(100));

        // Should NOT receive a third InterfaceUp
        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    /// With a limit of one, a new client is accepted once the previous one
    /// has disconnected.
    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7100));

        let config = make_server_config(port, 71, Some(1));

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        // Connect first client
        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Disconnect first client
        drop(client1);

        // Wait for InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        // Now a new connection should be accepted
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    /// Lowering `max_connections` through the shared runtime handle while the
    /// server is running affects the next connection attempt.
    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7200));

        let config = make_server_config(port, 72, None);
        // Keep a handle to the runtime state before the config is moved into `start`.
        let runtime = Arc::clone(&config.runtime);

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // One client is already connected, so a limit of one is now exhausted.
        {
            let mut runtime = runtime.lock().unwrap();
            runtime.max_connections = Some(1);
        }

        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}