1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
25#[cfg(target_os = "android")]
26const CLIENT_SLEEP_PAUSE_TIMEOUT: Duration = Duration::from_secs(12);
27#[cfg(target_os = "android")]
28const PHY_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
29
30#[derive(Debug, Clone)]
32pub struct LocalServerConfig {
33 pub instance_name: String,
34 pub port: u16,
35 pub interface_id: InterfaceId,
36}
37
38impl Default for LocalServerConfig {
39 fn default() -> Self {
40 LocalServerConfig {
41 instance_name: "default".into(),
42 port: 37428,
43 interface_id: InterfaceId(0),
44 }
45 }
46}
47
48#[derive(Debug, Clone)]
50pub struct LocalClientConfig {
51 pub name: String,
52 pub instance_name: String,
53 pub port: u16,
54 pub interface_id: InterfaceId,
55 pub reconnect_wait: Duration,
56}
57
58impl Default for LocalClientConfig {
59 fn default() -> Self {
60 LocalClientConfig {
61 name: "Local shared instance".into(),
62 instance_name: "default".into(),
63 port: 37428,
64 interface_id: InterfaceId(0),
65 reconnect_wait: Duration::from_secs(8),
66 }
67 }
68}
69
70struct LocalWriter {
72 stream: TcpStream,
73 sleep_hold: Option<ClientSleepHold>,
74}
75
76impl Writer for LocalWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 if self
79 .sleep_hold
80 .as_ref()
81 .is_some_and(ClientSleepHold::should_drop_outbound)
82 {
83 log::debug!("TX paused for LocalInterface client, dropping outbound packet");
84 return Ok(());
85 }
86 self.stream.write_all(&hdlc::frame(data))
87 }
88}
89
90#[derive(Clone)]
91struct ClientSleepHold {
92 timeout: Duration,
93 deadline: Arc<Mutex<Instant>>,
94}
95
96impl ClientSleepHold {
97 #[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
98 fn new(timeout: Duration) -> Self {
99 Self {
100 timeout,
101 deadline: Arc::new(Mutex::new(Instant::now() + timeout)),
102 }
103 }
104
105 fn refresh(&self) {
106 *lock_or_recover(&self.deadline) = Instant::now() + self.timeout;
107 }
108
109 fn should_drop_outbound(&self) -> bool {
110 Instant::now() > *lock_or_recover(&self.deadline)
111 }
112}
113
114fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
115 match mutex.lock() {
116 Ok(guard) => guard,
117 Err(poisoned) => poisoned.into_inner(),
118 }
119}
120
121fn android_client_sleep_hold() -> Option<ClientSleepHold> {
122 #[cfg(target_os = "android")]
123 {
124 Some(ClientSleepHold::new(CLIENT_SLEEP_PAUSE_TIMEOUT))
125 }
126 #[cfg(not(target_os = "android"))]
127 {
128 None
129 }
130}
131
132#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
133fn spawn_physical_keepalive_loop(
134 mut writer: Box<dyn Writer>,
135 interface_id: InterfaceId,
136 interface_name: String,
137 interval: Duration,
138) {
139 thread::Builder::new()
140 .name(format!("local-phy-keepalive-{}", interface_id.0))
141 .spawn(move || loop {
142 thread::sleep(interval);
143 if let Err(err) = writer.send_frame(&[]) {
144 log::debug!(
145 "[{}:{}] LocalInterface physical keepalive stopped: {}",
146 interface_name,
147 interface_id.0,
148 err
149 );
150 return;
151 }
152 })
153 .ok();
154}
155
156fn maybe_spawn_local_client_phy_keepalive(
157 stream: &LocalClientStream,
158 interface_id: InterfaceId,
159 interface_name: &str,
160) -> io::Result<()> {
161 #[cfg(target_os = "android")]
162 {
163 let writer = local_client_stream_writer(stream)?;
164 spawn_physical_keepalive_loop(
165 writer,
166 interface_id,
167 interface_name.to_string(),
168 PHY_KEEPALIVE_INTERVAL,
169 );
170 }
171
172 #[cfg(not(target_os = "android"))]
173 {
174 let _ = (stream, interface_id, interface_name);
175 }
176
177 Ok(())
178}
179
180#[cfg(target_os = "linux")]
181mod unix_socket {
182 use std::io;
183 use std::os::linux::net::SocketAddrExt;
184 use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};
185
186 fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
187 SocketAddr::from_abstract_name(format!("rns/{}", instance_name))
188 }
189
190 pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
192 let addr = abstract_addr(instance_name)?;
193 UnixListener::bind_addr(&addr)
194 }
195
196 pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
198 let addr = abstract_addr(instance_name)?;
199 UnixStream::connect_addr(&addr)
200 }
201}
202
203pub fn start_server(
209 config: LocalServerConfig,
210 tx: EventSender,
211 next_id: Arc<AtomicU64>,
212) -> io::Result<ListenerControl> {
213 let control = ListenerControl::new();
214 #[cfg(target_os = "linux")]
216 {
217 match unix_socket::try_bind_unix(&config.instance_name) {
218 Ok(listener) => {
219 listener.set_nonblocking(true)?;
220 log::info!(
221 "Local server using Unix socket: rns/{}",
222 config.instance_name
223 );
224 let name = format!("rns/{}", config.instance_name);
225 let listener_control = control.clone();
226 thread::Builder::new()
227 .name("local-server".into())
228 .spawn(move || {
229 unix_server_loop(listener, name, tx, next_id, listener_control);
230 })?;
231 return Ok(control);
232 }
233 Err(e) => {
234 log::info!("Unix socket bind failed ({}), falling back to TCP", e);
235 }
236 }
237 }
238
239 let addr = format!("127.0.0.1:{}", config.port);
241 let listener = TcpListener::bind(&addr)?;
242 listener.set_nonblocking(true)?;
243
244 log::info!("Local server listening on TCP {}", addr);
245
246 let listener_control = control.clone();
247 thread::Builder::new()
248 .name("local-server".into())
249 .spawn(move || {
250 tcp_server_loop(listener, tx, next_id, listener_control);
251 })?;
252
253 Ok(control)
254}
255
256fn tcp_server_loop(
258 listener: TcpListener,
259 tx: EventSender,
260 next_id: Arc<AtomicU64>,
261 control: ListenerControl,
262) {
263 loop {
264 if control.should_stop() {
265 log::info!("Local TCP listener stopping");
266 return;
267 }
268
269 let stream_result = listener.accept().map(|(stream, _)| stream);
270 let stream = match stream_result {
271 Ok(s) => s,
272 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273 thread::sleep(Duration::from_millis(50));
274 continue;
275 }
276 Err(e) => {
277 log::warn!("Local server accept failed: {}", e);
278 continue;
279 }
280 };
281
282 if let Err(e) = stream.set_nodelay(true) {
283 log::warn!("Local server set_nodelay failed: {}", e);
284 }
285
286 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
287 spawn_local_client_handler(stream, client_id, tx.clone());
288 }
289}
290
291#[cfg(target_os = "linux")]
293fn unix_server_loop(
294 listener: std::os::unix::net::UnixListener,
295 name: String,
296 tx: EventSender,
297 next_id: Arc<AtomicU64>,
298 control: ListenerControl,
299) {
300 loop {
301 if control.should_stop() {
302 log::info!("[{}] Local Unix listener stopping", name);
303 return;
304 }
305
306 let stream_result = listener.accept().map(|(stream, _)| stream);
307 let stream = match stream_result {
308 Ok(s) => s,
309 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
310 thread::sleep(Duration::from_millis(50));
311 continue;
312 }
313 Err(e) => {
314 log::warn!("[{}] Local server accept failed: {}", name, e);
315 continue;
316 }
317 };
318
319 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
320
321 let writer_stream = match stream.try_clone() {
323 Ok(s) => s,
324 Err(e) => {
325 log::warn!("Local server clone failed: {}", e);
326 continue;
327 }
328 };
329
330 let sleep_hold = android_client_sleep_hold();
331 let info = make_local_interface_info(client_id);
332 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
333 stream: writer_stream,
334 sleep_hold: sleep_hold.clone(),
335 });
336
337 if tx
338 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
339 .is_err()
340 {
341 return;
342 }
343
344 let client_tx = tx.clone();
345 thread::Builder::new()
346 .name(format!("local-unix-reader-{}", client_id.0))
347 .spawn(move || {
348 unix_reader_loop(stream, client_id, client_tx, sleep_hold);
349 })
350 .ok();
351 }
352}
353
354#[cfg(target_os = "linux")]
355struct UnixLocalWriter {
356 stream: std::os::unix::net::UnixStream,
357 sleep_hold: Option<ClientSleepHold>,
358}
359
360#[cfg(target_os = "linux")]
361impl Writer for UnixLocalWriter {
362 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
363 use std::io::Write;
364 if self
365 .sleep_hold
366 .as_ref()
367 .is_some_and(ClientSleepHold::should_drop_outbound)
368 {
369 log::debug!("TX paused for LocalInterface client, dropping outbound packet");
370 return Ok(());
371 }
372 self.stream.write_all(&hdlc::frame(data))
373 }
374}
375
376#[cfg(target_os = "linux")]
377fn unix_reader_loop(
378 mut stream: std::os::unix::net::UnixStream,
379 id: InterfaceId,
380 tx: EventSender,
381 sleep_hold: Option<ClientSleepHold>,
382) {
383 use std::io::Read;
384 let mut decoder = hdlc::Decoder::new();
385 let mut buf = [0u8; 4096];
386
387 loop {
388 match stream.read(&mut buf) {
389 Ok(0) => {
390 let _ = tx.send(Event::InterfaceDown(id));
391 return;
392 }
393 Ok(n) => {
394 if let Some(ref sleep_hold) = sleep_hold {
395 sleep_hold.refresh();
396 }
397 for frame in decoder.feed(&buf[..n]) {
398 if tx
399 .send(Event::Frame {
400 interface_id: id,
401 data: frame,
402 })
403 .is_err()
404 {
405 return;
406 }
407 }
408 }
409 Err(_) => {
410 let _ = tx.send(Event::InterfaceDown(id));
411 return;
412 }
413 }
414 }
415}
416
417fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
419 let writer_stream = match stream.try_clone() {
420 Ok(s) => s,
421 Err(e) => {
422 log::warn!("Local server clone failed: {}", e);
423 return;
424 }
425 };
426
427 let sleep_hold = android_client_sleep_hold();
428 let info = make_local_interface_info(client_id);
429 let writer: Box<dyn Writer> = Box::new(LocalWriter {
430 stream: writer_stream,
431 sleep_hold: sleep_hold.clone(),
432 });
433
434 if tx
435 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
436 .is_err()
437 {
438 return;
439 }
440
441 thread::Builder::new()
442 .name(format!("local-reader-{}", client_id.0))
443 .spawn(move || {
444 tcp_reader_loop(stream, client_id, tx, sleep_hold);
445 })
446 .ok();
447}
448
449fn tcp_reader_loop(
450 mut stream: TcpStream,
451 id: InterfaceId,
452 tx: EventSender,
453 sleep_hold: Option<ClientSleepHold>,
454) {
455 let mut decoder = hdlc::Decoder::new();
456 let mut buf = [0u8; 4096];
457
458 loop {
459 match stream.read(&mut buf) {
460 Ok(0) => {
461 log::info!("Local client {} disconnected", id.0);
462 let _ = tx.send(Event::InterfaceDown(id));
463 return;
464 }
465 Ok(n) => {
466 if let Some(ref sleep_hold) = sleep_hold {
467 sleep_hold.refresh();
468 }
469 for frame in decoder.feed(&buf[..n]) {
470 if tx
471 .send(Event::Frame {
472 interface_id: id,
473 data: frame,
474 })
475 .is_err()
476 {
477 return;
478 }
479 }
480 }
481 Err(e) => {
482 log::warn!("Local client {} read error: {}", id.0, e);
483 let _ = tx.send(Event::InterfaceDown(id));
484 return;
485 }
486 }
487 }
488}
489
490fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
491 InterfaceInfo {
492 id,
493 name: String::from("LocalInterface"),
494 mode: constants::MODE_FULL,
495 out_capable: true,
496 in_capable: true,
497 bitrate: Some(1_000_000_000), airtime_profile: None,
499 announce_rate_target: None,
500 announce_rate_grace: 0,
501 announce_rate_penalty: 0.0,
502 announce_cap: constants::ANNOUNCE_CAP,
503 is_local_client: false,
504 wants_tunnel: false,
505 tunnel_id: None,
506 mtu: 65535,
507 ia_freq: 0.0,
508 started: 0.0,
509 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
510 }
511}
512
513#[cfg(target_os = "linux")]
516enum LocalClientStream {
517 Unix(std::os::unix::net::UnixStream),
518 Tcp(TcpStream),
519}
520
521#[cfg(target_os = "linux")]
522impl LocalClientStream {
523 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
524 match self {
525 LocalClientStream::Unix(stream) => stream.read(buf),
526 LocalClientStream::Tcp(stream) => stream.read(buf),
527 }
528 }
529
530 fn writer(&self) -> io::Result<Box<dyn Writer>> {
531 match self {
532 LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
533 stream: stream.try_clone()?,
534 sleep_hold: None,
535 })),
536 LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
537 stream: stream.try_clone()?,
538 sleep_hold: None,
539 })),
540 }
541 }
542}
543
544#[cfg(not(target_os = "linux"))]
545type LocalClientStream = TcpStream;
546
547#[cfg(not(target_os = "linux"))]
548fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
549 Ok(Box::new(LocalWriter {
550 stream: stream.try_clone()?,
551 sleep_hold: None,
552 }))
553}
554
555#[cfg(target_os = "linux")]
556fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
557 stream.writer()
558}
559
560fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
561 let addr = format!("127.0.0.1:{}", config.port);
562 let stream = TcpStream::connect(&addr)?;
563 stream.set_nodelay(true)?;
564 log::info!(
565 "[{}] Connected to shared instance via TCP {}",
566 config.name,
567 addr
568 );
569 Ok(stream)
570}
571
572#[cfg(target_os = "linux")]
573fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
574 match unix_socket::try_connect_unix(&config.instance_name) {
575 Ok(stream) => {
576 log::info!(
577 "[{}] Connected to shared instance via Unix socket: rns/{}",
578 config.name,
579 config.instance_name
580 );
581 Ok(LocalClientStream::Unix(stream))
582 }
583 Err(e) => {
584 log::info!(
585 "[{}] Unix socket connect failed ({}), trying TCP",
586 config.name,
587 e
588 );
589 try_connect_tcp(config).map(LocalClientStream::Tcp)
590 }
591 }
592}
593
594#[cfg(not(target_os = "linux"))]
595fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
596 try_connect_tcp(config)
597}
598
599fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
600 loop {
601 thread::sleep(config.reconnect_wait);
602 match try_connect_local_client(config) {
603 Ok(stream) => match local_client_stream_writer(&stream) {
604 Ok(writer) => {
605 let _ = maybe_spawn_local_client_phy_keepalive(
606 &stream,
607 config.interface_id,
608 &config.name,
609 );
610 let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
611 return stream;
612 }
613 Err(e) => {
614 log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
615 }
616 },
617 Err(e) => {
618 log::warn!("[{}] reconnect failed: {}", config.name, e);
619 }
620 }
621 }
622}
623
624fn local_client_reader_loop(
625 mut stream: LocalClientStream,
626 config: LocalClientConfig,
627 tx: EventSender,
628) {
629 let id = config.interface_id;
630 let mut decoder = hdlc::Decoder::new();
631 let mut buf = [0u8; 4096];
632
633 loop {
634 match stream.read(&mut buf) {
635 Ok(0) => {
636 log::warn!("[{}] shared connection closed", config.name);
637 let _ = tx.send(Event::InterfaceDown(id));
638 stream = reconnect_local_client(&config, &tx);
639 decoder = hdlc::Decoder::new();
640 }
641 Ok(n) => {
642 for frame in decoder.feed(&buf[..n]) {
643 if tx
644 .send(Event::Frame {
645 interface_id: id,
646 data: frame,
647 })
648 .is_err()
649 {
650 return;
651 }
652 }
653 }
654 Err(e) => {
655 log::warn!("[{}] shared read error: {}", config.name, e);
656 let _ = tx.send(Event::InterfaceDown(id));
657 stream = reconnect_local_client(&config, &tx);
658 decoder = hdlc::Decoder::new();
659 }
660 }
661 }
662}
663
664pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
668 let id = config.interface_id;
669 let stream = try_connect_local_client(&config)?;
670 let writer = local_client_stream_writer(&stream)?;
671 maybe_spawn_local_client_phy_keepalive(&stream, id, &config.name)?;
672
673 let _ = tx.send(Event::InterfaceUp(id, None, None));
674
675 thread::Builder::new()
676 .name(format!("local-client-reader-{}", id.0))
677 .spawn(move || {
678 local_client_reader_loop(stream, config, tx);
679 })?;
680
681 Ok(writer)
682}
683
684use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
687use std::collections::HashMap;
688
689pub struct LocalServerFactory;
691
692impl InterfaceFactory for LocalServerFactory {
693 fn type_name(&self) -> &str {
694 "LocalServerInterface"
695 }
696
697 fn parse_config(
698 &self,
699 _name: &str,
700 id: InterfaceId,
701 params: &HashMap<String, String>,
702 ) -> Result<Box<dyn InterfaceConfigData>, String> {
703 let instance_name = params
704 .get("instance_name")
705 .cloned()
706 .unwrap_or_else(|| "default".into());
707 let port = params
708 .get("port")
709 .and_then(|v| v.parse().ok())
710 .unwrap_or(37428);
711
712 Ok(Box::new(LocalServerConfig {
713 instance_name,
714 port,
715 interface_id: id,
716 }))
717 }
718
719 fn start(
720 &self,
721 config: Box<dyn InterfaceConfigData>,
722 ctx: StartContext,
723 ) -> std::io::Result<StartResult> {
724 let server_config = *config
725 .into_any()
726 .downcast::<LocalServerConfig>()
727 .map_err(|_| {
728 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
729 })?;
730
731 let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
732 Ok(StartResult::Listener {
733 control: Some(control),
734 })
735 }
736}
737
738pub struct LocalClientFactory;
740
741impl InterfaceFactory for LocalClientFactory {
742 fn type_name(&self) -> &str {
743 "LocalClientInterface"
744 }
745
746 fn parse_config(
747 &self,
748 _name: &str,
749 id: InterfaceId,
750 params: &HashMap<String, String>,
751 ) -> Result<Box<dyn InterfaceConfigData>, String> {
752 let instance_name = params
753 .get("instance_name")
754 .cloned()
755 .unwrap_or_else(|| "default".into());
756 let port = params
757 .get("port")
758 .and_then(|v| v.parse().ok())
759 .unwrap_or(37428);
760
761 Ok(Box::new(LocalClientConfig {
762 instance_name,
763 port,
764 interface_id: id,
765 ..LocalClientConfig::default()
766 }))
767 }
768
769 fn start(
770 &self,
771 config: Box<dyn InterfaceConfigData>,
772 ctx: StartContext,
773 ) -> std::io::Result<StartResult> {
774 let client_config = *config
775 .into_any()
776 .downcast::<LocalClientConfig>()
777 .map_err(|_| {
778 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
779 })?;
780
781 let id = client_config.interface_id;
782 let name = client_config.name.clone();
783 let info = InterfaceInfo {
784 id,
785 name,
786 mode: ctx.mode,
787 out_capable: true,
788 in_capable: true,
789 bitrate: Some(1_000_000_000),
790 airtime_profile: None,
791 announce_rate_target: None,
792 announce_rate_grace: 0,
793 announce_rate_penalty: 0.0,
794 announce_cap: rns_core::constants::ANNOUNCE_CAP,
795 is_local_client: false,
796 wants_tunnel: false,
797 tunnel_id: None,
798 mtu: 65535,
799 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
800 ia_freq: 0.0,
801 started: crate::time::now(),
802 };
803
804 let writer = start_client(client_config, ctx.tx)?;
805
806 Ok(StartResult::Simple {
807 id,
808 info,
809 writer,
810 interface_type_name: "LocalInterface".to_string(),
811 })
812 }
813}
814
815#[cfg(test)]
816mod tests {
817 use super::*;
818 use std::sync::mpsc;
819 use std::sync::mpsc::RecvTimeoutError;
820
821 #[cfg(target_os = "linux")]
822 type TestClient = std::os::unix::net::UnixStream;
823
824 #[cfg(not(target_os = "linux"))]
825 type TestClient = TcpStream;
826
827 fn connect_test_client(instance_name: &str, _port: u16) -> TestClient {
828 #[cfg(target_os = "linux")]
829 {
830 unix_socket::try_connect_unix(instance_name).unwrap()
831 }
832
833 #[cfg(not(target_os = "linux"))]
834 {
835 TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
836 }
837 }
838
839 fn find_free_port() -> u16 {
840 TcpListener::bind("127.0.0.1:0")
841 .unwrap()
842 .local_addr()
843 .unwrap()
844 .port()
845 }
846
847 #[test]
848 fn server_bind_tcp() {
849 let port = find_free_port();
850 let instance_name = "test-bind".to_string();
851 let (tx, _rx) = crate::event::channel();
852 let next_id = Arc::new(AtomicU64::new(7000));
853
854 let config = LocalServerConfig {
855 instance_name: instance_name.clone(),
856 port,
857 interface_id: InterfaceId(70),
858 };
859
860 start_server(config, tx, next_id).unwrap();
863 thread::sleep(Duration::from_millis(50));
864
865 connect_test_client(&instance_name, port);
866 }
867
868 #[test]
869 fn server_accept_client() {
870 let port = find_free_port();
871 let instance_name = "test-accept".to_string();
872 let (tx, rx) = crate::event::channel();
873 let next_id = Arc::new(AtomicU64::new(7100));
874
875 let config = LocalServerConfig {
876 instance_name: instance_name.clone(),
877 port,
878 interface_id: InterfaceId(71),
879 };
880
881 start_server(config, tx, next_id).unwrap();
882 thread::sleep(Duration::from_millis(50));
883
884 connect_test_client(&instance_name, port);
885
886 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
887 match event {
888 Event::InterfaceUp(id, writer, info) => {
889 assert_eq!(id, InterfaceId(7100));
890 assert!(writer.is_some());
891 assert!(info.is_some());
892 }
893 other => panic!("expected InterfaceUp, got {:?}", other),
894 }
895 }
896
897 #[test]
898 fn server_stop_prevents_new_accepts() {
899 let port = find_free_port();
900 let instance_name = "test-stop".to_string();
901 let (tx, rx) = crate::event::channel();
902 let next_id = Arc::new(AtomicU64::new(7150));
903
904 let config = LocalServerConfig {
905 instance_name: instance_name.clone(),
906 port,
907 interface_id: InterfaceId(71),
908 };
909
910 let control = start_server(config, tx, next_id).unwrap();
911 thread::sleep(Duration::from_millis(50));
912 control.request_stop();
913 thread::sleep(Duration::from_millis(120));
914
915 #[cfg(target_os = "linux")]
916 let connect_result = unix_socket::try_connect_unix(&instance_name);
917
918 #[cfg(not(target_os = "linux"))]
919 let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
920
921 if let Ok(stream) = connect_result {
922 drop(stream);
923 }
924
925 match rx.recv_timeout(Duration::from_millis(200)) {
926 Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
927 other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
928 }
929 }
930
931 #[test]
932 fn client_send_receive() {
933 let port = find_free_port();
934 let (server_tx, server_rx) = crate::event::channel();
935 let next_id = Arc::new(AtomicU64::new(7200));
936
937 let server_config = LocalServerConfig {
938 instance_name: "test-sr".into(),
939 port,
940 interface_id: InterfaceId(72),
941 };
942
943 start_server(server_config, server_tx, next_id).unwrap();
944 thread::sleep(Duration::from_millis(50));
945
946 let (client_tx, client_rx) = crate::event::channel();
948 let client_config = LocalClientConfig {
949 name: "test-client".into(),
950 instance_name: "test-sr".into(),
951 port,
952 interface_id: InterfaceId(73),
953 reconnect_wait: Duration::from_secs(1),
954 };
955
956 let mut client_writer = start_client(client_config, client_tx).unwrap();
957
958 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
960 let mut server_writer = match event {
961 Event::InterfaceUp(_, Some(w), _) => w,
962 other => panic!("expected InterfaceUp with writer, got {:?}", other),
963 };
964
965 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
967 match event {
968 Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
969 other => panic!("expected InterfaceUp, got {:?}", other),
970 }
971
972 let payload: Vec<u8> = (0..32).collect();
974 client_writer.send_frame(&payload).unwrap();
975
976 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
977 match event {
978 Event::Frame { data, .. } => assert_eq!(data, payload),
979 other => panic!("expected Frame, got {:?}", other),
980 }
981
982 let payload2: Vec<u8> = (100..132).collect();
984 server_writer.send_frame(&payload2).unwrap();
985
986 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
987 match event {
988 Event::Frame { data, .. } => assert_eq!(data, payload2),
989 other => panic!("expected Frame, got {:?}", other),
990 }
991 }
992
993 #[test]
994 fn sleep_hold_drops_outbound_after_timeout_and_refresh_restores() {
995 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
996 let addr = listener.local_addr().unwrap();
997 let mut client = TcpStream::connect(addr).unwrap();
998 let (server_stream, _) = listener.accept().unwrap();
999 client
1000 .set_read_timeout(Some(Duration::from_millis(200)))
1001 .unwrap();
1002
1003 let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1004 let mut writer = LocalWriter {
1005 stream: server_stream,
1006 sleep_hold: Some(sleep_hold.clone()),
1007 };
1008
1009 writer.send_frame(b"live").unwrap();
1010 let mut buf = [0u8; 16];
1011 let n = client.read(&mut buf).unwrap();
1012 assert!(n > 0);
1013
1014 thread::sleep(Duration::from_millis(50));
1015 writer.send_frame(b"drop").unwrap();
1016 let err = client.read(&mut buf).unwrap_err();
1017 assert!(
1018 matches!(
1019 err.kind(),
1020 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1021 ),
1022 "paused writer should not emit bytes, got {err:?}"
1023 );
1024
1025 sleep_hold.refresh();
1026 writer.send_frame(b"again").unwrap();
1027 let n = client.read(&mut buf).unwrap();
1028 assert!(n > 0);
1029 }
1030
1031 #[test]
1032 fn tcp_reader_refreshes_sleep_hold_on_inbound_data() {
1033 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1034 let addr = listener.local_addr().unwrap();
1035 let mut client = TcpStream::connect(addr).unwrap();
1036 let (server_stream, _) = listener.accept().unwrap();
1037 let writer_stream = server_stream.try_clone().unwrap();
1038 client
1039 .set_read_timeout(Some(Duration::from_millis(200)))
1040 .unwrap();
1041
1042 let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
1043 let reader_sleep_hold = sleep_hold.clone();
1044 let (tx, rx) = crate::event::channel();
1045 thread::spawn(move || {
1046 tcp_reader_loop(server_stream, InterfaceId(98), tx, Some(reader_sleep_hold));
1047 });
1048
1049 let mut writer = LocalWriter {
1050 stream: writer_stream,
1051 sleep_hold: Some(sleep_hold),
1052 };
1053
1054 thread::sleep(Duration::from_millis(50));
1055 writer.send_frame(b"drop").unwrap();
1056 let mut buf = [0u8; 32];
1057 let err = client.read(&mut buf).unwrap_err();
1058 assert!(
1059 matches!(
1060 err.kind(),
1061 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1062 ),
1063 "paused writer should not emit bytes, got {err:?}"
1064 );
1065
1066 let inbound = vec![0x42; constants::HEADER_MINSIZE];
1067 client.write_all(&hdlc::frame(&inbound)).unwrap();
1068 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1069 match event {
1070 Event::Frame { interface_id, data } => {
1071 assert_eq!(interface_id, InterfaceId(98));
1072 assert_eq!(data, inbound);
1073 }
1074 other => panic!("expected Frame, got {:?}", other),
1075 }
1076
1077 writer.send_frame(b"again").unwrap();
1078 let n = client.read(&mut buf).unwrap();
1079 assert!(n > 0);
1080 }
1081
1082 #[test]
1083 fn physical_keepalive_loop_sends_empty_hdlc_frame() {
1084 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1085 let addr = listener.local_addr().unwrap();
1086 let mut client = TcpStream::connect(addr).unwrap();
1087 let (server_stream, _) = listener.accept().unwrap();
1088 client
1089 .set_read_timeout(Some(Duration::from_millis(500)))
1090 .unwrap();
1091
1092 let writer: Box<dyn Writer> = Box::new(LocalWriter {
1093 stream: server_stream,
1094 sleep_hold: None,
1095 });
1096 spawn_physical_keepalive_loop(
1097 writer,
1098 InterfaceId(99),
1099 "test-local".into(),
1100 Duration::from_millis(10),
1101 );
1102
1103 let mut buf = [0u8; 2];
1104 client.read_exact(&mut buf).unwrap();
1105 assert_eq!(buf, [0x7E, 0x7E]);
1106 }
1107
1108 #[test]
1109 fn multiple_local_clients() {
1110 let port = find_free_port();
1111 let instance_name = "test-multi".to_string();
1112 let (tx, rx) = crate::event::channel();
1113 let next_id = Arc::new(AtomicU64::new(7300));
1114
1115 let config = LocalServerConfig {
1116 instance_name: instance_name.clone(),
1117 port,
1118 interface_id: InterfaceId(74),
1119 };
1120
1121 start_server(config, tx, next_id).unwrap();
1122 thread::sleep(Duration::from_millis(50));
1123
1124 let _client1 = connect_test_client(&instance_name, port);
1125 let _client2 = connect_test_client(&instance_name, port);
1126
1127 let mut ids = Vec::new();
1128 for _ in 0..2 {
1129 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1130 match event {
1131 Event::InterfaceUp(id, _, _) => ids.push(id),
1132 other => panic!("expected InterfaceUp, got {:?}", other),
1133 }
1134 }
1135
1136 assert_eq!(ids.len(), 2);
1137 assert_ne!(ids[0], ids[1]);
1138 }
1139
1140 #[test]
1141 fn client_disconnect_detected() {
1142 let port = find_free_port();
1143 let instance_name = "test-dc".to_string();
1144 let (tx, rx) = crate::event::channel();
1145 let next_id = Arc::new(AtomicU64::new(7400));
1146
1147 let config = LocalServerConfig {
1148 instance_name: instance_name.clone(),
1149 port,
1150 interface_id: InterfaceId(75),
1151 };
1152
1153 start_server(config, tx, next_id).unwrap();
1154 thread::sleep(Duration::from_millis(50));
1155
1156 #[cfg(target_os = "linux")]
1157 let client = unix_socket::try_connect_unix(&instance_name).unwrap();
1158
1159 #[cfg(not(target_os = "linux"))]
1160 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1161
1162 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1164
1165 drop(client);
1167
1168 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1169 assert!(
1170 matches!(event, Event::InterfaceDown(_)),
1171 "expected InterfaceDown, got {:?}",
1172 event
1173 );
1174 }
1175
1176 #[test]
1177 fn client_reconnects_after_tcp_restart() {
1178 let port = find_free_port();
1179 let addr = format!("127.0.0.1:{}", port);
1180 let instance_name = format!("test-reconnect-{}", port);
1181
1182 let listener1 = TcpListener::bind(&addr).unwrap();
1183 let (accepted1_tx, accepted1_rx) = mpsc::channel();
1184 thread::spawn(move || {
1185 let (stream, _) = listener1.accept().unwrap();
1186 accepted1_tx.send(stream).unwrap();
1187 });
1188
1189 let (client_tx, client_rx) = crate::event::channel();
1190 let client_config = LocalClientConfig {
1191 name: "test-client".into(),
1192 instance_name,
1193 port,
1194 interface_id: InterfaceId(76),
1195 reconnect_wait: Duration::from_millis(50),
1196 };
1197
1198 let _writer = start_client(client_config, client_tx).unwrap();
1199 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1200 assert!(matches!(
1201 event,
1202 Event::InterfaceUp(InterfaceId(76), None, None)
1203 ));
1204
1205 let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1206 drop(stream1);
1207
1208 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1209 assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));
1210
1211 let listener2 = TcpListener::bind(&addr).unwrap();
1212 let (accepted2_tx, accepted2_rx) = mpsc::channel();
1213 thread::spawn(move || {
1214 let (stream, _) = listener2.accept().unwrap();
1215 accepted2_tx.send(stream).unwrap();
1216 });
1217
1218 let mut reconnected_writer = None;
1219 for _ in 0..10 {
1220 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1221 match event {
1222 Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
1223 reconnected_writer = writer;
1224 break;
1225 }
1226 _ => {}
1227 }
1228 }
1229
1230 let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
1231 let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
1232 reconnected_writer.send_frame(b"client->server").unwrap();
1233 stream2
1234 .set_read_timeout(Some(Duration::from_secs(2)))
1235 .unwrap();
1236 let mut buf = [0u8; 64];
1237 let n = stream2.read(&mut buf).unwrap();
1238 assert!(n > 0, "expected bytes from refreshed writer");
1239 }
1240
1241 #[cfg(target_os = "linux")]
1242 #[test]
1243 fn unix_abstract_socket_helpers_work() {
1244 let instance_name = format!(
1245 "test-abstract-{}",
1246 std::time::SystemTime::now()
1247 .duration_since(std::time::UNIX_EPOCH)
1248 .unwrap()
1249 .as_nanos()
1250 );
1251
1252 let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
1253 let accept_thread = thread::spawn(move || listener.accept().unwrap().0);
1254
1255 let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
1256 let mut server = accept_thread.join().unwrap();
1257
1258 client.write_all(b"ping").unwrap();
1259 let mut buf = [0u8; 4];
1260 server.read_exact(&mut buf).unwrap();
1261 assert_eq!(&buf, b"ping");
1262 }
1263}