Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25/// Configuration for a Local server (shared instance).
26#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28    pub instance_name: String,
29    pub port: u16,
30    pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34    fn default() -> Self {
35        LocalServerConfig {
36            instance_name: "default".into(),
37            port: 37428,
38            interface_id: InterfaceId(0),
39        }
40    }
41}
42
43/// Configuration for a Local client (connecting to shared instance).
44#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46    pub name: String,
47    pub instance_name: String,
48    pub port: u16,
49    pub interface_id: InterfaceId,
50    pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54    fn default() -> Self {
55        LocalClientConfig {
56            name: "Local shared instance".into(),
57            instance_name: "default".into(),
58            port: 37428,
59            interface_id: InterfaceId(0),
60            reconnect_wait: Duration::from_secs(8),
61        }
62    }
63}
64
65/// HDLC writer over a TCP or Unix stream.
66struct LocalWriter {
67    stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72        self.stream.write_all(&hdlc::frame(data))
73    }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78    use std::io;
79    use std::os::unix::net::{UnixListener, UnixStream};
80
81    /// Try to bind a Unix abstract socket with the given instance name.
82    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
83        let path = format!("\0rns/{}", instance_name);
84        UnixListener::bind(path)
85    }
86
87    /// Try to connect to a Unix abstract socket.
88    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
89        let path = format!("\0rns/{}", instance_name);
90        UnixStream::connect(path)
91    }
92}
93
94// ==================== LOCAL SERVER ====================
95
96/// Start a local server (shared instance).
97/// Tries Unix abstract socket first on Linux, falls back to TCP.
98/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
99pub fn start_server(
100    config: LocalServerConfig,
101    tx: EventSender,
102    next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104    // Try Unix socket first on Linux
105    #[cfg(target_os = "linux")]
106    {
107        match unix_socket::try_bind_unix(&config.instance_name) {
108            Ok(listener) => {
109                log::info!(
110                    "Local server using Unix socket: rns/{}",
111                    config.instance_name
112                );
113                let name = format!("rns/{}", config.instance_name);
114                thread::Builder::new()
115                    .name("local-server".into())
116                    .spawn(move || {
117                        unix_server_loop(listener, name, tx, next_id);
118                    })?;
119                return Ok(());
120            }
121            Err(e) => {
122                log::info!(
123                    "Unix socket bind failed ({}), falling back to TCP",
124                    e
125                );
126            }
127        }
128    }
129
130    // Fallback: TCP on localhost
131    let addr = format!("127.0.0.1:{}", config.port);
132    let listener = TcpListener::bind(&addr)?;
133
134    log::info!("Local server listening on TCP {}", addr);
135
136    thread::Builder::new()
137        .name("local-server".into())
138        .spawn(move || {
139            tcp_server_loop(listener, tx, next_id);
140        })?;
141
142    Ok(())
143}
144
145/// TCP server accept loop for local interface.
146fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
147    for stream_result in listener.incoming() {
148        let stream = match stream_result {
149            Ok(s) => s,
150            Err(e) => {
151                log::warn!("Local server accept failed: {}", e);
152                continue;
153            }
154        };
155
156        if let Err(e) = stream.set_nodelay(true) {
157            log::warn!("Local server set_nodelay failed: {}", e);
158        }
159
160        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
161        spawn_local_client_handler(stream, client_id, tx.clone());
162    }
163}
164
165/// Unix socket server accept loop for local interface.
166#[cfg(target_os = "linux")]
167fn unix_server_loop(
168    listener: std::os::unix::net::UnixListener,
169    name: String,
170    tx: EventSender,
171    next_id: Arc<AtomicU64>,
172) {
173    for stream_result in listener.incoming() {
174        let stream = match stream_result {
175            Ok(s) => s,
176            Err(e) => {
177                log::warn!("[{}] Local server accept failed: {}", name, e);
178                continue;
179            }
180        };
181
182        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183
184        // Convert UnixStream to a pair of read/write handles
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("Local server clone failed: {}", e);
189                continue;
190            }
191        };
192
193        let info = make_local_interface_info(client_id);
194        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter { stream: writer_stream });
195
196        if tx
197            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
198            .is_err()
199        {
200            return;
201        }
202
203        let client_tx = tx.clone();
204        thread::Builder::new()
205            .name(format!("local-unix-reader-{}", client_id.0))
206            .spawn(move || {
207                unix_reader_loop(stream, client_id, client_tx);
208            })
209            .ok();
210    }
211}
212
213#[cfg(target_os = "linux")]
214struct UnixLocalWriter {
215    stream: std::os::unix::net::UnixStream,
216}
217
218#[cfg(target_os = "linux")]
219impl Writer for UnixLocalWriter {
220    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
221        use std::io::Write;
222        self.stream.write_all(&hdlc::frame(data))
223    }
224}
225
226#[cfg(target_os = "linux")]
227fn unix_reader_loop(
228    mut stream: std::os::unix::net::UnixStream,
229    id: InterfaceId,
230    tx: EventSender,
231) {
232    use std::io::Read;
233    let mut decoder = hdlc::Decoder::new();
234    let mut buf = [0u8; 4096];
235
236    loop {
237        match stream.read(&mut buf) {
238            Ok(0) => {
239                let _ = tx.send(Event::InterfaceDown(id));
240                return;
241            }
242            Ok(n) => {
243                for frame in decoder.feed(&buf[..n]) {
244                    if tx
245                        .send(Event::Frame {
246                            interface_id: id,
247                            data: frame,
248                        })
249                        .is_err()
250                    {
251                        return;
252                    }
253                }
254            }
255            Err(_) => {
256                let _ = tx.send(Event::InterfaceDown(id));
257                return;
258            }
259        }
260    }
261}
262
263/// Spawn handler threads for a connected TCP local client.
264fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
265    let writer_stream = match stream.try_clone() {
266        Ok(s) => s,
267        Err(e) => {
268            log::warn!("Local server clone failed: {}", e);
269            return;
270        }
271    };
272
273    let info = make_local_interface_info(client_id);
274    let writer: Box<dyn Writer> = Box::new(LocalWriter { stream: writer_stream });
275
276    if tx
277        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
278        .is_err()
279    {
280        return;
281    }
282
283    thread::Builder::new()
284        .name(format!("local-reader-{}", client_id.0))
285        .spawn(move || {
286            tcp_reader_loop(stream, client_id, tx);
287        })
288        .ok();
289}
290
291fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
292    let mut decoder = hdlc::Decoder::new();
293    let mut buf = [0u8; 4096];
294
295    loop {
296        match stream.read(&mut buf) {
297            Ok(0) => {
298                log::info!("Local client {} disconnected", id.0);
299                let _ = tx.send(Event::InterfaceDown(id));
300                return;
301            }
302            Ok(n) => {
303                for frame in decoder.feed(&buf[..n]) {
304                    if tx
305                        .send(Event::Frame {
306                            interface_id: id,
307                            data: frame,
308                        })
309                        .is_err()
310                    {
311                        return;
312                    }
313                }
314            }
315            Err(e) => {
316                log::warn!("Local client {} read error: {}", id.0, e);
317                let _ = tx.send(Event::InterfaceDown(id));
318                return;
319            }
320        }
321    }
322}
323
324fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
325    InterfaceInfo {
326        id,
327        name: String::from("LocalInterface"),
328        mode: constants::MODE_FULL,
329        out_capable: true,
330        in_capable: true,
331        bitrate: Some(1_000_000_000), // 1 Gbps
332        announce_rate_target: None,
333        announce_rate_grace: 0,
334        announce_rate_penalty: 0.0,
335        announce_cap: constants::ANNOUNCE_CAP,
336        is_local_client: false,
337        wants_tunnel: false,
338        tunnel_id: None,
339    }
340}
341
342// ==================== LOCAL CLIENT ====================
343
344/// Start a local client (connect to shared instance).
345/// Tries Unix socket first on Linux, falls back to TCP.
346/// Returns the writer for the driver.
347pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
348    let id = config.interface_id;
349
350    // Try Unix socket first on Linux
351    #[cfg(target_os = "linux")]
352    {
353        match unix_socket::try_connect_unix(&config.instance_name) {
354            Ok(stream) => {
355                log::info!(
356                    "[{}] Connected to shared instance via Unix socket: rns/{}",
357                    config.name,
358                    config.instance_name
359                );
360
361                let writer_stream = stream.try_clone()?;
362                let _ = tx.send(Event::InterfaceUp(id, None, None));
363
364                let client_tx = tx;
365                thread::Builder::new()
366                    .name(format!("local-client-reader-{}", id.0))
367                    .spawn(move || {
368                        unix_reader_loop(stream, id, client_tx);
369                    })?;
370
371                return Ok(Box::new(UnixLocalWriter { stream: writer_stream }));
372            }
373            Err(e) => {
374                log::info!(
375                    "[{}] Unix socket connect failed ({}), trying TCP",
376                    config.name,
377                    e
378                );
379            }
380        }
381    }
382
383    // Fallback: TCP
384    let addr = format!("127.0.0.1:{}", config.port);
385    let stream = TcpStream::connect(&addr)?;
386    stream.set_nodelay(true)?;
387
388    log::info!("[{}] Connected to shared instance via TCP {}", config.name, addr);
389
390    let reader_stream = stream.try_clone()?;
391    let writer_stream = stream.try_clone()?;
392
393    let _ = tx.send(Event::InterfaceUp(id, None, None));
394
395    thread::Builder::new()
396        .name(format!("local-client-reader-{}", id.0))
397        .spawn(move || {
398            tcp_reader_loop(reader_stream, id, tx);
399        })?;
400
401    Ok(Box::new(LocalWriter { stream: writer_stream }))
402}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use std::sync::mpsc;
408
409    fn find_free_port() -> u16 {
410        TcpListener::bind("127.0.0.1:0")
411            .unwrap()
412            .local_addr()
413            .unwrap()
414            .port()
415    }
416
417    #[test]
418    fn server_bind_tcp() {
419        let port = find_free_port();
420        let (tx, _rx) = mpsc::channel();
421        let next_id = Arc::new(AtomicU64::new(7000));
422
423        let config = LocalServerConfig {
424            instance_name: "test-bind".into(),
425            port,
426            interface_id: InterfaceId(70),
427        };
428
429        // We force TCP by using a unique instance name that won't conflict
430        // with any existing Unix socket
431        start_server(config, tx, next_id).unwrap();
432        thread::sleep(Duration::from_millis(50));
433
434        // Should be able to connect
435        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
436    }
437
438    #[test]
439    fn server_accept_client() {
440        let port = find_free_port();
441        let (tx, rx) = mpsc::channel();
442        let next_id = Arc::new(AtomicU64::new(7100));
443
444        let config = LocalServerConfig {
445            instance_name: "test-accept".into(),
446            port,
447            interface_id: InterfaceId(71),
448        };
449
450        start_server(config, tx, next_id).unwrap();
451        thread::sleep(Duration::from_millis(50));
452
453        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
454
455        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
456        match event {
457            Event::InterfaceUp(id, writer, info) => {
458                assert_eq!(id, InterfaceId(7100));
459                assert!(writer.is_some());
460                assert!(info.is_some());
461            }
462            other => panic!("expected InterfaceUp, got {:?}", other),
463        }
464    }
465
466    #[test]
467    fn client_send_receive() {
468        let port = find_free_port();
469        let (server_tx, server_rx) = mpsc::channel();
470        let next_id = Arc::new(AtomicU64::new(7200));
471
472        let server_config = LocalServerConfig {
473            instance_name: "test-sr".into(),
474            port,
475            interface_id: InterfaceId(72),
476        };
477
478        start_server(server_config, server_tx, next_id).unwrap();
479        thread::sleep(Duration::from_millis(50));
480
481        // Connect client
482        let (client_tx, client_rx) = mpsc::channel();
483        let client_config = LocalClientConfig {
484            name: "test-client".into(),
485            instance_name: "test-sr".into(),
486            port,
487            interface_id: InterfaceId(73),
488            reconnect_wait: Duration::from_secs(1),
489        };
490
491        let mut client_writer = start_client(client_config, client_tx).unwrap();
492
493        // Get server-side InterfaceUp
494        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
495        let mut server_writer = match event {
496            Event::InterfaceUp(_, Some(w), _) => w,
497            other => panic!("expected InterfaceUp with writer, got {:?}", other),
498        };
499
500        // Get client-side InterfaceUp
501        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
502        match event {
503            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
504            other => panic!("expected InterfaceUp, got {:?}", other),
505        }
506
507        // Client sends to server
508        let payload: Vec<u8> = (0..32).collect();
509        client_writer.send_frame(&payload).unwrap();
510
511        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
512        match event {
513            Event::Frame { data, .. } => assert_eq!(data, payload),
514            other => panic!("expected Frame, got {:?}", other),
515        }
516
517        // Server sends to client
518        let payload2: Vec<u8> = (100..132).collect();
519        server_writer.send_frame(&payload2).unwrap();
520
521        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
522        match event {
523            Event::Frame { data, .. } => assert_eq!(data, payload2),
524            other => panic!("expected Frame, got {:?}", other),
525        }
526    }
527
528    #[test]
529    fn multiple_local_clients() {
530        let port = find_free_port();
531        let (tx, rx) = mpsc::channel();
532        let next_id = Arc::new(AtomicU64::new(7300));
533
534        let config = LocalServerConfig {
535            instance_name: "test-multi".into(),
536            port,
537            interface_id: InterfaceId(74),
538        };
539
540        start_server(config, tx, next_id).unwrap();
541        thread::sleep(Duration::from_millis(50));
542
543        let _c1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
544        let _c2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
545
546        let mut ids = Vec::new();
547        for _ in 0..2 {
548            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
549            match event {
550                Event::InterfaceUp(id, _, _) => ids.push(id),
551                other => panic!("expected InterfaceUp, got {:?}", other),
552            }
553        }
554
555        assert_eq!(ids.len(), 2);
556        assert_ne!(ids[0], ids[1]);
557    }
558
559    #[test]
560    fn client_disconnect_detected() {
561        let port = find_free_port();
562        let (tx, rx) = mpsc::channel();
563        let next_id = Arc::new(AtomicU64::new(7400));
564
565        let config = LocalServerConfig {
566            instance_name: "test-dc".into(),
567            port,
568            interface_id: InterfaceId(75),
569        };
570
571        start_server(config, tx, next_id).unwrap();
572        thread::sleep(Duration::from_millis(50));
573
574        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
575
576        // Drain InterfaceUp
577        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
578
579        // Disconnect
580        drop(client);
581
582        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
583        assert!(
584            matches!(event, Event::InterfaceDown(_)),
585            "expected InterfaceDown, got {:?}",
586            event
587        );
588    }
589}