1use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
32#[allow(dead_code)]
34const HW_MTU: usize = 1_048_576;
35
36#[derive(Debug, Clone)]
38pub struct BackboneConfig {
39 pub name: String,
40 pub listen_ip: String,
41 pub listen_port: u16,
42 pub interface_id: InterfaceId,
43 pub max_connections: Option<usize>,
44 pub idle_timeout: Option<Duration>,
45 pub write_stall_timeout: Option<Duration>,
46 pub abuse: BackboneAbuseConfig,
47 pub ingress_control: IngressControlConfig,
48 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
49 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
50}
51
52#[derive(Debug, Clone, Default)]
54pub struct BackboneAbuseConfig {
55 pub max_penalty_duration: Option<Duration>,
56}
57
58#[derive(Debug, Clone)]
60pub struct BackboneServerRuntime {
61 pub max_connections: Option<usize>,
62 pub idle_timeout: Option<Duration>,
63 pub write_stall_timeout: Option<Duration>,
64 pub abuse: BackboneAbuseConfig,
65}
66
67impl BackboneServerRuntime {
68 pub fn from_config(config: &BackboneConfig) -> Self {
69 Self {
70 max_connections: config.max_connections,
71 idle_timeout: config.idle_timeout,
72 write_stall_timeout: config.write_stall_timeout,
73 abuse: config.abuse.clone(),
74 }
75 }
76}
77
78#[derive(Debug, Clone)]
79pub struct BackboneRuntimeConfigHandle {
80 pub interface_name: String,
81 pub runtime: Arc<Mutex<BackboneServerRuntime>>,
82 pub startup: BackboneServerRuntime,
83}
84
85#[derive(Debug, Clone)]
86pub struct BackbonePeerStateHandle {
87 pub interface_id: InterfaceId,
88 pub interface_name: String,
89 pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
90}
91
92impl Default for BackboneConfig {
93 fn default() -> Self {
94 let mut config = BackboneConfig {
95 name: String::new(),
96 listen_ip: "0.0.0.0".into(),
97 listen_port: 0,
98 interface_id: InterfaceId(0),
99 max_connections: None,
100 idle_timeout: None,
101 write_stall_timeout: None,
102 abuse: BackboneAbuseConfig::default(),
103 ingress_control: IngressControlConfig::enabled(),
104 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
105 max_connections: None,
106 idle_timeout: None,
107 write_stall_timeout: None,
108 abuse: BackboneAbuseConfig::default(),
109 })),
110 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
111 };
112 let startup = BackboneServerRuntime::from_config(&config);
113 config.runtime = Arc::new(Mutex::new(startup));
114 config
115 }
116}
117
118const MAX_PENDING_BYTES: usize = 512 * 1024;
121
122struct BackboneWriter {
124 stream: TcpStream,
125 runtime: Arc<Mutex<BackboneServerRuntime>>,
126 interface_name: String,
127 interface_id: InterfaceId,
128 event_tx: EventSender,
129 pending: Vec<u8>,
130 stall_started: Option<Instant>,
131 disconnect_notified: bool,
132 write_stall_flag: Arc<AtomicBool>,
133}
134
135impl Writer for BackboneWriter {
136 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
137 let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
138 if !self.pending.is_empty() {
139 self.flush_pending(write_stall_timeout)?;
140 if !self.pending.is_empty() {
141 return Err(io::Error::new(
142 io::ErrorKind::WouldBlock,
143 "backbone writer still stalled",
144 ));
145 }
146 }
147
148 let frame = hdlc::frame(data);
149 self.write_buffer(&frame, write_stall_timeout)
150 }
151}
152
153impl BackboneWriter {
154 fn write_buffer(
155 &mut self,
156 data: &[u8],
157 write_stall_timeout: Option<Duration>,
158 ) -> io::Result<()> {
159 let mut written = 0usize;
160 while written < data.len() {
161 match self.stream.write(&data[written..]) {
162 Ok(0) => {
163 return Err(io::Error::new(
164 io::ErrorKind::WriteZero,
165 "backbone writer wrote zero bytes",
166 ))
167 }
168 Ok(n) => {
169 written += n;
170 self.stall_started = None;
171 }
172 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
173 let now = Instant::now();
174 let started = self.stall_started.get_or_insert(now);
175 if let Some(timeout) = write_stall_timeout {
176 if now.duration_since(*started) >= timeout {
177 return Err(self.disconnect_for_write_stall(timeout));
178 }
179 }
180 if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
181 return Err(self.disconnect_for_write_stall(
182 write_stall_timeout.unwrap_or(Duration::from_secs(30)),
183 ));
184 }
185 self.pending.extend_from_slice(&data[written..]);
186 return Err(io::Error::new(
187 io::ErrorKind::WouldBlock,
188 "backbone writer would block",
189 ));
190 }
191 Err(e) => return Err(e),
192 }
193 }
194 Ok(())
195 }
196
197 fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
198 if self.pending.is_empty() {
199 return Ok(());
200 }
201
202 let pending = std::mem::take(&mut self.pending);
203 match self.write_buffer(&pending, write_stall_timeout) {
204 Ok(()) => Ok(()),
205 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
206 Err(e) => Err(e),
207 }
208 }
209
210 fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
211 if !self.disconnect_notified {
212 log::warn!(
213 "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
214 self.interface_name,
215 self.interface_id.0,
216 timeout
217 );
218 self.write_stall_flag.store(true, Ordering::Relaxed);
219 let _ = self.stream.shutdown(Shutdown::Both);
220 let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
221 self.disconnect_notified = true;
222 }
223 io::Error::new(
224 io::ErrorKind::TimedOut,
225 format!("backbone writer stalled for {:?}", timeout),
226 )
227 }
228}
229
230pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
232 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
233 let listener = TcpListener::bind(&addr)?;
234 listener.set_nonblocking(true)?;
235
236 log::info!(
237 "[{}] backbone server listening on {}",
238 config.name,
239 listener.local_addr().unwrap_or(addr.parse().unwrap())
240 );
241
242 let name = config.name.clone();
243 let server_interface_id = config.interface_id;
244 let runtime = Arc::clone(&config.runtime);
245 let peer_state = Arc::clone(&config.peer_state);
246 let ingress_control = config.ingress_control;
247 thread::Builder::new()
248 .name(format!("backbone-poll-{}", config.interface_id.0))
249 .spawn(move || {
250 if let Err(e) = poll_loop(
251 listener,
252 name,
253 server_interface_id,
254 tx,
255 next_id,
256 runtime,
257 peer_state,
258 ingress_control,
259 ) {
260 log::error!("backbone poll loop error: {}", e);
261 }
262 })?;
263
264 Ok(())
265}
266
267struct ClientState {
269 id: InterfaceId,
270 peer_ip: IpAddr,
271 peer_port: u16,
272 stream: TcpStream,
273 decoder: hdlc::Decoder,
274 connected_at: Instant,
275 has_received_data: bool,
276 write_stall_flag: Arc<AtomicBool>,
277}
278
279#[derive(Debug, Clone)]
280struct PeerBehaviorState {
281 blacklisted_until: Option<Instant>,
282 blacklist_reason: Option<String>,
283 reject_count: u64,
284 connected_count: usize,
285}
286
287impl PeerBehaviorState {
288 fn new() -> Self {
289 Self {
290 blacklisted_until: None,
291 blacklist_reason: None,
292 reject_count: 0,
293 connected_count: 0,
294 }
295 }
296}
297
298#[derive(Debug, Clone, Default)]
299pub struct BackbonePeerMonitor {
300 peers: HashMap<IpAddr, PeerBehaviorState>,
301}
302
303impl BackbonePeerMonitor {
304 pub fn new() -> Self {
305 Self {
306 peers: HashMap::new(),
307 }
308 }
309
310 fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
311 let mut merged = self.peers.clone();
312
313 for (peer_ip, state) in peers {
314 let entry = merged
315 .entry(*peer_ip)
316 .or_insert_with(PeerBehaviorState::new);
317 entry.connected_count = state.connected_count;
318 entry.reject_count = state.reject_count;
319 if state.blacklisted_until.is_some() {
320 entry.blacklisted_until = state.blacklisted_until;
321 entry.blacklist_reason = state.blacklist_reason.clone();
322 }
323 }
324
325 merged.retain(|peer_ip, state| {
326 peers.contains_key(peer_ip)
327 || state.blacklisted_until.is_some()
328 || state.reject_count > 0
329 });
330 self.peers = merged;
331 }
332
333 fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
334 for (peer_ip, state) in &self.peers {
335 let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
336 entry.blacklisted_until = state.blacklisted_until;
337 entry.blacklist_reason = state.blacklist_reason.clone();
338 entry.reject_count = state.reject_count;
339 }
340
341 peers.retain(|peer_ip, state| {
342 if state.connected_count > 0 {
343 return true;
344 }
345 self.peers.contains_key(peer_ip)
346 });
347 }
348
349 pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
350 let now = Instant::now();
351 let mut entries: Vec<BackbonePeerStateEntry> = self
352 .peers
353 .iter()
354 .map(|(peer_ip, state)| BackbonePeerStateEntry {
355 interface_name: interface_name.to_string(),
356 peer_ip: *peer_ip,
357 connected_count: state.connected_count,
358 blacklisted_remaining_secs: state
359 .blacklisted_until
360 .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
361 blacklist_reason: state.blacklist_reason.clone(),
362 reject_count: state.reject_count,
363 })
364 .collect();
365 entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
366 entries
367 }
368
369 pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
370 self.peers.remove(&peer_ip).is_some()
371 }
372
373 pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
374 let state = self
375 .peers
376 .entry(peer_ip)
377 .or_insert_with(PeerBehaviorState::new);
378 state.blacklisted_until = Some(Instant::now() + duration);
379 state.blacklist_reason = Some(reason);
380 true
381 }
382
383 #[cfg(test)]
384 pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
385 let mut state = PeerBehaviorState::new();
386 state.connected_count = entry.connected_count;
387 state.reject_count = entry.reject_count;
388 state.blacklist_reason = entry.blacklist_reason;
389 if let Some(remaining) = entry.blacklisted_remaining_secs {
390 state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
391 }
392 self.peers.insert(entry.peer_ip, state);
393 }
394}
395
396#[derive(Clone, Copy)]
397enum DisconnectReason {
398 RemoteClosed,
399 IdleTimeout,
400 WriteStall,
401}
402
403fn poll_loop(
405 listener: TcpListener,
406 name: String,
407 server_interface_id: InterfaceId,
408 tx: EventSender,
409 next_id: Arc<AtomicU64>,
410 runtime: Arc<Mutex<BackboneServerRuntime>>,
411 peer_state: Arc<Mutex<BackbonePeerMonitor>>,
412 ingress_control: IngressControlConfig,
413) -> io::Result<()> {
414 let poller = Poller::new()?;
415
416 const LISTENER_KEY: usize = 0;
417
418 unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
420
421 let mut clients: HashMap<usize, ClientState> = HashMap::new();
422 let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
423 let mut events = Events::new();
424 let mut next_key: usize = 1;
425
426 loop {
427 let runtime_snapshot = runtime.lock().unwrap().clone();
428 let max_connections = runtime_snapshot.max_connections;
429 let idle_timeout = runtime_snapshot.idle_timeout;
430 cleanup_peer_state(&mut peers);
431 {
432 let mut monitor = peer_state.lock().unwrap();
433 monitor.sync_into(&mut peers);
434 monitor.upsert_snapshot(&peers);
435 }
436
437 events.clear();
438 poller.wait(&mut events, Some(Duration::from_secs(1)))?;
439
440 for ev in events.iter() {
441 if ev.key == LISTENER_KEY {
442 loop {
444 match listener.accept() {
445 Ok((stream, peer_addr)) => {
446 let peer_ip = peer_addr.ip();
447 let peer_port = peer_addr.port();
448
449 if is_ip_blacklisted(&mut peers, peer_ip) {
450 if let Some(state) = peers.get_mut(&peer_ip) {
451 state.reject_count = state.reject_count.saturating_add(1);
452 }
453 peer_state.lock().unwrap().upsert_snapshot(&peers);
454 log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
455 drop(stream);
456 continue;
457 }
458
459 if let Some(max) = max_connections {
460 if clients.len() >= max {
461 log::warn!(
462 "[{}] max connections ({}) reached, rejecting {}",
463 name,
464 max,
465 peer_addr
466 );
467 drop(stream);
468 continue;
469 }
470 }
471
472 stream.set_nonblocking(true).ok();
473 stream.set_nodelay(true).ok();
474 set_tcp_keepalive(&stream).ok();
475
476 #[cfg(target_os = "macos")]
478 {
479 let sock = SockRef::from(&stream);
480 sock.set_nosigpipe(true).ok();
481 }
482
483 let key = next_key;
484 next_key += 1;
485 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
486
487 log::info!(
488 "[{}] backbone client connected: {} → id {}",
489 name,
490 peer_addr,
491 client_id.0
492 );
493
494 if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
497 {
498 log::warn!("[{}] failed to add client to poller: {}", name, e);
499 continue; }
501
502 let writer_stream = match stream.try_clone() {
504 Ok(s) => s,
505 Err(e) => {
506 log::warn!("[{}] failed to clone client stream: {}", name, e);
507 let _ = poller.delete(&stream);
508 continue; }
510 };
511 let write_stall_flag = Arc::new(AtomicBool::new(false));
512 let writer: Box<dyn Writer> = Box::new(BackboneWriter {
513 stream: writer_stream,
514 runtime: Arc::clone(&runtime),
515 interface_name: name.clone(),
516 interface_id: client_id,
517 event_tx: tx.clone(),
518 pending: Vec::new(),
519 stall_started: None,
520 disconnect_notified: false,
521 write_stall_flag: Arc::clone(&write_stall_flag),
522 });
523
524 clients.insert(
525 key,
526 ClientState {
527 id: client_id,
528 peer_ip,
529 peer_port,
530 stream,
531 decoder: hdlc::Decoder::new(),
532 connected_at: Instant::now(),
533 has_received_data: false,
534 write_stall_flag,
535 },
536 );
537 peers
538 .entry(peer_ip)
539 .or_insert_with(PeerBehaviorState::new)
540 .connected_count += 1;
541 peer_state.lock().unwrap().upsert_snapshot(&peers);
542 let _ = tx.send(Event::BackbonePeerConnected {
543 server_interface_id,
544 peer_interface_id: client_id,
545 peer_ip,
546 peer_port,
547 });
548
549 let info = InterfaceInfo {
550 id: client_id,
551 name: format!("BackboneInterface/{}", client_id.0),
552 mode: constants::MODE_FULL,
553 out_capable: true,
554 in_capable: true,
555 bitrate: Some(1_000_000_000), announce_rate_target: None,
557 announce_rate_grace: 0,
558 announce_rate_penalty: 0.0,
559 announce_cap: constants::ANNOUNCE_CAP,
560 is_local_client: false,
561 wants_tunnel: false,
562 tunnel_id: None,
563 mtu: 65535,
564 ia_freq: 0.0,
565 started: 0.0,
566 ingress_control,
567 };
568
569 if tx
570 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
571 .is_err()
572 {
573 cleanup(&poller, &clients, &listener);
575 return Ok(());
576 }
577 }
578 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
579 Err(e) => {
580 log::warn!("[{}] accept error: {}", name, e);
581 break;
582 }
583 }
584 }
585 poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
587 } else if clients.contains_key(&ev.key) {
588 let key = ev.key;
589 let mut should_remove = false;
590 let mut client_id = InterfaceId(0);
591
592 let mut buf = [0u8; 4096];
593 let read_result = {
594 let client = clients.get_mut(&key).unwrap();
595 client.stream.read(&mut buf)
596 };
597
598 match read_result {
599 Ok(0) | Err(_) => {
600 if let Some(c) = clients.get(&key) {
601 client_id = c.id;
602 }
603 should_remove = true;
604 }
605 Ok(n) => {
606 let client = clients.get_mut(&key).unwrap();
607 client_id = client.id;
608 client.has_received_data = true;
609 for frame in client.decoder.feed(&buf[..n]) {
610 if tx
611 .send(Event::Frame {
612 interface_id: client_id,
613 data: frame,
614 })
615 .is_err()
616 {
617 cleanup(&poller, &clients, &listener);
618 return Ok(());
619 }
620 }
621 }
622 }
623
624 if should_remove {
625 let reason = if clients
626 .get(&key)
627 .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
628 {
629 DisconnectReason::WriteStall
630 } else {
631 DisconnectReason::RemoteClosed
632 };
633 disconnect_client(
634 &poller,
635 &mut clients,
636 &mut peers,
637 &name,
638 server_interface_id,
639 &tx,
640 &peer_state,
641 key,
642 client_id,
643 reason,
644 );
645 } else if let Some(client) = clients.get(&key) {
646 poller.modify(&client.stream, PollEvent::readable(key))?;
648 }
649 }
650 }
651
652 if let Some(timeout) = idle_timeout {
653 let now = Instant::now();
654 let timed_out: Vec<(usize, InterfaceId)> = clients
655 .iter()
656 .filter_map(|(&key, client)| {
657 if client.has_received_data || now.duration_since(client.connected_at) < timeout
658 {
659 None
660 } else {
661 Some((key, client.id))
662 }
663 })
664 .collect();
665
666 for (key, client_id) in timed_out {
667 disconnect_client(
668 &poller,
669 &mut clients,
670 &mut peers,
671 &name,
672 server_interface_id,
673 &tx,
674 &peer_state,
675 key,
676 client_id,
677 DisconnectReason::IdleTimeout,
678 );
679 }
680 }
681 }
682}
683
684fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
685 let now = Instant::now();
686 peers.retain(|_, state| {
687 if matches!(state.blacklisted_until, Some(until) if now >= until) {
688 state.blacklisted_until = None;
689 state.blacklist_reason = None;
690 }
691 state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
692 });
693}
694
695fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
696 let now = Instant::now();
697 if let Some(state) = peers.get_mut(&peer_ip) {
698 if let Some(until) = state.blacklisted_until {
699 if now < until {
700 return true;
701 }
702 state.blacklisted_until = None;
703 }
704 }
705 false
706}
707
708fn disconnect_client(
709 poller: &Poller,
710 clients: &mut HashMap<usize, ClientState>,
711 peers: &mut HashMap<IpAddr, PeerBehaviorState>,
712 name: &str,
713 server_interface_id: InterfaceId,
714 tx: &EventSender,
715 peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
716 key: usize,
717 client_id: InterfaceId,
718 reason: DisconnectReason,
719) {
720 let Some(client) = clients.remove(&key) else {
721 return;
722 };
723
724 match reason {
725 DisconnectReason::RemoteClosed => {
726 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
727 }
728 DisconnectReason::IdleTimeout => {
729 log::info!(
730 "[{}] backbone client {} disconnected due to idle timeout",
731 name,
732 client_id.0
733 );
734 }
735 DisconnectReason::WriteStall => {
736 }
738 }
739
740 let _ = poller.delete(&client.stream);
741 let connected_for = client.connected_at.elapsed();
743 let _ = tx.send(Event::BackbonePeerDisconnected {
744 server_interface_id,
745 peer_interface_id: client.id,
746 peer_ip: client.peer_ip,
747 peer_port: client.peer_port,
748 connected_for,
749 had_received_data: client.has_received_data,
750 });
751 match reason {
752 DisconnectReason::IdleTimeout => {
753 let _ = tx.send(Event::BackbonePeerIdleTimeout {
754 server_interface_id,
755 peer_interface_id: client.id,
756 peer_ip: client.peer_ip,
757 peer_port: client.peer_port,
758 connected_for,
759 });
760 }
761 DisconnectReason::WriteStall => {
762 let _ = tx.send(Event::BackbonePeerWriteStall {
763 server_interface_id,
764 peer_interface_id: client.id,
765 peer_ip: client.peer_ip,
766 peer_port: client.peer_port,
767 connected_for,
768 });
769 }
770 DisconnectReason::RemoteClosed => {}
771 }
772
773 if let Some(state) = peers.get_mut(&client.peer_ip) {
774 state.connected_count = state.connected_count.saturating_sub(1);
775 }
776 peer_state.lock().unwrap().upsert_snapshot(peers);
777 if !matches!(reason, DisconnectReason::WriteStall) {
779 let _ = tx.send(Event::InterfaceDown(client_id));
780 }
781}
782
783fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
784 let sock = SockRef::from(stream);
785 let mut keepalive = TcpKeepalive::new()
786 .with_time(Duration::from_secs(5))
787 .with_interval(Duration::from_secs(2));
788 #[cfg(any(target_os = "linux", target_os = "macos"))]
789 {
790 keepalive = keepalive.with_retries(12);
791 }
792 sock.set_tcp_keepalive(&keepalive)
793}
794
795fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
796 for (_, client) in clients {
797 let _ = poller.delete(&client.stream);
798 }
799 let _ = poller.delete(listener);
800}
801
802#[derive(Debug, Clone)]
808pub struct BackboneClientConfig {
809 pub name: String,
810 pub target_host: String,
811 pub target_port: u16,
812 pub interface_id: InterfaceId,
813 pub reconnect_wait: Duration,
814 pub max_reconnect_tries: Option<u32>,
815 pub connect_timeout: Duration,
816 pub transport_identity: Option<String>,
817 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
818}
819
820#[derive(Debug, Clone)]
821pub struct BackboneClientRuntime {
822 pub reconnect_wait: Duration,
823 pub max_reconnect_tries: Option<u32>,
824 pub connect_timeout: Duration,
825}
826
827impl BackboneClientRuntime {
828 pub fn from_config(config: &BackboneClientConfig) -> Self {
829 Self {
830 reconnect_wait: config.reconnect_wait,
831 max_reconnect_tries: config.max_reconnect_tries,
832 connect_timeout: config.connect_timeout,
833 }
834 }
835}
836
837#[derive(Debug, Clone)]
838pub struct BackboneClientRuntimeConfigHandle {
839 pub interface_name: String,
840 pub runtime: Arc<Mutex<BackboneClientRuntime>>,
841 pub startup: BackboneClientRuntime,
842}
843
844impl Default for BackboneClientConfig {
845 fn default() -> Self {
846 let mut config = BackboneClientConfig {
847 name: String::new(),
848 target_host: "127.0.0.1".into(),
849 target_port: 4242,
850 interface_id: InterfaceId(0),
851 reconnect_wait: Duration::from_secs(5),
852 max_reconnect_tries: None,
853 connect_timeout: Duration::from_secs(5),
854 transport_identity: None,
855 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
856 reconnect_wait: Duration::from_secs(5),
857 max_reconnect_tries: None,
858 connect_timeout: Duration::from_secs(5),
859 })),
860 };
861 let startup = BackboneClientRuntime::from_config(&config);
862 config.runtime = Arc::new(Mutex::new(startup));
863 config
864 }
865}
866
867struct BackboneClientWriter {
869 stream: TcpStream,
870}
871
872impl Writer for BackboneClientWriter {
873 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
874 self.stream.write_all(&hdlc::frame(data))
875 }
876}
877
878fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
880 let runtime = config.runtime.lock().unwrap().clone();
881 let addr_str = format!("{}:{}", config.target_host, config.target_port);
882 let addr = addr_str
883 .to_socket_addrs()?
884 .next()
885 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
886
887 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
888 stream.set_nodelay(true)?;
889 set_tcp_keepalive(&stream).ok();
890
891 #[cfg(target_os = "macos")]
893 {
894 let sock = SockRef::from(&stream);
895 sock.set_nosigpipe(true).ok();
896 }
897
898 Ok(stream)
899}
900
901pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
903 let stream = try_connect_client(&config)?;
904 let reader_stream = stream.try_clone()?;
905 let writer_stream = stream.try_clone()?;
906
907 let id = config.interface_id;
908 log::info!(
909 "[{}] backbone client connected to {}:{}",
910 config.name,
911 config.target_host,
912 config.target_port
913 );
914
915 let _ = tx.send(Event::InterfaceUp(id, None, None));
917
918 thread::Builder::new()
919 .name(format!("backbone-client-{}", id.0))
920 .spawn(move || {
921 client_reader_loop(reader_stream, config, tx);
922 })?;
923
924 Ok(Box::new(BackboneClientWriter {
925 stream: writer_stream,
926 }))
927}
928
929fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
932 let id = config.interface_id;
933 let mut decoder = hdlc::Decoder::new();
934 let mut buf = [0u8; 4096];
935
936 loop {
937 match stream.read(&mut buf) {
938 Ok(0) => {
939 log::warn!("[{}] connection closed", config.name);
940 let _ = tx.send(Event::InterfaceDown(id));
941 match client_reconnect(&config, &tx) {
942 Some(new_stream) => {
943 stream = new_stream;
944 decoder = hdlc::Decoder::new();
945 continue;
946 }
947 None => {
948 log::error!("[{}] reconnection failed, giving up", config.name);
949 return;
950 }
951 }
952 }
953 Ok(n) => {
954 for frame in decoder.feed(&buf[..n]) {
955 if tx
956 .send(Event::Frame {
957 interface_id: id,
958 data: frame,
959 })
960 .is_err()
961 {
962 return;
963 }
964 }
965 }
966 Err(e) => {
967 log::warn!("[{}] read error: {}", config.name, e);
968 let _ = tx.send(Event::InterfaceDown(id));
969 match client_reconnect(&config, &tx) {
970 Some(new_stream) => {
971 stream = new_stream;
972 decoder = hdlc::Decoder::new();
973 continue;
974 }
975 None => {
976 log::error!("[{}] reconnection failed, giving up", config.name);
977 return;
978 }
979 }
980 }
981 }
982 }
983}
984
985const MAX_BACKOFF_SHIFT: u32 = 6;
988
989fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
993 let mut attempts = 0u32;
994 loop {
995 let runtime = config.runtime.lock().unwrap().clone();
996
997 let shift = attempts.min(MAX_BACKOFF_SHIFT);
998 let backoff = runtime.reconnect_wait * 2u32.pow(shift);
999 let jitter_range = backoff / 4;
1001 let jitter = if jitter_range.as_nanos() > 0 {
1002 let offset = Duration::from_nanos(
1003 (std::hash::RandomState::new().build_hasher().finish()
1004 % jitter_range.as_nanos() as u64)
1005 * 2,
1006 );
1007 if offset > jitter_range {
1008 backoff + (offset - jitter_range)
1009 } else {
1010 backoff - (jitter_range - offset)
1011 }
1012 } else {
1013 backoff
1014 };
1015 thread::sleep(jitter);
1016
1017 attempts += 1;
1018
1019 if let Some(max) = runtime.max_reconnect_tries {
1020 if attempts > max {
1021 let _ = tx.send(Event::InterfaceDown(config.interface_id));
1022 return None;
1023 }
1024 }
1025
1026 log::info!(
1027 "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1028 config.name,
1029 attempts,
1030 jitter.as_secs_f64(),
1031 );
1032
1033 match try_connect_client(config) {
1034 Ok(new_stream) => {
1035 let writer_stream = match new_stream.try_clone() {
1036 Ok(s) => s,
1037 Err(e) => {
1038 log::warn!("[{}] failed to clone stream: {}", config.name, e);
1039 continue;
1040 }
1041 };
1042 log::info!(
1043 "[{}] reconnected after {} attempt(s)",
1044 config.name,
1045 attempts
1046 );
1047 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1048 stream: writer_stream,
1049 });
1050 let _ = tx.send(Event::InterfaceUp(
1051 config.interface_id,
1052 Some(new_writer),
1053 None,
1054 ));
1055 return Some(new_stream);
1056 }
1057 Err(e) => {
1058 log::warn!("[{}] reconnect failed: {}", config.name, e);
1059 }
1060 }
1061 }
1062}
1063
1064#[derive(Clone)]
1071pub(crate) enum BackboneMode {
1072 Server(BackboneConfig),
1073 Client(BackboneClientConfig),
1074}
1075
1076pub struct BackboneInterfaceFactory;
1082
1083fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
1084 params
1085 .get(key)
1086 .and_then(|v| v.parse::<f64>().ok())
1087 .filter(|v| *v > 0.0)
1088 .map(Duration::from_secs_f64)
1089}
1090
1091impl InterfaceFactory for BackboneInterfaceFactory {
1092 fn type_name(&self) -> &str {
1093 "BackboneInterface"
1094 }
1095
1096 fn parse_config(
1097 &self,
1098 name: &str,
1099 id: InterfaceId,
1100 params: &HashMap<String, String>,
1101 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1102 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1103 let target_host = target_host.clone();
1105 let target_port = params
1106 .get("target_port")
1107 .or_else(|| params.get("port"))
1108 .and_then(|v| v.parse().ok())
1109 .unwrap_or(4242);
1110 let transport_identity = params.get("transport_identity").cloned();
1111 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1112 name: name.to_string(),
1113 target_host,
1114 target_port,
1115 interface_id: id,
1116 transport_identity,
1117 ..BackboneClientConfig::default()
1118 })))
1119 } else {
1120 let listen_ip = params
1122 .get("listen_ip")
1123 .or_else(|| params.get("device"))
1124 .cloned()
1125 .unwrap_or_else(|| "0.0.0.0".into());
1126 let listen_port = params
1127 .get("listen_port")
1128 .or_else(|| params.get("port"))
1129 .and_then(|v| v.parse().ok())
1130 .unwrap_or(4242);
1131 let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1132 let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1133 let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1134 let abuse = BackboneAbuseConfig {
1135 max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1136 };
1137 let mut config = BackboneConfig {
1138 name: name.to_string(),
1139 listen_ip,
1140 listen_port,
1141 interface_id: id,
1142 max_connections,
1143 idle_timeout,
1144 write_stall_timeout,
1145 abuse,
1146 ingress_control: IngressControlConfig::enabled(),
1147 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1148 max_connections: None,
1149 idle_timeout: None,
1150 write_stall_timeout: None,
1151 abuse: BackboneAbuseConfig::default(),
1152 })),
1153 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1154 };
1155 let startup = BackboneServerRuntime::from_config(&config);
1156 config.runtime = Arc::new(Mutex::new(startup));
1157 Ok(Box::new(BackboneMode::Server(config)))
1158 }
1159 }
1160
1161 fn start(
1162 &self,
1163 config: Box<dyn InterfaceConfigData>,
1164 ctx: StartContext,
1165 ) -> io::Result<StartResult> {
1166 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1167 io::Error::new(
1168 io::ErrorKind::InvalidData,
1169 "wrong config type for BackboneInterface",
1170 )
1171 })?;
1172
1173 match mode {
1174 BackboneMode::Client(cfg) => {
1175 let id = cfg.interface_id;
1176 let name = cfg.name.clone();
1177 let info = InterfaceInfo {
1178 id,
1179 name,
1180 mode: ctx.mode,
1181 out_capable: true,
1182 in_capable: true,
1183 bitrate: Some(1_000_000_000),
1184 announce_rate_target: None,
1185 announce_rate_grace: 0,
1186 announce_rate_penalty: 0.0,
1187 announce_cap: constants::ANNOUNCE_CAP,
1188 is_local_client: false,
1189 wants_tunnel: false,
1190 tunnel_id: None,
1191 mtu: 65535,
1192 ingress_control: ctx.ingress_control,
1193 ia_freq: 0.0,
1194 started: crate::time::now(),
1195 };
1196 let writer = start_client(cfg, ctx.tx)?;
1197 Ok(StartResult::Simple {
1198 id,
1199 info,
1200 writer,
1201 interface_type_name: "BackboneInterface".to_string(),
1202 })
1203 }
1204 BackboneMode::Server(mut cfg) => {
1205 cfg.ingress_control = ctx.ingress_control;
1206 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1207 Ok(StartResult::Listener { control: None })
1208 }
1209 }
1210 }
1211}
1212
1213pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1214 match mode {
1215 BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1216 interface_name: config.name.clone(),
1217 runtime: Arc::clone(&config.runtime),
1218 startup: BackboneServerRuntime::from_config(config),
1219 }),
1220 BackboneMode::Client(_) => None,
1221 }
1222}
1223
1224pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1225 match mode {
1226 BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1227 interface_id: config.interface_id,
1228 interface_name: config.name.clone(),
1229 peer_state: Arc::clone(&config.peer_state),
1230 }),
1231 BackboneMode::Client(_) => None,
1232 }
1233}
1234
1235pub(crate) fn client_runtime_handle_from_mode(
1236 mode: &BackboneMode,
1237) -> Option<BackboneClientRuntimeConfigHandle> {
1238 match mode {
1239 BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1240 interface_name: config.name.clone(),
1241 runtime: Arc::clone(&config.runtime),
1242 startup: BackboneClientRuntime::from_config(config),
1243 }),
1244 BackboneMode::Server(_) => None,
1245 }
1246}
1247
1248pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1249 match mode {
1250 BackboneMode::Client(config) => Some(config.clone()),
1251 BackboneMode::Server(_) => None,
1252 }
1253}
1254
1255#[cfg(test)]
1256mod tests {
1257 use super::*;
1258 use std::sync::mpsc;
1259 use std::time::Duration;
1260
1261 fn find_free_port() -> u16 {
1262 TcpListener::bind("127.0.0.1:0")
1263 .unwrap()
1264 .local_addr()
1265 .unwrap()
1266 .port()
1267 }
1268
1269 fn recv_non_peer_event(
1270 rx: &mpsc::Receiver<Event>,
1271 timeout: Duration,
1272 ) -> Result<Event, mpsc::RecvTimeoutError> {
1273 let deadline = Instant::now() + timeout;
1274 loop {
1275 let remaining = deadline.saturating_duration_since(Instant::now());
1276 if remaining.is_zero() {
1277 return Err(mpsc::RecvTimeoutError::Timeout);
1278 }
1279 let event = rx.recv_timeout(remaining)?;
1280 match event {
1281 Event::BackbonePeerConnected { .. }
1282 | Event::BackbonePeerDisconnected { .. }
1283 | Event::BackbonePeerIdleTimeout { .. }
1284 | Event::BackbonePeerWriteStall { .. }
1285 | Event::BackbonePeerPenalty { .. } => continue,
1286 other => return Ok(other),
1287 }
1288 }
1289 }
1290
1291 fn make_server_config(
1292 port: u16,
1293 interface_id: u64,
1294 max_connections: Option<usize>,
1295 idle_timeout: Option<Duration>,
1296 write_stall_timeout: Option<Duration>,
1297 abuse: BackboneAbuseConfig,
1298 ) -> BackboneConfig {
1299 let mut config = BackboneConfig {
1300 name: "test-backbone".into(),
1301 listen_ip: "127.0.0.1".into(),
1302 listen_port: port,
1303 interface_id: InterfaceId(interface_id),
1304 max_connections,
1305 idle_timeout,
1306 write_stall_timeout,
1307 abuse,
1308 ingress_control: IngressControlConfig::enabled(),
1309 runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1310 max_connections: None,
1311 idle_timeout: None,
1312 write_stall_timeout: None,
1313 abuse: BackboneAbuseConfig::default(),
1314 })),
1315 peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1316 };
1317 let startup = BackboneServerRuntime::from_config(&config);
1318 config.runtime = Arc::new(Mutex::new(startup));
1319 config
1320 }
1321
1322 #[test]
1323 fn backbone_accept_connection() {
1324 let port = find_free_port();
1325 let (tx, rx) = crate::event::channel();
1326 let next_id = Arc::new(AtomicU64::new(8000));
1327
1328 let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1329
1330 start(config, tx, next_id).unwrap();
1331 thread::sleep(Duration::from_millis(50));
1332
1333 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1334
1335 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1336 match event {
1337 Event::InterfaceUp(id, writer, info) => {
1338 assert_eq!(id, InterfaceId(8000));
1339 assert!(writer.is_some());
1340 assert!(info.is_some());
1341 let info = info.unwrap();
1342 assert!(info.out_capable);
1343 assert!(info.in_capable);
1344 }
1345 other => panic!("expected InterfaceUp, got {:?}", other),
1346 }
1347 }
1348
1349 #[test]
1350 fn backbone_receive_frame() {
1351 let port = find_free_port();
1352 let (tx, rx) = crate::event::channel();
1353 let next_id = Arc::new(AtomicU64::new(8100));
1354
1355 let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1356
1357 start(config, tx, next_id).unwrap();
1358 thread::sleep(Duration::from_millis(50));
1359
1360 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1361
1362 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1364
1365 let payload: Vec<u8> = (0..32).collect();
1367 client.write_all(&hdlc::frame(&payload)).unwrap();
1368
1369 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1370 match event {
1371 Event::Frame { interface_id, data } => {
1372 assert_eq!(interface_id, InterfaceId(8100));
1373 assert_eq!(data, payload);
1374 }
1375 other => panic!("expected Frame, got {:?}", other),
1376 }
1377 }
1378
1379 #[test]
1380 fn backbone_send_to_client() {
1381 let port = find_free_port();
1382 let (tx, rx) = crate::event::channel();
1383 let next_id = Arc::new(AtomicU64::new(8200));
1384
1385 let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1386
1387 start(config, tx, next_id).unwrap();
1388 thread::sleep(Duration::from_millis(50));
1389
1390 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1391 client
1392 .set_read_timeout(Some(Duration::from_secs(2)))
1393 .unwrap();
1394
1395 let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1397 let mut writer = match event {
1398 Event::InterfaceUp(_, Some(w), _) => w,
1399 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1400 };
1401
1402 let payload: Vec<u8> = (0..24).collect();
1404 writer.send_frame(&payload).unwrap();
1405
1406 let mut buf = [0u8; 256];
1408 let n = client.read(&mut buf).unwrap();
1409 let expected = hdlc::frame(&payload);
1410 assert_eq!(&buf[..n], &expected[..]);
1411 }
1412
1413 #[test]
1414 fn backbone_multiple_clients() {
1415 let port = find_free_port();
1416 let (tx, rx) = crate::event::channel();
1417 let next_id = Arc::new(AtomicU64::new(8300));
1418
1419 let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1420
1421 start(config, tx, next_id).unwrap();
1422 thread::sleep(Duration::from_millis(50));
1423
1424 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1425 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1426
1427 let mut ids = Vec::new();
1428 for _ in 0..2 {
1429 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1430 match event {
1431 Event::InterfaceUp(id, _, _) => ids.push(id),
1432 other => panic!("expected InterfaceUp, got {:?}", other),
1433 }
1434 }
1435
1436 assert_eq!(ids.len(), 2);
1437 assert_ne!(ids[0], ids[1]);
1438 }
1439
1440 #[test]
1441 fn backbone_client_disconnect() {
1442 let port = find_free_port();
1443 let (tx, rx) = crate::event::channel();
1444 let next_id = Arc::new(AtomicU64::new(8400));
1445
1446 let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1447
1448 start(config, tx, next_id).unwrap();
1449 thread::sleep(Duration::from_millis(50));
1450
1451 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1452
1453 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1455
1456 drop(client);
1458
1459 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1461 assert!(
1462 matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1463 "expected InterfaceDown(8400), got {:?}",
1464 event
1465 );
1466 }
1467
1468 #[test]
1469 fn backbone_epoll_multiplexing() {
1470 let port = find_free_port();
1471 let (tx, rx) = crate::event::channel();
1472 let next_id = Arc::new(AtomicU64::new(8500));
1473
1474 let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1475
1476 start(config, tx, next_id).unwrap();
1477 thread::sleep(Duration::from_millis(50));
1478
1479 let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1480 let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1481
1482 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1484 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1485
1486 let payload1: Vec<u8> = (0..24).collect();
1488 let payload2: Vec<u8> = (100..130).collect();
1489 client1.write_all(&hdlc::frame(&payload1)).unwrap();
1490 client2.write_all(&hdlc::frame(&payload2)).unwrap();
1491
1492 let mut received = Vec::new();
1494 for _ in 0..2 {
1495 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1496 match event {
1497 Event::Frame { data, .. } => received.push(data),
1498 other => panic!("expected Frame, got {:?}", other),
1499 }
1500 }
1501 assert!(received.contains(&payload1));
1502 assert!(received.contains(&payload2));
1503 }
1504
1505 #[test]
1506 fn backbone_bind_port() {
1507 let port = find_free_port();
1508 let (tx, _rx) = crate::event::channel();
1509 let next_id = Arc::new(AtomicU64::new(8600));
1510
1511 let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1512
1513 start(config, tx, next_id).unwrap();
1515 }
1516
1517 #[test]
1518 fn backbone_hdlc_fragmented() {
1519 let port = find_free_port();
1520 let (tx, rx) = crate::event::channel();
1521 let next_id = Arc::new(AtomicU64::new(8700));
1522
1523 let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1524
1525 start(config, tx, next_id).unwrap();
1526 thread::sleep(Duration::from_millis(50));
1527
1528 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1529 client.set_nodelay(true).unwrap();
1530
1531 let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1533
1534 let payload: Vec<u8> = (0..32).collect();
1536 let framed = hdlc::frame(&payload);
1537 let mid = framed.len() / 2;
1538
1539 client.write_all(&framed[..mid]).unwrap();
1540 thread::sleep(Duration::from_millis(50));
1541 client.write_all(&framed[mid..]).unwrap();
1542
1543 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1545 match event {
1546 Event::Frame { data, .. } => {
1547 assert_eq!(data, payload);
1548 }
1549 other => panic!("expected Frame, got {:?}", other),
1550 }
1551 }
1552
1553 fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1558 BackboneClientConfig {
1559 name: format!("test-bb-client-{}", port),
1560 target_host: "127.0.0.1".into(),
1561 target_port: port,
1562 interface_id: InterfaceId(id),
1563 reconnect_wait: Duration::from_millis(100),
1564 max_reconnect_tries: Some(2),
1565 connect_timeout: Duration::from_secs(2),
1566 transport_identity: None,
1567 runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1568 reconnect_wait: Duration::from_millis(100),
1569 max_reconnect_tries: Some(2),
1570 connect_timeout: Duration::from_secs(2),
1571 })),
1572 }
1573 }
1574
1575 #[test]
1576 fn backbone_client_connect() {
1577 let port = find_free_port();
1578 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1579 let (tx, rx) = crate::event::channel();
1580
1581 let config = make_client_config(port, 9000);
1582 let _writer = start_client(config, tx).unwrap();
1583
1584 let _server_stream = listener.accept().unwrap();
1585
1586 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1587 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1588 }
1589
1590 #[test]
1591 fn backbone_client_receive_frame() {
1592 let port = find_free_port();
1593 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1594 let (tx, rx) = crate::event::channel();
1595
1596 let config = make_client_config(port, 9100);
1597 let _writer = start_client(config, tx).unwrap();
1598
1599 let (mut server_stream, _) = listener.accept().unwrap();
1600
1601 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1603
1604 let payload: Vec<u8> = (0..32).collect();
1606 server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1607
1608 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1609 match event {
1610 Event::Frame { interface_id, data } => {
1611 assert_eq!(interface_id, InterfaceId(9100));
1612 assert_eq!(data, payload);
1613 }
1614 other => panic!("expected Frame, got {:?}", other),
1615 }
1616 }
1617
1618 #[test]
1619 fn backbone_client_send_frame() {
1620 let port = find_free_port();
1621 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1622 let (tx, _rx) = crate::event::channel();
1623
1624 let config = make_client_config(port, 9200);
1625 let mut writer = start_client(config, tx).unwrap();
1626
1627 let (mut server_stream, _) = listener.accept().unwrap();
1628 server_stream
1629 .set_read_timeout(Some(Duration::from_secs(2)))
1630 .unwrap();
1631
1632 let payload: Vec<u8> = (0..24).collect();
1633 writer.send_frame(&payload).unwrap();
1634
1635 let mut buf = [0u8; 256];
1636 let n = server_stream.read(&mut buf).unwrap();
1637 let expected = hdlc::frame(&payload);
1638 assert_eq!(&buf[..n], &expected[..]);
1639 }
1640
1641 #[test]
1642 fn backbone_max_connections_rejects_excess() {
1643 let port = find_free_port();
1644 let (tx, rx) = crate::event::channel();
1645 let next_id = Arc::new(AtomicU64::new(8800));
1646
1647 let config = make_server_config(
1648 port,
1649 88,
1650 Some(2),
1651 None,
1652 None,
1653 BackboneAbuseConfig::default(),
1654 );
1655
1656 start(config, tx, next_id).unwrap();
1657 thread::sleep(Duration::from_millis(50));
1658
1659 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1661 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1662
1663 for _ in 0..2 {
1665 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1666 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1667 }
1668
1669 let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1671 client3
1672 .set_read_timeout(Some(Duration::from_millis(500)))
1673 .unwrap();
1674
1675 thread::sleep(Duration::from_millis(100));
1677
1678 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1680 assert!(
1681 result.is_err(),
1682 "expected no InterfaceUp for rejected connection, got {:?}",
1683 result
1684 );
1685 }
1686
1687 #[test]
1688 fn backbone_max_connections_allows_after_disconnect() {
1689 let port = find_free_port();
1690 let (tx, rx) = crate::event::channel();
1691 let next_id = Arc::new(AtomicU64::new(8900));
1692
1693 let config = make_server_config(
1694 port,
1695 89,
1696 Some(1),
1697 None,
1698 None,
1699 BackboneAbuseConfig::default(),
1700 );
1701
1702 start(config, tx, next_id).unwrap();
1703 thread::sleep(Duration::from_millis(50));
1704
1705 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1707 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1708 assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1709
1710 drop(client1);
1712
1713 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1715 assert!(matches!(event, Event::InterfaceDown(_)));
1716
1717 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1719 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1720 assert!(
1721 matches!(event, Event::InterfaceUp(_, _, _)),
1722 "expected InterfaceUp after slot freed, got {:?}",
1723 event
1724 );
1725 }
1726
1727 #[test]
1728 fn backbone_client_reconnect() {
1729 let port = find_free_port();
1730 let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1731 listener.set_nonblocking(false).unwrap();
1732 let (tx, rx) = crate::event::channel();
1733
1734 let config = make_client_config(port, 9300);
1735 let _writer = start_client(config, tx).unwrap();
1736
1737 let (server_stream, _) = listener.accept().unwrap();
1739
1740 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1742
1743 drop(server_stream);
1744
1745 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1747 assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1748
1749 let _server_stream2 = listener.accept().unwrap();
1751
1752 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1754 assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1755 }
1756
1757 #[test]
1758 fn backbone_idle_timeout_disconnects_silent_client() {
1759 let port = find_free_port();
1760 let (tx, rx) = crate::event::channel();
1761 let next_id = Arc::new(AtomicU64::new(9400));
1762
1763 let config = make_server_config(
1764 port,
1765 94,
1766 None,
1767 Some(Duration::from_millis(150)),
1768 None,
1769 BackboneAbuseConfig::default(),
1770 );
1771
1772 start(config, tx, next_id).unwrap();
1773 thread::sleep(Duration::from_millis(50));
1774
1775 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1776
1777 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1778 let client_id = match event {
1779 Event::InterfaceUp(id, _, _) => id,
1780 other => panic!("expected InterfaceUp, got {:?}", other),
1781 };
1782
1783 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1784 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1785 }
1786
1787 #[test]
1788 fn backbone_idle_timeout_ignores_client_after_data() {
1789 let port = find_free_port();
1790 let (tx, rx) = crate::event::channel();
1791 let next_id = Arc::new(AtomicU64::new(9500));
1792
1793 let config = make_server_config(
1794 port,
1795 95,
1796 None,
1797 Some(Duration::from_millis(200)),
1798 None,
1799 BackboneAbuseConfig::default(),
1800 );
1801
1802 start(config, tx, next_id).unwrap();
1803 thread::sleep(Duration::from_millis(50));
1804
1805 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1806
1807 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1808 let client_id = match event {
1809 Event::InterfaceUp(id, _, _) => id,
1810 other => panic!("expected InterfaceUp, got {:?}", other),
1811 };
1812
1813 client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1814
1815 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1816 match event {
1817 Event::Frame { interface_id, data } => {
1818 assert_eq!(interface_id, client_id);
1819 assert_eq!(data, vec![1u8; 24]);
1820 }
1821 other => panic!("expected Frame, got {:?}", other),
1822 }
1823
1824 let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1825 assert!(
1826 result.is_err(),
1827 "expected no InterfaceDown after client sent data, got {:?}",
1828 result
1829 );
1830 }
1831
1832 #[test]
1833 fn backbone_runtime_idle_timeout_updates_live() {
1834 let port = find_free_port();
1835 let (tx, rx) = crate::event::channel();
1836 let next_id = Arc::new(AtomicU64::new(9650));
1837
1838 let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1839 let runtime = Arc::clone(&config.runtime);
1840
1841 start(config, tx, next_id).unwrap();
1842 thread::sleep(Duration::from_millis(50));
1843
1844 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1845 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1846 let client_id = match event {
1847 Event::InterfaceUp(id, _, _) => id,
1848 other => panic!("expected InterfaceUp, got {:?}", other),
1849 };
1850
1851 {
1852 let mut runtime = runtime.lock().unwrap();
1853 runtime.idle_timeout = Some(Duration::from_millis(150));
1854 }
1855
1856 let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1857 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1858 }
1859
1860 #[test]
1861 fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1862 let port = find_free_port();
1863 let (tx, rx) = crate::event::channel();
1864 let next_id = Arc::new(AtomicU64::new(9660));
1865
1866 let config = make_server_config(
1867 port,
1868 98,
1869 None,
1870 None,
1871 Some(Duration::from_millis(50)),
1872 BackboneAbuseConfig::default(),
1873 );
1874
1875 start(config, tx, next_id).unwrap();
1876 thread::sleep(Duration::from_millis(50));
1877
1878 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1879 client
1880 .set_read_timeout(Some(Duration::from_millis(100)))
1881 .unwrap();
1882 let sock = SockRef::from(&client);
1883 sock.set_recv_buffer_size(4096).ok();
1884
1885 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1886 let (client_id, mut writer) = match event {
1887 Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1888 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1889 };
1890
1891 let payload = vec![0x55; 512 * 1024];
1892 let deadline = Instant::now() + Duration::from_secs(3);
1893 let mut stalled = false;
1894 while Instant::now() < deadline {
1895 match writer.send_frame(&payload) {
1896 Ok(()) => {}
1897 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1898 thread::sleep(Duration::from_millis(10));
1899 }
1900 Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1901 stalled = true;
1902 break;
1903 }
1904 Err(e) => panic!("unexpected send error: {}", e),
1905 }
1906 }
1907
1908 assert!(stalled, "expected writer to time out on persistent stall");
1909 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1910 assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1911 }
1912
1913 fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1915 where
1916 F: FnMut(&Event) -> bool,
1917 {
1918 let deadline = Instant::now() + timeout;
1919 loop {
1920 let remaining = deadline.saturating_duration_since(Instant::now());
1921 if remaining.is_zero() {
1922 return None;
1923 }
1924 match rx.recv_timeout(remaining) {
1925 Ok(event) if pred(&event) => return Some(event),
1926 Ok(_) => continue,
1927 Err(_) => return None,
1928 }
1929 }
1930 }
1931
1932 #[test]
1933 fn backbone_write_stall_emits_peer_events() {
1934 let port = find_free_port();
1935 let (tx, rx) = crate::event::channel();
1936 let next_id = Arc::new(AtomicU64::new(9700));
1937
1938 let config = make_server_config(
1939 port,
1940 97,
1941 None,
1942 None,
1943 Some(Duration::from_millis(50)), BackboneAbuseConfig::default(),
1945 );
1946
1947 start(config, tx, next_id).unwrap();
1948 thread::sleep(Duration::from_millis(50));
1949
1950 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1952 client
1953 .set_read_timeout(Some(Duration::from_millis(100)))
1954 .unwrap();
1955 let sock = SockRef::from(&client);
1956 sock.set_recv_buffer_size(4096).ok();
1957
1958 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1960 let mut writer = match event {
1961 Event::InterfaceUp(_, Some(w), _) => w,
1962 other => panic!("expected InterfaceUp with writer, got {:?}", other),
1963 };
1964
1965 let payload = vec![0x55; 512 * 1024];
1967 let deadline = Instant::now() + Duration::from_secs(3);
1968 while Instant::now() < deadline {
1969 match writer.send_frame(&payload) {
1970 Ok(()) | Err(_) => {
1971 if Instant::now() + Duration::from_millis(10) > deadline {
1972 break;
1973 }
1974 thread::sleep(Duration::from_millis(10));
1975 }
1976 }
1977 }
1978
1979 let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1981 matches!(e, Event::BackbonePeerWriteStall { .. })
1982 });
1983 assert!(
1984 stall_event.is_some(),
1985 "expected BackbonePeerWriteStall event"
1986 );
1987 }
1988
1989 #[test]
1990 fn backbone_blacklisted_peer_rejected_on_connect() {
1991 let port = find_free_port();
1992 let (tx, rx) = crate::event::channel();
1993 let next_id = Arc::new(AtomicU64::new(9800));
1994
1995 let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
1996 let peer_state = config.peer_state.clone();
1997
1998 start(config, tx.clone(), next_id).unwrap();
1999 thread::sleep(Duration::from_millis(50));
2000
2001 let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2003 let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2004 assert!(
2005 matches!(event, Event::InterfaceUp(_, _, _)),
2006 "first connection should succeed"
2007 );
2008 drop(client1);
2009
2010 thread::sleep(Duration::from_millis(100));
2012 while rx.try_recv().is_ok() {}
2013
2014 peer_state.lock().unwrap().blacklist(
2016 "127.0.0.1".parse().unwrap(),
2017 Duration::from_secs(60),
2018 "test blacklist".into(),
2019 );
2020
2021 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2023 thread::sleep(Duration::from_millis(200));
2025
2026 let event = rx.try_recv();
2028 match event {
2029 Ok(Event::InterfaceUp(_, _, _)) => {
2030 panic!("blacklisted peer should not get InterfaceUp")
2031 }
2032 _ => {} }
2034 }
2035
2036 #[test]
2037 fn backbone_parse_config_reads_abuse_settings() {
2038 let factory = BackboneInterfaceFactory;
2039 let mut params = HashMap::new();
2040 params.insert("listen_ip".into(), "127.0.0.1".into());
2041 params.insert("listen_port".into(), "4242".into());
2042 params.insert("idle_timeout".into(), "15".into());
2043 params.insert("write_stall_timeout".into(), "45".into());
2044 params.insert("max_penalty_duration".into(), "3600".into());
2045
2046 let config = factory
2047 .parse_config("test-backbone", InterfaceId(97), ¶ms)
2048 .unwrap();
2049 let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2050
2051 match mode {
2052 BackboneMode::Server(config) => {
2053 assert_eq!(config.listen_ip, "127.0.0.1");
2054 assert_eq!(config.listen_port, 4242);
2055 assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2056 assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2057 assert_eq!(
2058 config.abuse.max_penalty_duration,
2059 Some(Duration::from_secs(3600))
2060 );
2061 }
2062 BackboneMode::Client(_) => panic!("expected server config"),
2063 }
2064 }
2065}