1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30 lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
/// MTU constant for this interface type, in bytes (1 MiB).
// Nothing in the visible part of this file reads the constant, hence the lint override.
#[allow(dead_code)]
const HW_MTU: usize = 1 << 20;
37
38#[derive(Debug, Clone)]
40pub struct BackboneConfig {
41 pub name: String,
42 pub listen_ip: String,
43 pub listen_port: u16,
44 pub interface_id: InterfaceId,
45 pub max_connections: Option<usize>,
46 pub idle_timeout: Option<Duration>,
47 pub write_stall_timeout: Option<Duration>,
48 pub abuse: BackboneAbuseConfig,
49 pub ingress_control: IngressControlConfig,
50 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
51 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
52}
53
/// Abuse-handling settings for a backbone server.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// Value of the `max_penalty_duration` parameter; `None` when unset or not positive.
    pub max_penalty_duration: Option<Duration>,
}
59
60#[derive(Debug, Clone)]
62pub struct BackboneServerRuntime {
63 pub max_connections: Option<usize>,
64 pub idle_timeout: Option<Duration>,
65 pub write_stall_timeout: Option<Duration>,
66 pub abuse: BackboneAbuseConfig,
67}
68
69impl BackboneServerRuntime {
70 pub fn from_config(config: &BackboneConfig) -> Self {
71 Self {
72 max_connections: config.max_connections,
73 idle_timeout: config.idle_timeout,
74 write_stall_timeout: config.write_stall_timeout,
75 abuse: config.abuse.clone(),
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct BackboneRuntimeConfigHandle {
82 pub interface_name: String,
83 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
84 pub startup: BackboneServerRuntime,
85}
86
87#[derive(Debug, Clone)]
88pub struct BackbonePeerStateHandle {
89 pub interface_id: InterfaceId,
90 pub interface_name: String,
91 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
92}
93
94impl Default for BackboneConfig {
95 fn default() -> Self {
96 let mut config = BackboneConfig {
97 name: String::new(),
98 listen_ip: "0.0.0.0".into(),
99 listen_port: 0,
100 interface_id: InterfaceId(0),
101 max_connections: None,
102 idle_timeout: None,
103 write_stall_timeout: None,
104 abuse: BackboneAbuseConfig::default(),
105 ingress_control: IngressControlConfig::enabled(),
106 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
107 max_connections: None,
108 idle_timeout: None,
109 write_stall_timeout: None,
110 abuse: BackboneAbuseConfig::default(),
111 })),
112 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
113 };
114 let startup = BackboneServerRuntime::from_config(&config);
115 config.runtime = Arc::new(Mutex::new(startup));
116 config
117 }
118}
119
/// Most bytes a server-side writer will queue while its socket is not writable (512 KiB).
/// A write that would push the queue past this limit disconnects the client instead.
const MAX_PENDING_BYTES: usize = 512 << 10;
123
124struct BackboneWriter {
126 stream: TcpStream,
127 runtime: Arc<Mutex<BackboneServerRuntime>>,
128 interface_name: String,
129 interface_id: InterfaceId,
130 event_tx: EventSender,
131 pending: Vec<u8>,
132 stall_started: Option<Instant>,
133 disconnect_notified: bool,
134 write_stall_flag: Arc<AtomicBool>,
135}
136
137impl Writer for BackboneWriter {
138 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
139 let write_stall_timeout =
140 lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
141 if !self.pending.is_empty() {
142 self.flush_pending(write_stall_timeout)?;
143 if !self.pending.is_empty() {
144 return Err(io::Error::new(
145 io::ErrorKind::WouldBlock,
146 "backbone writer still stalled",
147 ));
148 }
149 }
150
151 let frame = hdlc::frame(data);
152 self.write_buffer(&frame, write_stall_timeout)
153 }
154}
155
156impl BackboneWriter {
157 fn write_buffer(
158 &mut self,
159 data: &[u8],
160 write_stall_timeout: Option<Duration>,
161 ) -> io::Result<()> {
162 let mut written = 0usize;
163 while written < data.len() {
164 match self.stream.write(&data[written..]) {
165 Ok(0) => {
166 return Err(io::Error::new(
167 io::ErrorKind::WriteZero,
168 "backbone writer wrote zero bytes",
169 ))
170 }
171 Ok(n) => {
172 written += n;
173 self.stall_started = None;
174 }
175 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176 let now = Instant::now();
177 let started = self.stall_started.get_or_insert(now);
178 if let Some(timeout) = write_stall_timeout {
179 if now.duration_since(*started) >= timeout {
180 return Err(self.disconnect_for_write_stall(timeout));
181 }
182 }
183 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
184 return Err(self.disconnect_for_write_stall(
185 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
186 ));
187 }
188 self.pending.extend_from_slice(&data[written..]);
189 return Err(io::Error::new(
190 io::ErrorKind::WouldBlock,
191 "backbone writer would block",
192 ));
193 }
194 Err(e) => return Err(e),
195 }
196 }
197 Ok(())
198 }
199
200 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
201 if self.pending.is_empty() {
202 return Ok(());
203 }
204
205 let pending = std::mem::take(&mut self.pending);
206 match self.write_buffer(&pending, write_stall_timeout) {
207 Ok(()) => Ok(()),
208 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
209 Err(e) => Err(e),
210 }
211 }
212
213 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
214 if !self.disconnect_notified {
215 log::warn!(
216 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
217 self.interface_name,
218 self.interface_id.0,
219 timeout
220 );
221 self.write_stall_flag.store(true, Ordering::Relaxed);
222 let _ = self.stream.shutdown(Shutdown::Both);
223 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
224 self.disconnect_notified = true;
225 }
226 io::Error::new(
227 io::ErrorKind::TimedOut,
228 format!("backbone writer stalled for {:?}", timeout),
229 )
230 }
231}
232
233pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
235 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
236 let listener = TcpListener::bind(&addr)?;
237 listener.set_nonblocking(true)?;
238
239 log::info!(
240 "[{}] backbone server listening on {}",
241 config.name,
242 listener
243 .local_addr()
244 .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
245 );
246
247 let name = config.name.clone();
248 let server_interface_id = config.interface_id;
249 let runtime = Arc::clone(&config.runtime);
250 let peer_state = Arc::clone(&config.peer_state);
251 let ingress_control = config.ingress_control;
252 thread::Builder::new()
253 .name(format!("backbone-poll-{}", config.interface_id.0))
254 .spawn(move || {
255 if let Err(e) = poll_loop(
256 listener,
257 name,
258 server_interface_id,
259 tx,
260 next_id,
261 runtime,
262 peer_state,
263 ingress_control,
264 ) {
265 log::error!("backbone poll loop error: {}", e);
266 }
267 })?;
268
269 Ok(())
270}
271
272struct ClientState {
274 id: InterfaceId,
275 peer_ip: IpAddr,
276 peer_port: u16,
277 stream: TcpStream,
278 decoder: hdlc::Decoder,
279 connected_at: Instant,
280 has_received_data: bool,
281 write_stall_flag: Arc<AtomicBool>,
282}
283
/// Per-IP counters and blacklist status.
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// Instant until which connections from the peer are rejected, if any.
    blacklisted_until: Option<Instant>,
    /// Reason recorded together with the blacklist entry.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected while the peer was blacklisted.
    reject_count: u64,
    /// Number of currently open connections from the peer.
    connected_count: usize,
}

impl PeerBehaviorState {
    /// A peer with no connections, no rejections and no blacklist entry.
    fn new() -> Self {
        PeerBehaviorState {
            connected_count: 0,
            reject_count: 0,
            blacklist_reason: None,
            blacklisted_until: None,
        }
    }
}
302
303#[derive(Debug, Clone, Default)]
304pub struct BackbonePeerMonitor {
305 peers: HashMap<IpAddr, PeerBehaviorState>,
306}
307
308impl BackbonePeerMonitor {
309 pub fn new() -> Self {
310 Self {
311 peers: HashMap::new(),
312 }
313 }
314
315 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
316 let mut merged = self.peers.clone();
317
318 for (peer_ip, state) in peers {
319 let entry = merged
320 .entry(*peer_ip)
321 .or_insert_with(PeerBehaviorState::new);
322 entry.connected_count = state.connected_count;
323 entry.reject_count = state.reject_count;
324 if state.blacklisted_until.is_some() {
325 entry.blacklisted_until = state.blacklisted_until;
326 entry.blacklist_reason = state.blacklist_reason.clone();
327 }
328 }
329
330 merged.retain(|peer_ip, state| {
331 peers.contains_key(peer_ip)
332 || state.blacklisted_until.is_some()
333 || state.reject_count > 0
334 });
335 self.peers = merged;
336 }
337
338 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
339 for (peer_ip, state) in &self.peers {
340 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
341 entry.blacklisted_until = state.blacklisted_until;
342 entry.blacklist_reason = state.blacklist_reason.clone();
343 entry.reject_count = state.reject_count;
344 }
345
346 peers.retain(|peer_ip, state| {
347 if state.connected_count > 0 {
348 return true;
349 }
350 self.peers.contains_key(peer_ip)
351 });
352 }
353
354 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
355 let now = Instant::now();
356 let mut entries: Vec<BackbonePeerStateEntry> = self
357 .peers
358 .iter()
359 .map(|(peer_ip, state)| BackbonePeerStateEntry {
360 interface_name: interface_name.to_string(),
361 peer_ip: *peer_ip,
362 connected_count: state.connected_count,
363 blacklisted_remaining_secs: state
364 .blacklisted_until
365 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
366 blacklist_reason: state.blacklist_reason.clone(),
367 reject_count: state.reject_count,
368 })
369 .collect();
370 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
371 entries
372 }
373
374 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
375 self.peers.remove(&peer_ip).is_some()
376 }
377
378 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
379 let state = self
380 .peers
381 .entry(peer_ip)
382 .or_insert_with(PeerBehaviorState::new);
383 state.blacklisted_until = Some(Instant::now() + duration);
384 state.blacklist_reason = Some(reason);
385 true
386 }
387
388 #[cfg(test)]
389 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
390 let mut state = PeerBehaviorState::new();
391 state.connected_count = entry.connected_count;
392 state.reject_count = entry.reject_count;
393 state.blacklist_reason = entry.blacklist_reason;
394 if let Some(remaining) = entry.blacklisted_remaining_secs {
395 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
396 }
397 self.peers.insert(entry.peer_ip, state);
398 }
399}
400
/// Why the poll loop is dropping a client.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// The read side reported end-of-stream or an error.
    RemoteClosed,
    /// The client never sent data within the idle timeout.
    IdleTimeout,
    /// The client's writer shut the socket down after a write stall.
    WriteStall,
}
407
408fn poll_loop(
410 listener: TcpListener,
411 name: String,
412 server_interface_id: InterfaceId,
413 tx: EventSender,
414 next_id: Arc<AtomicU64>,
415 runtime: Arc<Mutex<BackboneServerRuntime>>,
416 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
417 ingress_control: IngressControlConfig,
418) -> io::Result<()> {
419 let poller = Poller::new()?;
420
421 const LISTENER_KEY: usize = 0;
422
423 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
425
426 let mut clients: HashMap<usize, ClientState> = HashMap::new();
427 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
428 let mut events = Events::new();
429 let mut next_key: usize = 1;
430
431 loop {
432 let runtime_snapshot = runtime.lock().unwrap().clone();
433 let max_connections = runtime_snapshot.max_connections;
434 let idle_timeout = runtime_snapshot.idle_timeout;
435 cleanup_peer_state(&mut peers);
436 {
437 let mut monitor = peer_state.lock().unwrap();
438 monitor.sync_into(&mut peers);
439 monitor.upsert_snapshot(&peers);
440 }
441
442 events.clear();
443 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
444
445 for ev in events.iter() {
446 if ev.key == LISTENER_KEY {
447 loop {
449 match listener.accept() {
450 Ok((stream, peer_addr)) => {
451 let peer_ip = peer_addr.ip();
452 let peer_port = peer_addr.port();
453
454 if is_ip_blacklisted(&mut peers, peer_ip) {
455 if let Some(state) = peers.get_mut(&peer_ip) {
456 state.reject_count = state.reject_count.saturating_add(1);
457 }
458 peer_state.lock().unwrap().upsert_snapshot(&peers);
459 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
460 drop(stream);
461 continue;
462 }
463
464 if let Some(max) = max_connections {
465 if clients.len() >= max {
466 log::warn!(
467 "[{}] max connections ({}) reached, rejecting {}",
468 name,
469 max,
470 peer_addr
471 );
472 drop(stream);
473 continue;
474 }
475 }
476
477 stream.set_nonblocking(true).ok();
478 stream.set_nodelay(true).ok();
479 set_tcp_keepalive(&stream).ok();
480
481 #[cfg(target_os = "macos")]
483 {
484 let sock = SockRef::from(&stream);
485 sock.set_nosigpipe(true).ok();
486 }
487
488 let key = next_key;
489 next_key += 1;
490 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
491
492 log::info!(
493 "[{}] backbone client connected: {} → id {}",
494 name,
495 peer_addr,
496 client_id.0
497 );
498
499 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
502 {
503 log::warn!("[{}] failed to add client to poller: {}", name, e);
504 continue; }
506
507 let writer_stream = match stream.try_clone() {
509 Ok(s) => s,
510 Err(e) => {
511 log::warn!("[{}] failed to clone client stream: {}", name, e);
512 let _ = poller.delete(&stream);
513 continue; }
515 };
516 let write_stall_flag = Arc::new(AtomicBool::new(false));
517 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
518 stream: writer_stream,
519 runtime: Arc::clone(&runtime),
520 interface_name: name.clone(),
521 interface_id: client_id,
522 event_tx: tx.clone(),
523 pending: Vec::new(),
524 stall_started: None,
525 disconnect_notified: false,
526 write_stall_flag: Arc::clone(&write_stall_flag),
527 });
528
529 clients.insert(
530 key,
531 ClientState {
532 id: client_id,
533 peer_ip,
534 peer_port,
535 stream,
536 decoder: hdlc::Decoder::new(),
537 connected_at: Instant::now(),
538 has_received_data: false,
539 write_stall_flag,
540 },
541 );
542 peers
543 .entry(peer_ip)
544 .or_insert_with(PeerBehaviorState::new)
545 .connected_count += 1;
546 peer_state.lock().unwrap().upsert_snapshot(&peers);
547 let _ = tx.send(Event::BackbonePeerConnected {
548 server_interface_id,
549 peer_interface_id: client_id,
550 peer_ip,
551 peer_port,
552 });
553
554 let info = InterfaceInfo {
555 id: client_id,
556 name: format!("BackboneInterface/{}", client_id.0),
557 mode: constants::MODE_FULL,
558 out_capable: true,
559 in_capable: true,
560 bitrate: Some(1_000_000_000), airtime_profile: None,
562 announce_rate_target: None,
563 announce_rate_grace: 0,
564 announce_rate_penalty: 0.0,
565 announce_cap: constants::ANNOUNCE_CAP,
566 is_local_client: false,
567 wants_tunnel: false,
568 tunnel_id: None,
569 mtu: 65535,
570 ia_freq: 0.0,
571 ip_freq: 0.0,
572 op_freq: 0.0,
573 op_samples: 0,
574 started: 0.0,
575 ingress_control,
576 };
577
578 if tx
579 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
580 .is_err()
581 {
582 cleanup(&poller, &clients, &listener);
584 return Ok(());
585 }
586 }
587 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
588 Err(e) => {
589 log::warn!("[{}] accept error: {}", name, e);
590 break;
591 }
592 }
593 }
594 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
596 } else if clients.contains_key(&ev.key) {
597 let key = ev.key;
598 let mut should_remove = false;
599 let mut client_id = InterfaceId(0);
600
601 let mut buf = [0u8; 4096];
602 let read_result = {
603 let client = clients.get_mut(&key).unwrap();
604 client.stream.read(&mut buf)
605 };
606
607 match read_result {
608 Ok(0) | Err(_) => {
609 if let Some(c) = clients.get(&key) {
610 client_id = c.id;
611 }
612 should_remove = true;
613 }
614 Ok(n) => {
615 let client = clients.get_mut(&key).unwrap();
616 client_id = client.id;
617 client.has_received_data = true;
618 for frame in client.decoder.feed(&buf[..n]) {
619 if tx
620 .send(Event::Frame {
621 interface_id: client_id,
622 data: frame,
623 rssi: None,
624 snr: None,
625 })
626 .is_err()
627 {
628 cleanup(&poller, &clients, &listener);
629 return Ok(());
630 }
631 }
632 }
633 }
634
635 if should_remove {
636 let reason = if clients
637 .get(&key)
638 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
639 {
640 DisconnectReason::WriteStall
641 } else {
642 DisconnectReason::RemoteClosed
643 };
644 disconnect_client(
645 &poller,
646 &mut clients,
647 &mut peers,
648 &name,
649 server_interface_id,
650 &tx,
651 &peer_state,
652 key,
653 client_id,
654 reason,
655 );
656 } else if let Some(client) = clients.get(&key) {
657 poller.modify(&client.stream, PollEvent::readable(key))?;
659 }
660 }
661 }
662
663 if let Some(timeout) = idle_timeout {
664 let now = Instant::now();
665 let timed_out: Vec<(usize, InterfaceId)> = clients
666 .iter()
667 .filter_map(|(&key, client)| {
668 if client.has_received_data || now.duration_since(client.connected_at) < timeout
669 {
670 None
671 } else {
672 Some((key, client.id))
673 }
674 })
675 .collect();
676
677 for (key, client_id) in timed_out {
678 disconnect_client(
679 &poller,
680 &mut clients,
681 &mut peers,
682 &name,
683 server_interface_id,
684 &tx,
685 &peer_state,
686 key,
687 client_id,
688 DisconnectReason::IdleTimeout,
689 );
690 }
691 }
692 }
693}
694
695fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
696 let now = Instant::now();
697 peers.retain(|_, state| {
698 if matches!(state.blacklisted_until, Some(until) if now >= until) {
699 state.blacklisted_until = None;
700 state.blacklist_reason = None;
701 }
702 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
703 });
704}
705
706fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
707 let now = Instant::now();
708 if let Some(state) = peers.get_mut(&peer_ip) {
709 if let Some(until) = state.blacklisted_until {
710 if now < until {
711 return true;
712 }
713 state.blacklisted_until = None;
714 }
715 }
716 false
717}
718
719fn disconnect_client(
720 poller: &Poller,
721 clients: &mut HashMap<usize, ClientState>,
722 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
723 name: &str,
724 server_interface_id: InterfaceId,
725 tx: &EventSender,
726 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
727 key: usize,
728 client_id: InterfaceId,
729 reason: DisconnectReason,
730) {
731 let Some(client) = clients.remove(&key) else {
732 return;
733 };
734
735 match reason {
736 DisconnectReason::RemoteClosed => {
737 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
738 }
739 DisconnectReason::IdleTimeout => {
740 log::info!(
741 "[{}] backbone client {} disconnected due to idle timeout",
742 name,
743 client_id.0
744 );
745 }
746 DisconnectReason::WriteStall => {
747 }
749 }
750
751 let _ = poller.delete(&client.stream);
752 let connected_for = client.connected_at.elapsed();
754 let _ = tx.send(Event::BackbonePeerDisconnected {
755 server_interface_id,
756 peer_interface_id: client.id,
757 peer_ip: client.peer_ip,
758 peer_port: client.peer_port,
759 connected_for,
760 had_received_data: client.has_received_data,
761 });
762 match reason {
763 DisconnectReason::IdleTimeout => {
764 let _ = tx.send(Event::BackbonePeerIdleTimeout {
765 server_interface_id,
766 peer_interface_id: client.id,
767 peer_ip: client.peer_ip,
768 peer_port: client.peer_port,
769 connected_for,
770 });
771 }
772 DisconnectReason::WriteStall => {
773 let _ = tx.send(Event::BackbonePeerWriteStall {
774 server_interface_id,
775 peer_interface_id: client.id,
776 peer_ip: client.peer_ip,
777 peer_port: client.peer_port,
778 connected_for,
779 });
780 }
781 DisconnectReason::RemoteClosed => {}
782 }
783
784 if let Some(state) = peers.get_mut(&client.peer_ip) {
785 state.connected_count = state.connected_count.saturating_sub(1);
786 }
787 peer_state.lock().unwrap().upsert_snapshot(peers);
788 if !matches!(reason, DisconnectReason::WriteStall) {
790 let _ = tx.send(Event::InterfaceDown(client_id));
791 }
792}
793
794fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
795 let sock = SockRef::from(stream);
796 let mut keepalive = TcpKeepalive::new()
797 .with_time(Duration::from_secs(5))
798 .with_interval(Duration::from_secs(2));
799 #[cfg(any(target_os = "linux", target_os = "macos"))]
800 {
801 keepalive = keepalive.with_retries(12);
802 }
803 sock.set_tcp_keepalive(&keepalive)
804}
805
806fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
807 for (_, client) in clients {
808 let _ = poller.delete(&client.stream);
809 }
810 let _ = poller.delete(listener);
811}
812
813#[derive(Debug, Clone)]
819pub struct BackboneClientConfig {
820 pub name: String,
821 pub target_host: String,
822 pub target_port: u16,
823 pub interface_id: InterfaceId,
824 pub reconnect_wait: Duration,
825 pub max_reconnect_tries: Option<u32>,
826 pub connect_timeout: Duration,
827 pub transport_identity: Option<String>,
828 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
829}
830
/// Client settings that are re-read on every connect and reconnect attempt.
#[derive(Debug, Clone)]
pub struct BackboneClientRuntime {
    /// Base delay between reconnect attempts, before backoff and jitter.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` retries forever.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout for establishing the TCP connection.
    pub connect_timeout: Duration,
}
837
838impl BackboneClientRuntime {
839 pub fn from_config(config: &BackboneClientConfig) -> Self {
840 Self {
841 reconnect_wait: config.reconnect_wait,
842 max_reconnect_tries: config.max_reconnect_tries,
843 connect_timeout: config.connect_timeout,
844 }
845 }
846}
847
848#[derive(Debug, Clone)]
849pub struct BackboneClientRuntimeConfigHandle {
850 pub interface_name: String,
851 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
852 pub startup: BackboneClientRuntime,
853}
854
855impl Default for BackboneClientConfig {
856 fn default() -> Self {
857 let mut config = BackboneClientConfig {
858 name: String::new(),
859 target_host: "127.0.0.1".into(),
860 target_port: 4242,
861 interface_id: InterfaceId(0),
862 reconnect_wait: Duration::from_secs(5),
863 max_reconnect_tries: None,
864 connect_timeout: Duration::from_secs(5),
865 transport_identity: None,
866 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
867 reconnect_wait: Duration::from_secs(5),
868 max_reconnect_tries: None,
869 connect_timeout: Duration::from_secs(5),
870 })),
871 };
872 let startup = BackboneClientRuntime::from_config(&config);
873 config.runtime = Arc::new(Mutex::new(startup));
874 config
875 }
876}
877
/// Writer for the client side of a backbone connection.
struct BackboneClientWriter {
    /// Clone of the connection's socket used for writing.
    stream: TcpStream,
}
882
883impl Writer for BackboneClientWriter {
884 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
885 self.stream.write_all(&hdlc::frame(data))
886 }
887}
888
889fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
891 let runtime = config.runtime.lock().unwrap().clone();
892 let addr_str = format!("{}:{}", config.target_host, config.target_port);
893 let addr = addr_str
894 .to_socket_addrs()?
895 .next()
896 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
897
898 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
899 stream.set_nodelay(true)?;
900 set_tcp_keepalive(&stream).ok();
901
902 #[cfg(target_os = "macos")]
904 {
905 let sock = SockRef::from(&stream);
906 sock.set_nosigpipe(true).ok();
907 }
908
909 Ok(stream)
910}
911
912pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
914 let stream = try_connect_client(&config)?;
915 let reader_stream = stream.try_clone()?;
916 let writer_stream = stream.try_clone()?;
917
918 let id = config.interface_id;
919 log::info!(
920 "[{}] backbone client connected to {}:{}",
921 config.name,
922 config.target_host,
923 config.target_port
924 );
925
926 let _ = tx.send(Event::InterfaceUp(id, None, None));
928
929 thread::Builder::new()
930 .name(format!("backbone-client-{}", id.0))
931 .spawn(move || {
932 client_reader_loop(reader_stream, config, tx);
933 })?;
934
935 Ok(Box::new(BackboneClientWriter {
936 stream: writer_stream,
937 }))
938}
939
940fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
943 let id = config.interface_id;
944 let mut decoder = hdlc::Decoder::new();
945 let mut buf = [0u8; 4096];
946
947 loop {
948 match stream.read(&mut buf) {
949 Ok(0) => {
950 log::warn!("[{}] connection closed", config.name);
951 let _ = tx.send(Event::InterfaceDown(id));
952 match client_reconnect(&config, &tx) {
953 Some(new_stream) => {
954 stream = new_stream;
955 decoder = hdlc::Decoder::new();
956 continue;
957 }
958 None => {
959 log::error!("[{}] reconnection failed, giving up", config.name);
960 return;
961 }
962 }
963 }
964 Ok(n) => {
965 for frame in decoder.feed(&buf[..n]) {
966 if tx
967 .send(Event::Frame {
968 interface_id: id,
969 data: frame,
970 rssi: None,
971 snr: None,
972 })
973 .is_err()
974 {
975 return;
976 }
977 }
978 }
979 Err(e) => {
980 log::warn!("[{}] read error: {}", config.name, e);
981 let _ = tx.send(Event::InterfaceDown(id));
982 match client_reconnect(&config, &tx) {
983 Some(new_stream) => {
984 stream = new_stream;
985 decoder = hdlc::Decoder::new();
986 continue;
987 }
988 None => {
989 log::error!("[{}] reconnection failed, giving up", config.name);
990 return;
991 }
992 }
993 }
994 }
995 }
996}
997
/// Cap on the reconnect backoff exponent: the delay doubles per attempt up to
/// `reconnect_wait * 2^6` (64x) and stays there.
const MAX_BACKOFF_SHIFT: u32 = 6;
1001
1002fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
1006 let mut attempts = 0u32;
1007 loop {
1008 let runtime = config.runtime.lock().unwrap().clone();
1009
1010 let shift = attempts.min(MAX_BACKOFF_SHIFT);
1011 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1012 let jitter_range = backoff / 4;
1014 let jitter = if jitter_range.as_nanos() > 0 {
1015 let offset = Duration::from_nanos(
1016 (std::hash::RandomState::new().build_hasher().finish()
1017 % jitter_range.as_nanos() as u64)
1018 * 2,
1019 );
1020 if offset > jitter_range {
1021 backoff + (offset - jitter_range)
1022 } else {
1023 backoff - (jitter_range - offset)
1024 }
1025 } else {
1026 backoff
1027 };
1028 thread::sleep(jitter);
1029
1030 attempts += 1;
1031
1032 if let Some(max) = runtime.max_reconnect_tries {
1033 if attempts > max {
1034 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1035 return None;
1036 }
1037 }
1038
1039 log::info!(
1040 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1041 config.name,
1042 attempts,
1043 jitter.as_secs_f64(),
1044 );
1045
1046 match try_connect_client(config) {
1047 Ok(new_stream) => {
1048 let writer_stream = match new_stream.try_clone() {
1049 Ok(s) => s,
1050 Err(e) => {
1051 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1052 continue;
1053 }
1054 };
1055 log::info!(
1056 "[{}] reconnected after {} attempt(s)",
1057 config.name,
1058 attempts
1059 );
1060 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1061 stream: writer_stream,
1062 });
1063 let _ = tx.send(Event::InterfaceUp(
1064 config.interface_id,
1065 Some(new_writer),
1066 None,
1067 ));
1068 return Some(new_stream);
1069 }
1070 Err(e) => {
1071 log::warn!("[{}] reconnect failed: {}", config.name, e);
1072 }
1073 }
1074 }
1075}
1076
1077#[derive(Clone)]
1084pub(crate) enum BackboneMode {
1085 Server(BackboneConfig),
1086 Client(BackboneClientConfig, u8),
1087}
1088
/// Factory that parses and starts `BackboneInterface` configurations.
pub struct BackboneInterfaceFactory;
1095
/// Reads `key` from `params` as a strictly positive number of seconds.
///
/// Returns `None` when the key is missing, is not a number, is zero, negative or
/// NaN, or cannot be represented as a `Duration` (infinite or too large). Such
/// values previously caused a panic in `Duration::from_secs_f64`.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    let secs = params.get(key)?.parse::<f64>().ok()?;
    if secs > 0.0 {
        Duration::try_from_secs_f64(secs).ok()
    } else {
        None
    }
}
1103
1104impl InterfaceFactory for BackboneInterfaceFactory {
1105 fn type_name(&self) -> &str {
1106 "BackboneInterface"
1107 }
1108
1109 fn parse_config(
1110 &self,
1111 name: &str,
1112 id: InterfaceId,
1113 params: &HashMap<String, String>,
1114 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1115 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1116 let target_host = target_host.clone();
1118 let target_port = params
1119 .get("target_port")
1120 .or_else(|| params.get("port"))
1121 .and_then(|v| v.parse().ok())
1122 .unwrap_or(4242);
1123 let transport_identity = params.get("transport_identity").cloned();
1124 let priority = match params.get("priority") {
1125 Some(value) => value.parse::<u8>().map_err(|_| {
1126 format!(
1127 "invalid Backbone peer priority '{}' (expected 0..100)",
1128 value
1129 )
1130 })?,
1131 None => crate::driver::BACKBONE_PEER_POOL_CONFIGURED_DEFAULT_PRIORITY,
1132 };
1133 if priority > 100 {
1134 return Err(format!(
1135 "invalid Backbone peer priority '{}' (expected 0..100)",
1136 priority
1137 ));
1138 }
1139 Ok(Box::new(BackboneMode::Client(
1140 BackboneClientConfig {
1141 name: name.to_string(),
1142 target_host,
1143 target_port,
1144 interface_id: id,
1145 transport_identity,
1146 ..BackboneClientConfig::default()
1147 },
1148 priority,
1149 )))
1150 } else {
1151 let listen_ip = params
1153 .get("listen_ip")
1154 .or_else(|| params.get("device"))
1155 .cloned()
1156 .unwrap_or_else(|| "0.0.0.0".into());
1157 let listen_port = params
1158 .get("listen_port")
1159 .or_else(|| params.get("port"))
1160 .and_then(|v| v.parse().ok())
1161 .unwrap_or(4242);
1162 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1163 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1164 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1165 let abuse = BackboneAbuseConfig {
1166 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1167 };
1168 let mut config = BackboneConfig {
1169 name: name.to_string(),
1170 listen_ip,
1171 listen_port,
1172 interface_id: id,
1173 max_connections,
1174 idle_timeout,
1175 write_stall_timeout,
1176 abuse,
1177 ingress_control: IngressControlConfig::enabled(),
1178 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1179 max_connections: None,
1180 idle_timeout: None,
1181 write_stall_timeout: None,
1182 abuse: BackboneAbuseConfig::default(),
1183 })),
1184 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1185 };
1186 let startup = BackboneServerRuntime::from_config(&config);
1187 config.runtime = Arc::new(Mutex::new(startup));
1188 Ok(Box::new(BackboneMode::Server(config)))
1189 }
1190 }
1191
1192 fn start(
1193 &self,
1194 config: Box<dyn InterfaceConfigData>,
1195 ctx: StartContext,
1196 ) -> io::Result<StartResult> {
1197 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1198 io::Error::new(
1199 io::ErrorKind::InvalidData,
1200 "wrong config type for BackboneInterface",
1201 )
1202 })?;
1203
1204 match mode {
1205 BackboneMode::Client(cfg, _) => {
1206 let id = cfg.interface_id;
1207 let name = cfg.name.clone();
1208 let info = InterfaceInfo {
1209 id,
1210 name,
1211 mode: ctx.mode,
1212 out_capable: true,
1213 in_capable: true,
1214 bitrate: Some(1_000_000_000),
1215 airtime_profile: None,
1216 announce_rate_target: None,
1217 announce_rate_grace: 0,
1218 announce_rate_penalty: 0.0,
1219 announce_cap: constants::ANNOUNCE_CAP,
1220 is_local_client: false,
1221 wants_tunnel: false,
1222 tunnel_id: None,
1223 mtu: 65535,
1224 ingress_control: ctx.ingress_control,
1225 ia_freq: 0.0,
1226 ip_freq: 0.0,
1227 op_freq: 0.0,
1228 op_samples: 0,
1229 started: crate::time::now(),
1230 };
1231 let writer = start_client(cfg, ctx.tx)?;
1232 Ok(StartResult::Simple {
1233 id,
1234 info,
1235 writer,
1236 interface_type_name: "BackboneInterface".to_string(),
1237 })
1238 }
1239 BackboneMode::Server(mut cfg) => {
1240 cfg.ingress_control = ctx.ingress_control;
1241 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1242 Ok(StartResult::Listener { control: None })
1243 }
1244 }
1245 }
1246}
1247
1248pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1249 match mode {
1250 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1251 interface_name: config.name.clone(),
1252 runtime: Arc::clone(&config.runtime),
1253 startup: BackboneServerRuntime::from_config(config),
1254 }),
1255 BackboneMode::Client(_, _) => None,
1256 }
1257}
1258
1259pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1260 match mode {
1261 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1262 interface_id: config.interface_id,
1263 interface_name: config.name.clone(),
1264 peer_state: Arc::clone(&config.peer_state),
1265 }),
1266 BackboneMode::Client(_, _) => None,
1267 }
1268}
1269
1270pub(crate) fn client_runtime_handle_from_mode(
1271 mode: &BackboneMode,
1272) -> Option<BackboneClientRuntimeConfigHandle> {
1273 match mode {
1274 BackboneMode::Client(config, _) => Some(BackboneClientRuntimeConfigHandle {
1275 interface_name: config.name.clone(),
1276 runtime: Arc::clone(&config.runtime),
1277 startup: BackboneClientRuntime::from_config(config),
1278 }),
1279 BackboneMode::Server(_) => None,
1280 }
1281}
1282
1283pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1284 match mode {
1285 BackboneMode::Client(config, _) => Some(config.clone()),
1286 BackboneMode::Server(_) => None,
1287 }
1288}
1289
1290pub(crate) fn client_priority_from_mode(mode: &BackboneMode) -> Option<u8> {
1291 match mode {
1292 BackboneMode::Client(_, priority) => Some(*priority),
1293 BackboneMode::Server(_) => None,
1294 }
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299 use super::*;
1300 use std::sync::mpsc;
1301 use std::time::Duration;
1302
1303 fn find_free_port() -> u16 {
1304 TcpListener::bind("127.0.0.1:0")
1305 .unwrap()
1306 .local_addr()
1307 .unwrap()
1308 .port()
1309 }
1310
1311 fn recv_non_peer_event(
1312 rx: &mpsc::Receiver<Event>,
1313 timeout: Duration,
1314 ) -> Result<Event, mpsc::RecvTimeoutError> {
1315 let deadline = Instant::now() + timeout;
1316 loop {
1317 let remaining = deadline.saturating_duration_since(Instant::now());
1318 if remaining.is_zero() {
1319 return Err(mpsc::RecvTimeoutError::Timeout);
1320 }
1321 let event = rx.recv_timeout(remaining)?;
1322 match event {
1323 Event::BackbonePeerConnected { .. }
1324 | Event::BackbonePeerDisconnected { .. }
1325 | Event::BackbonePeerIdleTimeout { .. }
1326 | Event::BackbonePeerWriteStall { .. }
1327 | Event::BackbonePeerPenalty { .. } => continue,
1328 other => return Ok(other),
1329 }
1330 }
1331 }
1332
1333 fn make_server_config(
1334 port: u16,
1335 interface_id: u64,
1336 max_connections: Option<usize>,
1337 idle_timeout: Option<Duration>,
1338 write_stall_timeout: Option<Duration>,
1339 abuse: BackboneAbuseConfig,
1340 ) -> BackboneConfig {
1341 let mut config = BackboneConfig {
1342 name: "test-backbone".into(),
1343 listen_ip: "127.0.0.1".into(),
1344 listen_port: port,
1345 interface_id: InterfaceId(interface_id),
1346 max_connections,
1347 idle_timeout,
1348 write_stall_timeout,
1349 abuse,
1350 ingress_control: IngressControlConfig::enabled(),
1351 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1352 max_connections: None,
1353 idle_timeout: None,
1354 write_stall_timeout: None,
1355 abuse: BackboneAbuseConfig::default(),
1356 })),
1357 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1358 };
1359 let startup = BackboneServerRuntime::from_config(&config);
1360 config.runtime = Arc::new(Mutex::new(startup));
1361 config
1362 }
1363
1364 #[test]
1365 fn backbone_accept_connection() {
1366 let port = find_free_port();
1367 let (tx, rx) = crate::event::channel();
1368 let next_id = Arc::new(AtomicU64::new(8000));
1369
1370 let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1371
1372 start(config, tx, next_id).unwrap();
1373 thread::sleep(Duration::from_millis(50));
1374
1375 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1376
1377 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1378 match event {
1379 Event::InterfaceUp(id, writer, info) => {
1380 assert_eq!(id, InterfaceId(8000));
1381 assert!(writer.is_some());
1382 assert!(info.is_some());
1383 let info = info.unwrap();
1384 assert!(info.out_capable);
1385 assert!(info.in_capable);
1386 }
1387 other => panic!("expected InterfaceUp, got {:?}", other),
1388 }
1389 }
1390
1391 #[test]
1392 fn backbone_receive_frame() {
1393 let port = find_free_port();
1394 let (tx, rx) = crate::event::channel();
1395 let next_id = Arc::new(AtomicU64::new(8100));
1396
1397 let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1398
1399 start(config, tx, next_id).unwrap();
1400 thread::sleep(Duration::from_millis(50));
1401
1402 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1403
1404 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1406
1407 let payload: Vec<u8> = (0..32).collect();
1409 client.write_all(&hdlc::frame(&payload)).unwrap();
1410
1411 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1412 match event {
1413 Event::Frame {
1414 interface_id,
1415 data,
1416 rssi: _,
1417 snr: _,
1418 } => {
1419 assert_eq!(interface_id, InterfaceId(8100));
1420 assert_eq!(data, payload);
1421 }
1422 other => panic!("expected Frame, got {:?}", other),
1423 }
1424 }
1425
1426 #[test]
1427 fn backbone_send_to_client() {
1428 let port = find_free_port();
1429 let (tx, rx) = crate::event::channel();
1430 let next_id = Arc::new(AtomicU64::new(8200));
1431
1432 let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1433
1434 start(config, tx, next_id).unwrap();
1435 thread::sleep(Duration::from_millis(50));
1436
1437 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1438 client
1439 .set_read_timeout(Some(Duration::from_secs(2)))
1440 .unwrap();
1441
1442 let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1444 let mut writer = match event {
1445 Event::InterfaceUp(_, Some(w), _) => w,
1446 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1447 };
1448
1449 let payload: Vec<u8> = (0..24).collect();
1451 writer.send_frame(&payload).unwrap();
1452
1453 let mut buf = [0u8; 256];
1455 let n = client.read(&mut buf).unwrap();
1456 let expected = hdlc::frame(&payload);
1457 assert_eq!(&buf[..n], &expected[..]);
1458 }
1459
1460 #[test]
1461 fn backbone_multiple_clients() {
1462 let port = find_free_port();
1463 let (tx, rx) = crate::event::channel();
1464 let next_id = Arc::new(AtomicU64::new(8300));
1465
1466 let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1467
1468 start(config, tx, next_id).unwrap();
1469 thread::sleep(Duration::from_millis(50));
1470
1471 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1472 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1473
1474 let mut ids = Vec::new();
1475 for _ in 0..2 {
1476 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1477 match event {
1478 Event::InterfaceUp(id, _, _) => ids.push(id),
1479 other => panic!("expected InterfaceUp, got {:?}", other),
1480 }
1481 }
1482
1483 assert_eq!(ids.len(), 2);
1484 assert_ne!(ids[0], ids[1]);
1485 }
1486
1487 #[test]
1488 fn backbone_client_disconnect() {
1489 let port = find_free_port();
1490 let (tx, rx) = crate::event::channel();
1491 let next_id = Arc::new(AtomicU64::new(8400));
1492
1493 let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1494
1495 start(config, tx, next_id).unwrap();
1496 thread::sleep(Duration::from_millis(50));
1497
1498 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1499
1500 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1502
1503 drop(client);
1505
1506 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1508 assert!(
1509 matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1510 "expected InterfaceDown(8400), got {:?}",
1511 event
1512 );
1513 }
1514
1515 #[test]
1516 fn backbone_epoll_multiplexing() {
1517 let port = find_free_port();
1518 let (tx, rx) = crate::event::channel();
1519 let next_id = Arc::new(AtomicU64::new(8500));
1520
1521 let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1522
1523 start(config, tx, next_id).unwrap();
1524 thread::sleep(Duration::from_millis(50));
1525
1526 let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1527 let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1528
1529 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1531 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1532
1533 let payload1: Vec<u8> = (0..24).collect();
1535 let payload2: Vec<u8> = (100..130).collect();
1536 client1.write_all(&hdlc::frame(&payload1)).unwrap();
1537 client2.write_all(&hdlc::frame(&payload2)).unwrap();
1538
1539 let mut received = Vec::new();
1541 for _ in 0..2 {
1542 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1543 match event {
1544 Event::Frame { data, .. } => received.push(data),
1545 other => panic!("expected Frame, got {:?}", other),
1546 }
1547 }
1548 assert!(received.contains(&payload1));
1549 assert!(received.contains(&payload2));
1550 }
1551
1552 #[test]
1553 fn backbone_bind_port() {
1554 let port = find_free_port();
1555 let (tx, _rx) = crate::event::channel();
1556 let next_id = Arc::new(AtomicU64::new(8600));
1557
1558 let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1559
1560 start(config, tx, next_id).unwrap();
1562 }
1563
1564 #[test]
1565 fn backbone_hdlc_fragmented() {
1566 let port = find_free_port();
1567 let (tx, rx) = crate::event::channel();
1568 let next_id = Arc::new(AtomicU64::new(8700));
1569
1570 let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1571
1572 start(config, tx, next_id).unwrap();
1573 thread::sleep(Duration::from_millis(50));
1574
1575 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1576 client.set_nodelay(true).unwrap();
1577
1578 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1580
1581 let payload: Vec<u8> = (0..32).collect();
1583 let framed = hdlc::frame(&payload);
1584 let mid = framed.len() / 2;
1585
1586 client.write_all(&framed[..mid]).unwrap();
1587 thread::sleep(Duration::from_millis(50));
1588 client.write_all(&framed[mid..]).unwrap();
1589
1590 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1592 match event {
1593 Event::Frame { data, .. } => {
1594 assert_eq!(data, payload);
1595 }
1596 other => panic!("expected Frame, got {:?}", other),
1597 }
1598 }
1599
1600 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1605 BackboneClientConfig {
1606 name: format!("test-bb-client-{}", port),
1607 target_host: "127.0.0.1".into(),
1608 target_port: port,
1609 interface_id: InterfaceId(id),
1610 reconnect_wait: Duration::from_millis(100),
1611 max_reconnect_tries: Some(2),
1612 connect_timeout: Duration::from_secs(2),
1613 transport_identity: None,
1614 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1615 reconnect_wait: Duration::from_millis(100),
1616 max_reconnect_tries: Some(2),
1617 connect_timeout: Duration::from_secs(2),
1618 })),
1619 }
1620 }
1621
1622 #[test]
1623 fn backbone_client_connect() {
1624 let port = find_free_port();
1625 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1626 let (tx, rx) = crate::event::channel();
1627
1628 let config = make_client_config(port, 9000);
1629 let _writer = start_client(config, tx).unwrap();
1630
1631 let _server_stream = listener.accept().unwrap();
1632
1633 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1634 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1635 }
1636
1637 #[test]
1638 fn backbone_client_receive_frame() {
1639 let port = find_free_port();
1640 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1641 let (tx, rx) = crate::event::channel();
1642
1643 let config = make_client_config(port, 9100);
1644 let _writer = start_client(config, tx).unwrap();
1645
1646 let (mut server_stream, _) = listener.accept().unwrap();
1647
1648 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1650
1651 let payload: Vec<u8> = (0..32).collect();
1653 server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1654
1655 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1656 match event {
1657 Event::Frame {
1658 interface_id,
1659 data,
1660 rssi: _,
1661 snr: _,
1662 } => {
1663 assert_eq!(interface_id, InterfaceId(9100));
1664 assert_eq!(data, payload);
1665 }
1666 other => panic!("expected Frame, got {:?}", other),
1667 }
1668 }
1669
1670 #[test]
1671 fn backbone_client_send_frame() {
1672 let port = find_free_port();
1673 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1674 let (tx, _rx) = crate::event::channel();
1675
1676 let config = make_client_config(port, 9200);
1677 let mut writer = start_client(config, tx).unwrap();
1678
1679 let (mut server_stream, _) = listener.accept().unwrap();
1680 server_stream
1681 .set_read_timeout(Some(Duration::from_secs(2)))
1682 .unwrap();
1683
1684 let payload: Vec<u8> = (0..24).collect();
1685 writer.send_frame(&payload).unwrap();
1686
1687 let mut buf = [0u8; 256];
1688 let n = server_stream.read(&mut buf).unwrap();
1689 let expected = hdlc::frame(&payload);
1690 assert_eq!(&buf[..n], &expected[..]);
1691 }
1692
1693 #[test]
1694 fn backbone_max_connections_rejects_excess() {
1695 let port = find_free_port();
1696 let (tx, rx) = crate::event::channel();
1697 let next_id = Arc::new(AtomicU64::new(8800));
1698
1699 let config = make_server_config(
1700 port,
1701 88,
1702 Some(2),
1703 None,
1704 None,
1705 BackboneAbuseConfig::default(),
1706 );
1707
1708 start(config, tx, next_id).unwrap();
1709 thread::sleep(Duration::from_millis(50));
1710
1711 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1713 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1714
1715 for _ in 0..2 {
1717 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1718 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1719 }
1720
1721 let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1723 client3
1724 .set_read_timeout(Some(Duration::from_millis(500)))
1725 .unwrap();
1726
1727 thread::sleep(Duration::from_millis(100));
1729
1730 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1732 assert!(
1733 result.is_err(),
1734 "expected no InterfaceUp for rejected connection, got {:?}",
1735 result
1736 );
1737 }
1738
1739 #[test]
1740 fn backbone_max_connections_allows_after_disconnect() {
1741 let port = find_free_port();
1742 let (tx, rx) = crate::event::channel();
1743 let next_id = Arc::new(AtomicU64::new(8900));
1744
1745 let config = make_server_config(
1746 port,
1747 89,
1748 Some(1),
1749 None,
1750 None,
1751 BackboneAbuseConfig::default(),
1752 );
1753
1754 start(config, tx, next_id).unwrap();
1755 thread::sleep(Duration::from_millis(50));
1756
1757 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1759 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1760 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1761
1762 drop(client1);
1764
1765 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1767 assert!(matches!(event, Event::InterfaceDown(_)));
1768
1769 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1771 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1772 assert!(
1773 matches!(event, Event::InterfaceUp(_, _, _)),
1774 "expected InterfaceUp after slot freed, got {:?}",
1775 event
1776 );
1777 }
1778
1779 #[test]
1780 fn backbone_client_reconnect() {
1781 let port = find_free_port();
1782 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1783 listener.set_nonblocking(false).unwrap();
1784 let (tx, rx) = crate::event::channel();
1785
1786 let config = make_client_config(port, 9300);
1787 let _writer = start_client(config, tx).unwrap();
1788
1789 let (server_stream, _) = listener.accept().unwrap();
1791
1792 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1794
1795 drop(server_stream);
1796
1797 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1799 assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1800
1801 let _server_stream2 = listener.accept().unwrap();
1803
1804 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1806 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1807 }
1808
1809 #[test]
1810 fn backbone_idle_timeout_disconnects_silent_client() {
1811 let port = find_free_port();
1812 let (tx, rx) = crate::event::channel();
1813 let next_id = Arc::new(AtomicU64::new(9400));
1814
1815 let config = make_server_config(
1816 port,
1817 94,
1818 None,
1819 Some(Duration::from_millis(150)),
1820 None,
1821 BackboneAbuseConfig::default(),
1822 );
1823
1824 start(config, tx, next_id).unwrap();
1825 thread::sleep(Duration::from_millis(50));
1826
1827 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1828
1829 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1830 let client_id = match event {
1831 Event::InterfaceUp(id, _, _) => id,
1832 other => panic!("expected InterfaceUp, got {:?}", other),
1833 };
1834
1835 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1836 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1837 }
1838
1839 #[test]
1840 fn backbone_idle_timeout_ignores_client_after_data() {
1841 let port = find_free_port();
1842 let (tx, rx) = crate::event::channel();
1843 let next_id = Arc::new(AtomicU64::new(9500));
1844
1845 let config = make_server_config(
1846 port,
1847 95,
1848 None,
1849 Some(Duration::from_millis(200)),
1850 None,
1851 BackboneAbuseConfig::default(),
1852 );
1853
1854 start(config, tx, next_id).unwrap();
1855 thread::sleep(Duration::from_millis(50));
1856
1857 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1858
1859 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1860 let client_id = match event {
1861 Event::InterfaceUp(id, _, _) => id,
1862 other => panic!("expected InterfaceUp, got {:?}", other),
1863 };
1864
1865 client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1866
1867 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1868 match event {
1869 Event::Frame {
1870 interface_id,
1871 data,
1872 rssi: _,
1873 snr: _,
1874 } => {
1875 assert_eq!(interface_id, client_id);
1876 assert_eq!(data, vec![1u8; 24]);
1877 }
1878 other => panic!("expected Frame, got {:?}", other),
1879 }
1880
1881 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1882 assert!(
1883 result.is_err(),
1884 "expected no InterfaceDown after client sent data, got {:?}",
1885 result
1886 );
1887 }
1888
1889 #[test]
1890 fn backbone_runtime_idle_timeout_updates_live() {
1891 let port = find_free_port();
1892 let (tx, rx) = crate::event::channel();
1893 let next_id = Arc::new(AtomicU64::new(9650));
1894
1895 let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1896 let runtime = Arc::clone(&config.runtime);
1897
1898 start(config, tx, next_id).unwrap();
1899 thread::sleep(Duration::from_millis(50));
1900
1901 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1902 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1903 let client_id = match event {
1904 Event::InterfaceUp(id, _, _) => id,
1905 other => panic!("expected InterfaceUp, got {:?}", other),
1906 };
1907
1908 {
1909 let mut runtime = runtime.lock().unwrap();
1910 runtime.idle_timeout = Some(Duration::from_millis(150));
1911 }
1912
1913 let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1914 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1915 }
1916
1917 #[test]
1918 fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1919 let port = find_free_port();
1920 let (tx, rx) = crate::event::channel();
1921 let next_id = Arc::new(AtomicU64::new(9660));
1922
1923 let config = make_server_config(
1924 port,
1925 98,
1926 None,
1927 None,
1928 Some(Duration::from_millis(50)),
1929 BackboneAbuseConfig::default(),
1930 );
1931
1932 start(config, tx, next_id).unwrap();
1933 thread::sleep(Duration::from_millis(50));
1934
1935 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1936 client
1937 .set_read_timeout(Some(Duration::from_millis(100)))
1938 .unwrap();
1939 let sock = SockRef::from(&client);
1940 sock.set_recv_buffer_size(4096).ok();
1941
1942 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1943 let (client_id, mut writer) = match event {
1944 Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1945 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1946 };
1947
1948 let payload = vec![0x55; 512 * 1024];
1949 let deadline = Instant::now() + Duration::from_secs(3);
1950 let mut stalled = false;
1951 while Instant::now() < deadline {
1952 match writer.send_frame(&payload) {
1953 Ok(()) => {}
1954 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1955 thread::sleep(Duration::from_millis(10));
1956 }
1957 Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1958 stalled = true;
1959 break;
1960 }
1961 Err(e) => panic!("unexpected send error: {}", e),
1962 }
1963 }
1964
1965 assert!(stalled, "expected writer to time out on persistent stall");
1966 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1967 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1968 }
1969
1970 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1972 where
1973 F: FnMut(&Event) -> bool,
1974 {
1975 let deadline = Instant::now() + timeout;
1976 loop {
1977 let remaining = deadline.saturating_duration_since(Instant::now());
1978 if remaining.is_zero() {
1979 return None;
1980 }
1981 match rx.recv_timeout(remaining) {
1982 Ok(event) if pred(&event) => return Some(event),
1983 Ok(_) => continue,
1984 Err(_) => return None,
1985 }
1986 }
1987 }
1988
1989 #[test]
1990 fn backbone_write_stall_emits_peer_events() {
1991 let port = find_free_port();
1992 let (tx, rx) = crate::event::channel();
1993 let next_id = Arc::new(AtomicU64::new(9700));
1994
1995 let config = make_server_config(
1996 port,
1997 97,
1998 None,
1999 None,
2000 Some(Duration::from_millis(50)), BackboneAbuseConfig::default(),
2002 );
2003
2004 start(config, tx, next_id).unwrap();
2005 thread::sleep(Duration::from_millis(50));
2006
2007 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2009 client
2010 .set_read_timeout(Some(Duration::from_millis(100)))
2011 .unwrap();
2012 let sock = SockRef::from(&client);
2013 sock.set_recv_buffer_size(4096).ok();
2014
2015 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2017 let mut writer = match event {
2018 Event::InterfaceUp(_, Some(w), _) => w,
2019 other => panic!("expected InterfaceUp with writer, got {:?}", other),
2020 };
2021
2022 let payload = vec![0x55; 512 * 1024];
2024 let deadline = Instant::now() + Duration::from_secs(3);
2025 while Instant::now() < deadline {
2026 match writer.send_frame(&payload) {
2027 Ok(()) | Err(_) => {
2028 if Instant::now() + Duration::from_millis(10) > deadline {
2029 break;
2030 }
2031 thread::sleep(Duration::from_millis(10));
2032 }
2033 }
2034 }
2035
2036 let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
2038 matches!(e, Event::BackbonePeerWriteStall { .. })
2039 });
2040 assert!(
2041 stall_event.is_some(),
2042 "expected BackbonePeerWriteStall event"
2043 );
2044 }
2045
2046 #[test]
2047 fn backbone_blacklisted_peer_rejected_on_connect() {
2048 let port = find_free_port();
2049 let (tx, rx) = crate::event::channel();
2050 let next_id = Arc::new(AtomicU64::new(9800));
2051
2052 let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2053 let peer_state = config.peer_state.clone();
2054
2055 start(config, tx.clone(), next_id).unwrap();
2056 thread::sleep(Duration::from_millis(50));
2057
2058 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2060 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2061 assert!(
2062 matches!(event, Event::InterfaceUp(_, _, _)),
2063 "first connection should succeed"
2064 );
2065 drop(client1);
2066
2067 thread::sleep(Duration::from_millis(100));
2069 while rx.try_recv().is_ok() {}
2070
2071 peer_state.lock().unwrap().blacklist(
2073 "127.0.0.1".parse().unwrap(),
2074 Duration::from_secs(60),
2075 "test blacklist".into(),
2076 );
2077
2078 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2080 thread::sleep(Duration::from_millis(200));
2082
2083 let event = rx.try_recv();
2085 match event {
2086 Ok(Event::InterfaceUp(_, _, _)) => {
2087 panic!("blacklisted peer should not get InterfaceUp")
2088 }
2089 _ => {} }
2091 }
2092
2093 #[test]
2094 fn backbone_parse_config_reads_abuse_settings() {
2095 let factory = BackboneInterfaceFactory;
2096 let mut params = HashMap::new();
2097 params.insert("listen_ip".into(), "127.0.0.1".into());
2098 params.insert("listen_port".into(), "4242".into());
2099 params.insert("idle_timeout".into(), "15".into());
2100 params.insert("write_stall_timeout".into(), "45".into());
2101 params.insert("max_penalty_duration".into(), "3600".into());
2102
2103 let config = factory
2104 .parse_config("test-backbone", InterfaceId(97), ¶ms)
2105 .unwrap();
2106 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2107
2108 match mode {
2109 BackboneMode::Server(config) => {
2110 assert_eq!(config.listen_ip, "127.0.0.1");
2111 assert_eq!(config.listen_port, 4242);
2112 assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2113 assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2114 assert_eq!(
2115 config.abuse.max_penalty_duration,
2116 Some(Duration::from_secs(3600))
2117 );
2118 }
2119 BackboneMode::Client(_, _) => panic!("expected server config"),
2120 }
2121 }
2122
2123 #[test]
2124 fn backbone_parse_config_reads_client_priority() {
2125 let factory = BackboneInterfaceFactory;
2126 let mut params = HashMap::new();
2127 params.insert("remote".into(), "example.com".into());
2128 params.insert("target_port".into(), "4242".into());
2129 params.insert("priority".into(), "87".into());
2130
2131 let config = factory
2132 .parse_config("test-backbone-client", InterfaceId(96), ¶ms)
2133 .unwrap();
2134 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2135
2136 match mode {
2137 BackboneMode::Client(_, priority) => assert_eq!(priority, 87),
2138 BackboneMode::Server(_) => panic!("expected client config"),
2139 }
2140 }
2141
2142 #[test]
2143 fn backbone_parse_config_defaults_client_priority() {
2144 let factory = BackboneInterfaceFactory;
2145 let mut params = HashMap::new();
2146 params.insert("remote".into(), "example.com".into());
2147
2148 let config = factory
2149 .parse_config("test-backbone-client", InterfaceId(95), ¶ms)
2150 .unwrap();
2151 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2152
2153 match mode {
2154 BackboneMode::Client(_, priority) => assert_eq!(priority, 60),
2155 BackboneMode::Server(_) => panic!("expected client config"),
2156 }
2157 }
2158
2159 #[test]
2160 fn backbone_parse_config_rejects_invalid_client_priority() {
2161 let factory = BackboneInterfaceFactory;
2162 for value in ["-1", "101", "fast"] {
2163 let mut params = HashMap::new();
2164 params.insert("remote".into(), "example.com".into());
2165 params.insert("priority".into(), value.into());
2166
2167 let err = match factory.parse_config("test-backbone-client", InterfaceId(94), ¶ms) {
2168 Ok(_) => panic!("priority {value} should be rejected"),
2169 Err(err) => err,
2170 };
2171 assert!(
2172 err.contains("priority"),
2173 "unexpected error for {value}: {err}"
2174 );
2175 }
2176 }
2177}