1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
25#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28 pub instance_name: String,
29 pub port: u16,
30 pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34 fn default() -> Self {
35 LocalServerConfig {
36 instance_name: "default".into(),
37 port: 37428,
38 interface_id: InterfaceId(0),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46 pub name: String,
47 pub instance_name: String,
48 pub port: u16,
49 pub interface_id: InterfaceId,
50 pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54 fn default() -> Self {
55 LocalClientConfig {
56 name: "Local shared instance".into(),
57 instance_name: "default".into(),
58 port: 37428,
59 interface_id: InterfaceId(0),
60 reconnect_wait: Duration::from_secs(8),
61 }
62 }
63}
64
65struct LocalWriter {
67 stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72 self.stream.write_all(&hdlc::frame(data))
73 }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78 use std::io;
79 use std::os::linux::net::SocketAddrExt;
80 use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};
81
82 fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
83 SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
84 }
85
86 pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
88 let addr = abstract_addr(instance_name)?;
89 UnixListener::bind_addr(&addr)
90 }
91
92 pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
94 let addr = abstract_addr(instance_name)?;
95 UnixStream::connect_addr(&addr)
96 }
97}
98
99pub fn start_server(
105 config: LocalServerConfig,
106 tx: EventSender,
107 next_id: Arc<AtomicU64>,
108) -> io::Result<ListenerControl> {
109 let control = ListenerControl::new();
110 #[cfg(target_os = "linux")]
112 {
113 match unix_socket::try_bind_unix(&config.instance_name) {
114 Ok(listener) => {
115 listener.set_nonblocking(true)?;
116 log::info!(
117 "Local server using Unix socket: rns/{}",
118 config.instance_name
119 );
120 let name = format!("rns/{}", config.instance_name);
121 let listener_control = control.clone();
122 thread::Builder::new()
123 .name("local-server".into())
124 .spawn(move || {
125 unix_server_loop(listener, name, tx, next_id, listener_control);
126 })?;
127 return Ok(control);
128 }
129 Err(e) => {
130 log::info!("Unix socket bind failed ({}), falling back to TCP", e);
131 }
132 }
133 }
134
135 let addr = format!("127.0.0.1:{}", config.port);
137 let listener = TcpListener::bind(&addr)?;
138 listener.set_nonblocking(true)?;
139
140 log::info!("Local server listening on TCP {}", addr);
141
142 let listener_control = control.clone();
143 thread::Builder::new()
144 .name("local-server".into())
145 .spawn(move || {
146 tcp_server_loop(listener, tx, next_id, listener_control);
147 })?;
148
149 Ok(control)
150}
151
152fn tcp_server_loop(
154 listener: TcpListener,
155 tx: EventSender,
156 next_id: Arc<AtomicU64>,
157 control: ListenerControl,
158) {
159 loop {
160 if control.should_stop() {
161 log::info!("Local TCP listener stopping");
162 return;
163 }
164
165 let stream_result = listener.accept().map(|(stream, _)| stream);
166 let stream = match stream_result {
167 Ok(s) => s,
168 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
169 thread::sleep(Duration::from_millis(50));
170 continue;
171 }
172 Err(e) => {
173 log::warn!("Local server accept failed: {}", e);
174 continue;
175 }
176 };
177
178 if let Err(e) = stream.set_nodelay(true) {
179 log::warn!("Local server set_nodelay failed: {}", e);
180 }
181
182 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183 spawn_local_client_handler(stream, client_id, tx.clone());
184 }
185}
186
187#[cfg(target_os = "linux")]
189fn unix_server_loop(
190 listener: std::os::unix::net::UnixListener,
191 name: String,
192 tx: EventSender,
193 next_id: Arc<AtomicU64>,
194 control: ListenerControl,
195) {
196 loop {
197 if control.should_stop() {
198 log::info!("[{}] Local Unix listener stopping", name);
199 return;
200 }
201
202 let stream_result = listener.accept().map(|(stream, _)| stream);
203 let stream = match stream_result {
204 Ok(s) => s,
205 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
206 thread::sleep(Duration::from_millis(50));
207 continue;
208 }
209 Err(e) => {
210 log::warn!("[{}] Local server accept failed: {}", name, e);
211 continue;
212 }
213 };
214
215 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
216
217 let writer_stream = match stream.try_clone() {
219 Ok(s) => s,
220 Err(e) => {
221 log::warn!("Local server clone failed: {}", e);
222 continue;
223 }
224 };
225
226 let info = make_local_interface_info(client_id);
227 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
228 stream: writer_stream,
229 });
230
231 if tx
232 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
233 .is_err()
234 {
235 return;
236 }
237
238 let client_tx = tx.clone();
239 thread::Builder::new()
240 .name(format!("local-unix-reader-{}", client_id.0))
241 .spawn(move || {
242 unix_reader_loop(stream, client_id, client_tx);
243 })
244 .ok();
245 }
246}
247
248#[cfg(target_os = "linux")]
249struct UnixLocalWriter {
250 stream: std::os::unix::net::UnixStream,
251}
252
253#[cfg(target_os = "linux")]
254impl Writer for UnixLocalWriter {
255 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
256 use std::io::Write;
257 self.stream.write_all(&hdlc::frame(data))
258 }
259}
260
261#[cfg(target_os = "linux")]
262fn unix_reader_loop(mut stream: std::os::unix::net::UnixStream, id: InterfaceId, tx: EventSender) {
263 use std::io::Read;
264 let mut decoder = hdlc::Decoder::new();
265 let mut buf = [0u8; 4096];
266
267 loop {
268 match stream.read(&mut buf) {
269 Ok(0) => {
270 let _ = tx.send(Event::InterfaceDown(id));
271 return;
272 }
273 Ok(n) => {
274 for frame in decoder.feed(&buf[..n]) {
275 if tx
276 .send(Event::Frame {
277 interface_id: id,
278 data: frame,
279 })
280 .is_err()
281 {
282 return;
283 }
284 }
285 }
286 Err(_) => {
287 let _ = tx.send(Event::InterfaceDown(id));
288 return;
289 }
290 }
291 }
292}
293
294fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
296 let writer_stream = match stream.try_clone() {
297 Ok(s) => s,
298 Err(e) => {
299 log::warn!("Local server clone failed: {}", e);
300 return;
301 }
302 };
303
304 let info = make_local_interface_info(client_id);
305 let writer: Box<dyn Writer> = Box::new(LocalWriter {
306 stream: writer_stream,
307 });
308
309 if tx
310 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
311 .is_err()
312 {
313 return;
314 }
315
316 thread::Builder::new()
317 .name(format!("local-reader-{}", client_id.0))
318 .spawn(move || {
319 tcp_reader_loop(stream, client_id, tx);
320 })
321 .ok();
322}
323
324fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
325 let mut decoder = hdlc::Decoder::new();
326 let mut buf = [0u8; 4096];
327
328 loop {
329 match stream.read(&mut buf) {
330 Ok(0) => {
331 log::info!("Local client {} disconnected", id.0);
332 let _ = tx.send(Event::InterfaceDown(id));
333 return;
334 }
335 Ok(n) => {
336 for frame in decoder.feed(&buf[..n]) {
337 if tx
338 .send(Event::Frame {
339 interface_id: id,
340 data: frame,
341 })
342 .is_err()
343 {
344 return;
345 }
346 }
347 }
348 Err(e) => {
349 log::warn!("Local client {} read error: {}", id.0, e);
350 let _ = tx.send(Event::InterfaceDown(id));
351 return;
352 }
353 }
354 }
355}
356
357fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
358 InterfaceInfo {
359 id,
360 name: String::from("LocalInterface"),
361 mode: constants::MODE_FULL,
362 out_capable: true,
363 in_capable: true,
364 bitrate: Some(1_000_000_000), announce_rate_target: None,
366 announce_rate_grace: 0,
367 announce_rate_penalty: 0.0,
368 announce_cap: constants::ANNOUNCE_CAP,
369 is_local_client: false,
370 wants_tunnel: false,
371 tunnel_id: None,
372 mtu: 65535,
373 ia_freq: 0.0,
374 started: 0.0,
375 ingress_control: false,
376 }
377}
378
379#[cfg(target_os = "linux")]
382enum LocalClientStream {
383 Unix(std::os::unix::net::UnixStream),
384 Tcp(TcpStream),
385}
386
387#[cfg(target_os = "linux")]
388impl LocalClientStream {
389 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
390 match self {
391 LocalClientStream::Unix(stream) => stream.read(buf),
392 LocalClientStream::Tcp(stream) => stream.read(buf),
393 }
394 }
395
396 fn writer(&self) -> io::Result<Box<dyn Writer>> {
397 match self {
398 LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
399 stream: stream.try_clone()?,
400 })),
401 LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
402 stream: stream.try_clone()?,
403 })),
404 }
405 }
406}
407
408#[cfg(not(target_os = "linux"))]
409type LocalClientStream = TcpStream;
410
411#[cfg(not(target_os = "linux"))]
412fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
413 Ok(Box::new(LocalWriter {
414 stream: stream.try_clone()?,
415 }))
416}
417
418#[cfg(target_os = "linux")]
419fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
420 stream.writer()
421}
422
423fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
424 let addr = format!("127.0.0.1:{}", config.port);
425 let stream = TcpStream::connect(&addr)?;
426 stream.set_nodelay(true)?;
427 log::info!(
428 "[{}] Connected to shared instance via TCP {}",
429 config.name,
430 addr
431 );
432 Ok(stream)
433}
434
435#[cfg(target_os = "linux")]
436fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
437 match unix_socket::try_connect_unix(&config.instance_name) {
438 Ok(stream) => {
439 log::info!(
440 "[{}] Connected to shared instance via Unix socket: rns/{}",
441 config.name,
442 config.instance_name
443 );
444 Ok(LocalClientStream::Unix(stream))
445 }
446 Err(e) => {
447 log::info!(
448 "[{}] Unix socket connect failed ({}), trying TCP",
449 config.name,
450 e
451 );
452 try_connect_tcp(config).map(LocalClientStream::Tcp)
453 }
454 }
455}
456
457#[cfg(not(target_os = "linux"))]
458fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
459 try_connect_tcp(config)
460}
461
462fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
463 loop {
464 thread::sleep(config.reconnect_wait);
465 match try_connect_local_client(config) {
466 Ok(stream) => match local_client_stream_writer(&stream) {
467 Ok(writer) => {
468 let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
469 return stream;
470 }
471 Err(e) => {
472 log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
473 }
474 },
475 Err(e) => {
476 log::warn!("[{}] reconnect failed: {}", config.name, e);
477 }
478 }
479 }
480}
481
482fn local_client_reader_loop(
483 mut stream: LocalClientStream,
484 config: LocalClientConfig,
485 tx: EventSender,
486) {
487 let id = config.interface_id;
488 let mut decoder = hdlc::Decoder::new();
489 let mut buf = [0u8; 4096];
490
491 loop {
492 match stream.read(&mut buf) {
493 Ok(0) => {
494 log::warn!("[{}] shared connection closed", config.name);
495 let _ = tx.send(Event::InterfaceDown(id));
496 stream = reconnect_local_client(&config, &tx);
497 decoder = hdlc::Decoder::new();
498 }
499 Ok(n) => {
500 for frame in decoder.feed(&buf[..n]) {
501 if tx
502 .send(Event::Frame {
503 interface_id: id,
504 data: frame,
505 })
506 .is_err()
507 {
508 return;
509 }
510 }
511 }
512 Err(e) => {
513 log::warn!("[{}] shared read error: {}", config.name, e);
514 let _ = tx.send(Event::InterfaceDown(id));
515 stream = reconnect_local_client(&config, &tx);
516 decoder = hdlc::Decoder::new();
517 }
518 }
519 }
520}
521
522pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
526 let id = config.interface_id;
527 let stream = try_connect_local_client(&config)?;
528 let writer = local_client_stream_writer(&stream)?;
529
530 let _ = tx.send(Event::InterfaceUp(id, None, None));
531
532 thread::Builder::new()
533 .name(format!("local-client-reader-{}", id.0))
534 .spawn(move || {
535 local_client_reader_loop(stream, config, tx);
536 })?;
537
538 Ok(writer)
539}
540
541use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
544use std::collections::HashMap;
545
546pub struct LocalServerFactory;
548
549impl InterfaceFactory for LocalServerFactory {
550 fn type_name(&self) -> &str {
551 "LocalServerInterface"
552 }
553
554 fn parse_config(
555 &self,
556 _name: &str,
557 id: InterfaceId,
558 params: &HashMap<String, String>,
559 ) -> Result<Box<dyn InterfaceConfigData>, String> {
560 let instance_name = params
561 .get("instance_name")
562 .cloned()
563 .unwrap_or_else(|| "default".into());
564 let port = params
565 .get("port")
566 .and_then(|v| v.parse().ok())
567 .unwrap_or(37428);
568
569 Ok(Box::new(LocalServerConfig {
570 instance_name,
571 port,
572 interface_id: id,
573 }))
574 }
575
576 fn start(
577 &self,
578 config: Box<dyn InterfaceConfigData>,
579 ctx: StartContext,
580 ) -> std::io::Result<StartResult> {
581 let server_config = *config
582 .into_any()
583 .downcast::<LocalServerConfig>()
584 .map_err(|_| {
585 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
586 })?;
587
588 let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
589 Ok(StartResult::Listener {
590 control: Some(control),
591 })
592 }
593}
594
595pub struct LocalClientFactory;
597
598impl InterfaceFactory for LocalClientFactory {
599 fn type_name(&self) -> &str {
600 "LocalClientInterface"
601 }
602
603 fn parse_config(
604 &self,
605 _name: &str,
606 id: InterfaceId,
607 params: &HashMap<String, String>,
608 ) -> Result<Box<dyn InterfaceConfigData>, String> {
609 let instance_name = params
610 .get("instance_name")
611 .cloned()
612 .unwrap_or_else(|| "default".into());
613 let port = params
614 .get("port")
615 .and_then(|v| v.parse().ok())
616 .unwrap_or(37428);
617
618 Ok(Box::new(LocalClientConfig {
619 instance_name,
620 port,
621 interface_id: id,
622 ..LocalClientConfig::default()
623 }))
624 }
625
626 fn start(
627 &self,
628 config: Box<dyn InterfaceConfigData>,
629 ctx: StartContext,
630 ) -> std::io::Result<StartResult> {
631 let client_config = *config
632 .into_any()
633 .downcast::<LocalClientConfig>()
634 .map_err(|_| {
635 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
636 })?;
637
638 let id = client_config.interface_id;
639 let name = client_config.name.clone();
640 let info = InterfaceInfo {
641 id,
642 name,
643 mode: ctx.mode,
644 out_capable: true,
645 in_capable: true,
646 bitrate: Some(1_000_000_000),
647 announce_rate_target: None,
648 announce_rate_grace: 0,
649 announce_rate_penalty: 0.0,
650 announce_cap: rns_core::constants::ANNOUNCE_CAP,
651 is_local_client: false,
652 wants_tunnel: false,
653 tunnel_id: None,
654 mtu: 65535,
655 ingress_control: false,
656 ia_freq: 0.0,
657 started: crate::time::now(),
658 };
659
660 let writer = start_client(client_config, ctx.tx)?;
661
662 Ok(StartResult::Simple {
663 id,
664 info,
665 writer,
666 interface_type_name: "LocalInterface".to_string(),
667 })
668 }
669}
670
671#[cfg(test)]
672mod tests {
673 use super::*;
674 use std::sync::mpsc;
675 use std::sync::mpsc::RecvTimeoutError;
676
677 fn connect_test_client(instance_name: &str, _port: u16) {
678 #[cfg(target_os = "linux")]
679 {
680 let _client = unix_socket::try_connect_unix(instance_name).unwrap();
681 }
682
683 #[cfg(not(target_os = "linux"))]
684 {
685 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
686 }
687 }
688
689 fn find_free_port() -> u16 {
690 TcpListener::bind("127.0.0.1:0")
691 .unwrap()
692 .local_addr()
693 .unwrap()
694 .port()
695 }
696
697 #[test]
698 fn server_bind_tcp() {
699 let port = find_free_port();
700 let instance_name = "test-bind".to_string();
701 let (tx, _rx) = crate::event::channel();
702 let next_id = Arc::new(AtomicU64::new(7000));
703
704 let config = LocalServerConfig {
705 instance_name: instance_name.clone(),
706 port,
707 interface_id: InterfaceId(70),
708 };
709
710 start_server(config, tx, next_id).unwrap();
713 thread::sleep(Duration::from_millis(50));
714
715 connect_test_client(&instance_name, port);
716 }
717
718 #[test]
719 fn server_accept_client() {
720 let port = find_free_port();
721 let instance_name = "test-accept".to_string();
722 let (tx, rx) = crate::event::channel();
723 let next_id = Arc::new(AtomicU64::new(7100));
724
725 let config = LocalServerConfig {
726 instance_name: instance_name.clone(),
727 port,
728 interface_id: InterfaceId(71),
729 };
730
731 start_server(config, tx, next_id).unwrap();
732 thread::sleep(Duration::from_millis(50));
733
734 connect_test_client(&instance_name, port);
735
736 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
737 match event {
738 Event::InterfaceUp(id, writer, info) => {
739 assert_eq!(id, InterfaceId(7100));
740 assert!(writer.is_some());
741 assert!(info.is_some());
742 }
743 other => panic!("expected InterfaceUp, got {:?}", other),
744 }
745 }
746
747 #[test]
748 fn server_stop_prevents_new_accepts() {
749 let port = find_free_port();
750 let instance_name = "test-stop".to_string();
751 let (tx, rx) = crate::event::channel();
752 let next_id = Arc::new(AtomicU64::new(7150));
753
754 let config = LocalServerConfig {
755 instance_name: instance_name.clone(),
756 port,
757 interface_id: InterfaceId(71),
758 };
759
760 let control = start_server(config, tx, next_id).unwrap();
761 thread::sleep(Duration::from_millis(50));
762 control.request_stop();
763 thread::sleep(Duration::from_millis(120));
764
765 #[cfg(target_os = "linux")]
766 let connect_result = unix_socket::try_connect_unix(&instance_name);
767
768 #[cfg(not(target_os = "linux"))]
769 let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
770
771 if let Ok(stream) = connect_result {
772 drop(stream);
773 }
774
775 match rx.recv_timeout(Duration::from_millis(200)) {
776 Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
777 other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
778 }
779 }
780
781 #[test]
782 fn client_send_receive() {
783 let port = find_free_port();
784 let (server_tx, server_rx) = crate::event::channel();
785 let next_id = Arc::new(AtomicU64::new(7200));
786
787 let server_config = LocalServerConfig {
788 instance_name: "test-sr".into(),
789 port,
790 interface_id: InterfaceId(72),
791 };
792
793 start_server(server_config, server_tx, next_id).unwrap();
794 thread::sleep(Duration::from_millis(50));
795
796 let (client_tx, client_rx) = crate::event::channel();
798 let client_config = LocalClientConfig {
799 name: "test-client".into(),
800 instance_name: "test-sr".into(),
801 port,
802 interface_id: InterfaceId(73),
803 reconnect_wait: Duration::from_secs(1),
804 };
805
806 let mut client_writer = start_client(client_config, client_tx).unwrap();
807
808 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
810 let mut server_writer = match event {
811 Event::InterfaceUp(_, Some(w), _) => w,
812 other => panic!("expected InterfaceUp with writer, got {:?}", other),
813 };
814
815 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
817 match event {
818 Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
819 other => panic!("expected InterfaceUp, got {:?}", other),
820 }
821
822 let payload: Vec<u8> = (0..32).collect();
824 client_writer.send_frame(&payload).unwrap();
825
826 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
827 match event {
828 Event::Frame { data, .. } => assert_eq!(data, payload),
829 other => panic!("expected Frame, got {:?}", other),
830 }
831
832 let payload2: Vec<u8> = (100..132).collect();
834 server_writer.send_frame(&payload2).unwrap();
835
836 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
837 match event {
838 Event::Frame { data, .. } => assert_eq!(data, payload2),
839 other => panic!("expected Frame, got {:?}", other),
840 }
841 }
842
843 #[test]
844 fn multiple_local_clients() {
845 let port = find_free_port();
846 let instance_name = "test-multi".to_string();
847 let (tx, rx) = crate::event::channel();
848 let next_id = Arc::new(AtomicU64::new(7300));
849
850 let config = LocalServerConfig {
851 instance_name: instance_name.clone(),
852 port,
853 interface_id: InterfaceId(74),
854 };
855
856 start_server(config, tx, next_id).unwrap();
857 thread::sleep(Duration::from_millis(50));
858
859 connect_test_client(&instance_name, port);
860 connect_test_client(&instance_name, port);
861
862 let mut ids = Vec::new();
863 for _ in 0..2 {
864 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
865 match event {
866 Event::InterfaceUp(id, _, _) => ids.push(id),
867 other => panic!("expected InterfaceUp, got {:?}", other),
868 }
869 }
870
871 assert_eq!(ids.len(), 2);
872 assert_ne!(ids[0], ids[1]);
873 }
874
875 #[test]
876 fn client_disconnect_detected() {
877 let port = find_free_port();
878 let instance_name = "test-dc".to_string();
879 let (tx, rx) = crate::event::channel();
880 let next_id = Arc::new(AtomicU64::new(7400));
881
882 let config = LocalServerConfig {
883 instance_name: instance_name.clone(),
884 port,
885 interface_id: InterfaceId(75),
886 };
887
888 start_server(config, tx, next_id).unwrap();
889 thread::sleep(Duration::from_millis(50));
890
891 #[cfg(target_os = "linux")]
892 let client = unix_socket::try_connect_unix(&instance_name).unwrap();
893
894 #[cfg(not(target_os = "linux"))]
895 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
896
897 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
899
900 drop(client);
902
903 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
904 assert!(
905 matches!(event, Event::InterfaceDown(_)),
906 "expected InterfaceDown, got {:?}",
907 event
908 );
909 }
910
911 #[test]
912 fn client_reconnects_after_tcp_restart() {
913 let port = find_free_port();
914 let addr = format!("127.0.0.1:{}", port);
915 let instance_name = format!("test-reconnect-{}", port);
916
917 let listener1 = TcpListener::bind(&addr).unwrap();
918 let (accepted1_tx, accepted1_rx) = mpsc::channel();
919 thread::spawn(move || {
920 let (stream, _) = listener1.accept().unwrap();
921 accepted1_tx.send(stream).unwrap();
922 });
923
924 let (client_tx, client_rx) = crate::event::channel();
925 let client_config = LocalClientConfig {
926 name: "test-client".into(),
927 instance_name,
928 port,
929 interface_id: InterfaceId(76),
930 reconnect_wait: Duration::from_millis(50),
931 };
932
933 let _writer = start_client(client_config, client_tx).unwrap();
934 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
935 assert!(matches!(
936 event,
937 Event::InterfaceUp(InterfaceId(76), None, None)
938 ));
939
940 let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
941 drop(stream1);
942
943 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
944 assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));
945
946 let listener2 = TcpListener::bind(&addr).unwrap();
947 let (accepted2_tx, accepted2_rx) = mpsc::channel();
948 thread::spawn(move || {
949 let (stream, _) = listener2.accept().unwrap();
950 accepted2_tx.send(stream).unwrap();
951 });
952
953 let mut reconnected_writer = None;
954 for _ in 0..10 {
955 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
956 match event {
957 Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
958 reconnected_writer = writer;
959 break;
960 }
961 _ => {}
962 }
963 }
964
965 let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
966 let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
967 reconnected_writer.send_frame(b"client->server").unwrap();
968 stream2
969 .set_read_timeout(Some(Duration::from_secs(2)))
970 .unwrap();
971 let mut buf = [0u8; 64];
972 let n = stream2.read(&mut buf).unwrap();
973 assert!(n > 0, "expected bytes from refreshed writer");
974 }
975
976 #[cfg(target_os = "linux")]
977 #[test]
978 fn unix_abstract_socket_helpers_work() {
979 let instance_name = format!(
980 "test-abstract-{}",
981 std::time::SystemTime::now()
982 .duration_since(std::time::UNIX_EPOCH)
983 .unwrap()
984 .as_nanos()
985 );
986
987 let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
988 let accept_thread = thread::spawn(move || listener.accept().unwrap().0);
989
990 let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
991 let mut server = accept_thread.join().unwrap();
992
993 client.write_all(b"ping").unwrap();
994 let mut buf = [0u8; 4];
995 server.read_exact(&mut buf).unwrap();
996 assert_eq!(&buf, b"ping");
997 }
998}