Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25/// Configuration for a Local server (shared instance).
26#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28    pub instance_name: String,
29    pub port: u16,
30    pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34    fn default() -> Self {
35        LocalServerConfig {
36            instance_name: "default".into(),
37            port: 37428,
38            interface_id: InterfaceId(0),
39        }
40    }
41}
42
43/// Configuration for a Local client (connecting to shared instance).
44#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46    pub name: String,
47    pub instance_name: String,
48    pub port: u16,
49    pub interface_id: InterfaceId,
50    pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54    fn default() -> Self {
55        LocalClientConfig {
56            name: "Local shared instance".into(),
57            instance_name: "default".into(),
58            port: 37428,
59            interface_id: InterfaceId(0),
60            reconnect_wait: Duration::from_secs(8),
61        }
62    }
63}
64
65/// HDLC writer over a TCP or Unix stream.
66struct LocalWriter {
67    stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72        self.stream.write_all(&hdlc::frame(data))
73    }
74}
75
#[cfg(target_os = "linux")]
mod unix_socket {
    // Helpers for the Linux abstract-namespace socket `rns/<instance_name>`.
    //
    // The address is built with `SocketAddrExt::from_abstract_name` (Rust 1.70+).
    // Passing a path that starts with a NUL byte to `UnixListener::bind` or
    // `UnixStream::connect` does not work: the standard library rejects every
    // path containing a NUL byte with `InvalidInput`, so such a call can never
    // reach the abstract namespace.
    use std::io;
    use std::os::linux::net::SocketAddrExt;
    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};

    /// Builds the abstract-namespace address `rns/<instance_name>`.
    ///
    /// Fails if the resulting name does not fit into a socket address.
    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
        SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
    }

    /// Try to bind a Unix abstract socket with the given instance name.
    ///
    /// Returns an error if the name is already bound or the address is invalid.
    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
        UnixListener::bind_addr(&abstract_addr(instance_name)?)
    }

    /// Try to connect to the Unix abstract socket of the given instance.
    ///
    /// Returns an error if nothing is listening on that name.
    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
        UnixStream::connect_addr(&abstract_addr(instance_name)?)
    }
}
93
94// ==================== LOCAL SERVER ====================
95
96/// Start a local server (shared instance).
97/// Tries Unix abstract socket first on Linux, falls back to TCP.
98/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
99pub fn start_server(
100    config: LocalServerConfig,
101    tx: EventSender,
102    next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104    // Try Unix socket first on Linux
105    #[cfg(target_os = "linux")]
106    {
107        match unix_socket::try_bind_unix(&config.instance_name) {
108            Ok(listener) => {
109                log::info!(
110                    "Local server using Unix socket: rns/{}",
111                    config.instance_name
112                );
113                let name = format!("rns/{}", config.instance_name);
114                thread::Builder::new()
115                    .name("local-server".into())
116                    .spawn(move || {
117                        unix_server_loop(listener, name, tx, next_id);
118                    })?;
119                return Ok(());
120            }
121            Err(e) => {
122                log::info!(
123                    "Unix socket bind failed ({}), falling back to TCP",
124                    e
125                );
126            }
127        }
128    }
129
130    // Fallback: TCP on localhost
131    let addr = format!("127.0.0.1:{}", config.port);
132    let listener = TcpListener::bind(&addr)?;
133
134    log::info!("Local server listening on TCP {}", addr);
135
136    thread::Builder::new()
137        .name("local-server".into())
138        .spawn(move || {
139            tcp_server_loop(listener, tx, next_id);
140        })?;
141
142    Ok(())
143}
144
145/// TCP server accept loop for local interface.
146fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
147    for stream_result in listener.incoming() {
148        let stream = match stream_result {
149            Ok(s) => s,
150            Err(e) => {
151                log::warn!("Local server accept failed: {}", e);
152                continue;
153            }
154        };
155
156        if let Err(e) = stream.set_nodelay(true) {
157            log::warn!("Local server set_nodelay failed: {}", e);
158        }
159
160        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
161        spawn_local_client_handler(stream, client_id, tx.clone());
162    }
163}
164
165/// Unix socket server accept loop for local interface.
166#[cfg(target_os = "linux")]
167fn unix_server_loop(
168    listener: std::os::unix::net::UnixListener,
169    name: String,
170    tx: EventSender,
171    next_id: Arc<AtomicU64>,
172) {
173    for stream_result in listener.incoming() {
174        let stream = match stream_result {
175            Ok(s) => s,
176            Err(e) => {
177                log::warn!("[{}] Local server accept failed: {}", name, e);
178                continue;
179            }
180        };
181
182        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183
184        // Convert UnixStream to a pair of read/write handles
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("Local server clone failed: {}", e);
189                continue;
190            }
191        };
192
193        let info = make_local_interface_info(client_id);
194        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter { stream: writer_stream });
195
196        if tx
197            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
198            .is_err()
199        {
200            return;
201        }
202
203        let client_tx = tx.clone();
204        thread::Builder::new()
205            .name(format!("local-unix-reader-{}", client_id.0))
206            .spawn(move || {
207                unix_reader_loop(stream, client_id, client_tx);
208            })
209            .ok();
210    }
211}
212
213#[cfg(target_os = "linux")]
214struct UnixLocalWriter {
215    stream: std::os::unix::net::UnixStream,
216}
217
218#[cfg(target_os = "linux")]
219impl Writer for UnixLocalWriter {
220    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
221        use std::io::Write;
222        self.stream.write_all(&hdlc::frame(data))
223    }
224}
225
226#[cfg(target_os = "linux")]
227fn unix_reader_loop(
228    mut stream: std::os::unix::net::UnixStream,
229    id: InterfaceId,
230    tx: EventSender,
231) {
232    use std::io::Read;
233    let mut decoder = hdlc::Decoder::new();
234    let mut buf = [0u8; 4096];
235
236    loop {
237        match stream.read(&mut buf) {
238            Ok(0) => {
239                let _ = tx.send(Event::InterfaceDown(id));
240                return;
241            }
242            Ok(n) => {
243                for frame in decoder.feed(&buf[..n]) {
244                    if tx
245                        .send(Event::Frame {
246                            interface_id: id,
247                            data: frame,
248                        })
249                        .is_err()
250                    {
251                        return;
252                    }
253                }
254            }
255            Err(_) => {
256                let _ = tx.send(Event::InterfaceDown(id));
257                return;
258            }
259        }
260    }
261}
262
263/// Spawn handler threads for a connected TCP local client.
264fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
265    let writer_stream = match stream.try_clone() {
266        Ok(s) => s,
267        Err(e) => {
268            log::warn!("Local server clone failed: {}", e);
269            return;
270        }
271    };
272
273    let info = make_local_interface_info(client_id);
274    let writer: Box<dyn Writer> = Box::new(LocalWriter { stream: writer_stream });
275
276    if tx
277        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
278        .is_err()
279    {
280        return;
281    }
282
283    thread::Builder::new()
284        .name(format!("local-reader-{}", client_id.0))
285        .spawn(move || {
286            tcp_reader_loop(stream, client_id, tx);
287        })
288        .ok();
289}
290
291fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
292    let mut decoder = hdlc::Decoder::new();
293    let mut buf = [0u8; 4096];
294
295    loop {
296        match stream.read(&mut buf) {
297            Ok(0) => {
298                log::info!("Local client {} disconnected", id.0);
299                let _ = tx.send(Event::InterfaceDown(id));
300                return;
301            }
302            Ok(n) => {
303                for frame in decoder.feed(&buf[..n]) {
304                    if tx
305                        .send(Event::Frame {
306                            interface_id: id,
307                            data: frame,
308                        })
309                        .is_err()
310                    {
311                        return;
312                    }
313                }
314            }
315            Err(e) => {
316                log::warn!("Local client {} read error: {}", id.0, e);
317                let _ = tx.send(Event::InterfaceDown(id));
318                return;
319            }
320        }
321    }
322}
323
324fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
325    InterfaceInfo {
326        id,
327        name: String::from("LocalInterface"),
328        mode: constants::MODE_FULL,
329        out_capable: true,
330        in_capable: true,
331        bitrate: Some(1_000_000_000), // 1 Gbps
332        announce_rate_target: None,
333        announce_rate_grace: 0,
334        announce_rate_penalty: 0.0,
335        announce_cap: constants::ANNOUNCE_CAP,
336        is_local_client: false,
337        wants_tunnel: false,
338        tunnel_id: None,
339        mtu: 65535,
340        ia_freq: 0.0,
341        started: 0.0,
342        ingress_control: false,
343    }
344}
345
346// ==================== LOCAL CLIENT ====================
347
348/// Start a local client (connect to shared instance).
349/// Tries Unix socket first on Linux, falls back to TCP.
350/// Returns the writer for the driver.
351pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
352    let id = config.interface_id;
353
354    // Try Unix socket first on Linux
355    #[cfg(target_os = "linux")]
356    {
357        match unix_socket::try_connect_unix(&config.instance_name) {
358            Ok(stream) => {
359                log::info!(
360                    "[{}] Connected to shared instance via Unix socket: rns/{}",
361                    config.name,
362                    config.instance_name
363                );
364
365                let writer_stream = stream.try_clone()?;
366                let _ = tx.send(Event::InterfaceUp(id, None, None));
367
368                let client_tx = tx;
369                thread::Builder::new()
370                    .name(format!("local-client-reader-{}", id.0))
371                    .spawn(move || {
372                        unix_reader_loop(stream, id, client_tx);
373                    })?;
374
375                return Ok(Box::new(UnixLocalWriter { stream: writer_stream }));
376            }
377            Err(e) => {
378                log::info!(
379                    "[{}] Unix socket connect failed ({}), trying TCP",
380                    config.name,
381                    e
382                );
383            }
384        }
385    }
386
387    // Fallback: TCP
388    let addr = format!("127.0.0.1:{}", config.port);
389    let stream = TcpStream::connect(&addr)?;
390    stream.set_nodelay(true)?;
391
392    log::info!("[{}] Connected to shared instance via TCP {}", config.name, addr);
393
394    let reader_stream = stream.try_clone()?;
395    let writer_stream = stream.try_clone()?;
396
397    let _ = tx.send(Event::InterfaceUp(id, None, None));
398
399    thread::Builder::new()
400        .name(format!("local-client-reader-{}", id.0))
401        .spawn(move || {
402            tcp_reader_loop(reader_stream, id, tx);
403        })?;
404
405    Ok(Box::new(LocalWriter { stream: writer_stream }))
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// How long a test waits for an expected event before failing.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Asks the OS for a currently unused loopback TCP port.
    ///
    /// The probe listener is dropped before returning, so the port is only
    /// very likely (not guaranteed) to still be free when it is used.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    #[cfg(target_os = "linux")]
    type UnixNameGuard = Option<std::os::unix::net::UnixListener>;
    #[cfg(not(target_os = "linux"))]
    type UnixNameGuard = Option<()>;

    /// Occupies the Unix socket name of `instance_name` for as long as the
    /// returned guard lives, so that `start_server` cannot bind it and has to
    /// fall back to TCP. If the name cannot be reserved here, the identical
    /// bind attempt inside `start_server` is expected to fail as well, which
    /// leads to the same TCP fallback.
    #[cfg(target_os = "linux")]
    fn reserve_unix_name(instance_name: &str) -> UnixNameGuard {
        unix_socket::try_bind_unix(instance_name).ok()
    }

    /// Platforms other than Linux always use TCP; nothing to reserve.
    #[cfg(not(target_os = "linux"))]
    fn reserve_unix_name(_instance_name: &str) -> UnixNameGuard {
        None
    }

    /// A running local server that listens on TCP.
    struct TcpServer {
        port: u16,
        events: mpsc::Receiver<Event>,
        // Held only to keep the Unix socket name occupied.
        _unix_name: UnixNameGuard,
    }

    /// Starts a local server that listens on a free loopback TCP port.
    ///
    /// No settling delay is needed afterwards: the listener is bound before
    /// `start_server` returns, so connections made right away are queued
    /// until the acceptor thread picks them up.
    fn start_tcp_server(instance_name: &str, first_client_id: u64, interface_id: u64) -> TcpServer {
        let unix_name = reserve_unix_name(instance_name);
        let port = find_free_port();
        let (tx, events) = mpsc::channel();

        let config = LocalServerConfig {
            instance_name: instance_name.into(),
            port,
            interface_id: InterfaceId(interface_id),
        };
        start_server(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();

        TcpServer {
            port,
            events,
            _unix_name: unix_name,
        }
    }

    /// Opens a raw TCP connection to the server, acting as a local client.
    fn connect_tcp(server: &TcpServer) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", server.port)).unwrap()
    }

    #[test]
    fn server_bind_tcp() {
        let server = start_tcp_server("test-bind", 7000, 70);

        // Should be able to connect
        let _client = connect_tcp(&server);
    }

    #[test]
    fn server_accept_client() {
        let server = start_tcp_server("test-accept", 7100, 71);
        let _client = connect_tcp(&server);

        let event = server.events.recv_timeout(EVENT_TIMEOUT).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(7100));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn client_send_receive() {
        // This test goes through `start_server` and `start_client`, so it works
        // over whichever transport both sides agree on. The instance name is
        // made unique per process so that a concurrently running test binary
        // cannot be reached by accident.
        let instance_name = format!("test-sr-{}", std::process::id());
        let port = find_free_port();
        let (server_tx, server_rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(7200));

        let server_config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(72),
        };

        start_server(server_config, server_tx, next_id).unwrap();

        // Connect client
        let (client_tx, client_rx) = mpsc::channel();
        let client_config = LocalClientConfig {
            name: "test-client".into(),
            instance_name,
            port,
            interface_id: InterfaceId(73),
            reconnect_wait: Duration::from_secs(1),
        };

        let mut client_writer = start_client(client_config, client_tx).unwrap();

        // Get server-side InterfaceUp
        let event = server_rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        let mut server_writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Get client-side InterfaceUp
        let event = client_rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        match event {
            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
            other => panic!("expected InterfaceUp, got {:?}", other),
        }

        // Client sends to server
        let payload: Vec<u8> = (0..32).collect();
        client_writer.send_frame(&payload).unwrap();

        let event = server_rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }

        // Server sends to client
        let payload2: Vec<u8> = (100..132).collect();
        server_writer.send_frame(&payload2).unwrap();

        let event = client_rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, payload2),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn multiple_local_clients() {
        let server = start_tcp_server("test-multi", 7300, 74);

        let _c1 = connect_tcp(&server);
        let _c2 = connect_tcp(&server);

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = server.events.recv_timeout(EVENT_TIMEOUT).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect_detected() {
        let server = start_tcp_server("test-dc", 7400, 75);
        let client = connect_tcp(&server);

        // Drain InterfaceUp
        let _ = server.events.recv_timeout(EVENT_TIMEOUT).unwrap();

        // Disconnect
        drop(client);

        let event = server.events.recv_timeout(EVENT_TIMEOUT).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(_)),
            "expected InterfaceDown, got {:?}",
            event
        );
    }
}