Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::{ListenerControl, Writer};
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27    pub max_connections: Option<usize>,
28    pub runtime: Arc<Mutex<TcpServerRuntime>>,
29}
30
/// Settings of a running TCP server that can be changed while it is live.
#[derive(Clone, Debug)]
pub struct TcpServerRuntime {
    /// Maximum number of simultaneously connected clients; `None` disables
    /// the limit.
    pub max_connections: Option<usize>,
}
35
36impl TcpServerRuntime {
37    pub fn from_config(config: &TcpServerConfig) -> Self {
38        Self {
39            max_connections: config.max_connections,
40        }
41    }
42}
43
44#[derive(Debug, Clone)]
45pub struct TcpServerRuntimeConfigHandle {
46    pub interface_name: String,
47    pub runtime: Arc<Mutex<TcpServerRuntime>>,
48    pub startup: TcpServerRuntime,
49}
50
51impl Default for TcpServerConfig {
52    fn default() -> Self {
53        let mut config = TcpServerConfig {
54            name: String::new(),
55            listen_ip: "0.0.0.0".into(),
56            listen_port: 4242,
57            interface_id: InterfaceId(0),
58            max_connections: None,
59            runtime: Arc::new(Mutex::new(TcpServerRuntime {
60                max_connections: None,
61            })),
62        };
63        let startup = TcpServerRuntime::from_config(&config);
64        config.runtime = Arc::new(Mutex::new(startup));
65        config
66    }
67}
68
/// Outbound half of a client connection: frames data with HDLC and writes
/// it to the client's TCP stream.
struct TcpServerWriter {
    /// Stream the framed data is written to.
    stream: TcpStream,
}
73
74impl Writer for TcpServerWriter {
75    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
76        self.stream.write_all(&hdlc::frame(data))
77    }
78}
79
80/// Start a TCP server. Spawns a listener thread that accepts connections
81/// and per-client reader threads. Returns immediately.
82///
83/// `next_id` is shared with the node for allocating unique InterfaceIds
84/// for each connected client.
85pub fn start(
86    config: TcpServerConfig,
87    tx: EventSender,
88    next_id: Arc<AtomicU64>,
89) -> io::Result<ListenerControl> {
90    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
91    let listener = TcpListener::bind(&addr)?;
92    listener.set_nonblocking(true)?;
93
94    log::info!("[{}] TCP server listening on {}", config.name, addr);
95
96    let name = config.name.clone();
97    let runtime = Arc::clone(&config.runtime);
98    let active_connections = Arc::new(AtomicUsize::new(0));
99    let control = ListenerControl::new();
100    let listener_control = control.clone();
101    thread::Builder::new()
102        .name(format!("tcp-server-{}", config.interface_id.0))
103        .spawn(move || {
104            listener_loop(
105                listener,
106                name,
107                tx,
108                next_id,
109                runtime,
110                active_connections,
111                listener_control,
112            );
113        })?;
114
115    Ok(control)
116}
117
118/// Listener thread: accepts connections and spawns reader threads.
119fn listener_loop(
120    listener: TcpListener,
121    name: String,
122    tx: EventSender,
123    next_id: Arc<AtomicU64>,
124    runtime: Arc<Mutex<TcpServerRuntime>>,
125    active_connections: Arc<AtomicUsize>,
126    control: ListenerControl,
127) {
128    loop {
129        if control.should_stop() {
130            log::info!("[{}] listener stopping", name);
131            return;
132        }
133
134        let stream_result = listener.accept().map(|(stream, _)| stream);
135        let stream = match stream_result {
136            Ok(s) => s,
137            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
138                thread::sleep(std::time::Duration::from_millis(50));
139                continue;
140            }
141            Err(e) => {
142                log::warn!("[{}] accept failed: {}", name, e);
143                continue;
144            }
145        };
146
147        let max_connections = runtime.lock().unwrap().max_connections;
148        if let Some(max) = max_connections {
149            if active_connections.load(Ordering::Relaxed) >= max {
150                let peer = stream.peer_addr().ok();
151                log::warn!(
152                    "[{}] max connections ({}) reached, rejecting {:?}",
153                    name,
154                    max,
155                    peer
156                );
157                drop(stream);
158                continue;
159            }
160        }
161
162        active_connections.fetch_add(1, Ordering::Relaxed);
163
164        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
165        let peer_addr = stream.peer_addr().ok();
166
167        log::info!(
168            "[{}] client connected: {:?} → id {}",
169            name,
170            peer_addr,
171            client_id.0
172        );
173
174        // Set TCP_NODELAY on the client socket
175        if let Err(e) = stream.set_nodelay(true) {
176            log::warn!("[{}] set_nodelay failed: {}", name, e);
177        }
178
179        // Clone stream for writer
180        let writer_stream = match stream.try_clone() {
181            Ok(s) => s,
182            Err(e) => {
183                log::warn!("[{}] failed to clone stream: {}", name, e);
184                continue;
185            }
186        };
187
188        let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
189            stream: writer_stream,
190        });
191
192        let info = InterfaceInfo {
193            id: client_id,
194            name: format!("TCPServerInterface/Client-{}", client_id.0),
195            mode: constants::MODE_FULL,
196            out_capable: true,
197            in_capable: true,
198            bitrate: None,
199            announce_rate_target: None,
200            announce_rate_grace: 0,
201            announce_rate_penalty: 0.0,
202            announce_cap: constants::ANNOUNCE_CAP,
203            is_local_client: false,
204            wants_tunnel: false,
205            tunnel_id: None,
206            mtu: 65535,
207            ia_freq: 0.0,
208            started: 0.0,
209            ingress_control: true,
210        };
211
212        // Send InterfaceUp with InterfaceInfo for dynamic registration
213        if tx
214            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
215            .is_err()
216        {
217            // Driver shut down
218            return;
219        }
220
221        // Spawn reader thread for this client
222        let client_tx = tx.clone();
223        let client_name = name.clone();
224        let client_active = active_connections.clone();
225        thread::Builder::new()
226            .name(format!("tcp-server-reader-{}", client_id.0))
227            .spawn(move || {
228                client_reader_loop(stream, client_id, client_name, client_tx, client_active);
229            })
230            .ok();
231    }
232}
233
234/// Per-client reader thread: reads HDLC frames, sends to driver.
235fn client_reader_loop(
236    mut stream: TcpStream,
237    id: InterfaceId,
238    name: String,
239    tx: EventSender,
240    active_connections: Arc<AtomicUsize>,
241) {
242    let mut decoder = hdlc::Decoder::new();
243    let mut buf = [0u8; 4096];
244
245    loop {
246        match stream.read(&mut buf) {
247            Ok(0) => {
248                log::info!("[{}] client {} disconnected", name, id.0);
249                active_connections.fetch_sub(1, Ordering::Relaxed);
250                let _ = tx.send(Event::InterfaceDown(id));
251                return;
252            }
253            Ok(n) => {
254                for frame in decoder.feed(&buf[..n]) {
255                    if tx
256                        .send(Event::Frame {
257                            interface_id: id,
258                            data: frame,
259                        })
260                        .is_err()
261                    {
262                        // Driver shut down
263                        active_connections.fetch_sub(1, Ordering::Relaxed);
264                        return;
265                    }
266                }
267            }
268            Err(e) => {
269                log::warn!("[{}] client {} read error: {}", name, id.0, e);
270                active_connections.fetch_sub(1, Ordering::Relaxed);
271                let _ = tx.send(Event::InterfaceDown(id));
272                return;
273            }
274        }
275    }
276}
277
278// --- Factory implementation ---
279
280use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
281use std::collections::HashMap;
282
/// Stateless factory that parses configuration for, and starts,
/// `TCPServerInterface` instances.
pub struct TcpServerFactory;
285
286impl InterfaceFactory for TcpServerFactory {
287    fn type_name(&self) -> &str {
288        "TCPServerInterface"
289    }
290
291    fn parse_config(
292        &self,
293        name: &str,
294        id: InterfaceId,
295        params: &HashMap<String, String>,
296    ) -> Result<Box<dyn InterfaceConfigData>, String> {
297        let listen_ip = params
298            .get("listen_ip")
299            .cloned()
300            .unwrap_or_else(|| "0.0.0.0".into());
301        let listen_port = params
302            .get("listen_port")
303            .and_then(|v| v.parse().ok())
304            .unwrap_or(4242);
305        let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
306        let mut config = TcpServerConfig {
307            name: name.to_string(),
308            listen_ip,
309            listen_port,
310            interface_id: id,
311            max_connections,
312            runtime: Arc::new(Mutex::new(TcpServerRuntime {
313                max_connections: None,
314            })),
315        };
316        let startup = TcpServerRuntime::from_config(&config);
317        config.runtime = Arc::new(Mutex::new(startup));
318        Ok(Box::new(config))
319    }
320
321    fn start(
322        &self,
323        config: Box<dyn InterfaceConfigData>,
324        ctx: StartContext,
325    ) -> io::Result<StartResult> {
326        let cfg = *config
327            .into_any()
328            .downcast::<TcpServerConfig>()
329            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
330        let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
331        Ok(StartResult::Listener {
332            control: Some(control),
333        })
334    }
335}
336
337pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
338    TcpServerRuntimeConfigHandle {
339        interface_name: config.name.clone(),
340        runtime: Arc::clone(&config.runtime),
341        startup: TcpServerRuntime::from_config(config),
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Ask the OS for a currently unused loopback port by binding to port 0
    /// and releasing the socket again.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Loopback server configuration whose runtime settings mirror
    /// `max_connections`.
    fn make_server_config(
        port: u16,
        interface_id: u64,
        max_connections: Option<usize>,
    ) -> TcpServerConfig {
        TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
            max_connections,
            runtime: Arc::new(Mutex::new(TcpServerRuntime { max_connections })),
        }
    }

    /// Start a server for `config`, handing out client ids from
    /// `first_client_id`, and give the listener thread a moment to start.
    fn launch(config: TcpServerConfig, first_client_id: u64, tx: EventSender) -> ListenerControl {
        let next_id = Arc::new(AtomicU64::new(first_client_id));
        let control = start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));
        control
    }

    /// Open a client connection to the test server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 1, None), 1000, tx);

        let _client = connect(port);

        // Should receive InterfaceUp with InterfaceInfo (dynamic)
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn listener_stop_prevents_new_accepts() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        let control = launch(make_server_config(port, 15, None), 1500, tx);

        control.request_stop();
        // Longer than two listener poll intervals, so the stop is observed.
        thread::sleep(Duration::from_millis(120));

        // The connect itself may or may not succeed; only the absence of an
        // InterfaceUp event matters.
        if let Ok(stream) = TcpStream::connect(format!("127.0.0.1:{}", port)) {
            drop(stream);
        }

        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!(
                "expected no InterfaceUp after listener stop, got {:?}",
                other
            ),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 2, None), 2000, tx);

        let mut client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send an HDLC frame (>= 19 bytes)
        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        // Should receive Frame event
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 3, None), 3000, tx);

        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Get the writer from InterfaceUp
        let mut writer = match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send a frame via writer
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // TCP is a byte stream: a single read() may return only part of the
        // frame, so read exactly as many bytes as the frame is long.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 4, None), 4000, tx);

        // Connect two clients
        let _client1 = connect(port);
        let _client2 = connect(port);

        // Collect InterfaceUp events
        let mut ids = Vec::new();
        for _ in 0..2 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        // Should have unique IDs
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 5, None), 5000, tx);

        let client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Disconnect
        drop(client);

        // Should receive InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = crate::event::channel();

        // Should not error (launch unwraps the result of `start`)
        launch(make_server_config(port, 6, None), 6000, tx);
    }

    #[test]
    fn max_connections_rejects_excess() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 7, Some(2)), 7000, tx);

        // Connect two clients (at limit)
        let _client1 = connect(port);
        let _client2 = connect(port);

        // Drain both InterfaceUp events
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
        }

        // Third connection should be accepted at TCP level but immediately dropped by server
        let mut client3 = connect(port);
        client3
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        // Give server time to reject
        thread::sleep(Duration::from_millis(100));

        // The server must have closed the socket: the client sees either
        // end-of-stream or a reset, never data and never a read timeout.
        let mut probe = [0u8; 1];
        match client3.read(&mut probe) {
            Ok(0) => {}
            Err(e)
                if matches!(
                    e.kind(),
                    io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted
                ) => {}
            other => panic!(
                "expected rejected connection to be closed, got {:?}",
                other
            ),
        }

        // Should NOT receive a third InterfaceUp
        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected no InterfaceUp for rejected connection, got {:?}",
            result
        );
    }

    #[test]
    fn max_connections_allows_after_disconnect() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();
        launch(make_server_config(port, 71, Some(1)), 7100, tx);

        // Connect first client
        let client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Disconnect first client
        drop(client1);

        // Wait for InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(_)));

        // Now a new connection should be accepted
        let _client2 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceUp(_, _, _)),
            "expected InterfaceUp after slot freed, got {:?}",
            event
        );
    }

    #[test]
    fn runtime_max_connections_updates_live() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = make_server_config(port, 72, None);
        // Keep a handle on the shared runtime settings before the config is
        // moved into the server.
        let runtime = Arc::clone(&config.runtime);
        launch(config, 7200, tx);

        let _client1 = connect(port);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));

        // Lower the limit while the server is running.
        runtime.lock().unwrap().max_connections = Some(1);

        let _client2 = connect(port);
        let result = rx.recv_timeout(Duration::from_millis(400));
        assert!(
            result.is_err(),
            "expected no InterfaceUp after lowering max_connections, got {:?}",
            result
        );
    }
}