Skip to main content

rns_net/interface/
local.rs

1//! Local shared instance interface.
2//!
3//! Provides communication between the shared RNS instance and local client
4//! programs. Uses Unix abstract sockets on Linux, TCP on other platforms.
5//! HDLC framing over the connection (same as TCP interfaces).
6//!
7//! Two modes:
8//! - `LocalServer`: The shared instance binds and accepts client connections.
9//! - `LocalClient`: Connects to an existing shared instance.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
25/// Configuration for a Local server (shared instance).
26#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28    pub instance_name: String,
29    pub port: u16,
30    pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34    fn default() -> Self {
35        LocalServerConfig {
36            instance_name: "default".into(),
37            port: 37428,
38            interface_id: InterfaceId(0),
39        }
40    }
41}
42
43/// Configuration for a Local client (connecting to shared instance).
44#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46    pub name: String,
47    pub instance_name: String,
48    pub port: u16,
49    pub interface_id: InterfaceId,
50    pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54    fn default() -> Self {
55        LocalClientConfig {
56            name: "Local shared instance".into(),
57            instance_name: "default".into(),
58            port: 37428,
59            interface_id: InterfaceId(0),
60            reconnect_wait: Duration::from_secs(8),
61        }
62    }
63}
64
65/// HDLC writer over a TCP or Unix stream.
66struct LocalWriter {
67    stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72        self.stream.write_all(&hdlc::frame(data))
73    }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78    use std::io;
79    use std::os::linux::net::SocketAddrExt;
80    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};
81
82    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
83        SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
84    }
85
86    /// Try to bind a Unix abstract socket with the given instance name.
87    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
88        let addr = abstract_addr(instance_name)?;
89        UnixListener::bind_addr(&addr)
90    }
91
92    /// Try to connect to a Unix abstract socket.
93    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
94        let addr = abstract_addr(instance_name)?;
95        UnixStream::connect_addr(&addr)
96    }
97}
98
99// ==================== LOCAL SERVER ====================
100
101/// Start a local server (shared instance).
102/// Tries Unix abstract socket first on Linux, falls back to TCP.
103/// Spawns an acceptor thread. Each client gets a dynamically allocated InterfaceId.
104pub fn start_server(
105    config: LocalServerConfig,
106    tx: EventSender,
107    next_id: Arc<AtomicU64>,
108) -> io::Result<ListenerControl> {
109    let control = ListenerControl::new();
110    // Try Unix socket first on Linux
111    #[cfg(target_os = "linux")]
112    {
113        match unix_socket::try_bind_unix(&config.instance_name) {
114            Ok(listener) => {
115                listener.set_nonblocking(true)?;
116                log::info!(
117                    "Local server using Unix socket: rns/{}",
118                    config.instance_name
119                );
120                let name = format!("rns/{}", config.instance_name);
121                let listener_control = control.clone();
122                thread::Builder::new()
123                    .name("local-server".into())
124                    .spawn(move || {
125                        unix_server_loop(listener, name, tx, next_id, listener_control);
126                    })?;
127                return Ok(control);
128            }
129            Err(e) => {
130                log::info!("Unix socket bind failed ({}), falling back to TCP", e);
131            }
132        }
133    }
134
135    // Fallback: TCP on localhost
136    let addr = format!("127.0.0.1:{}", config.port);
137    let listener = TcpListener::bind(&addr)?;
138    listener.set_nonblocking(true)?;
139
140    log::info!("Local server listening on TCP {}", addr);
141
142    let listener_control = control.clone();
143    thread::Builder::new()
144        .name("local-server".into())
145        .spawn(move || {
146            tcp_server_loop(listener, tx, next_id, listener_control);
147        })?;
148
149    Ok(control)
150}
151
152/// TCP server accept loop for local interface.
153fn tcp_server_loop(
154    listener: TcpListener,
155    tx: EventSender,
156    next_id: Arc<AtomicU64>,
157    control: ListenerControl,
158) {
159    loop {
160        if control.should_stop() {
161            log::info!("Local TCP listener stopping");
162            return;
163        }
164
165        let stream_result = listener.accept().map(|(stream, _)| stream);
166        let stream = match stream_result {
167            Ok(s) => s,
168            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
169                thread::sleep(Duration::from_millis(50));
170                continue;
171            }
172            Err(e) => {
173                log::warn!("Local server accept failed: {}", e);
174                continue;
175            }
176        };
177
178        if let Err(e) = stream.set_nodelay(true) {
179            log::warn!("Local server set_nodelay failed: {}", e);
180        }
181
182        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183        spawn_local_client_handler(stream, client_id, tx.clone());
184    }
185}
186
187/// Unix socket server accept loop for local interface.
188#[cfg(target_os = "linux")]
189fn unix_server_loop(
190    listener: std::os::unix::net::UnixListener,
191    name: String,
192    tx: EventSender,
193    next_id: Arc<AtomicU64>,
194    control: ListenerControl,
195) {
196    loop {
197        if control.should_stop() {
198            log::info!("[{}] Local Unix listener stopping", name);
199            return;
200        }
201
202        let stream_result = listener.accept().map(|(stream, _)| stream);
203        let stream = match stream_result {
204            Ok(s) => s,
205            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
206                thread::sleep(Duration::from_millis(50));
207                continue;
208            }
209            Err(e) => {
210                log::warn!("[{}] Local server accept failed: {}", name, e);
211                continue;
212            }
213        };
214
215        let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
216
217        // Convert UnixStream to a pair of read/write handles
218        let writer_stream = match stream.try_clone() {
219            Ok(s) => s,
220            Err(e) => {
221                log::warn!("Local server clone failed: {}", e);
222                continue;
223            }
224        };
225
226        let info = make_local_interface_info(client_id);
227        let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
228            stream: writer_stream,
229        });
230
231        if tx
232            .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
233            .is_err()
234        {
235            return;
236        }
237
238        let client_tx = tx.clone();
239        thread::Builder::new()
240            .name(format!("local-unix-reader-{}", client_id.0))
241            .spawn(move || {
242                unix_reader_loop(stream, client_id, client_tx);
243            })
244            .ok();
245    }
246}
247
248#[cfg(target_os = "linux")]
249struct UnixLocalWriter {
250    stream: std::os::unix::net::UnixStream,
251}
252
253#[cfg(target_os = "linux")]
254impl Writer for UnixLocalWriter {
255    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
256        use std::io::Write;
257        self.stream.write_all(&hdlc::frame(data))
258    }
259}
260
261#[cfg(target_os = "linux")]
262fn unix_reader_loop(mut stream: std::os::unix::net::UnixStream, id: InterfaceId, tx: EventSender) {
263    use std::io::Read;
264    let mut decoder = hdlc::Decoder::new();
265    let mut buf = [0u8; 4096];
266
267    loop {
268        match stream.read(&mut buf) {
269            Ok(0) => {
270                let _ = tx.send(Event::InterfaceDown(id));
271                return;
272            }
273            Ok(n) => {
274                for frame in decoder.feed(&buf[..n]) {
275                    if tx
276                        .send(Event::Frame {
277                            interface_id: id,
278                            data: frame,
279                        })
280                        .is_err()
281                    {
282                        return;
283                    }
284                }
285            }
286            Err(_) => {
287                let _ = tx.send(Event::InterfaceDown(id));
288                return;
289            }
290        }
291    }
292}
293
294/// Spawn handler threads for a connected TCP local client.
295fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
296    let writer_stream = match stream.try_clone() {
297        Ok(s) => s,
298        Err(e) => {
299            log::warn!("Local server clone failed: {}", e);
300            return;
301        }
302    };
303
304    let info = make_local_interface_info(client_id);
305    let writer: Box<dyn Writer> = Box::new(LocalWriter {
306        stream: writer_stream,
307    });
308
309    if tx
310        .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
311        .is_err()
312    {
313        return;
314    }
315
316    thread::Builder::new()
317        .name(format!("local-reader-{}", client_id.0))
318        .spawn(move || {
319            tcp_reader_loop(stream, client_id, tx);
320        })
321        .ok();
322}
323
324fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
325    let mut decoder = hdlc::Decoder::new();
326    let mut buf = [0u8; 4096];
327
328    loop {
329        match stream.read(&mut buf) {
330            Ok(0) => {
331                log::info!("Local client {} disconnected", id.0);
332                let _ = tx.send(Event::InterfaceDown(id));
333                return;
334            }
335            Ok(n) => {
336                for frame in decoder.feed(&buf[..n]) {
337                    if tx
338                        .send(Event::Frame {
339                            interface_id: id,
340                            data: frame,
341                        })
342                        .is_err()
343                    {
344                        return;
345                    }
346                }
347            }
348            Err(e) => {
349                log::warn!("Local client {} read error: {}", id.0, e);
350                let _ = tx.send(Event::InterfaceDown(id));
351                return;
352            }
353        }
354    }
355}
356
357fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
358    InterfaceInfo {
359        id,
360        name: String::from("LocalInterface"),
361        mode: constants::MODE_FULL,
362        out_capable: true,
363        in_capable: true,
364        bitrate: Some(1_000_000_000), // 1 Gbps
365        announce_rate_target: None,
366        announce_rate_grace: 0,
367        announce_rate_penalty: 0.0,
368        announce_cap: constants::ANNOUNCE_CAP,
369        is_local_client: false,
370        wants_tunnel: false,
371        tunnel_id: None,
372        mtu: 65535,
373        ia_freq: 0.0,
374        started: 0.0,
375        ingress_control: false,
376    }
377}
378
379// ==================== LOCAL CLIENT ====================
380
381#[cfg(target_os = "linux")]
382enum LocalClientStream {
383    Unix(std::os::unix::net::UnixStream),
384    Tcp(TcpStream),
385}
386
387#[cfg(target_os = "linux")]
388impl LocalClientStream {
389    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
390        match self {
391            LocalClientStream::Unix(stream) => stream.read(buf),
392            LocalClientStream::Tcp(stream) => stream.read(buf),
393        }
394    }
395
396    fn writer(&self) -> io::Result<Box<dyn Writer>> {
397        match self {
398            LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
399                stream: stream.try_clone()?,
400            })),
401            LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
402                stream: stream.try_clone()?,
403            })),
404        }
405    }
406}
407
408#[cfg(not(target_os = "linux"))]
409type LocalClientStream = TcpStream;
410
411#[cfg(not(target_os = "linux"))]
412fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
413    Ok(Box::new(LocalWriter {
414        stream: stream.try_clone()?,
415    }))
416}
417
418#[cfg(target_os = "linux")]
419fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
420    stream.writer()
421}
422
423fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
424    let addr = format!("127.0.0.1:{}", config.port);
425    let stream = TcpStream::connect(&addr)?;
426    stream.set_nodelay(true)?;
427    log::info!(
428        "[{}] Connected to shared instance via TCP {}",
429        config.name,
430        addr
431    );
432    Ok(stream)
433}
434
435#[cfg(target_os = "linux")]
436fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
437    match unix_socket::try_connect_unix(&config.instance_name) {
438        Ok(stream) => {
439            log::info!(
440                "[{}] Connected to shared instance via Unix socket: rns/{}",
441                config.name,
442                config.instance_name
443            );
444            Ok(LocalClientStream::Unix(stream))
445        }
446        Err(e) => {
447            log::info!(
448                "[{}] Unix socket connect failed ({}), trying TCP",
449                config.name,
450                e
451            );
452            try_connect_tcp(config).map(LocalClientStream::Tcp)
453        }
454    }
455}
456
457#[cfg(not(target_os = "linux"))]
458fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
459    try_connect_tcp(config)
460}
461
462fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
463    loop {
464        thread::sleep(config.reconnect_wait);
465        match try_connect_local_client(config) {
466            Ok(stream) => match local_client_stream_writer(&stream) {
467                Ok(writer) => {
468                    let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
469                    return stream;
470                }
471                Err(e) => {
472                    log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
473                }
474            },
475            Err(e) => {
476                log::warn!("[{}] reconnect failed: {}", config.name, e);
477            }
478        }
479    }
480}
481
482fn local_client_reader_loop(
483    mut stream: LocalClientStream,
484    config: LocalClientConfig,
485    tx: EventSender,
486) {
487    let id = config.interface_id;
488    let mut decoder = hdlc::Decoder::new();
489    let mut buf = [0u8; 4096];
490
491    loop {
492        match stream.read(&mut buf) {
493            Ok(0) => {
494                log::warn!("[{}] shared connection closed", config.name);
495                let _ = tx.send(Event::InterfaceDown(id));
496                stream = reconnect_local_client(&config, &tx);
497                decoder = hdlc::Decoder::new();
498            }
499            Ok(n) => {
500                for frame in decoder.feed(&buf[..n]) {
501                    if tx
502                        .send(Event::Frame {
503                            interface_id: id,
504                            data: frame,
505                        })
506                        .is_err()
507                    {
508                        return;
509                    }
510                }
511            }
512            Err(e) => {
513                log::warn!("[{}] shared read error: {}", config.name, e);
514                let _ = tx.send(Event::InterfaceDown(id));
515                stream = reconnect_local_client(&config, &tx);
516                decoder = hdlc::Decoder::new();
517            }
518        }
519    }
520}
521
522/// Start a local client (connect to shared instance).
523/// Tries Unix socket first on Linux, falls back to TCP.
524/// Returns the writer for the driver.
525pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
526    let id = config.interface_id;
527    let stream = try_connect_local_client(&config)?;
528    let writer = local_client_stream_writer(&stream)?;
529
530    let _ = tx.send(Event::InterfaceUp(id, None, None));
531
532    thread::Builder::new()
533        .name(format!("local-client-reader-{}", id.0))
534        .spawn(move || {
535            local_client_reader_loop(stream, config, tx);
536        })?;
537
538    Ok(writer)
539}
540
541// --- Factory implementations ---
542
543use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
544use std::collections::HashMap;
545
546/// Factory for `LocalServerInterface`.
547pub struct LocalServerFactory;
548
549impl InterfaceFactory for LocalServerFactory {
550    fn type_name(&self) -> &str {
551        "LocalServerInterface"
552    }
553
554    fn parse_config(
555        &self,
556        _name: &str,
557        id: InterfaceId,
558        params: &HashMap<String, String>,
559    ) -> Result<Box<dyn InterfaceConfigData>, String> {
560        let instance_name = params
561            .get("instance_name")
562            .cloned()
563            .unwrap_or_else(|| "default".into());
564        let port = params
565            .get("port")
566            .and_then(|v| v.parse().ok())
567            .unwrap_or(37428);
568
569        Ok(Box::new(LocalServerConfig {
570            instance_name,
571            port,
572            interface_id: id,
573        }))
574    }
575
576    fn start(
577        &self,
578        config: Box<dyn InterfaceConfigData>,
579        ctx: StartContext,
580    ) -> std::io::Result<StartResult> {
581        let server_config = *config
582            .into_any()
583            .downcast::<LocalServerConfig>()
584            .map_err(|_| {
585                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
586            })?;
587
588        let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
589        Ok(StartResult::Listener {
590            control: Some(control),
591        })
592    }
593}
594
595/// Factory for `LocalClientInterface`.
596pub struct LocalClientFactory;
597
598impl InterfaceFactory for LocalClientFactory {
599    fn type_name(&self) -> &str {
600        "LocalClientInterface"
601    }
602
603    fn parse_config(
604        &self,
605        _name: &str,
606        id: InterfaceId,
607        params: &HashMap<String, String>,
608    ) -> Result<Box<dyn InterfaceConfigData>, String> {
609        let instance_name = params
610            .get("instance_name")
611            .cloned()
612            .unwrap_or_else(|| "default".into());
613        let port = params
614            .get("port")
615            .and_then(|v| v.parse().ok())
616            .unwrap_or(37428);
617
618        Ok(Box::new(LocalClientConfig {
619            instance_name,
620            port,
621            interface_id: id,
622            ..LocalClientConfig::default()
623        }))
624    }
625
626    fn start(
627        &self,
628        config: Box<dyn InterfaceConfigData>,
629        ctx: StartContext,
630    ) -> std::io::Result<StartResult> {
631        let client_config = *config
632            .into_any()
633            .downcast::<LocalClientConfig>()
634            .map_err(|_| {
635                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
636            })?;
637
638        let id = client_config.interface_id;
639        let name = client_config.name.clone();
640        let info = InterfaceInfo {
641            id,
642            name,
643            mode: ctx.mode,
644            out_capable: true,
645            in_capable: true,
646            bitrate: Some(1_000_000_000),
647            announce_rate_target: None,
648            announce_rate_grace: 0,
649            announce_rate_penalty: 0.0,
650            announce_cap: rns_core::constants::ANNOUNCE_CAP,
651            is_local_client: false,
652            wants_tunnel: false,
653            tunnel_id: None,
654            mtu: 65535,
655            ingress_control: false,
656            ia_freq: 0.0,
657            started: crate::time::now(),
658        };
659
660        let writer = start_client(client_config, ctx.tx)?;
661
662        Ok(StartResult::Simple {
663            id,
664            info,
665            writer,
666            interface_type_name: "LocalInterface".to_string(),
667        })
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674    use std::sync::mpsc;
675    use std::sync::mpsc::RecvTimeoutError;
676
677    fn connect_test_client(instance_name: &str, _port: u16) {
678        #[cfg(target_os = "linux")]
679        {
680            let _client = unix_socket::try_connect_unix(instance_name).unwrap();
681        }
682
683        #[cfg(not(target_os = "linux"))]
684        {
685            let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
686        }
687    }
688
689    fn find_free_port() -> u16 {
690        TcpListener::bind("127.0.0.1:0")
691            .unwrap()
692            .local_addr()
693            .unwrap()
694            .port()
695    }
696
697    #[test]
698    fn server_bind_tcp() {
699        let port = find_free_port();
700        let instance_name = "test-bind".to_string();
701        let (tx, _rx) = crate::event::channel();
702        let next_id = Arc::new(AtomicU64::new(7000));
703
704        let config = LocalServerConfig {
705            instance_name: instance_name.clone(),
706            port,
707            interface_id: InterfaceId(70),
708        };
709
710        // We force TCP by using a unique instance name that won't conflict
711        // with any existing Unix socket
712        start_server(config, tx, next_id).unwrap();
713        thread::sleep(Duration::from_millis(50));
714
715        connect_test_client(&instance_name, port);
716    }
717
718    #[test]
719    fn server_accept_client() {
720        let port = find_free_port();
721        let instance_name = "test-accept".to_string();
722        let (tx, rx) = crate::event::channel();
723        let next_id = Arc::new(AtomicU64::new(7100));
724
725        let config = LocalServerConfig {
726            instance_name: instance_name.clone(),
727            port,
728            interface_id: InterfaceId(71),
729        };
730
731        start_server(config, tx, next_id).unwrap();
732        thread::sleep(Duration::from_millis(50));
733
734        connect_test_client(&instance_name, port);
735
736        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
737        match event {
738            Event::InterfaceUp(id, writer, info) => {
739                assert_eq!(id, InterfaceId(7100));
740                assert!(writer.is_some());
741                assert!(info.is_some());
742            }
743            other => panic!("expected InterfaceUp, got {:?}", other),
744        }
745    }
746
747    #[test]
748    fn server_stop_prevents_new_accepts() {
749        let port = find_free_port();
750        let instance_name = "test-stop".to_string();
751        let (tx, rx) = crate::event::channel();
752        let next_id = Arc::new(AtomicU64::new(7150));
753
754        let config = LocalServerConfig {
755            instance_name: instance_name.clone(),
756            port,
757            interface_id: InterfaceId(71),
758        };
759
760        let control = start_server(config, tx, next_id).unwrap();
761        thread::sleep(Duration::from_millis(50));
762        control.request_stop();
763        thread::sleep(Duration::from_millis(120));
764
765        #[cfg(target_os = "linux")]
766        let connect_result = unix_socket::try_connect_unix(&instance_name);
767
768        #[cfg(not(target_os = "linux"))]
769        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
770
771        if let Ok(stream) = connect_result {
772            drop(stream);
773        }
774
775        match rx.recv_timeout(Duration::from_millis(200)) {
776            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
777            other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
778        }
779    }
780
781    #[test]
782    fn client_send_receive() {
783        let port = find_free_port();
784        let (server_tx, server_rx) = crate::event::channel();
785        let next_id = Arc::new(AtomicU64::new(7200));
786
787        let server_config = LocalServerConfig {
788            instance_name: "test-sr".into(),
789            port,
790            interface_id: InterfaceId(72),
791        };
792
793        start_server(server_config, server_tx, next_id).unwrap();
794        thread::sleep(Duration::from_millis(50));
795
796        // Connect client
797        let (client_tx, client_rx) = crate::event::channel();
798        let client_config = LocalClientConfig {
799            name: "test-client".into(),
800            instance_name: "test-sr".into(),
801            port,
802            interface_id: InterfaceId(73),
803            reconnect_wait: Duration::from_secs(1),
804        };
805
806        let mut client_writer = start_client(client_config, client_tx).unwrap();
807
808        // Get server-side InterfaceUp
809        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
810        let mut server_writer = match event {
811            Event::InterfaceUp(_, Some(w), _) => w,
812            other => panic!("expected InterfaceUp with writer, got {:?}", other),
813        };
814
815        // Get client-side InterfaceUp
816        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
817        match event {
818            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
819            other => panic!("expected InterfaceUp, got {:?}", other),
820        }
821
822        // Client sends to server
823        let payload: Vec<u8> = (0..32).collect();
824        client_writer.send_frame(&payload).unwrap();
825
826        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
827        match event {
828            Event::Frame { data, .. } => assert_eq!(data, payload),
829            other => panic!("expected Frame, got {:?}", other),
830        }
831
832        // Server sends to client
833        let payload2: Vec<u8> = (100..132).collect();
834        server_writer.send_frame(&payload2).unwrap();
835
836        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
837        match event {
838            Event::Frame { data, .. } => assert_eq!(data, payload2),
839            other => panic!("expected Frame, got {:?}", other),
840        }
841    }
842
843    #[test]
844    fn multiple_local_clients() {
845        let port = find_free_port();
846        let instance_name = "test-multi".to_string();
847        let (tx, rx) = crate::event::channel();
848        let next_id = Arc::new(AtomicU64::new(7300));
849
850        let config = LocalServerConfig {
851            instance_name: instance_name.clone(),
852            port,
853            interface_id: InterfaceId(74),
854        };
855
856        start_server(config, tx, next_id).unwrap();
857        thread::sleep(Duration::from_millis(50));
858
859        connect_test_client(&instance_name, port);
860        connect_test_client(&instance_name, port);
861
862        let mut ids = Vec::new();
863        for _ in 0..2 {
864            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
865            match event {
866                Event::InterfaceUp(id, _, _) => ids.push(id),
867                other => panic!("expected InterfaceUp, got {:?}", other),
868            }
869        }
870
871        assert_eq!(ids.len(), 2);
872        assert_ne!(ids[0], ids[1]);
873    }
874
875    #[test]
876    fn client_disconnect_detected() {
877        let port = find_free_port();
878        let instance_name = "test-dc".to_string();
879        let (tx, rx) = crate::event::channel();
880        let next_id = Arc::new(AtomicU64::new(7400));
881
882        let config = LocalServerConfig {
883            instance_name: instance_name.clone(),
884            port,
885            interface_id: InterfaceId(75),
886        };
887
888        start_server(config, tx, next_id).unwrap();
889        thread::sleep(Duration::from_millis(50));
890
891        #[cfg(target_os = "linux")]
892        let client = unix_socket::try_connect_unix(&instance_name).unwrap();
893
894        #[cfg(not(target_os = "linux"))]
895        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
896
897        // Drain InterfaceUp
898        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
899
900        // Disconnect
901        drop(client);
902
903        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
904        assert!(
905            matches!(event, Event::InterfaceDown(_)),
906            "expected InterfaceDown, got {:?}",
907            event
908        );
909    }
910
911    #[test]
912    fn client_reconnects_after_tcp_restart() {
913        let port = find_free_port();
914        let addr = format!("127.0.0.1:{}", port);
915        let instance_name = format!("test-reconnect-{}", port);
916
917        let listener1 = TcpListener::bind(&addr).unwrap();
918        let (accepted1_tx, accepted1_rx) = mpsc::channel();
919        thread::spawn(move || {
920            let (stream, _) = listener1.accept().unwrap();
921            accepted1_tx.send(stream).unwrap();
922        });
923
924        let (client_tx, client_rx) = crate::event::channel();
925        let client_config = LocalClientConfig {
926            name: "test-client".into(),
927            instance_name,
928            port,
929            interface_id: InterfaceId(76),
930            reconnect_wait: Duration::from_millis(50),
931        };
932
933        let _writer = start_client(client_config, client_tx).unwrap();
934        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
935        assert!(matches!(
936            event,
937            Event::InterfaceUp(InterfaceId(76), None, None)
938        ));
939
940        let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
941        drop(stream1);
942
943        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
944        assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));
945
946        let listener2 = TcpListener::bind(&addr).unwrap();
947        let (accepted2_tx, accepted2_rx) = mpsc::channel();
948        thread::spawn(move || {
949            let (stream, _) = listener2.accept().unwrap();
950            accepted2_tx.send(stream).unwrap();
951        });
952
953        let mut reconnected_writer = None;
954        for _ in 0..10 {
955            let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
956            match event {
957                Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
958                    reconnected_writer = writer;
959                    break;
960                }
961                _ => {}
962            }
963        }
964
965        let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
966        let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
967        reconnected_writer.send_frame(b"client->server").unwrap();
968        stream2
969            .set_read_timeout(Some(Duration::from_secs(2)))
970            .unwrap();
971        let mut buf = [0u8; 64];
972        let n = stream2.read(&mut buf).unwrap();
973        assert!(n > 0, "expected bytes from refreshed writer");
974    }
975
976    #[cfg(target_os = "linux")]
977    #[test]
978    fn unix_abstract_socket_helpers_work() {
979        let instance_name = format!(
980            "test-abstract-{}",
981            std::time::SystemTime::now()
982                .duration_since(std::time::UNIX_EPOCH)
983                .unwrap()
984                .as_nanos()
985        );
986
987        let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
988        let accept_thread = thread::spawn(move || listener.accept().unwrap().0);
989
990        let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
991        let mut server = accept_thread.join().unwrap();
992
993        client.write_all(b"ping").unwrap();
994        let mut buf = [0u8; 4];
995        server.read_exact(&mut buf).unwrap();
996        assert_eq!(&buf, b"ping");
997    }
998}