Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30    fn default() -> Self {
31        TcpServerConfig {
32            name: String::new(),
33            listen_ip: "0.0.0.0".into(),
34            listen_port: 4242,
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data over a TCP stream.
41struct TcpServerWriter {
42    stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stream.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start a TCP server. Spawns a listener thread that accepts connections
52/// and per-client reader threads. Returns immediately.
53///
54/// `next_id` is shared with the node for allocating unique InterfaceIds
55/// for each connected client.
56pub fn start(
57    config: TcpServerConfig,
58    tx: EventSender,
59    next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62    let listener = TcpListener::bind(&addr)?;
63
64    log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66    let name = config.name.clone();
67    thread::Builder::new()
68        .name(format!("tcp-server-{}", config.interface_id.0))
69        .spawn(move || {
70            listener_loop(listener, name, tx, next_id);
71        })?;
72
73    Ok(())
74}
75
76/// Listener thread: accepts connections and spawns reader threads.
77fn listener_loop(
78    listener: TcpListener,
79    name: String,
80    tx: EventSender,
81    next_id: Arc<AtomicU64>,
82) {
83    for stream_result in listener.incoming() {
84        let stream = match stream_result {
85            Ok(s) => s,
86            Err(e) => {
87                log::warn!("[{}] accept failed: {}", name, e);
88                continue;
89            }
90        };
91
92        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93        let peer_addr = stream.peer_addr().ok();
94
95        log::info!(
96            "[{}] client connected: {:?} → id {}",
97            name,
98            peer_addr,
99            client_id.0
100        );
101
102        // Set TCP_NODELAY on the client socket
103        if let Err(e) = stream.set_nodelay(true) {
104            log::warn!("[{}] set_nodelay failed: {}", name, e);
105        }
106
107        // Clone stream for writer
108        let writer_stream = match stream.try_clone() {
109            Ok(s) => s,
110            Err(e) => {
111                log::warn!("[{}] failed to clone stream: {}", name, e);
112                continue;
113            }
114        };
115
116        let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118        let info = InterfaceInfo {
119            id: client_id,
120            name: format!("TCPServerInterface/Client-{}", client_id.0),
121            mode: constants::MODE_FULL,
122            out_capable: true,
123            in_capable: true,
124            bitrate: None,
125            announce_rate_target: None,
126            announce_rate_grace: 0,
127            announce_rate_penalty: 0.0,
128            announce_cap: constants::ANNOUNCE_CAP,
129            is_local_client: false,
130            wants_tunnel: false,
131            tunnel_id: None,
132        };
133
134        // Send InterfaceUp with InterfaceInfo for dynamic registration
135        if tx
136            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
137            .is_err()
138        {
139            // Driver shut down
140            return;
141        }
142
143        // Spawn reader thread for this client
144        let client_tx = tx.clone();
145        let client_name = name.clone();
146        thread::Builder::new()
147            .name(format!("tcp-server-reader-{}", client_id.0))
148            .spawn(move || {
149                client_reader_loop(stream, client_id, client_name, client_tx);
150            })
151            .ok();
152    }
153}
154
155/// Per-client reader thread: reads HDLC frames, sends to driver.
156fn client_reader_loop(
157    mut stream: TcpStream,
158    id: InterfaceId,
159    name: String,
160    tx: EventSender,
161) {
162    let mut decoder = hdlc::Decoder::new();
163    let mut buf = [0u8; 4096];
164
165    loop {
166        match stream.read(&mut buf) {
167            Ok(0) => {
168                log::info!("[{}] client {} disconnected", name, id.0);
169                let _ = tx.send(Event::InterfaceDown(id));
170                return;
171            }
172            Ok(n) => {
173                for frame in decoder.feed(&buf[..n]) {
174                    if tx
175                        .send(Event::Frame {
176                            interface_id: id,
177                            data: frame,
178                        })
179                        .is_err()
180                    {
181                        // Driver shut down
182                        return;
183                    }
184                }
185            }
186            Err(e) => {
187                log::warn!("[{}] client {} read error: {}", name, id.0, e);
188                let _ = tx.send(Event::InterfaceDown(id));
189                return;
190            }
191        }
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use std::net::TcpStream;
199    use std::sync::mpsc;
200    use std::time::Duration;
201
202    fn find_free_port() -> u16 {
203        TcpListener::bind("127.0.0.1:0")
204            .unwrap()
205            .local_addr()
206            .unwrap()
207            .port()
208    }
209
210    #[test]
211    fn accept_connection() {
212        let port = find_free_port();
213        let (tx, rx) = mpsc::channel();
214        let next_id = Arc::new(AtomicU64::new(1000));
215
216        let config = TcpServerConfig {
217            name: "test-server".into(),
218            listen_ip: "127.0.0.1".into(),
219            listen_port: port,
220            interface_id: InterfaceId(1),
221        };
222
223        start(config, tx, next_id).unwrap();
224
225        // Give server time to start listening
226        thread::sleep(Duration::from_millis(50));
227
228        // Connect a client
229        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
230
231        // Should receive InterfaceUp with InterfaceInfo (dynamic)
232        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
233        match event {
234            Event::InterfaceUp(id, writer, info) => {
235                assert_eq!(id, InterfaceId(1000));
236                assert!(writer.is_some());
237                assert!(info.is_some());
238            }
239            other => panic!("expected InterfaceUp, got {:?}", other),
240        }
241    }
242
243    #[test]
244    fn receive_frame_from_client() {
245        let port = find_free_port();
246        let (tx, rx) = mpsc::channel();
247        let next_id = Arc::new(AtomicU64::new(2000));
248
249        let config = TcpServerConfig {
250            name: "test-server".into(),
251            listen_ip: "127.0.0.1".into(),
252            listen_port: port,
253            interface_id: InterfaceId(2),
254        };
255
256        start(config, tx, next_id).unwrap();
257        thread::sleep(Duration::from_millis(50));
258
259        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
260
261        // Drain InterfaceUp
262        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
263
264        // Send an HDLC frame (>= 19 bytes)
265        let payload: Vec<u8> = (0..32).collect();
266        let framed = hdlc::frame(&payload);
267        client.write_all(&framed).unwrap();
268
269        // Should receive Frame event
270        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
271        match event {
272            Event::Frame { interface_id, data } => {
273                assert_eq!(interface_id, InterfaceId(2000));
274                assert_eq!(data, payload);
275            }
276            other => panic!("expected Frame, got {:?}", other),
277        }
278    }
279
280    #[test]
281    fn send_frame_to_client() {
282        let port = find_free_port();
283        let (tx, rx) = mpsc::channel();
284        let next_id = Arc::new(AtomicU64::new(3000));
285
286        let config = TcpServerConfig {
287            name: "test-server".into(),
288            listen_ip: "127.0.0.1".into(),
289            listen_port: port,
290            interface_id: InterfaceId(3),
291        };
292
293        start(config, tx, next_id).unwrap();
294        thread::sleep(Duration::from_millis(50));
295
296        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
297        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
298
299        // Get the writer from InterfaceUp
300        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
301        let mut writer = match event {
302            Event::InterfaceUp(_, Some(w), _) => w,
303            other => panic!("expected InterfaceUp with writer, got {:?}", other),
304        };
305
306        // Send a frame via writer
307        let payload: Vec<u8> = (0..24).collect();
308        writer.send_frame(&payload).unwrap();
309
310        // Read from client side
311        let mut buf = [0u8; 256];
312        let n = client.read(&mut buf).unwrap();
313        let expected = hdlc::frame(&payload);
314        assert_eq!(&buf[..n], &expected[..]);
315    }
316
317    #[test]
318    fn multiple_clients() {
319        let port = find_free_port();
320        let (tx, rx) = mpsc::channel();
321        let next_id = Arc::new(AtomicU64::new(4000));
322
323        let config = TcpServerConfig {
324            name: "test-server".into(),
325            listen_ip: "127.0.0.1".into(),
326            listen_port: port,
327            interface_id: InterfaceId(4),
328        };
329
330        start(config, tx, next_id).unwrap();
331        thread::sleep(Duration::from_millis(50));
332
333        // Connect two clients
334        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
335        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
336
337        // Collect InterfaceUp events
338        let mut ids = Vec::new();
339        for _ in 0..2 {
340            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
341            match event {
342                Event::InterfaceUp(id, _, _) => ids.push(id),
343                other => panic!("expected InterfaceUp, got {:?}", other),
344            }
345        }
346
347        // Should have unique IDs
348        assert_eq!(ids.len(), 2);
349        assert_ne!(ids[0], ids[1]);
350    }
351
352    #[test]
353    fn client_disconnect() {
354        let port = find_free_port();
355        let (tx, rx) = mpsc::channel();
356        let next_id = Arc::new(AtomicU64::new(5000));
357
358        let config = TcpServerConfig {
359            name: "test-server".into(),
360            listen_ip: "127.0.0.1".into(),
361            listen_port: port,
362            interface_id: InterfaceId(5),
363        };
364
365        start(config, tx, next_id).unwrap();
366        thread::sleep(Duration::from_millis(50));
367
368        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
369
370        // Drain InterfaceUp
371        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
372
373        // Disconnect
374        drop(client);
375
376        // Should receive InterfaceDown
377        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
378        assert!(
379            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
380            "expected InterfaceDown(5000), got {:?}",
381            event
382        );
383    }
384
385    #[test]
386    fn server_bind_port() {
387        let port = find_free_port();
388        let (tx, _rx) = mpsc::channel();
389        let next_id = Arc::new(AtomicU64::new(6000));
390
391        let config = TcpServerConfig {
392            name: "test-server".into(),
393            listen_ip: "127.0.0.1".into(),
394            listen_port: port,
395            interface_id: InterfaceId(6),
396        };
397
398        // Should not error
399        start(config, tx, next_id).unwrap();
400    }
401}