Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25/// Configuration for a Local server (shared instance).
26#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28    pub instance_name: String,
29    pub port: u16,
30    pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34    fn default() -> Self {
35        LocalServerConfig {
36            instance_name: "default".into(),
37            port: 37428,
38            interface_id: InterfaceId(0),
39        }
40    }
41}
42
43/// Configuration for a Local client (connecting to shared instance).
44#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46    pub name: String,
47    pub instance_name: String,
48    pub port: u16,
49    pub interface_id: InterfaceId,
50    pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54    fn default() -> Self {
55        LocalClientConfig {
56            name: "Local shared instance".into(),
57            instance_name: "default".into(),
58            port: 37428,
59            interface_id: InterfaceId(0),
60            reconnect_wait: Duration::from_secs(8),
61        }
62    }
63}
64
65/// HDLC writer over a TCP or Unix stream.
66struct LocalWriter {
67    stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72        self.stream.write_all(&hdlc::frame(data))
73    }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78    use std::io;
79    use std::os::unix::net::{UnixListener, UnixStream};
80
81    /// Try to bind a Unix abstract socket with the given instance name.
82    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
83        let path = format!("\0rns/{}", instance_name);
84        UnixListener::bind(path)
85    }
86
87    /// Try to connect to a Unix abstract socket.
88    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
89        let path = format!("\0rns/{}", instance_name);
90        UnixStream::connect(path)
91    }
92}
93
94// ==================== LOCAL SERVER ====================
95
96/// Start a local server (shared instance).
97/// Tries Unix abstract socket first on Linux, falls back to TCP.
98/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
99pub fn start_server(
100    config: LocalServerConfig,
101    tx: EventSender,
102    next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104    // Try Unix socket first on Linux
105    #[cfg(target_os = "linux")]
106    {
107        match unix_socket::try_bind_unix(&config.instance_name) {
108            Ok(listener) => {
109                log::info!(
110                    "Local server using Unix socket: rns/{}",
111                    config.instance_name
112                );
113                let name = format!("rns/{}", config.instance_name);
114                thread::Builder::new()
115                    .name("local-server".into())
116                    .spawn(move || {
117                        unix_server_loop(listener, name, tx, next_id);
118                    })?;
119                return Ok(());
120            }
121            Err(e) => {
122                log::info!("Unix socket bind failed ({}), falling back to TCP", e);
123            }
124        }
125    }
126
127    // Fallback: TCP on localhost
128    let addr = format!("127.0.0.1:{}", config.port);
129    let listener = TcpListener::bind(&addr)?;
130
131    log::info!("Local server listening on TCP {}", addr);
132
133    thread::Builder::new()
134        .name("local-server".into())
135        .spawn(move || {
136            tcp_server_loop(listener, tx, next_id);
137        })?;
138
139    Ok(())
140}
141
142/// TCP server accept loop for local interface.
143fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
144    for stream_result in listener.incoming() {
145        let stream = match stream_result {
146            Ok(s) => s,
147            Err(e) => {
148                log::warn!("Local server accept failed: {}", e);
149                continue;
150            }
151        };
152
153        if let Err(e) = stream.set_nodelay(true) {
154            log::warn!("Local server set_nodelay failed: {}", e);
155        }
156
157        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
158        spawn_local_client_handler(stream, client_id, tx.clone());
159    }
160}
161
162/// Unix socket server accept loop for local interface.
163#[cfg(target_os = "linux")]
164fn unix_server_loop(
165    listener: std::os::unix::net::UnixListener,
166    name: String,
167    tx: EventSender,
168    next_id: Arc<AtomicU64>,
169) {
170    for stream_result in listener.incoming() {
171        let stream = match stream_result {
172            Ok(s) => s,
173            Err(e) => {
174                log::warn!("[{}] Local server accept failed: {}", name, e);
175                continue;
176            }
177        };
178
179        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
180
181        // Convert UnixStream to a pair of read/write handles
182        let writer_stream = match stream.try_clone() {
183            Ok(s) => s,
184            Err(e) => {
185                log::warn!("Local server clone failed: {}", e);
186                continue;
187            }
188        };
189
190        let info = make_local_interface_info(client_id);
191        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
192            stream: writer_stream,
193        });
194
195        if tx
196            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
197            .is_err()
198        {
199            return;
200        }
201
202        let client_tx = tx.clone();
203        thread::Builder::new()
204            .name(format!("local-unix-reader-{}", client_id.0))
205            .spawn(move || {
206                unix_reader_loop(stream, client_id, client_tx);
207            })
208            .ok();
209    }
210}
211
212#[cfg(target_os = "linux")]
213struct UnixLocalWriter {
214    stream: std::os::unix::net::UnixStream,
215}
216
217#[cfg(target_os = "linux")]
218impl Writer for UnixLocalWriter {
219    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
220        use std::io::Write;
221        self.stream.write_all(&hdlc::frame(data))
222    }
223}
224
225#[cfg(target_os = "linux")]
226fn unix_reader_loop(mut stream: std::os::unix::net::UnixStream, id: InterfaceId, tx: EventSender) {
227    use std::io::Read;
228    let mut decoder = hdlc::Decoder::new();
229    let mut buf = [0u8; 4096];
230
231    loop {
232        match stream.read(&mut buf) {
233            Ok(0) => {
234                let _ = tx.send(Event::InterfaceDown(id));
235                return;
236            }
237            Ok(n) => {
238                for frame in decoder.feed(&buf[..n]) {
239                    if tx
240                        .send(Event::Frame {
241                            interface_id: id,
242                            data: frame,
243                        })
244                        .is_err()
245                    {
246                        return;
247                    }
248                }
249            }
250            Err(_) => {
251                let _ = tx.send(Event::InterfaceDown(id));
252                return;
253            }
254        }
255    }
256}
257
258/// Spawn handler threads for a connected TCP local client.
259fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
260    let writer_stream = match stream.try_clone() {
261        Ok(s) => s,
262        Err(e) => {
263            log::warn!("Local server clone failed: {}", e);
264            return;
265        }
266    };
267
268    let info = make_local_interface_info(client_id);
269    let writer: Box<dyn Writer> = Box::new(LocalWriter {
270        stream: writer_stream,
271    });
272
273    if tx
274        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
275        .is_err()
276    {
277        return;
278    }
279
280    thread::Builder::new()
281        .name(format!("local-reader-{}", client_id.0))
282        .spawn(move || {
283            tcp_reader_loop(stream, client_id, tx);
284        })
285        .ok();
286}
287
288fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
289    let mut decoder = hdlc::Decoder::new();
290    let mut buf = [0u8; 4096];
291
292    loop {
293        match stream.read(&mut buf) {
294            Ok(0) => {
295                log::info!("Local client {} disconnected", id.0);
296                let _ = tx.send(Event::InterfaceDown(id));
297                return;
298            }
299            Ok(n) => {
300                for frame in decoder.feed(&buf[..n]) {
301                    if tx
302                        .send(Event::Frame {
303                            interface_id: id,
304                            data: frame,
305                        })
306                        .is_err()
307                    {
308                        return;
309                    }
310                }
311            }
312            Err(e) => {
313                log::warn!("Local client {} read error: {}", id.0, e);
314                let _ = tx.send(Event::InterfaceDown(id));
315                return;
316            }
317        }
318    }
319}
320
321fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
322    InterfaceInfo {
323        id,
324        name: String::from("LocalInterface"),
325        mode: constants::MODE_FULL,
326        out_capable: true,
327        in_capable: true,
328        bitrate: Some(1_000_000_000), // 1 Gbps
329        announce_rate_target: None,
330        announce_rate_grace: 0,
331        announce_rate_penalty: 0.0,
332        announce_cap: constants::ANNOUNCE_CAP,
333        is_local_client: false,
334        wants_tunnel: false,
335        tunnel_id: None,
336        mtu: 65535,
337        ia_freq: 0.0,
338        started: 0.0,
339        ingress_control: false,
340    }
341}
342
343// ==================== LOCAL CLIENT ====================
344
345/// Start a local client (connect to shared instance).
346/// Tries Unix socket first on Linux, falls back to TCP.
347/// Returns the writer for the driver.
348pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
349    let id = config.interface_id;
350
351    // Try Unix socket first on Linux
352    #[cfg(target_os = "linux")]
353    {
354        match unix_socket::try_connect_unix(&config.instance_name) {
355            Ok(stream) => {
356                log::info!(
357                    "[{}] Connected to shared instance via Unix socket: rns/{}",
358                    config.name,
359                    config.instance_name
360                );
361
362                let writer_stream = stream.try_clone()?;
363                let _ = tx.send(Event::InterfaceUp(id, None, None));
364
365                let client_tx = tx;
366                thread::Builder::new()
367                    .name(format!("local-client-reader-{}", id.0))
368                    .spawn(move || {
369                        unix_reader_loop(stream, id, client_tx);
370                    })?;
371
372                return Ok(Box::new(UnixLocalWriter {
373                    stream: writer_stream,
374                }));
375            }
376            Err(e) => {
377                log::info!(
378                    "[{}] Unix socket connect failed ({}), trying TCP",
379                    config.name,
380                    e
381                );
382            }
383        }
384    }
385
386    // Fallback: TCP
387    let addr = format!("127.0.0.1:{}", config.port);
388    let stream = TcpStream::connect(&addr)?;
389    stream.set_nodelay(true)?;
390
391    log::info!(
392        "[{}] Connected to shared instance via TCP {}",
393        config.name,
394        addr
395    );
396
397    let reader_stream = stream.try_clone()?;
398    let writer_stream = stream.try_clone()?;
399
400    let _ = tx.send(Event::InterfaceUp(id, None, None));
401
402    thread::Builder::new()
403        .name(format!("local-client-reader-{}", id.0))
404        .spawn(move || {
405            tcp_reader_loop(reader_stream, id, tx);
406        })?;
407
408    Ok(Box::new(LocalWriter {
409        stream: writer_stream,
410    }))
411}
412
413// --- Factory implementations ---
414
415use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
416use std::collections::HashMap;
417
418/// Factory for `LocalServerInterface`.
419pub struct LocalServerFactory;
420
421impl InterfaceFactory for LocalServerFactory {
422    fn type_name(&self) -> &str {
423        "LocalServerInterface"
424    }
425
426    fn parse_config(
427        &self,
428        _name: &str,
429        id: InterfaceId,
430        params: &HashMap<String, String>,
431    ) -> Result<Box<dyn InterfaceConfigData>, String> {
432        let instance_name = params
433            .get("instance_name")
434            .cloned()
435            .unwrap_or_else(|| "default".into());
436        let port = params
437            .get("port")
438            .and_then(|v| v.parse().ok())
439            .unwrap_or(37428);
440
441        Ok(Box::new(LocalServerConfig {
442            instance_name,
443            port,
444            interface_id: id,
445        }))
446    }
447
448    fn start(
449        &self,
450        config: Box<dyn InterfaceConfigData>,
451        ctx: StartContext,
452    ) -> std::io::Result<StartResult> {
453        let server_config = *config
454            .into_any()
455            .downcast::<LocalServerConfig>()
456            .map_err(|_| {
457                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
458            })?;
459
460        start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
461        Ok(StartResult::Listener)
462    }
463}
464
465/// Factory for `LocalClientInterface`.
466pub struct LocalClientFactory;
467
468impl InterfaceFactory for LocalClientFactory {
469    fn type_name(&self) -> &str {
470        "LocalClientInterface"
471    }
472
473    fn parse_config(
474        &self,
475        _name: &str,
476        id: InterfaceId,
477        params: &HashMap<String, String>,
478    ) -> Result<Box<dyn InterfaceConfigData>, String> {
479        let instance_name = params
480            .get("instance_name")
481            .cloned()
482            .unwrap_or_else(|| "default".into());
483        let port = params
484            .get("port")
485            .and_then(|v| v.parse().ok())
486            .unwrap_or(37428);
487
488        Ok(Box::new(LocalClientConfig {
489            instance_name,
490            port,
491            interface_id: id,
492            ..LocalClientConfig::default()
493        }))
494    }
495
496    fn start(
497        &self,
498        config: Box<dyn InterfaceConfigData>,
499        ctx: StartContext,
500    ) -> std::io::Result<StartResult> {
501        let client_config = *config
502            .into_any()
503            .downcast::<LocalClientConfig>()
504            .map_err(|_| {
505                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
506            })?;
507
508        let id = client_config.interface_id;
509        let name = client_config.name.clone();
510        let info = InterfaceInfo {
511            id,
512            name,
513            mode: ctx.mode,
514            out_capable: true,
515            in_capable: true,
516            bitrate: Some(1_000_000_000),
517            announce_rate_target: None,
518            announce_rate_grace: 0,
519            announce_rate_penalty: 0.0,
520            announce_cap: rns_core::constants::ANNOUNCE_CAP,
521            is_local_client: false,
522            wants_tunnel: false,
523            tunnel_id: None,
524            mtu: 65535,
525            ingress_control: false,
526            ia_freq: 0.0,
527            started: crate::time::now(),
528        };
529
530        let writer = start_client(client_config, ctx.tx)?;
531
532        Ok(StartResult::Simple {
533            id,
534            info,
535            writer,
536            interface_type_name: "LocalInterface".to_string(),
537        })
538    }
539}
540
541#[cfg(test)]
542mod tests {
543    use super::*;
544    use std::sync::mpsc;
545
546    fn find_free_port() -> u16 {
547        TcpListener::bind("127.0.0.1:0")
548            .unwrap()
549            .local_addr()
550            .unwrap()
551            .port()
552    }
553
554    #[test]
555    fn server_bind_tcp() {
556        let port = find_free_port();
557        let (tx, _rx) = mpsc::channel();
558        let next_id = Arc::new(AtomicU64::new(7000));
559
560        let config = LocalServerConfig {
561            instance_name: "test-bind".into(),
562            port,
563            interface_id: InterfaceId(70),
564        };
565
566        // We force TCP by using a unique instance name that won't conflict
567        // with any existing Unix socket
568        start_server(config, tx, next_id).unwrap();
569        thread::sleep(Duration::from_millis(50));
570
571        // Should be able to connect
572        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
573    }
574
575    #[test]
576    fn server_accept_client() {
577        let port = find_free_port();
578        let (tx, rx) = mpsc::channel();
579        let next_id = Arc::new(AtomicU64::new(7100));
580
581        let config = LocalServerConfig {
582            instance_name: "test-accept".into(),
583            port,
584            interface_id: InterfaceId(71),
585        };
586
587        start_server(config, tx, next_id).unwrap();
588        thread::sleep(Duration::from_millis(50));
589
590        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
591
592        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
593        match event {
594            Event::InterfaceUp(id, writer, info) => {
595                assert_eq!(id, InterfaceId(7100));
596                assert!(writer.is_some());
597                assert!(info.is_some());
598            }
599            other => panic!("expected InterfaceUp, got {:?}", other),
600        }
601    }
602
603    #[test]
604    fn client_send_receive() {
605        let port = find_free_port();
606        let (server_tx, server_rx) = mpsc::channel();
607        let next_id = Arc::new(AtomicU64::new(7200));
608
609        let server_config = LocalServerConfig {
610            instance_name: "test-sr".into(),
611            port,
612            interface_id: InterfaceId(72),
613        };
614
615        start_server(server_config, server_tx, next_id).unwrap();
616        thread::sleep(Duration::from_millis(50));
617
618        // Connect client
619        let (client_tx, client_rx) = mpsc::channel();
620        let client_config = LocalClientConfig {
621            name: "test-client".into(),
622            instance_name: "test-sr".into(),
623            port,
624            interface_id: InterfaceId(73),
625            reconnect_wait: Duration::from_secs(1),
626        };
627
628        let mut client_writer = start_client(client_config, client_tx).unwrap();
629
630        // Get server-side InterfaceUp
631        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
632        let mut server_writer = match event {
633            Event::InterfaceUp(_, Some(w), _) => w,
634            other => panic!("expected InterfaceUp with writer, got {:?}", other),
635        };
636
637        // Get client-side InterfaceUp
638        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
639        match event {
640            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
641            other => panic!("expected InterfaceUp, got {:?}", other),
642        }
643
644        // Client sends to server
645        let payload: Vec<u8> = (0..32).collect();
646        client_writer.send_frame(&payload).unwrap();
647
648        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
649        match event {
650            Event::Frame { data, .. } => assert_eq!(data, payload),
651            other => panic!("expected Frame, got {:?}", other),
652        }
653
654        // Server sends to client
655        let payload2: Vec<u8> = (100..132).collect();
656        server_writer.send_frame(&payload2).unwrap();
657
658        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
659        match event {
660            Event::Frame { data, .. } => assert_eq!(data, payload2),
661            other => panic!("expected Frame, got {:?}", other),
662        }
663    }
664
665    #[test]
666    fn multiple_local_clients() {
667        let port = find_free_port();
668        let (tx, rx) = mpsc::channel();
669        let next_id = Arc::new(AtomicU64::new(7300));
670
671        let config = LocalServerConfig {
672            instance_name: "test-multi".into(),
673            port,
674            interface_id: InterfaceId(74),
675        };
676
677        start_server(config, tx, next_id).unwrap();
678        thread::sleep(Duration::from_millis(50));
679
680        let _c1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
681        let _c2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
682
683        let mut ids = Vec::new();
684        for _ in 0..2 {
685            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
686            match event {
687                Event::InterfaceUp(id, _, _) => ids.push(id),
688                other => panic!("expected InterfaceUp, got {:?}", other),
689            }
690        }
691
692        assert_eq!(ids.len(), 2);
693        assert_ne!(ids[0], ids[1]);
694    }
695
696    #[test]
697    fn client_disconnect_detected() {
698        let port = find_free_port();
699        let (tx, rx) = mpsc::channel();
700        let next_id = Arc::new(AtomicU64::new(7400));
701
702        let config = LocalServerConfig {
703            instance_name: "test-dc".into(),
704            port,
705            interface_id: InterfaceId(75),
706        };
707
708        start_server(config, tx, next_id).unwrap();
709        thread::sleep(Duration::from_millis(50));
710
711        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
712
713        // Drain InterfaceUp
714        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
715
716        // Disconnect
717        drop(client);
718
719        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
720        assert!(
721            matches!(event, Event::InterfaceDown(_)),
722            "expected InterfaceDown, got {:?}",
723            event
724        );
725    }
726}