Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
25#[cfg(target_os = "android")]
26const CLIENT_SLEEP_PAUSE_TIMEOUT: Duration = Duration::from_secs(12);
27#[cfg(target_os = "android")]
28const PHY_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
29
30/// Configuration for a Local server (shared instance).
31#[derive(Debug, Clone)]
32pub struct LocalServerConfig {
33    pub instance_name: String,
34    pub port: u16,
35    pub interface_id: InterfaceId,
36}
37
38impl Default for LocalServerConfig {
39    fn default() -> Self {
40        LocalServerConfig {
41            instance_name: "default".into(),
42            port: 37428,
43            interface_id: InterfaceId(0),
44        }
45    }
46}
47
48/// Configuration for a Local client (connecting to shared instance).
49#[derive(Debug, Clone)]
50pub struct LocalClientConfig {
51    pub name: String,
52    pub instance_name: String,
53    pub port: u16,
54    pub interface_id: InterfaceId,
55    pub reconnect_wait: Duration,
56}
57
58impl Default for LocalClientConfig {
59    fn default() -> Self {
60        LocalClientConfig {
61            name: "Local shared instance".into(),
62            instance_name: "default".into(),
63            port: 37428,
64            interface_id: InterfaceId(0),
65            reconnect_wait: Duration::from_secs(8),
66        }
67    }
68}
69
70/// HDLC writer over a TCP or Unix stream.
71struct LocalWriter {
72    stream: TcpStream,
73    sleep_hold: Option<ClientSleepHold>,
74}
75
76impl Writer for LocalWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        if self
79            .sleep_hold
80            .as_ref()
81            .is_some_and(ClientSleepHold::should_drop_outbound)
82        {
83            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
84            return Ok(());
85        }
86        self.stream.write_all(&hdlc::frame(data))
87    }
88}
89
90#[derive(Clone)]
91struct ClientSleepHold {
92    timeout: Duration,
93    deadline: Arc<Mutex<Instant>>,
94}
95
96impl ClientSleepHold {
97    #[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
98    fn new(timeout: Duration) -> Self {
99        Self {
100            timeout,
101            deadline: Arc::new(Mutex::new(Instant::now() + timeout)),
102        }
103    }
104
105    fn refresh(&self) {
106        *lock_or_recover(&self.deadline) = Instant::now() + self.timeout;
107    }
108
109    fn should_drop_outbound(&self) -> bool {
110        Instant::now() > *lock_or_recover(&self.deadline)
111    }
112}
113
114fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
115    match mutex.lock() {
116        Ok(guard) => guard,
117        Err(poisoned) => poisoned.into_inner(),
118    }
119}
120
121fn android_client_sleep_hold() -> Option<ClientSleepHold> {
122    #[cfg(target_os = "android")]
123    {
124        Some(ClientSleepHold::new(CLIENT_SLEEP_PAUSE_TIMEOUT))
125    }
126    #[cfg(not(target_os = "android"))]
127    {
128        None
129    }
130}
131
132#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
133fn spawn_physical_keepalive_loop(
134    mut writer: Box<dyn Writer>,
135    interface_id: InterfaceId,
136    interface_name: String,
137    interval: Duration,
138) {
139    thread::Builder::new()
140        .name(format!("local-phy-keepalive-{}", interface_id.0))
141        .spawn(move || loop {
142            thread::sleep(interval);
143            if let Err(err) = writer.send_frame(&[]) {
144                log::debug!(
145                    "[{}:{}] LocalInterface physical keepalive stopped: {}",
146                    interface_name,
147                    interface_id.0,
148                    err
149                );
150                return;
151            }
152        })
153        .ok();
154}
155
156fn maybe_spawn_local_client_phy_keepalive(
157    stream: &LocalClientStream,
158    interface_id: InterfaceId,
159    interface_name: &str,
160) -> io::Result<()> {
161    #[cfg(target_os = "android")]
162    {
163        let writer = local_client_stream_writer(stream)?;
164        spawn_physical_keepalive_loop(
165            writer,
166            interface_id,
167            interface_name.to_string(),
168            PHY_KEEPALIVE_INTERVAL,
169        );
170    }
171
172    #[cfg(not(target_os = "android"))]
173    {
174        let _ = (stream, interface_id, interface_name);
175    }
176
177    Ok(())
178}
179
180#[cfg(target_os = "linux")]
181mod unix_socket {
182    use std::io;
183    use std::os::linux::net::SocketAddrExt;
184    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};
185
186    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
187        SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
188    }
189
190    /// Try to bind a Unix abstract socket with the given instance name.
191    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
192        let addr = abstract_addr(instance_name)?;
193        UnixListener::bind_addr(&addr)
194    }
195
196    /// Try to connect to a Unix abstract socket.
197    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
198        let addr = abstract_addr(instance_name)?;
199        UnixStream::connect_addr(&addr)
200    }
201}
202
203// ==================== LOCAL SERVER ====================
204
205/// Start a local server (shared instance).
206/// Tries Unix abstract socket first on Linux, falls back to TCP.
207/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
208pub fn start_server(
209    config: LocalServerConfig,
210    tx: EventSender,
211    next_id: Arc<AtomicU64>,
212) -> io::Result<ListenerControl> {
213    let control = ListenerControl::new();
214    // Try Unix socket first on Linux
215    #[cfg(target_os = "linux")]
216    {
217        match unix_socket::try_bind_unix(&config.instance_name) {
218            Ok(listener) => {
219                listener.set_nonblocking(true)?;
220                log::info!(
221                    "Local server using Unix socket: rns/{}",
222                    config.instance_name
223                );
224                let name = format!("rns/{}", config.instance_name);
225                let listener_control = control.clone();
226                thread::Builder::new()
227                    .name("local-server".into())
228                    .spawn(move || {
229                        unix_server_loop(listener, name, tx, next_id, listener_control);
230                    })?;
231                return Ok(control);
232            }
233            Err(e) => {
234                log::info!("Unix socket bind failed ({}), falling back to TCP", e);
235            }
236        }
237    }
238
239    // Fallback: TCP on localhost
240    let addr = format!("127.0.0.1:{}", config.port);
241    let listener = TcpListener::bind(&addr)?;
242    listener.set_nonblocking(true)?;
243
244    log::info!("Local server listening on TCP {}", addr);
245
246    let listener_control = control.clone();
247    thread::Builder::new()
248        .name("local-server".into())
249        .spawn(move || {
250            tcp_server_loop(listener, tx, next_id, listener_control);
251        })?;
252
253    Ok(control)
254}
255
256/// TCP server accept loop for local interface.
257fn tcp_server_loop(
258    listener: TcpListener,
259    tx: EventSender,
260    next_id: Arc<AtomicU64>,
261    control: ListenerControl,
262) {
263    loop {
264        if control.should_stop() {
265            log::info!("Local TCP listener stopping");
266            return;
267        }
268
269        let stream_result = listener.accept().map(|(stream, _)| stream);
270        let stream = match stream_result {
271            Ok(s) => s,
272            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273                thread::sleep(Duration::from_millis(50));
274                continue;
275            }
276            Err(e) => {
277                log::warn!("Local server accept failed: {}", e);
278                continue;
279            }
280        };
281
282        if let Err(e) = stream.set_nodelay(true) {
283            log::warn!("Local server set_nodelay failed: {}", e);
284        }
285
286        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
287        spawn_local_client_handler(stream, client_id, tx.clone());
288    }
289}
290
291/// Unix socket server accept loop for local interface.
292#[cfg(target_os = "linux")]
293fn unix_server_loop(
294    listener: std::os::unix::net::UnixListener,
295    name: String,
296    tx: EventSender,
297    next_id: Arc<AtomicU64>,
298    control: ListenerControl,
299) {
300    loop {
301        if control.should_stop() {
302            log::info!("[{}] Local Unix listener stopping", name);
303            return;
304        }
305
306        let stream_result = listener.accept().map(|(stream, _)| stream);
307        let stream = match stream_result {
308            Ok(s) => s,
309            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
310                thread::sleep(Duration::from_millis(50));
311                continue;
312            }
313            Err(e) => {
314                log::warn!("[{}] Local server accept failed: {}", name, e);
315                continue;
316            }
317        };
318
319        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
320
321        // Convert UnixStream to a pair of read/write handles
322        let writer_stream = match stream.try_clone() {
323            Ok(s) => s,
324            Err(e) => {
325                log::warn!("Local server clone failed: {}", e);
326                continue;
327            }
328        };
329
330        let sleep_hold = android_client_sleep_hold();
331        let info = make_local_interface_info(client_id);
332        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
333            stream: writer_stream,
334            sleep_hold: sleep_hold.clone(),
335        });
336
337        if tx
338            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
339            .is_err()
340        {
341            return;
342        }
343
344        let client_tx = tx.clone();
345        thread::Builder::new()
346            .name(format!("local-unix-reader-{}", client_id.0))
347            .spawn(move || {
348                unix_reader_loop(stream, client_id, client_tx, sleep_hold);
349            })
350            .ok();
351    }
352}
353
354#[cfg(target_os = "linux")]
355struct UnixLocalWriter {
356    stream: std::os::unix::net::UnixStream,
357    sleep_hold: Option<ClientSleepHold>,
358}
359
360#[cfg(target_os = "linux")]
361impl Writer for UnixLocalWriter {
362    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
363        use std::io::Write;
364        if self
365            .sleep_hold
366            .as_ref()
367            .is_some_and(ClientSleepHold::should_drop_outbound)
368        {
369            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
370            return Ok(());
371        }
372        self.stream.write_all(&hdlc::frame(data))
373    }
374}
375
376#[cfg(target_os = "linux")]
377fn unix_reader_loop(
378    mut stream: std::os::unix::net::UnixStream,
379    id: InterfaceId,
380    tx: EventSender,
381    sleep_hold: Option<ClientSleepHold>,
382) {
383    use std::io::Read;
384    let mut decoder = hdlc::Decoder::new();
385    let mut buf = [0u8; 4096];
386
387    loop {
388        match stream.read(&mut buf) {
389            Ok(0) => {
390                let _ = tx.send(Event::InterfaceDown(id));
391                return;
392            }
393            Ok(n) => {
394                if let Some(ref sleep_hold) = sleep_hold {
395                    sleep_hold.refresh();
396                }
397                for frame in decoder.feed(&buf[..n]) {
398                    if tx
399                        .send(Event::Frame {
400                            interface_id: id,
401                            data: frame,
402                            rssi: None,
403                            snr: None,
404                        })
405                        .is_err()
406                    {
407                        return;
408                    }
409                }
410            }
411            Err(_) => {
412                let _ = tx.send(Event::InterfaceDown(id));
413                return;
414            }
415        }
416    }
417}
418
419/// Spawn handler threads for a connected TCP local client.
420fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
421    let writer_stream = match stream.try_clone() {
422        Ok(s) => s,
423        Err(e) => {
424            log::warn!("Local server clone failed: {}", e);
425            return;
426        }
427    };
428
429    let sleep_hold = android_client_sleep_hold();
430    let info = make_local_interface_info(client_id);
431    let writer: Box<dyn Writer> = Box::new(LocalWriter {
432        stream: writer_stream,
433        sleep_hold: sleep_hold.clone(),
434    });
435
436    if tx
437        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
438        .is_err()
439    {
440        return;
441    }
442
443    thread::Builder::new()
444        .name(format!("local-reader-{}", client_id.0))
445        .spawn(move || {
446            tcp_reader_loop(stream, client_id, tx, sleep_hold);
447        })
448        .ok();
449}
450
451fn tcp_reader_loop(
452    mut stream: TcpStream,
453    id: InterfaceId,
454    tx: EventSender,
455    sleep_hold: Option<ClientSleepHold>,
456) {
457    let mut decoder = hdlc::Decoder::new();
458    let mut buf = [0u8; 4096];
459
460    loop {
461        match stream.read(&mut buf) {
462            Ok(0) => {
463                log::info!("Local client {} disconnected", id.0);
464                let _ = tx.send(Event::InterfaceDown(id));
465                return;
466            }
467            Ok(n) => {
468                if let Some(ref sleep_hold) = sleep_hold {
469                    sleep_hold.refresh();
470                }
471                for frame in decoder.feed(&buf[..n]) {
472                    if tx
473                        .send(Event::Frame {
474                            interface_id: id,
475                            data: frame,
476                            rssi: None,
477                            snr: None,
478                        })
479                        .is_err()
480                    {
481                        return;
482                    }
483                }
484            }
485            Err(e) => {
486                log::warn!("Local client {} read error: {}", id.0, e);
487                let _ = tx.send(Event::InterfaceDown(id));
488                return;
489            }
490        }
491    }
492}
493
494fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
495    InterfaceInfo {
496        id,
497        name: String::from("LocalInterface"),
498        mode: constants::MODE_FULL,
499        out_capable: true,
500        in_capable: true,
501        bitrate: Some(1_000_000_000), // 1 Gbps
502        airtime_profile: None,
503        announce_rate_target: None,
504        announce_rate_grace: 0,
505        announce_rate_penalty: 0.0,
506        announce_cap: constants::ANNOUNCE_CAP,
507        is_local_client: true,
508        wants_tunnel: false,
509        tunnel_id: None,
510        mtu: 65535,
511        ia_freq: 0.0,
512        ip_freq: 0.0,
513        op_freq: 0.0,
514        op_samples: 0,
515        started: 0.0,
516        ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
517    }
518}
519
520// ==================== LOCAL CLIENT ====================
521
522#[cfg(target_os = "linux")]
523enum LocalClientStream {
524    Unix(std::os::unix::net::UnixStream),
525    Tcp(TcpStream),
526}
527
528#[cfg(target_os = "linux")]
529impl LocalClientStream {
530    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
531        match self {
532            LocalClientStream::Unix(stream) => stream.read(buf),
533            LocalClientStream::Tcp(stream) => stream.read(buf),
534        }
535    }
536
537    fn writer(&self) -> io::Result<Box<dyn Writer>> {
538        match self {
539            LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
540                stream: stream.try_clone()?,
541                sleep_hold: None,
542            })),
543            LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
544                stream: stream.try_clone()?,
545                sleep_hold: None,
546            })),
547        }
548    }
549}
550
551#[cfg(not(target_os = "linux"))]
552type LocalClientStream = TcpStream;
553
554#[cfg(not(target_os = "linux"))]
555fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
556    Ok(Box::new(LocalWriter {
557        stream: stream.try_clone()?,
558        sleep_hold: None,
559    }))
560}
561
562#[cfg(target_os = "linux")]
563fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
564    stream.writer()
565}
566
567fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
568    let addr = format!("127.0.0.1:{}", config.port);
569    let stream = TcpStream::connect(&addr)?;
570    stream.set_nodelay(true)?;
571    log::info!(
572        "[{}] Connected to shared instance via TCP {}",
573        config.name,
574        addr
575    );
576    Ok(stream)
577}
578
579#[cfg(target_os = "linux")]
580fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
581    match unix_socket::try_connect_unix(&config.instance_name) {
582        Ok(stream) => {
583            log::info!(
584                "[{}] Connected to shared instance via Unix socket: rns/{}",
585                config.name,
586                config.instance_name
587            );
588            Ok(LocalClientStream::Unix(stream))
589        }
590        Err(e) => {
591            log::info!(
592                "[{}] Unix socket connect failed ({}), trying TCP",
593                config.name,
594                e
595            );
596            try_connect_tcp(config).map(LocalClientStream::Tcp)
597        }
598    }
599}
600
601#[cfg(not(target_os = "linux"))]
602fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
603    try_connect_tcp(config)
604}
605
606fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
607    loop {
608        thread::sleep(config.reconnect_wait);
609        match try_connect_local_client(config) {
610            Ok(stream) => match local_client_stream_writer(&stream) {
611                Ok(writer) => {
612                    let _ = maybe_spawn_local_client_phy_keepalive(
613                        &stream,
614                        config.interface_id,
615                        &config.name,
616                    );
617                    let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
618                    return stream;
619                }
620                Err(e) => {
621                    log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
622                }
623            },
624            Err(e) => {
625                log::warn!("[{}] reconnect failed: {}", config.name, e);
626            }
627        }
628    }
629}
630
631fn local_client_reader_loop(
632    mut stream: LocalClientStream,
633    config: LocalClientConfig,
634    tx: EventSender,
635) {
636    let id = config.interface_id;
637    let mut decoder = hdlc::Decoder::new();
638    let mut buf = [0u8; 4096];
639
640    loop {
641        match stream.read(&mut buf) {
642            Ok(0) => {
643                log::warn!("[{}] shared connection closed", config.name);
644                let _ = tx.send(Event::InterfaceDown(id));
645                stream = reconnect_local_client(&config, &tx);
646                decoder = hdlc::Decoder::new();
647            }
648            Ok(n) => {
649                for frame in decoder.feed(&buf[..n]) {
650                    if tx
651                        .send(Event::Frame {
652                            interface_id: id,
653                            data: frame,
654                            rssi: None,
655                            snr: None,
656                        })
657                        .is_err()
658                    {
659                        return;
660                    }
661                }
662            }
663            Err(e) => {
664                log::warn!("[{}] shared read error: {}", config.name, e);
665                let _ = tx.send(Event::InterfaceDown(id));
666                stream = reconnect_local_client(&config, &tx);
667                decoder = hdlc::Decoder::new();
668            }
669        }
670    }
671}
672
673/// Start a local client (connect to shared instance).
674/// Tries Unix socket first on Linux, falls back to TCP.
675/// Returns the writer for the driver.
676pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
677    let id = config.interface_id;
678    let stream = try_connect_local_client(&config)?;
679    let writer = local_client_stream_writer(&stream)?;
680    maybe_spawn_local_client_phy_keepalive(&stream, id, &config.name)?;
681
682    let _ = tx.send(Event::InterfaceUp(id, None, None));
683
684    thread::Builder::new()
685        .name(format!("local-client-reader-{}", id.0))
686        .spawn(move || {
687            local_client_reader_loop(stream, config, tx);
688        })?;
689
690    Ok(writer)
691}
692
693// --- Factory implementations ---
694
695use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
696use std::collections::HashMap;
697
698/// Factory for `LocalServerInterface`.
699pub struct LocalServerFactory;
700
701impl InterfaceFactory for LocalServerFactory {
702    fn type_name(&self) -> &str {
703        "LocalServerInterface"
704    }
705
706    fn parse_config(
707        &self,
708        _name: &str,
709        id: InterfaceId,
710        params: &HashMap<String, String>,
711    ) -> Result<Box<dyn InterfaceConfigData>, String> {
712        let instance_name = params
713            .get("instance_name")
714            .cloned()
715            .unwrap_or_else(|| "default".into());
716        let port = params
717            .get("port")
718            .and_then(|v| v.parse().ok())
719            .unwrap_or(37428);
720
721        Ok(Box::new(LocalServerConfig {
722            instance_name,
723            port,
724            interface_id: id,
725        }))
726    }
727
728    fn start(
729        &self,
730        config: Box<dyn InterfaceConfigData>,
731        ctx: StartContext,
732    ) -> std::io::Result<StartResult> {
733        let server_config = *config
734            .into_any()
735            .downcast::<LocalServerConfig>()
736            .map_err(|_| {
737                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
738            })?;
739
740        let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
741        Ok(StartResult::Listener {
742            control: Some(control),
743        })
744    }
745}
746
747/// Factory for `LocalClientInterface`.
748pub struct LocalClientFactory;
749
750impl InterfaceFactory for LocalClientFactory {
751    fn type_name(&self) -> &str {
752        "LocalClientInterface"
753    }
754
755    fn parse_config(
756        &self,
757        _name: &str,
758        id: InterfaceId,
759        params: &HashMap<String, String>,
760    ) -> Result<Box<dyn InterfaceConfigData>, String> {
761        let instance_name = params
762            .get("instance_name")
763            .cloned()
764            .unwrap_or_else(|| "default".into());
765        let port = params
766            .get("port")
767            .and_then(|v| v.parse().ok())
768            .unwrap_or(37428);
769
770        Ok(Box::new(LocalClientConfig {
771            instance_name,
772            port,
773            interface_id: id,
774            ..LocalClientConfig::default()
775        }))
776    }
777
778    fn start(
779        &self,
780        config: Box<dyn InterfaceConfigData>,
781        ctx: StartContext,
782    ) -> std::io::Result<StartResult> {
783        let client_config = *config
784            .into_any()
785            .downcast::<LocalClientConfig>()
786            .map_err(|_| {
787                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
788            })?;
789
790        let id = client_config.interface_id;
791        let name = client_config.name.clone();
792        let info = InterfaceInfo {
793            id,
794            name,
795            mode: ctx.mode,
796            out_capable: true,
797            in_capable: true,
798            bitrate: Some(1_000_000_000),
799            airtime_profile: None,
800            announce_rate_target: None,
801            announce_rate_grace: 0,
802            announce_rate_penalty: 0.0,
803            announce_cap: rns_core::constants::ANNOUNCE_CAP,
804            is_local_client: false,
805            wants_tunnel: false,
806            tunnel_id: None,
807            mtu: 65535,
808            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
809            ia_freq: 0.0,
810            ip_freq: 0.0,
811            op_freq: 0.0,
812            op_samples: 0,
813            started: crate::time::now(),
814        };
815
816        let writer = start_client(client_config, ctx.tx)?;
817
818        Ok(StartResult::Simple {
819            id,
820            info,
821            writer,
822            interface_type_name: "LocalInterface".to_string(),
823        })
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830    use std::sync::mpsc;
831    use std::sync::mpsc::RecvTimeoutError;
832
833    #[cfg(target_os = "linux")]
834    type TestClient = std::os::unix::net::UnixStream;
835
836    #[cfg(not(target_os = "linux"))]
837    type TestClient = TcpStream;
838
839    fn connect_test_client(instance_name: &str, _port: u16) -> TestClient {
840        #[cfg(target_os = "linux")]
841        {
842            unix_socket::try_connect_unix(instance_name).unwrap()
843        }
844
845        #[cfg(not(target_os = "linux"))]
846        {
847            TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
848        }
849    }
850
851    fn find_free_port() -> u16 {
852        TcpListener::bind("127.0.0.1:0")
853            .unwrap()
854            .local_addr()
855            .unwrap()
856            .port()
857    }
858
859    #[test]
860    fn server_bind_tcp() {
861        let port = find_free_port();
862        let instance_name = "test-bind".to_string();
863        let (tx, _rx) = crate::event::channel();
864        let next_id = Arc::new(AtomicU64::new(7000));
865
866        let config = LocalServerConfig {
867            instance_name: instance_name.clone(),
868            port,
869            interface_id: InterfaceId(70),
870        };
871
872        // We force TCP by using a unique instance name that won't conflict
873        // with any existing Unix socket
874        start_server(config, tx, next_id).unwrap();
875        thread::sleep(Duration::from_millis(50));
876
877        connect_test_client(&instance_name, port);
878    }
879
880    #[test]
881    fn server_accept_client() {
882        let port = find_free_port();
883        let instance_name = "test-accept".to_string();
884        let (tx, rx) = crate::event::channel();
885        let next_id = Arc::new(AtomicU64::new(7100));
886
887        let config = LocalServerConfig {
888            instance_name: instance_name.clone(),
889            port,
890            interface_id: InterfaceId(71),
891        };
892
893        start_server(config, tx, next_id).unwrap();
894        thread::sleep(Duration::from_millis(50));
895
896        connect_test_client(&instance_name, port);
897
898        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
899        match event {
900            Event::InterfaceUp(id, writer, info) => {
901                assert_eq!(id, InterfaceId(7100));
902                assert!(writer.is_some());
903                assert!(info.is_some());
904            }
905            other => panic!("expected InterfaceUp, got {:?}", other),
906        }
907    }
908
909    #[test]
910    fn server_stop_prevents_new_accepts() {
911        let port = find_free_port();
912        let instance_name = "test-stop".to_string();
913        let (tx, rx) = crate::event::channel();
914        let next_id = Arc::new(AtomicU64::new(7150));
915
916        let config = LocalServerConfig {
917            instance_name: instance_name.clone(),
918            port,
919            interface_id: InterfaceId(71),
920        };
921
922        let control = start_server(config, tx, next_id).unwrap();
923        thread::sleep(Duration::from_millis(50));
924        control.request_stop();
925        thread::sleep(Duration::from_millis(120));
926
927        #[cfg(target_os = "linux")]
928        let connect_result = unix_socket::try_connect_unix(&instance_name);
929
930        #[cfg(not(target_os = "linux"))]
931        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
932
933        if let Ok(stream) = connect_result {
934            drop(stream);
935        }
936
937        match rx.recv_timeout(Duration::from_millis(200)) {
938            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
939            other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
940        }
941    }
942
943    #[test]
944    fn client_send_receive() {
945        let port = find_free_port();
946        let (server_tx, server_rx) = crate::event::channel();
947        let next_id = Arc::new(AtomicU64::new(7200));
948
949        let server_config = LocalServerConfig {
950            instance_name: "test-sr".into(),
951            port,
952            interface_id: InterfaceId(72),
953        };
954
955        start_server(server_config, server_tx, next_id).unwrap();
956        thread::sleep(Duration::from_millis(50));
957
958        // Connect client
959        let (client_tx, client_rx) = crate::event::channel();
960        let client_config = LocalClientConfig {
961            name: "test-client".into(),
962            instance_name: "test-sr".into(),
963            port,
964            interface_id: InterfaceId(73),
965            reconnect_wait: Duration::from_secs(1),
966        };
967
968        let mut client_writer = start_client(client_config, client_tx).unwrap();
969
970        // Get server-side InterfaceUp
971        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
972        let mut server_writer = match event {
973            Event::InterfaceUp(_, Some(w), Some(info)) => {
974                assert!(info.is_local_client);
975                w
976            }
977            other => panic!("expected InterfaceUp with writer, got {:?}", other),
978        };
979
980        // Get client-side InterfaceUp
981        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
982        match event {
983            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
984            other => panic!("expected InterfaceUp, got {:?}", other),
985        }
986
987        // Client sends to server
988        let payload: Vec<u8> = (0..32).collect();
989        client_writer.send_frame(&payload).unwrap();
990
991        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
992        match event {
993            Event::Frame { data, .. } => assert_eq!(data, payload),
994            other => panic!("expected Frame, got {:?}", other),
995        }
996
997        // Server sends to client
998        let payload2: Vec<u8> = (100..132).collect();
999        server_writer.send_frame(&payload2).unwrap();
1000
1001        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1002        match event {
1003            Event::Frame { data, .. } => assert_eq!(data, payload2),
1004            other => panic!("expected Frame, got {:?}", other),
1005        }
1006    }
1007
1008    #[test]
1009    fn sleep_hold_drops_outbound_after_timeout_and_refresh_restores() {
1010        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1011        let addr = listener.local_addr().unwrap();
1012        let mut client = TcpStream::connect(addr).unwrap();
1013        let (server_stream, _) = listener.accept().unwrap();
1014        client
1015            .set_read_timeout(Some(Duration::from_millis(200)))
1016            .unwrap();
1017
1018        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1019        let mut writer = LocalWriter {
1020            stream: server_stream,
1021            sleep_hold: Some(sleep_hold.clone()),
1022        };
1023
1024        writer.send_frame(b"live").unwrap();
1025        let mut buf = [0u8; 16];
1026        let n = client.read(&mut buf).unwrap();
1027        assert!(n > 0);
1028
1029        thread::sleep(Duration::from_millis(50));
1030        writer.send_frame(b"drop").unwrap();
1031        let err = client.read(&mut buf).unwrap_err();
1032        assert!(
1033            matches!(
1034                err.kind(),
1035                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1036            ),
1037            "paused writer should not emit bytes, got {err:?}"
1038        );
1039
1040        sleep_hold.refresh();
1041        writer.send_frame(b"again").unwrap();
1042        let n = client.read(&mut buf).unwrap();
1043        assert!(n > 0);
1044    }
1045
1046    #[test]
1047    fn tcp_reader_refreshes_sleep_hold_on_inbound_data() {
1048        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1049        let addr = listener.local_addr().unwrap();
1050        let mut client = TcpStream::connect(addr).unwrap();
1051        let (server_stream, _) = listener.accept().unwrap();
1052        let writer_stream = server_stream.try_clone().unwrap();
1053        client
1054            .set_read_timeout(Some(Duration::from_millis(200)))
1055            .unwrap();
1056
1057        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1058        let reader_sleep_hold = sleep_hold.clone();
1059        let (tx, rx) = crate::event::channel();
1060        thread::spawn(move || {
1061            tcp_reader_loop(server_stream, InterfaceId(98), tx, Some(reader_sleep_hold));
1062        });
1063
1064        let mut writer = LocalWriter {
1065            stream: writer_stream,
1066            sleep_hold: Some(sleep_hold),
1067        };
1068
1069        thread::sleep(Duration::from_millis(50));
1070        writer.send_frame(b"drop").unwrap();
1071        let mut buf = [0u8; 32];
1072        let err = client.read(&mut buf).unwrap_err();
1073        assert!(
1074            matches!(
1075                err.kind(),
1076                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1077            ),
1078            "paused writer should not emit bytes, got {err:?}"
1079        );
1080
1081        let inbound = vec![0x42; constants::HEADER_MINSIZE];
1082        client.write_all(&hdlc::frame(&inbound)).unwrap();
1083        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1084        match event {
1085            Event::Frame {
1086                interface_id,
1087                data,
1088                rssi: _,
1089                snr: _,
1090            } => {
1091                assert_eq!(interface_id, InterfaceId(98));
1092                assert_eq!(data, inbound);
1093            }
1094            other => panic!("expected Frame, got {:?}", other),
1095        }
1096
1097        writer.send_frame(b"again").unwrap();
1098        let n = client.read(&mut buf).unwrap();
1099        assert!(n > 0);
1100    }
1101
1102    #[test]
1103    fn physical_keepalive_loop_sends_empty_hdlc_frame() {
1104        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1105        let addr = listener.local_addr().unwrap();
1106        let mut client = TcpStream::connect(addr).unwrap();
1107        let (server_stream, _) = listener.accept().unwrap();
1108        client
1109            .set_read_timeout(Some(Duration::from_millis(500)))
1110            .unwrap();
1111
1112        let writer: Box<dyn Writer> = Box::new(LocalWriter {
1113            stream: server_stream,
1114            sleep_hold: None,
1115        });
1116        spawn_physical_keepalive_loop(
1117            writer,
1118            InterfaceId(99),
1119            "test-local".into(),
1120            Duration::from_millis(10),
1121        );
1122
1123        let mut buf = [0u8; 2];
1124        client.read_exact(&mut buf).unwrap();
1125        assert_eq!(buf, [0x7E, 0x7E]);
1126    }
1127
1128    #[test]
1129    fn multiple_local_clients() {
1130        let port = find_free_port();
1131        let instance_name = "test-multi".to_string();
1132        let (tx, rx) = crate::event::channel();
1133        let next_id = Arc::new(AtomicU64::new(7300));
1134
1135        let config = LocalServerConfig {
1136            instance_name: instance_name.clone(),
1137            port,
1138            interface_id: InterfaceId(74),
1139        };
1140
1141        start_server(config, tx, next_id).unwrap();
1142        thread::sleep(Duration::from_millis(50));
1143
1144        let _client1 = connect_test_client(&instance_name, port);
1145        let _client2 = connect_test_client(&instance_name, port);
1146
1147        let mut ids = Vec::new();
1148        for _ in 0..2 {
1149            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1150            match event {
1151                Event::InterfaceUp(id, _, _) => ids.push(id),
1152                other => panic!("expected InterfaceUp, got {:?}", other),
1153            }
1154        }
1155
1156        assert_eq!(ids.len(), 2);
1157        assert_ne!(ids[0], ids[1]);
1158    }
1159
1160    #[test]
1161    fn client_disconnect_detected() {
1162        let port = find_free_port();
1163        let instance_name = "test-dc".to_string();
1164        let (tx, rx) = crate::event::channel();
1165        let next_id = Arc::new(AtomicU64::new(7400));
1166
1167        let config = LocalServerConfig {
1168            instance_name: instance_name.clone(),
1169            port,
1170            interface_id: InterfaceId(75),
1171        };
1172
1173        start_server(config, tx, next_id).unwrap();
1174        thread::sleep(Duration::from_millis(50));
1175
1176        #[cfg(target_os = "linux")]
1177        let client = unix_socket::try_connect_unix(&instance_name).unwrap();
1178
1179        #[cfg(not(target_os = "linux"))]
1180        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1181
1182        // Drain InterfaceUp
1183        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1184
1185        // Disconnect
1186        drop(client);
1187
1188        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1189        assert!(
1190            matches!(event, Event::InterfaceDown(_)),
1191            "expected InterfaceDown, got {:?}",
1192            event
1193        );
1194    }
1195
1196    #[test]
1197    fn client_reconnects_after_tcp_restart() {
1198        let port = find_free_port();
1199        let addr = format!("127.0.0.1:{}", port);
1200        let instance_name = format!("test-reconnect-{}", port);
1201
1202        let listener1 = TcpListener::bind(&addr).unwrap();
1203        let (accepted1_tx, accepted1_rx) = mpsc::channel();
1204        thread::spawn(move || {
1205            let (stream, _) = listener1.accept().unwrap();
1206            accepted1_tx.send(stream).unwrap();
1207        });
1208
1209        let (client_tx, client_rx) = crate::event::channel();
1210        let client_config = LocalClientConfig {
1211            name: "test-client".into(),
1212            instance_name,
1213            port,
1214            interface_id: InterfaceId(76),
1215            reconnect_wait: Duration::from_millis(50),
1216        };
1217
1218        let _writer = start_client(client_config, client_tx).unwrap();
1219        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1220        assert!(matches!(
1221            event,
1222            Event::InterfaceUp(InterfaceId(76), None, None)
1223        ));
1224
1225        let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1226        drop(stream1);
1227
1228        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1229        assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));
1230
1231        let listener2 = TcpListener::bind(&addr).unwrap();
1232        let (accepted2_tx, accepted2_rx) = mpsc::channel();
1233        thread::spawn(move || {
1234            let (stream, _) = listener2.accept().unwrap();
1235            accepted2_tx.send(stream).unwrap();
1236        });
1237
1238        let mut reconnected_writer = None;
1239        for _ in 0..10 {
1240            let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1241            match event {
1242                Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
1243                    reconnected_writer = writer;
1244                    break;
1245                }
1246                _ => {}
1247            }
1248        }
1249
1250        let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
1251        let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1252        reconnected_writer.send_frame(b"client->server").unwrap();
1253        stream2
1254            .set_read_timeout(Some(Duration::from_secs(2)))
1255            .unwrap();
1256        let mut buf = [0u8; 64];
1257        let n = stream2.read(&mut buf).unwrap();
1258        assert!(n > 0, "expected bytes from refreshed writer");
1259    }
1260
1261    #[cfg(target_os = "linux")]
1262    #[test]
1263    fn unix_abstract_socket_helpers_work() {
1264        let instance_name = format!(
1265            "test-abstract-{}",
1266            std::time::SystemTime::now()
1267                .duration_since(std::time::UNIX_EPOCH)
1268                .unwrap()
1269                .as_nanos()
1270        );
1271
1272        let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
1273        let accept_thread = thread::spawn(move || listener.accept().unwrap().0);
1274
1275        let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
1276        let mut server = accept_thread.join().unwrap();
1277
1278        client.write_all(b"ping").unwrap();
1279        let mut buf = [0u8; 4];
1280        server.read_exact(&mut buf).unwrap();
1281        assert_eq!(&buf, b"ping");
1282    }
1283}