Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{ListenerControl, Writer};
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27    pub max_connections: Option<usize>,
28    pub ingress_control: IngressControlConfig,
29    pub runtime: Arc<Mutex<TcpServerRuntime>>,
30}
31
32#[derive(Debug, Clone)]
33pub struct TcpServerRuntime {
34    pub max_connections: Option<usize>,
35}
36
37impl TcpServerRuntime {
38    pub fn from_config(config: &TcpServerConfig) -> Self {
39        Self {
40            max_connections: config.max_connections,
41        }
42    }
43}
44
45#[derive(Debug, Clone)]
46pub struct TcpServerRuntimeConfigHandle {
47    pub interface_name: String,
48    pub runtime: Arc<Mutex<TcpServerRuntime>>,
49    pub startup: TcpServerRuntime,
50}
51
52impl Default for TcpServerConfig {
53    fn default() -> Self {
54        let mut config = TcpServerConfig {
55            name: String::new(),
56            listen_ip: "0.0.0.0".into(),
57            listen_port: 4242,
58            interface_id: InterfaceId(0),
59            max_connections: None,
60            ingress_control: IngressControlConfig::enabled(),
61            runtime: Arc::new(Mutex::new(TcpServerRuntime {
62                max_connections: None,
63            })),
64        };
65        let startup = TcpServerRuntime::from_config(&config);
66        config.runtime = Arc::new(Mutex::new(startup));
67        config
68    }
69}
70
71/// Writer that sends HDLC-framed data over a TCP stream.
72struct TcpServerWriter {
73    stream: TcpStream,
74}
75
76impl Writer for TcpServerWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        self.stream.write_all(&hdlc::frame(data))
79    }
80}
81
82/// Start a TCP server. Spawns a listener thread that accepts connections
83/// and per-client reader threads. Returns immediately.
84///
85/// `next_id` is shared with the node for allocating unique InterfaceIds
86/// for each connected client.
87pub fn start(
88    config: TcpServerConfig,
89    tx: EventSender,
90    next_id: Arc<AtomicU64>,
91) -> io::Result<ListenerControl> {
92    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
93    let listener = TcpListener::bind(&addr)?;
94    listener.set_nonblocking(true)?;
95
96    log::info!("[{}] TCP server listening on {}", config.name, addr);
97
98    let name = config.name.clone();
99    let runtime = Arc::clone(&config.runtime);
100    let ingress_control = config.ingress_control;
101    let active_connections = Arc::new(AtomicUsize::new(0));
102    let control = ListenerControl::new();
103    let listener_control = control.clone();
104    thread::Builder::new()
105        .name(format!("tcp-server-{}", config.interface_id.0))
106        .spawn(move || {
107            listener_loop(
108                listener,
109                name,
110                tx,
111                next_id,
112                runtime,
113                ingress_control,
114                active_connections,
115                listener_control,
116            );
117        })?;
118
119    Ok(control)
120}
121
122/// Listener thread: accepts connections and spawns reader threads.
123fn listener_loop(
124    listener: TcpListener,
125    name: String,
126    tx: EventSender,
127    next_id: Arc<AtomicU64>,
128    runtime: Arc<Mutex<TcpServerRuntime>>,
129    ingress_control: IngressControlConfig,
130    active_connections: Arc<AtomicUsize>,
131    control: ListenerControl,
132) {
133    loop {
134        if control.should_stop() {
135            log::info!("[{}] listener stopping", name);
136            return;
137        }
138
139        let stream_result = listener.accept().map(|(stream, _)| stream);
140        let stream = match stream_result {
141            Ok(s) => s,
142            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143                thread::sleep(std::time::Duration::from_millis(50));
144                continue;
145            }
146            Err(e) => {
147                log::warn!("[{}] accept failed: {}", name, e);
148                continue;
149            }
150        };
151
152        let max_connections = runtime.lock().unwrap().max_connections;
153        if let Some(max) = max_connections {
154            if active_connections.load(Ordering::Relaxed) >= max {
155                let peer = stream.peer_addr().ok();
156                log::warn!(
157                    "[{}] max connections ({}) reached, rejecting {:?}",
158                    name,
159                    max,
160                    peer
161                );
162                drop(stream);
163                continue;
164            }
165        }
166
167        active_connections.fetch_add(1, Ordering::Relaxed);
168
169        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
170        let peer_addr = stream.peer_addr().ok();
171
172        log::info!(
173            "[{}] client connected: {:?} → id {}",
174            name,
175            peer_addr,
176            client_id.0
177        );
178
179        // Set TCP_NODELAY on the client socket
180        if let Err(e) = stream.set_nodelay(true) {
181            log::warn!("[{}] set_nodelay failed: {}", name, e);
182        }
183
184        // Clone stream for writer
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("[{}] failed to clone stream: {}", name, e);
189                continue;
190            }
191        };
192
193        let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
194            stream: writer_stream,
195        });
196
197        let info = InterfaceInfo {
198            id: client_id,
199            name: format!("TCPServerInterface/Client-{}", client_id.0),
200            mode: constants::MODE_FULL,
201            out_capable: true,
202            in_capable: true,
203            bitrate: None,
204            announce_rate_target: None,
205            announce_rate_grace: 0,
206            announce_rate_penalty: 0.0,
207            announce_cap: constants::ANNOUNCE_CAP,
208            is_local_client: false,
209            wants_tunnel: false,
210            tunnel_id: None,
211            mtu: 65535,
212            ia_freq: 0.0,
213            started: 0.0,
214            ingress_control,
215        };
216
217        // Send InterfaceUp with InterfaceInfo for dynamic registration
218        if tx
219            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
220            .is_err()
221        {
222            // Driver shut down
223            return;
224        }
225
226        // Spawn reader thread for this client
227        let client_tx = tx.clone();
228        let client_name = name.clone();
229        let client_active = active_connections.clone();
230        thread::Builder::new()
231            .name(format!("tcp-server-reader-{}", client_id.0))
232            .spawn(move || {
233                client_reader_loop(stream, client_id, client_name, client_tx, client_active);
234            })
235            .ok();
236    }
237}
238
239/// Per-client reader thread: reads HDLC frames, sends to driver.
240fn client_reader_loop(
241    mut stream: TcpStream,
242    id: InterfaceId,
243    name: String,
244    tx: EventSender,
245    active_connections: Arc<AtomicUsize>,
246) {
247    let mut decoder = hdlc::Decoder::new();
248    let mut buf = [0u8; 4096];
249
250    loop {
251        match stream.read(&mut buf) {
252            Ok(0) => {
253                log::info!("[{}] client {} disconnected", name, id.0);
254                active_connections.fetch_sub(1, Ordering::Relaxed);
255                let _ = tx.send(Event::InterfaceDown(id));
256                return;
257            }
258            Ok(n) => {
259                for frame in decoder.feed(&buf[..n]) {
260                    if tx
261                        .send(Event::Frame {
262                            interface_id: id,
263                            data: frame,
264                        })
265                        .is_err()
266                    {
267                        // Driver shut down
268                        active_connections.fetch_sub(1, Ordering::Relaxed);
269                        return;
270                    }
271                }
272            }
273            Err(e) => {
274                log::warn!("[{}] client {} read error: {}", name, id.0, e);
275                active_connections.fetch_sub(1, Ordering::Relaxed);
276                let _ = tx.send(Event::InterfaceDown(id));
277                return;
278            }
279        }
280    }
281}
282
283// --- Factory implementation ---
284
285use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
286use std::collections::HashMap;
287
288/// Factory for `TCPServerInterface`.
289pub struct TcpServerFactory;
290
291impl InterfaceFactory for TcpServerFactory {
292    fn type_name(&self) -> &str {
293        "TCPServerInterface"
294    }
295
296    fn parse_config(
297        &self,
298        name: &str,
299        id: InterfaceId,
300        params: &HashMap<String, String>,
301    ) -> Result<Box<dyn InterfaceConfigData>, String> {
302        let listen_ip = params
303            .get("listen_ip")
304            .cloned()
305            .unwrap_or_else(|| "0.0.0.0".into());
306        let listen_port = params
307            .get("listen_port")
308            .and_then(|v| v.parse().ok())
309            .unwrap_or(4242);
310        let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
311        let mut config = TcpServerConfig {
312            name: name.to_string(),
313            listen_ip,
314            listen_port,
315            interface_id: id,
316            max_connections,
317            ingress_control: IngressControlConfig::enabled(),
318            runtime: Arc::new(Mutex::new(TcpServerRuntime {
319                max_connections: None,
320            })),
321        };
322        let startup = TcpServerRuntime::from_config(&config);
323        config.runtime = Arc::new(Mutex::new(startup));
324        Ok(Box::new(config))
325    }
326
327    fn start(
328        &self,
329        config: Box<dyn InterfaceConfigData>,
330        ctx: StartContext,
331    ) -> io::Result<StartResult> {
332        let mut cfg = *config
333            .into_any()
334            .downcast::<TcpServerConfig>()
335            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
336        cfg.ingress_control = ctx.ingress_control;
337        let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
338        Ok(StartResult::Listener {
339            control: Some(control),
340        })
341    }
342}
343
344pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
345    TcpServerRuntimeConfigHandle {
346        interface_name: config.name.clone(),
347        runtime: Arc::clone(&config.runtime),
348        startup: TcpServerRuntime::from_config(config),
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port by binding port 0 and
    /// releasing the probe socket again.
    fn find_free_port() -> u16 {
        let probe = TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap().port()
    }

    /// Loopback server config whose runtime state mirrors `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            ingress_control: IngressControlConfig::enabled(),
            runtime: Arc::new(Mutex::new(TcpServerRuntime { max_connections })),
        }
    }

    /// Opens a client connection to the server under test.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Short pause after `start` before the first client connects.
    fn settle() {
        thread::sleep(Duration::from_millis(50));
    }

    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(1000));

        start(make_server_config(port, 1, None), tx, ids).unwrap();
        settle();

        let _client = connect(port);

        // The new client is announced with a writer and its InterfaceInfo.
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn spawned_client_inherits_ingress_control_config() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(1100));

        let mut ingress = IngressControlConfig::disabled();
        ingress.max_held_announces = 17;
        ingress.burst_hold = 1.5;
        ingress.burst_freq_new = 2.5;
        ingress.burst_freq = 3.5;
        ingress.new_time = 4.5;
        ingress.burst_penalty = 5.5;
        ingress.held_release_interval = 6.5;

        let mut config = make_server_config(port, 11, None);
        config.ingress_control = ingress;

        start(config, tx, ids).unwrap();
        settle();

        let _client = connect(port);

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(_, _, Some(info)) => {
                let seen = info.ingress_control;
                assert!(!seen.enabled);
                assert_eq!(seen.max_held_announces, 17);
                assert_eq!(seen.burst_hold, 1.5);
                assert_eq!(seen.burst_freq_new, 2.5);
                assert_eq!(seen.burst_freq, 3.5);
                assert_eq!(seen.new_time, 4.5);
                assert_eq!(seen.burst_penalty, 5.5);
                assert_eq!(seen.held_release_interval, 6.5);
            }
            other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
        }
    }

    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(1500));

        let control = start(make_server_config(port, 15, None), tx, ids).unwrap();

        settle();
        control.request_stop();
        thread::sleep(Duration::from_millis(120));

        // The attempt may be refused outright; if it does connect, the
        // socket is closed again straight away.
        drop(TcpStream::connect(format!("127.0.0.1:{}", port)));

        let outcome = rx.recv_timeout(Duration::from_millis(200));
        let quiet = matches!(
            outcome,
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected)
        );
        if !quiet {
            panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                outcome
            );
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(2000));

        start(make_server_config(port, 2, None), tx, ids).unwrap();
        settle();

        let mut client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send one HDLC frame carrying a 32-byte payload (>= 19 bytes).
        let payload = (0..32).collect::<Vec<u8>>();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        // The decoded payload arrives tagged with the client's id.
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(3000));

        start(make_server_config(port, 3, None), tx, ids).unwrap();
        settle();

        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // The writer for this client comes with its InterfaceUp event.
        let mut writer = match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload = (0..24).collect::<Vec<u8>>();
        writer.send_frame(&payload).unwrap();

        // The client must see exactly the HDLC-framed payload.
        let mut received = [0u8; 256];
        let len = client.read(&mut received).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&received[..len], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(4000));

        start(make_server_config(port, 4, None), tx, next_id).unwrap();
        settle();

        let _client1 = connect(port);
        let _client2 = connect(port);

        // One InterfaceUp per client.
        let ids: Vec<InterfaceId> = (0..2)
            .map(|_| match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            })
            .collect();

        // Each client got its own id.
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(5000));

        start(make_server_config(port, 5, None), tx, ids).unwrap();
        settle();

        let client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Closing the client side must be reported as InterfaceDown.
        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(6000));

        // Binding a free port must succeed.
        start(make_server_config(port, 6, None), tx, ids).unwrap();
    }

    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(7000));

        start(make_server_config(port, 7, Some(2)), tx, ids).unwrap();
        settle();

        // Fill both available slots.
        let _client1 = connect(port);
        let _client2 = connect(port);

        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // A third connection completes at TCP level but the server drops it.
        let client3 = connect(port);
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        // Let the server process the rejection.
        thread::sleep(Duration::from_millis(100));

        // No third InterfaceUp may show up.
        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(7100));

        start(make_server_config(port, 71, Some(1)), tx, ids).unwrap();
        settle();

        // The first client takes the only slot.
        let client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Closing it frees the slot again.
        drop(client1);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        // A new client is now admitted.
        let _client2 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let ids = Arc::new(AtomicU64::new(7200));

        let config = make_server_config(port, 72, None);
        let shared_runtime = Arc::clone(&config.runtime);

        start(config, tx, ids).unwrap();
        settle();

        let _client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Lower the limit while the server is running.
        shared_runtime.lock().unwrap().max_connections = Some(1);

        let _client2 = connect(port);
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}
713            "expected no InterfaceUp after lowering max_connections, got {:?}",
714            result
715        );
716    }
717}