Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25/// Configuration for a Local server (shared instance).
26#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28    pub instance_name: String,
29    pub port: u16,
30    pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34    fn default() -> Self {
35        LocalServerConfig {
36            instance_name: "default".into(),
37            port: 37428,
38            interface_id: InterfaceId(0),
39        }
40    }
41}
42
43/// Configuration for a Local client (connecting to shared instance).
44#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46    pub name: String,
47    pub instance_name: String,
48    pub port: u16,
49    pub interface_id: InterfaceId,
50    pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54    fn default() -> Self {
55        LocalClientConfig {
56            name: "Local shared instance".into(),
57            instance_name: "default".into(),
58            port: 37428,
59            interface_id: InterfaceId(0),
60            reconnect_wait: Duration::from_secs(8),
61        }
62    }
63}
64
65/// HDLC writer over a TCP or Unix stream.
66struct LocalWriter {
67    stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72        self.stream.write_all(&hdlc::frame(data))
73    }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78    use std::io;
79    use std::os::unix::net::{UnixListener, UnixStream};
80
81    /// Try to bind a Unix abstract socket with the given instance name.
82    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
83        let path = format!("\0rns/{}", instance_name);
84        UnixListener::bind(path)
85    }
86
87    /// Try to connect to a Unix abstract socket.
88    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
89        let path = format!("\0rns/{}", instance_name);
90        UnixStream::connect(path)
91    }
92}
93
94// ==================== LOCAL SERVER ====================
95
96/// Start a local server (shared instance).
97/// Tries Unix abstract socket first on Linux, falls back to TCP.
98/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
99pub fn start_server(
100    config: LocalServerConfig,
101    tx: EventSender,
102    next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104    // Try Unix socket first on Linux
105    #[cfg(target_os = "linux")]
106    {
107        match unix_socket::try_bind_unix(&config.instance_name) {
108            Ok(listener) => {
109                log::info!(
110                    "Local server using Unix socket: rns/{}",
111                    config.instance_name
112                );
113                let name = format!("rns/{}", config.instance_name);
114                thread::Builder::new()
115                    .name("local-server".into())
116                    .spawn(move || {
117                        unix_server_loop(listener, name, tx, next_id);
118                    })?;
119                return Ok(());
120            }
121            Err(e) => {
122                log::info!(
123                    "Unix socket bind failed ({}), falling back to TCP",
124                    e
125                );
126            }
127        }
128    }
129
130    // Fallback: TCP on localhost
131    let addr = format!("127.0.0.1:{}", config.port);
132    let listener = TcpListener::bind(&addr)?;
133
134    log::info!("Local server listening on TCP {}", addr);
135
136    thread::Builder::new()
137        .name("local-server".into())
138        .spawn(move || {
139            tcp_server_loop(listener, tx, next_id);
140        })?;
141
142    Ok(())
143}
144
145/// TCP server accept loop for local interface.
146fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
147    for stream_result in listener.incoming() {
148        let stream = match stream_result {
149            Ok(s) => s,
150            Err(e) => {
151                log::warn!("Local server accept failed: {}", e);
152                continue;
153            }
154        };
155
156        if let Err(e) = stream.set_nodelay(true) {
157            log::warn!("Local server set_nodelay failed: {}", e);
158        }
159
160        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
161        spawn_local_client_handler(stream, client_id, tx.clone());
162    }
163}
164
165/// Unix socket server accept loop for local interface.
166#[cfg(target_os = "linux")]
167fn unix_server_loop(
168    listener: std::os::unix::net::UnixListener,
169    name: String,
170    tx: EventSender,
171    next_id: Arc<AtomicU64>,
172) {
173    for stream_result in listener.incoming() {
174        let stream = match stream_result {
175            Ok(s) => s,
176            Err(e) => {
177                log::warn!("[{}] Local server accept failed: {}", name, e);
178                continue;
179            }
180        };
181
182        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183
184        // Convert UnixStream to a pair of read/write handles
185        let writer_stream = match stream.try_clone() {
186            Ok(s) => s,
187            Err(e) => {
188                log::warn!("Local server clone failed: {}", e);
189                continue;
190            }
191        };
192
193        let info = make_local_interface_info(client_id);
194        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter { stream: writer_stream });
195
196        if tx
197            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
198            .is_err()
199        {
200            return;
201        }
202
203        let client_tx = tx.clone();
204        thread::Builder::new()
205            .name(format!("local-unix-reader-{}", client_id.0))
206            .spawn(move || {
207                unix_reader_loop(stream, client_id, client_tx);
208            })
209            .ok();
210    }
211}
212
213#[cfg(target_os = "linux")]
214struct UnixLocalWriter {
215    stream: std::os::unix::net::UnixStream,
216}
217
218#[cfg(target_os = "linux")]
219impl Writer for UnixLocalWriter {
220    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
221        use std::io::Write;
222        self.stream.write_all(&hdlc::frame(data))
223    }
224}
225
226#[cfg(target_os = "linux")]
227fn unix_reader_loop(
228    mut stream: std::os::unix::net::UnixStream,
229    id: InterfaceId,
230    tx: EventSender,
231) {
232    use std::io::Read;
233    let mut decoder = hdlc::Decoder::new();
234    let mut buf = [0u8; 4096];
235
236    loop {
237        match stream.read(&mut buf) {
238            Ok(0) => {
239                let _ = tx.send(Event::InterfaceDown(id));
240                return;
241            }
242            Ok(n) => {
243                for frame in decoder.feed(&buf[..n]) {
244                    if tx
245                        .send(Event::Frame {
246                            interface_id: id,
247                            data: frame,
248                        })
249                        .is_err()
250                    {
251                        return;
252                    }
253                }
254            }
255            Err(_) => {
256                let _ = tx.send(Event::InterfaceDown(id));
257                return;
258            }
259        }
260    }
261}
262
263/// Spawn handler threads for a connected TCP local client.
264fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
265    let writer_stream = match stream.try_clone() {
266        Ok(s) => s,
267        Err(e) => {
268            log::warn!("Local server clone failed: {}", e);
269            return;
270        }
271    };
272
273    let info = make_local_interface_info(client_id);
274    let writer: Box<dyn Writer> = Box::new(LocalWriter { stream: writer_stream });
275
276    if tx
277        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
278        .is_err()
279    {
280        return;
281    }
282
283    thread::Builder::new()
284        .name(format!("local-reader-{}", client_id.0))
285        .spawn(move || {
286            tcp_reader_loop(stream, client_id, tx);
287        })
288        .ok();
289}
290
291fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
292    let mut decoder = hdlc::Decoder::new();
293    let mut buf = [0u8; 4096];
294
295    loop {
296        match stream.read(&mut buf) {
297            Ok(0) => {
298                log::info!("Local client {} disconnected", id.0);
299                let _ = tx.send(Event::InterfaceDown(id));
300                return;
301            }
302            Ok(n) => {
303                for frame in decoder.feed(&buf[..n]) {
304                    if tx
305                        .send(Event::Frame {
306                            interface_id: id,
307                            data: frame,
308                        })
309                        .is_err()
310                    {
311                        return;
312                    }
313                }
314            }
315            Err(e) => {
316                log::warn!("Local client {} read error: {}", id.0, e);
317                let _ = tx.send(Event::InterfaceDown(id));
318                return;
319            }
320        }
321    }
322}
323
324fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
325    InterfaceInfo {
326        id,
327        name: String::from("LocalInterface"),
328        mode: constants::MODE_FULL,
329        out_capable: true,
330        in_capable: true,
331        bitrate: Some(1_000_000_000), // 1 Gbps
332        announce_rate_target: None,
333        announce_rate_grace: 0,
334        announce_rate_penalty: 0.0,
335        announce_cap: constants::ANNOUNCE_CAP,
336        is_local_client: false,
337        wants_tunnel: false,
338        tunnel_id: None,
339        mtu: 65535,
340        ia_freq: 0.0,
341        started: 0.0,
342        ingress_control: false,
343    }
344}
345
346// ==================== LOCAL CLIENT ====================
347
348/// Start a local client (connect to shared instance).
349/// Tries Unix socket first on Linux, falls back to TCP.
350/// Returns the writer for the driver.
351pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
352    let id = config.interface_id;
353
354    // Try Unix socket first on Linux
355    #[cfg(target_os = "linux")]
356    {
357        match unix_socket::try_connect_unix(&config.instance_name) {
358            Ok(stream) => {
359                log::info!(
360                    "[{}] Connected to shared instance via Unix socket: rns/{}",
361                    config.name,
362                    config.instance_name
363                );
364
365                let writer_stream = stream.try_clone()?;
366                let _ = tx.send(Event::InterfaceUp(id, None, None));
367
368                let client_tx = tx;
369                thread::Builder::new()
370                    .name(format!("local-client-reader-{}", id.0))
371                    .spawn(move || {
372                        unix_reader_loop(stream, id, client_tx);
373                    })?;
374
375                return Ok(Box::new(UnixLocalWriter { stream: writer_stream }));
376            }
377            Err(e) => {
378                log::info!(
379                    "[{}] Unix socket connect failed ({}), trying TCP",
380                    config.name,
381                    e
382                );
383            }
384        }
385    }
386
387    // Fallback: TCP
388    let addr = format!("127.0.0.1:{}", config.port);
389    let stream = TcpStream::connect(&addr)?;
390    stream.set_nodelay(true)?;
391
392    log::info!("[{}] Connected to shared instance via TCP {}", config.name, addr);
393
394    let reader_stream = stream.try_clone()?;
395    let writer_stream = stream.try_clone()?;
396
397    let _ = tx.send(Event::InterfaceUp(id, None, None));
398
399    thread::Builder::new()
400        .name(format!("local-client-reader-{}", id.0))
401        .spawn(move || {
402            tcp_reader_loop(reader_stream, id, tx);
403        })?;
404
405    Ok(Box::new(LocalWriter { stream: writer_stream }))
406}
407
408// --- Factory implementations ---
409
410use std::collections::HashMap;
411use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
412
413/// Factory for `LocalServerInterface`.
414pub struct LocalServerFactory;
415
416impl InterfaceFactory for LocalServerFactory {
417    fn type_name(&self) -> &str { "LocalServerInterface" }
418
419    fn parse_config(
420        &self,
421        _name: &str,
422        id: InterfaceId,
423        params: &HashMap<String, String>,
424    ) -> Result<Box<dyn InterfaceConfigData>, String> {
425        let instance_name = params.get("instance_name")
426            .cloned()
427            .unwrap_or_else(|| "default".into());
428        let port = params.get("port")
429            .and_then(|v| v.parse().ok())
430            .unwrap_or(37428);
431
432        Ok(Box::new(LocalServerConfig {
433            instance_name,
434            port,
435            interface_id: id,
436        }))
437    }
438
439    fn start(
440        &self,
441        config: Box<dyn InterfaceConfigData>,
442        ctx: StartContext,
443    ) -> std::io::Result<StartResult> {
444        let server_config = *config.into_any().downcast::<LocalServerConfig>()
445            .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type"))?;
446
447        start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
448        Ok(StartResult::Listener)
449    }
450}
451
452/// Factory for `LocalClientInterface`.
453pub struct LocalClientFactory;
454
455impl InterfaceFactory for LocalClientFactory {
456    fn type_name(&self) -> &str { "LocalClientInterface" }
457
458    fn parse_config(
459        &self,
460        _name: &str,
461        id: InterfaceId,
462        params: &HashMap<String, String>,
463    ) -> Result<Box<dyn InterfaceConfigData>, String> {
464        let instance_name = params.get("instance_name")
465            .cloned()
466            .unwrap_or_else(|| "default".into());
467        let port = params.get("port")
468            .and_then(|v| v.parse().ok())
469            .unwrap_or(37428);
470
471        Ok(Box::new(LocalClientConfig {
472            instance_name,
473            port,
474            interface_id: id,
475            ..LocalClientConfig::default()
476        }))
477    }
478
479    fn start(
480        &self,
481        config: Box<dyn InterfaceConfigData>,
482        ctx: StartContext,
483    ) -> std::io::Result<StartResult> {
484        let client_config = *config.into_any().downcast::<LocalClientConfig>()
485            .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type"))?;
486
487        let id = client_config.interface_id;
488        let name = client_config.name.clone();
489        let info = InterfaceInfo {
490            id,
491            name,
492            mode: ctx.mode,
493            out_capable: true,
494            in_capable: true,
495            bitrate: Some(1_000_000_000),
496            announce_rate_target: None,
497            announce_rate_grace: 0,
498            announce_rate_penalty: 0.0,
499            announce_cap: rns_core::constants::ANNOUNCE_CAP,
500            is_local_client: false,
501            wants_tunnel: false,
502            tunnel_id: None,
503            mtu: 65535,
504            ingress_control: false,
505            ia_freq: 0.0,
506            started: crate::time::now(),
507        };
508
509        let writer = start_client(client_config, ctx.tx)?;
510
511        Ok(StartResult::Simple {
512            id,
513            info,
514            writer,
515            interface_type_name: "LocalInterface".to_string(),
516        })
517    }
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use std::sync::mpsc;
524
525    fn find_free_port() -> u16 {
526        TcpListener::bind("127.0.0.1:0")
527            .unwrap()
528            .local_addr()
529            .unwrap()
530            .port()
531    }
532
533    #[test]
534    fn server_bind_tcp() {
535        let port = find_free_port();
536        let (tx, _rx) = mpsc::channel();
537        let next_id = Arc::new(AtomicU64::new(7000));
538
539        let config = LocalServerConfig {
540            instance_name: "test-bind".into(),
541            port,
542            interface_id: InterfaceId(70),
543        };
544
545        // We force TCP by using a unique instance name that won't conflict
546        // with any existing Unix socket
547        start_server(config, tx, next_id).unwrap();
548        thread::sleep(Duration::from_millis(50));
549
550        // Should be able to connect
551        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
552    }
553
554    #[test]
555    fn server_accept_client() {
556        let port = find_free_port();
557        let (tx, rx) = mpsc::channel();
558        let next_id = Arc::new(AtomicU64::new(7100));
559
560        let config = LocalServerConfig {
561            instance_name: "test-accept".into(),
562            port,
563            interface_id: InterfaceId(71),
564        };
565
566        start_server(config, tx, next_id).unwrap();
567        thread::sleep(Duration::from_millis(50));
568
569        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
570
571        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
572        match event {
573            Event::InterfaceUp(id, writer, info) => {
574                assert_eq!(id, InterfaceId(7100));
575                assert!(writer.is_some());
576                assert!(info.is_some());
577            }
578            other => panic!("expected InterfaceUp, got {:?}", other),
579        }
580    }
581
582    #[test]
583    fn client_send_receive() {
584        let port = find_free_port();
585        let (server_tx, server_rx) = mpsc::channel();
586        let next_id = Arc::new(AtomicU64::new(7200));
587
588        let server_config = LocalServerConfig {
589            instance_name: "test-sr".into(),
590            port,
591            interface_id: InterfaceId(72),
592        };
593
594        start_server(server_config, server_tx, next_id).unwrap();
595        thread::sleep(Duration::from_millis(50));
596
597        // Connect client
598        let (client_tx, client_rx) = mpsc::channel();
599        let client_config = LocalClientConfig {
600            name: "test-client".into(),
601            instance_name: "test-sr".into(),
602            port,
603            interface_id: InterfaceId(73),
604            reconnect_wait: Duration::from_secs(1),
605        };
606
607        let mut client_writer = start_client(client_config, client_tx).unwrap();
608
609        // Get server-side InterfaceUp
610        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
611        let mut server_writer = match event {
612            Event::InterfaceUp(_, Some(w), _) => w,
613            other => panic!("expected InterfaceUp with writer, got {:?}", other),
614        };
615
616        // Get client-side InterfaceUp
617        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
618        match event {
619            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
620            other => panic!("expected InterfaceUp, got {:?}", other),
621        }
622
623        // Client sends to server
624        let payload: Vec<u8> = (0..32).collect();
625        client_writer.send_frame(&payload).unwrap();
626
627        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
628        match event {
629            Event::Frame { data, .. } => assert_eq!(data, payload),
630            other => panic!("expected Frame, got {:?}", other),
631        }
632
633        // Server sends to client
634        let payload2: Vec<u8> = (100..132).collect();
635        server_writer.send_frame(&payload2).unwrap();
636
637        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
638        match event {
639            Event::Frame { data, .. } => assert_eq!(data, payload2),
640            other => panic!("expected Frame, got {:?}", other),
641        }
642    }
643
644    #[test]
645    fn multiple_local_clients() {
646        let port = find_free_port();
647        let (tx, rx) = mpsc::channel();
648        let next_id = Arc::new(AtomicU64::new(7300));
649
650        let config = LocalServerConfig {
651            instance_name: "test-multi".into(),
652            port,
653            interface_id: InterfaceId(74),
654        };
655
656        start_server(config, tx, next_id).unwrap();
657        thread::sleep(Duration::from_millis(50));
658
659        let _c1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
660        let _c2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
661
662        let mut ids = Vec::new();
663        for _ in 0..2 {
664            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
665            match event {
666                Event::InterfaceUp(id, _, _) => ids.push(id),
667                other => panic!("expected InterfaceUp, got {:?}", other),
668            }
669        }
670
671        assert_eq!(ids.len(), 2);
672        assert_ne!(ids[0], ids[1]);
673    }
674
675    #[test]
676    fn client_disconnect_detected() {
677        let port = find_free_port();
678        let (tx, rx) = mpsc::channel();
679        let next_id = Arc::new(AtomicU64::new(7400));
680
681        let config = LocalServerConfig {
682            instance_name: "test-dc".into(),
683            port,
684            interface_id: InterfaceId(75),
685        };
686
687        start_server(config, tx, next_id).unwrap();
688        thread::sleep(Duration::from_millis(50));
689
690        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
691
692        // Drain InterfaceUp
693        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
694
695        // Disconnect
696        drop(client);
697
698        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
699        assert!(
700            matches!(event, Event::InterfaceDown(_)),
701            "expected InterfaceDown, got {:?}",
702            event
703        );
704    }
705}