1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
/// A 1 MiB (1,048,576 byte) size constant; judging by its name, the hardware
/// MTU of the backbone link.
///
/// Nothing in the reviewed portion of this file reads it, hence the
/// `dead_code` allowance.
#[allow(dead_code)]
const HW_MTU: usize = 1 << 20;
35
36#[derive(Debug, Clone)]
38pub struct BackboneConfig {
39 pub name: String,
40 pub listen_ip: String,
41 pub listen_port: u16,
42 pub interface_id: InterfaceId,
43 pub max_connections: Option<usize>,
44 pub idle_timeout: Option<Duration>,
45 pub write_stall_timeout: Option<Duration>,
46 pub abuse: BackboneAbuseConfig,
47 pub ingress_control: IngressControlConfig,
48 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
49 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
50}
51
/// Abuse-handling settings for the backbone server.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// Optional upper bound on a penalty duration.
    ///
    /// NOTE(review): the consumer of this value is not in the reviewed part of
    /// the file; presumably it caps blacklist durations. Confirm at the use site.
    pub max_penalty_duration: Option<Duration>,
}
57
58#[derive(Debug, Clone)]
60pub struct BackboneServerRuntime {
61 pub max_connections: Option<usize>,
62 pub idle_timeout: Option<Duration>,
63 pub write_stall_timeout: Option<Duration>,
64 pub abuse: BackboneAbuseConfig,
65}
66
67impl BackboneServerRuntime {
68 pub fn from_config(config: &BackboneConfig) -> Self {
69 Self {
70 max_connections: config.max_connections,
71 idle_timeout: config.idle_timeout,
72 write_stall_timeout: config.write_stall_timeout,
73 abuse: config.abuse.clone(),
74 }
75 }
76}
77
78#[derive(Debug, Clone)]
79pub struct BackboneRuntimeConfigHandle {
80 pub interface_name: String,
81 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
82 pub startup: BackboneServerRuntime,
83}
84
85#[derive(Debug, Clone)]
86pub struct BackbonePeerStateHandle {
87 pub interface_id: InterfaceId,
88 pub interface_name: String,
89 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
90}
91
92impl Default for BackboneConfig {
93 fn default() -> Self {
94 let mut config = BackboneConfig {
95 name: String::new(),
96 listen_ip: "0.0.0.0".into(),
97 listen_port: 0,
98 interface_id: InterfaceId(0),
99 max_connections: None,
100 idle_timeout: None,
101 write_stall_timeout: None,
102 abuse: BackboneAbuseConfig::default(),
103 ingress_control: IngressControlConfig::enabled(),
104 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
105 max_connections: None,
106 idle_timeout: None,
107 write_stall_timeout: None,
108 abuse: BackboneAbuseConfig::default(),
109 })),
110 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
111 };
112 let startup = BackboneServerRuntime::from_config(&config);
113 config.runtime = Arc::new(Mutex::new(startup));
114 config
115 }
116}
117
/// Upper bound (512 KiB) on the bytes a `BackboneWriter` may buffer for a
/// client whose socket is not accepting writes; exceeding it disconnects the
/// client.
const MAX_PENDING_BYTES: usize = 512 << 10;
121
122struct BackboneWriter {
124 stream: TcpStream,
125 runtime: Arc<Mutex<BackboneServerRuntime>>,
126 interface_name: String,
127 interface_id: InterfaceId,
128 event_tx: EventSender,
129 pending: Vec<u8>,
130 stall_started: Option<Instant>,
131 disconnect_notified: bool,
132 write_stall_flag: Arc<AtomicBool>,
133}
134
135impl Writer for BackboneWriter {
136 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
137 let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
138 if !self.pending.is_empty() {
139 self.flush_pending(write_stall_timeout)?;
140 if !self.pending.is_empty() {
141 return Err(io::Error::new(
142 io::ErrorKind::WouldBlock,
143 "backbone writer still stalled",
144 ));
145 }
146 }
147
148 let frame = hdlc::frame(data);
149 self.write_buffer(&frame, write_stall_timeout)
150 }
151}
152
153impl BackboneWriter {
154 fn write_buffer(
155 &mut self,
156 data: &[u8],
157 write_stall_timeout: Option<Duration>,
158 ) -> io::Result<()> {
159 let mut written = 0usize;
160 while written < data.len() {
161 match self.stream.write(&data[written..]) {
162 Ok(0) => {
163 return Err(io::Error::new(
164 io::ErrorKind::WriteZero,
165 "backbone writer wrote zero bytes",
166 ))
167 }
168 Ok(n) => {
169 written += n;
170 self.stall_started = None;
171 }
172 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
173 let now = Instant::now();
174 let started = self.stall_started.get_or_insert(now);
175 if let Some(timeout) = write_stall_timeout {
176 if now.duration_since(*started) >= timeout {
177 return Err(self.disconnect_for_write_stall(timeout));
178 }
179 }
180 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
181 return Err(self.disconnect_for_write_stall(
182 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
183 ));
184 }
185 self.pending.extend_from_slice(&data[written..]);
186 return Err(io::Error::new(
187 io::ErrorKind::WouldBlock,
188 "backbone writer would block",
189 ));
190 }
191 Err(e) => return Err(e),
192 }
193 }
194 Ok(())
195 }
196
197 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
198 if self.pending.is_empty() {
199 return Ok(());
200 }
201
202 let pending = std::mem::take(&mut self.pending);
203 match self.write_buffer(&pending, write_stall_timeout) {
204 Ok(()) => Ok(()),
205 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
206 Err(e) => Err(e),
207 }
208 }
209
210 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
211 if !self.disconnect_notified {
212 log::warn!(
213 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
214 self.interface_name,
215 self.interface_id.0,
216 timeout
217 );
218 self.write_stall_flag.store(true, Ordering::Relaxed);
219 let _ = self.stream.shutdown(Shutdown::Both);
220 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
221 self.disconnect_notified = true;
222 }
223 io::Error::new(
224 io::ErrorKind::TimedOut,
225 format!("backbone writer stalled for {:?}", timeout),
226 )
227 }
228}
229
230pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
232 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
233 let listener = TcpListener::bind(&addr)?;
234 listener.set_nonblocking(true)?;
235
236 log::info!(
237 "[{}] backbone server listening on {}",
238 config.name,
239 listener.local_addr().unwrap_or(addr.parse().unwrap())
240 );
241
242 let name = config.name.clone();
243 let server_interface_id = config.interface_id;
244 let runtime = Arc::clone(&config.runtime);
245 let peer_state = Arc::clone(&config.peer_state);
246 let ingress_control = config.ingress_control;
247 thread::Builder::new()
248 .name(format!("backbone-poll-{}", config.interface_id.0))
249 .spawn(move || {
250 if let Err(e) = poll_loop(
251 listener,
252 name,
253 server_interface_id,
254 tx,
255 next_id,
256 runtime,
257 peer_state,
258 ingress_control,
259 ) {
260 log::error!("backbone poll loop error: {}", e);
261 }
262 })?;
263
264 Ok(())
265}
266
267struct ClientState {
269 id: InterfaceId,
270 peer_ip: IpAddr,
271 peer_port: u16,
272 stream: TcpStream,
273 decoder: hdlc::Decoder,
274 connected_at: Instant,
275 has_received_data: bool,
276 write_stall_flag: Arc<AtomicBool>,
277}
278
/// Per-IP bookkeeping: blacklist status, rejected connection attempts and the
/// number of currently open connections.
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// End of the blacklist period, if the peer is (or was) blacklisted.
    blacklisted_until: Option<Instant>,
    /// Reason recorded together with the blacklist entry.
    blacklist_reason: Option<String>,
    /// How many connection attempts were rejected while blacklisted.
    reject_count: u64,
    /// Number of currently open connections from this IP.
    connected_count: usize,
}

impl PeerBehaviorState {
    /// Creates an empty record: not blacklisted, zero rejects, zero connections.
    fn new() -> Self {
        PeerBehaviorState {
            connected_count: 0,
            reject_count: 0,
            blacklist_reason: None,
            blacklisted_until: None,
        }
    }
}
297
298#[derive(Debug, Clone, Default)]
299pub struct BackbonePeerMonitor {
300 peers: HashMap<IpAddr, PeerBehaviorState>,
301}
302
303impl BackbonePeerMonitor {
304 pub fn new() -> Self {
305 Self {
306 peers: HashMap::new(),
307 }
308 }
309
310 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
311 let mut merged = self.peers.clone();
312
313 for (peer_ip, state) in peers {
314 let entry = merged
315 .entry(*peer_ip)
316 .or_insert_with(PeerBehaviorState::new);
317 entry.connected_count = state.connected_count;
318 entry.reject_count = state.reject_count;
319 if state.blacklisted_until.is_some() {
320 entry.blacklisted_until = state.blacklisted_until;
321 entry.blacklist_reason = state.blacklist_reason.clone();
322 }
323 }
324
325 merged.retain(|peer_ip, state| {
326 peers.contains_key(peer_ip)
327 || state.blacklisted_until.is_some()
328 || state.reject_count > 0
329 });
330 self.peers = merged;
331 }
332
333 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
334 for (peer_ip, state) in &self.peers {
335 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
336 entry.blacklisted_until = state.blacklisted_until;
337 entry.blacklist_reason = state.blacklist_reason.clone();
338 entry.reject_count = state.reject_count;
339 }
340
341 peers.retain(|peer_ip, state| {
342 if state.connected_count > 0 {
343 return true;
344 }
345 self.peers.contains_key(peer_ip)
346 });
347 }
348
349 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
350 let now = Instant::now();
351 let mut entries: Vec<BackbonePeerStateEntry> = self
352 .peers
353 .iter()
354 .map(|(peer_ip, state)| BackbonePeerStateEntry {
355 interface_name: interface_name.to_string(),
356 peer_ip: *peer_ip,
357 connected_count: state.connected_count,
358 blacklisted_remaining_secs: state
359 .blacklisted_until
360 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
361 blacklist_reason: state.blacklist_reason.clone(),
362 reject_count: state.reject_count,
363 })
364 .collect();
365 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
366 entries
367 }
368
369 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
370 self.peers.remove(&peer_ip).is_some()
371 }
372
373 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
374 let state = self
375 .peers
376 .entry(peer_ip)
377 .or_insert_with(PeerBehaviorState::new);
378 state.blacklisted_until = Some(Instant::now() + duration);
379 state.blacklist_reason = Some(reason);
380 true
381 }
382
383 #[cfg(test)]
384 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
385 let mut state = PeerBehaviorState::new();
386 state.connected_count = entry.connected_count;
387 state.reject_count = entry.reject_count;
388 state.blacklist_reason = entry.blacklist_reason;
389 if let Some(remaining) = entry.blacklisted_remaining_secs {
390 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
391 }
392 self.peers.insert(entry.peer_ip, state);
393 }
394}
395
/// Why the poll loop is dropping a client.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// The read side reported end-of-stream or an error.
    RemoteClosed,
    /// The client delivered no data within the idle timeout.
    IdleTimeout,
    /// The client's writer dropped it because writes stalled.
    WriteStall,
}
402
403fn poll_loop(
405 listener: TcpListener,
406 name: String,
407 server_interface_id: InterfaceId,
408 tx: EventSender,
409 next_id: Arc<AtomicU64>,
410 runtime: Arc<Mutex<BackboneServerRuntime>>,
411 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
412 ingress_control: IngressControlConfig,
413) -> io::Result<()> {
414 let poller = Poller::new()?;
415
416 const LISTENER_KEY: usize = 0;
417
418 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
420
421 let mut clients: HashMap<usize, ClientState> = HashMap::new();
422 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
423 let mut events = Events::new();
424 let mut next_key: usize = 1;
425
426 loop {
427 let runtime_snapshot = runtime.lock().unwrap().clone();
428 let max_connections = runtime_snapshot.max_connections;
429 let idle_timeout = runtime_snapshot.idle_timeout;
430 cleanup_peer_state(&mut peers);
431 {
432 let mut monitor = peer_state.lock().unwrap();
433 monitor.sync_into(&mut peers);
434 monitor.upsert_snapshot(&peers);
435 }
436
437 events.clear();
438 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
439
440 for ev in events.iter() {
441 if ev.key == LISTENER_KEY {
442 loop {
444 match listener.accept() {
445 Ok((stream, peer_addr)) => {
446 let peer_ip = peer_addr.ip();
447 let peer_port = peer_addr.port();
448
449 if is_ip_blacklisted(&mut peers, peer_ip) {
450 if let Some(state) = peers.get_mut(&peer_ip) {
451 state.reject_count = state.reject_count.saturating_add(1);
452 }
453 peer_state.lock().unwrap().upsert_snapshot(&peers);
454 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
455 drop(stream);
456 continue;
457 }
458
459 if let Some(max) = max_connections {
460 if clients.len() >= max {
461 log::warn!(
462 "[{}] max connections ({}) reached, rejecting {}",
463 name,
464 max,
465 peer_addr
466 );
467 drop(stream);
468 continue;
469 }
470 }
471
472 stream.set_nonblocking(true).ok();
473 stream.set_nodelay(true).ok();
474 set_tcp_keepalive(&stream).ok();
475
476 #[cfg(target_os = "macos")]
478 {
479 let sock = SockRef::from(&stream);
480 sock.set_nosigpipe(true).ok();
481 }
482
483 let key = next_key;
484 next_key += 1;
485 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
486
487 log::info!(
488 "[{}] backbone client connected: {} → id {}",
489 name,
490 peer_addr,
491 client_id.0
492 );
493
494 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
497 {
498 log::warn!("[{}] failed to add client to poller: {}", name, e);
499 continue; }
501
502 let writer_stream = match stream.try_clone() {
504 Ok(s) => s,
505 Err(e) => {
506 log::warn!("[{}] failed to clone client stream: {}", name, e);
507 let _ = poller.delete(&stream);
508 continue; }
510 };
511 let write_stall_flag = Arc::new(AtomicBool::new(false));
512 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
513 stream: writer_stream,
514 runtime: Arc::clone(&runtime),
515 interface_name: name.clone(),
516 interface_id: client_id,
517 event_tx: tx.clone(),
518 pending: Vec::new(),
519 stall_started: None,
520 disconnect_notified: false,
521 write_stall_flag: Arc::clone(&write_stall_flag),
522 });
523
524 clients.insert(
525 key,
526 ClientState {
527 id: client_id,
528 peer_ip,
529 peer_port,
530 stream,
531 decoder: hdlc::Decoder::new(),
532 connected_at: Instant::now(),
533 has_received_data: false,
534 write_stall_flag,
535 },
536 );
537 peers
538 .entry(peer_ip)
539 .or_insert_with(PeerBehaviorState::new)
540 .connected_count += 1;
541 peer_state.lock().unwrap().upsert_snapshot(&peers);
542 let _ = tx.send(Event::BackbonePeerConnected {
543 server_interface_id,
544 peer_interface_id: client_id,
545 peer_ip,
546 peer_port,
547 });
548
549 let info = InterfaceInfo {
550 id: client_id,
551 name: format!("BackboneInterface/{}", client_id.0),
552 mode: constants::MODE_FULL,
553 out_capable: true,
554 in_capable: true,
555 bitrate: Some(1_000_000_000), announce_rate_target: None,
557 announce_rate_grace: 0,
558 announce_rate_penalty: 0.0,
559 announce_cap: constants::ANNOUNCE_CAP,
560 is_local_client: false,
561 wants_tunnel: false,
562 tunnel_id: None,
563 mtu: 65535,
564 ia_freq: 0.0,
565 started: 0.0,
566 ingress_control,
567 };
568
569 if tx
570 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
571 .is_err()
572 {
573 cleanup(&poller, &clients, &listener);
575 return Ok(());
576 }
577 }
578 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
579 Err(e) => {
580 log::warn!("[{}] accept error: {}", name, e);
581 break;
582 }
583 }
584 }
585 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
587 } else if clients.contains_key(&ev.key) {
588 let key = ev.key;
589 let mut should_remove = false;
590 let mut client_id = InterfaceId(0);
591
592 let mut buf = [0u8; 4096];
593 let read_result = {
594 let client = clients.get_mut(&key).unwrap();
595 client.stream.read(&mut buf)
596 };
597
598 match read_result {
599 Ok(0) | Err(_) => {
600 if let Some(c) = clients.get(&key) {
601 client_id = c.id;
602 }
603 should_remove = true;
604 }
605 Ok(n) => {
606 let client = clients.get_mut(&key).unwrap();
607 client_id = client.id;
608 client.has_received_data = true;
609 for frame in client.decoder.feed(&buf[..n]) {
610 if tx
611 .send(Event::Frame {
612 interface_id: client_id,
613 data: frame,
614 })
615 .is_err()
616 {
617 cleanup(&poller, &clients, &listener);
618 return Ok(());
619 }
620 }
621 }
622 }
623
624 if should_remove {
625 let reason = if clients
626 .get(&key)
627 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
628 {
629 DisconnectReason::WriteStall
630 } else {
631 DisconnectReason::RemoteClosed
632 };
633 disconnect_client(
634 &poller,
635 &mut clients,
636 &mut peers,
637 &name,
638 server_interface_id,
639 &tx,
640 &peer_state,
641 key,
642 client_id,
643 reason,
644 );
645 } else if let Some(client) = clients.get(&key) {
646 poller.modify(&client.stream, PollEvent::readable(key))?;
648 }
649 }
650 }
651
652 if let Some(timeout) = idle_timeout {
653 let now = Instant::now();
654 let timed_out: Vec<(usize, InterfaceId)> = clients
655 .iter()
656 .filter_map(|(&key, client)| {
657 if client.has_received_data || now.duration_since(client.connected_at) < timeout
658 {
659 None
660 } else {
661 Some((key, client.id))
662 }
663 })
664 .collect();
665
666 for (key, client_id) in timed_out {
667 disconnect_client(
668 &poller,
669 &mut clients,
670 &mut peers,
671 &name,
672 server_interface_id,
673 &tx,
674 &peer_state,
675 key,
676 client_id,
677 DisconnectReason::IdleTimeout,
678 );
679 }
680 }
681 }
682}
683
684fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
685 let now = Instant::now();
686 peers.retain(|_, state| {
687 if matches!(state.blacklisted_until, Some(until) if now >= until) {
688 state.blacklisted_until = None;
689 state.blacklist_reason = None;
690 }
691 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
692 });
693}
694
695fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
696 let now = Instant::now();
697 if let Some(state) = peers.get_mut(&peer_ip) {
698 if let Some(until) = state.blacklisted_until {
699 if now < until {
700 return true;
701 }
702 state.blacklisted_until = None;
703 }
704 }
705 false
706}
707
708fn disconnect_client(
709 poller: &Poller,
710 clients: &mut HashMap<usize, ClientState>,
711 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
712 name: &str,
713 server_interface_id: InterfaceId,
714 tx: &EventSender,
715 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
716 key: usize,
717 client_id: InterfaceId,
718 reason: DisconnectReason,
719) {
720 let Some(client) = clients.remove(&key) else {
721 return;
722 };
723
724 match reason {
725 DisconnectReason::RemoteClosed => {
726 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
727 }
728 DisconnectReason::IdleTimeout => {
729 log::info!(
730 "[{}] backbone client {} disconnected due to idle timeout",
731 name,
732 client_id.0
733 );
734 }
735 DisconnectReason::WriteStall => {
736 }
738 }
739
740 let _ = poller.delete(&client.stream);
741 let connected_for = client.connected_at.elapsed();
743 let _ = tx.send(Event::BackbonePeerDisconnected {
744 server_interface_id,
745 peer_interface_id: client.id,
746 peer_ip: client.peer_ip,
747 peer_port: client.peer_port,
748 connected_for,
749 had_received_data: client.has_received_data,
750 });
751 match reason {
752 DisconnectReason::IdleTimeout => {
753 let _ = tx.send(Event::BackbonePeerIdleTimeout {
754 server_interface_id,
755 peer_interface_id: client.id,
756 peer_ip: client.peer_ip,
757 peer_port: client.peer_port,
758 connected_for,
759 });
760 }
761 DisconnectReason::WriteStall => {
762 let _ = tx.send(Event::BackbonePeerWriteStall {
763 server_interface_id,
764 peer_interface_id: client.id,
765 peer_ip: client.peer_ip,
766 peer_port: client.peer_port,
767 connected_for,
768 });
769 }
770 DisconnectReason::RemoteClosed => {}
771 }
772
773 if let Some(state) = peers.get_mut(&client.peer_ip) {
774 state.connected_count = state.connected_count.saturating_sub(1);
775 }
776 peer_state.lock().unwrap().upsert_snapshot(peers);
777 if !matches!(reason, DisconnectReason::WriteStall) {
779 let _ = tx.send(Event::InterfaceDown(client_id));
780 }
781}
782
783fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
784 let sock = SockRef::from(stream);
785 let mut keepalive = TcpKeepalive::new()
786 .with_time(Duration::from_secs(5))
787 .with_interval(Duration::from_secs(2));
788 #[cfg(any(target_os = "linux", target_os = "macos"))]
789 {
790 keepalive = keepalive.with_retries(12);
791 }
792 sock.set_tcp_keepalive(&keepalive)
793}
794
795fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
796 for (_, client) in clients {
797 let _ = poller.delete(&client.stream);
798 }
799 let _ = poller.delete(listener);
800}
801
802#[derive(Debug, Clone)]
808pub struct BackboneClientConfig {
809 pub name: String,
810 pub target_host: String,
811 pub target_port: u16,
812 pub interface_id: InterfaceId,
813 pub reconnect_wait: Duration,
814 pub max_reconnect_tries: Option<u32>,
815 pub connect_timeout: Duration,
816 pub transport_identity: Option<String>,
817 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
818}
819
820#[derive(Debug, Clone)]
821pub struct BackboneClientRuntime {
822 pub reconnect_wait: Duration,
823 pub max_reconnect_tries: Option<u32>,
824 pub connect_timeout: Duration,
825}
826
827impl BackboneClientRuntime {
828 pub fn from_config(config: &BackboneClientConfig) -> Self {
829 Self {
830 reconnect_wait: config.reconnect_wait,
831 max_reconnect_tries: config.max_reconnect_tries,
832 connect_timeout: config.connect_timeout,
833 }
834 }
835}
836
837#[derive(Debug, Clone)]
838pub struct BackboneClientRuntimeConfigHandle {
839 pub interface_name: String,
840 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
841 pub startup: BackboneClientRuntime,
842}
843
844impl Default for BackboneClientConfig {
845 fn default() -> Self {
846 let mut config = BackboneClientConfig {
847 name: String::new(),
848 target_host: "127.0.0.1".into(),
849 target_port: 4242,
850 interface_id: InterfaceId(0),
851 reconnect_wait: Duration::from_secs(5),
852 max_reconnect_tries: None,
853 connect_timeout: Duration::from_secs(5),
854 transport_identity: None,
855 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
856 reconnect_wait: Duration::from_secs(5),
857 max_reconnect_tries: None,
858 connect_timeout: Duration::from_secs(5),
859 })),
860 };
861 let startup = BackboneClientRuntime::from_config(&config);
862 config.runtime = Arc::new(Mutex::new(startup));
863 config
864 }
865}
866
867struct BackboneClientWriter {
869 stream: TcpStream,
870}
871
872impl Writer for BackboneClientWriter {
873 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
874 self.stream.write_all(&hdlc::frame(data))
875 }
876}
877
878fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
880 let runtime = config.runtime.lock().unwrap().clone();
881 let addr_str = format!("{}:{}", config.target_host, config.target_port);
882 let addr = addr_str
883 .to_socket_addrs()?
884 .next()
885 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
886
887 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
888 stream.set_nodelay(true)?;
889 set_tcp_keepalive(&stream).ok();
890
891 #[cfg(target_os = "macos")]
893 {
894 let sock = SockRef::from(&stream);
895 sock.set_nosigpipe(true).ok();
896 }
897
898 Ok(stream)
899}
900
901pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
903 let stream = try_connect_client(&config)?;
904 let reader_stream = stream.try_clone()?;
905 let writer_stream = stream.try_clone()?;
906
907 let id = config.interface_id;
908 log::info!(
909 "[{}] backbone client connected to {}:{}",
910 config.name,
911 config.target_host,
912 config.target_port
913 );
914
915 let _ = tx.send(Event::InterfaceUp(id, None, None));
917
918 thread::Builder::new()
919 .name(format!("backbone-client-{}", id.0))
920 .spawn(move || {
921 client_reader_loop(reader_stream, config, tx);
922 })?;
923
924 Ok(Box::new(BackboneClientWriter {
925 stream: writer_stream,
926 }))
927}
928
929fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
932 let id = config.interface_id;
933 let mut decoder = hdlc::Decoder::new();
934 let mut buf = [0u8; 4096];
935
936 loop {
937 match stream.read(&mut buf) {
938 Ok(0) => {
939 log::warn!("[{}] connection closed", config.name);
940 let _ = tx.send(Event::InterfaceDown(id));
941 match client_reconnect(&config, &tx) {
942 Some(new_stream) => {
943 stream = new_stream;
944 decoder = hdlc::Decoder::new();
945 continue;
946 }
947 None => {
948 log::error!("[{}] reconnection failed, giving up", config.name);
949 return;
950 }
951 }
952 }
953 Ok(n) => {
954 for frame in decoder.feed(&buf[..n]) {
955 if tx
956 .send(Event::Frame {
957 interface_id: id,
958 data: frame,
959 })
960 .is_err()
961 {
962 return;
963 }
964 }
965 }
966 Err(e) => {
967 log::warn!("[{}] read error: {}", config.name, e);
968 let _ = tx.send(Event::InterfaceDown(id));
969 match client_reconnect(&config, &tx) {
970 Some(new_stream) => {
971 stream = new_stream;
972 decoder = hdlc::Decoder::new();
973 continue;
974 }
975 None => {
976 log::error!("[{}] reconnection failed, giving up", config.name);
977 return;
978 }
979 }
980 }
981 }
982 }
983}
984
/// Largest exponent used for the reconnect backoff: the base wait is
/// multiplied by at most `2^6 = 64`.
const MAX_BACKOFF_SHIFT: u32 = 6;
988
989fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
993 let mut attempts = 0u32;
994 loop {
995 let runtime = config.runtime.lock().unwrap().clone();
996
997 let shift = attempts.min(MAX_BACKOFF_SHIFT);
998 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
999 let jitter_range = backoff / 4;
1001 let jitter = if jitter_range.as_nanos() > 0 {
1002 let offset = Duration::from_nanos(
1003 (std::hash::RandomState::new().build_hasher().finish()
1004 % jitter_range.as_nanos() as u64)
1005 * 2,
1006 );
1007 if offset > jitter_range {
1008 backoff + (offset - jitter_range)
1009 } else {
1010 backoff - (jitter_range - offset)
1011 }
1012 } else {
1013 backoff
1014 };
1015 thread::sleep(jitter);
1016
1017 attempts += 1;
1018
1019 if let Some(max) = runtime.max_reconnect_tries {
1020 if attempts > max {
1021 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1022 return None;
1023 }
1024 }
1025
1026 log::info!(
1027 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1028 config.name,
1029 attempts,
1030 jitter.as_secs_f64(),
1031 );
1032
1033 match try_connect_client(config) {
1034 Ok(new_stream) => {
1035 let writer_stream = match new_stream.try_clone() {
1036 Ok(s) => s,
1037 Err(e) => {
1038 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1039 continue;
1040 }
1041 };
1042 log::info!(
1043 "[{}] reconnected after {} attempt(s)",
1044 config.name,
1045 attempts
1046 );
1047 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1048 stream: writer_stream,
1049 });
1050 let _ = tx.send(Event::InterfaceUp(
1051 config.interface_id,
1052 Some(new_writer),
1053 None,
1054 ));
1055 return Some(new_stream);
1056 }
1057 Err(e) => {
1058 log::warn!("[{}] reconnect failed: {}", config.name, e);
1059 }
1060 }
1061 }
1062}
1063
1064pub(crate) enum BackboneMode {
1071 Server(BackboneConfig),
1072 Client(BackboneClientConfig),
1073}
1074
/// Factory that parses and starts `BackboneInterface` configurations.
pub struct BackboneInterfaceFactory;
1081
/// Reads `key` from `params` as a strictly positive number of seconds.
///
/// Returns `None` when the key is missing, the value is not a number, the
/// number is zero, negative or NaN, or it cannot be represented as a
/// `Duration` (infinite or too large). The checked conversion matters because
/// `Duration::from_secs_f64` panics on such values, and `"inf"` parses as a
/// valid, positive `f64`.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    let secs = params.get(key)?.parse::<f64>().ok()?;
    if secs > 0.0 {
        Duration::try_from_secs_f64(secs).ok()
    } else {
        None
    }
}
1089
1090impl InterfaceFactory for BackboneInterfaceFactory {
1091 fn type_name(&self) -> &str {
1092 "BackboneInterface"
1093 }
1094
1095 fn parse_config(
1096 &self,
1097 name: &str,
1098 id: InterfaceId,
1099 params: &HashMap<String, String>,
1100 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1101 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1102 let target_host = target_host.clone();
1104 let target_port = params
1105 .get("target_port")
1106 .or_else(|| params.get("port"))
1107 .and_then(|v| v.parse().ok())
1108 .unwrap_or(4242);
1109 let transport_identity = params.get("transport_identity").cloned();
1110 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1111 name: name.to_string(),
1112 target_host,
1113 target_port,
1114 interface_id: id,
1115 transport_identity,
1116 ..BackboneClientConfig::default()
1117 })))
1118 } else {
1119 let listen_ip = params
1121 .get("listen_ip")
1122 .or_else(|| params.get("device"))
1123 .cloned()
1124 .unwrap_or_else(|| "0.0.0.0".into());
1125 let listen_port = params
1126 .get("listen_port")
1127 .or_else(|| params.get("port"))
1128 .and_then(|v| v.parse().ok())
1129 .unwrap_or(4242);
1130 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1131 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1132 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1133 let abuse = BackboneAbuseConfig {
1134 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1135 };
1136 let mut config = BackboneConfig {
1137 name: name.to_string(),
1138 listen_ip,
1139 listen_port,
1140 interface_id: id,
1141 max_connections,
1142 idle_timeout,
1143 write_stall_timeout,
1144 abuse,
1145 ingress_control: IngressControlConfig::enabled(),
1146 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1147 max_connections: None,
1148 idle_timeout: None,
1149 write_stall_timeout: None,
1150 abuse: BackboneAbuseConfig::default(),
1151 })),
1152 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1153 };
1154 let startup = BackboneServerRuntime::from_config(&config);
1155 config.runtime = Arc::new(Mutex::new(startup));
1156 Ok(Box::new(BackboneMode::Server(config)))
1157 }
1158 }
1159
1160 fn start(
1161 &self,
1162 config: Box<dyn InterfaceConfigData>,
1163 ctx: StartContext,
1164 ) -> io::Result<StartResult> {
1165 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1166 io::Error::new(
1167 io::ErrorKind::InvalidData,
1168 "wrong config type for BackboneInterface",
1169 )
1170 })?;
1171
1172 match mode {
1173 BackboneMode::Client(cfg) => {
1174 let id = cfg.interface_id;
1175 let name = cfg.name.clone();
1176 let info = InterfaceInfo {
1177 id,
1178 name,
1179 mode: ctx.mode,
1180 out_capable: true,
1181 in_capable: true,
1182 bitrate: Some(1_000_000_000),
1183 announce_rate_target: None,
1184 announce_rate_grace: 0,
1185 announce_rate_penalty: 0.0,
1186 announce_cap: constants::ANNOUNCE_CAP,
1187 is_local_client: false,
1188 wants_tunnel: false,
1189 tunnel_id: None,
1190 mtu: 65535,
1191 ingress_control: ctx.ingress_control,
1192 ia_freq: 0.0,
1193 started: crate::time::now(),
1194 };
1195 let writer = start_client(cfg, ctx.tx)?;
1196 Ok(StartResult::Simple {
1197 id,
1198 info,
1199 writer,
1200 interface_type_name: "BackboneInterface".to_string(),
1201 })
1202 }
1203 BackboneMode::Server(mut cfg) => {
1204 cfg.ingress_control = ctx.ingress_control;
1205 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1206 Ok(StartResult::Listener { control: None })
1207 }
1208 }
1209 }
1210}
1211
1212pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1213 match mode {
1214 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1215 interface_name: config.name.clone(),
1216 runtime: Arc::clone(&config.runtime),
1217 startup: BackboneServerRuntime::from_config(config),
1218 }),
1219 BackboneMode::Client(_) => None,
1220 }
1221}
1222
1223pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1224 match mode {
1225 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1226 interface_id: config.interface_id,
1227 interface_name: config.name.clone(),
1228 peer_state: Arc::clone(&config.peer_state),
1229 }),
1230 BackboneMode::Client(_) => None,
1231 }
1232}
1233
1234pub(crate) fn client_runtime_handle_from_mode(
1235 mode: &BackboneMode,
1236) -> Option<BackboneClientRuntimeConfigHandle> {
1237 match mode {
1238 BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1239 interface_name: config.name.clone(),
1240 runtime: Arc::clone(&config.runtime),
1241 startup: BackboneClientRuntime::from_config(config),
1242 }),
1243 BackboneMode::Server(_) => None,
1244 }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249 use super::*;
1250 use std::sync::mpsc;
1251 use std::time::Duration;
1252
/// Asks the OS for an ephemeral loopback port and returns its number.
///
/// The probing listener is dropped when this function returns, so the port
/// is released again before the caller binds it.
fn find_free_port() -> u16 {
    let probe = TcpListener::bind("127.0.0.1:0").unwrap();
    let local = probe.local_addr().unwrap();
    local.port()
}
1260
1261 fn recv_non_peer_event(
1262 rx: &mpsc::Receiver<Event>,
1263 timeout: Duration,
1264 ) -> Result<Event, mpsc::RecvTimeoutError> {
1265 let deadline = Instant::now() + timeout;
1266 loop {
1267 let remaining = deadline.saturating_duration_since(Instant::now());
1268 if remaining.is_zero() {
1269 return Err(mpsc::RecvTimeoutError::Timeout);
1270 }
1271 let event = rx.recv_timeout(remaining)?;
1272 match event {
1273 Event::BackbonePeerConnected { .. }
1274 | Event::BackbonePeerDisconnected { .. }
1275 | Event::BackbonePeerIdleTimeout { .. }
1276 | Event::BackbonePeerWriteStall { .. }
1277 | Event::BackbonePeerPenalty { .. } => continue,
1278 other => return Ok(other),
1279 }
1280 }
1281 }
1282
1283 fn make_server_config(
1284 port: u16,
1285 interface_id: u64,
1286 max_connections: Option<usize>,
1287 idle_timeout: Option<Duration>,
1288 write_stall_timeout: Option<Duration>,
1289 abuse: BackboneAbuseConfig,
1290 ) -> BackboneConfig {
1291 let mut config = BackboneConfig {
1292 name: "test-backbone".into(),
1293 listen_ip: "127.0.0.1".into(),
1294 listen_port: port,
1295 interface_id: InterfaceId(interface_id),
1296 max_connections,
1297 idle_timeout,
1298 write_stall_timeout,
1299 abuse,
1300 ingress_control: IngressControlConfig::enabled(),
1301 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1302 max_connections: None,
1303 idle_timeout: None,
1304 write_stall_timeout: None,
1305 abuse: BackboneAbuseConfig::default(),
1306 })),
1307 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1308 };
1309 let startup = BackboneServerRuntime::from_config(&config);
1310 config.runtime = Arc::new(Mutex::new(startup));
1311 config
1312 }
1313
/// A TCP client connecting to the server must produce an `InterfaceUp` event
/// whose id is the first value of the dynamic-id counter (8000 here) and
/// which carries a writer and bidirectional interface info.
#[test]
fn backbone_accept_connection() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8000))).unwrap();
    // Presumably gives the listener time to come up before connecting.
    thread::sleep(Duration::from_millis(50));

    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, writer, info) => {
            assert_eq!(id, InterfaceId(8000));
            assert!(writer.is_some());
            assert!(info.is_some());
            let info = info.unwrap();
            assert!(info.out_capable);
            assert!(info.in_capable);
        }
        other => panic!("expected InterfaceUp, got {:?}", other),
    }
}
1340
/// An HDLC-framed payload written by a connected client must surface as an
/// `Event::Frame` tagged with that client's dynamic interface id.
#[test]
fn backbone_receive_frame() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8100))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // First non-peer event (the connection coming up) is consumed and ignored.
    let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();

    let payload = (0..32).collect::<Vec<u8>>();
    let wire = hdlc::frame(&payload);
    client.write_all(&wire).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, InterfaceId(8100));
            assert_eq!(data, payload);
        }
        other => panic!("expected Frame, got {:?}", other),
    }
}
1370
/// A frame sent through the writer handed out in `InterfaceUp` must arrive
/// at the TCP client HDLC-framed.
#[test]
fn backbone_send_to_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8200))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    // Bound the read below so a missing frame fails the test instead of hanging.
    client
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let mut writer = match recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap() {
        Event::InterfaceUp(_, Some(w), _) => w,
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload = (0..24).collect::<Vec<u8>>();
    writer.send_frame(&payload).unwrap();

    let expected = hdlc::frame(&payload);
    let mut buf = [0u8; 256];
    let n = client.read(&mut buf).unwrap();
    assert_eq!(&buf[..n], &expected[..]);
}
1404
/// Two simultaneous clients must each get their own `InterfaceUp` event with
/// distinct interface ids.
#[test]
fn backbone_multiple_clients() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8300))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client1 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let _client2 = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let ids: Vec<_> = (0..2)
        .map(
            |_| match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            },
        )
        .collect();

    assert_eq!(ids.len(), 2);
    assert_ne!(ids[0], ids[1]);
}
1431
/// Closing the client socket must produce `InterfaceDown` for the id that was
/// assigned to that connection (8400 here).
#[test]
fn backbone_client_disconnect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8400))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // Consume the first non-peer event before hanging up.
    let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();

    drop(client);

    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(
        matches!(event, Event::InterfaceDown(InterfaceId(8400))),
        "expected InterfaceDown(8400), got {:?}",
        event
    );
}
1459
/// Frames written by two different clients must both be delivered; arrival
/// order is not asserted.
#[test]
fn backbone_epoll_multiplexing() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8500))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client1 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let mut client2 = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // One non-peer event per connection is consumed before any data is sent.
    for _ in 0..2 {
        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
    }

    let payload1 = (0..24).collect::<Vec<u8>>();
    let payload2 = (100..130).collect::<Vec<u8>>();
    client1.write_all(&hdlc::frame(&payload1)).unwrap();
    client2.write_all(&hdlc::frame(&payload2)).unwrap();

    let received: Vec<_> = (0..2)
        .map(
            |_| match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
                Event::Frame { data, .. } => data,
                other => panic!("expected Frame, got {:?}", other),
            },
        )
        .collect();

    assert!(received.contains(&payload1));
    assert!(received.contains(&payload2));
}
1496
/// Starting the server on a free loopback port must succeed; the `unwrap`
/// is the whole assertion.
#[test]
fn backbone_bind_port() {
    let port = find_free_port();
    // The receiver is kept alive for the duration of the test.
    let (tx, _rx) = crate::event::channel();
    let server = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
    let next_id = Arc::new(AtomicU64::new(8600));

    start(server, tx, next_id).unwrap();
}
1508
/// A frame delivered in two separate TCP writes (split in the middle, 50 ms
/// apart) must still be reassembled into a single `Event::Frame`.
#[test]
fn backbone_hdlc_fragmented() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());

    start(server, tx, Arc::new(AtomicU64::new(8700))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    // Disable Nagle so each half is pushed out as its own write.
    client.set_nodelay(true).unwrap();

    let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();

    let payload = (0..32).collect::<Vec<u8>>();
    let framed = hdlc::frame(&payload);
    let (first_half, second_half) = framed.split_at(framed.len() / 2);

    client.write_all(first_half).unwrap();
    thread::sleep(Duration::from_millis(50));
    client.write_all(second_half).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { data, .. } => assert_eq!(data, payload),
        other => panic!("expected Frame, got {:?}", other),
    }
}
1544
1545 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1550 BackboneClientConfig {
1551 name: format!("test-bb-client-{}", port),
1552 target_host: "127.0.0.1".into(),
1553 target_port: port,
1554 interface_id: InterfaceId(id),
1555 reconnect_wait: Duration::from_millis(100),
1556 max_reconnect_tries: Some(2),
1557 connect_timeout: Duration::from_secs(2),
1558 transport_identity: None,
1559 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1560 reconnect_wait: Duration::from_millis(100),
1561 max_reconnect_tries: Some(2),
1562 connect_timeout: Duration::from_secs(2),
1563 })),
1564 }
1565 }
1566
/// A backbone client pointed at a plain TCP listener must connect and report
/// `InterfaceUp` under its configured interface id (9000).
#[test]
fn backbone_client_connect() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    let (tx, rx) = crate::event::channel();

    let _writer = start_client(make_client_config(port, 9000), tx).unwrap();

    // Keep the accepted stream alive until the assertion has run.
    let _server_stream = listener.accept().unwrap();

    let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
}
1581
/// A framed payload written by the remote side must reach the client as an
/// `Event::Frame` tagged with the client's interface id (9100).
#[test]
fn backbone_client_receive_frame() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    let (tx, rx) = crate::event::channel();

    let _writer = start_client(make_client_config(port, 9100), tx).unwrap();

    let (mut server_stream, _) = listener.accept().unwrap();

    // The first event on the channel is consumed without inspection.
    let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

    let payload = (0..32).collect::<Vec<u8>>();
    let wire = hdlc::frame(&payload);
    server_stream.write_all(&wire).unwrap();

    match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, InterfaceId(9100));
            assert_eq!(data, payload);
        }
        other => panic!("expected Frame, got {:?}", other),
    }
}
1609
/// A frame sent through the client's writer must arrive HDLC-framed on the
/// remote side of the TCP connection.
#[test]
fn backbone_client_send_frame() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    // The receiver is kept alive but its events are not inspected here.
    let (tx, _rx) = crate::event::channel();

    let mut writer = start_client(make_client_config(port, 9200), tx).unwrap();

    let (mut server_stream, _) = listener.accept().unwrap();
    // Bound the read so a missing frame fails the test instead of hanging.
    server_stream
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let payload = (0..24).collect::<Vec<u8>>();
    writer.send_frame(&payload).unwrap();

    let expected = hdlc::frame(&payload);
    let mut buf = [0u8; 256];
    let n = server_stream.read(&mut buf).unwrap();
    assert_eq!(&buf[..n], &expected[..]);
}
1632
/// With `max_connections = 2`, the first two clients come up and a third
/// connection must not yield any further non-peer event within 500 ms.
#[test]
fn backbone_max_connections_rejects_excess() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        88,
        Some(2),
        None,
        None,
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(8800))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client1 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let _client2 = TcpStream::connect(("127.0.0.1", port)).unwrap();

    for _ in 0..2 {
        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
    }

    // The TCP connect itself succeeds; only the missing event is asserted.
    let client3 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    client3
        .set_read_timeout(Some(Duration::from_millis(500)))
        .unwrap();

    thread::sleep(Duration::from_millis(100));

    let result = recv_non_peer_event(&rx, Duration::from_millis(500));
    assert!(
        result.is_err(),
        "expected no InterfaceUp for rejected connection, got {:?}",
        result
    );
}
1678
/// With `max_connections = 1`, a second client must be accepted once the
/// first one has disconnected and its `InterfaceDown` has been observed.
#[test]
fn backbone_max_connections_allows_after_disconnect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        89,
        Some(1),
        None,
        None,
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(8900))).unwrap();
    thread::sleep(Duration::from_millis(50));

    // Occupy the only slot.
    let client1 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceUp(_, _, _)));

    // Hang up and wait for the server to report the interface going down.
    drop(client1);
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(_)));

    // A new connection must now be accepted.
    let _client2 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(
        matches!(event, Event::InterfaceUp(_, _, _)),
        "expected InterfaceUp after slot freed, got {:?}",
        event
    );
}
1718
/// After the remote side drops the connection, the client must report
/// `InterfaceDown`, connect again, and report `InterfaceUp` under the same
/// interface id (9300).
#[test]
fn backbone_client_reconnect() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    listener.set_nonblocking(false).unwrap();
    let (tx, rx) = crate::event::channel();

    let _writer = start_client(make_client_config(port, 9300), tx).unwrap();

    let (server_stream, _) = listener.accept().unwrap();

    // The first event on the channel is consumed without inspection.
    let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

    // Server side hangs up.
    drop(server_stream);

    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));

    // Blocks until the client has dialled in again.
    let _server_stream2 = listener.accept().unwrap();

    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
}
1748
/// With a 150 ms idle timeout, a client that never sends anything must be
/// taken down: `InterfaceDown` for its id is expected within 2 s.
#[test]
fn backbone_idle_timeout_disconnects_silent_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        94,
        None,
        Some(Duration::from_millis(150)),
        None,
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(9400))).unwrap();
    thread::sleep(Duration::from_millis(50));

    // The socket stays open but silent for the rest of the test.
    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
}
1778
/// With a 200 ms idle timeout, a client that has delivered a frame must not
/// be disconnected: no further non-peer event is expected for 500 ms.
#[test]
fn backbone_idle_timeout_ignores_client_after_data() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        95,
        None,
        Some(Duration::from_millis(200)),
        None,
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(9500))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    let wire = hdlc::frame(&[1u8; 24]);
    client.write_all(&wire).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, client_id);
            assert_eq!(data, vec![1u8; 24]);
        }
        other => panic!("expected Frame, got {:?}", other),
    }

    // Well past the 200 ms idle timeout: the connection must still be up.
    let result = recv_non_peer_event(&rx, Duration::from_millis(500));
    assert!(
        result.is_err(),
        "expected no InterfaceDown after client sent data, got {:?}",
        result
    );
}
1823
/// Setting `idle_timeout` on the shared runtime after the server has started
/// must take effect for an already-connected client: `InterfaceDown` for its
/// id is expected within 4 s.
#[test]
fn backbone_runtime_idle_timeout_updates_live() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
    // Keep a handle on the runtime before the config is moved into `start`.
    let shared_runtime = Arc::clone(&server.runtime);

    start(server, tx, Arc::new(AtomicU64::new(9650))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    // The temporary guard is released at the end of this statement.
    shared_runtime.lock().unwrap().idle_timeout = Some(Duration::from_millis(150));

    let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
}
1851
/// With a 50 ms write-stall timeout, flooding a client that never reads must
/// make the writer fail with `TimedOut`, followed by `InterfaceDown` for
/// that client.
#[test]
fn backbone_write_stall_timeout_disconnects_unwritable_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        98,
        None,
        None,
        Some(Duration::from_millis(50)),
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(9660))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    client
        .set_read_timeout(Some(Duration::from_millis(100)))
        .unwrap();
    // Best effort: a small receive buffer presumably makes the stall happen sooner.
    let sock = SockRef::from(&client);
    sock.set_recv_buffer_size(4096).ok();

    let (client_id, mut writer) = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, Some(writer), _) => (id, writer),
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload = vec![0x55; 512 * 1024];
    let deadline = Instant::now() + Duration::from_secs(3);
    let mut stalled = false;
    // Keep pushing large frames until the writer times out or 3 s have passed.
    while !stalled && Instant::now() < deadline {
        if let Err(e) = writer.send_frame(&payload) {
            match e.kind() {
                io::ErrorKind::WouldBlock => thread::sleep(Duration::from_millis(10)),
                io::ErrorKind::TimedOut => stalled = true,
                _ => panic!("unexpected send error: {}", e),
            }
        }
    }

    assert!(stalled, "expected writer to time out on persistent stall");
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
}
1904
1905 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1907 where
1908 F: FnMut(&Event) -> bool,
1909 {
1910 let deadline = Instant::now() + timeout;
1911 loop {
1912 let remaining = deadline.saturating_duration_since(Instant::now());
1913 if remaining.is_zero() {
1914 return None;
1915 }
1916 match rx.recv_timeout(remaining) {
1917 Ok(event) if pred(&event) => return Some(event),
1918 Ok(_) => continue,
1919 Err(_) => return None,
1920 }
1921 }
1922 }
1923
/// With a 50 ms write-stall timeout, flooding a client that never reads must
/// cause a `BackbonePeerWriteStall` event to appear on the channel.
#[test]
fn backbone_write_stall_emits_peer_events() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server = make_server_config(
        port,
        97,
        None,
        None,
        Some(Duration::from_millis(50)),
        BackboneAbuseConfig::default(),
    );

    start(server, tx, Arc::new(AtomicU64::new(9700))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    client
        .set_read_timeout(Some(Duration::from_millis(100)))
        .unwrap();
    // Best effort: a small receive buffer presumably makes the stall happen sooner.
    let sock = SockRef::from(&client);
    sock.set_recv_buffer_size(4096).ok();

    let mut writer = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(_, Some(w), _) => w,
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload = vec![0x55; 512 * 1024];
    let deadline = Instant::now() + Duration::from_secs(3);
    let pause = Duration::from_millis(10);
    // Keep sending for the full window; individual send results are irrelevant.
    while Instant::now() < deadline {
        let _ = writer.send_frame(&payload);
        if Instant::now() + pause > deadline {
            break;
        }
        thread::sleep(pause);
    }

    let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
        matches!(e, Event::BackbonePeerWriteStall { .. })
    });
    assert!(
        stall_event.is_some(),
        "expected BackbonePeerWriteStall event"
    );
}
1980
/// A peer address that has been blacklisted in the shared peer monitor must
/// not be brought up as an interface when it connects again.
///
/// Flow: connect once (must succeed), disconnect, drain the channel,
/// blacklist `127.0.0.1` for 60 s, connect again, then verify that no
/// `InterfaceUp` was emitted for the second connection.
#[test]
fn backbone_blacklisted_peer_rejected_on_connect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let next_id = Arc::new(AtomicU64::new(9800));

    let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
    // Keep a handle on the peer monitor before the config is moved into `start`.
    let peer_state = Arc::clone(&config.peer_state);

    // A sender clone stays in this scope so the channel cannot disconnect.
    start(config, tx.clone(), next_id).unwrap();
    thread::sleep(Duration::from_millis(50));

    let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(
        matches!(event, Event::InterfaceUp(_, _, _)),
        "first connection should succeed"
    );
    drop(client1);

    // Let the disconnect settle, then discard everything queued so far.
    thread::sleep(Duration::from_millis(100));
    while rx.try_recv().is_ok() {}

    peer_state.lock().unwrap().blacklist(
        "127.0.0.1".parse().unwrap(),
        Duration::from_secs(60),
        "test blacklist".into(),
    );

    let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
    thread::sleep(Duration::from_millis(200));

    // Check every queued event, not only the first: an `InterfaceUp` queued
    // behind a peer-state notification would otherwise go unnoticed.
    while let Ok(event) = rx.try_recv() {
        if let Event::InterfaceUp(_, _, _) = event {
            panic!("blacklisted peer should not get InterfaceUp")
        }
    }
}
2027
/// `parse_config` must map the textual `listen_*`, `idle_timeout`,
/// `write_stall_timeout` and `max_penalty_duration` parameters onto a
/// server-mode config, interpreting the three durations as whole seconds.
#[test]
fn backbone_parse_config_reads_abuse_settings() {
    let factory = BackboneInterfaceFactory;
    let mut params = HashMap::new();
    params.insert("listen_ip".into(), "127.0.0.1".into());
    params.insert("listen_port".into(), "4242".into());
    params.insert("idle_timeout".into(), "15".into());
    params.insert("write_stall_timeout".into(), "45".into());
    params.insert("max_penalty_duration".into(), "3600".into());

    // The map is passed by reference (`&params`); the source had this
    // argument corrupted into a pilcrow character by an encoding mishap.
    let config = factory
        .parse_config("test-backbone", InterfaceId(97), &params)
        .unwrap();
    let mode = *config.into_any().downcast::<BackboneMode>().unwrap();

    match mode {
        BackboneMode::Server(config) => {
            assert_eq!(config.listen_ip, "127.0.0.1");
            assert_eq!(config.listen_port, 4242);
            assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
            assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
            assert_eq!(
                config.abuse.max_penalty_duration,
                Some(Duration::from_secs(3600))
            );
        }
        BackboneMode::Client(_) => panic!("expected server config"),
    }
}
2057}