1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30 lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
34#[allow(dead_code)]
36const HW_MTU: usize = 1_048_576;
37
38#[derive(Debug, Clone)]
40pub struct BackboneConfig {
41 pub name: String,
42 pub listen_ip: String,
43 pub listen_port: u16,
44 pub interface_id: InterfaceId,
45 pub mode: u8,
46 pub max_connections: Option<usize>,
47 pub idle_timeout: Option<Duration>,
48 pub write_stall_timeout: Option<Duration>,
49 pub abuse: BackboneAbuseConfig,
50 pub ingress_control: IngressControlConfig,
51 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
52 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
53}
54
55#[derive(Debug, Clone, Default)]
57pub struct BackboneAbuseConfig {
58 pub max_penalty_duration: Option<Duration>,
59}
60
61#[derive(Debug, Clone)]
63pub struct BackboneServerRuntime {
64 pub max_connections: Option<usize>,
65 pub idle_timeout: Option<Duration>,
66 pub write_stall_timeout: Option<Duration>,
67 pub abuse: BackboneAbuseConfig,
68}
69
70impl BackboneServerRuntime {
71 pub fn from_config(config: &BackboneConfig) -> Self {
72 Self {
73 max_connections: config.max_connections,
74 idle_timeout: config.idle_timeout,
75 write_stall_timeout: config.write_stall_timeout,
76 abuse: config.abuse.clone(),
77 }
78 }
79}
80
81#[derive(Debug, Clone)]
82pub struct BackboneRuntimeConfigHandle {
83 pub interface_name: String,
84 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
85 pub startup: BackboneServerRuntime,
86}
87
88#[derive(Debug, Clone)]
89pub struct BackbonePeerStateHandle {
90 pub interface_id: InterfaceId,
91 pub interface_name: String,
92 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
93}
94
95impl Default for BackboneConfig {
96 fn default() -> Self {
97 let mut config = BackboneConfig {
98 name: String::new(),
99 listen_ip: "0.0.0.0".into(),
100 listen_port: 0,
101 interface_id: InterfaceId(0),
102 mode: constants::MODE_FULL,
103 max_connections: None,
104 idle_timeout: None,
105 write_stall_timeout: None,
106 abuse: BackboneAbuseConfig::default(),
107 ingress_control: IngressControlConfig::enabled(),
108 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
109 max_connections: None,
110 idle_timeout: None,
111 write_stall_timeout: None,
112 abuse: BackboneAbuseConfig::default(),
113 })),
114 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
115 };
116 let startup = BackboneServerRuntime::from_config(&config);
117 config.runtime = Arc::new(Mutex::new(startup));
118 config
119 }
120}
121
122const MAX_PENDING_BYTES: usize = 512 * 1024;
125
126struct BackboneWriter {
128 stream: TcpStream,
129 runtime: Arc<Mutex<BackboneServerRuntime>>,
130 interface_name: String,
131 interface_id: InterfaceId,
132 event_tx: EventSender,
133 pending: Vec<u8>,
134 stall_started: Option<Instant>,
135 disconnect_notified: bool,
136 write_stall_flag: Arc<AtomicBool>,
137}
138
139impl Writer for BackboneWriter {
140 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
141 let write_stall_timeout =
142 lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
143 if !self.pending.is_empty() {
144 self.flush_pending(write_stall_timeout)?;
145 if !self.pending.is_empty() {
146 return Err(io::Error::new(
147 io::ErrorKind::WouldBlock,
148 "backbone writer still stalled",
149 ));
150 }
151 }
152
153 let frame = hdlc::frame(data);
154 self.write_buffer(&frame, write_stall_timeout)
155 }
156}
157
158impl BackboneWriter {
159 fn write_buffer(
160 &mut self,
161 data: &[u8],
162 write_stall_timeout: Option<Duration>,
163 ) -> io::Result<()> {
164 let mut written = 0usize;
165 while written < data.len() {
166 match self.stream.write(&data[written..]) {
167 Ok(0) => {
168 return Err(io::Error::new(
169 io::ErrorKind::WriteZero,
170 "backbone writer wrote zero bytes",
171 ))
172 }
173 Ok(n) => {
174 written += n;
175 self.stall_started = None;
176 }
177 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
178 let now = Instant::now();
179 let started = self.stall_started.get_or_insert(now);
180 if let Some(timeout) = write_stall_timeout {
181 if now.duration_since(*started) >= timeout {
182 return Err(self.disconnect_for_write_stall(timeout));
183 }
184 }
185 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
186 return Err(self.disconnect_for_write_stall(
187 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
188 ));
189 }
190 self.pending.extend_from_slice(&data[written..]);
191 return Err(io::Error::new(
192 io::ErrorKind::WouldBlock,
193 "backbone writer would block",
194 ));
195 }
196 Err(e) => return Err(e),
197 }
198 }
199 Ok(())
200 }
201
202 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
203 if self.pending.is_empty() {
204 return Ok(());
205 }
206
207 let pending = std::mem::take(&mut self.pending);
208 match self.write_buffer(&pending, write_stall_timeout) {
209 Ok(()) => Ok(()),
210 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
211 Err(e) => Err(e),
212 }
213 }
214
215 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
216 if !self.disconnect_notified {
217 log::warn!(
218 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
219 self.interface_name,
220 self.interface_id.0,
221 timeout
222 );
223 self.write_stall_flag.store(true, Ordering::Relaxed);
224 let _ = self.stream.shutdown(Shutdown::Both);
225 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
226 self.disconnect_notified = true;
227 }
228 io::Error::new(
229 io::ErrorKind::TimedOut,
230 format!("backbone writer stalled for {:?}", timeout),
231 )
232 }
233}
234
235pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
237 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
238 let listener = TcpListener::bind(&addr)?;
239 listener.set_nonblocking(true)?;
240
241 log::info!(
242 "[{}] backbone server listening on {}",
243 config.name,
244 listener
245 .local_addr()
246 .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
247 );
248
249 let name = config.name.clone();
250 let server_interface_id = config.interface_id;
251 let runtime = Arc::clone(&config.runtime);
252 let peer_state = Arc::clone(&config.peer_state);
253 let ingress_control = config.ingress_control;
254 let accepted_peer_mode = config.mode;
255 thread::Builder::new()
256 .name(format!("backbone-poll-{}", config.interface_id.0))
257 .spawn(move || {
258 if let Err(e) = poll_loop(
259 listener,
260 name,
261 server_interface_id,
262 tx,
263 next_id,
264 runtime,
265 peer_state,
266 ingress_control,
267 accepted_peer_mode,
268 ) {
269 log::error!("backbone poll loop error: {}", e);
270 }
271 })?;
272
273 Ok(())
274}
275
276struct ClientState {
278 id: InterfaceId,
279 peer_ip: IpAddr,
280 peer_port: u16,
281 stream: TcpStream,
282 decoder: hdlc::Decoder,
283 connected_at: Instant,
284 has_received_data: bool,
285 write_stall_flag: Arc<AtomicBool>,
286}
287
288#[derive(Debug, Clone)]
289struct PeerBehaviorState {
290 blacklisted_until: Option<Instant>,
291 blacklist_reason: Option<String>,
292 reject_count: u64,
293 connected_count: usize,
294}
295
296impl PeerBehaviorState {
297 fn new() -> Self {
298 Self {
299 blacklisted_until: None,
300 blacklist_reason: None,
301 reject_count: 0,
302 connected_count: 0,
303 }
304 }
305}
306
307#[derive(Debug, Clone, Default)]
308pub struct BackbonePeerMonitor {
309 peers: HashMap<IpAddr, PeerBehaviorState>,
310}
311
312impl BackbonePeerMonitor {
313 pub fn new() -> Self {
314 Self {
315 peers: HashMap::new(),
316 }
317 }
318
319 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
320 let mut merged = self.peers.clone();
321
322 for (peer_ip, state) in peers {
323 let entry = merged
324 .entry(*peer_ip)
325 .or_insert_with(PeerBehaviorState::new);
326 entry.connected_count = state.connected_count;
327 entry.reject_count = state.reject_count;
328 if state.blacklisted_until.is_some() {
329 entry.blacklisted_until = state.blacklisted_until;
330 entry.blacklist_reason = state.blacklist_reason.clone();
331 }
332 }
333
334 merged.retain(|peer_ip, state| {
335 peers.contains_key(peer_ip)
336 || state.blacklisted_until.is_some()
337 || state.reject_count > 0
338 });
339 self.peers = merged;
340 }
341
342 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
343 for (peer_ip, state) in &self.peers {
344 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
345 entry.blacklisted_until = state.blacklisted_until;
346 entry.blacklist_reason = state.blacklist_reason.clone();
347 entry.reject_count = state.reject_count;
348 }
349
350 peers.retain(|peer_ip, state| {
351 if state.connected_count > 0 {
352 return true;
353 }
354 self.peers.contains_key(peer_ip)
355 });
356 }
357
358 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
359 let now = Instant::now();
360 let mut entries: Vec<BackbonePeerStateEntry> = self
361 .peers
362 .iter()
363 .map(|(peer_ip, state)| BackbonePeerStateEntry {
364 interface_name: interface_name.to_string(),
365 peer_ip: *peer_ip,
366 connected_count: state.connected_count,
367 blacklisted_remaining_secs: state
368 .blacklisted_until
369 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
370 blacklist_reason: state.blacklist_reason.clone(),
371 reject_count: state.reject_count,
372 })
373 .collect();
374 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
375 entries
376 }
377
378 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
379 self.peers.remove(&peer_ip).is_some()
380 }
381
382 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
383 let state = self
384 .peers
385 .entry(peer_ip)
386 .or_insert_with(PeerBehaviorState::new);
387 state.blacklisted_until = Some(Instant::now() + duration);
388 state.blacklist_reason = Some(reason);
389 true
390 }
391
392 #[cfg(test)]
393 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
394 let mut state = PeerBehaviorState::new();
395 state.connected_count = entry.connected_count;
396 state.reject_count = entry.reject_count;
397 state.blacklist_reason = entry.blacklist_reason;
398 if let Some(remaining) = entry.blacklisted_remaining_secs {
399 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
400 }
401 self.peers.insert(entry.peer_ip, state);
402 }
403}
404
405#[derive(Clone, Copy)]
406enum DisconnectReason {
407 RemoteClosed,
408 IdleTimeout,
409 WriteStall,
410}
411
412fn poll_loop(
414 listener: TcpListener,
415 name: String,
416 server_interface_id: InterfaceId,
417 tx: EventSender,
418 next_id: Arc<AtomicU64>,
419 runtime: Arc<Mutex<BackboneServerRuntime>>,
420 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
421 ingress_control: IngressControlConfig,
422 accepted_peer_mode: u8,
423) -> io::Result<()> {
424 let poller = Poller::new()?;
425
426 const LISTENER_KEY: usize = 0;
427
428 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
430
431 let mut clients: HashMap<usize, ClientState> = HashMap::new();
432 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
433 let mut events = Events::new();
434 let mut next_key: usize = 1;
435
436 loop {
437 let runtime_snapshot = runtime.lock().unwrap().clone();
438 let max_connections = runtime_snapshot.max_connections;
439 let idle_timeout = runtime_snapshot.idle_timeout;
440 cleanup_peer_state(&mut peers);
441 {
442 let mut monitor = peer_state.lock().unwrap();
443 monitor.sync_into(&mut peers);
444 monitor.upsert_snapshot(&peers);
445 }
446
447 events.clear();
448 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
449
450 for ev in events.iter() {
451 if ev.key == LISTENER_KEY {
452 loop {
454 match listener.accept() {
455 Ok((stream, peer_addr)) => {
456 let peer_ip = peer_addr.ip();
457 let peer_port = peer_addr.port();
458
459 if is_ip_blacklisted(&mut peers, peer_ip) {
460 if let Some(state) = peers.get_mut(&peer_ip) {
461 state.reject_count = state.reject_count.saturating_add(1);
462 }
463 peer_state.lock().unwrap().upsert_snapshot(&peers);
464 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
465 drop(stream);
466 continue;
467 }
468
469 if let Some(max) = max_connections {
470 if clients.len() >= max {
471 log::warn!(
472 "[{}] max connections ({}) reached, rejecting {}",
473 name,
474 max,
475 peer_addr
476 );
477 drop(stream);
478 continue;
479 }
480 }
481
482 stream.set_nonblocking(true).ok();
483 stream.set_nodelay(true).ok();
484 set_tcp_keepalive(&stream).ok();
485
486 #[cfg(target_os = "macos")]
488 {
489 let sock = SockRef::from(&stream);
490 sock.set_nosigpipe(true).ok();
491 }
492
493 let key = next_key;
494 next_key += 1;
495 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
496
497 log::info!(
498 "[{}] backbone client connected: {} → id {}",
499 name,
500 peer_addr,
501 client_id.0
502 );
503
504 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
507 {
508 log::warn!("[{}] failed to add client to poller: {}", name, e);
509 continue; }
511
512 let writer_stream = match stream.try_clone() {
514 Ok(s) => s,
515 Err(e) => {
516 log::warn!("[{}] failed to clone client stream: {}", name, e);
517 let _ = poller.delete(&stream);
518 continue; }
520 };
521 let write_stall_flag = Arc::new(AtomicBool::new(false));
522 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
523 stream: writer_stream,
524 runtime: Arc::clone(&runtime),
525 interface_name: name.clone(),
526 interface_id: client_id,
527 event_tx: tx.clone(),
528 pending: Vec::new(),
529 stall_started: None,
530 disconnect_notified: false,
531 write_stall_flag: Arc::clone(&write_stall_flag),
532 });
533
534 clients.insert(
535 key,
536 ClientState {
537 id: client_id,
538 peer_ip,
539 peer_port,
540 stream,
541 decoder: hdlc::Decoder::new(),
542 connected_at: Instant::now(),
543 has_received_data: false,
544 write_stall_flag,
545 },
546 );
547 peers
548 .entry(peer_ip)
549 .or_insert_with(PeerBehaviorState::new)
550 .connected_count += 1;
551 peer_state.lock().unwrap().upsert_snapshot(&peers);
552 let _ = tx.send(Event::BackbonePeerConnected {
553 server_interface_id,
554 peer_interface_id: client_id,
555 peer_ip,
556 peer_port,
557 });
558
559 let info = InterfaceInfo {
560 id: client_id,
561 name: format!("BackboneInterface/{}", client_id.0),
562 mode: accepted_peer_mode,
563 out_capable: true,
564 in_capable: true,
565 bitrate: Some(1_000_000_000), airtime_profile: None,
567 announce_rate_target: None,
568 announce_rate_grace: 0,
569 announce_rate_penalty: 0.0,
570 announce_cap: constants::ANNOUNCE_CAP,
571 is_local_client: false,
572 wants_tunnel: false,
573 tunnel_id: None,
574 mtu: 65535,
575 ia_freq: 0.0,
576 ip_freq: 0.0,
577 op_freq: 0.0,
578 op_samples: 0,
579 started: 0.0,
580 ingress_control,
581 };
582
583 if tx
584 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
585 .is_err()
586 {
587 cleanup(&poller, &clients, &listener);
589 return Ok(());
590 }
591 }
592 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
593 Err(e) => {
594 log::warn!("[{}] accept error: {}", name, e);
595 break;
596 }
597 }
598 }
599 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
601 } else if clients.contains_key(&ev.key) {
602 let key = ev.key;
603 let mut should_remove = false;
604 let mut client_id = InterfaceId(0);
605
606 let mut buf = [0u8; 4096];
607 let read_result = {
608 let client = clients.get_mut(&key).unwrap();
609 client.stream.read(&mut buf)
610 };
611
612 match read_result {
613 Ok(0) | Err(_) => {
614 if let Some(c) = clients.get(&key) {
615 client_id = c.id;
616 }
617 should_remove = true;
618 }
619 Ok(n) => {
620 let client = clients.get_mut(&key).unwrap();
621 client_id = client.id;
622 client.has_received_data = true;
623 for frame in client.decoder.feed(&buf[..n]) {
624 if tx
625 .send(Event::Frame {
626 interface_id: client_id,
627 data: frame,
628 rssi: None,
629 snr: None,
630 })
631 .is_err()
632 {
633 cleanup(&poller, &clients, &listener);
634 return Ok(());
635 }
636 }
637 }
638 }
639
640 if should_remove {
641 let reason = if clients
642 .get(&key)
643 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
644 {
645 DisconnectReason::WriteStall
646 } else {
647 DisconnectReason::RemoteClosed
648 };
649 disconnect_client(
650 &poller,
651 &mut clients,
652 &mut peers,
653 &name,
654 server_interface_id,
655 &tx,
656 &peer_state,
657 key,
658 client_id,
659 reason,
660 );
661 } else if let Some(client) = clients.get(&key) {
662 poller.modify(&client.stream, PollEvent::readable(key))?;
664 }
665 }
666 }
667
668 if let Some(timeout) = idle_timeout {
669 let now = Instant::now();
670 let timed_out: Vec<(usize, InterfaceId)> = clients
671 .iter()
672 .filter_map(|(&key, client)| {
673 if client.has_received_data || now.duration_since(client.connected_at) < timeout
674 {
675 None
676 } else {
677 Some((key, client.id))
678 }
679 })
680 .collect();
681
682 for (key, client_id) in timed_out {
683 disconnect_client(
684 &poller,
685 &mut clients,
686 &mut peers,
687 &name,
688 server_interface_id,
689 &tx,
690 &peer_state,
691 key,
692 client_id,
693 DisconnectReason::IdleTimeout,
694 );
695 }
696 }
697 }
698}
699
700fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
701 let now = Instant::now();
702 peers.retain(|_, state| {
703 if matches!(state.blacklisted_until, Some(until) if now >= until) {
704 state.blacklisted_until = None;
705 state.blacklist_reason = None;
706 }
707 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
708 });
709}
710
711fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
712 let now = Instant::now();
713 if let Some(state) = peers.get_mut(&peer_ip) {
714 if let Some(until) = state.blacklisted_until {
715 if now < until {
716 return true;
717 }
718 state.blacklisted_until = None;
719 }
720 }
721 false
722}
723
724fn disconnect_client(
725 poller: &Poller,
726 clients: &mut HashMap<usize, ClientState>,
727 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
728 name: &str,
729 server_interface_id: InterfaceId,
730 tx: &EventSender,
731 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
732 key: usize,
733 client_id: InterfaceId,
734 reason: DisconnectReason,
735) {
736 let Some(client) = clients.remove(&key) else {
737 return;
738 };
739
740 match reason {
741 DisconnectReason::RemoteClosed => {
742 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
743 }
744 DisconnectReason::IdleTimeout => {
745 log::info!(
746 "[{}] backbone client {} disconnected due to idle timeout",
747 name,
748 client_id.0
749 );
750 }
751 DisconnectReason::WriteStall => {
752 }
754 }
755
756 let _ = poller.delete(&client.stream);
757 let connected_for = client.connected_at.elapsed();
759 let _ = tx.send(Event::BackbonePeerDisconnected {
760 server_interface_id,
761 peer_interface_id: client.id,
762 peer_ip: client.peer_ip,
763 peer_port: client.peer_port,
764 connected_for,
765 had_received_data: client.has_received_data,
766 });
767 match reason {
768 DisconnectReason::IdleTimeout => {
769 let _ = tx.send(Event::BackbonePeerIdleTimeout {
770 server_interface_id,
771 peer_interface_id: client.id,
772 peer_ip: client.peer_ip,
773 peer_port: client.peer_port,
774 connected_for,
775 });
776 }
777 DisconnectReason::WriteStall => {
778 let _ = tx.send(Event::BackbonePeerWriteStall {
779 server_interface_id,
780 peer_interface_id: client.id,
781 peer_ip: client.peer_ip,
782 peer_port: client.peer_port,
783 connected_for,
784 });
785 }
786 DisconnectReason::RemoteClosed => {}
787 }
788
789 if let Some(state) = peers.get_mut(&client.peer_ip) {
790 state.connected_count = state.connected_count.saturating_sub(1);
791 }
792 peer_state.lock().unwrap().upsert_snapshot(peers);
793 if !matches!(reason, DisconnectReason::WriteStall) {
795 let _ = tx.send(Event::InterfaceDown(client_id));
796 }
797}
798
799fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
800 let sock = SockRef::from(stream);
801 let mut keepalive = TcpKeepalive::new()
802 .with_time(Duration::from_secs(5))
803 .with_interval(Duration::from_secs(2));
804 #[cfg(any(target_os = "linux", target_os = "macos"))]
805 {
806 keepalive = keepalive.with_retries(12);
807 }
808 sock.set_tcp_keepalive(&keepalive)
809}
810
811fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
812 for (_, client) in clients {
813 let _ = poller.delete(&client.stream);
814 }
815 let _ = poller.delete(listener);
816}
817
818#[derive(Debug, Clone)]
824pub struct BackboneClientConfig {
825 pub name: String,
826 pub target_host: String,
827 pub target_port: u16,
828 pub interface_id: InterfaceId,
829 pub reconnect_wait: Duration,
830 pub max_reconnect_tries: Option<u32>,
831 pub connect_timeout: Duration,
832 pub transport_identity: Option<String>,
833 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
834}
835
836#[derive(Debug, Clone)]
837pub struct BackboneClientRuntime {
838 pub reconnect_wait: Duration,
839 pub max_reconnect_tries: Option<u32>,
840 pub connect_timeout: Duration,
841}
842
843impl BackboneClientRuntime {
844 pub fn from_config(config: &BackboneClientConfig) -> Self {
845 Self {
846 reconnect_wait: config.reconnect_wait,
847 max_reconnect_tries: config.max_reconnect_tries,
848 connect_timeout: config.connect_timeout,
849 }
850 }
851}
852
853#[derive(Debug, Clone)]
854pub struct BackboneClientRuntimeConfigHandle {
855 pub interface_name: String,
856 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
857 pub startup: BackboneClientRuntime,
858}
859
860impl Default for BackboneClientConfig {
861 fn default() -> Self {
862 let mut config = BackboneClientConfig {
863 name: String::new(),
864 target_host: "127.0.0.1".into(),
865 target_port: 4242,
866 interface_id: InterfaceId(0),
867 reconnect_wait: Duration::from_secs(5),
868 max_reconnect_tries: None,
869 connect_timeout: Duration::from_secs(5),
870 transport_identity: None,
871 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
872 reconnect_wait: Duration::from_secs(5),
873 max_reconnect_tries: None,
874 connect_timeout: Duration::from_secs(5),
875 })),
876 };
877 let startup = BackboneClientRuntime::from_config(&config);
878 config.runtime = Arc::new(Mutex::new(startup));
879 config
880 }
881}
882
883struct BackboneClientWriter {
885 stream: TcpStream,
886}
887
888impl Writer for BackboneClientWriter {
889 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
890 self.stream.write_all(&hdlc::frame(data))
891 }
892}
893
894fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
896 let runtime = config.runtime.lock().unwrap().clone();
897 let addr_str = format!("{}:{}", config.target_host, config.target_port);
898 let addr = addr_str
899 .to_socket_addrs()?
900 .next()
901 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
902
903 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
904 stream.set_nodelay(true)?;
905 set_tcp_keepalive(&stream).ok();
906
907 #[cfg(target_os = "macos")]
909 {
910 let sock = SockRef::from(&stream);
911 sock.set_nosigpipe(true).ok();
912 }
913
914 Ok(stream)
915}
916
917pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
919 let stream = try_connect_client(&config)?;
920 let reader_stream = stream.try_clone()?;
921 let writer_stream = stream.try_clone()?;
922
923 let id = config.interface_id;
924 log::info!(
925 "[{}] backbone client connected to {}:{}",
926 config.name,
927 config.target_host,
928 config.target_port
929 );
930
931 let _ = tx.send(Event::InterfaceUp(id, None, None));
933
934 thread::Builder::new()
935 .name(format!("backbone-client-{}", id.0))
936 .spawn(move || {
937 client_reader_loop(reader_stream, config, tx);
938 })?;
939
940 Ok(Box::new(BackboneClientWriter {
941 stream: writer_stream,
942 }))
943}
944
945fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
948 let id = config.interface_id;
949 let mut decoder = hdlc::Decoder::new();
950 let mut buf = [0u8; 4096];
951
952 loop {
953 match stream.read(&mut buf) {
954 Ok(0) => {
955 log::warn!("[{}] connection closed", config.name);
956 let _ = tx.send(Event::InterfaceDown(id));
957 match client_reconnect(&config, &tx) {
958 Some(new_stream) => {
959 stream = new_stream;
960 decoder = hdlc::Decoder::new();
961 continue;
962 }
963 None => {
964 log::error!("[{}] reconnection failed, giving up", config.name);
965 return;
966 }
967 }
968 }
969 Ok(n) => {
970 for frame in decoder.feed(&buf[..n]) {
971 if tx
972 .send(Event::Frame {
973 interface_id: id,
974 data: frame,
975 rssi: None,
976 snr: None,
977 })
978 .is_err()
979 {
980 return;
981 }
982 }
983 }
984 Err(e) => {
985 log::warn!("[{}] read error: {}", config.name, e);
986 let _ = tx.send(Event::InterfaceDown(id));
987 match client_reconnect(&config, &tx) {
988 Some(new_stream) => {
989 stream = new_stream;
990 decoder = hdlc::Decoder::new();
991 continue;
992 }
993 None => {
994 log::error!("[{}] reconnection failed, giving up", config.name);
995 return;
996 }
997 }
998 }
999 }
1000 }
1001}
1002
1003const MAX_BACKOFF_SHIFT: u32 = 6;
1006
1007fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
1011 let mut attempts = 0u32;
1012 loop {
1013 let runtime = config.runtime.lock().unwrap().clone();
1014
1015 let shift = attempts.min(MAX_BACKOFF_SHIFT);
1016 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1017 let jitter_range = backoff / 4;
1019 let jitter = if jitter_range.as_nanos() > 0 {
1020 let offset = Duration::from_nanos(
1021 (std::hash::RandomState::new().build_hasher().finish()
1022 % jitter_range.as_nanos() as u64)
1023 * 2,
1024 );
1025 if offset > jitter_range {
1026 backoff + (offset - jitter_range)
1027 } else {
1028 backoff - (jitter_range - offset)
1029 }
1030 } else {
1031 backoff
1032 };
1033 thread::sleep(jitter);
1034
1035 attempts += 1;
1036
1037 if let Some(max) = runtime.max_reconnect_tries {
1038 if attempts > max {
1039 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1040 return None;
1041 }
1042 }
1043
1044 log::info!(
1045 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1046 config.name,
1047 attempts,
1048 jitter.as_secs_f64(),
1049 );
1050
1051 match try_connect_client(config) {
1052 Ok(new_stream) => {
1053 let writer_stream = match new_stream.try_clone() {
1054 Ok(s) => s,
1055 Err(e) => {
1056 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1057 continue;
1058 }
1059 };
1060 log::info!(
1061 "[{}] reconnected after {} attempt(s)",
1062 config.name,
1063 attempts
1064 );
1065 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1066 stream: writer_stream,
1067 });
1068 let _ = tx.send(Event::InterfaceUp(
1069 config.interface_id,
1070 Some(new_writer),
1071 None,
1072 ));
1073 return Some(new_stream);
1074 }
1075 Err(e) => {
1076 log::warn!("[{}] reconnect failed: {}", config.name, e);
1077 }
1078 }
1079 }
1080}
1081
1082#[derive(Clone)]
1089pub(crate) enum BackboneMode {
1090 Server(BackboneConfig),
1091 Client(BackboneClientConfig, u8),
1092}
1093
1094pub struct BackboneInterfaceFactory;
1100
1101fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
1102 params
1103 .get(key)
1104 .and_then(|v| v.parse::<f64>().ok())
1105 .filter(|v| *v > 0.0)
1106 .map(Duration::from_secs_f64)
1107}
1108
1109impl InterfaceFactory for BackboneInterfaceFactory {
1110 fn type_name(&self) -> &str {
1111 "BackboneInterface"
1112 }
1113
1114 fn parse_config(
1115 &self,
1116 name: &str,
1117 id: InterfaceId,
1118 params: &HashMap<String, String>,
1119 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1120 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1121 let target_host = target_host.clone();
1123 let target_port = params
1124 .get("target_port")
1125 .or_else(|| params.get("port"))
1126 .and_then(|v| v.parse().ok())
1127 .unwrap_or(4242);
1128 let transport_identity = params.get("transport_identity").cloned();
1129 let priority = match params.get("priority") {
1130 Some(value) => value.parse::<u8>().map_err(|_| {
1131 format!(
1132 "invalid Backbone peer priority '{}' (expected 0..100)",
1133 value
1134 )
1135 })?,
1136 None => crate::driver::BACKBONE_PEER_POOL_CONFIGURED_DEFAULT_PRIORITY,
1137 };
1138 if priority > 100 {
1139 return Err(format!(
1140 "invalid Backbone peer priority '{}' (expected 0..100)",
1141 priority
1142 ));
1143 }
1144 Ok(Box::new(BackboneMode::Client(
1145 BackboneClientConfig {
1146 name: name.to_string(),
1147 target_host,
1148 target_port,
1149 interface_id: id,
1150 transport_identity,
1151 ..BackboneClientConfig::default()
1152 },
1153 priority,
1154 )))
1155 } else {
1156 let listen_ip = params
1158 .get("listen_ip")
1159 .or_else(|| params.get("device"))
1160 .cloned()
1161 .unwrap_or_else(|| "0.0.0.0".into());
1162 let listen_port = params
1163 .get("listen_port")
1164 .or_else(|| params.get("port"))
1165 .and_then(|v| v.parse().ok())
1166 .unwrap_or(4242);
1167 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1168 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1169 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1170 let abuse = BackboneAbuseConfig {
1171 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1172 };
1173 let mut config = BackboneConfig {
1174 name: name.to_string(),
1175 listen_ip,
1176 listen_port,
1177 interface_id: id,
1178 mode: constants::MODE_FULL,
1179 max_connections,
1180 idle_timeout,
1181 write_stall_timeout,
1182 abuse,
1183 ingress_control: IngressControlConfig::enabled(),
1184 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1185 max_connections: None,
1186 idle_timeout: None,
1187 write_stall_timeout: None,
1188 abuse: BackboneAbuseConfig::default(),
1189 })),
1190 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1191 };
1192 let startup = BackboneServerRuntime::from_config(&config);
1193 config.runtime = Arc::new(Mutex::new(startup));
1194 Ok(Box::new(BackboneMode::Server(config)))
1195 }
1196 }
1197
1198 fn start(
1199 &self,
1200 config: Box<dyn InterfaceConfigData>,
1201 ctx: StartContext,
1202 ) -> io::Result<StartResult> {
1203 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1204 io::Error::new(
1205 io::ErrorKind::InvalidData,
1206 "wrong config type for BackboneInterface",
1207 )
1208 })?;
1209
1210 match mode {
1211 BackboneMode::Client(cfg, _) => {
1212 let id = cfg.interface_id;
1213 let name = cfg.name.clone();
1214 let info = InterfaceInfo {
1215 id,
1216 name,
1217 mode: ctx.mode,
1218 out_capable: true,
1219 in_capable: true,
1220 bitrate: Some(1_000_000_000),
1221 airtime_profile: None,
1222 announce_rate_target: None,
1223 announce_rate_grace: 0,
1224 announce_rate_penalty: 0.0,
1225 announce_cap: constants::ANNOUNCE_CAP,
1226 is_local_client: false,
1227 wants_tunnel: false,
1228 tunnel_id: None,
1229 mtu: 65535,
1230 ingress_control: ctx.ingress_control,
1231 ia_freq: 0.0,
1232 ip_freq: 0.0,
1233 op_freq: 0.0,
1234 op_samples: 0,
1235 started: crate::time::now(),
1236 };
1237 let writer = start_client(cfg, ctx.tx)?;
1238 Ok(StartResult::Simple {
1239 id,
1240 info,
1241 writer,
1242 interface_type_name: "BackboneInterface".to_string(),
1243 })
1244 }
1245 BackboneMode::Server(mut cfg) => {
1246 cfg.ingress_control = ctx.ingress_control;
1247 cfg.mode = ctx.mode;
1248 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1249 Ok(StartResult::Listener { control: None })
1250 }
1251 }
1252 }
1253}
1254
1255pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1256 match mode {
1257 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1258 interface_name: config.name.clone(),
1259 runtime: Arc::clone(&config.runtime),
1260 startup: BackboneServerRuntime::from_config(config),
1261 }),
1262 BackboneMode::Client(_, _) => None,
1263 }
1264}
1265
1266pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1267 match mode {
1268 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1269 interface_id: config.interface_id,
1270 interface_name: config.name.clone(),
1271 peer_state: Arc::clone(&config.peer_state),
1272 }),
1273 BackboneMode::Client(_, _) => None,
1274 }
1275}
1276
1277pub(crate) fn client_runtime_handle_from_mode(
1278 mode: &BackboneMode,
1279) -> Option<BackboneClientRuntimeConfigHandle> {
1280 match mode {
1281 BackboneMode::Client(config, _) => Some(BackboneClientRuntimeConfigHandle {
1282 interface_name: config.name.clone(),
1283 runtime: Arc::clone(&config.runtime),
1284 startup: BackboneClientRuntime::from_config(config),
1285 }),
1286 BackboneMode::Server(_) => None,
1287 }
1288}
1289
1290pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1291 match mode {
1292 BackboneMode::Client(config, _) => Some(config.clone()),
1293 BackboneMode::Server(_) => None,
1294 }
1295}
1296
1297pub(crate) fn client_priority_from_mode(mode: &BackboneMode) -> Option<u8> {
1298 match mode {
1299 BackboneMode::Client(_, priority) => Some(*priority),
1300 BackboneMode::Server(_) => None,
1301 }
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306 use super::*;
1307 use std::sync::mpsc;
1308 use std::time::Duration;
1309
1310 fn find_free_port() -> u16 {
1311 TcpListener::bind("127.0.0.1:0")
1312 .unwrap()
1313 .local_addr()
1314 .unwrap()
1315 .port()
1316 }
1317
1318 fn recv_non_peer_event(
1319 rx: &mpsc::Receiver<Event>,
1320 timeout: Duration,
1321 ) -> Result<Event, mpsc::RecvTimeoutError> {
1322 let deadline = Instant::now() + timeout;
1323 loop {
1324 let remaining = deadline.saturating_duration_since(Instant::now());
1325 if remaining.is_zero() {
1326 return Err(mpsc::RecvTimeoutError::Timeout);
1327 }
1328 let event = rx.recv_timeout(remaining)?;
1329 match event {
1330 Event::BackbonePeerConnected { .. }
1331 | Event::BackbonePeerDisconnected { .. }
1332 | Event::BackbonePeerIdleTimeout { .. }
1333 | Event::BackbonePeerWriteStall { .. }
1334 | Event::BackbonePeerPenalty { .. } => continue,
1335 other => return Ok(other),
1336 }
1337 }
1338 }
1339
1340 fn make_server_config(
1341 port: u16,
1342 interface_id: u64,
1343 max_connections: Option<usize>,
1344 idle_timeout: Option<Duration>,
1345 write_stall_timeout: Option<Duration>,
1346 abuse: BackboneAbuseConfig,
1347 ) -> BackboneConfig {
1348 let mut config = BackboneConfig {
1349 name: "test-backbone".into(),
1350 listen_ip: "127.0.0.1".into(),
1351 listen_port: port,
1352 interface_id: InterfaceId(interface_id),
1353 mode: constants::MODE_FULL,
1354 max_connections,
1355 idle_timeout,
1356 write_stall_timeout,
1357 abuse,
1358 ingress_control: IngressControlConfig::enabled(),
1359 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1360 max_connections: None,
1361 idle_timeout: None,
1362 write_stall_timeout: None,
1363 abuse: BackboneAbuseConfig::default(),
1364 })),
1365 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1366 };
1367 let startup = BackboneServerRuntime::from_config(&config);
1368 config.runtime = Arc::new(Mutex::new(startup));
1369 config
1370 }
1371
1372 #[test]
1373 fn backbone_accept_connection() {
1374 let port = find_free_port();
1375 let (tx, rx) = crate::event::channel();
1376 let next_id = Arc::new(AtomicU64::new(8000));
1377
1378 let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1379
1380 start(config, tx, next_id).unwrap();
1381 thread::sleep(Duration::from_millis(50));
1382
1383 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1384
1385 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1386 match event {
1387 Event::InterfaceUp(id, writer, info) => {
1388 assert_eq!(id, InterfaceId(8000));
1389 assert!(writer.is_some());
1390 assert!(info.is_some());
1391 let info = info.unwrap();
1392 assert!(info.out_capable);
1393 assert!(info.in_capable);
1394 }
1395 other => panic!("expected InterfaceUp, got {:?}", other),
1396 }
1397 }
1398
1399 #[test]
1400 fn backbone_accepted_connection_inherits_server_mode() {
1401 let port = find_free_port();
1402 let (tx, rx) = crate::event::channel();
1403 let next_id = Arc::new(AtomicU64::new(8050));
1404
1405 let mut config =
1406 make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1407 config.mode = constants::MODE_GATEWAY;
1408
1409 start(config, tx, next_id).unwrap();
1410 thread::sleep(Duration::from_millis(50));
1411
1412 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1413
1414 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1415 match event {
1416 Event::InterfaceUp(_, _, Some(info)) => {
1417 assert_eq!(info.mode, constants::MODE_GATEWAY);
1418 }
1419 other => panic!("expected InterfaceUp with info, got {:?}", other),
1420 }
1421 }
1422
1423 #[test]
1424 fn backbone_receive_frame() {
1425 let port = find_free_port();
1426 let (tx, rx) = crate::event::channel();
1427 let next_id = Arc::new(AtomicU64::new(8100));
1428
1429 let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1430
1431 start(config, tx, next_id).unwrap();
1432 thread::sleep(Duration::from_millis(50));
1433
1434 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1435
1436 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1438
1439 let payload: Vec<u8> = (0..32).collect();
1441 client.write_all(&hdlc::frame(&payload)).unwrap();
1442
1443 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1444 match event {
1445 Event::Frame {
1446 interface_id,
1447 data,
1448 rssi: _,
1449 snr: _,
1450 } => {
1451 assert_eq!(interface_id, InterfaceId(8100));
1452 assert_eq!(data, payload);
1453 }
1454 other => panic!("expected Frame, got {:?}", other),
1455 }
1456 }
1457
1458 #[test]
1459 fn backbone_send_to_client() {
1460 let port = find_free_port();
1461 let (tx, rx) = crate::event::channel();
1462 let next_id = Arc::new(AtomicU64::new(8200));
1463
1464 let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1465
1466 start(config, tx, next_id).unwrap();
1467 thread::sleep(Duration::from_millis(50));
1468
1469 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1470 client
1471 .set_read_timeout(Some(Duration::from_secs(2)))
1472 .unwrap();
1473
1474 let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1476 let mut writer = match event {
1477 Event::InterfaceUp(_, Some(w), _) => w,
1478 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1479 };
1480
1481 let payload: Vec<u8> = (0..24).collect();
1483 writer.send_frame(&payload).unwrap();
1484
1485 let mut buf = [0u8; 256];
1487 let n = client.read(&mut buf).unwrap();
1488 let expected = hdlc::frame(&payload);
1489 assert_eq!(&buf[..n], &expected[..]);
1490 }
1491
1492 #[test]
1493 fn backbone_multiple_clients() {
1494 let port = find_free_port();
1495 let (tx, rx) = crate::event::channel();
1496 let next_id = Arc::new(AtomicU64::new(8300));
1497
1498 let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1499
1500 start(config, tx, next_id).unwrap();
1501 thread::sleep(Duration::from_millis(50));
1502
1503 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1504 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1505
1506 let mut ids = Vec::new();
1507 for _ in 0..2 {
1508 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1509 match event {
1510 Event::InterfaceUp(id, _, _) => ids.push(id),
1511 other => panic!("expected InterfaceUp, got {:?}", other),
1512 }
1513 }
1514
1515 assert_eq!(ids.len(), 2);
1516 assert_ne!(ids[0], ids[1]);
1517 }
1518
1519 #[test]
1520 fn backbone_client_disconnect() {
1521 let port = find_free_port();
1522 let (tx, rx) = crate::event::channel();
1523 let next_id = Arc::new(AtomicU64::new(8400));
1524
1525 let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1526
1527 start(config, tx, next_id).unwrap();
1528 thread::sleep(Duration::from_millis(50));
1529
1530 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1531
1532 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1534
1535 drop(client);
1537
1538 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1540 assert!(
1541 matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1542 "expected InterfaceDown(8400), got {:?}",
1543 event
1544 );
1545 }
1546
1547 #[test]
1548 fn backbone_epoll_multiplexing() {
1549 let port = find_free_port();
1550 let (tx, rx) = crate::event::channel();
1551 let next_id = Arc::new(AtomicU64::new(8500));
1552
1553 let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1554
1555 start(config, tx, next_id).unwrap();
1556 thread::sleep(Duration::from_millis(50));
1557
1558 let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1559 let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1560
1561 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1563 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1564
1565 let payload1: Vec<u8> = (0..24).collect();
1567 let payload2: Vec<u8> = (100..130).collect();
1568 client1.write_all(&hdlc::frame(&payload1)).unwrap();
1569 client2.write_all(&hdlc::frame(&payload2)).unwrap();
1570
1571 let mut received = Vec::new();
1573 for _ in 0..2 {
1574 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1575 match event {
1576 Event::Frame { data, .. } => received.push(data),
1577 other => panic!("expected Frame, got {:?}", other),
1578 }
1579 }
1580 assert!(received.contains(&payload1));
1581 assert!(received.contains(&payload2));
1582 }
1583
1584 #[test]
1585 fn backbone_bind_port() {
1586 let port = find_free_port();
1587 let (tx, _rx) = crate::event::channel();
1588 let next_id = Arc::new(AtomicU64::new(8600));
1589
1590 let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1591
1592 start(config, tx, next_id).unwrap();
1594 }
1595
1596 #[test]
1597 fn backbone_hdlc_fragmented() {
1598 let port = find_free_port();
1599 let (tx, rx) = crate::event::channel();
1600 let next_id = Arc::new(AtomicU64::new(8700));
1601
1602 let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1603
1604 start(config, tx, next_id).unwrap();
1605 thread::sleep(Duration::from_millis(50));
1606
1607 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1608 client.set_nodelay(true).unwrap();
1609
1610 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1612
1613 let payload: Vec<u8> = (0..32).collect();
1615 let framed = hdlc::frame(&payload);
1616 let mid = framed.len() / 2;
1617
1618 client.write_all(&framed[..mid]).unwrap();
1619 thread::sleep(Duration::from_millis(50));
1620 client.write_all(&framed[mid..]).unwrap();
1621
1622 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1624 match event {
1625 Event::Frame { data, .. } => {
1626 assert_eq!(data, payload);
1627 }
1628 other => panic!("expected Frame, got {:?}", other),
1629 }
1630 }
1631
1632 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1637 BackboneClientConfig {
1638 name: format!("test-bb-client-{}", port),
1639 target_host: "127.0.0.1".into(),
1640 target_port: port,
1641 interface_id: InterfaceId(id),
1642 reconnect_wait: Duration::from_millis(100),
1643 max_reconnect_tries: Some(2),
1644 connect_timeout: Duration::from_secs(2),
1645 transport_identity: None,
1646 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1647 reconnect_wait: Duration::from_millis(100),
1648 max_reconnect_tries: Some(2),
1649 connect_timeout: Duration::from_secs(2),
1650 })),
1651 }
1652 }
1653
1654 #[test]
1655 fn backbone_client_connect() {
1656 let port = find_free_port();
1657 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1658 let (tx, rx) = crate::event::channel();
1659
1660 let config = make_client_config(port, 9000);
1661 let _writer = start_client(config, tx).unwrap();
1662
1663 let _server_stream = listener.accept().unwrap();
1664
1665 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1666 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1667 }
1668
1669 #[test]
1670 fn backbone_client_receive_frame() {
1671 let port = find_free_port();
1672 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1673 let (tx, rx) = crate::event::channel();
1674
1675 let config = make_client_config(port, 9100);
1676 let _writer = start_client(config, tx).unwrap();
1677
1678 let (mut server_stream, _) = listener.accept().unwrap();
1679
1680 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1682
1683 let payload: Vec<u8> = (0..32).collect();
1685 server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1686
1687 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1688 match event {
1689 Event::Frame {
1690 interface_id,
1691 data,
1692 rssi: _,
1693 snr: _,
1694 } => {
1695 assert_eq!(interface_id, InterfaceId(9100));
1696 assert_eq!(data, payload);
1697 }
1698 other => panic!("expected Frame, got {:?}", other),
1699 }
1700 }
1701
1702 #[test]
1703 fn backbone_client_send_frame() {
1704 let port = find_free_port();
1705 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1706 let (tx, _rx) = crate::event::channel();
1707
1708 let config = make_client_config(port, 9200);
1709 let mut writer = start_client(config, tx).unwrap();
1710
1711 let (mut server_stream, _) = listener.accept().unwrap();
1712 server_stream
1713 .set_read_timeout(Some(Duration::from_secs(2)))
1714 .unwrap();
1715
1716 let payload: Vec<u8> = (0..24).collect();
1717 writer.send_frame(&payload).unwrap();
1718
1719 let mut buf = [0u8; 256];
1720 let n = server_stream.read(&mut buf).unwrap();
1721 let expected = hdlc::frame(&payload);
1722 assert_eq!(&buf[..n], &expected[..]);
1723 }
1724
1725 #[test]
1726 fn backbone_max_connections_rejects_excess() {
1727 let port = find_free_port();
1728 let (tx, rx) = crate::event::channel();
1729 let next_id = Arc::new(AtomicU64::new(8800));
1730
1731 let config = make_server_config(
1732 port,
1733 88,
1734 Some(2),
1735 None,
1736 None,
1737 BackboneAbuseConfig::default(),
1738 );
1739
1740 start(config, tx, next_id).unwrap();
1741 thread::sleep(Duration::from_millis(50));
1742
1743 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1745 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1746
1747 for _ in 0..2 {
1749 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1750 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1751 }
1752
1753 let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1755 client3
1756 .set_read_timeout(Some(Duration::from_millis(500)))
1757 .unwrap();
1758
1759 thread::sleep(Duration::from_millis(100));
1761
1762 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1764 assert!(
1765 result.is_err(),
1766 "expected no InterfaceUp for rejected connection, got {:?}",
1767 result
1768 );
1769 }
1770
1771 #[test]
1772 fn backbone_max_connections_allows_after_disconnect() {
1773 let port = find_free_port();
1774 let (tx, rx) = crate::event::channel();
1775 let next_id = Arc::new(AtomicU64::new(8900));
1776
1777 let config = make_server_config(
1778 port,
1779 89,
1780 Some(1),
1781 None,
1782 None,
1783 BackboneAbuseConfig::default(),
1784 );
1785
1786 start(config, tx, next_id).unwrap();
1787 thread::sleep(Duration::from_millis(50));
1788
1789 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1791 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1792 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1793
1794 drop(client1);
1796
1797 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1799 assert!(matches!(event, Event::InterfaceDown(_)));
1800
1801 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1803 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1804 assert!(
1805 matches!(event, Event::InterfaceUp(_, _, _)),
1806 "expected InterfaceUp after slot freed, got {:?}",
1807 event
1808 );
1809 }
1810
1811 #[test]
1812 fn backbone_client_reconnect() {
1813 let port = find_free_port();
1814 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1815 listener.set_nonblocking(false).unwrap();
1816 let (tx, rx) = crate::event::channel();
1817
1818 let config = make_client_config(port, 9300);
1819 let _writer = start_client(config, tx).unwrap();
1820
1821 let (server_stream, _) = listener.accept().unwrap();
1823
1824 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1826
1827 drop(server_stream);
1828
1829 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1831 assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1832
1833 let _server_stream2 = listener.accept().unwrap();
1835
1836 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1838 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1839 }
1840
1841 #[test]
1842 fn backbone_idle_timeout_disconnects_silent_client() {
1843 let port = find_free_port();
1844 let (tx, rx) = crate::event::channel();
1845 let next_id = Arc::new(AtomicU64::new(9400));
1846
1847 let config = make_server_config(
1848 port,
1849 94,
1850 None,
1851 Some(Duration::from_millis(150)),
1852 None,
1853 BackboneAbuseConfig::default(),
1854 );
1855
1856 start(config, tx, next_id).unwrap();
1857 thread::sleep(Duration::from_millis(50));
1858
1859 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1860
1861 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1862 let client_id = match event {
1863 Event::InterfaceUp(id, _, _) => id,
1864 other => panic!("expected InterfaceUp, got {:?}", other),
1865 };
1866
1867 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1868 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1869 }
1870
1871 #[test]
1872 fn backbone_idle_timeout_ignores_client_after_data() {
1873 let port = find_free_port();
1874 let (tx, rx) = crate::event::channel();
1875 let next_id = Arc::new(AtomicU64::new(9500));
1876
1877 let config = make_server_config(
1878 port,
1879 95,
1880 None,
1881 Some(Duration::from_millis(200)),
1882 None,
1883 BackboneAbuseConfig::default(),
1884 );
1885
1886 start(config, tx, next_id).unwrap();
1887 thread::sleep(Duration::from_millis(50));
1888
1889 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1890
1891 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1892 let client_id = match event {
1893 Event::InterfaceUp(id, _, _) => id,
1894 other => panic!("expected InterfaceUp, got {:?}", other),
1895 };
1896
1897 client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1898
1899 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1900 match event {
1901 Event::Frame {
1902 interface_id,
1903 data,
1904 rssi: _,
1905 snr: _,
1906 } => {
1907 assert_eq!(interface_id, client_id);
1908 assert_eq!(data, vec![1u8; 24]);
1909 }
1910 other => panic!("expected Frame, got {:?}", other),
1911 }
1912
1913 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1914 assert!(
1915 result.is_err(),
1916 "expected no InterfaceDown after client sent data, got {:?}",
1917 result
1918 );
1919 }
1920
1921 #[test]
1922 fn backbone_runtime_idle_timeout_updates_live() {
1923 let port = find_free_port();
1924 let (tx, rx) = crate::event::channel();
1925 let next_id = Arc::new(AtomicU64::new(9650));
1926
1927 let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1928 let runtime = Arc::clone(&config.runtime);
1929
1930 start(config, tx, next_id).unwrap();
1931 thread::sleep(Duration::from_millis(50));
1932
1933 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1934 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1935 let client_id = match event {
1936 Event::InterfaceUp(id, _, _) => id,
1937 other => panic!("expected InterfaceUp, got {:?}", other),
1938 };
1939
1940 {
1941 let mut runtime = runtime.lock().unwrap();
1942 runtime.idle_timeout = Some(Duration::from_millis(150));
1943 }
1944
1945 let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1946 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1947 }
1948
1949 #[test]
1950 fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1951 let port = find_free_port();
1952 let (tx, rx) = crate::event::channel();
1953 let next_id = Arc::new(AtomicU64::new(9660));
1954
1955 let config = make_server_config(
1956 port,
1957 98,
1958 None,
1959 None,
1960 Some(Duration::from_millis(50)),
1961 BackboneAbuseConfig::default(),
1962 );
1963
1964 start(config, tx, next_id).unwrap();
1965 thread::sleep(Duration::from_millis(50));
1966
1967 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1968 client
1969 .set_read_timeout(Some(Duration::from_millis(100)))
1970 .unwrap();
1971 let sock = SockRef::from(&client);
1972 sock.set_recv_buffer_size(4096).ok();
1973
1974 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1975 let (client_id, mut writer) = match event {
1976 Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1977 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1978 };
1979
1980 let payload = vec![0x55; 512 * 1024];
1981 let deadline = Instant::now() + Duration::from_secs(3);
1982 let mut stalled = false;
1983 while Instant::now() < deadline {
1984 match writer.send_frame(&payload) {
1985 Ok(()) => {}
1986 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1987 thread::sleep(Duration::from_millis(10));
1988 }
1989 Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1990 stalled = true;
1991 break;
1992 }
1993 Err(e) => panic!("unexpected send error: {}", e),
1994 }
1995 }
1996
1997 assert!(stalled, "expected writer to time out on persistent stall");
1998 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1999 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
2000 }
2001
2002 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
2004 where
2005 F: FnMut(&Event) -> bool,
2006 {
2007 let deadline = Instant::now() + timeout;
2008 loop {
2009 let remaining = deadline.saturating_duration_since(Instant::now());
2010 if remaining.is_zero() {
2011 return None;
2012 }
2013 match rx.recv_timeout(remaining) {
2014 Ok(event) if pred(&event) => return Some(event),
2015 Ok(_) => continue,
2016 Err(_) => return None,
2017 }
2018 }
2019 }
2020
2021 #[test]
2022 fn backbone_write_stall_emits_peer_events() {
2023 let port = find_free_port();
2024 let (tx, rx) = crate::event::channel();
2025 let next_id = Arc::new(AtomicU64::new(9700));
2026
2027 let config = make_server_config(
2028 port,
2029 97,
2030 None,
2031 None,
2032 Some(Duration::from_millis(50)), BackboneAbuseConfig::default(),
2034 );
2035
2036 start(config, tx, next_id).unwrap();
2037 thread::sleep(Duration::from_millis(50));
2038
2039 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2041 client
2042 .set_read_timeout(Some(Duration::from_millis(100)))
2043 .unwrap();
2044 let sock = SockRef::from(&client);
2045 sock.set_recv_buffer_size(4096).ok();
2046
2047 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2049 let mut writer = match event {
2050 Event::InterfaceUp(_, Some(w), _) => w,
2051 other => panic!("expected InterfaceUp with writer, got {:?}", other),
2052 };
2053
2054 let payload = vec![0x55; 512 * 1024];
2056 let deadline = Instant::now() + Duration::from_secs(3);
2057 while Instant::now() < deadline {
2058 match writer.send_frame(&payload) {
2059 Ok(()) | Err(_) => {
2060 if Instant::now() + Duration::from_millis(10) > deadline {
2061 break;
2062 }
2063 thread::sleep(Duration::from_millis(10));
2064 }
2065 }
2066 }
2067
2068 let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
2070 matches!(e, Event::BackbonePeerWriteStall { .. })
2071 });
2072 assert!(
2073 stall_event.is_some(),
2074 "expected BackbonePeerWriteStall event"
2075 );
2076 }
2077
2078 #[test]
2079 fn backbone_blacklisted_peer_rejected_on_connect() {
2080 let port = find_free_port();
2081 let (tx, rx) = crate::event::channel();
2082 let next_id = Arc::new(AtomicU64::new(9800));
2083
2084 let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2085 let peer_state = config.peer_state.clone();
2086
2087 start(config, tx.clone(), next_id).unwrap();
2088 thread::sleep(Duration::from_millis(50));
2089
2090 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2092 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2093 assert!(
2094 matches!(event, Event::InterfaceUp(_, _, _)),
2095 "first connection should succeed"
2096 );
2097 drop(client1);
2098
2099 thread::sleep(Duration::from_millis(100));
2101 while rx.try_recv().is_ok() {}
2102
2103 peer_state.lock().unwrap().blacklist(
2105 "127.0.0.1".parse().unwrap(),
2106 Duration::from_secs(60),
2107 "test blacklist".into(),
2108 );
2109
2110 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2112 thread::sleep(Duration::from_millis(200));
2114
2115 let event = rx.try_recv();
2117 match event {
2118 Ok(Event::InterfaceUp(_, _, _)) => {
2119 panic!("blacklisted peer should not get InterfaceUp")
2120 }
2121 _ => {} }
2123 }
2124
2125 #[test]
2126 fn backbone_parse_config_reads_abuse_settings() {
2127 let factory = BackboneInterfaceFactory;
2128 let mut params = HashMap::new();
2129 params.insert("listen_ip".into(), "127.0.0.1".into());
2130 params.insert("listen_port".into(), "4242".into());
2131 params.insert("idle_timeout".into(), "15".into());
2132 params.insert("write_stall_timeout".into(), "45".into());
2133 params.insert("max_penalty_duration".into(), "3600".into());
2134
2135 let config = factory
2136 .parse_config("test-backbone", InterfaceId(97), ¶ms)
2137 .unwrap();
2138 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2139
2140 match mode {
2141 BackboneMode::Server(config) => {
2142 assert_eq!(config.listen_ip, "127.0.0.1");
2143 assert_eq!(config.listen_port, 4242);
2144 assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2145 assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2146 assert_eq!(
2147 config.abuse.max_penalty_duration,
2148 Some(Duration::from_secs(3600))
2149 );
2150 }
2151 BackboneMode::Client(_, _) => panic!("expected server config"),
2152 }
2153 }
2154
2155 #[test]
2156 fn backbone_parse_config_reads_client_priority() {
2157 let factory = BackboneInterfaceFactory;
2158 let mut params = HashMap::new();
2159 params.insert("remote".into(), "example.com".into());
2160 params.insert("target_port".into(), "4242".into());
2161 params.insert("priority".into(), "87".into());
2162
2163 let config = factory
2164 .parse_config("test-backbone-client", InterfaceId(96), ¶ms)
2165 .unwrap();
2166 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2167
2168 match mode {
2169 BackboneMode::Client(_, priority) => assert_eq!(priority, 87),
2170 BackboneMode::Server(_) => panic!("expected client config"),
2171 }
2172 }
2173
2174 #[test]
2175 fn backbone_parse_config_defaults_client_priority() {
2176 let factory = BackboneInterfaceFactory;
2177 let mut params = HashMap::new();
2178 params.insert("remote".into(), "example.com".into());
2179
2180 let config = factory
2181 .parse_config("test-backbone-client", InterfaceId(95), ¶ms)
2182 .unwrap();
2183 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2184
2185 match mode {
2186 BackboneMode::Client(_, priority) => assert_eq!(priority, 60),
2187 BackboneMode::Server(_) => panic!("expected client config"),
2188 }
2189 }
2190
2191 #[test]
2192 fn backbone_parse_config_rejects_invalid_client_priority() {
2193 let factory = BackboneInterfaceFactory;
2194 for value in ["-1", "101", "fast"] {
2195 let mut params = HashMap::new();
2196 params.insert("remote".into(), "example.com".into());
2197 params.insert("priority".into(), value.into());
2198
2199 let err = match factory.parse_config("test-backbone-client", InterfaceId(94), ¶ms) {
2200 Ok(_) => panic!("priority {value} should be rejected"),
2201 Err(err) => err,
2202 };
2203 assert!(
2204 err.contains("priority"),
2205 "unexpected error for {value}: {err}"
2206 );
2207 }
2208 }
2209}