1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30 lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
34#[allow(dead_code)]
36const HW_MTU: usize = 1_048_576;
37
38#[derive(Debug, Clone)]
40pub struct BackboneConfig {
41 pub name: String,
42 pub listen_ip: String,
43 pub listen_port: u16,
44 pub interface_id: InterfaceId,
45 pub max_connections: Option<usize>,
46 pub idle_timeout: Option<Duration>,
47 pub write_stall_timeout: Option<Duration>,
48 pub abuse: BackboneAbuseConfig,
49 pub ingress_control: IngressControlConfig,
50 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
51 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
52}
53
54#[derive(Debug, Clone, Default)]
56pub struct BackboneAbuseConfig {
57 pub max_penalty_duration: Option<Duration>,
58}
59
60#[derive(Debug, Clone)]
62pub struct BackboneServerRuntime {
63 pub max_connections: Option<usize>,
64 pub idle_timeout: Option<Duration>,
65 pub write_stall_timeout: Option<Duration>,
66 pub abuse: BackboneAbuseConfig,
67}
68
69impl BackboneServerRuntime {
70 pub fn from_config(config: &BackboneConfig) -> Self {
71 Self {
72 max_connections: config.max_connections,
73 idle_timeout: config.idle_timeout,
74 write_stall_timeout: config.write_stall_timeout,
75 abuse: config.abuse.clone(),
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct BackboneRuntimeConfigHandle {
82 pub interface_name: String,
83 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
84 pub startup: BackboneServerRuntime,
85}
86
87#[derive(Debug, Clone)]
88pub struct BackbonePeerStateHandle {
89 pub interface_id: InterfaceId,
90 pub interface_name: String,
91 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
92}
93
94impl Default for BackboneConfig {
95 fn default() -> Self {
96 let mut config = BackboneConfig {
97 name: String::new(),
98 listen_ip: "0.0.0.0".into(),
99 listen_port: 0,
100 interface_id: InterfaceId(0),
101 max_connections: None,
102 idle_timeout: None,
103 write_stall_timeout: None,
104 abuse: BackboneAbuseConfig::default(),
105 ingress_control: IngressControlConfig::enabled(),
106 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
107 max_connections: None,
108 idle_timeout: None,
109 write_stall_timeout: None,
110 abuse: BackboneAbuseConfig::default(),
111 })),
112 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
113 };
114 let startup = BackboneServerRuntime::from_config(&config);
115 config.runtime = Arc::new(Mutex::new(startup));
116 config
117 }
118}
119
120const MAX_PENDING_BYTES: usize = 512 * 1024;
123
124struct BackboneWriter {
126 stream: TcpStream,
127 runtime: Arc<Mutex<BackboneServerRuntime>>,
128 interface_name: String,
129 interface_id: InterfaceId,
130 event_tx: EventSender,
131 pending: Vec<u8>,
132 stall_started: Option<Instant>,
133 disconnect_notified: bool,
134 write_stall_flag: Arc<AtomicBool>,
135}
136
137impl Writer for BackboneWriter {
138 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
139 let write_stall_timeout =
140 lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
141 if !self.pending.is_empty() {
142 self.flush_pending(write_stall_timeout)?;
143 if !self.pending.is_empty() {
144 return Err(io::Error::new(
145 io::ErrorKind::WouldBlock,
146 "backbone writer still stalled",
147 ));
148 }
149 }
150
151 let frame = hdlc::frame(data);
152 self.write_buffer(&frame, write_stall_timeout)
153 }
154}
155
156impl BackboneWriter {
157 fn write_buffer(
158 &mut self,
159 data: &[u8],
160 write_stall_timeout: Option<Duration>,
161 ) -> io::Result<()> {
162 let mut written = 0usize;
163 while written < data.len() {
164 match self.stream.write(&data[written..]) {
165 Ok(0) => {
166 return Err(io::Error::new(
167 io::ErrorKind::WriteZero,
168 "backbone writer wrote zero bytes",
169 ))
170 }
171 Ok(n) => {
172 written += n;
173 self.stall_started = None;
174 }
175 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176 let now = Instant::now();
177 let started = self.stall_started.get_or_insert(now);
178 if let Some(timeout) = write_stall_timeout {
179 if now.duration_since(*started) >= timeout {
180 return Err(self.disconnect_for_write_stall(timeout));
181 }
182 }
183 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
184 return Err(self.disconnect_for_write_stall(
185 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
186 ));
187 }
188 self.pending.extend_from_slice(&data[written..]);
189 return Err(io::Error::new(
190 io::ErrorKind::WouldBlock,
191 "backbone writer would block",
192 ));
193 }
194 Err(e) => return Err(e),
195 }
196 }
197 Ok(())
198 }
199
200 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
201 if self.pending.is_empty() {
202 return Ok(());
203 }
204
205 let pending = std::mem::take(&mut self.pending);
206 match self.write_buffer(&pending, write_stall_timeout) {
207 Ok(()) => Ok(()),
208 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
209 Err(e) => Err(e),
210 }
211 }
212
213 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
214 if !self.disconnect_notified {
215 log::warn!(
216 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
217 self.interface_name,
218 self.interface_id.0,
219 timeout
220 );
221 self.write_stall_flag.store(true, Ordering::Relaxed);
222 let _ = self.stream.shutdown(Shutdown::Both);
223 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
224 self.disconnect_notified = true;
225 }
226 io::Error::new(
227 io::ErrorKind::TimedOut,
228 format!("backbone writer stalled for {:?}", timeout),
229 )
230 }
231}
232
233pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
235 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
236 let listener = TcpListener::bind(&addr)?;
237 listener.set_nonblocking(true)?;
238
239 log::info!(
240 "[{}] backbone server listening on {}",
241 config.name,
242 listener
243 .local_addr()
244 .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
245 );
246
247 let name = config.name.clone();
248 let server_interface_id = config.interface_id;
249 let runtime = Arc::clone(&config.runtime);
250 let peer_state = Arc::clone(&config.peer_state);
251 let ingress_control = config.ingress_control;
252 thread::Builder::new()
253 .name(format!("backbone-poll-{}", config.interface_id.0))
254 .spawn(move || {
255 if let Err(e) = poll_loop(
256 listener,
257 name,
258 server_interface_id,
259 tx,
260 next_id,
261 runtime,
262 peer_state,
263 ingress_control,
264 ) {
265 log::error!("backbone poll loop error: {}", e);
266 }
267 })?;
268
269 Ok(())
270}
271
272struct ClientState {
274 id: InterfaceId,
275 peer_ip: IpAddr,
276 peer_port: u16,
277 stream: TcpStream,
278 decoder: hdlc::Decoder,
279 connected_at: Instant,
280 has_received_data: bool,
281 write_stall_flag: Arc<AtomicBool>,
282}
283
284#[derive(Debug, Clone)]
285struct PeerBehaviorState {
286 blacklisted_until: Option<Instant>,
287 blacklist_reason: Option<String>,
288 reject_count: u64,
289 connected_count: usize,
290}
291
292impl PeerBehaviorState {
293 fn new() -> Self {
294 Self {
295 blacklisted_until: None,
296 blacklist_reason: None,
297 reject_count: 0,
298 connected_count: 0,
299 }
300 }
301}
302
303#[derive(Debug, Clone, Default)]
304pub struct BackbonePeerMonitor {
305 peers: HashMap<IpAddr, PeerBehaviorState>,
306}
307
308impl BackbonePeerMonitor {
309 pub fn new() -> Self {
310 Self {
311 peers: HashMap::new(),
312 }
313 }
314
315 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
316 let mut merged = self.peers.clone();
317
318 for (peer_ip, state) in peers {
319 let entry = merged
320 .entry(*peer_ip)
321 .or_insert_with(PeerBehaviorState::new);
322 entry.connected_count = state.connected_count;
323 entry.reject_count = state.reject_count;
324 if state.blacklisted_until.is_some() {
325 entry.blacklisted_until = state.blacklisted_until;
326 entry.blacklist_reason = state.blacklist_reason.clone();
327 }
328 }
329
330 merged.retain(|peer_ip, state| {
331 peers.contains_key(peer_ip)
332 || state.blacklisted_until.is_some()
333 || state.reject_count > 0
334 });
335 self.peers = merged;
336 }
337
338 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
339 for (peer_ip, state) in &self.peers {
340 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
341 entry.blacklisted_until = state.blacklisted_until;
342 entry.blacklist_reason = state.blacklist_reason.clone();
343 entry.reject_count = state.reject_count;
344 }
345
346 peers.retain(|peer_ip, state| {
347 if state.connected_count > 0 {
348 return true;
349 }
350 self.peers.contains_key(peer_ip)
351 });
352 }
353
354 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
355 let now = Instant::now();
356 let mut entries: Vec<BackbonePeerStateEntry> = self
357 .peers
358 .iter()
359 .map(|(peer_ip, state)| BackbonePeerStateEntry {
360 interface_name: interface_name.to_string(),
361 peer_ip: *peer_ip,
362 connected_count: state.connected_count,
363 blacklisted_remaining_secs: state
364 .blacklisted_until
365 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
366 blacklist_reason: state.blacklist_reason.clone(),
367 reject_count: state.reject_count,
368 })
369 .collect();
370 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
371 entries
372 }
373
374 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
375 self.peers.remove(&peer_ip).is_some()
376 }
377
378 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
379 let state = self
380 .peers
381 .entry(peer_ip)
382 .or_insert_with(PeerBehaviorState::new);
383 state.blacklisted_until = Some(Instant::now() + duration);
384 state.blacklist_reason = Some(reason);
385 true
386 }
387
388 #[cfg(test)]
389 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
390 let mut state = PeerBehaviorState::new();
391 state.connected_count = entry.connected_count;
392 state.reject_count = entry.reject_count;
393 state.blacklist_reason = entry.blacklist_reason;
394 if let Some(remaining) = entry.blacklisted_remaining_secs {
395 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
396 }
397 self.peers.insert(entry.peer_ip, state);
398 }
399}
400
401#[derive(Clone, Copy)]
402enum DisconnectReason {
403 RemoteClosed,
404 IdleTimeout,
405 WriteStall,
406}
407
408fn poll_loop(
410 listener: TcpListener,
411 name: String,
412 server_interface_id: InterfaceId,
413 tx: EventSender,
414 next_id: Arc<AtomicU64>,
415 runtime: Arc<Mutex<BackboneServerRuntime>>,
416 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
417 ingress_control: IngressControlConfig,
418) -> io::Result<()> {
419 let poller = Poller::new()?;
420
421 const LISTENER_KEY: usize = 0;
422
423 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
425
426 let mut clients: HashMap<usize, ClientState> = HashMap::new();
427 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
428 let mut events = Events::new();
429 let mut next_key: usize = 1;
430
431 loop {
432 let runtime_snapshot = runtime.lock().unwrap().clone();
433 let max_connections = runtime_snapshot.max_connections;
434 let idle_timeout = runtime_snapshot.idle_timeout;
435 cleanup_peer_state(&mut peers);
436 {
437 let mut monitor = peer_state.lock().unwrap();
438 monitor.sync_into(&mut peers);
439 monitor.upsert_snapshot(&peers);
440 }
441
442 events.clear();
443 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
444
445 for ev in events.iter() {
446 if ev.key == LISTENER_KEY {
447 loop {
449 match listener.accept() {
450 Ok((stream, peer_addr)) => {
451 let peer_ip = peer_addr.ip();
452 let peer_port = peer_addr.port();
453
454 if is_ip_blacklisted(&mut peers, peer_ip) {
455 if let Some(state) = peers.get_mut(&peer_ip) {
456 state.reject_count = state.reject_count.saturating_add(1);
457 }
458 peer_state.lock().unwrap().upsert_snapshot(&peers);
459 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
460 drop(stream);
461 continue;
462 }
463
464 if let Some(max) = max_connections {
465 if clients.len() >= max {
466 log::warn!(
467 "[{}] max connections ({}) reached, rejecting {}",
468 name,
469 max,
470 peer_addr
471 );
472 drop(stream);
473 continue;
474 }
475 }
476
477 stream.set_nonblocking(true).ok();
478 stream.set_nodelay(true).ok();
479 set_tcp_keepalive(&stream).ok();
480
481 #[cfg(target_os = "macos")]
483 {
484 let sock = SockRef::from(&stream);
485 sock.set_nosigpipe(true).ok();
486 }
487
488 let key = next_key;
489 next_key += 1;
490 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
491
492 log::info!(
493 "[{}] backbone client connected: {} → id {}",
494 name,
495 peer_addr,
496 client_id.0
497 );
498
499 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
502 {
503 log::warn!("[{}] failed to add client to poller: {}", name, e);
504 continue; }
506
507 let writer_stream = match stream.try_clone() {
509 Ok(s) => s,
510 Err(e) => {
511 log::warn!("[{}] failed to clone client stream: {}", name, e);
512 let _ = poller.delete(&stream);
513 continue; }
515 };
516 let write_stall_flag = Arc::new(AtomicBool::new(false));
517 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
518 stream: writer_stream,
519 runtime: Arc::clone(&runtime),
520 interface_name: name.clone(),
521 interface_id: client_id,
522 event_tx: tx.clone(),
523 pending: Vec::new(),
524 stall_started: None,
525 disconnect_notified: false,
526 write_stall_flag: Arc::clone(&write_stall_flag),
527 });
528
529 clients.insert(
530 key,
531 ClientState {
532 id: client_id,
533 peer_ip,
534 peer_port,
535 stream,
536 decoder: hdlc::Decoder::new(),
537 connected_at: Instant::now(),
538 has_received_data: false,
539 write_stall_flag,
540 },
541 );
542 peers
543 .entry(peer_ip)
544 .or_insert_with(PeerBehaviorState::new)
545 .connected_count += 1;
546 peer_state.lock().unwrap().upsert_snapshot(&peers);
547 let _ = tx.send(Event::BackbonePeerConnected {
548 server_interface_id,
549 peer_interface_id: client_id,
550 peer_ip,
551 peer_port,
552 });
553
554 let info = InterfaceInfo {
555 id: client_id,
556 name: format!("BackboneInterface/{}", client_id.0),
557 mode: constants::MODE_FULL,
558 out_capable: true,
559 in_capable: true,
560 bitrate: Some(1_000_000_000), airtime_profile: None,
562 announce_rate_target: None,
563 announce_rate_grace: 0,
564 announce_rate_penalty: 0.0,
565 announce_cap: constants::ANNOUNCE_CAP,
566 is_local_client: false,
567 wants_tunnel: false,
568 tunnel_id: None,
569 mtu: 65535,
570 ia_freq: 0.0,
571 started: 0.0,
572 ingress_control,
573 };
574
575 if tx
576 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
577 .is_err()
578 {
579 cleanup(&poller, &clients, &listener);
581 return Ok(());
582 }
583 }
584 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
585 Err(e) => {
586 log::warn!("[{}] accept error: {}", name, e);
587 break;
588 }
589 }
590 }
591 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
593 } else if clients.contains_key(&ev.key) {
594 let key = ev.key;
595 let mut should_remove = false;
596 let mut client_id = InterfaceId(0);
597
598 let mut buf = [0u8; 4096];
599 let read_result = {
600 let client = clients.get_mut(&key).unwrap();
601 client.stream.read(&mut buf)
602 };
603
604 match read_result {
605 Ok(0) | Err(_) => {
606 if let Some(c) = clients.get(&key) {
607 client_id = c.id;
608 }
609 should_remove = true;
610 }
611 Ok(n) => {
612 let client = clients.get_mut(&key).unwrap();
613 client_id = client.id;
614 client.has_received_data = true;
615 for frame in client.decoder.feed(&buf[..n]) {
616 if tx
617 .send(Event::Frame {
618 interface_id: client_id,
619 data: frame,
620 })
621 .is_err()
622 {
623 cleanup(&poller, &clients, &listener);
624 return Ok(());
625 }
626 }
627 }
628 }
629
630 if should_remove {
631 let reason = if clients
632 .get(&key)
633 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
634 {
635 DisconnectReason::WriteStall
636 } else {
637 DisconnectReason::RemoteClosed
638 };
639 disconnect_client(
640 &poller,
641 &mut clients,
642 &mut peers,
643 &name,
644 server_interface_id,
645 &tx,
646 &peer_state,
647 key,
648 client_id,
649 reason,
650 );
651 } else if let Some(client) = clients.get(&key) {
652 poller.modify(&client.stream, PollEvent::readable(key))?;
654 }
655 }
656 }
657
658 if let Some(timeout) = idle_timeout {
659 let now = Instant::now();
660 let timed_out: Vec<(usize, InterfaceId)> = clients
661 .iter()
662 .filter_map(|(&key, client)| {
663 if client.has_received_data || now.duration_since(client.connected_at) < timeout
664 {
665 None
666 } else {
667 Some((key, client.id))
668 }
669 })
670 .collect();
671
672 for (key, client_id) in timed_out {
673 disconnect_client(
674 &poller,
675 &mut clients,
676 &mut peers,
677 &name,
678 server_interface_id,
679 &tx,
680 &peer_state,
681 key,
682 client_id,
683 DisconnectReason::IdleTimeout,
684 );
685 }
686 }
687 }
688}
689
690fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
691 let now = Instant::now();
692 peers.retain(|_, state| {
693 if matches!(state.blacklisted_until, Some(until) if now >= until) {
694 state.blacklisted_until = None;
695 state.blacklist_reason = None;
696 }
697 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
698 });
699}
700
701fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
702 let now = Instant::now();
703 if let Some(state) = peers.get_mut(&peer_ip) {
704 if let Some(until) = state.blacklisted_until {
705 if now < until {
706 return true;
707 }
708 state.blacklisted_until = None;
709 }
710 }
711 false
712}
713
714fn disconnect_client(
715 poller: &Poller,
716 clients: &mut HashMap<usize, ClientState>,
717 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
718 name: &str,
719 server_interface_id: InterfaceId,
720 tx: &EventSender,
721 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
722 key: usize,
723 client_id: InterfaceId,
724 reason: DisconnectReason,
725) {
726 let Some(client) = clients.remove(&key) else {
727 return;
728 };
729
730 match reason {
731 DisconnectReason::RemoteClosed => {
732 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
733 }
734 DisconnectReason::IdleTimeout => {
735 log::info!(
736 "[{}] backbone client {} disconnected due to idle timeout",
737 name,
738 client_id.0
739 );
740 }
741 DisconnectReason::WriteStall => {
742 }
744 }
745
746 let _ = poller.delete(&client.stream);
747 let connected_for = client.connected_at.elapsed();
749 let _ = tx.send(Event::BackbonePeerDisconnected {
750 server_interface_id,
751 peer_interface_id: client.id,
752 peer_ip: client.peer_ip,
753 peer_port: client.peer_port,
754 connected_for,
755 had_received_data: client.has_received_data,
756 });
757 match reason {
758 DisconnectReason::IdleTimeout => {
759 let _ = tx.send(Event::BackbonePeerIdleTimeout {
760 server_interface_id,
761 peer_interface_id: client.id,
762 peer_ip: client.peer_ip,
763 peer_port: client.peer_port,
764 connected_for,
765 });
766 }
767 DisconnectReason::WriteStall => {
768 let _ = tx.send(Event::BackbonePeerWriteStall {
769 server_interface_id,
770 peer_interface_id: client.id,
771 peer_ip: client.peer_ip,
772 peer_port: client.peer_port,
773 connected_for,
774 });
775 }
776 DisconnectReason::RemoteClosed => {}
777 }
778
779 if let Some(state) = peers.get_mut(&client.peer_ip) {
780 state.connected_count = state.connected_count.saturating_sub(1);
781 }
782 peer_state.lock().unwrap().upsert_snapshot(peers);
783 if !matches!(reason, DisconnectReason::WriteStall) {
785 let _ = tx.send(Event::InterfaceDown(client_id));
786 }
787}
788
789fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
790 let sock = SockRef::from(stream);
791 let mut keepalive = TcpKeepalive::new()
792 .with_time(Duration::from_secs(5))
793 .with_interval(Duration::from_secs(2));
794 #[cfg(any(target_os = "linux", target_os = "macos"))]
795 {
796 keepalive = keepalive.with_retries(12);
797 }
798 sock.set_tcp_keepalive(&keepalive)
799}
800
801fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
802 for (_, client) in clients {
803 let _ = poller.delete(&client.stream);
804 }
805 let _ = poller.delete(listener);
806}
807
808#[derive(Debug, Clone)]
814pub struct BackboneClientConfig {
815 pub name: String,
816 pub target_host: String,
817 pub target_port: u16,
818 pub interface_id: InterfaceId,
819 pub reconnect_wait: Duration,
820 pub max_reconnect_tries: Option<u32>,
821 pub connect_timeout: Duration,
822 pub transport_identity: Option<String>,
823 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
824}
825
826#[derive(Debug, Clone)]
827pub struct BackboneClientRuntime {
828 pub reconnect_wait: Duration,
829 pub max_reconnect_tries: Option<u32>,
830 pub connect_timeout: Duration,
831}
832
833impl BackboneClientRuntime {
834 pub fn from_config(config: &BackboneClientConfig) -> Self {
835 Self {
836 reconnect_wait: config.reconnect_wait,
837 max_reconnect_tries: config.max_reconnect_tries,
838 connect_timeout: config.connect_timeout,
839 }
840 }
841}
842
843#[derive(Debug, Clone)]
844pub struct BackboneClientRuntimeConfigHandle {
845 pub interface_name: String,
846 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
847 pub startup: BackboneClientRuntime,
848}
849
850impl Default for BackboneClientConfig {
851 fn default() -> Self {
852 let mut config = BackboneClientConfig {
853 name: String::new(),
854 target_host: "127.0.0.1".into(),
855 target_port: 4242,
856 interface_id: InterfaceId(0),
857 reconnect_wait: Duration::from_secs(5),
858 max_reconnect_tries: None,
859 connect_timeout: Duration::from_secs(5),
860 transport_identity: None,
861 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
862 reconnect_wait: Duration::from_secs(5),
863 max_reconnect_tries: None,
864 connect_timeout: Duration::from_secs(5),
865 })),
866 };
867 let startup = BackboneClientRuntime::from_config(&config);
868 config.runtime = Arc::new(Mutex::new(startup));
869 config
870 }
871}
872
873struct BackboneClientWriter {
875 stream: TcpStream,
876}
877
878impl Writer for BackboneClientWriter {
879 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
880 self.stream.write_all(&hdlc::frame(data))
881 }
882}
883
884fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
886 let runtime = config.runtime.lock().unwrap().clone();
887 let addr_str = format!("{}:{}", config.target_host, config.target_port);
888 let addr = addr_str
889 .to_socket_addrs()?
890 .next()
891 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
892
893 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
894 stream.set_nodelay(true)?;
895 set_tcp_keepalive(&stream).ok();
896
897 #[cfg(target_os = "macos")]
899 {
900 let sock = SockRef::from(&stream);
901 sock.set_nosigpipe(true).ok();
902 }
903
904 Ok(stream)
905}
906
907pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
909 let stream = try_connect_client(&config)?;
910 let reader_stream = stream.try_clone()?;
911 let writer_stream = stream.try_clone()?;
912
913 let id = config.interface_id;
914 log::info!(
915 "[{}] backbone client connected to {}:{}",
916 config.name,
917 config.target_host,
918 config.target_port
919 );
920
921 let _ = tx.send(Event::InterfaceUp(id, None, None));
923
924 thread::Builder::new()
925 .name(format!("backbone-client-{}", id.0))
926 .spawn(move || {
927 client_reader_loop(reader_stream, config, tx);
928 })?;
929
930 Ok(Box::new(BackboneClientWriter {
931 stream: writer_stream,
932 }))
933}
934
935fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
938 let id = config.interface_id;
939 let mut decoder = hdlc::Decoder::new();
940 let mut buf = [0u8; 4096];
941
942 loop {
943 match stream.read(&mut buf) {
944 Ok(0) => {
945 log::warn!("[{}] connection closed", config.name);
946 let _ = tx.send(Event::InterfaceDown(id));
947 match client_reconnect(&config, &tx) {
948 Some(new_stream) => {
949 stream = new_stream;
950 decoder = hdlc::Decoder::new();
951 continue;
952 }
953 None => {
954 log::error!("[{}] reconnection failed, giving up", config.name);
955 return;
956 }
957 }
958 }
959 Ok(n) => {
960 for frame in decoder.feed(&buf[..n]) {
961 if tx
962 .send(Event::Frame {
963 interface_id: id,
964 data: frame,
965 })
966 .is_err()
967 {
968 return;
969 }
970 }
971 }
972 Err(e) => {
973 log::warn!("[{}] read error: {}", config.name, e);
974 let _ = tx.send(Event::InterfaceDown(id));
975 match client_reconnect(&config, &tx) {
976 Some(new_stream) => {
977 stream = new_stream;
978 decoder = hdlc::Decoder::new();
979 continue;
980 }
981 None => {
982 log::error!("[{}] reconnection failed, giving up", config.name);
983 return;
984 }
985 }
986 }
987 }
988 }
989}
990
991const MAX_BACKOFF_SHIFT: u32 = 6;
994
995fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
999 let mut attempts = 0u32;
1000 loop {
1001 let runtime = config.runtime.lock().unwrap().clone();
1002
1003 let shift = attempts.min(MAX_BACKOFF_SHIFT);
1004 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1005 let jitter_range = backoff / 4;
1007 let jitter = if jitter_range.as_nanos() > 0 {
1008 let offset = Duration::from_nanos(
1009 (std::hash::RandomState::new().build_hasher().finish()
1010 % jitter_range.as_nanos() as u64)
1011 * 2,
1012 );
1013 if offset > jitter_range {
1014 backoff + (offset - jitter_range)
1015 } else {
1016 backoff - (jitter_range - offset)
1017 }
1018 } else {
1019 backoff
1020 };
1021 thread::sleep(jitter);
1022
1023 attempts += 1;
1024
1025 if let Some(max) = runtime.max_reconnect_tries {
1026 if attempts > max {
1027 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1028 return None;
1029 }
1030 }
1031
1032 log::info!(
1033 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1034 config.name,
1035 attempts,
1036 jitter.as_secs_f64(),
1037 );
1038
1039 match try_connect_client(config) {
1040 Ok(new_stream) => {
1041 let writer_stream = match new_stream.try_clone() {
1042 Ok(s) => s,
1043 Err(e) => {
1044 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1045 continue;
1046 }
1047 };
1048 log::info!(
1049 "[{}] reconnected after {} attempt(s)",
1050 config.name,
1051 attempts
1052 );
1053 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1054 stream: writer_stream,
1055 });
1056 let _ = tx.send(Event::InterfaceUp(
1057 config.interface_id,
1058 Some(new_writer),
1059 None,
1060 ));
1061 return Some(new_stream);
1062 }
1063 Err(e) => {
1064 log::warn!("[{}] reconnect failed: {}", config.name, e);
1065 }
1066 }
1067 }
1068}
1069
1070#[derive(Clone)]
1077pub(crate) enum BackboneMode {
1078 Server(BackboneConfig),
1079 Client(BackboneClientConfig),
1080}
1081
1082pub struct BackboneInterfaceFactory;
1088
1089fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
1090 params
1091 .get(key)
1092 .and_then(|v| v.parse::<f64>().ok())
1093 .filter(|v| *v > 0.0)
1094 .map(Duration::from_secs_f64)
1095}
1096
1097impl InterfaceFactory for BackboneInterfaceFactory {
1098 fn type_name(&self) -> &str {
1099 "BackboneInterface"
1100 }
1101
1102 fn parse_config(
1103 &self,
1104 name: &str,
1105 id: InterfaceId,
1106 params: &HashMap<String, String>,
1107 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1108 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1109 let target_host = target_host.clone();
1111 let target_port = params
1112 .get("target_port")
1113 .or_else(|| params.get("port"))
1114 .and_then(|v| v.parse().ok())
1115 .unwrap_or(4242);
1116 let transport_identity = params.get("transport_identity").cloned();
1117 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1118 name: name.to_string(),
1119 target_host,
1120 target_port,
1121 interface_id: id,
1122 transport_identity,
1123 ..BackboneClientConfig::default()
1124 })))
1125 } else {
1126 let listen_ip = params
1128 .get("listen_ip")
1129 .or_else(|| params.get("device"))
1130 .cloned()
1131 .unwrap_or_else(|| "0.0.0.0".into());
1132 let listen_port = params
1133 .get("listen_port")
1134 .or_else(|| params.get("port"))
1135 .and_then(|v| v.parse().ok())
1136 .unwrap_or(4242);
1137 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1138 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1139 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1140 let abuse = BackboneAbuseConfig {
1141 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1142 };
1143 let mut config = BackboneConfig {
1144 name: name.to_string(),
1145 listen_ip,
1146 listen_port,
1147 interface_id: id,
1148 max_connections,
1149 idle_timeout,
1150 write_stall_timeout,
1151 abuse,
1152 ingress_control: IngressControlConfig::enabled(),
1153 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1154 max_connections: None,
1155 idle_timeout: None,
1156 write_stall_timeout: None,
1157 abuse: BackboneAbuseConfig::default(),
1158 })),
1159 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1160 };
1161 let startup = BackboneServerRuntime::from_config(&config);
1162 config.runtime = Arc::new(Mutex::new(startup));
1163 Ok(Box::new(BackboneMode::Server(config)))
1164 }
1165 }
1166
1167 fn start(
1168 &self,
1169 config: Box<dyn InterfaceConfigData>,
1170 ctx: StartContext,
1171 ) -> io::Result<StartResult> {
1172 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1173 io::Error::new(
1174 io::ErrorKind::InvalidData,
1175 "wrong config type for BackboneInterface",
1176 )
1177 })?;
1178
1179 match mode {
1180 BackboneMode::Client(cfg) => {
1181 let id = cfg.interface_id;
1182 let name = cfg.name.clone();
1183 let info = InterfaceInfo {
1184 id,
1185 name,
1186 mode: ctx.mode,
1187 out_capable: true,
1188 in_capable: true,
1189 bitrate: Some(1_000_000_000),
1190 airtime_profile: None,
1191 announce_rate_target: None,
1192 announce_rate_grace: 0,
1193 announce_rate_penalty: 0.0,
1194 announce_cap: constants::ANNOUNCE_CAP,
1195 is_local_client: false,
1196 wants_tunnel: false,
1197 tunnel_id: None,
1198 mtu: 65535,
1199 ingress_control: ctx.ingress_control,
1200 ia_freq: 0.0,
1201 started: crate::time::now(),
1202 };
1203 let writer = start_client(cfg, ctx.tx)?;
1204 Ok(StartResult::Simple {
1205 id,
1206 info,
1207 writer,
1208 interface_type_name: "BackboneInterface".to_string(),
1209 })
1210 }
1211 BackboneMode::Server(mut cfg) => {
1212 cfg.ingress_control = ctx.ingress_control;
1213 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1214 Ok(StartResult::Listener { control: None })
1215 }
1216 }
1217 }
1218}
1219
1220pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1221 match mode {
1222 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1223 interface_name: config.name.clone(),
1224 runtime: Arc::clone(&config.runtime),
1225 startup: BackboneServerRuntime::from_config(config),
1226 }),
1227 BackboneMode::Client(_) => None,
1228 }
1229}
1230
1231pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1232 match mode {
1233 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1234 interface_id: config.interface_id,
1235 interface_name: config.name.clone(),
1236 peer_state: Arc::clone(&config.peer_state),
1237 }),
1238 BackboneMode::Client(_) => None,
1239 }
1240}
1241
1242pub(crate) fn client_runtime_handle_from_mode(
1243 mode: &BackboneMode,
1244) -> Option<BackboneClientRuntimeConfigHandle> {
1245 match mode {
1246 BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1247 interface_name: config.name.clone(),
1248 runtime: Arc::clone(&config.runtime),
1249 startup: BackboneClientRuntime::from_config(config),
1250 }),
1251 BackboneMode::Server(_) => None,
1252 }
1253}
1254
1255pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1256 match mode {
1257 BackboneMode::Client(config) => Some(config.clone()),
1258 BackboneMode::Server(_) => None,
1259 }
1260}
1261
1262#[cfg(test)]
1263mod tests {
1264 use super::*;
1265 use std::sync::mpsc;
1266 use std::time::Duration;
1267
1268 fn find_free_port() -> u16 {
1269 TcpListener::bind("127.0.0.1:0")
1270 .unwrap()
1271 .local_addr()
1272 .unwrap()
1273 .port()
1274 }
1275
1276 fn recv_non_peer_event(
1277 rx: &mpsc::Receiver<Event>,
1278 timeout: Duration,
1279 ) -> Result<Event, mpsc::RecvTimeoutError> {
1280 let deadline = Instant::now() + timeout;
1281 loop {
1282 let remaining = deadline.saturating_duration_since(Instant::now());
1283 if remaining.is_zero() {
1284 return Err(mpsc::RecvTimeoutError::Timeout);
1285 }
1286 let event = rx.recv_timeout(remaining)?;
1287 match event {
1288 Event::BackbonePeerConnected { .. }
1289 | Event::BackbonePeerDisconnected { .. }
1290 | Event::BackbonePeerIdleTimeout { .. }
1291 | Event::BackbonePeerWriteStall { .. }
1292 | Event::BackbonePeerPenalty { .. } => continue,
1293 other => return Ok(other),
1294 }
1295 }
1296 }
1297
1298 fn make_server_config(
1299 port: u16,
1300 interface_id: u64,
1301 max_connections: Option<usize>,
1302 idle_timeout: Option<Duration>,
1303 write_stall_timeout: Option<Duration>,
1304 abuse: BackboneAbuseConfig,
1305 ) -> BackboneConfig {
1306 let mut config = BackboneConfig {
1307 name: "test-backbone".into(),
1308 listen_ip: "127.0.0.1".into(),
1309 listen_port: port,
1310 interface_id: InterfaceId(interface_id),
1311 max_connections,
1312 idle_timeout,
1313 write_stall_timeout,
1314 abuse,
1315 ingress_control: IngressControlConfig::enabled(),
1316 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1317 max_connections: None,
1318 idle_timeout: None,
1319 write_stall_timeout: None,
1320 abuse: BackboneAbuseConfig::default(),
1321 })),
1322 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1323 };
1324 let startup = BackboneServerRuntime::from_config(&config);
1325 config.runtime = Arc::new(Mutex::new(startup));
1326 config
1327 }
1328
1329 #[test]
1330 fn backbone_accept_connection() {
1331 let port = find_free_port();
1332 let (tx, rx) = crate::event::channel();
1333 let next_id = Arc::new(AtomicU64::new(8000));
1334
1335 let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1336
1337 start(config, tx, next_id).unwrap();
1338 thread::sleep(Duration::from_millis(50));
1339
1340 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1341
1342 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1343 match event {
1344 Event::InterfaceUp(id, writer, info) => {
1345 assert_eq!(id, InterfaceId(8000));
1346 assert!(writer.is_some());
1347 assert!(info.is_some());
1348 let info = info.unwrap();
1349 assert!(info.out_capable);
1350 assert!(info.in_capable);
1351 }
1352 other => panic!("expected InterfaceUp, got {:?}", other),
1353 }
1354 }
1355
1356 #[test]
1357 fn backbone_receive_frame() {
1358 let port = find_free_port();
1359 let (tx, rx) = crate::event::channel();
1360 let next_id = Arc::new(AtomicU64::new(8100));
1361
1362 let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1363
1364 start(config, tx, next_id).unwrap();
1365 thread::sleep(Duration::from_millis(50));
1366
1367 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1368
1369 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1371
1372 let payload: Vec<u8> = (0..32).collect();
1374 client.write_all(&hdlc::frame(&payload)).unwrap();
1375
1376 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1377 match event {
1378 Event::Frame { interface_id, data } => {
1379 assert_eq!(interface_id, InterfaceId(8100));
1380 assert_eq!(data, payload);
1381 }
1382 other => panic!("expected Frame, got {:?}", other),
1383 }
1384 }
1385
1386 #[test]
1387 fn backbone_send_to_client() {
1388 let port = find_free_port();
1389 let (tx, rx) = crate::event::channel();
1390 let next_id = Arc::new(AtomicU64::new(8200));
1391
1392 let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1393
1394 start(config, tx, next_id).unwrap();
1395 thread::sleep(Duration::from_millis(50));
1396
1397 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1398 client
1399 .set_read_timeout(Some(Duration::from_secs(2)))
1400 .unwrap();
1401
1402 let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1404 let mut writer = match event {
1405 Event::InterfaceUp(_, Some(w), _) => w,
1406 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1407 };
1408
1409 let payload: Vec<u8> = (0..24).collect();
1411 writer.send_frame(&payload).unwrap();
1412
1413 let mut buf = [0u8; 256];
1415 let n = client.read(&mut buf).unwrap();
1416 let expected = hdlc::frame(&payload);
1417 assert_eq!(&buf[..n], &expected[..]);
1418 }
1419
1420 #[test]
1421 fn backbone_multiple_clients() {
1422 let port = find_free_port();
1423 let (tx, rx) = crate::event::channel();
1424 let next_id = Arc::new(AtomicU64::new(8300));
1425
1426 let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1427
1428 start(config, tx, next_id).unwrap();
1429 thread::sleep(Duration::from_millis(50));
1430
1431 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1432 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1433
1434 let mut ids = Vec::new();
1435 for _ in 0..2 {
1436 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1437 match event {
1438 Event::InterfaceUp(id, _, _) => ids.push(id),
1439 other => panic!("expected InterfaceUp, got {:?}", other),
1440 }
1441 }
1442
1443 assert_eq!(ids.len(), 2);
1444 assert_ne!(ids[0], ids[1]);
1445 }
1446
1447 #[test]
1448 fn backbone_client_disconnect() {
1449 let port = find_free_port();
1450 let (tx, rx) = crate::event::channel();
1451 let next_id = Arc::new(AtomicU64::new(8400));
1452
1453 let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1454
1455 start(config, tx, next_id).unwrap();
1456 thread::sleep(Duration::from_millis(50));
1457
1458 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1459
1460 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1462
1463 drop(client);
1465
1466 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1468 assert!(
1469 matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1470 "expected InterfaceDown(8400), got {:?}",
1471 event
1472 );
1473 }
1474
1475 #[test]
1476 fn backbone_epoll_multiplexing() {
1477 let port = find_free_port();
1478 let (tx, rx) = crate::event::channel();
1479 let next_id = Arc::new(AtomicU64::new(8500));
1480
1481 let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1482
1483 start(config, tx, next_id).unwrap();
1484 thread::sleep(Duration::from_millis(50));
1485
1486 let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1487 let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1488
1489 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1491 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1492
1493 let payload1: Vec<u8> = (0..24).collect();
1495 let payload2: Vec<u8> = (100..130).collect();
1496 client1.write_all(&hdlc::frame(&payload1)).unwrap();
1497 client2.write_all(&hdlc::frame(&payload2)).unwrap();
1498
1499 let mut received = Vec::new();
1501 for _ in 0..2 {
1502 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1503 match event {
1504 Event::Frame { data, .. } => received.push(data),
1505 other => panic!("expected Frame, got {:?}", other),
1506 }
1507 }
1508 assert!(received.contains(&payload1));
1509 assert!(received.contains(&payload2));
1510 }
1511
1512 #[test]
1513 fn backbone_bind_port() {
1514 let port = find_free_port();
1515 let (tx, _rx) = crate::event::channel();
1516 let next_id = Arc::new(AtomicU64::new(8600));
1517
1518 let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1519
1520 start(config, tx, next_id).unwrap();
1522 }
1523
1524 #[test]
1525 fn backbone_hdlc_fragmented() {
1526 let port = find_free_port();
1527 let (tx, rx) = crate::event::channel();
1528 let next_id = Arc::new(AtomicU64::new(8700));
1529
1530 let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1531
1532 start(config, tx, next_id).unwrap();
1533 thread::sleep(Duration::from_millis(50));
1534
1535 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1536 client.set_nodelay(true).unwrap();
1537
1538 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1540
1541 let payload: Vec<u8> = (0..32).collect();
1543 let framed = hdlc::frame(&payload);
1544 let mid = framed.len() / 2;
1545
1546 client.write_all(&framed[..mid]).unwrap();
1547 thread::sleep(Duration::from_millis(50));
1548 client.write_all(&framed[mid..]).unwrap();
1549
1550 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1552 match event {
1553 Event::Frame { data, .. } => {
1554 assert_eq!(data, payload);
1555 }
1556 other => panic!("expected Frame, got {:?}", other),
1557 }
1558 }
1559
1560 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1565 BackboneClientConfig {
1566 name: format!("test-bb-client-{}", port),
1567 target_host: "127.0.0.1".into(),
1568 target_port: port,
1569 interface_id: InterfaceId(id),
1570 reconnect_wait: Duration::from_millis(100),
1571 max_reconnect_tries: Some(2),
1572 connect_timeout: Duration::from_secs(2),
1573 transport_identity: None,
1574 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1575 reconnect_wait: Duration::from_millis(100),
1576 max_reconnect_tries: Some(2),
1577 connect_timeout: Duration::from_secs(2),
1578 })),
1579 }
1580 }
1581
1582 #[test]
1583 fn backbone_client_connect() {
1584 let port = find_free_port();
1585 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1586 let (tx, rx) = crate::event::channel();
1587
1588 let config = make_client_config(port, 9000);
1589 let _writer = start_client(config, tx).unwrap();
1590
1591 let _server_stream = listener.accept().unwrap();
1592
1593 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1594 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1595 }
1596
1597 #[test]
1598 fn backbone_client_receive_frame() {
1599 let port = find_free_port();
1600 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1601 let (tx, rx) = crate::event::channel();
1602
1603 let config = make_client_config(port, 9100);
1604 let _writer = start_client(config, tx).unwrap();
1605
1606 let (mut server_stream, _) = listener.accept().unwrap();
1607
1608 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1610
1611 let payload: Vec<u8> = (0..32).collect();
1613 server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1614
1615 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1616 match event {
1617 Event::Frame { interface_id, data } => {
1618 assert_eq!(interface_id, InterfaceId(9100));
1619 assert_eq!(data, payload);
1620 }
1621 other => panic!("expected Frame, got {:?}", other),
1622 }
1623 }
1624
1625 #[test]
1626 fn backbone_client_send_frame() {
1627 let port = find_free_port();
1628 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1629 let (tx, _rx) = crate::event::channel();
1630
1631 let config = make_client_config(port, 9200);
1632 let mut writer = start_client(config, tx).unwrap();
1633
1634 let (mut server_stream, _) = listener.accept().unwrap();
1635 server_stream
1636 .set_read_timeout(Some(Duration::from_secs(2)))
1637 .unwrap();
1638
1639 let payload: Vec<u8> = (0..24).collect();
1640 writer.send_frame(&payload).unwrap();
1641
1642 let mut buf = [0u8; 256];
1643 let n = server_stream.read(&mut buf).unwrap();
1644 let expected = hdlc::frame(&payload);
1645 assert_eq!(&buf[..n], &expected[..]);
1646 }
1647
1648 #[test]
1649 fn backbone_max_connections_rejects_excess() {
1650 let port = find_free_port();
1651 let (tx, rx) = crate::event::channel();
1652 let next_id = Arc::new(AtomicU64::new(8800));
1653
1654 let config = make_server_config(
1655 port,
1656 88,
1657 Some(2),
1658 None,
1659 None,
1660 BackboneAbuseConfig::default(),
1661 );
1662
1663 start(config, tx, next_id).unwrap();
1664 thread::sleep(Duration::from_millis(50));
1665
1666 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1668 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1669
1670 for _ in 0..2 {
1672 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1673 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1674 }
1675
1676 let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1678 client3
1679 .set_read_timeout(Some(Duration::from_millis(500)))
1680 .unwrap();
1681
1682 thread::sleep(Duration::from_millis(100));
1684
1685 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1687 assert!(
1688 result.is_err(),
1689 "expected no InterfaceUp for rejected connection, got {:?}",
1690 result
1691 );
1692 }
1693
1694 #[test]
1695 fn backbone_max_connections_allows_after_disconnect() {
1696 let port = find_free_port();
1697 let (tx, rx) = crate::event::channel();
1698 let next_id = Arc::new(AtomicU64::new(8900));
1699
1700 let config = make_server_config(
1701 port,
1702 89,
1703 Some(1),
1704 None,
1705 None,
1706 BackboneAbuseConfig::default(),
1707 );
1708
1709 start(config, tx, next_id).unwrap();
1710 thread::sleep(Duration::from_millis(50));
1711
1712 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1714 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1715 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1716
1717 drop(client1);
1719
1720 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1722 assert!(matches!(event, Event::InterfaceDown(_)));
1723
1724 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1726 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1727 assert!(
1728 matches!(event, Event::InterfaceUp(_, _, _)),
1729 "expected InterfaceUp after slot freed, got {:?}",
1730 event
1731 );
1732 }
1733
1734 #[test]
1735 fn backbone_client_reconnect() {
1736 let port = find_free_port();
1737 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1738 listener.set_nonblocking(false).unwrap();
1739 let (tx, rx) = crate::event::channel();
1740
1741 let config = make_client_config(port, 9300);
1742 let _writer = start_client(config, tx).unwrap();
1743
1744 let (server_stream, _) = listener.accept().unwrap();
1746
1747 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1749
1750 drop(server_stream);
1751
1752 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1754 assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1755
1756 let _server_stream2 = listener.accept().unwrap();
1758
1759 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1761 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1762 }
1763
1764 #[test]
1765 fn backbone_idle_timeout_disconnects_silent_client() {
1766 let port = find_free_port();
1767 let (tx, rx) = crate::event::channel();
1768 let next_id = Arc::new(AtomicU64::new(9400));
1769
1770 let config = make_server_config(
1771 port,
1772 94,
1773 None,
1774 Some(Duration::from_millis(150)),
1775 None,
1776 BackboneAbuseConfig::default(),
1777 );
1778
1779 start(config, tx, next_id).unwrap();
1780 thread::sleep(Duration::from_millis(50));
1781
1782 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1783
1784 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1785 let client_id = match event {
1786 Event::InterfaceUp(id, _, _) => id,
1787 other => panic!("expected InterfaceUp, got {:?}", other),
1788 };
1789
1790 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1791 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1792 }
1793
1794 #[test]
1795 fn backbone_idle_timeout_ignores_client_after_data() {
1796 let port = find_free_port();
1797 let (tx, rx) = crate::event::channel();
1798 let next_id = Arc::new(AtomicU64::new(9500));
1799
1800 let config = make_server_config(
1801 port,
1802 95,
1803 None,
1804 Some(Duration::from_millis(200)),
1805 None,
1806 BackboneAbuseConfig::default(),
1807 );
1808
1809 start(config, tx, next_id).unwrap();
1810 thread::sleep(Duration::from_millis(50));
1811
1812 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1813
1814 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1815 let client_id = match event {
1816 Event::InterfaceUp(id, _, _) => id,
1817 other => panic!("expected InterfaceUp, got {:?}", other),
1818 };
1819
1820 client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1821
1822 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1823 match event {
1824 Event::Frame { interface_id, data } => {
1825 assert_eq!(interface_id, client_id);
1826 assert_eq!(data, vec![1u8; 24]);
1827 }
1828 other => panic!("expected Frame, got {:?}", other),
1829 }
1830
1831 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1832 assert!(
1833 result.is_err(),
1834 "expected no InterfaceDown after client sent data, got {:?}",
1835 result
1836 );
1837 }
1838
1839 #[test]
1840 fn backbone_runtime_idle_timeout_updates_live() {
1841 let port = find_free_port();
1842 let (tx, rx) = crate::event::channel();
1843 let next_id = Arc::new(AtomicU64::new(9650));
1844
1845 let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1846 let runtime = Arc::clone(&config.runtime);
1847
1848 start(config, tx, next_id).unwrap();
1849 thread::sleep(Duration::from_millis(50));
1850
1851 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1852 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1853 let client_id = match event {
1854 Event::InterfaceUp(id, _, _) => id,
1855 other => panic!("expected InterfaceUp, got {:?}", other),
1856 };
1857
1858 {
1859 let mut runtime = runtime.lock().unwrap();
1860 runtime.idle_timeout = Some(Duration::from_millis(150));
1861 }
1862
1863 let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1864 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1865 }
1866
1867 #[test]
1868 fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1869 let port = find_free_port();
1870 let (tx, rx) = crate::event::channel();
1871 let next_id = Arc::new(AtomicU64::new(9660));
1872
1873 let config = make_server_config(
1874 port,
1875 98,
1876 None,
1877 None,
1878 Some(Duration::from_millis(50)),
1879 BackboneAbuseConfig::default(),
1880 );
1881
1882 start(config, tx, next_id).unwrap();
1883 thread::sleep(Duration::from_millis(50));
1884
1885 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1886 client
1887 .set_read_timeout(Some(Duration::from_millis(100)))
1888 .unwrap();
1889 let sock = SockRef::from(&client);
1890 sock.set_recv_buffer_size(4096).ok();
1891
1892 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1893 let (client_id, mut writer) = match event {
1894 Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1895 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1896 };
1897
1898 let payload = vec![0x55; 512 * 1024];
1899 let deadline = Instant::now() + Duration::from_secs(3);
1900 let mut stalled = false;
1901 while Instant::now() < deadline {
1902 match writer.send_frame(&payload) {
1903 Ok(()) => {}
1904 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1905 thread::sleep(Duration::from_millis(10));
1906 }
1907 Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1908 stalled = true;
1909 break;
1910 }
1911 Err(e) => panic!("unexpected send error: {}", e),
1912 }
1913 }
1914
1915 assert!(stalled, "expected writer to time out on persistent stall");
1916 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1917 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1918 }
1919
1920 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1922 where
1923 F: FnMut(&Event) -> bool,
1924 {
1925 let deadline = Instant::now() + timeout;
1926 loop {
1927 let remaining = deadline.saturating_duration_since(Instant::now());
1928 if remaining.is_zero() {
1929 return None;
1930 }
1931 match rx.recv_timeout(remaining) {
1932 Ok(event) if pred(&event) => return Some(event),
1933 Ok(_) => continue,
1934 Err(_) => return None,
1935 }
1936 }
1937 }
1938
1939 #[test]
1940 fn backbone_write_stall_emits_peer_events() {
1941 let port = find_free_port();
1942 let (tx, rx) = crate::event::channel();
1943 let next_id = Arc::new(AtomicU64::new(9700));
1944
1945 let config = make_server_config(
1946 port,
1947 97,
1948 None,
1949 None,
1950 Some(Duration::from_millis(50)), BackboneAbuseConfig::default(),
1952 );
1953
1954 start(config, tx, next_id).unwrap();
1955 thread::sleep(Duration::from_millis(50));
1956
1957 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1959 client
1960 .set_read_timeout(Some(Duration::from_millis(100)))
1961 .unwrap();
1962 let sock = SockRef::from(&client);
1963 sock.set_recv_buffer_size(4096).ok();
1964
1965 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1967 let mut writer = match event {
1968 Event::InterfaceUp(_, Some(w), _) => w,
1969 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1970 };
1971
1972 let payload = vec![0x55; 512 * 1024];
1974 let deadline = Instant::now() + Duration::from_secs(3);
1975 while Instant::now() < deadline {
1976 match writer.send_frame(&payload) {
1977 Ok(()) | Err(_) => {
1978 if Instant::now() + Duration::from_millis(10) > deadline {
1979 break;
1980 }
1981 thread::sleep(Duration::from_millis(10));
1982 }
1983 }
1984 }
1985
1986 let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1988 matches!(e, Event::BackbonePeerWriteStall { .. })
1989 });
1990 assert!(
1991 stall_event.is_some(),
1992 "expected BackbonePeerWriteStall event"
1993 );
1994 }
1995
1996 #[test]
1997 fn backbone_blacklisted_peer_rejected_on_connect() {
1998 let port = find_free_port();
1999 let (tx, rx) = crate::event::channel();
2000 let next_id = Arc::new(AtomicU64::new(9800));
2001
2002 let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2003 let peer_state = config.peer_state.clone();
2004
2005 start(config, tx.clone(), next_id).unwrap();
2006 thread::sleep(Duration::from_millis(50));
2007
2008 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2010 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2011 assert!(
2012 matches!(event, Event::InterfaceUp(_, _, _)),
2013 "first connection should succeed"
2014 );
2015 drop(client1);
2016
2017 thread::sleep(Duration::from_millis(100));
2019 while rx.try_recv().is_ok() {}
2020
2021 peer_state.lock().unwrap().blacklist(
2023 "127.0.0.1".parse().unwrap(),
2024 Duration::from_secs(60),
2025 "test blacklist".into(),
2026 );
2027
2028 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2030 thread::sleep(Duration::from_millis(200));
2032
2033 let event = rx.try_recv();
2035 match event {
2036 Ok(Event::InterfaceUp(_, _, _)) => {
2037 panic!("blacklisted peer should not get InterfaceUp")
2038 }
2039 _ => {} }
2041 }
2042
2043 #[test]
2044 fn backbone_parse_config_reads_abuse_settings() {
2045 let factory = BackboneInterfaceFactory;
2046 let mut params = HashMap::new();
2047 params.insert("listen_ip".into(), "127.0.0.1".into());
2048 params.insert("listen_port".into(), "4242".into());
2049 params.insert("idle_timeout".into(), "15".into());
2050 params.insert("write_stall_timeout".into(), "45".into());
2051 params.insert("max_penalty_duration".into(), "3600".into());
2052
2053 let config = factory
2054 .parse_config("test-backbone", InterfaceId(97), ¶ms)
2055 .unwrap();
2056 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2057
2058 match mode {
2059 BackboneMode::Server(config) => {
2060 assert_eq!(config.listen_ip, "127.0.0.1");
2061 assert_eq!(config.listen_port, 4242);
2062 assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2063 assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2064 assert_eq!(
2065 config.abuse.max_penalty_duration,
2066 Some(Duration::from_secs(3600))
2067 );
2068 }
2069 BackboneMode::Client(_) => panic!("expected server config"),
2070 }
2071 }
2072}