Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30    fn default() -> Self {
31        TcpServerConfig {
32            name: String::new(),
33            listen_ip: "0.0.0.0".into(),
34            listen_port: 4242,
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data over a TCP stream.
41struct TcpServerWriter {
42    stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stream.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start a TCP server. Spawns a listener thread that accepts connections
52/// and per-client reader threads. Returns immediately.
53///
54/// `next_id` is shared with the node for allocating unique InterfaceIds
55/// for each connected client.
56pub fn start(
57    config: TcpServerConfig,
58    tx: EventSender,
59    next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62    let listener = TcpListener::bind(&addr)?;
63
64    log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66    let name = config.name.clone();
67    thread::Builder::new()
68        .name(format!("tcp-server-{}", config.interface_id.0))
69        .spawn(move || {
70            listener_loop(listener, name, tx, next_id);
71        })?;
72
73    Ok(())
74}
75
76/// Listener thread: accepts connections and spawns reader threads.
77fn listener_loop(
78    listener: TcpListener,
79    name: String,
80    tx: EventSender,
81    next_id: Arc<AtomicU64>,
82) {
83    for stream_result in listener.incoming() {
84        let stream = match stream_result {
85            Ok(s) => s,
86            Err(e) => {
87                log::warn!("[{}] accept failed: {}", name, e);
88                continue;
89            }
90        };
91
92        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93        let peer_addr = stream.peer_addr().ok();
94
95        log::info!(
96            "[{}] client connected: {:?} → id {}",
97            name,
98            peer_addr,
99            client_id.0
100        );
101
102        // Set TCP_NODELAY on the client socket
103        if let Err(e) = stream.set_nodelay(true) {
104            log::warn!("[{}] set_nodelay failed: {}", name, e);
105        }
106
107        // Clone stream for writer
108        let writer_stream = match stream.try_clone() {
109            Ok(s) => s,
110            Err(e) => {
111                log::warn!("[{}] failed to clone stream: {}", name, e);
112                continue;
113            }
114        };
115
116        let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118        let info = InterfaceInfo {
119            id: client_id,
120            name: format!("TCPServerInterface/Client-{}", client_id.0),
121            mode: constants::MODE_FULL,
122            out_capable: true,
123            in_capable: true,
124            bitrate: None,
125            announce_rate_target: None,
126            announce_rate_grace: 0,
127            announce_rate_penalty: 0.0,
128            announce_cap: constants::ANNOUNCE_CAP,
129            is_local_client: false,
130            wants_tunnel: false,
131            tunnel_id: None,
132            mtu: 65535,
133            ia_freq: 0.0,
134            started: 0.0,
135            ingress_control: true,
136        };
137
138        // Send InterfaceUp with InterfaceInfo for dynamic registration
139        if tx
140            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
141            .is_err()
142        {
143            // Driver shut down
144            return;
145        }
146
147        // Spawn reader thread for this client
148        let client_tx = tx.clone();
149        let client_name = name.clone();
150        thread::Builder::new()
151            .name(format!("tcp-server-reader-{}", client_id.0))
152            .spawn(move || {
153                client_reader_loop(stream, client_id, client_name, client_tx);
154            })
155            .ok();
156    }
157}
158
159/// Per-client reader thread: reads HDLC frames, sends to driver.
160fn client_reader_loop(
161    mut stream: TcpStream,
162    id: InterfaceId,
163    name: String,
164    tx: EventSender,
165) {
166    let mut decoder = hdlc::Decoder::new();
167    let mut buf = [0u8; 4096];
168
169    loop {
170        match stream.read(&mut buf) {
171            Ok(0) => {
172                log::info!("[{}] client {} disconnected", name, id.0);
173                let _ = tx.send(Event::InterfaceDown(id));
174                return;
175            }
176            Ok(n) => {
177                for frame in decoder.feed(&buf[..n]) {
178                    if tx
179                        .send(Event::Frame {
180                            interface_id: id,
181                            data: frame,
182                        })
183                        .is_err()
184                    {
185                        // Driver shut down
186                        return;
187                    }
188                }
189            }
190            Err(e) => {
191                log::warn!("[{}] client {} read error: {}", name, id.0, e);
192                let _ = tx.send(Event::InterfaceDown(id));
193                return;
194            }
195        }
196    }
197}
198
199// --- Factory implementation ---
200
201use std::collections::HashMap;
202use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
203
204/// Factory for `TCPServerInterface`.
205pub struct TcpServerFactory;
206
207impl InterfaceFactory for TcpServerFactory {
208    fn type_name(&self) -> &str { "TCPServerInterface" }
209
210    fn parse_config(
211        &self,
212        name: &str,
213        id: InterfaceId,
214        params: &HashMap<String, String>,
215    ) -> Result<Box<dyn InterfaceConfigData>, String> {
216        let listen_ip = params.get("listen_ip")
217            .cloned()
218            .unwrap_or_else(|| "0.0.0.0".into());
219        let listen_port = params.get("listen_port")
220            .and_then(|v| v.parse().ok())
221            .unwrap_or(4242);
222
223        Ok(Box::new(TcpServerConfig {
224            name: name.to_string(),
225            listen_ip,
226            listen_port,
227            interface_id: id,
228        }))
229    }
230
231    fn start(
232        &self,
233        config: Box<dyn InterfaceConfigData>,
234        ctx: StartContext,
235    ) -> io::Result<StartResult> {
236        let cfg = *config.into_any().downcast::<TcpServerConfig>()
237            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
238        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
239        Ok(StartResult::Listener)
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use std::net::TcpStream;
247    use std::sync::mpsc;
248    use std::time::Duration;
249
250    fn find_free_port() -> u16 {
251        TcpListener::bind("127.0.0.1:0")
252            .unwrap()
253            .local_addr()
254            .unwrap()
255            .port()
256    }
257
258    #[test]
259    fn accept_connection() {
260        let port = find_free_port();
261        let (tx, rx) = mpsc::channel();
262        let next_id = Arc::new(AtomicU64::new(1000));
263
264        let config = TcpServerConfig {
265            name: "test-server".into(),
266            listen_ip: "127.0.0.1".into(),
267            listen_port: port,
268            interface_id: InterfaceId(1),
269        };
270
271        start(config, tx, next_id).unwrap();
272
273        // Give server time to start listening
274        thread::sleep(Duration::from_millis(50));
275
276        // Connect a client
277        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
278
279        // Should receive InterfaceUp with InterfaceInfo (dynamic)
280        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
281        match event {
282            Event::InterfaceUp(id, writer, info) => {
283                assert_eq!(id, InterfaceId(1000));
284                assert!(writer.is_some());
285                assert!(info.is_some());
286            }
287            other => panic!("expected InterfaceUp, got {:?}", other),
288        }
289    }
290
291    #[test]
292    fn receive_frame_from_client() {
293        let port = find_free_port();
294        let (tx, rx) = mpsc::channel();
295        let next_id = Arc::new(AtomicU64::new(2000));
296
297        let config = TcpServerConfig {
298            name: "test-server".into(),
299            listen_ip: "127.0.0.1".into(),
300            listen_port: port,
301            interface_id: InterfaceId(2),
302        };
303
304        start(config, tx, next_id).unwrap();
305        thread::sleep(Duration::from_millis(50));
306
307        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
308
309        // Drain InterfaceUp
310        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
311
312        // Send an HDLC frame (>= 19 bytes)
313        let payload: Vec<u8> = (0..32).collect();
314        let framed = hdlc::frame(&payload);
315        client.write_all(&framed).unwrap();
316
317        // Should receive Frame event
318        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
319        match event {
320            Event::Frame { interface_id, data } => {
321                assert_eq!(interface_id, InterfaceId(2000));
322                assert_eq!(data, payload);
323            }
324            other => panic!("expected Frame, got {:?}", other),
325        }
326    }
327
328    #[test]
329    fn send_frame_to_client() {
330        let port = find_free_port();
331        let (tx, rx) = mpsc::channel();
332        let next_id = Arc::new(AtomicU64::new(3000));
333
334        let config = TcpServerConfig {
335            name: "test-server".into(),
336            listen_ip: "127.0.0.1".into(),
337            listen_port: port,
338            interface_id: InterfaceId(3),
339        };
340
341        start(config, tx, next_id).unwrap();
342        thread::sleep(Duration::from_millis(50));
343
344        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
345        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
346
347        // Get the writer from InterfaceUp
348        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
349        let mut writer = match event {
350            Event::InterfaceUp(_, Some(w), _) => w,
351            other => panic!("expected InterfaceUp with writer, got {:?}", other),
352        };
353
354        // Send a frame via writer
355        let payload: Vec<u8> = (0..24).collect();
356        writer.send_frame(&payload).unwrap();
357
358        // Read from client side
359        let mut buf = [0u8; 256];
360        let n = client.read(&mut buf).unwrap();
361        let expected = hdlc::frame(&payload);
362        assert_eq!(&buf[..n], &expected[..]);
363    }
364
365    #[test]
366    fn multiple_clients() {
367        let port = find_free_port();
368        let (tx, rx) = mpsc::channel();
369        let next_id = Arc::new(AtomicU64::new(4000));
370
371        let config = TcpServerConfig {
372            name: "test-server".into(),
373            listen_ip: "127.0.0.1".into(),
374            listen_port: port,
375            interface_id: InterfaceId(4),
376        };
377
378        start(config, tx, next_id).unwrap();
379        thread::sleep(Duration::from_millis(50));
380
381        // Connect two clients
382        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
383        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
384
385        // Collect InterfaceUp events
386        let mut ids = Vec::new();
387        for _ in 0..2 {
388            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
389            match event {
390                Event::InterfaceUp(id, _, _) => ids.push(id),
391                other => panic!("expected InterfaceUp, got {:?}", other),
392            }
393        }
394
395        // Should have unique IDs
396        assert_eq!(ids.len(), 2);
397        assert_ne!(ids[0], ids[1]);
398    }
399
400    #[test]
401    fn client_disconnect() {
402        let port = find_free_port();
403        let (tx, rx) = mpsc::channel();
404        let next_id = Arc::new(AtomicU64::new(5000));
405
406        let config = TcpServerConfig {
407            name: "test-server".into(),
408            listen_ip: "127.0.0.1".into(),
409            listen_port: port,
410            interface_id: InterfaceId(5),
411        };
412
413        start(config, tx, next_id).unwrap();
414        thread::sleep(Duration::from_millis(50));
415
416        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
417
418        // Drain InterfaceUp
419        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
420
421        // Disconnect
422        drop(client);
423
424        // Should receive InterfaceDown
425        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
426        assert!(
427            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
428            "expected InterfaceDown(5000), got {:?}",
429            event
430        );
431    }
432
433    #[test]
434    fn server_bind_port() {
435        let port = find_free_port();
436        let (tx, _rx) = mpsc::channel();
437        let next_id = Arc::new(AtomicU64::new(6000));
438
439        let config = TcpServerConfig {
440            name: "test-server".into(),
441            listen_ip: "127.0.0.1".into(),
442            listen_port: port,
443            interface_id: InterfaceId(6),
444        };
445
446        // Should not error
447        start(config, tx, next_id).unwrap();
448    }
449}