Skip to main content

rns_net/interface/
tcp_server.rs

1//! TCP server interface with HDLC framing.
2//!
3//! Accepts client connections and spawns per-client reader threads.
4//! Each client gets a dynamically allocated InterfaceId.
5//! Matches Python `TCPServerInterface` from `TCPInterface.py`.
6
7use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20/// Configuration for a TCP server interface.
21#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23    pub name: String,
24    pub listen_ip: String,
25    pub listen_port: u16,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30    fn default() -> Self {
31        TcpServerConfig {
32            name: String::new(),
33            listen_ip: "0.0.0.0".into(),
34            listen_port: 4242,
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data over a TCP stream.
41struct TcpServerWriter {
42    stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stream.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start a TCP server. Spawns a listener thread that accepts connections
52/// and per-client reader threads. Returns immediately.
53///
54/// `next_id` is shared with the node for allocating unique InterfaceIds
55/// for each connected client.
56pub fn start(
57    config: TcpServerConfig,
58    tx: EventSender,
59    next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62    let listener = TcpListener::bind(&addr)?;
63
64    log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66    let name = config.name.clone();
67    thread::Builder::new()
68        .name(format!("tcp-server-{}", config.interface_id.0))
69        .spawn(move || {
70            listener_loop(listener, name, tx, next_id);
71        })?;
72
73    Ok(())
74}
75
76/// Listener thread: accepts connections and spawns reader threads.
77fn listener_loop(
78    listener: TcpListener,
79    name: String,
80    tx: EventSender,
81    next_id: Arc<AtomicU64>,
82) {
83    for stream_result in listener.incoming() {
84        let stream = match stream_result {
85            Ok(s) => s,
86            Err(e) => {
87                log::warn!("[{}] accept failed: {}", name, e);
88                continue;
89            }
90        };
91
92        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93        let peer_addr = stream.peer_addr().ok();
94
95        log::info!(
96            "[{}] client connected: {:?} → id {}",
97            name,
98            peer_addr,
99            client_id.0
100        );
101
102        // Set TCP_NODELAY on the client socket
103        if let Err(e) = stream.set_nodelay(true) {
104            log::warn!("[{}] set_nodelay failed: {}", name, e);
105        }
106
107        // Clone stream for writer
108        let writer_stream = match stream.try_clone() {
109            Ok(s) => s,
110            Err(e) => {
111                log::warn!("[{}] failed to clone stream: {}", name, e);
112                continue;
113            }
114        };
115
116        let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118        let info = InterfaceInfo {
119            id: client_id,
120            name: format!("TCPServerInterface/Client-{}", client_id.0),
121            mode: constants::MODE_FULL,
122            out_capable: true,
123            in_capable: true,
124            bitrate: None,
125            announce_rate_target: None,
126            announce_rate_grace: 0,
127            announce_rate_penalty: 0.0,
128            announce_cap: constants::ANNOUNCE_CAP,
129            is_local_client: false,
130            wants_tunnel: false,
131            tunnel_id: None,
132            mtu: 65535,
133            ia_freq: 0.0,
134            started: 0.0,
135            ingress_control: true,
136        };
137
138        // Send InterfaceUp with InterfaceInfo for dynamic registration
139        if tx
140            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
141            .is_err()
142        {
143            // Driver shut down
144            return;
145        }
146
147        // Spawn reader thread for this client
148        let client_tx = tx.clone();
149        let client_name = name.clone();
150        thread::Builder::new()
151            .name(format!("tcp-server-reader-{}", client_id.0))
152            .spawn(move || {
153                client_reader_loop(stream, client_id, client_name, client_tx);
154            })
155            .ok();
156    }
157}
158
159/// Per-client reader thread: reads HDLC frames, sends to driver.
160fn client_reader_loop(
161    mut stream: TcpStream,
162    id: InterfaceId,
163    name: String,
164    tx: EventSender,
165) {
166    let mut decoder = hdlc::Decoder::new();
167    let mut buf = [0u8; 4096];
168
169    loop {
170        match stream.read(&mut buf) {
171            Ok(0) => {
172                log::info!("[{}] client {} disconnected", name, id.0);
173                let _ = tx.send(Event::InterfaceDown(id));
174                return;
175            }
176            Ok(n) => {
177                for frame in decoder.feed(&buf[..n]) {
178                    if tx
179                        .send(Event::Frame {
180                            interface_id: id,
181                            data: frame,
182                        })
183                        .is_err()
184                    {
185                        // Driver shut down
186                        return;
187                    }
188                }
189            }
190            Err(e) => {
191                log::warn!("[{}] client {} read error: {}", name, id.0, e);
192                let _ = tx.send(Event::InterfaceDown(id));
193                return;
194            }
195        }
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202    use std::net::TcpStream;
203    use std::sync::mpsc;
204    use std::time::Duration;
205
206    fn find_free_port() -> u16 {
207        TcpListener::bind("127.0.0.1:0")
208            .unwrap()
209            .local_addr()
210            .unwrap()
211            .port()
212    }
213
214    #[test]
215    fn accept_connection() {
216        let port = find_free_port();
217        let (tx, rx) = mpsc::channel();
218        let next_id = Arc::new(AtomicU64::new(1000));
219
220        let config = TcpServerConfig {
221            name: "test-server".into(),
222            listen_ip: "127.0.0.1".into(),
223            listen_port: port,
224            interface_id: InterfaceId(1),
225        };
226
227        start(config, tx, next_id).unwrap();
228
229        // Give server time to start listening
230        thread::sleep(Duration::from_millis(50));
231
232        // Connect a client
233        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
234
235        // Should receive InterfaceUp with InterfaceInfo (dynamic)
236        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
237        match event {
238            Event::InterfaceUp(id, writer, info) => {
239                assert_eq!(id, InterfaceId(1000));
240                assert!(writer.is_some());
241                assert!(info.is_some());
242            }
243            other => panic!("expected InterfaceUp, got {:?}", other),
244        }
245    }
246
247    #[test]
248    fn receive_frame_from_client() {
249        let port = find_free_port();
250        let (tx, rx) = mpsc::channel();
251        let next_id = Arc::new(AtomicU64::new(2000));
252
253        let config = TcpServerConfig {
254            name: "test-server".into(),
255            listen_ip: "127.0.0.1".into(),
256            listen_port: port,
257            interface_id: InterfaceId(2),
258        };
259
260        start(config, tx, next_id).unwrap();
261        thread::sleep(Duration::from_millis(50));
262
263        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
264
265        // Drain InterfaceUp
266        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
267
268        // Send an HDLC frame (>= 19 bytes)
269        let payload: Vec<u8> = (0..32).collect();
270        let framed = hdlc::frame(&payload);
271        client.write_all(&framed).unwrap();
272
273        // Should receive Frame event
274        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
275        match event {
276            Event::Frame { interface_id, data } => {
277                assert_eq!(interface_id, InterfaceId(2000));
278                assert_eq!(data, payload);
279            }
280            other => panic!("expected Frame, got {:?}", other),
281        }
282    }
283
284    #[test]
285    fn send_frame_to_client() {
286        let port = find_free_port();
287        let (tx, rx) = mpsc::channel();
288        let next_id = Arc::new(AtomicU64::new(3000));
289
290        let config = TcpServerConfig {
291            name: "test-server".into(),
292            listen_ip: "127.0.0.1".into(),
293            listen_port: port,
294            interface_id: InterfaceId(3),
295        };
296
297        start(config, tx, next_id).unwrap();
298        thread::sleep(Duration::from_millis(50));
299
300        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
301        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
302
303        // Get the writer from InterfaceUp
304        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
305        let mut writer = match event {
306            Event::InterfaceUp(_, Some(w), _) => w,
307            other => panic!("expected InterfaceUp with writer, got {:?}", other),
308        };
309
310        // Send a frame via writer
311        let payload: Vec<u8> = (0..24).collect();
312        writer.send_frame(&payload).unwrap();
313
314        // Read from client side
315        let mut buf = [0u8; 256];
316        let n = client.read(&mut buf).unwrap();
317        let expected = hdlc::frame(&payload);
318        assert_eq!(&buf[..n], &expected[..]);
319    }
320
321    #[test]
322    fn multiple_clients() {
323        let port = find_free_port();
324        let (tx, rx) = mpsc::channel();
325        let next_id = Arc::new(AtomicU64::new(4000));
326
327        let config = TcpServerConfig {
328            name: "test-server".into(),
329            listen_ip: "127.0.0.1".into(),
330            listen_port: port,
331            interface_id: InterfaceId(4),
332        };
333
334        start(config, tx, next_id).unwrap();
335        thread::sleep(Duration::from_millis(50));
336
337        // Connect two clients
338        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
339        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
340
341        // Collect InterfaceUp events
342        let mut ids = Vec::new();
343        for _ in 0..2 {
344            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
345            match event {
346                Event::InterfaceUp(id, _, _) => ids.push(id),
347                other => panic!("expected InterfaceUp, got {:?}", other),
348            }
349        }
350
351        // Should have unique IDs
352        assert_eq!(ids.len(), 2);
353        assert_ne!(ids[0], ids[1]);
354    }
355
356    #[test]
357    fn client_disconnect() {
358        let port = find_free_port();
359        let (tx, rx) = mpsc::channel();
360        let next_id = Arc::new(AtomicU64::new(5000));
361
362        let config = TcpServerConfig {
363            name: "test-server".into(),
364            listen_ip: "127.0.0.1".into(),
365            listen_port: port,
366            interface_id: InterfaceId(5),
367        };
368
369        start(config, tx, next_id).unwrap();
370        thread::sleep(Duration::from_millis(50));
371
372        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
373
374        // Drain InterfaceUp
375        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
376
377        // Disconnect
378        drop(client);
379
380        // Should receive InterfaceDown
381        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
382        assert!(
383            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
384            "expected InterfaceDown(5000), got {:?}",
385            event
386        );
387    }
388
389    #[test]
390    fn server_bind_port() {
391        let port = find_free_port();
392        let (tx, _rx) = mpsc::channel();
393        let next_id = Arc::new(AtomicU64::new(6000));
394
395        let config = TcpServerConfig {
396            name: "test-server".into(),
397            listen_ip: "127.0.0.1".into(),
398            listen_port: port,
399            interface_id: InterfaceId(6),
400        };
401
402        // Should not error
403        start(config, tx, next_id).unwrap();
404    }
405}