Skip to main content

rns_net/interface/
local.rs

//! Local shared instance interface.
//!
//! Provides communication between the shared RNS instance and local client
//! programs. On Linux a Unix abstract socket is tried first, with TCP on
//! localhost as the fallback; other platforms always use TCP on localhost.
//! Data on the connection is HDLC-framed (same as TCP interfaces).
//!
//! Two modes:
//! - `LocalServer`: The shared instance binds and accepts client connections.
//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
// Android only: timeout handed to `ClientSleepHold::new` for server-side
// client writers. Once a client has sent nothing for this long, outbound
// frames to it are dropped until it is heard from again.
#[cfg(target_os = "android")]
const CLIENT_SLEEP_PAUSE_TIMEOUT: Duration = Duration::from_secs(12);
// Android only: interval between the empty keepalive frames a local client
// sends to the shared instance.
#[cfg(target_os = "android")]
const PHY_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
29
30/// Configuration for a Local server (shared instance).
31#[derive(Debug, Clone)]
32pub struct LocalServerConfig {
33    pub instance_name: String,
34    pub port: u16,
35    pub interface_id: InterfaceId,
36}
37
38impl Default for LocalServerConfig {
39    fn default() -> Self {
40        LocalServerConfig {
41            instance_name: "default".into(),
42            port: 37428,
43            interface_id: InterfaceId(0),
44        }
45    }
46}
47
48/// Configuration for a Local client (connecting to shared instance).
49#[derive(Debug, Clone)]
50pub struct LocalClientConfig {
51    pub name: String,
52    pub instance_name: String,
53    pub port: u16,
54    pub interface_id: InterfaceId,
55    pub reconnect_wait: Duration,
56}
57
58impl Default for LocalClientConfig {
59    fn default() -> Self {
60        LocalClientConfig {
61            name: "Local shared instance".into(),
62            instance_name: "default".into(),
63            port: 37428,
64            interface_id: InterfaceId(0),
65            reconnect_wait: Duration::from_secs(8),
66        }
67    }
68}
69
70/// HDLC writer over a TCP or Unix stream.
71struct LocalWriter {
72    stream: TcpStream,
73    sleep_hold: Option<ClientSleepHold>,
74}
75
76impl Writer for LocalWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        if self
79            .sleep_hold
80            .as_ref()
81            .is_some_and(ClientSleepHold::should_drop_outbound)
82        {
83            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
84            return Ok(());
85        }
86        self.stream.write_all(&hdlc::frame(data))
87    }
88}
89
/// Deadline shared between a client's reader and writer.
///
/// Clones share the same deadline. `refresh` pushes the deadline `timeout`
/// into the future; once the deadline has passed, `should_drop_outbound`
/// reports `true`.
#[derive(Clone)]
struct ClientSleepHold {
    timeout: Duration,
    deadline: Arc<Mutex<Instant>>,
}

impl ClientSleepHold {
    /// Creates a hold whose first deadline is `timeout` from now.
    #[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
    fn new(timeout: Duration) -> Self {
        let first_deadline = Instant::now() + timeout;
        Self {
            timeout,
            deadline: Arc::new(Mutex::new(first_deadline)),
        }
    }

    /// Moves the deadline to `timeout` from now.
    fn refresh(&self) {
        let next_deadline = Instant::now() + self.timeout;
        let mut slot = lock_or_recover(&self.deadline);
        *slot = next_deadline;
    }

    /// Returns `true` when the current time is strictly past the deadline.
    fn should_drop_outbound(&self) -> bool {
        let now = Instant::now();
        let deadline = *lock_or_recover(&self.deadline);
        now > deadline
    }
}

/// Locks `mutex`, taking the guard out of the error if the mutex is poisoned.
fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
120
121fn android_client_sleep_hold() -> Option<ClientSleepHold> {
122    #[cfg(target_os = "android")]
123    {
124        Some(ClientSleepHold::new(CLIENT_SLEEP_PAUSE_TIMEOUT))
125    }
126    #[cfg(not(target_os = "android"))]
127    {
128        None
129    }
130}
131
132#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
133fn spawn_physical_keepalive_loop(
134    mut writer: Box<dyn Writer>,
135    interface_id: InterfaceId,
136    interface_name: String,
137    interval: Duration,
138) {
139    thread::Builder::new()
140        .name(format!("local-phy-keepalive-{}", interface_id.0))
141        .spawn(move || loop {
142            thread::sleep(interval);
143            if let Err(err) = writer.send_frame(&[]) {
144                log::debug!(
145                    "[{}:{}] LocalInterface physical keepalive stopped: {}",
146                    interface_name,
147                    interface_id.0,
148                    err
149                );
150                return;
151            }
152        })
153        .ok();
154}
155
156fn maybe_spawn_local_client_phy_keepalive(
157    stream: &LocalClientStream,
158    interface_id: InterfaceId,
159    interface_name: &str,
160) -> io::Result<()> {
161    #[cfg(target_os = "android")]
162    {
163        let writer = local_client_stream_writer(stream)?;
164        spawn_physical_keepalive_loop(
165            writer,
166            interface_id,
167            interface_name.to_string(),
168            PHY_KEEPALIVE_INTERVAL,
169        );
170    }
171
172    #[cfg(not(target_os = "android"))]
173    {
174        let _ = (stream, interface_id, interface_name);
175    }
176
177    Ok(())
178}
179
/// Linux-only helpers for the abstract-namespace Unix socket `rns/<instance_name>`.
#[cfg(target_os = "linux")]
mod unix_socket {
    use std::io;
    use std::os::linux::net::SocketAddrExt;
    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};

    /// Builds the abstract socket address for `instance_name`.
    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
        let socket_name = format!("rns/{}", instance_name);
        SocketAddr::from_abstract_name(socket_name)
    }

    /// Binds a listener on the abstract socket of `instance_name`.
    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
        abstract_addr(instance_name).and_then(|addr| UnixListener::bind_addr(&addr))
    }

    /// Connects to the abstract socket of `instance_name`.
    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
        abstract_addr(instance_name).and_then(|addr| UnixStream::connect_addr(&addr))
    }
}
202
203// ==================== LOCAL SERVER ====================
204
205/// Start a local server (shared instance).
206/// Tries Unix abstract socket first on Linux, falls back to TCP.
207/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
208pub fn start_server(
209    config: LocalServerConfig,
210    tx: EventSender,
211    next_id: Arc<AtomicU64>,
212) -> io::Result<ListenerControl> {
213    let control = ListenerControl::new();
214    // Try Unix socket first on Linux
215    #[cfg(target_os = "linux")]
216    {
217        match unix_socket::try_bind_unix(&config.instance_name) {
218            Ok(listener) => {
219                listener.set_nonblocking(true)?;
220                log::info!(
221                    "Local server using Unix socket: rns/{}",
222                    config.instance_name
223                );
224                let name = format!("rns/{}", config.instance_name);
225                let listener_control = control.clone();
226                thread::Builder::new()
227                    .name("local-server".into())
228                    .spawn(move || {
229                        unix_server_loop(listener, name, tx, next_id, listener_control);
230                    })?;
231                return Ok(control);
232            }
233            Err(e) => {
234                log::info!("Unix socket bind failed ({}), falling back to TCP", e);
235            }
236        }
237    }
238
239    // Fallback: TCP on localhost
240    let addr = format!("127.0.0.1:{}", config.port);
241    let listener = TcpListener::bind(&addr)?;
242    listener.set_nonblocking(true)?;
243
244    log::info!("Local server listening on TCP {}", addr);
245
246    let listener_control = control.clone();
247    thread::Builder::new()
248        .name("local-server".into())
249        .spawn(move || {
250            tcp_server_loop(listener, tx, next_id, listener_control);
251        })?;
252
253    Ok(control)
254}
255
256/// TCP server accept loop for local interface.
257fn tcp_server_loop(
258    listener: TcpListener,
259    tx: EventSender,
260    next_id: Arc<AtomicU64>,
261    control: ListenerControl,
262) {
263    loop {
264        if control.should_stop() {
265            log::info!("Local TCP listener stopping");
266            return;
267        }
268
269        let stream_result = listener.accept().map(|(stream, _)| stream);
270        let stream = match stream_result {
271            Ok(s) => s,
272            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273                thread::sleep(Duration::from_millis(50));
274                continue;
275            }
276            Err(e) => {
277                log::warn!("Local server accept failed: {}", e);
278                continue;
279            }
280        };
281
282        if let Err(e) = stream.set_nodelay(true) {
283            log::warn!("Local server set_nodelay failed: {}", e);
284        }
285
286        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
287        spawn_local_client_handler(stream, client_id, tx.clone());
288    }
289}
290
291/// Unix socket server accept loop for local interface.
292#[cfg(target_os = "linux")]
293fn unix_server_loop(
294    listener: std::os::unix::net::UnixListener,
295    name: String,
296    tx: EventSender,
297    next_id: Arc<AtomicU64>,
298    control: ListenerControl,
299) {
300    loop {
301        if control.should_stop() {
302            log::info!("[{}] Local Unix listener stopping", name);
303            return;
304        }
305
306        let stream_result = listener.accept().map(|(stream, _)| stream);
307        let stream = match stream_result {
308            Ok(s) => s,
309            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
310                thread::sleep(Duration::from_millis(50));
311                continue;
312            }
313            Err(e) => {
314                log::warn!("[{}] Local server accept failed: {}", name, e);
315                continue;
316            }
317        };
318
319        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
320
321        // Convert UnixStream to a pair of read/write handles
322        let writer_stream = match stream.try_clone() {
323            Ok(s) => s,
324            Err(e) => {
325                log::warn!("Local server clone failed: {}", e);
326                continue;
327            }
328        };
329
330        let sleep_hold = android_client_sleep_hold();
331        let info = make_local_interface_info(client_id);
332        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
333            stream: writer_stream,
334            sleep_hold: sleep_hold.clone(),
335        });
336
337        if tx
338            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
339            .is_err()
340        {
341            return;
342        }
343
344        let client_tx = tx.clone();
345        thread::Builder::new()
346            .name(format!("local-unix-reader-{}", client_id.0))
347            .spawn(move || {
348                unix_reader_loop(stream, client_id, client_tx, sleep_hold);
349            })
350            .ok();
351    }
352}
353
354#[cfg(target_os = "linux")]
355struct UnixLocalWriter {
356    stream: std::os::unix::net::UnixStream,
357    sleep_hold: Option<ClientSleepHold>,
358}
359
360#[cfg(target_os = "linux")]
361impl Writer for UnixLocalWriter {
362    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
363        use std::io::Write;
364        if self
365            .sleep_hold
366            .as_ref()
367            .is_some_and(ClientSleepHold::should_drop_outbound)
368        {
369            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
370            return Ok(());
371        }
372        self.stream.write_all(&hdlc::frame(data))
373    }
374}
375
376#[cfg(target_os = "linux")]
377fn unix_reader_loop(
378    mut stream: std::os::unix::net::UnixStream,
379    id: InterfaceId,
380    tx: EventSender,
381    sleep_hold: Option<ClientSleepHold>,
382) {
383    use std::io::Read;
384    let mut decoder = hdlc::Decoder::new();
385    let mut buf = [0u8; 4096];
386
387    loop {
388        match stream.read(&mut buf) {
389            Ok(0) => {
390                let _ = tx.send(Event::InterfaceDown(id));
391                return;
392            }
393            Ok(n) => {
394                if let Some(ref sleep_hold) = sleep_hold {
395                    sleep_hold.refresh();
396                }
397                for frame in decoder.feed(&buf[..n]) {
398                    if tx
399                        .send(Event::Frame {
400                            interface_id: id,
401                            data: frame,
402                            rssi: None,
403                            snr: None,
404                        })
405                        .is_err()
406                    {
407                        return;
408                    }
409                }
410            }
411            Err(_) => {
412                let _ = tx.send(Event::InterfaceDown(id));
413                return;
414            }
415        }
416    }
417}
418
419/// Spawn handler threads for a connected TCP local client.
420fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
421    let writer_stream = match stream.try_clone() {
422        Ok(s) => s,
423        Err(e) => {
424            log::warn!("Local server clone failed: {}", e);
425            return;
426        }
427    };
428
429    let sleep_hold = android_client_sleep_hold();
430    let info = make_local_interface_info(client_id);
431    let writer: Box<dyn Writer> = Box::new(LocalWriter {
432        stream: writer_stream,
433        sleep_hold: sleep_hold.clone(),
434    });
435
436    if tx
437        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
438        .is_err()
439    {
440        return;
441    }
442
443    thread::Builder::new()
444        .name(format!("local-reader-{}", client_id.0))
445        .spawn(move || {
446            tcp_reader_loop(stream, client_id, tx, sleep_hold);
447        })
448        .ok();
449}
450
451fn tcp_reader_loop(
452    mut stream: TcpStream,
453    id: InterfaceId,
454    tx: EventSender,
455    sleep_hold: Option<ClientSleepHold>,
456) {
457    let mut decoder = hdlc::Decoder::new();
458    let mut buf = [0u8; 4096];
459
460    loop {
461        match stream.read(&mut buf) {
462            Ok(0) => {
463                log::info!("Local client {} disconnected", id.0);
464                let _ = tx.send(Event::InterfaceDown(id));
465                return;
466            }
467            Ok(n) => {
468                if let Some(ref sleep_hold) = sleep_hold {
469                    sleep_hold.refresh();
470                }
471                for frame in decoder.feed(&buf[..n]) {
472                    if tx
473                        .send(Event::Frame {
474                            interface_id: id,
475                            data: frame,
476                            rssi: None,
477                            snr: None,
478                        })
479                        .is_err()
480                    {
481                        return;
482                    }
483                }
484            }
485            Err(e) => {
486                log::warn!("Local client {} read error: {}", id.0, e);
487                let _ = tx.send(Event::InterfaceDown(id));
488                return;
489            }
490        }
491    }
492}
493
494fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
495    InterfaceInfo {
496        id,
497        name: String::from("LocalInterface"),
498        mode: constants::MODE_FULL,
499        out_capable: true,
500        in_capable: true,
501        bitrate: Some(1_000_000_000), // 1 Gbps
502        airtime_profile: None,
503        announce_rate_target: None,
504        announce_rate_grace: 0,
505        announce_rate_penalty: 0.0,
506        announce_cap: constants::ANNOUNCE_CAP,
507        is_local_client: false,
508        wants_tunnel: false,
509        tunnel_id: None,
510        mtu: 65535,
511        ia_freq: 0.0,
512        ip_freq: 0.0,
513        op_freq: 0.0,
514        op_samples: 0,
515        started: 0.0,
516        ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
517    }
518}
519
520// ==================== LOCAL CLIENT ====================
521
522#[cfg(target_os = "linux")]
523enum LocalClientStream {
524    Unix(std::os::unix::net::UnixStream),
525    Tcp(TcpStream),
526}
527
528#[cfg(target_os = "linux")]
529impl LocalClientStream {
530    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
531        match self {
532            LocalClientStream::Unix(stream) => stream.read(buf),
533            LocalClientStream::Tcp(stream) => stream.read(buf),
534        }
535    }
536
537    fn writer(&self) -> io::Result<Box<dyn Writer>> {
538        match self {
539            LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
540                stream: stream.try_clone()?,
541                sleep_hold: None,
542            })),
543            LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
544                stream: stream.try_clone()?,
545                sleep_hold: None,
546            })),
547        }
548    }
549}
550
551#[cfg(not(target_os = "linux"))]
552type LocalClientStream = TcpStream;
553
554#[cfg(not(target_os = "linux"))]
555fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
556    Ok(Box::new(LocalWriter {
557        stream: stream.try_clone()?,
558        sleep_hold: None,
559    }))
560}
561
562#[cfg(target_os = "linux")]
563fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
564    stream.writer()
565}
566
567fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
568    let addr = format!("127.0.0.1:{}", config.port);
569    let stream = TcpStream::connect(&addr)?;
570    stream.set_nodelay(true)?;
571    log::info!(
572        "[{}] Connected to shared instance via TCP {}",
573        config.name,
574        addr
575    );
576    Ok(stream)
577}
578
579#[cfg(target_os = "linux")]
580fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
581    match unix_socket::try_connect_unix(&config.instance_name) {
582        Ok(stream) => {
583            log::info!(
584                "[{}] Connected to shared instance via Unix socket: rns/{}",
585                config.name,
586                config.instance_name
587            );
588            Ok(LocalClientStream::Unix(stream))
589        }
590        Err(e) => {
591            log::info!(
592                "[{}] Unix socket connect failed ({}), trying TCP",
593                config.name,
594                e
595            );
596            try_connect_tcp(config).map(LocalClientStream::Tcp)
597        }
598    }
599}
600
601#[cfg(not(target_os = "linux"))]
602fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
603    try_connect_tcp(config)
604}
605
606fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
607    loop {
608        thread::sleep(config.reconnect_wait);
609        match try_connect_local_client(config) {
610            Ok(stream) => match local_client_stream_writer(&stream) {
611                Ok(writer) => {
612                    let _ = maybe_spawn_local_client_phy_keepalive(
613                        &stream,
614                        config.interface_id,
615                        &config.name,
616                    );
617                    let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
618                    return stream;
619                }
620                Err(e) => {
621                    log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
622                }
623            },
624            Err(e) => {
625                log::warn!("[{}] reconnect failed: {}", config.name, e);
626            }
627        }
628    }
629}
630
631fn local_client_reader_loop(
632    mut stream: LocalClientStream,
633    config: LocalClientConfig,
634    tx: EventSender,
635) {
636    let id = config.interface_id;
637    let mut decoder = hdlc::Decoder::new();
638    let mut buf = [0u8; 4096];
639
640    loop {
641        match stream.read(&mut buf) {
642            Ok(0) => {
643                log::warn!("[{}] shared connection closed", config.name);
644                let _ = tx.send(Event::InterfaceDown(id));
645                stream = reconnect_local_client(&config, &tx);
646                decoder = hdlc::Decoder::new();
647            }
648            Ok(n) => {
649                for frame in decoder.feed(&buf[..n]) {
650                    if tx
651                        .send(Event::Frame {
652                            interface_id: id,
653                            data: frame,
654                            rssi: None,
655                            snr: None,
656                        })
657                        .is_err()
658                    {
659                        return;
660                    }
661                }
662            }
663            Err(e) => {
664                log::warn!("[{}] shared read error: {}", config.name, e);
665                let _ = tx.send(Event::InterfaceDown(id));
666                stream = reconnect_local_client(&config, &tx);
667                decoder = hdlc::Decoder::new();
668            }
669        }
670    }
671}
672
673/// Start a local client (connect to shared instance).
674/// Tries Unix socket first on Linux, falls back to TCP.
675/// Returns the writer for the driver.
676pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
677    let id = config.interface_id;
678    let stream = try_connect_local_client(&config)?;
679    let writer = local_client_stream_writer(&stream)?;
680    maybe_spawn_local_client_phy_keepalive(&stream, id, &config.name)?;
681
682    let _ = tx.send(Event::InterfaceUp(id, None, None));
683
684    thread::Builder::new()
685        .name(format!("local-client-reader-{}", id.0))
686        .spawn(move || {
687            local_client_reader_loop(stream, config, tx);
688        })?;
689
690    Ok(writer)
691}
692
693// --- Factory implementations ---
694
695use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
696use std::collections::HashMap;
697
698/// Factory for `LocalServerInterface`.
699pub struct LocalServerFactory;
700
701impl InterfaceFactory for LocalServerFactory {
702    fn type_name(&self) -> &str {
703        "LocalServerInterface"
704    }
705
706    fn parse_config(
707        &self,
708        _name: &str,
709        id: InterfaceId,
710        params: &HashMap<String, String>,
711    ) -> Result<Box<dyn InterfaceConfigData>, String> {
712        let instance_name = params
713            .get("instance_name")
714            .cloned()
715            .unwrap_or_else(|| "default".into());
716        let port = params
717            .get("port")
718            .and_then(|v| v.parse().ok())
719            .unwrap_or(37428);
720
721        Ok(Box::new(LocalServerConfig {
722            instance_name,
723            port,
724            interface_id: id,
725        }))
726    }
727
728    fn start(
729        &self,
730        config: Box<dyn InterfaceConfigData>,
731        ctx: StartContext,
732    ) -> std::io::Result<StartResult> {
733        let server_config = *config
734            .into_any()
735            .downcast::<LocalServerConfig>()
736            .map_err(|_| {
737                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
738            })?;
739
740        let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
741        Ok(StartResult::Listener {
742            control: Some(control),
743        })
744    }
745}
746
747/// Factory for `LocalClientInterface`.
748pub struct LocalClientFactory;
749
750impl InterfaceFactory for LocalClientFactory {
751    fn type_name(&self) -> &str {
752        "LocalClientInterface"
753    }
754
755    fn parse_config(
756        &self,
757        _name: &str,
758        id: InterfaceId,
759        params: &HashMap<String, String>,
760    ) -> Result<Box<dyn InterfaceConfigData>, String> {
761        let instance_name = params
762            .get("instance_name")
763            .cloned()
764            .unwrap_or_else(|| "default".into());
765        let port = params
766            .get("port")
767            .and_then(|v| v.parse().ok())
768            .unwrap_or(37428);
769
770        Ok(Box::new(LocalClientConfig {
771            instance_name,
772            port,
773            interface_id: id,
774            ..LocalClientConfig::default()
775        }))
776    }
777
778    fn start(
779        &self,
780        config: Box<dyn InterfaceConfigData>,
781        ctx: StartContext,
782    ) -> std::io::Result<StartResult> {
783        let client_config = *config
784            .into_any()
785            .downcast::<LocalClientConfig>()
786            .map_err(|_| {
787                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
788            })?;
789
790        let id = client_config.interface_id;
791        let name = client_config.name.clone();
792        let info = InterfaceInfo {
793            id,
794            name,
795            mode: ctx.mode,
796            out_capable: true,
797            in_capable: true,
798            bitrate: Some(1_000_000_000),
799            airtime_profile: None,
800            announce_rate_target: None,
801            announce_rate_grace: 0,
802            announce_rate_penalty: 0.0,
803            announce_cap: rns_core::constants::ANNOUNCE_CAP,
804            is_local_client: false,
805            wants_tunnel: false,
806            tunnel_id: None,
807            mtu: 65535,
808            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
809            ia_freq: 0.0,
810            ip_freq: 0.0,
811            op_freq: 0.0,
812            op_samples: 0,
813            started: crate::time::now(),
814        };
815
816        let writer = start_client(client_config, ctx.tx)?;
817
818        Ok(StartResult::Simple {
819            id,
820            info,
821            writer,
822            interface_type_name: "LocalInterface".to_string(),
823        })
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830    use std::sync::mpsc;
831    use std::sync::mpsc::RecvTimeoutError;
832
833    #[cfg(target_os = "linux")]
834    type TestClient = std::os::unix::net::UnixStream;
835
836    #[cfg(not(target_os = "linux"))]
837    type TestClient = TcpStream;
838
839    fn connect_test_client(instance_name: &str, _port: u16) -> TestClient {
840        #[cfg(target_os = "linux")]
841        {
842            unix_socket::try_connect_unix(instance_name).unwrap()
843        }
844
845        #[cfg(not(target_os = "linux"))]
846        {
847            TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
848        }
849    }
850
851    fn find_free_port() -> u16 {
852        TcpListener::bind("127.0.0.1:0")
853            .unwrap()
854            .local_addr()
855            .unwrap()
856            .port()
857    }
858
859    #[test]
860    fn server_bind_tcp() {
861        let port = find_free_port();
862        let instance_name = "test-bind".to_string();
863        let (tx, _rx) = crate::event::channel();
864        let next_id = Arc::new(AtomicU64::new(7000));
865
866        let config = LocalServerConfig {
867            instance_name: instance_name.clone(),
868            port,
869            interface_id: InterfaceId(70),
870        };
871
872        // We force TCP by using a unique instance name that won't conflict
873        // with any existing Unix socket
874        start_server(config, tx, next_id).unwrap();
875        thread::sleep(Duration::from_millis(50));
876
877        connect_test_client(&instance_name, port);
878    }
879
880    #[test]
881    fn server_accept_client() {
882        let port = find_free_port();
883        let instance_name = "test-accept".to_string();
884        let (tx, rx) = crate::event::channel();
885        let next_id = Arc::new(AtomicU64::new(7100));
886
887        let config = LocalServerConfig {
888            instance_name: instance_name.clone(),
889            port,
890            interface_id: InterfaceId(71),
891        };
892
893        start_server(config, tx, next_id).unwrap();
894        thread::sleep(Duration::from_millis(50));
895
896        connect_test_client(&instance_name, port);
897
898        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
899        match event {
900            Event::InterfaceUp(id, writer, info) => {
901                assert_eq!(id, InterfaceId(7100));
902                assert!(writer.is_some());
903                assert!(info.is_some());
904            }
905            other => panic!("expected InterfaceUp, got {:?}", other),
906        }
907    }
908
909    #[test]
910    fn server_stop_prevents_new_accepts() {
911        let port = find_free_port();
912        let instance_name = "test-stop".to_string();
913        let (tx, rx) = crate::event::channel();
914        let next_id = Arc::new(AtomicU64::new(7150));
915
916        let config = LocalServerConfig {
917            instance_name: instance_name.clone(),
918            port,
919            interface_id: InterfaceId(71),
920        };
921
922        let control = start_server(config, tx, next_id).unwrap();
923        thread::sleep(Duration::from_millis(50));
924        control.request_stop();
925        thread::sleep(Duration::from_millis(120));
926
927        #[cfg(target_os = "linux")]
928        let connect_result = unix_socket::try_connect_unix(&instance_name);
929
930        #[cfg(not(target_os = "linux"))]
931        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
932
933        if let Ok(stream) = connect_result {
934            drop(stream);
935        }
936
937        match rx.recv_timeout(Duration::from_millis(200)) {
938            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
939            other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
940        }
941    }
942
/// Starts a local server and a `start_client` client on the same instance
/// name and port, then checks that one frame travels intact in each direction.
#[test]
fn client_send_receive() {
    let port = find_free_port();
    let next_id = Arc::new(AtomicU64::new(7200));
    let (server_tx, server_rx) = crate::event::channel();

    start_server(
        LocalServerConfig {
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(72),
        },
        server_tx,
        next_id,
    )
    .unwrap();
    // Short fixed pause before the client connects.
    thread::sleep(Duration::from_millis(50));

    // Bring up the client against the same instance name / port.
    let (client_tx, client_rx) = crate::event::channel();
    let mut client_writer = start_client(
        LocalClientConfig {
            name: "test-client".into(),
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(73),
            reconnect_wait: Duration::from_secs(1),
        },
        client_tx,
    )
    .unwrap();

    // The server-side InterfaceUp is expected to carry the writer for the
    // accepted connection.
    let mut server_writer = match server_rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(_, Some(w), _) => w,
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    // The client-side InterfaceUp must report the configured interface id.
    match client_rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
        other => panic!("expected InterfaceUp, got {:?}", other),
    }

    // client -> server
    let upstream: Vec<u8> = (0..32).collect();
    client_writer.send_frame(&upstream).unwrap();
    match server_rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::Frame { data, .. } => assert_eq!(data, upstream),
        other => panic!("expected Frame, got {:?}", other),
    }

    // server -> client
    let downstream: Vec<u8> = (100..132).collect();
    server_writer.send_frame(&downstream).unwrap();
    match client_rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::Frame { data, .. } => assert_eq!(data, downstream),
        other => panic!("expected Frame, got {:?}", other),
    }
}
1004
/// A `LocalWriter` carrying a `ClientSleepHold` must stop emitting bytes once
/// the hold timeout has elapsed, and emit again after `refresh()`.
#[test]
fn sleep_hold_drops_outbound_after_timeout_and_refresh_restores() {
    // Loopback TCP pair: `peer` plays the client, `accepted` backs the writer.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let mut peer = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
    let (accepted, _) = listener.accept().unwrap();
    peer.set_read_timeout(Some(Duration::from_millis(200)))
        .unwrap();

    let hold = ClientSleepHold::new(Duration::from_millis(30));
    let mut writer = LocalWriter {
        stream: accepted,
        sleep_hold: Some(hold.clone()),
    };
    let mut buf = [0u8; 16];

    // Inside the hold window: the frame reaches the peer.
    writer.send_frame(b"live").unwrap();
    let n = peer.read(&mut buf).unwrap();
    assert!(n > 0);

    // Past the 30 ms window: `send_frame` still returns Ok, but the peer's
    // read must time out because nothing was written.
    thread::sleep(Duration::from_millis(50));
    writer.send_frame(b"drop").unwrap();
    let err = peer.read(&mut buf).unwrap_err();
    let timed_out = matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
    );
    assert!(timed_out, "paused writer should not emit bytes, got {err:?}");

    // Refreshing the hold lets traffic through again.
    hold.refresh();
    writer.send_frame(b"again").unwrap();
    let n = peer.read(&mut buf).unwrap();
    assert!(n > 0);
}
1042
/// Inbound data seen by `tcp_reader_loop` must refresh the shared
/// `ClientSleepHold`, so a writer that had gone quiet emits bytes again.
#[test]
fn tcp_reader_refreshes_sleep_hold_on_inbound_data() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let mut peer = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
    let (reader_stream, _) = listener.accept().unwrap();
    // The writer gets its own handle to the same accepted connection.
    let writer_stream = reader_stream.try_clone().unwrap();
    peer.set_read_timeout(Some(Duration::from_millis(200)))
        .unwrap();

    let hold = ClientSleepHold::new(Duration::from_millis(30));
    let (tx, rx) = crate::event::channel();
    {
        let hold = hold.clone();
        thread::spawn(move || {
            tcp_reader_loop(reader_stream, InterfaceId(98), tx, Some(hold));
        });
    }

    let mut writer = LocalWriter {
        stream: writer_stream,
        sleep_hold: Some(hold),
    };
    let mut buf = [0u8; 32];

    // Let the 30 ms hold lapse; the next outbound frame must not reach the peer.
    thread::sleep(Duration::from_millis(50));
    writer.send_frame(b"drop").unwrap();
    let err = peer.read(&mut buf).unwrap_err();
    let timed_out = matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
    );
    assert!(timed_out, "paused writer should not emit bytes, got {err:?}");

    // Push one HDLC-framed payload of HEADER_MINSIZE bytes towards the reader
    // and wait for it to surface as a Frame event.
    let inbound = vec![0x42; constants::HEADER_MINSIZE];
    peer.write_all(&hdlc::frame(&inbound)).unwrap();
    match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::Frame {
            interface_id, data, ..
        } => {
            assert_eq!(interface_id, InterfaceId(98));
            assert_eq!(data, inbound);
        }
        other => panic!("expected Frame, got {:?}", other),
    }

    // With the hold refreshed by the inbound frame, outbound bytes flow again.
    writer.send_frame(b"again").unwrap();
    let n = peer.read(&mut buf).unwrap();
    assert!(n > 0);
}
1098
/// The physical keepalive loop must put the two bytes `0x7E 0x7E` on the
/// wire (per the test name, an empty HDLC frame).
#[test]
fn physical_keepalive_loop_sends_empty_hdlc_frame() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let mut peer = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
    let (accepted, _) = listener.accept().unwrap();
    peer.set_read_timeout(Some(Duration::from_millis(500)))
        .unwrap();

    // No sleep hold here, so the writer is never paused.
    let keepalive_writer: Box<dyn Writer> = Box::new(LocalWriter {
        stream: accepted,
        sleep_hold: None,
    });
    spawn_physical_keepalive_loop(
        keepalive_writer,
        InterfaceId(99),
        "test-local".into(),
        Duration::from_millis(10),
    );

    // The first two bytes received must be the pair of 0x7E bytes.
    let mut flags = [0u8; 2];
    peer.read_exact(&mut flags).unwrap();
    assert_eq!(flags, [0x7E, 0x7E]);
}
1124
/// Two clients connecting to the same local server must each produce an
/// `InterfaceUp` event, and the two reported interface ids must differ.
#[test]
fn multiple_local_clients() {
    let port = find_free_port();
    let instance_name = String::from("test-multi");
    let next_id = Arc::new(AtomicU64::new(7300));
    let (tx, rx) = crate::event::channel();

    start_server(
        LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(74),
        },
        tx,
        next_id,
    )
    .unwrap();
    thread::sleep(Duration::from_millis(50));

    // Keep both connections alive until the end of the test.
    let _client1 = connect_test_client(&instance_name, port);
    let _client2 = connect_test_client(&instance_name, port);

    // Collect the interface id from each of the two expected InterfaceUp events.
    let ids: Vec<_> = (0..2)
        .map(|_| match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, _, _) => id,
            other => panic!("expected InterfaceUp, got {:?}", other),
        })
        .collect();

    assert_eq!(ids.len(), 2);
    assert_ne!(ids[0], ids[1]);
}
1156
/// Dropping a connected client must make the server emit `InterfaceDown`.
#[test]
fn client_disconnect_detected() {
    let port = find_free_port();
    let instance_name = String::from("test-dc");
    let next_id = Arc::new(AtomicU64::new(7400));
    let (tx, rx) = crate::event::channel();

    start_server(
        LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(75),
        },
        tx,
        next_id,
    )
    .unwrap();
    thread::sleep(Duration::from_millis(50));

    // Connect over whichever transport this platform's test build uses.
    #[cfg(target_os = "linux")]
    let client = unix_socket::try_connect_unix(&instance_name).unwrap();

    #[cfg(not(target_os = "linux"))]
    let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

    // Consume the first event (expected to be the InterfaceUp for this
    // connection) and discard it immediately.
    drop(rx.recv_timeout(Duration::from_secs(1)).unwrap());

    // Close the client side of the connection.
    drop(client);

    match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::InterfaceDown(_) => {}
        event => panic!("expected InterfaceDown, got {:?}", event),
    }
}
1192
/// After the TCP peer goes away and a new listener appears on the same port,
/// the client must report `InterfaceDown`, then a fresh `InterfaceUp` that
/// carries a writer usable on the new connection.
#[test]
fn client_reconnects_after_tcp_restart() {
    let port = find_free_port();
    let addr = format!("127.0.0.1:{}", port);

    // Accepts exactly one connection on a background thread and hands the
    // accepted stream back through a channel.
    let accept_one = |listener: TcpListener| {
        let (accepted_tx, accepted_rx) = mpsc::channel();
        thread::spawn(move || {
            let (stream, _) = listener.accept().unwrap();
            accepted_tx.send(stream).unwrap();
        });
        accepted_rx
    };

    let first_accept = accept_one(TcpListener::bind(&addr).unwrap());

    let (client_tx, client_rx) = crate::event::channel();
    let _writer = start_client(
        LocalClientConfig {
            name: "test-client".into(),
            instance_name: format!("test-reconnect-{}", port),
            port,
            interface_id: InterfaceId(76),
            reconnect_wait: Duration::from_millis(50),
        },
        client_tx,
    )
    .unwrap();

    // The initial InterfaceUp is expected to carry no writer; the writer for
    // the first connection is the value returned by `start_client`.
    let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert!(matches!(
        event,
        Event::InterfaceUp(InterfaceId(76), None, None)
    ));

    // Close the server side of the first connection.
    drop(first_accept.recv_timeout(Duration::from_secs(2)).unwrap());

    let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));

    // Bring a new listener up on the same address for the reconnect attempt.
    let second_accept = accept_one(TcpListener::bind(&addr).unwrap());

    // Look through at most ten events for an InterfaceUp that carries a writer.
    let mut reconnected_writer = (0..10)
        .find_map(
            |_| match client_rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(InterfaceId(76), Some(writer), None) => Some(writer),
                _ => None,
            },
        )
        .expect("missing reconnect writer");

    let mut stream2 = second_accept
        .recv_timeout(Duration::from_secs(2))
        .unwrap();
    reconnected_writer.send_frame(b"client->server").unwrap();
    stream2
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();
    let mut buf = [0u8; 64];
    let n = stream2.read(&mut buf).unwrap();
    assert!(n > 0, "expected bytes from refreshed writer");
}
1257
/// Binds a listener with `try_bind_unix` under a unique instance name,
/// connects with `try_connect_unix`, and checks that bytes written by the
/// client arrive on the accepted side.
#[cfg(target_os = "linux")]
#[test]
fn unix_abstract_socket_helpers_work() {
    // A nanosecond timestamp is used to make the instance name unique.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let instance_name = format!("test-abstract-{}", nanos);

    let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
    // Accept on a helper thread so the connect below cannot block the test.
    let acceptor = thread::spawn(move || {
        let (accepted, _) = listener.accept().unwrap();
        accepted
    });

    let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
    let mut server = acceptor.join().unwrap();

    client.write_all(b"ping").unwrap();
    let mut received = [0u8; 4];
    server.read_exact(&mut received).unwrap();
    assert_eq!(&received, b"ping");
}
1280}