1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::{ListenerControl, Writer};
24
/// Android only: how long a connected local client may stay silent before
/// outbound frames addressed to it start being dropped (see `ClientSleepHold`).
#[cfg(target_os = "android")]
const CLIENT_SLEEP_PAUSE_TIMEOUT: Duration = Duration::from_secs(12);

/// Android only: period between the empty keepalive frames a local client
/// writes to the shared instance.
#[cfg(target_os = "android")]
const PHY_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
29
30#[derive(Debug, Clone)]
32pub struct LocalServerConfig {
33 pub instance_name: String,
34 pub port: u16,
35 pub interface_id: InterfaceId,
36}
37
38impl Default for LocalServerConfig {
39 fn default() -> Self {
40 LocalServerConfig {
41 instance_name: "default".into(),
42 port: 37428,
43 interface_id: InterfaceId(0),
44 }
45 }
46}
47
48#[derive(Debug, Clone)]
50pub struct LocalClientConfig {
51 pub name: String,
52 pub instance_name: String,
53 pub port: u16,
54 pub interface_id: InterfaceId,
55 pub reconnect_wait: Duration,
56}
57
58impl Default for LocalClientConfig {
59 fn default() -> Self {
60 LocalClientConfig {
61 name: "Local shared instance".into(),
62 instance_name: "default".into(),
63 port: 37428,
64 interface_id: InterfaceId(0),
65 reconnect_wait: Duration::from_secs(8),
66 }
67 }
68}
69
70struct LocalWriter {
72 stream: TcpStream,
73 sleep_hold: Option<ClientSleepHold>,
74}
75
76impl Writer for LocalWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 if self
79 .sleep_hold
80 .as_ref()
81 .is_some_and(ClientSleepHold::should_drop_outbound)
82 {
83 log::debug!("TX paused for LocalInterface client, dropping outbound packet");
84 return Ok(());
85 }
86 self.stream.write_all(&hdlc::frame(data))
87 }
88}
89
/// Shared inactivity deadline for one local client.
///
/// Clones share the same deadline, so the reader can refresh it while the
/// writer checks it.
#[derive(Clone)]
struct ClientSleepHold {
    /// Amount by which each refresh pushes the deadline into the future.
    timeout: Duration,
    /// Instant after which outbound frames are dropped.
    deadline: Arc<Mutex<Instant>>,
}
95
96impl ClientSleepHold {
97 #[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
98 fn new(timeout: Duration) -> Self {
99 Self {
100 timeout,
101 deadline: Arc::new(Mutex::new(Instant::now() + timeout)),
102 }
103 }
104
105 fn refresh(&self) {
106 *lock_or_recover(&self.deadline) = Instant::now() + self.timeout;
107 }
108
109 fn should_drop_outbound(&self) -> bool {
110 Instant::now() > *lock_or_recover(&self.deadline)
111 }
112}
113
/// Locks `mutex`, returning the guard even when the mutex is poisoned.
///
/// A poisoned lock is recovered by taking the guard out of the poison error
/// instead of propagating the panic.
fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
120
121fn android_client_sleep_hold() -> Option<ClientSleepHold> {
122 #[cfg(target_os = "android")]
123 {
124 Some(ClientSleepHold::new(CLIENT_SLEEP_PAUSE_TIMEOUT))
125 }
126 #[cfg(not(target_os = "android"))]
127 {
128 None
129 }
130}
131
132#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
133fn spawn_physical_keepalive_loop(
134 mut writer: Box<dyn Writer>,
135 interface_id: InterfaceId,
136 interface_name: String,
137 interval: Duration,
138) {
139 thread::Builder::new()
140 .name(format!("local-phy-keepalive-{}", interface_id.0))
141 .spawn(move || loop {
142 thread::sleep(interval);
143 if let Err(err) = writer.send_frame(&[]) {
144 log::debug!(
145 "[{}:{}] LocalInterface physical keepalive stopped: {}",
146 interface_name,
147 interface_id.0,
148 err
149 );
150 return;
151 }
152 })
153 .ok();
154}
155
156fn maybe_spawn_local_client_phy_keepalive(
157 stream: &LocalClientStream,
158 interface_id: InterfaceId,
159 interface_name: &str,
160) -> io::Result<()> {
161 #[cfg(target_os = "android")]
162 {
163 let writer = local_client_stream_writer(stream)?;
164 spawn_physical_keepalive_loop(
165 writer,
166 interface_id,
167 interface_name.to_string(),
168 PHY_KEEPALIVE_INTERVAL,
169 );
170 }
171
172 #[cfg(not(target_os = "android"))]
173 {
174 let _ = (stream, interface_id, interface_name);
175 }
176
177 Ok(())
178}
179
/// Helpers for the Linux abstract-namespace Unix socket `rns/<instance_name>`.
#[cfg(target_os = "linux")]
mod unix_socket {
    use std::io;
    use std::os::linux::net::SocketAddrExt;
    use std::os::unix::net::{SocketAddr, UnixListener, UnixStream};

    /// Builds the abstract socket address for `instance_name`.
    fn abstract_addr(instance_name: &str) -> io::Result<SocketAddr> {
        let socket_name = format!("rns/{}", instance_name);
        SocketAddr::from_abstract_name(socket_name)
    }

    /// Binds a listener on the abstract socket for `instance_name`.
    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
        abstract_addr(instance_name).and_then(|addr| UnixListener::bind_addr(&addr))
    }

    /// Connects to the abstract socket for `instance_name`.
    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
        abstract_addr(instance_name).and_then(|addr| UnixStream::connect_addr(&addr))
    }
}
202
203pub fn start_server(
209 config: LocalServerConfig,
210 tx: EventSender,
211 next_id: Arc<AtomicU64>,
212) -> io::Result<ListenerControl> {
213 let control = ListenerControl::new();
214 #[cfg(target_os = "linux")]
216 {
217 match unix_socket::try_bind_unix(&config.instance_name) {
218 Ok(listener) => {
219 listener.set_nonblocking(true)?;
220 log::info!(
221 "Local server using Unix socket: rns/{}",
222 config.instance_name
223 );
224 let name = format!("rns/{}", config.instance_name);
225 let listener_control = control.clone();
226 thread::Builder::new()
227 .name("local-server".into())
228 .spawn(move || {
229 unix_server_loop(listener, name, tx, next_id, listener_control);
230 })?;
231 return Ok(control);
232 }
233 Err(e) => {
234 log::info!("Unix socket bind failed ({}), falling back to TCP", e);
235 }
236 }
237 }
238
239 let addr = format!("127.0.0.1:{}", config.port);
241 let listener = TcpListener::bind(&addr)?;
242 listener.set_nonblocking(true)?;
243
244 log::info!("Local server listening on TCP {}", addr);
245
246 let listener_control = control.clone();
247 thread::Builder::new()
248 .name("local-server".into())
249 .spawn(move || {
250 tcp_server_loop(listener, tx, next_id, listener_control);
251 })?;
252
253 Ok(control)
254}
255
256fn tcp_server_loop(
258 listener: TcpListener,
259 tx: EventSender,
260 next_id: Arc<AtomicU64>,
261 control: ListenerControl,
262) {
263 loop {
264 if control.should_stop() {
265 log::info!("Local TCP listener stopping");
266 return;
267 }
268
269 let stream_result = listener.accept().map(|(stream, _)| stream);
270 let stream = match stream_result {
271 Ok(s) => s,
272 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273 thread::sleep(Duration::from_millis(50));
274 continue;
275 }
276 Err(e) => {
277 log::warn!("Local server accept failed: {}", e);
278 continue;
279 }
280 };
281
282 if let Err(e) = stream.set_nodelay(true) {
283 log::warn!("Local server set_nodelay failed: {}", e);
284 }
285
286 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
287 spawn_local_client_handler(stream, client_id, tx.clone());
288 }
289}
290
291#[cfg(target_os = "linux")]
293fn unix_server_loop(
294 listener: std::os::unix::net::UnixListener,
295 name: String,
296 tx: EventSender,
297 next_id: Arc<AtomicU64>,
298 control: ListenerControl,
299) {
300 loop {
301 if control.should_stop() {
302 log::info!("[{}] Local Unix listener stopping", name);
303 return;
304 }
305
306 let stream_result = listener.accept().map(|(stream, _)| stream);
307 let stream = match stream_result {
308 Ok(s) => s,
309 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
310 thread::sleep(Duration::from_millis(50));
311 continue;
312 }
313 Err(e) => {
314 log::warn!("[{}] Local server accept failed: {}", name, e);
315 continue;
316 }
317 };
318
319 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
320
321 let writer_stream = match stream.try_clone() {
323 Ok(s) => s,
324 Err(e) => {
325 log::warn!("Local server clone failed: {}", e);
326 continue;
327 }
328 };
329
330 let sleep_hold = android_client_sleep_hold();
331 let info = make_local_interface_info(client_id);
332 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
333 stream: writer_stream,
334 sleep_hold: sleep_hold.clone(),
335 });
336
337 if tx
338 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
339 .is_err()
340 {
341 return;
342 }
343
344 let client_tx = tx.clone();
345 thread::Builder::new()
346 .name(format!("local-unix-reader-{}", client_id.0))
347 .spawn(move || {
348 unix_reader_loop(stream, client_id, client_tx, sleep_hold);
349 })
350 .ok();
351 }
352}
353
354#[cfg(target_os = "linux")]
355struct UnixLocalWriter {
356 stream: std::os::unix::net::UnixStream,
357 sleep_hold: Option<ClientSleepHold>,
358}
359
360#[cfg(target_os = "linux")]
361impl Writer for UnixLocalWriter {
362 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
363 use std::io::Write;
364 if self
365 .sleep_hold
366 .as_ref()
367 .is_some_and(ClientSleepHold::should_drop_outbound)
368 {
369 log::debug!("TX paused for LocalInterface client, dropping outbound packet");
370 return Ok(());
371 }
372 self.stream.write_all(&hdlc::frame(data))
373 }
374}
375
376#[cfg(target_os = "linux")]
377fn unix_reader_loop(
378 mut stream: std::os::unix::net::UnixStream,
379 id: InterfaceId,
380 tx: EventSender,
381 sleep_hold: Option<ClientSleepHold>,
382) {
383 use std::io::Read;
384 let mut decoder = hdlc::Decoder::new();
385 let mut buf = [0u8; 4096];
386
387 loop {
388 match stream.read(&mut buf) {
389 Ok(0) => {
390 let _ = tx.send(Event::InterfaceDown(id));
391 return;
392 }
393 Ok(n) => {
394 if let Some(ref sleep_hold) = sleep_hold {
395 sleep_hold.refresh();
396 }
397 for frame in decoder.feed(&buf[..n]) {
398 if tx
399 .send(Event::Frame {
400 interface_id: id,
401 data: frame,
402 rssi: None,
403 snr: None,
404 })
405 .is_err()
406 {
407 return;
408 }
409 }
410 }
411 Err(_) => {
412 let _ = tx.send(Event::InterfaceDown(id));
413 return;
414 }
415 }
416 }
417}
418
419fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
421 let writer_stream = match stream.try_clone() {
422 Ok(s) => s,
423 Err(e) => {
424 log::warn!("Local server clone failed: {}", e);
425 return;
426 }
427 };
428
429 let sleep_hold = android_client_sleep_hold();
430 let info = make_local_interface_info(client_id);
431 let writer: Box<dyn Writer> = Box::new(LocalWriter {
432 stream: writer_stream,
433 sleep_hold: sleep_hold.clone(),
434 });
435
436 if tx
437 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
438 .is_err()
439 {
440 return;
441 }
442
443 thread::Builder::new()
444 .name(format!("local-reader-{}", client_id.0))
445 .spawn(move || {
446 tcp_reader_loop(stream, client_id, tx, sleep_hold);
447 })
448 .ok();
449}
450
451fn tcp_reader_loop(
452 mut stream: TcpStream,
453 id: InterfaceId,
454 tx: EventSender,
455 sleep_hold: Option<ClientSleepHold>,
456) {
457 let mut decoder = hdlc::Decoder::new();
458 let mut buf = [0u8; 4096];
459
460 loop {
461 match stream.read(&mut buf) {
462 Ok(0) => {
463 log::info!("Local client {} disconnected", id.0);
464 let _ = tx.send(Event::InterfaceDown(id));
465 return;
466 }
467 Ok(n) => {
468 if let Some(ref sleep_hold) = sleep_hold {
469 sleep_hold.refresh();
470 }
471 for frame in decoder.feed(&buf[..n]) {
472 if tx
473 .send(Event::Frame {
474 interface_id: id,
475 data: frame,
476 rssi: None,
477 snr: None,
478 })
479 .is_err()
480 {
481 return;
482 }
483 }
484 }
485 Err(e) => {
486 log::warn!("Local client {} read error: {}", id.0, e);
487 let _ = tx.send(Event::InterfaceDown(id));
488 return;
489 }
490 }
491 }
492}
493
494fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
495 InterfaceInfo {
496 id,
497 name: String::from("LocalInterface"),
498 mode: constants::MODE_FULL,
499 out_capable: true,
500 in_capable: true,
501 bitrate: Some(1_000_000_000), airtime_profile: None,
503 announce_rate_target: None,
504 announce_rate_grace: 0,
505 announce_rate_penalty: 0.0,
506 announce_cap: constants::ANNOUNCE_CAP,
507 is_local_client: true,
508 wants_tunnel: false,
509 tunnel_id: None,
510 mtu: 65535,
511 ia_freq: 0.0,
512 ip_freq: 0.0,
513 op_freq: 0.0,
514 op_samples: 0,
515 started: 0.0,
516 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
517 }
518}
519
/// Client-side connection to the shared instance on Linux, over either
/// transport.
#[cfg(target_os = "linux")]
enum LocalClientStream {
    /// Connected through the abstract Unix socket.
    Unix(std::os::unix::net::UnixStream),
    /// Connected through loopback TCP.
    Tcp(TcpStream),
}
527
528#[cfg(target_os = "linux")]
529impl LocalClientStream {
530 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
531 match self {
532 LocalClientStream::Unix(stream) => stream.read(buf),
533 LocalClientStream::Tcp(stream) => stream.read(buf),
534 }
535 }
536
537 fn writer(&self) -> io::Result<Box<dyn Writer>> {
538 match self {
539 LocalClientStream::Unix(stream) => Ok(Box::new(UnixLocalWriter {
540 stream: stream.try_clone()?,
541 sleep_hold: None,
542 })),
543 LocalClientStream::Tcp(stream) => Ok(Box::new(LocalWriter {
544 stream: stream.try_clone()?,
545 sleep_hold: None,
546 })),
547 }
548 }
549}
550
/// Outside Linux only the TCP transport exists, so the client stream is a
/// plain `TcpStream`.
#[cfg(not(target_os = "linux"))]
type LocalClientStream = TcpStream;
553
/// Creates a framing writer over a cloned handle of `stream` (non-Linux).
///
/// The writer has no sleep hold attached.
///
/// # Errors
/// Returns the error from cloning the stream handle.
#[cfg(not(target_os = "linux"))]
fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
    let cloned = stream.try_clone()?;
    let writer = LocalWriter {
        stream: cloned,
        sleep_hold: None,
    };
    Ok(Box::new(writer))
}
561
562#[cfg(target_os = "linux")]
563fn local_client_stream_writer(stream: &LocalClientStream) -> io::Result<Box<dyn Writer>> {
564 stream.writer()
565}
566
567fn try_connect_tcp(config: &LocalClientConfig) -> io::Result<TcpStream> {
568 let addr = format!("127.0.0.1:{}", config.port);
569 let stream = TcpStream::connect(&addr)?;
570 stream.set_nodelay(true)?;
571 log::info!(
572 "[{}] Connected to shared instance via TCP {}",
573 config.name,
574 addr
575 );
576 Ok(stream)
577}
578
579#[cfg(target_os = "linux")]
580fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
581 match unix_socket::try_connect_unix(&config.instance_name) {
582 Ok(stream) => {
583 log::info!(
584 "[{}] Connected to shared instance via Unix socket: rns/{}",
585 config.name,
586 config.instance_name
587 );
588 Ok(LocalClientStream::Unix(stream))
589 }
590 Err(e) => {
591 log::info!(
592 "[{}] Unix socket connect failed ({}), trying TCP",
593 config.name,
594 e
595 );
596 try_connect_tcp(config).map(LocalClientStream::Tcp)
597 }
598 }
599}
600
/// Connects to the shared instance outside Linux, where TCP is the only
/// transport.
#[cfg(not(target_os = "linux"))]
fn try_connect_local_client(config: &LocalClientConfig) -> io::Result<LocalClientStream> {
    let tcp_stream = try_connect_tcp(config)?;
    Ok(tcp_stream)
}
605
606fn reconnect_local_client(config: &LocalClientConfig, tx: &EventSender) -> LocalClientStream {
607 loop {
608 thread::sleep(config.reconnect_wait);
609 match try_connect_local_client(config) {
610 Ok(stream) => match local_client_stream_writer(&stream) {
611 Ok(writer) => {
612 let _ = maybe_spawn_local_client_phy_keepalive(
613 &stream,
614 config.interface_id,
615 &config.name,
616 );
617 let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(writer), None));
618 return stream;
619 }
620 Err(e) => {
621 log::warn!("[{}] failed to clone reconnect writer: {}", config.name, e);
622 }
623 },
624 Err(e) => {
625 log::warn!("[{}] reconnect failed: {}", config.name, e);
626 }
627 }
628 }
629}
630
631fn local_client_reader_loop(
632 mut stream: LocalClientStream,
633 config: LocalClientConfig,
634 tx: EventSender,
635) {
636 let id = config.interface_id;
637 let mut decoder = hdlc::Decoder::new();
638 let mut buf = [0u8; 4096];
639
640 loop {
641 match stream.read(&mut buf) {
642 Ok(0) => {
643 log::warn!("[{}] shared connection closed", config.name);
644 let _ = tx.send(Event::InterfaceDown(id));
645 stream = reconnect_local_client(&config, &tx);
646 decoder = hdlc::Decoder::new();
647 }
648 Ok(n) => {
649 for frame in decoder.feed(&buf[..n]) {
650 if tx
651 .send(Event::Frame {
652 interface_id: id,
653 data: frame,
654 rssi: None,
655 snr: None,
656 })
657 .is_err()
658 {
659 return;
660 }
661 }
662 }
663 Err(e) => {
664 log::warn!("[{}] shared read error: {}", config.name, e);
665 let _ = tx.send(Event::InterfaceDown(id));
666 stream = reconnect_local_client(&config, &tx);
667 decoder = hdlc::Decoder::new();
668 }
669 }
670 }
671}
672
673pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
677 let id = config.interface_id;
678 let stream = try_connect_local_client(&config)?;
679 let writer = local_client_stream_writer(&stream)?;
680 maybe_spawn_local_client_phy_keepalive(&stream, id, &config.name)?;
681
682 let _ = tx.send(Event::InterfaceUp(id, None, None));
683
684 thread::Builder::new()
685 .name(format!("local-client-reader-{}", id.0))
686 .spawn(move || {
687 local_client_reader_loop(stream, config, tx);
688 })?;
689
690 Ok(writer)
691}
692
693use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
696use std::collections::HashMap;
697
698pub struct LocalServerFactory;
700
701impl InterfaceFactory for LocalServerFactory {
702 fn type_name(&self) -> &str {
703 "LocalServerInterface"
704 }
705
706 fn parse_config(
707 &self,
708 _name: &str,
709 id: InterfaceId,
710 params: &HashMap<String, String>,
711 ) -> Result<Box<dyn InterfaceConfigData>, String> {
712 let instance_name = params
713 .get("instance_name")
714 .cloned()
715 .unwrap_or_else(|| "default".into());
716 let port = params
717 .get("port")
718 .and_then(|v| v.parse().ok())
719 .unwrap_or(37428);
720
721 Ok(Box::new(LocalServerConfig {
722 instance_name,
723 port,
724 interface_id: id,
725 }))
726 }
727
728 fn start(
729 &self,
730 config: Box<dyn InterfaceConfigData>,
731 ctx: StartContext,
732 ) -> std::io::Result<StartResult> {
733 let server_config = *config
734 .into_any()
735 .downcast::<LocalServerConfig>()
736 .map_err(|_| {
737 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
738 })?;
739
740 let control = start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
741 Ok(StartResult::Listener {
742 control: Some(control),
743 })
744 }
745}
746
747pub struct LocalClientFactory;
749
750impl InterfaceFactory for LocalClientFactory {
751 fn type_name(&self) -> &str {
752 "LocalClientInterface"
753 }
754
755 fn parse_config(
756 &self,
757 _name: &str,
758 id: InterfaceId,
759 params: &HashMap<String, String>,
760 ) -> Result<Box<dyn InterfaceConfigData>, String> {
761 let instance_name = params
762 .get("instance_name")
763 .cloned()
764 .unwrap_or_else(|| "default".into());
765 let port = params
766 .get("port")
767 .and_then(|v| v.parse().ok())
768 .unwrap_or(37428);
769
770 Ok(Box::new(LocalClientConfig {
771 instance_name,
772 port,
773 interface_id: id,
774 ..LocalClientConfig::default()
775 }))
776 }
777
778 fn start(
779 &self,
780 config: Box<dyn InterfaceConfigData>,
781 ctx: StartContext,
782 ) -> std::io::Result<StartResult> {
783 let client_config = *config
784 .into_any()
785 .downcast::<LocalClientConfig>()
786 .map_err(|_| {
787 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
788 })?;
789
790 let id = client_config.interface_id;
791 let name = client_config.name.clone();
792 let info = InterfaceInfo {
793 id,
794 name,
795 mode: ctx.mode,
796 out_capable: true,
797 in_capable: true,
798 bitrate: Some(1_000_000_000),
799 airtime_profile: None,
800 announce_rate_target: None,
801 announce_rate_grace: 0,
802 announce_rate_penalty: 0.0,
803 announce_cap: rns_core::constants::ANNOUNCE_CAP,
804 is_local_client: false,
805 wants_tunnel: false,
806 tunnel_id: None,
807 mtu: 65535,
808 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
809 ia_freq: 0.0,
810 ip_freq: 0.0,
811 op_freq: 0.0,
812 op_samples: 0,
813 started: crate::time::now(),
814 };
815
816 let writer = start_client(client_config, ctx.tx)?;
817
818 Ok(StartResult::Simple {
819 id,
820 info,
821 writer,
822 interface_type_name: "LocalInterface".to_string(),
823 })
824 }
825}
826
/// Tests for the local server and client.
///
/// On Linux the server binds the abstract Unix socket, so the helpers connect
/// through it; on other targets they connect through loopback TCP.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::sync::mpsc::RecvTimeoutError;

    #[cfg(target_os = "linux")]
    type TestClient = std::os::unix::net::UnixStream;

    #[cfg(not(target_os = "linux"))]
    type TestClient = TcpStream;

    /// Connects a raw test client using the transport the server uses on
    /// this target.
    ///
    /// The parameter used to be named `_port` while the non-Linux branch read
    /// `port`, which did not compile off Linux.
    fn connect_test_client(instance_name: &str, port: u16) -> TestClient {
        #[cfg(target_os = "linux")]
        {
            // The TCP port is irrelevant when connecting through the Unix socket.
            let _ = port;
            unix_socket::try_connect_unix(instance_name).unwrap()
        }

        #[cfg(not(target_os = "linux"))]
        {
            // The instance name is irrelevant when connecting through TCP.
            let _ = instance_name;
            TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
        }
    }

    /// Asks the OS for a currently free loopback port. The probe listener is
    /// dropped on return, so the port is only probably still free afterwards.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    #[test]
    fn server_bind_tcp() {
        let port = find_free_port();
        let instance_name = "test-bind".to_string();
        let (tx, _rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7000));

        let config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(70),
        };

        start_server(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        connect_test_client(&instance_name, port);
    }

    #[test]
    fn server_accept_client() {
        let port = find_free_port();
        let instance_name = "test-accept".to_string();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7100));

        let config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(71),
        };

        start_server(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        connect_test_client(&instance_name, port);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(7100));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn server_stop_prevents_new_accepts() {
        let port = find_free_port();
        let instance_name = "test-stop".to_string();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7150));

        let config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(71),
        };

        let control = start_server(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));
        control.request_stop();
        thread::sleep(Duration::from_millis(120));

        // The connect may or may not succeed once the listener has stopped;
        // either way no InterfaceUp must be reported.
        #[cfg(target_os = "linux")]
        let connect_result = unix_socket::try_connect_unix(&instance_name);

        #[cfg(not(target_os = "linux"))]
        let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));

        if let Ok(stream) = connect_result {
            drop(stream);
        }

        match rx.recv_timeout(Duration::from_millis(200)) {
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
            other => panic!("expected no InterfaceUp after server stop, got {:?}", other),
        }
    }

    #[test]
    fn client_send_receive() {
        let port = find_free_port();
        let (server_tx, server_rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7200));

        let server_config = LocalServerConfig {
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(72),
        };

        start_server(server_config, server_tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let (client_tx, client_rx) = crate::event::channel();
        let client_config = LocalClientConfig {
            name: "test-client".into(),
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(73),
            reconnect_wait: Duration::from_secs(1),
        };

        let mut client_writer = start_client(client_config, client_tx).unwrap();

        // Server side: the new client shows up with a writer and interface info.
        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let mut server_writer = match event {
            Event::InterfaceUp(_, Some(w), Some(info)) => {
                assert!(info.is_local_client);
                w
            }
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Client side: the interface is reported as up.
        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
            other => panic!("expected InterfaceUp, got {:?}", other),
        }

        // Client -> server.
        let payload: Vec<u8> = (0..32).collect();
        client_writer.send_frame(&payload).unwrap();

        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }

        // Server -> client.
        let payload2: Vec<u8> = (100..132).collect();
        server_writer.send_frame(&payload2).unwrap();

        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, payload2),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn sleep_hold_drops_outbound_after_timeout_and_refresh_restores() {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let mut client = TcpStream::connect(addr).unwrap();
        let (server_stream, _) = listener.accept().unwrap();
        client
            .set_read_timeout(Some(Duration::from_millis(200)))
            .unwrap();

        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
        let mut writer = LocalWriter {
            stream: server_stream,
            sleep_hold: Some(sleep_hold.clone()),
        };

        writer.send_frame(b"live").unwrap();
        let mut buf = [0u8; 16];
        let n = client.read(&mut buf).unwrap();
        assert!(n > 0);

        // Let the hold expire: the next frame must be dropped.
        thread::sleep(Duration::from_millis(50));
        writer.send_frame(b"drop").unwrap();
        let err = client.read(&mut buf).unwrap_err();
        assert!(
            matches!(
                err.kind(),
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
            ),
            "paused writer should not emit bytes, got {err:?}"
        );

        sleep_hold.refresh();
        writer.send_frame(b"again").unwrap();
        let n = client.read(&mut buf).unwrap();
        assert!(n > 0);
    }

    #[test]
    fn tcp_reader_refreshes_sleep_hold_on_inbound_data() {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let mut client = TcpStream::connect(addr).unwrap();
        let (server_stream, _) = listener.accept().unwrap();
        let writer_stream = server_stream.try_clone().unwrap();
        client
            .set_read_timeout(Some(Duration::from_millis(200)))
            .unwrap();

        let sleep_hold = ClientSleepHold::new(Duration::from_millis(30));
        let reader_sleep_hold = sleep_hold.clone();
        let (tx, rx) = crate::event::channel();
        thread::spawn(move || {
            tcp_reader_loop(server_stream, InterfaceId(98), tx, Some(reader_sleep_hold));
        });

        let mut writer = LocalWriter {
            stream: writer_stream,
            sleep_hold: Some(sleep_hold),
        };

        // Let the hold expire: the next frame must be dropped.
        thread::sleep(Duration::from_millis(50));
        writer.send_frame(b"drop").unwrap();
        let mut buf = [0u8; 32];
        let err = client.read(&mut buf).unwrap_err();
        assert!(
            matches!(
                err.kind(),
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
            ),
            "paused writer should not emit bytes, got {err:?}"
        );

        // Inbound data refreshes the hold through the reader loop.
        let inbound = vec![0x42; constants::HEADER_MINSIZE];
        client.write_all(&hdlc::frame(&inbound)).unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame {
                interface_id,
                data,
                rssi: _,
                snr: _,
            } => {
                assert_eq!(interface_id, InterfaceId(98));
                assert_eq!(data, inbound);
            }
            other => panic!("expected Frame, got {:?}", other),
        }

        writer.send_frame(b"again").unwrap();
        let n = client.read(&mut buf).unwrap();
        assert!(n > 0);
    }

    #[test]
    fn physical_keepalive_loop_sends_empty_hdlc_frame() {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let mut client = TcpStream::connect(addr).unwrap();
        let (server_stream, _) = listener.accept().unwrap();
        client
            .set_read_timeout(Some(Duration::from_millis(500)))
            .unwrap();

        let writer: Box<dyn Writer> = Box::new(LocalWriter {
            stream: server_stream,
            sleep_hold: None,
        });
        spawn_physical_keepalive_loop(
            writer,
            InterfaceId(99),
            "test-local".into(),
            Duration::from_millis(10),
        );

        let mut buf = [0u8; 2];
        client.read_exact(&mut buf).unwrap();
        assert_eq!(buf, [0x7E, 0x7E]);
    }

    #[test]
    fn multiple_local_clients() {
        let port = find_free_port();
        let instance_name = "test-multi".to_string();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7300));

        let config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(74),
        };

        start_server(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = connect_test_client(&instance_name, port);
        let _client2 = connect_test_client(&instance_name, port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect_detected() {
        let port = find_free_port();
        let instance_name = "test-dc".to_string();
        let (tx, rx) = crate::event::channel();
        let next_id = Arc::new(AtomicU64::new(7400));

        let config = LocalServerConfig {
            instance_name: instance_name.clone(),
            port,
            interface_id: InterfaceId(75),
        };

        start_server(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client = connect_test_client(&instance_name, port);

        // Consume the InterfaceUp for the new client.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(_)),
            "expected InterfaceDown, got {:?}",
            event
        );
    }

    #[test]
    fn client_reconnects_after_tcp_restart() {
        let port = find_free_port();
        let addr = format!("127.0.0.1:{}", port);
        let instance_name = format!("test-reconnect-{}", port);

        let listener1 = TcpListener::bind(&addr).unwrap();
        let (accepted1_tx, accepted1_rx) = mpsc::channel();
        thread::spawn(move || {
            let (stream, _) = listener1.accept().unwrap();
            accepted1_tx.send(stream).unwrap();
        });

        let (client_tx, client_rx) = crate::event::channel();
        let client_config = LocalClientConfig {
            name: "test-client".into(),
            instance_name,
            port,
            interface_id: InterfaceId(76),
            reconnect_wait: Duration::from_millis(50),
        };

        let _writer = start_client(client_config, client_tx).unwrap();
        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(
            event,
            Event::InterfaceUp(InterfaceId(76), None, None)
        ));

        // Dropping the accepted stream closes the first connection.
        let stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        drop(stream1);

        let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(76))));

        let listener2 = TcpListener::bind(&addr).unwrap();
        let (accepted2_tx, accepted2_rx) = mpsc::channel();
        thread::spawn(move || {
            let (stream, _) = listener2.accept().unwrap();
            accepted2_tx.send(stream).unwrap();
        });

        let mut reconnected_writer = None;
        for _ in 0..10 {
            let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(InterfaceId(76), writer, None) if writer.is_some() => {
                    reconnected_writer = writer;
                    break;
                }
                _ => {}
            }
        }

        let mut reconnected_writer = reconnected_writer.expect("missing reconnect writer");
        let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        reconnected_writer.send_frame(b"client->server").unwrap();
        stream2
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();
        let mut buf = [0u8; 64];
        let n = stream2.read(&mut buf).unwrap();
        assert!(n > 0, "expected bytes from refreshed writer");
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn unix_abstract_socket_helpers_work() {
        // A nanosecond timestamp keeps the abstract name unique across runs.
        let instance_name = format!(
            "test-abstract-{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        let listener = unix_socket::try_bind_unix(&instance_name).unwrap();
        let accept_thread = thread::spawn(move || listener.accept().unwrap().0);

        let mut client = unix_socket::try_connect_unix(&instance_name).unwrap();
        let mut server = accept_thread.join().unwrap();

        client.write_all(b"ping").unwrap();
        let mut buf = [0u8; 4];
        server.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"ping");
    }
}