Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30    fn default() -> Self {
31        TcpServerConfig {
32            name: String::new(),
33            listen_ip: "0.0.0.0".into(),
34            listen_port: 4242,
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data over a TCP stream.
41struct TcpServerWriter {
42    stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stream.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start a TCP server. Spawns a listener thread that accepts connections
52/// and per-client reader threads. Returns immediately.
53///
54/// `next_id` is shared with the node for allocating unique InterfaceIds
55/// for each connected client.
56pub fn start(config: TcpServerConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
57    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
58    let listener = TcpListener::bind(&addr)?;
59
60    log::info!("[{}] TCP server listening on {}", config.name, addr);
61
62    let name = config.name.clone();
63    thread::Builder::new()
64        .name(format!("tcp-server-{}", config.interface_id.0))
65        .spawn(move || {
66            listener_loop(listener, name, tx, next_id);
67        })?;
68
69    Ok(())
70}
71
72/// Listener thread: accepts connections and spawns reader threads.
73fn listener_loop(listener: TcpListener, name: String, tx: EventSender, next_id: Arc<AtomicU64>) {
74    for stream_result in listener.incoming() {
75        let stream = match stream_result {
76            Ok(s) => s,
77            Err(e) => {
78                log::warn!("[{}] accept failed: {}", name, e);
79                continue;
80            }
81        };
82
83        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
84        let peer_addr = stream.peer_addr().ok();
85
86        log::info!(
87            "[{}] client connected: {:?} → id {}",
88            name,
89            peer_addr,
90            client_id.0
91        );
92
93        // Set TCP_NODELAY on the client socket
94        if let Err(e) = stream.set_nodelay(true) {
95            log::warn!("[{}] set_nodelay failed: {}", name, e);
96        }
97
98        // Clone stream for writer
99        let writer_stream = match stream.try_clone() {
100            Ok(s) => s,
101            Err(e) => {
102                log::warn!("[{}] failed to clone stream: {}", name, e);
103                continue;
104            }
105        };
106
107        let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
108            stream: writer_stream,
109        });
110
111        let info = InterfaceInfo {
112            id: client_id,
113            name: format!("TCPServerInterface/Client-{}", client_id.0),
114            mode: constants::MODE_FULL,
115            out_capable: true,
116            in_capable: true,
117            bitrate: None,
118            announce_rate_target: None,
119            announce_rate_grace: 0,
120            announce_rate_penalty: 0.0,
121            announce_cap: constants::ANNOUNCE_CAP,
122            is_local_client: false,
123            wants_tunnel: false,
124            tunnel_id: None,
125            mtu: 65535,
126            ia_freq: 0.0,
127            started: 0.0,
128            ingress_control: true,
129        };
130
131        // Send InterfaceUp with InterfaceInfo for dynamic registration
132        if tx
133            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
134            .is_err()
135        {
136            // Driver shut down
137            return;
138        }
139
140        // Spawn reader thread for this client
141        let client_tx = tx.clone();
142        let client_name = name.clone();
143        thread::Builder::new()
144            .name(format!("tcp-server-reader-{}", client_id.0))
145            .spawn(move || {
146                client_reader_loop(stream, client_id, client_name, client_tx);
147            })
148            .ok();
149    }
150}
151
152/// Per-client reader thread: reads HDLC frames, sends to driver.
153fn client_reader_loop(mut stream: TcpStream, id: InterfaceId, name: String, tx: EventSender) {
154    let mut decoder = hdlc::Decoder::new();
155    let mut buf = [0u8; 4096];
156
157    loop {
158        match stream.read(&mut buf) {
159            Ok(0) => {
160                log::info!("[{}] client {} disconnected", name, id.0);
161                let _ = tx.send(Event::InterfaceDown(id));
162                return;
163            }
164            Ok(n) => {
165                for frame in decoder.feed(&buf[..n]) {
166                    if tx
167                        .send(Event::Frame {
168                            interface_id: id,
169                            data: frame,
170                        })
171                        .is_err()
172                    {
173                        // Driver shut down
174                        return;
175                    }
176                }
177            }
178            Err(e) => {
179                log::warn!("[{}] client {} read error: {}", name, id.0, e);
180                let _ = tx.send(Event::InterfaceDown(id));
181                return;
182            }
183        }
184    }
185}
186
187// --- Factory implementation ---
188
189use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
190use std::collections::HashMap;
191
192/// Factory for `TCPServerInterface`.
193pub struct TcpServerFactory;
194
195impl InterfaceFactory for TcpServerFactory {
196    fn type_name(&self) -> &str {
197        "TCPServerInterface"
198    }
199
200    fn parse_config(
201        &self,
202        name: &str,
203        id: InterfaceId,
204        params: &HashMap<String, String>,
205    ) -> Result<Box<dyn InterfaceConfigData>, String> {
206        let listen_ip = params
207            .get("listen_ip")
208            .cloned()
209            .unwrap_or_else(|| "0.0.0.0".into());
210        let listen_port = params
211            .get("listen_port")
212            .and_then(|v| v.parse().ok())
213            .unwrap_or(4242);
214
215        Ok(Box::new(TcpServerConfig {
216            name: name.to_string(),
217            listen_ip,
218            listen_port,
219            interface_id: id,
220        }))
221    }
222
223    fn start(
224        &self,
225        config: Box<dyn InterfaceConfigData>,
226        ctx: StartContext,
227    ) -> io::Result<StartResult> {
228        let cfg = *config
229            .into_any()
230            .downcast::<TcpServerConfig>()
231            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
232        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
233        Ok(StartResult::Listener)
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused loopback port.
    ///
    /// NOTE: the port is released again before the server binds it, so a
    /// concurrent process could in principle grab it in between.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a server on a free loopback port and returns that port together
    /// with the receiving end of the server's event channel.
    ///
    /// `start` binds the listening socket before it returns, so clients may
    /// connect immediately; no settling delay is needed.
    fn start_test_server(interface_id: u64, first_client_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };
        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a client connection to the test server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let (port, rx) = start_test_server(1, 1000);

        // Connect a client
        let _client = connect(port);

        // Should receive InterfaceUp with InterfaceInfo (dynamic)
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let (port, rx) = start_test_server(2, 2000);
        let mut client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send an HDLC frame (>= 19 bytes)
        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        // Should receive Frame event
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let (port, rx) = start_test_server(3, 3000);

        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Get the writer from InterfaceUp
        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send a frame via writer
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // Read exactly one frame's worth of bytes: a single `read` is allowed
        // to return only part of what was written.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let (port, rx) = start_test_server(4, 4000);

        // Connect two clients
        let _client1 = connect(port);
        let _client2 = connect(port);

        // Collect InterfaceUp events
        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        // Should have unique IDs
        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let (port, rx) = start_test_server(5, 5000);
        let client = connect(port);

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Disconnect
        drop(client);

        // Should receive InterfaceDown
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        // Should not error (the helper unwraps the result of `start`).
        let (_port, _rx) = start_test_server(6, 6000);
    }
}