1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
// Nothing in the visible part of this file reads the constant, hence the lint allowance.
#[allow(dead_code)]
/// Hardware MTU constant: 1_048_576 bytes (1 MiB).
const HW_MTU: usize = 1_048_576;
35
#[derive(Debug, Clone)]
/// Configuration for a backbone TCP server interface.
///
/// The limit/timeout fields are the startup values; `runtime` is the shared copy that the
/// poll loop and the per-client writers actually read while running.
pub struct BackboneConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Address the listener binds to (joined with `listen_port` as `ip:port`).
    pub listen_ip: String,
    /// TCP port the listener binds to.
    pub listen_port: u16,
    /// Id of the server interface itself; reported in peer connect/disconnect events.
    pub interface_id: InterfaceId,
    /// Maximum number of simultaneously connected clients (`None` = unlimited).
    pub max_connections: Option<usize>,
    /// How long a client may stay connected without ever sending data (`None` = no limit).
    pub idle_timeout: Option<Duration>,
    /// How long a client's socket may refuse writes before it is disconnected.
    pub write_stall_timeout: Option<Duration>,
    /// Abuse-handling settings.
    pub abuse: BackboneAbuseConfig,
    /// Shared, live-adjustable copy of the settings above.
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Shared per-peer state (connection counts, blacklist, reject counts).
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
50
#[derive(Debug, Clone, Default)]
/// Abuse-handling settings for a backbone server.
pub struct BackboneAbuseConfig {
    /// Parsed from the `max_penalty_duration` parameter; not consumed anywhere in the
    /// visible part of this file — presumably an upper bound on penalty length (TODO confirm).
    pub max_penalty_duration: Option<Duration>,
}
56
#[derive(Debug, Clone)]
/// Server settings that are shared behind a mutex and re-read while the server runs.
pub struct BackboneServerRuntime {
    /// Maximum number of simultaneously connected clients (`None` = unlimited).
    pub max_connections: Option<usize>,
    /// How long a client may stay connected without ever sending data.
    pub idle_timeout: Option<Duration>,
    /// How long a client's socket may refuse writes before it is disconnected.
    pub write_stall_timeout: Option<Duration>,
    /// Abuse-handling settings.
    pub abuse: BackboneAbuseConfig,
}
65
66impl BackboneServerRuntime {
67 pub fn from_config(config: &BackboneConfig) -> Self {
68 Self {
69 max_connections: config.max_connections,
70 idle_timeout: config.idle_timeout,
71 write_stall_timeout: config.write_stall_timeout,
72 abuse: config.abuse.clone(),
73 }
74 }
75}
76
#[derive(Debug, Clone)]
/// Handle to a server interface's live runtime settings.
pub struct BackboneRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// The shared settings that the running server reads.
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Snapshot of the values the interface was configured with at startup.
    pub startup: BackboneServerRuntime,
}
83
#[derive(Debug, Clone)]
/// Handle to a server interface's shared per-peer state.
pub struct BackbonePeerStateHandle {
    /// Id of the server interface.
    pub interface_id: InterfaceId,
    /// Name of the server interface.
    pub interface_name: String,
    /// The monitor shared with the server's poll loop.
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
90
91impl Default for BackboneConfig {
92 fn default() -> Self {
93 let mut config = BackboneConfig {
94 name: String::new(),
95 listen_ip: "0.0.0.0".into(),
96 listen_port: 0,
97 interface_id: InterfaceId(0),
98 max_connections: None,
99 idle_timeout: None,
100 write_stall_timeout: None,
101 abuse: BackboneAbuseConfig::default(),
102 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
103 max_connections: None,
104 idle_timeout: None,
105 write_stall_timeout: None,
106 abuse: BackboneAbuseConfig::default(),
107 })),
108 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
109 };
110 let startup = BackboneServerRuntime::from_config(&config);
111 config.runtime = Arc::new(Mutex::new(startup));
112 config
113 }
114}
115
/// Upper bound (512 KiB) on the bytes a `BackboneWriter` will queue for a client whose
/// socket is not accepting writes; exceeding it disconnects the client.
const MAX_PENDING_BYTES: usize = 512 * 1024;
119
/// Frame writer for one connected backbone client (server side).
struct BackboneWriter {
    /// Clone of the client's socket, used only for writing.
    stream: TcpStream,
    /// Shared server settings; `write_stall_timeout` is re-read on every `send_frame`.
    runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Server interface name, used in log messages.
    interface_name: String,
    /// Id of the client interface this writer serves.
    interface_id: InterfaceId,
    /// Channel used to report `InterfaceDown` when the writer gives up on the client.
    event_tx: EventSender,
    /// Bytes that could not be written yet because the socket returned `WouldBlock`.
    pending: Vec<u8>,
    /// When the current run of `WouldBlock` results started; cleared by any successful write.
    stall_started: Option<Instant>,
    /// Set once the stall disconnect has been performed, so it happens only once.
    disconnect_notified: bool,
    /// Shared with the poll loop's `ClientState`; set when this writer disconnects the
    /// client for a write stall so the poll loop can report that reason.
    write_stall_flag: Arc<AtomicBool>,
}
132
133impl Writer for BackboneWriter {
134 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
135 let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
136 if !self.pending.is_empty() {
137 self.flush_pending(write_stall_timeout)?;
138 if !self.pending.is_empty() {
139 return Err(io::Error::new(
140 io::ErrorKind::WouldBlock,
141 "backbone writer still stalled",
142 ));
143 }
144 }
145
146 let frame = hdlc::frame(data);
147 self.write_buffer(&frame, write_stall_timeout)
148 }
149}
150
151impl BackboneWriter {
152 fn write_buffer(
153 &mut self,
154 data: &[u8],
155 write_stall_timeout: Option<Duration>,
156 ) -> io::Result<()> {
157 let mut written = 0usize;
158 while written < data.len() {
159 match self.stream.write(&data[written..]) {
160 Ok(0) => {
161 return Err(io::Error::new(
162 io::ErrorKind::WriteZero,
163 "backbone writer wrote zero bytes",
164 ))
165 }
166 Ok(n) => {
167 written += n;
168 self.stall_started = None;
169 }
170 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
171 let now = Instant::now();
172 let started = self.stall_started.get_or_insert(now);
173 if let Some(timeout) = write_stall_timeout {
174 if now.duration_since(*started) >= timeout {
175 return Err(self.disconnect_for_write_stall(timeout));
176 }
177 }
178 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
179 return Err(self.disconnect_for_write_stall(
180 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
181 ));
182 }
183 self.pending.extend_from_slice(&data[written..]);
184 return Err(io::Error::new(
185 io::ErrorKind::WouldBlock,
186 "backbone writer would block",
187 ));
188 }
189 Err(e) => return Err(e),
190 }
191 }
192 Ok(())
193 }
194
195 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
196 if self.pending.is_empty() {
197 return Ok(());
198 }
199
200 let pending = std::mem::take(&mut self.pending);
201 match self.write_buffer(&pending, write_stall_timeout) {
202 Ok(()) => Ok(()),
203 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
204 Err(e) => Err(e),
205 }
206 }
207
208 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
209 if !self.disconnect_notified {
210 log::warn!(
211 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
212 self.interface_name,
213 self.interface_id.0,
214 timeout
215 );
216 self.write_stall_flag.store(true, Ordering::Relaxed);
217 let _ = self.stream.shutdown(Shutdown::Both);
218 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
219 self.disconnect_notified = true;
220 }
221 io::Error::new(
222 io::ErrorKind::TimedOut,
223 format!("backbone writer stalled for {:?}", timeout),
224 )
225 }
226}
227
228pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
230 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
231 let listener = TcpListener::bind(&addr)?;
232 listener.set_nonblocking(true)?;
233
234 log::info!(
235 "[{}] backbone server listening on {}",
236 config.name,
237 listener.local_addr().unwrap_or(addr.parse().unwrap())
238 );
239
240 let name = config.name.clone();
241 let server_interface_id = config.interface_id;
242 let runtime = Arc::clone(&config.runtime);
243 let peer_state = Arc::clone(&config.peer_state);
244 thread::Builder::new()
245 .name(format!("backbone-poll-{}", config.interface_id.0))
246 .spawn(move || {
247 if let Err(e) = poll_loop(
248 listener,
249 name,
250 server_interface_id,
251 tx,
252 next_id,
253 runtime,
254 peer_state,
255 ) {
256 log::error!("backbone poll loop error: {}", e);
257 }
258 })?;
259
260 Ok(())
261}
262
/// Poll-loop bookkeeping for one connected client.
struct ClientState {
    /// Interface id assigned to this client.
    id: InterfaceId,
    /// Remote IP address.
    peer_ip: IpAddr,
    /// Remote TCP port.
    peer_port: u16,
    /// The socket registered with the poller and used for reading.
    stream: TcpStream,
    /// HDLC decoder holding partial-frame state between reads.
    decoder: hdlc::Decoder,
    /// When the connection was accepted.
    connected_at: Instant,
    /// Whether at least one byte has ever been read from the client.
    has_received_data: bool,
    /// Shared with the client's `BackboneWriter`; set when the writer disconnected the
    /// client for a write stall.
    write_stall_flag: Arc<AtomicBool>,
}
274
#[derive(Debug, Clone)]
/// Per-IP record kept by the server: blacklist status and connection statistics.
struct PeerBehaviorState {
    /// Instant until which new connections from this IP are rejected (`None` = not blacklisted).
    blacklisted_until: Option<Instant>,
    /// Reason given when the IP was blacklisted.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected because the IP was blacklisted.
    reject_count: u64,
    /// Number of currently open connections from this IP.
    connected_count: usize,
}
282
283impl PeerBehaviorState {
284 fn new() -> Self {
285 Self {
286 blacklisted_until: None,
287 blacklist_reason: None,
288 reject_count: 0,
289 connected_count: 0,
290 }
291 }
292}
293
#[derive(Debug, Clone, Default)]
/// Per-peer state of a backbone server, shared between the poll loop and external callers
/// that list, clear or blacklist peers.
pub struct BackbonePeerMonitor {
    /// One record per remote IP address.
    peers: HashMap<IpAddr, PeerBehaviorState>,
}
298
299impl BackbonePeerMonitor {
300 pub fn new() -> Self {
301 Self {
302 peers: HashMap::new(),
303 }
304 }
305
306 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
307 let mut merged = self.peers.clone();
308
309 for (peer_ip, state) in peers {
310 let entry = merged
311 .entry(*peer_ip)
312 .or_insert_with(PeerBehaviorState::new);
313 entry.connected_count = state.connected_count;
314 entry.reject_count = state.reject_count;
315 if state.blacklisted_until.is_some() {
316 entry.blacklisted_until = state.blacklisted_until;
317 entry.blacklist_reason = state.blacklist_reason.clone();
318 }
319 }
320
321 merged.retain(|peer_ip, state| {
322 peers.contains_key(peer_ip)
323 || state.blacklisted_until.is_some()
324 || state.reject_count > 0
325 });
326 self.peers = merged;
327 }
328
329 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
330 for (peer_ip, state) in &self.peers {
331 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
332 entry.blacklisted_until = state.blacklisted_until;
333 entry.blacklist_reason = state.blacklist_reason.clone();
334 entry.reject_count = state.reject_count;
335 }
336
337 peers.retain(|peer_ip, state| {
338 if state.connected_count > 0 {
339 return true;
340 }
341 self.peers.contains_key(peer_ip)
342 });
343 }
344
345 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
346 let now = Instant::now();
347 let mut entries: Vec<BackbonePeerStateEntry> = self
348 .peers
349 .iter()
350 .map(|(peer_ip, state)| BackbonePeerStateEntry {
351 interface_name: interface_name.to_string(),
352 peer_ip: *peer_ip,
353 connected_count: state.connected_count,
354 blacklisted_remaining_secs: state
355 .blacklisted_until
356 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
357 blacklist_reason: state.blacklist_reason.clone(),
358 reject_count: state.reject_count,
359 })
360 .collect();
361 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
362 entries
363 }
364
365 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
366 self.peers.remove(&peer_ip).is_some()
367 }
368
369 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
370 let state = self
371 .peers
372 .entry(peer_ip)
373 .or_insert_with(PeerBehaviorState::new);
374 state.blacklisted_until = Some(Instant::now() + duration);
375 state.blacklist_reason = Some(reason);
376 true
377 }
378
379 #[cfg(test)]
380 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
381 let mut state = PeerBehaviorState::new();
382 state.connected_count = entry.connected_count;
383 state.reject_count = entry.reject_count;
384 state.blacklist_reason = entry.blacklist_reason;
385 if let Some(remaining) = entry.blacklisted_remaining_secs {
386 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
387 }
388 self.peers.insert(entry.peer_ip, state);
389 }
390}
391
#[derive(Clone, Copy)]
/// Why the poll loop is removing a client.
enum DisconnectReason {
    /// The read returned end-of-stream or an error.
    RemoteClosed,
    /// The client never sent data within the configured idle timeout.
    IdleTimeout,
    /// The client's writer gave up because the socket stopped accepting data.
    WriteStall,
}
398
399fn poll_loop(
401 listener: TcpListener,
402 name: String,
403 server_interface_id: InterfaceId,
404 tx: EventSender,
405 next_id: Arc<AtomicU64>,
406 runtime: Arc<Mutex<BackboneServerRuntime>>,
407 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
408) -> io::Result<()> {
409 let poller = Poller::new()?;
410
411 const LISTENER_KEY: usize = 0;
412
413 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
415
416 let mut clients: HashMap<usize, ClientState> = HashMap::new();
417 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
418 let mut events = Events::new();
419 let mut next_key: usize = 1;
420
421 loop {
422 let runtime_snapshot = runtime.lock().unwrap().clone();
423 let max_connections = runtime_snapshot.max_connections;
424 let idle_timeout = runtime_snapshot.idle_timeout;
425 cleanup_peer_state(&mut peers);
426 {
427 let mut monitor = peer_state.lock().unwrap();
428 monitor.sync_into(&mut peers);
429 monitor.upsert_snapshot(&peers);
430 }
431
432 events.clear();
433 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
434
435 for ev in events.iter() {
436 if ev.key == LISTENER_KEY {
437 loop {
439 match listener.accept() {
440 Ok((stream, peer_addr)) => {
441 let peer_ip = peer_addr.ip();
442 let peer_port = peer_addr.port();
443
444 if is_ip_blacklisted(&mut peers, peer_ip) {
445 if let Some(state) = peers.get_mut(&peer_ip) {
446 state.reject_count = state.reject_count.saturating_add(1);
447 }
448 peer_state.lock().unwrap().upsert_snapshot(&peers);
449 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
450 drop(stream);
451 continue;
452 }
453
454 if let Some(max) = max_connections {
455 if clients.len() >= max {
456 log::warn!(
457 "[{}] max connections ({}) reached, rejecting {}",
458 name,
459 max,
460 peer_addr
461 );
462 drop(stream);
463 continue;
464 }
465 }
466
467 stream.set_nonblocking(true).ok();
468 stream.set_nodelay(true).ok();
469 set_tcp_keepalive(&stream).ok();
470
471 #[cfg(target_os = "macos")]
473 {
474 let sock = SockRef::from(&stream);
475 sock.set_nosigpipe(true).ok();
476 }
477
478 let key = next_key;
479 next_key += 1;
480 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
481
482 log::info!(
483 "[{}] backbone client connected: {} → id {}",
484 name,
485 peer_addr,
486 client_id.0
487 );
488
489 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
492 {
493 log::warn!("[{}] failed to add client to poller: {}", name, e);
494 continue; }
496
497 let writer_stream = match stream.try_clone() {
499 Ok(s) => s,
500 Err(e) => {
501 log::warn!("[{}] failed to clone client stream: {}", name, e);
502 let _ = poller.delete(&stream);
503 continue; }
505 };
506 let write_stall_flag = Arc::new(AtomicBool::new(false));
507 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
508 stream: writer_stream,
509 runtime: Arc::clone(&runtime),
510 interface_name: name.clone(),
511 interface_id: client_id,
512 event_tx: tx.clone(),
513 pending: Vec::new(),
514 stall_started: None,
515 disconnect_notified: false,
516 write_stall_flag: Arc::clone(&write_stall_flag),
517 });
518
519 clients.insert(
520 key,
521 ClientState {
522 id: client_id,
523 peer_ip,
524 peer_port,
525 stream,
526 decoder: hdlc::Decoder::new(),
527 connected_at: Instant::now(),
528 has_received_data: false,
529 write_stall_flag,
530 },
531 );
532 peers
533 .entry(peer_ip)
534 .or_insert_with(PeerBehaviorState::new)
535 .connected_count += 1;
536 peer_state.lock().unwrap().upsert_snapshot(&peers);
537 let _ = tx.send(Event::BackbonePeerConnected {
538 server_interface_id,
539 peer_interface_id: client_id,
540 peer_ip,
541 peer_port,
542 });
543
544 let info = InterfaceInfo {
545 id: client_id,
546 name: format!("BackboneInterface/{}", client_id.0),
547 mode: constants::MODE_FULL,
548 out_capable: true,
549 in_capable: true,
550 bitrate: Some(1_000_000_000), announce_rate_target: None,
552 announce_rate_grace: 0,
553 announce_rate_penalty: 0.0,
554 announce_cap: constants::ANNOUNCE_CAP,
555 is_local_client: false,
556 wants_tunnel: false,
557 tunnel_id: None,
558 mtu: 65535,
559 ia_freq: 0.0,
560 started: 0.0,
561 ingress_control: true,
562 };
563
564 if tx
565 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
566 .is_err()
567 {
568 cleanup(&poller, &clients, &listener);
570 return Ok(());
571 }
572 }
573 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
574 Err(e) => {
575 log::warn!("[{}] accept error: {}", name, e);
576 break;
577 }
578 }
579 }
580 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
582 } else if clients.contains_key(&ev.key) {
583 let key = ev.key;
584 let mut should_remove = false;
585 let mut client_id = InterfaceId(0);
586
587 let mut buf = [0u8; 4096];
588 let read_result = {
589 let client = clients.get_mut(&key).unwrap();
590 client.stream.read(&mut buf)
591 };
592
593 match read_result {
594 Ok(0) | Err(_) => {
595 if let Some(c) = clients.get(&key) {
596 client_id = c.id;
597 }
598 should_remove = true;
599 }
600 Ok(n) => {
601 let client = clients.get_mut(&key).unwrap();
602 client_id = client.id;
603 client.has_received_data = true;
604 for frame in client.decoder.feed(&buf[..n]) {
605 if tx
606 .send(Event::Frame {
607 interface_id: client_id,
608 data: frame,
609 })
610 .is_err()
611 {
612 cleanup(&poller, &clients, &listener);
613 return Ok(());
614 }
615 }
616 }
617 }
618
619 if should_remove {
620 let reason = if clients
621 .get(&key)
622 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
623 {
624 DisconnectReason::WriteStall
625 } else {
626 DisconnectReason::RemoteClosed
627 };
628 disconnect_client(
629 &poller,
630 &mut clients,
631 &mut peers,
632 &name,
633 server_interface_id,
634 &tx,
635 &peer_state,
636 key,
637 client_id,
638 reason,
639 );
640 } else if let Some(client) = clients.get(&key) {
641 poller.modify(&client.stream, PollEvent::readable(key))?;
643 }
644 }
645 }
646
647 if let Some(timeout) = idle_timeout {
648 let now = Instant::now();
649 let timed_out: Vec<(usize, InterfaceId)> = clients
650 .iter()
651 .filter_map(|(&key, client)| {
652 if client.has_received_data || now.duration_since(client.connected_at) < timeout
653 {
654 None
655 } else {
656 Some((key, client.id))
657 }
658 })
659 .collect();
660
661 for (key, client_id) in timed_out {
662 disconnect_client(
663 &poller,
664 &mut clients,
665 &mut peers,
666 &name,
667 server_interface_id,
668 &tx,
669 &peer_state,
670 key,
671 client_id,
672 DisconnectReason::IdleTimeout,
673 );
674 }
675 }
676 }
677}
678
679fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
680 let now = Instant::now();
681 peers.retain(|_, state| {
682 if matches!(state.blacklisted_until, Some(until) if now >= until) {
683 state.blacklisted_until = None;
684 state.blacklist_reason = None;
685 }
686 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
687 });
688}
689
690fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
691 let now = Instant::now();
692 if let Some(state) = peers.get_mut(&peer_ip) {
693 if let Some(until) = state.blacklisted_until {
694 if now < until {
695 return true;
696 }
697 state.blacklisted_until = None;
698 }
699 }
700 false
701}
702
703fn disconnect_client(
704 poller: &Poller,
705 clients: &mut HashMap<usize, ClientState>,
706 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
707 name: &str,
708 server_interface_id: InterfaceId,
709 tx: &EventSender,
710 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
711 key: usize,
712 client_id: InterfaceId,
713 reason: DisconnectReason,
714) {
715 let Some(client) = clients.remove(&key) else {
716 return;
717 };
718
719 match reason {
720 DisconnectReason::RemoteClosed => {
721 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
722 }
723 DisconnectReason::IdleTimeout => {
724 log::info!(
725 "[{}] backbone client {} disconnected due to idle timeout",
726 name,
727 client_id.0
728 );
729 }
730 DisconnectReason::WriteStall => {
731 }
733 }
734
735 let _ = poller.delete(&client.stream);
736 let connected_for = client.connected_at.elapsed();
738 let _ = tx.send(Event::BackbonePeerDisconnected {
739 server_interface_id,
740 peer_interface_id: client.id,
741 peer_ip: client.peer_ip,
742 peer_port: client.peer_port,
743 connected_for,
744 had_received_data: client.has_received_data,
745 });
746 match reason {
747 DisconnectReason::IdleTimeout => {
748 let _ = tx.send(Event::BackbonePeerIdleTimeout {
749 server_interface_id,
750 peer_interface_id: client.id,
751 peer_ip: client.peer_ip,
752 peer_port: client.peer_port,
753 connected_for,
754 });
755 }
756 DisconnectReason::WriteStall => {
757 let _ = tx.send(Event::BackbonePeerWriteStall {
758 server_interface_id,
759 peer_interface_id: client.id,
760 peer_ip: client.peer_ip,
761 peer_port: client.peer_port,
762 connected_for,
763 });
764 }
765 DisconnectReason::RemoteClosed => {}
766 }
767
768 if let Some(state) = peers.get_mut(&client.peer_ip) {
769 state.connected_count = state.connected_count.saturating_sub(1);
770 }
771 peer_state.lock().unwrap().upsert_snapshot(peers);
772 if !matches!(reason, DisconnectReason::WriteStall) {
774 let _ = tx.send(Event::InterfaceDown(client_id));
775 }
776}
777
778fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
779 let sock = SockRef::from(stream);
780 let mut keepalive = TcpKeepalive::new()
781 .with_time(Duration::from_secs(5))
782 .with_interval(Duration::from_secs(2));
783 #[cfg(any(target_os = "linux", target_os = "macos"))]
784 {
785 keepalive = keepalive.with_retries(12);
786 }
787 sock.set_tcp_keepalive(&keepalive)
788}
789
790fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
791 for (_, client) in clients {
792 let _ = poller.delete(&client.stream);
793 }
794 let _ = poller.delete(listener);
795}
796
#[derive(Debug, Clone)]
/// Configuration for a backbone client interface (outgoing TCP connection).
///
/// `reconnect_wait`, `max_reconnect_tries` and `connect_timeout` are the startup values;
/// `runtime` is the shared copy that connecting and reconnecting actually read.
pub struct BackboneClientConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Host to connect to (joined with `target_port` as `host:port`).
    pub target_host: String,
    /// TCP port to connect to.
    pub target_port: u16,
    /// Id of this interface, used in all events it emits.
    pub interface_id: InterfaceId,
    /// Base delay between reconnect attempts (doubled per attempt up to a cap).
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts (`None` = retry forever).
    pub max_reconnect_tries: Option<u32>,
    /// Timeout for each TCP connect.
    pub connect_timeout: Duration,
    /// Taken from the `transport_identity` parameter; not used in the visible part of this file.
    pub transport_identity: Option<String>,
    /// Shared, live-adjustable copy of the reconnect/connect settings.
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
}
814
#[derive(Debug, Clone)]
/// Client settings that are shared behind a mutex and re-read on every (re)connect attempt.
pub struct BackboneClientRuntime {
    /// Base delay between reconnect attempts.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts (`None` = retry forever).
    pub max_reconnect_tries: Option<u32>,
    /// Timeout for each TCP connect.
    pub connect_timeout: Duration,
}
821
822impl BackboneClientRuntime {
823 pub fn from_config(config: &BackboneClientConfig) -> Self {
824 Self {
825 reconnect_wait: config.reconnect_wait,
826 max_reconnect_tries: config.max_reconnect_tries,
827 connect_timeout: config.connect_timeout,
828 }
829 }
830}
831
#[derive(Debug, Clone)]
/// Handle to a client interface's live runtime settings.
pub struct BackboneClientRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// The shared settings that the running client reads.
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
    /// Snapshot of the values the interface was configured with at startup.
    pub startup: BackboneClientRuntime,
}
838
839impl Default for BackboneClientConfig {
840 fn default() -> Self {
841 let mut config = BackboneClientConfig {
842 name: String::new(),
843 target_host: "127.0.0.1".into(),
844 target_port: 4242,
845 interface_id: InterfaceId(0),
846 reconnect_wait: Duration::from_secs(5),
847 max_reconnect_tries: None,
848 connect_timeout: Duration::from_secs(5),
849 transport_identity: None,
850 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
851 reconnect_wait: Duration::from_secs(5),
852 max_reconnect_tries: None,
853 connect_timeout: Duration::from_secs(5),
854 })),
855 };
856 let startup = BackboneClientRuntime::from_config(&config);
857 config.runtime = Arc::new(Mutex::new(startup));
858 config
859 }
860}
861
/// Frame writer for the client side of a backbone connection.
struct BackboneClientWriter {
    /// Socket handle used for writing.
    stream: TcpStream,
}
866
867impl Writer for BackboneClientWriter {
868 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
869 self.stream.write_all(&hdlc::frame(data))
870 }
871}
872
873fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
875 let runtime = config.runtime.lock().unwrap().clone();
876 let addr_str = format!("{}:{}", config.target_host, config.target_port);
877 let addr = addr_str
878 .to_socket_addrs()?
879 .next()
880 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
881
882 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
883 stream.set_nodelay(true)?;
884 set_tcp_keepalive(&stream).ok();
885
886 #[cfg(target_os = "macos")]
888 {
889 let sock = SockRef::from(&stream);
890 sock.set_nosigpipe(true).ok();
891 }
892
893 Ok(stream)
894}
895
896pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
898 let stream = try_connect_client(&config)?;
899 let reader_stream = stream.try_clone()?;
900 let writer_stream = stream.try_clone()?;
901
902 let id = config.interface_id;
903 log::info!(
904 "[{}] backbone client connected to {}:{}",
905 config.name,
906 config.target_host,
907 config.target_port
908 );
909
910 let _ = tx.send(Event::InterfaceUp(id, None, None));
912
913 thread::Builder::new()
914 .name(format!("backbone-client-{}", id.0))
915 .spawn(move || {
916 client_reader_loop(reader_stream, config, tx);
917 })?;
918
919 Ok(Box::new(BackboneClientWriter {
920 stream: writer_stream,
921 }))
922}
923
924fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
927 let id = config.interface_id;
928 let mut decoder = hdlc::Decoder::new();
929 let mut buf = [0u8; 4096];
930
931 loop {
932 match stream.read(&mut buf) {
933 Ok(0) => {
934 log::warn!("[{}] connection closed", config.name);
935 let _ = tx.send(Event::InterfaceDown(id));
936 match client_reconnect(&config, &tx) {
937 Some(new_stream) => {
938 stream = new_stream;
939 decoder = hdlc::Decoder::new();
940 continue;
941 }
942 None => {
943 log::error!("[{}] reconnection failed, giving up", config.name);
944 return;
945 }
946 }
947 }
948 Ok(n) => {
949 for frame in decoder.feed(&buf[..n]) {
950 if tx
951 .send(Event::Frame {
952 interface_id: id,
953 data: frame,
954 })
955 .is_err()
956 {
957 return;
958 }
959 }
960 }
961 Err(e) => {
962 log::warn!("[{}] read error: {}", config.name, e);
963 let _ = tx.send(Event::InterfaceDown(id));
964 match client_reconnect(&config, &tx) {
965 Some(new_stream) => {
966 stream = new_stream;
967 decoder = hdlc::Decoder::new();
968 continue;
969 }
970 None => {
971 log::error!("[{}] reconnection failed, giving up", config.name);
972 return;
973 }
974 }
975 }
976 }
977 }
978}
979
/// Largest exponent used for reconnect backoff: the wait is capped at
/// `reconnect_wait * 2^6` (64 times the base delay).
const MAX_BACKOFF_SHIFT: u32 = 6;
983
984fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
988 let mut attempts = 0u32;
989 loop {
990 let runtime = config.runtime.lock().unwrap().clone();
991
992 let shift = attempts.min(MAX_BACKOFF_SHIFT);
993 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
994 let jitter_range = backoff / 4;
996 let jitter = if jitter_range.as_nanos() > 0 {
997 let offset = Duration::from_nanos(
998 (std::hash::RandomState::new().build_hasher().finish()
999 % jitter_range.as_nanos() as u64)
1000 * 2,
1001 );
1002 if offset > jitter_range {
1003 backoff + (offset - jitter_range)
1004 } else {
1005 backoff - (jitter_range - offset)
1006 }
1007 } else {
1008 backoff
1009 };
1010 thread::sleep(jitter);
1011
1012 attempts += 1;
1013
1014 if let Some(max) = runtime.max_reconnect_tries {
1015 if attempts > max {
1016 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1017 return None;
1018 }
1019 }
1020
1021 log::info!(
1022 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1023 config.name,
1024 attempts,
1025 jitter.as_secs_f64(),
1026 );
1027
1028 match try_connect_client(config) {
1029 Ok(new_stream) => {
1030 let writer_stream = match new_stream.try_clone() {
1031 Ok(s) => s,
1032 Err(e) => {
1033 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1034 continue;
1035 }
1036 };
1037 log::info!(
1038 "[{}] reconnected after {} attempt(s)",
1039 config.name,
1040 attempts
1041 );
1042 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1043 stream: writer_stream,
1044 });
1045 let _ = tx.send(Event::InterfaceUp(
1046 config.interface_id,
1047 Some(new_writer),
1048 None,
1049 ));
1050 return Some(new_stream);
1051 }
1052 Err(e) => {
1053 log::warn!("[{}] reconnect failed: {}", config.name, e);
1054 }
1055 }
1056 }
1057}
1058
/// Parsed backbone configuration: a listening server or an outgoing client.
///
/// `parse_config` selects `Client` when a `remote` / `target_host` parameter is present
/// and `Server` otherwise.
pub(crate) enum BackboneMode {
    /// Listen for incoming backbone connections.
    Server(BackboneConfig),
    /// Connect out to a backbone server.
    Client(BackboneClientConfig),
}
1069
/// Factory for the `"BackboneInterface"` interface type (both server and client mode).
pub struct BackboneInterfaceFactory;
1076
/// Reads `key` from `params` as a strictly positive number of seconds.
///
/// Returns `None` when the key is missing, is not a number, is zero, negative or NaN, or
/// is too large (including `inf`) to be represented as a `Duration`. Out-of-range values
/// are rejected instead of panicking, since they come straight from configuration text.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    params
        .get(key)
        .and_then(|v| v.parse::<f64>().ok())
        .filter(|v| *v > 0.0)
        .and_then(|v| Duration::try_from_secs_f64(v).ok())
}
1084
1085impl InterfaceFactory for BackboneInterfaceFactory {
1086 fn type_name(&self) -> &str {
1087 "BackboneInterface"
1088 }
1089
1090 fn parse_config(
1091 &self,
1092 name: &str,
1093 id: InterfaceId,
1094 params: &HashMap<String, String>,
1095 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1096 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1097 let target_host = target_host.clone();
1099 let target_port = params
1100 .get("target_port")
1101 .or_else(|| params.get("port"))
1102 .and_then(|v| v.parse().ok())
1103 .unwrap_or(4242);
1104 let transport_identity = params.get("transport_identity").cloned();
1105 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1106 name: name.to_string(),
1107 target_host,
1108 target_port,
1109 interface_id: id,
1110 transport_identity,
1111 ..BackboneClientConfig::default()
1112 })))
1113 } else {
1114 let listen_ip = params
1116 .get("listen_ip")
1117 .or_else(|| params.get("device"))
1118 .cloned()
1119 .unwrap_or_else(|| "0.0.0.0".into());
1120 let listen_port = params
1121 .get("listen_port")
1122 .or_else(|| params.get("port"))
1123 .and_then(|v| v.parse().ok())
1124 .unwrap_or(4242);
1125 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1126 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1127 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1128 let abuse = BackboneAbuseConfig {
1129 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1130 };
1131 let mut config = BackboneConfig {
1132 name: name.to_string(),
1133 listen_ip,
1134 listen_port,
1135 interface_id: id,
1136 max_connections,
1137 idle_timeout,
1138 write_stall_timeout,
1139 abuse,
1140 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1141 max_connections: None,
1142 idle_timeout: None,
1143 write_stall_timeout: None,
1144 abuse: BackboneAbuseConfig::default(),
1145 })),
1146 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1147 };
1148 let startup = BackboneServerRuntime::from_config(&config);
1149 config.runtime = Arc::new(Mutex::new(startup));
1150 Ok(Box::new(BackboneMode::Server(config)))
1151 }
1152 }
1153
1154 fn start(
1155 &self,
1156 config: Box<dyn InterfaceConfigData>,
1157 ctx: StartContext,
1158 ) -> io::Result<StartResult> {
1159 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1160 io::Error::new(
1161 io::ErrorKind::InvalidData,
1162 "wrong config type for BackboneInterface",
1163 )
1164 })?;
1165
1166 match mode {
1167 BackboneMode::Client(cfg) => {
1168 let id = cfg.interface_id;
1169 let name = cfg.name.clone();
1170 let info = InterfaceInfo {
1171 id,
1172 name,
1173 mode: ctx.mode,
1174 out_capable: true,
1175 in_capable: true,
1176 bitrate: Some(1_000_000_000),
1177 announce_rate_target: None,
1178 announce_rate_grace: 0,
1179 announce_rate_penalty: 0.0,
1180 announce_cap: constants::ANNOUNCE_CAP,
1181 is_local_client: false,
1182 wants_tunnel: false,
1183 tunnel_id: None,
1184 mtu: 65535,
1185 ingress_control: true,
1186 ia_freq: 0.0,
1187 started: crate::time::now(),
1188 };
1189 let writer = start_client(cfg, ctx.tx)?;
1190 Ok(StartResult::Simple {
1191 id,
1192 info,
1193 writer,
1194 interface_type_name: "BackboneInterface".to_string(),
1195 })
1196 }
1197 BackboneMode::Server(cfg) => {
1198 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1199 Ok(StartResult::Listener { control: None })
1200 }
1201 }
1202 }
1203}
1204
1205pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1206 match mode {
1207 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1208 interface_name: config.name.clone(),
1209 runtime: Arc::clone(&config.runtime),
1210 startup: BackboneServerRuntime::from_config(config),
1211 }),
1212 BackboneMode::Client(_) => None,
1213 }
1214}
1215
1216pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1217 match mode {
1218 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1219 interface_id: config.interface_id,
1220 interface_name: config.name.clone(),
1221 peer_state: Arc::clone(&config.peer_state),
1222 }),
1223 BackboneMode::Client(_) => None,
1224 }
1225}
1226
1227pub(crate) fn client_runtime_handle_from_mode(
1228 mode: &BackboneMode,
1229) -> Option<BackboneClientRuntimeConfigHandle> {
1230 match mode {
1231 BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1232 interface_name: config.name.clone(),
1233 runtime: Arc::clone(&config.runtime),
1234 startup: BackboneClientRuntime::from_config(config),
1235 }),
1236 BackboneMode::Server(_) => None,
1237 }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242 use super::*;
1243 use std::sync::mpsc;
1244 use std::time::Duration;
1245
/// Asks the OS for an unused loopback TCP port by binding to port 0.
///
/// The probing listener is dropped when this function returns, so the port is
/// released again before the caller uses it.
fn find_free_port() -> u16 {
    let probe = TcpListener::bind("127.0.0.1:0").unwrap();
    let bound_addr = probe.local_addr().unwrap();
    bound_addr.port()
}
1253
1254 fn recv_non_peer_event(
1255 rx: &mpsc::Receiver<Event>,
1256 timeout: Duration,
1257 ) -> Result<Event, mpsc::RecvTimeoutError> {
1258 let deadline = Instant::now() + timeout;
1259 loop {
1260 let remaining = deadline.saturating_duration_since(Instant::now());
1261 if remaining.is_zero() {
1262 return Err(mpsc::RecvTimeoutError::Timeout);
1263 }
1264 let event = rx.recv_timeout(remaining)?;
1265 match event {
1266 Event::BackbonePeerConnected { .. }
1267 | Event::BackbonePeerDisconnected { .. }
1268 | Event::BackbonePeerIdleTimeout { .. }
1269 | Event::BackbonePeerWriteStall { .. }
1270 | Event::BackbonePeerPenalty { .. } => continue,
1271 other => return Ok(other),
1272 }
1273 }
1274 }
1275
1276 fn make_server_config(
1277 port: u16,
1278 interface_id: u64,
1279 max_connections: Option<usize>,
1280 idle_timeout: Option<Duration>,
1281 write_stall_timeout: Option<Duration>,
1282 abuse: BackboneAbuseConfig,
1283 ) -> BackboneConfig {
1284 let mut config = BackboneConfig {
1285 name: "test-backbone".into(),
1286 listen_ip: "127.0.0.1".into(),
1287 listen_port: port,
1288 interface_id: InterfaceId(interface_id),
1289 max_connections,
1290 idle_timeout,
1291 write_stall_timeout,
1292 abuse,
1293 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1294 max_connections: None,
1295 idle_timeout: None,
1296 write_stall_timeout: None,
1297 abuse: BackboneAbuseConfig::default(),
1298 })),
1299 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1300 };
1301 let startup = BackboneServerRuntime::from_config(&config);
1302 config.runtime = Arc::new(Mutex::new(startup));
1303 config
1304 }
1305
/// An accepted TCP connection is reported as `InterfaceUp`.
///
/// The event is expected to carry `InterfaceId(8000)` (the initial value of the
/// id counter handed to `start`), a writer, and interface info that is both
/// inbound- and outbound-capable.
#[test]
fn backbone_accept_connection() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8000))).unwrap();
    // Presumably gives the listener time to come up before connecting.
    thread::sleep(Duration::from_millis(50));

    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, writer, info) => {
            assert_eq!(id, InterfaceId(8000));
            assert!(writer.is_some());
            assert!(info.is_some());
            let info = info.unwrap();
            assert!(info.out_capable);
            assert!(info.in_capable);
        }
        other => panic!("expected InterfaceUp, got {:?}", other),
    }
}
1332
/// An HDLC-framed payload written by a TCP client arrives as `Event::Frame`
/// tagged with the connection's interface id and the unframed payload.
#[test]
fn backbone_receive_frame() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8100))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut peer = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // Discard the first non-peer event produced by the new connection.
    drop(recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap());

    let payload: Vec<u8> = (0..32).collect();
    let wire = hdlc::frame(&payload);
    peer.write_all(&wire).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, InterfaceId(8100));
            assert_eq!(data, payload);
        }
        other => panic!("expected Frame, got {:?}", other),
    }
}
1362
/// A frame sent through the writer from `InterfaceUp` reaches the TCP client
/// as the HDLC-framed payload.
#[test]
fn backbone_send_to_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let next_id = Arc::new(AtomicU64::new(8200));

    let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());

    start(config, tx, next_id).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
    client
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
    let mut writer = match event {
        Event::InterfaceUp(_, Some(w), _) => w,
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload: Vec<u8> = (0..24).collect();
    writer.send_frame(&payload).unwrap();

    // A single `read` may legally return only part of the frame, so read
    // exactly as many bytes as the framed payload occupies before comparing.
    let expected = hdlc::frame(&payload);
    let mut received = vec![0u8; expected.len()];
    client.read_exact(&mut received).unwrap();
    assert_eq!(&received[..], &expected[..]);
}
1396
/// Two simultaneous TCP connections each produce an `InterfaceUp`, and the two
/// reported interface ids differ.
#[test]
fn backbone_multiple_clients() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8300))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client1 = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let _client2 = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let ids: Vec<_> = (0..2)
        .map(
            |_| match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            },
        )
        .collect();

    assert_eq!(ids.len(), 2);
    assert_ne!(ids[0], ids[1]);
}
1423
/// Closing the client socket makes the server report `InterfaceDown` for the
/// connection's interface id (8400, the initial value of the id counter).
#[test]
fn backbone_client_disconnect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8400))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let peer = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // Discard the first non-peer event produced by the new connection.
    drop(recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap());

    // Dropping the stream closes the client side of the connection.
    drop(peer);

    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    let went_down = matches!(event, Event::InterfaceDown(InterfaceId(8400)));
    assert!(went_down, "expected InterfaceDown(8400), got {:?}", event);
}
1451
/// Frames written by two concurrently connected clients are both delivered,
/// in whichever order the server reports them.
#[test]
fn backbone_epoll_multiplexing() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8500))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut first = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let mut second = TcpStream::connect(("127.0.0.1", port)).unwrap();

    // Discard one non-peer event per connection before sending data.
    for _ in 0..2 {
        drop(recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap());
    }

    let first_payload: Vec<u8> = (0..24).collect();
    let second_payload: Vec<u8> = (100..130).collect();
    first.write_all(&hdlc::frame(&first_payload)).unwrap();
    second.write_all(&hdlc::frame(&second_payload)).unwrap();

    let frames: Vec<_> = (0..2)
        .map(
            |_| match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
                Event::Frame { data, .. } => data,
                other => panic!("expected Frame, got {:?}", other),
            },
        )
        .collect();

    assert!(frames.contains(&first_payload));
    assert!(frames.contains(&second_payload));
}
1488
/// `start` returns `Ok` for a server configured on a free loopback port.
#[test]
fn backbone_bind_port() {
    let (tx, _rx) = crate::event::channel();
    let server_cfg = make_server_config(
        find_free_port(),
        86,
        None,
        None,
        None,
        BackboneAbuseConfig::default(),
    );

    // The test passes as long as `start` does not return an error.
    start(server_cfg, tx, Arc::new(AtomicU64::new(8600))).unwrap();
}
1500
/// A frame delivered in two separate TCP writes (50 ms apart) is still
/// reported as one `Event::Frame` carrying the original payload.
#[test]
fn backbone_hdlc_fragmented() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let server_cfg = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(8700))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut peer = TcpStream::connect(("127.0.0.1", port)).unwrap();
    peer.set_nodelay(true).unwrap();

    // Discard the first non-peer event produced by the new connection.
    drop(recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap());

    let payload: Vec<u8> = (0..32).collect();
    let framed = hdlc::frame(&payload);
    let (head, tail) = framed.split_at(framed.len() / 2);

    peer.write_all(head).unwrap();
    thread::sleep(Duration::from_millis(50));
    peer.write_all(tail).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { data, .. } => assert_eq!(data, payload),
        other => panic!("expected Frame, got {:?}", other),
    }
}
1536
1537 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1542 BackboneClientConfig {
1543 name: format!("test-bb-client-{}", port),
1544 target_host: "127.0.0.1".into(),
1545 target_port: port,
1546 interface_id: InterfaceId(id),
1547 reconnect_wait: Duration::from_millis(100),
1548 max_reconnect_tries: Some(2),
1549 connect_timeout: Duration::from_secs(2),
1550 transport_identity: None,
1551 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1552 reconnect_wait: Duration::from_millis(100),
1553 max_reconnect_tries: Some(2),
1554 connect_timeout: Duration::from_secs(2),
1555 })),
1556 }
1557 }
1558
/// A backbone client started against a local listener connects and reports
/// `InterfaceUp` with its configured interface id.
#[test]
fn backbone_client_connect() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    let (tx, rx) = crate::event::channel();

    // The writer is kept bound for the whole test.
    let _writer = start_client(make_client_config(port, 9000), tx).unwrap();

    let _server_stream = listener.accept().unwrap();

    let first_event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert!(matches!(
        first_event,
        Event::InterfaceUp(InterfaceId(9000), _, _)
    ));
}
1573
/// An HDLC frame written by the remote side is delivered by the client as
/// `Event::Frame` tagged with the client's interface id.
#[test]
fn backbone_client_receive_frame() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    let (tx, rx) = crate::event::channel();

    let _writer = start_client(make_client_config(port, 9100), tx).unwrap();

    let (mut remote, _) = listener.accept().unwrap();

    // Discard the first event emitted after connecting.
    drop(rx.recv_timeout(Duration::from_secs(1)).unwrap());

    let payload: Vec<u8> = (0..32).collect();
    let wire = hdlc::frame(&payload);
    remote.write_all(&wire).unwrap();

    match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, InterfaceId(9100));
            assert_eq!(data, payload);
        }
        other => panic!("expected Frame, got {:?}", other),
    }
}
1601
/// A frame sent through the client's writer reaches the remote side as the
/// HDLC-framed payload.
#[test]
fn backbone_client_send_frame() {
    let port = find_free_port();
    let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
    let (tx, _rx) = crate::event::channel();

    let config = make_client_config(port, 9200);
    let mut writer = start_client(config, tx).unwrap();

    let (mut server_stream, _) = listener.accept().unwrap();
    server_stream
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let payload: Vec<u8> = (0..24).collect();
    writer.send_frame(&payload).unwrap();

    // A single `read` may legally return only part of the frame, so read
    // exactly as many bytes as the framed payload occupies before comparing.
    let expected = hdlc::frame(&payload);
    let mut received = vec![0u8; expected.len()];
    server_stream.read_exact(&mut received).unwrap();
    assert_eq!(&received[..], &expected[..]);
}
1624
/// With `max_connections = 2`, a third connection produces no further
/// non-peer event (in particular no `InterfaceUp`).
#[test]
fn backbone_max_connections_rejects_excess() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let limited_cfg =
        make_server_config(port, 88, Some(2), None, None, BackboneAbuseConfig::default());

    start(limited_cfg, tx, Arc::new(AtomicU64::new(8800))).unwrap();
    thread::sleep(Duration::from_millis(50));

    // Fill both allowed slots; the streams stay open for the whole test.
    let _admitted: Vec<TcpStream> = (0..2)
        .map(|_| TcpStream::connect(("127.0.0.1", port)).unwrap())
        .collect();

    for _ in 0..2 {
        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
    }

    let surplus = TcpStream::connect(("127.0.0.1", port)).unwrap();
    surplus
        .set_read_timeout(Some(Duration::from_millis(500)))
        .unwrap();

    thread::sleep(Duration::from_millis(100));

    let outcome = recv_non_peer_event(&rx, Duration::from_millis(500));
    assert!(
        outcome.is_err(),
        "expected no InterfaceUp for rejected connection, got {:?}",
        outcome
    );
}
1670
/// With `max_connections = 1`, a new connection is accepted again once the
/// previous one has disconnected.
#[test]
fn backbone_max_connections_allows_after_disconnect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let limited_cfg =
        make_server_config(port, 89, Some(1), None, None, BackboneAbuseConfig::default());

    start(limited_cfg, tx, Arc::new(AtomicU64::new(8900))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let first = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let up = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(up, Event::InterfaceUp(_, _, _)));

    // Close the only connection so its slot can be reused.
    drop(first);

    let down = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(down, Event::InterfaceDown(_)));

    let _second = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    let accepted_again = matches!(event, Event::InterfaceUp(_, _, _));
    assert!(
        accepted_again,
        "expected InterfaceUp after slot freed, got {:?}",
        event
    );
}
1710
/// After the remote side closes the connection the client reports
/// `InterfaceDown`, connects to the listener again, and reports `InterfaceUp`
/// with the same interface id.
#[test]
fn backbone_client_reconnect() {
    let port = find_free_port();
    let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
    listener.set_nonblocking(false).unwrap();
    let (tx, rx) = crate::event::channel();

    let _writer = start_client(make_client_config(port, 9300), tx).unwrap();

    let (first_conn, _) = listener.accept().unwrap();

    // Discard the first event emitted after the initial connect.
    drop(rx.recv_timeout(Duration::from_secs(1)).unwrap());

    // Close the server side of the connection.
    drop(first_conn);

    let down = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(down, Event::InterfaceDown(InterfaceId(9300))));

    let _second_conn = listener.accept().unwrap();

    let up = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(up, Event::InterfaceUp(InterfaceId(9300), _, _)));
}
1740
/// With a 150 ms idle timeout, a client that never sends anything is reported
/// as `InterfaceDown` for the id it was brought up with.
#[test]
fn backbone_idle_timeout_disconnects_silent_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let idle_limit = Some(Duration::from_millis(150));
    let server_cfg =
        make_server_config(port, 94, None, idle_limit, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(9400))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    let next = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(next, Event::InterfaceDown(id) if id == client_id));
}
1770
/// With a 200 ms idle timeout, a client that has sent a frame is not reported
/// as down during the following 500 ms.
#[test]
fn backbone_idle_timeout_ignores_client_after_data() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let idle_limit = Some(Duration::from_millis(200));
    let server_cfg =
        make_server_config(port, 95, None, idle_limit, None, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(9500))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let mut peer = TcpStream::connect(("127.0.0.1", port)).unwrap();

    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    let payload = [1u8; 24];
    peer.write_all(&hdlc::frame(&payload)).unwrap();

    match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::Frame { interface_id, data } => {
            assert_eq!(interface_id, client_id);
            assert_eq!(data, payload.to_vec());
        }
        other => panic!("expected Frame, got {:?}", other),
    }

    // No further non-peer event may arrive within the observation window.
    let outcome = recv_non_peer_event(&rx, Duration::from_millis(500));
    assert!(
        outcome.is_err(),
        "expected no InterfaceDown after client sent data, got {:?}",
        outcome
    );
}
1815
/// Setting `idle_timeout` on the shared runtime settings after the server has
/// started (it starts with none) causes an already-connected silent client to
/// be reported as `InterfaceDown`.
#[test]
fn backbone_runtime_idle_timeout_updates_live() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();

    let server_cfg = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
    // Keep a reference to the shared settings so they can be changed later.
    let live_settings = Arc::clone(&server_cfg.runtime);

    start(server_cfg, tx, Arc::new(AtomicU64::new(9650))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let _client = TcpStream::connect(("127.0.0.1", port)).unwrap();
    let client_id = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, _, _) => id,
        other => panic!("expected InterfaceUp, got {:?}", other),
    };

    // The lock guard is a temporary and is released at the end of this statement.
    live_settings.lock().unwrap().idle_timeout = Some(Duration::from_millis(150));

    let next = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
    assert!(matches!(next, Event::InterfaceDown(id) if id == client_id));
}
1843
/// With a 50 ms write-stall timeout, repeatedly sending large frames to a
/// client that never reads eventually makes the writer fail with `TimedOut`,
/// after which the server reports `InterfaceDown` for that client.
#[test]
fn backbone_write_stall_timeout_disconnects_unwritable_client() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let stall_limit = Some(Duration::from_millis(50));
    let server_cfg =
        make_server_config(port, 98, None, None, stall_limit, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(9660))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let peer = TcpStream::connect(("127.0.0.1", port)).unwrap();
    peer.set_read_timeout(Some(Duration::from_millis(100)))
        .unwrap();
    // Best effort: ask for a small receive buffer; a failure here is ignored.
    SockRef::from(&peer).set_recv_buffer_size(4096).ok();

    let (client_id, mut writer) = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(id, Some(writer), _) => (id, writer),
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload = vec![0x55; 512 * 1024];
    let deadline = Instant::now() + Duration::from_secs(3);
    let mut stalled = false;
    while Instant::now() < deadline {
        let err = match writer.send_frame(&payload) {
            Ok(()) => continue,
            Err(err) => err,
        };
        match err.kind() {
            // Back off briefly, then try again.
            io::ErrorKind::WouldBlock => thread::sleep(Duration::from_millis(10)),
            io::ErrorKind::TimedOut => {
                stalled = true;
                break;
            }
            _ => panic!("unexpected send error: {}", err),
        }
    }

    assert!(stalled, "expected writer to time out on persistent stall");
    let next = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(matches!(next, Event::InterfaceDown(id) if id == client_id));
}
1896
1897 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1899 where
1900 F: FnMut(&Event) -> bool,
1901 {
1902 let deadline = Instant::now() + timeout;
1903 loop {
1904 let remaining = deadline.saturating_duration_since(Instant::now());
1905 if remaining.is_zero() {
1906 return None;
1907 }
1908 match rx.recv_timeout(remaining) {
1909 Ok(event) if pred(&event) => return Some(event),
1910 Ok(_) => continue,
1911 Err(_) => return None,
1912 }
1913 }
1914 }
1915
/// With a 50 ms write-stall timeout, flooding a client that never reads leads
/// to a `BackbonePeerWriteStall` event on the channel.
#[test]
fn backbone_write_stall_emits_peer_events() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let stall_limit = Some(Duration::from_millis(50));
    let server_cfg =
        make_server_config(port, 97, None, None, stall_limit, BackboneAbuseConfig::default());

    start(server_cfg, tx, Arc::new(AtomicU64::new(9700))).unwrap();
    thread::sleep(Duration::from_millis(50));

    let peer = TcpStream::connect(("127.0.0.1", port)).unwrap();
    peer.set_read_timeout(Some(Duration::from_millis(100)))
        .unwrap();
    // Best effort: ask for a small receive buffer; a failure here is ignored.
    SockRef::from(&peer).set_recv_buffer_size(4096).ok();

    let mut writer = match recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap() {
        Event::InterfaceUp(_, Some(w), _) => w,
        other => panic!("expected InterfaceUp with writer, got {:?}", other),
    };

    let payload = vec![0x55; 512 * 1024];
    let pause = Duration::from_millis(10);
    let deadline = Instant::now() + Duration::from_secs(3);
    while Instant::now() < deadline {
        // Success and failure are treated alike: keep sending until the deadline.
        let _ = writer.send_frame(&payload);
        if Instant::now() + pause > deadline {
            break;
        }
        thread::sleep(pause);
    }

    let stall_event = wait_for(&rx, Duration::from_secs(3), |event| {
        matches!(event, Event::BackbonePeerWriteStall { .. })
    });
    assert!(
        stall_event.is_some(),
        "expected BackbonePeerWriteStall event"
    );
}
1972
/// A peer address that has been blacklisted in the shared peer state does not
/// get an `InterfaceUp` when it connects again.
#[test]
fn backbone_blacklisted_peer_rejected_on_connect() {
    let port = find_free_port();
    let (tx, rx) = crate::event::channel();
    let next_id = Arc::new(AtomicU64::new(9800));

    let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
    let peer_state = config.peer_state.clone();

    // `tx` itself stays alive in this function, so the channel is never
    // disconnected while the test inspects it.
    start(config, tx.clone(), next_id).unwrap();
    thread::sleep(Duration::from_millis(50));

    // Before blacklisting, a connection from loopback is accepted normally.
    let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
    let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
    assert!(
        matches!(event, Event::InterfaceUp(_, _, _)),
        "first connection should succeed"
    );
    drop(client1);

    // Let the disconnect settle, then throw away everything queued so far.
    thread::sleep(Duration::from_millis(100));
    while rx.try_recv().is_ok() {}

    peer_state.lock().unwrap().blacklist(
        "127.0.0.1".parse().unwrap(),
        Duration::from_secs(60),
        "test blacklist".into(),
    );

    let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
    thread::sleep(Duration::from_millis(200));

    // Inspect every queued event, not just the first: an `InterfaceUp` queued
    // behind a peer notification must fail the test as well.
    while let Ok(event) = rx.try_recv() {
        if let Event::InterfaceUp(_, _, _) = event {
            panic!("blacklisted peer should not get InterfaceUp")
        }
    }
}
2019
/// `parse_config` turns the textual `listen_*`, `idle_timeout`,
/// `write_stall_timeout` and `max_penalty_duration` parameters into a server
/// configuration; the three durations are expected in whole seconds.
#[test]
fn backbone_parse_config_reads_abuse_settings() {
    let factory = BackboneInterfaceFactory;
    let mut params = HashMap::new();
    params.insert("listen_ip".into(), "127.0.0.1".into());
    params.insert("listen_port".into(), "4242".into());
    params.insert("idle_timeout".into(), "15".into());
    params.insert("write_stall_timeout".into(), "45".into());
    params.insert("max_penalty_duration".into(), "3600".into());

    let config = factory
        .parse_config("test-backbone", InterfaceId(97), &params)
        .unwrap();
    let mode = *config.into_any().downcast::<BackboneMode>().unwrap();

    match mode {
        BackboneMode::Server(config) => {
            assert_eq!(config.listen_ip, "127.0.0.1");
            assert_eq!(config.listen_port, 4242);
            assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
            assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
            assert_eq!(
                config.abuse.max_penalty_duration,
                Some(Duration::from_secs(3600))
            );
        }
        BackboneMode::Client(_) => panic!("expected server config"),
    }
}
2049}