Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
25#[cfg(target_os = "android")]
26const CLIENT_SLEEP_PAUSE_TIMEOUT: Duration = Duration::from_secs(12);
27#[cfg(target_os = "android")]
28const PHY_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
29
30/// Configuration for a Local server (shared instance).
31#[derive(Debug, Clone)]
32pub struct LocalServerConfig {
33    pub instance_name: String,
34    pub port: u16,
35    pub interface_id: InterfaceId,
36}
37
38impl Default for LocalServerConfig {
39    fn default() -> Self {
40        LocalServerConfig {
41            instance_name: "default".into(),
42            port: 37428,
43            interface_id: InterfaceId(0),
44        }
45    }
46}
47
48/// Configuration for a Local client (connecting to shared instance).
49#[derive(Debug, Clone)]
50pub struct LocalClientConfig {
51    pub name: String,
52    pub instance_name: String,
53    pub port: u16,
54    pub interface_id: InterfaceId,
55    pub reconnect_wait: Duration,
56}
57
58impl Default for LocalClientConfig {
59    fn default() -> Self {
60        LocalClientConfig {
61            name: "Local shared instance".into(),
62            instance_name: "default".into(),
63            port: 37428,
64            interface_id: InterfaceId(0),
65            reconnect_wait: Duration::from_secs(8),
66        }
67    }
68}
69
70/// HDLC writer over a TCP or Unix stream.
71struct LocalWriter {
72    stream: TcpStream,
73    sleep_hold: Option<ClientSleepHold>,
74}
75
76impl Writer for LocalWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        if self
79            .sleep_hold
80            .as_ref()
81            .is_some_and(ClientSleepHold::should_drop_outbound)
82        {
83            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
84            return Ok(());
85        }
86        self.stream.write_all(&hdlc::frame(data))
87    }
88}
89
90#[derive(Clone)]
91struct ClientSleepHold {
92    timeout: Duration,
93    deadline: Arc<Mutex<Instant>>,
94}
95
96impl ClientSleepHold {
97    #[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
98    fn new(timeout: Duration) -> Self {
99        Self {
100            timeout,
101            deadline: Arc::new(Mutex::new(Instant::now() + timeout)),
102        }
103    }
104
105    fn refresh(&self) {
106        *lock_or_recover(&self.deadline) = Instant::now() + self.timeout;
107    }
108
109    fn should_drop_outbound(&self) -> bool {
110        Instant::now() > *lock_or_recover(&self.deadline)
111    }
112}
113
114fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
115    match mutex.lock() {
116        Ok(guard) => guard,
117        Err(poisoned) => poisoned.into_inner(),
118    }
119}
120
121fn android_client_sleep_hold() -> Option<ClientSleepHold> {
122    #[cfg(target_os = "android")]
123    {
124        Some(ClientSleepHold::new(CLIENT_SLEEP_PAUSE_TIMEOUT))
125    }
126    #[cfg(not(target_os = "android"))]
127    {
128        None
129    }
130}
131
132#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
133fn spawn_physical_keepalive_loop(
134    mut writer: Box<dyn Writer>,
135    interface_id: InterfaceId,
136    interface_name: String,
137    interval: Duration,
138) {
139    thread::Builder::new()
140        .name(format!("local-phy-keepalive-{}", interface_id.0))
141        .spawn(move || loop {
142            thread::sleep(interval);
143            if let Err(err) = writer.send_frame(&[]) {
144                log::debug!(
145                    "[{}:{}] LocalInterface physical keepalive stopped: {}",
146                    interface_name,
147                    interface_id.0,
148                    err
149                );
150                return;
151            }
152        })
153        .ok();
154}
155
156fn maybe_spawn_local_client_phy_keepalive(
157    stream: &LocalClientStream,
158    interface_id: InterfaceId,
159    interface_name: &str,
160) -> io::Result<()> {
161    #[cfg(target_os = "android")]
162    {
163        let writer = local_client_stream_writer(stream)?;
164        spawn_physical_keepalive_loop(
165            writer,
166            interface_id,
167            interface_name.to_string(),
168            PHY_KEEPALIVE_INTERVAL,
169        );
170    }
171
172    #[cfg(not(target_os = "android"))]
173    {
174        let _ = (stream, interface_id, interface_name);
175    }
176
177    Ok(())
178}
179
180#[cfg(target_os = "linux")]
181mod unix_socket {
182    use std::io;
183    use std::os::linux::net::SocketAddrExt;
184    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};
185
186    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
187        SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
188    }
189
190    /// Try to bind a Unix abstract socket with the given instance name.
191    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
192        let addr = abstract_addr(instance_name)?;
193        UnixListener::bind_addr(&addr)
194    }
195
196    /// Try to connect to a Unix abstract socket.
197    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
198        let addr = abstract_addr(instance_name)?;
199        UnixStream::connect_addr(&addr)
200    }
201}
202
203// ==================== LOCAL SERVER ====================
204
205/// Start a local server (shared instance).
206/// Tries Unix abstract socket first on Linux, falls back to TCP.
207/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
208pub fn start_server(
209    config: LocalServerConfig,
210    tx: EventSender,
211    next_id: Arc<AtomicU64>,
212) -> io::Result<ListenerControl> {
213    let control = ListenerControl::new();
214    // Try Unix socket first on Linux
215    #[cfg(target_os = "linux")]
216    {
217        match unix_socket::try_bind_unix(&config.instance_name) {
218            Ok(listener) => {
219                listener.set_nonblocking(true)?;
220                log::info!(
221                    "Local server using Unix socket: rns/{}",
222                    config.instance_name
223                );
224                let name = format!("rns/{}", config.instance_name);
225                let listener_control = control.clone();
226                thread::Builder::new()
227                    .name("local-server".into())
228                    .spawn(move || {
229                        unix_server_loop(listener, name, tx, next_id, listener_control);
230                    })?;
231                return Ok(control);
232            }
233            Err(e) => {
234                log::info!("Unix socket bind failed ({}), falling back to TCP", e);
235            }
236        }
237    }
238
239    // Fallback: TCP on localhost
240    let addr = format!("127.0.0.1:{}", config.port);
241    let listener = TcpListener::bind(&addr)?;
242    listener.set_nonblocking(true)?;
243
244    log::info!("Local server listening on TCP {}", addr);
245
246    let listener_control = control.clone();
247    thread::Builder::new()
248        .name("local-server".into())
249        .spawn(move || {
250            tcp_server_loop(listener, tx, next_id, listener_control);
251        })?;
252
253    Ok(control)
254}
255
256/// TCP server accept loop for local interface.
257fn tcp_server_loop(
258    listener: TcpListener,
259    tx: EventSender,
260    next_id: Arc<AtomicU64>,
261    control: ListenerControl,
262) {
263    loop {
264        if control.should_stop() {
265            log::info!("Local TCP listener stopping");
266            return;
267        }
268
269        let stream_result = listener.accept().map(|(stream, _)| stream);
270        let stream = match stream_result {
271            Ok(s) => s,
272            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273                thread::sleep(Duration::from_millis(50));
274                continue;
275            }
276            Err(e) => {
277                log::warn!("Local server accept failed: {}", e);
278                continue;
279            }
280        };
281
282        if let Err(e) = stream.set_nodelay(true) {
283            log::warn!("Local server set_nodelay failed: {}", e);
284        }
285
286        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
287        spawn_local_client_handler(stream, client_id, tx.clone());
288    }
289}
290
291/// Unix socket server accept loop for local interface.
292#[cfg(target_os = "linux")]
293fn unix_server_loop(
294    listener: std::os::unix::net::UnixListener,
295    name: String,
296    tx: EventSender,
297    next_id: Arc<AtomicU64>,
298    control: ListenerControl,
299) {
300    loop {
301        if control.should_stop() {
302            log::info!("[{}] Local Unix listener stopping", name);
303            return;
304        }
305
306        let stream_result = listener.accept().map(|(stream, _)| stream);
307        let stream = match stream_result {
308            Ok(s) => s,
309            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
310                thread::sleep(Duration::from_millis(50));
311                continue;
312            }
313            Err(e) => {
314                log::warn!("[{}] Local server accept failed: {}", name, e);
315                continue;
316            }
317        };
318
319        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
320
321        // Convert UnixStream to a pair of read/write handles
322        let writer_stream = match stream.try_clone() {
323            Ok(s) => s,
324            Err(e) => {
325                log::warn!("Local server clone failed: {}", e);
326                continue;
327            }
328        };
329
330        let sleep_hold = android_client_sleep_hold();
331        let info = make_local_interface_info(client_id);
332        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
333            stream: writer_stream,
334            sleep_hold: sleep_hold.clone(),
335        });
336
337        if tx
338            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
339            .is_err()
340        {
341            return;
342        }
343
344        let client_tx = tx.clone();
345        thread::Builder::new()
346            .name(format!("local-unix-reader-{}", client_id.0))
347            .spawn(move || {
348                unix_reader_loop(stream, client_id, client_tx, sleep_hold);
349            })
350            .ok();
351    }
352}
353
354#[cfg(target_os = "linux")]
355struct UnixLocalWriter {
356    stream: std::os::unix::net::UnixStream,
357    sleep_hold: Option<ClientSleepHold>,
358}
359
360#[cfg(target_os = "linux")]
361impl Writer for UnixLocalWriter {
362    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
363        use std::io::Write;
364        if self
365            .sleep_hold
366            .as_ref()
367            .is_some_and(ClientSleepHold::should_drop_outbound)
368        {
369            log::debug!("TX paused for LocalInterface client, dropping outbound packet");
370            return Ok(());
371        }
372        self.stream.write_all(&hdlc::frame(data))
373    }
374}
375
376#[cfg(target_os = "linux")]
377fn unix_reader_loop(
378    mut stream: std::os::unix::net::UnixStream,
379    id: InterfaceId,
380    tx: EventSender,
381    sleep_hold: Option<ClientSleepHold>,
382) {
383    use std::io::Read;
384    let mut decoder = hdlc::Decoder::new();
385    let mut buf = [0u8; 4096];
386
387    loop {
388        match stream.read(&mut buf) {
389            Ok(0) => {
390                let _ = tx.send(Event::InterfaceDown(id));
391                return;
392            }
393            Ok(n) => {
394                if let Some(ref sleep_hold) = sleep_hold {
395                    sleep_hold.refresh();
396                }
397                for frame in decoder.feed(&buf[..n]) {
398                    if tx
399                        .send(Event::Frame {
400                            interface_id: id,
401                            data: frame,
402                        })
403                        .is_err()
404                    {
405                        return;
406                    }
407                }
408            }
409            Err(_) => {
410                let _ = tx.send(Event::InterfaceDown(id));
411                return;
412            }
413        }
414    }
415}
416
417/// Spawn handler threads for a connected TCP local client.
418fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
419    let writer_stream = match stream.try_clone() {
420        Ok(s) => s,
421        Err(e) => {
422            log::warn!("Local server clone failed: {}", e);
423            return;
424        }
425    };
426
427    let sleep_hold = android_client_sleep_hold();
428    let info = make_local_interface_info(client_id);
429    let writer: Box<dyn Writer> = Box::new(LocalWriter {
430        stream: writer_stream,
431        sleep_hold: sleep_hold.clone(),
432    });
433
434    if tx
435        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
436        .is_err()
437    {
438        return;
439    }
440
441    thread::Builder::new()
442        .name(format!("local-reader-{}", client_id.0))
443        .spawn(move || {
444            tcp_reader_loop(stream, client_id, tx, sleep_hold);
445        })
446        .ok();
447}
448
449fn tcp_reader_loop(
450    mut stream: TcpStream,
451    id: InterfaceId,
452    tx: EventSender,
453    sleep_hold: Option<ClientSleepHold>,
454) {
455    let mut decoder = hdlc::Decoder::new();
456    let mut buf = [0u8; 4096];
457
458    loop {
459        match stream.read(&mut buf) {
460            Ok(0) => {
461                log::info!("Local client {} disconnected", id.0);
462                let _ = tx.send(Event::InterfaceDown(id));
463                return;
464            }
465            Ok(n) => {
466                if let Some(ref sleep_hold) = sleep_hold {
467                    sleep_hold.refresh();
468                }
469                for frame in decoder.feed(&buf[..n]) {
470                    if tx
471                        .send(Event::Frame {
472                            interface_id: id,
473                            data: frame,
474                        })
475                        .is_err()
476                    {
477                        return;
478                    }
479                }
480            }
481            Err(e) => {
482                log::warn!("Local client {} read error: {}", id.0, e);
483                let _ = tx.send(Event::InterfaceDown(id));
484                return;
485            }
486        }
487    }
488}
489
490fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
491    InterfaceInfo {
492        id,
493        name: String::from("LocalInterface"),
494        mode: constants::MODE_FULL,
495        out_capable: true,
496        in_capable: true,
497        bitrate: Some(1_000_000_000), // 1 Gbps
498        airtime_profile: None,
499        announce_rate_target: None,
500        announce_rate_grace: 0,
501        announce_rate_penalty: 0.0,
502        announce_cap: constants::ANNOUNCE_CAP,
503        is_local_client: false,
504        wants_tunnel: false,
505        tunnel_id: None,
506        mtu: 65535,
507        ia_freq: 0.0,
508        started: 0.0,
509        ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
510    }
511}
512
513// ==================== LOCAL CLIENT ====================
514
515#[cfg(target_os = "linux")]
516enum LocalClientStream {
517    Unix(std::os::unix::net::UnixStream),
518    Tcp(TcpStream),
519}
520
521#[cfg(target_os = "linux")]
522impl LocalClientStream {
523    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
524        match self {
525            LocalClientStream::Unix(stream) => stream.read(buf),
526            LocalClientStream::Tcp(stream) => stream.read(buf),
527        }
528    }
529
530    fn writer(&self) -> io::Result<Box<dyn Writer>> {
531        match self {
532            LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
533                stream: stream.try_clone()?,
534                sleep_hold: None,
535            })),
536            LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
537                stream: stream.try_clone()?,
538                sleep_hold: None,
539            })),
540        }
541    }
542}
543
544#[cfg(not(target_os = "linux"))]
545type LocalClientStream = TcpStream;
546
547#[cfg(not(target_os = "linux"))]
548fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
549    Ok(Box::new(LocalWriter {
550        stream: stream.try_clone()?,
551        sleep_hold: None,
552    }))
553}
554
555#[cfg(target_os = "linux")]
556fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
557    stream.writer()
558}
559
560fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
561    let addr = format!("127.0.0.1:{}", config.port);
562    let stream = TcpStream::connect(&addr)?;
563    stream.set_nodelay(true)?;
564    log::info!(
565        "[{}] Connected to shared instance via TCP {}",
566        config.name,
567        addr
568    );
569    Ok(stream)
570}
571
572#[cfg(target_os = "linux")]
573fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
574    match unix_socket::try_connect_unix(&config.instance_name) {
575        Ok(stream) => {
576            log::info!(
577                "[{}] Connected to shared instance via Unix socket: rns/{}",
578                config.name,
579                config.instance_name
580            );
581            Ok(LocalClientStream::Unix(stream))
582        }
583        Err(e) => {
584            log::info!(
585                "[{}] Unix socket connect failed ({}), trying TCP",
586                config.name,
587                e
588            );
589            try_connect_tcp(config).map(LocalClientStream::Tcp)
590        }
591    }
592}
593
594#[cfg(not(target_os = "linux"))]
595fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
596    try_connect_tcp(config)
597}
598
599fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
600    loop {
601        thread::sleep(config.reconnect_wait);
602        match try_connect_local_client(config) {
603            Ok(stream) => match local_client_stream_writer(&stream) {
604                Ok(writer) => {
605                    let _ = maybe_spawn_local_client_phy_keepalive(
606                        &stream,
607                        config.interface_id,
608                        &config.name,
609                    );
610                    let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
611                    return stream;
612                }
613                Err(e) => {
614                    log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
615                }
616            },
617            Err(e) => {
618                log::warn!("[{}] reconnect failed: {}", config.name, e);
619            }
620        }
621    }
622}
623
624fn local_client_reader_loop(
625    mut stream: LocalClientStream,
626    config: LocalClientConfig,
627    tx: EventSender,
628) {
629    let id = config.interface_id;
630    let mut decoder = hdlc::Decoder::new();
631    let mut buf = [0u8; 4096];
632
633    loop {
634        match stream.read(&mut buf) {
635            Ok(0) => {
636                log::warn!("[{}] shared connection closed", config.name);
637                let _ = tx.send(Event::InterfaceDown(id));
638                stream = reconnect_local_client(&config, &tx);
639                decoder = hdlc::Decoder::new();
640            }
641            Ok(n) => {
642                for frame in decoder.feed(&buf[..n]) {
643                    if tx
644                        .send(Event::Frame {
645                            interface_id: id,
646                            data: frame,
647                        })
648                        .is_err()
649                    {
650                        return;
651                    }
652                }
653            }
654            Err(e) => {
655                log::warn!("[{}] shared read error: {}", config.name, e);
656                let _ = tx.send(Event::InterfaceDown(id));
657                stream = reconnect_local_client(&config, &tx);
658                decoder = hdlc::Decoder::new();
659            }
660        }
661    }
662}
663
664/// Start a local client (connect to shared instance).
665/// Tries Unix socket first on Linux, falls back to TCP.
666/// Returns the writer for the driver.
667pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
668    let id = config.interface_id;
669    let stream = try_connect_local_client(&config)?;
670    let writer = local_client_stream_writer(&stream)?;
671    maybe_spawn_local_client_phy_keepalive(&stream, id, &config.name)?;
672
673    let _ = tx.send(Event::InterfaceUp(id, None, None));
674
675    thread::Builder::new()
676        .name(format!("local-client-reader-{}", id.0))
677        .spawn(move || {
678            local_client_reader_loop(stream, config, tx);
679        })?;
680
681    Ok(writer)
682}
683
684// --- Factory implementations ---
685
686use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
687use std::collections::HashMap;
688
689/// Factory for `LocalServerInterface`.
690pub struct LocalServerFactory;
691
692impl InterfaceFactory for LocalServerFactory {
693    fn type_name(&self) -> &str {
694        "LocalServerInterface"
695    }
696
697    fn parse_config(
698        &self,
699        _name: &str,
700        id: InterfaceId,
701        params: &HashMap<String, String>,
702    ) -> Result<Box<dyn InterfaceConfigData>, String> {
703        let instance_name = params
704            .get("instance_name")
705            .cloned()
706            .unwrap_or_else(|| "default".into());
707        let port = params
708            .get("port")
709            .and_then(|v| v.parse().ok())
710            .unwrap_or(37428);
711
712        Ok(Box::new(LocalServerConfig {
713            instance_name,
714            port,
715            interface_id: id,
716        }))
717    }
718
719    fn start(
720        &self,
721        config: Box<dyn InterfaceConfigData>,
722        ctx: StartContext,
723    ) -> std::io::Result<StartResult> {
724        let server_config = *config
725            .into_any()
726            .downcast::<LocalServerConfig>()
727            .map_err(|_| {
728                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
729            })?;
730
731        let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
732        Ok(StartResult::Listener {
733            control: Some(control),
734        })
735    }
736}
737
738/// Factory for `LocalClientInterface`.
739pub struct LocalClientFactory;
740
741impl InterfaceFactory for LocalClientFactory {
742    fn type_name(&self) -> &str {
743        "LocalClientInterface"
744    }
745
746    fn parse_config(
747        &self,
748        _name: &str,
749        id: InterfaceId,
750        params: &HashMap<String, String>,
751    ) -> Result<Box<dyn InterfaceConfigData>, String> {
752        let instance_name = params
753            .get("instance_name")
754            .cloned()
755            .unwrap_or_else(|| "default".into());
756        let port = params
757            .get("port")
758            .and_then(|v| v.parse().ok())
759            .unwrap_or(37428);
760
761        Ok(Box::new(LocalClientConfig {
762            instance_name,
763            port,
764            interface_id: id,
765            ..LocalClientConfig::default()
766        }))
767    }
768
769    fn start(
770        &self,
771        config: Box<dyn InterfaceConfigData>,
772        ctx: StartContext,
773    ) -> std::io::Result<StartResult> {
774        let client_config = *config
775            .into_any()
776            .downcast::<LocalClientConfig>()
777            .map_err(|_| {
778                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
779            })?;
780
781        let id = client_config.interface_id;
782        let name = client_config.name.clone();
783        let info = InterfaceInfo {
784            id,
785            name,
786            mode: ctx.mode,
787            out_capable: true,
788            in_capable: true,
789            bitrate: Some(1_000_000_000),
790            airtime_profile: None,
791            announce_rate_target: None,
792            announce_rate_grace: 0,
793            announce_rate_penalty: 0.0,
794            announce_cap: rns_core::constants::ANNOUNCE_CAP,
795            is_local_client: false,
796            wants_tunnel: false,
797            tunnel_id: None,
798            mtu: 65535,
799            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
800            ia_freq: 0.0,
801            started: crate::time::now(),
802        };
803
804        let writer = start_client(client_config, ctx.tx)?;
805
806        Ok(StartResult::Simple {
807            id,
808            info,
809            writer,
810            interface_type_name: "LocalInterface".to_string(),
811        })
812    }
813}
814
815#[cfg(test)]
816mod tests {
817    use super::*;
818    use std::sync::mpsc;
819    use std::sync::mpsc::RecvTimeoutError;
820
821    #[cfg(target_os = "linux")]
822    type TestClient = std::os::unix::net::UnixStream;
823
824    #[cfg(not(target_os = "linux"))]
825    type TestClient = TcpStream;
826
827    fn connect_test_client(instance_name: &str, _port: u16) -> TestClient {
828        #[cfg(target_os = "linux")]
829        {
830            unix_socket::try_connect_unix(instance_name).unwrap()
831        }
832
833        #[cfg(not(target_os = "linux"))]
834        {
835            TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
836        }
837    }
838
839    fn find_free_port() -> u16 {
840        TcpListener::bind("127.0.0.1:0")
841            .unwrap()
842            .local_addr()
843            .unwrap()
844            .port()
845    }
846
847    #[test]
848    fn server_bind_tcp() {
849        let port = find_free_port();
850        let instance_name = "test-bind".to_string();
851        let (tx, _rx) = crate::event::channel();
852        let next_id = Arc::new(AtomicU64::new(7000));
853
854        let config = LocalServerConfig {
855            instance_name: instance_name.clone(),
856            port,
857            interface_id: InterfaceId(70),
858        };
859
860        // We force TCP by using a unique instance name that won't conflict
861        // with any existing Unix socket
862        start_server(config, tx, next_id).unwrap();
863        thread::sleep(Duration::from_millis(50));
864
865        connect_test_client(&instance_name, port);
866    }
867
868    #[test]
869    fn server_accept_client() {
870        let port = find_free_port();
871        let instance_name = "test-accept".to_string();
872        let (tx, rx) = crate::event::channel();
873        let next_id = Arc::new(AtomicU64::new(7100));
874
875        let config = LocalServerConfig {
876            instance_name: instance_name.clone(),
877            port,
878            interface_id: InterfaceId(71),
879        };
880
881        start_server(config, tx, next_id).unwrap();
882        thread::sleep(Duration::from_millis(50));
883
884        connect_test_client(&instance_name, port);
885
886        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
887        match event {
888            Event::InterfaceUp(id, writer, info) => {
889                assert_eq!(id, InterfaceId(7100));
890                assert!(writer.is_some());
891                assert!(info.is_some());
892            }
893            other => panic!("expected InterfaceUp, got {:?}", other),
894        }
895    }
896
897    #[test]
898    fn server_stop_prevents_new_accepts() {
899        let port = find_free_port();
900        let instance_name = "test-stop".to_string();
901        let (tx, rx) = crate::event::channel();
902        let next_id = Arc::new(AtomicU64::new(7150));
903
904        let config = LocalServerConfig {
905            instance_name: instance_name.clone(),
906            port,
907            interface_id: InterfaceId(71),
908        };
909
910        let control = start_server(config, tx, next_id).unwrap();
911        thread::sleep(Duration::from_millis(50));
912        control.request_stop();
913        thread::sleep(Duration::from_millis(120));
914
915        #[cfg(target_os = "linux")]
916        let connect_result = unix_socket::try_connect_unix(&instance_name);
917
918        #[cfg(not(target_os = "linux"))]
919        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
920
921        if let Ok(stream) = connect_result {
922            drop(stream);
923        }
924
925        match rx.recv_timeout(Duration::from_millis(200)) {
926            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
927            other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
928        }
929    }
930
931    #[test]
932    fn client_send_receive() {
933        let port = find_free_port();
934        let (server_tx, server_rx) = crate::event::channel();
935        let next_id = Arc::new(AtomicU64::new(7200));
936
937        let server_config = LocalServerConfig {
938            instance_name: "test-sr".into(),
939            port,
940            interface_id: InterfaceId(72),
941        };
942
943        start_server(server_config, server_tx, next_id).unwrap();
944        thread::sleep(Duration::from_millis(50));
945
946        // Connect client
947        let (client_tx, client_rx) = crate::event::channel();
948        let client_config = LocalClientConfig {
949            name: "test-client".into(),
950            instance_name: "test-sr".into(),
951            port,
952            interface_id: InterfaceId(73),
953            reconnect_wait: Duration::from_secs(1),
954        };
955
956        let mut client_writer = start_client(client_config, client_tx).unwrap();
957
958        // Get server-side InterfaceUp
959        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
960        let mut server_writer = match event {
961            Event::InterfaceUp(_, Some(w), _) => w,
962            other => panic!("expected InterfaceUp with writer, got {:?}", other),
963        };
964
965        // Get client-side InterfaceUp
966        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
967        match event {
968            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
969            other => panic!("expected InterfaceUp, got {:?}", other),
970        }
971
972        // Client sends to server
973        let payload: Vec<u8> = (0..32).collect();
974        client_writer.send_frame(&payload).unwrap();
975
976        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
977        match event {
978            Event::Frame { data, .. } => assert_eq!(data, payload),
979            other => panic!("expected Frame, got {:?}", other),
980        }
981
982        // Server sends to client
983        let payload2: Vec<u8> = (100..132).collect();
984        server_writer.send_frame(&payload2).unwrap();
985
986        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
987        match event {
988            Event::Frame { data, .. } => assert_eq!(data, payload2),
989            other => panic!("expected Frame, got {:?}", other),
990        }
991    }
992
993    #[test]
994    fn sleep_hold_drops_outbound_after_timeout_and_refresh_restores() {
995        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
996        let addr = listener.local_addr().unwrap();
997        let mut client = TcpStream::connect(addr).unwrap();
998        let (server_stream, _) = listener.accept().unwrap();
999        client
1000            .set_read_timeout(Some(Duration::from_millis(200)))
1001            .unwrap();
1002
1003        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1004        let mut writer = LocalWriter {
1005            stream: server_stream,
1006            sleep_hold: Some(sleep_hold.clone()),
1007        };
1008
1009        writer.send_frame(b"live").unwrap();
1010        let mut buf = [0u8; 16];
1011        let n = client.read(&mut buf).unwrap();
1012        assert!(n > 0);
1013
1014        thread::sleep(Duration::from_millis(50));
1015        writer.send_frame(b"drop").unwrap();
1016        let err = client.read(&mut buf).unwrap_err();
1017        assert!(
1018            matches!(
1019                err.kind(),
1020                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1021            ),
1022            "paused writer should not emit bytes, got {err:?}"
1023        );
1024
1025        sleep_hold.refresh();
1026        writer.send_frame(b"again").unwrap();
1027        let n = client.read(&mut buf).unwrap();
1028        assert!(n > 0);
1029    }
1030
1031    #[test]
1032    fn tcp_reader_refreshes_sleep_hold_on_inbound_data() {
1033        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1034        let addr = listener.local_addr().unwrap();
1035        let mut client = TcpStream::connect(addr).unwrap();
1036        let (server_stream, _) = listener.accept().unwrap();
1037        let writer_stream = server_stream.try_clone().unwrap();
1038        client
1039            .set_read_timeout(Some(Duration::from_millis(200)))
1040            .unwrap();
1041
1042        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1043        let reader_sleep_hold = sleep_hold.clone();
1044        let (tx, rx) = crate::event::channel();
1045        thread::spawn(move || {
1046            tcp_reader_loop(server_stream, InterfaceId(98), tx, Some(reader_sleep_hold));
1047        });
1048
1049        let mut writer = LocalWriter {
1050            stream: writer_stream,
1051            sleep_hold: Some(sleep_hold),
1052        };
1053
1054        thread::sleep(Duration::from_millis(50));
1055        writer.send_frame(b"drop").unwrap();
1056        let mut buf = [0u8; 32];
1057        let err = client.read(&mut buf).unwrap_err();
1058        assert!(
1059            matches!(
1060                err.kind(),
1061                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1062            ),
1063            "paused writer should not emit bytes, got {err:?}"
1064        );
1065
1066        let inbound = vec![0x42; constants::HEADER_MINSIZE];
1067        client.write_all(&hdlc::frame(&inbound)).unwrap();
1068        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1069        match event {
1070            Event::Frame { interface_id, data } => {
1071                assert_eq!(interface_id, InterfaceId(98));
1072                assert_eq!(data, inbound);
1073            }
1074            other => panic!("expected Frame, got {:?}", other),
1075        }
1076
1077        writer.send_frame(b"again").unwrap();
1078        let n = client.read(&mut buf).unwrap();
1079        assert!(n > 0);
1080    }
1081
1082    #[test]
1083    fn physical_keepalive_loop_sends_empty_hdlc_frame() {
1084        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1085        let addr = listener.local_addr().unwrap();
1086        let mut client = TcpStream::connect(addr).unwrap();
1087        let (server_stream, _) = listener.accept().unwrap();
1088        client
1089            .set_read_timeout(Some(Duration::from_millis(500)))
1090            .unwrap();
1091
1092        let writer: Box<dyn Writer> = Box::new(LocalWriter {
1093            stream: server_stream,
1094            sleep_hold: None,
1095        });
1096        spawn_physical_keepalive_loop(
1097            writer,
1098            InterfaceId(99),
1099            "test-local".into(),
1100            Duration::from_millis(10),
1101        );
1102
1103        let mut buf = [0u8; 2];
1104        client.read_exact(&mut buf).unwrap();
1105        assert_eq!(buf, [0x7E, 0x7E]);
1106    }
1107
1108    #[test]
1109    fn multiple_local_clients() {
1110        let port = find_free_port();
1111        let instance_name = "test-multi".to_string();
1112        let (tx, rx) = crate::event::channel();
1113        let next_id = Arc::new(AtomicU64::new(7300));
1114
1115        let config = LocalServerConfig {
1116            instance_name: instance_name.clone(),
1117            port,
1118            interface_id: InterfaceId(74),
1119        };
1120
1121        start_server(config, tx, next_id).unwrap();
1122        thread::sleep(Duration::from_millis(50));
1123
1124        let _client1 = connect_test_client(&instance_name, port);
1125        let _client2 = connect_test_client(&instance_name, port);
1126
1127        let mut ids = Vec::new();
1128        for _ in 0..2 {
1129            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1130            match event {
1131                Event::InterfaceUp(id, _, _) => ids.push(id),
1132                other => panic!("expected InterfaceUp, got {:?}", other),
1133            }
1134        }
1135
1136        assert_eq!(ids.len(), 2);
1137        assert_ne!(ids[0], ids[1]);
1138    }
1139
1140    #[test]
1141    fn client_disconnect_detected() {
1142        let port = find_free_port();
1143        let instance_name = "test-dc".to_string();
1144        let (tx, rx) = crate::event::channel();
1145        let next_id = Arc::new(AtomicU64::new(7400));
1146
1147        let config = LocalServerConfig {
1148            instance_name: instance_name.clone(),
1149            port,
1150            interface_id: InterfaceId(75),
1151        };
1152
1153        start_server(config, tx, next_id).unwrap();
1154        thread::sleep(Duration::from_millis(50));
1155
1156        #[cfg(target_os = "linux")]
1157        let client = unix_socket::try_connect_unix(&instance_name).unwrap();
1158
1159        #[cfg(not(target_os = "linux"))]
1160        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1161
1162        // Drain InterfaceUp
1163        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1164
1165        // Disconnect
1166        drop(client);
1167
1168        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1169        assert!(
1170            matches!(event, Event::InterfaceDown(_)),
1171            "expected InterfaceDown, got {:?}",
1172            event
1173        );
1174    }
1175
1176    #[test]
1177    fn client_reconnects_after_tcp_restart() {
1178        let port = find_free_port();
1179        let addr = format!("127.0.0.1:{}", port);
1180        let instance_name = format!("test-reconnect-{}", port);
1181
1182        let listener1 = TcpListener::bind(&addr).unwrap();
1183        let (accepted1_tx, accepted1_rx) = mpsc::channel();
1184        thread::spawn(move || {
1185            let (stream, _) = listener1.accept().unwrap();
1186            accepted1_tx.send(stream).unwrap();
1187        });
1188
1189        let (client_tx, client_rx) = crate::event::channel();
1190        let client_config = LocalClientConfig {
1191            name: "test-client".into(),
1192            instance_name,
1193            port,
1194            interface_id: InterfaceId(76),
1195            reconnect_wait: Duration::from_millis(50),
1196        };
1197
1198        let _writer = start_client(client_config, client_tx).unwrap();
1199        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1200        assert!(matches!(
1201            event,
1202            Event::InterfaceUp(InterfaceId(76), None, None)
1203        ));
1204
1205        let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1206        drop(stream1);
1207
1208        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1209        assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));
1210
1211        let listener2 = TcpListener::bind(&addr).unwrap();
1212        let (accepted2_tx, accepted2_rx) = mpsc::channel();
1213        thread::spawn(move || {
1214            let (stream, _) = listener2.accept().unwrap();
1215            accepted2_tx.send(stream).unwrap();
1216        });
1217
1218        let mut reconnected_writer = None;
1219        for _ in 0..10 {
1220            let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1221            match event {
1222                Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
1223                    reconnected_writer = writer;
1224                    break;
1225                }
1226                _ => {}
1227            }
1228        }
1229
1230        let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
1231        let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1232        reconnected_writer.send_frame(b"client->server").unwrap();
1233        stream2
1234            .set_read_timeout(Some(Duration::from_secs(2)))
1235            .unwrap();
1236        let mut buf = [0u8; 64];
1237        let n = stream2.read(&mut buf).unwrap();
1238        assert!(n > 0, "expected bytes from refreshed writer");
1239    }
1240
1241    #[cfg(target_os = "linux")]
1242    #[test]
1243    fn unix_abstract_socket_helpers_work() {
1244        let instance_name = format!(
1245            "test-abstract-{}",
1246            std::time::SystemTime::now()
1247                .duration_since(std::time::UNIX_EPOCH)
1248                .unwrap()
1249                .as_nanos()
1250        );
1251
1252        let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
1253        let accept_thread = thread::spawn(move || listener.accept().unwrap().0);
1254
1255        let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
1256        let mut server = accept_thread.join().unwrap();
1257
1258        client.write_all(b"ping").unwrap();
1259        let mut buf = [0u8; 4];
1260        server.read_exact(&mut buf).unwrap();
1261        assert_eq!(&buf, b"ping");
1262    }
1263}