Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30    lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
/// HW_MTU: 1 MB (matches Python BackboneInterface.HW_MTU)
///
/// The value is 1_048_576 bytes (2^20).
// NOTE(review): nothing in the visible part of this file reads this constant,
// which is presumably why `dead_code` is allowed — confirm it is kept only for
// parity with the Python implementation.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
37
/// Configuration for a backbone interface.
///
/// Consumed by [`start`], which binds the listener and spawns the poll thread.
#[derive(Debug, Clone)]
pub struct BackboneConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Address the TCP listener binds to.
    pub listen_ip: String,
    /// Port the TCP listener binds to.
    pub listen_port: u16,
    /// Id of the server interface itself; reported as `server_interface_id`
    /// in peer events and used in the poll thread's name.
    pub interface_id: InterfaceId,
    /// Startup value for the connection limit. The running poll loop reads
    /// the live value from `runtime`, not from this field.
    pub max_connections: Option<usize>,
    /// Startup value for the idle timeout (live value is read from `runtime`).
    pub idle_timeout: Option<Duration>,
    /// Startup value for the write-stall timeout (live value is read from
    /// `runtime`).
    pub write_stall_timeout: Option<Duration>,
    /// Startup abuse-detection settings.
    pub abuse: BackboneAbuseConfig,
    /// Ingress-control settings copied into the `InterfaceInfo` of every
    /// accepted peer interface.
    pub ingress_control: IngressControlConfig,
    /// Shared, mutable runtime settings; re-read by the poll loop on every
    /// iteration and by each peer writer on every send.
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Shared per-IP peer monitor, synchronized with the poll loop's own
    /// peer table on every iteration.
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
53
/// Configurable behavior-based abuse detection for inbound peers.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// NOTE(review): not read by any code visible in this part of the file;
    /// presumably an upper bound on how long a peer may be penalized
    /// (blacklisted) — confirm against the code that applies penalties.
    pub max_penalty_duration: Option<Duration>,
}
59
/// Live runtime state for a backbone server interface.
///
/// Shared behind `Arc<Mutex<_>>`; the poll loop takes a snapshot at the top
/// of every iteration, so changes take effect on the next iteration.
#[derive(Debug, Clone)]
pub struct BackboneServerRuntime {
    /// New connections are rejected while the number of connected clients is
    /// at or above this value. `None` disables the limit.
    pub max_connections: Option<usize>,
    /// Clients that have never sent any data are disconnected once they have
    /// been connected for at least this long. `None` disables the check.
    pub idle_timeout: Option<Duration>,
    /// A peer whose socket keeps returning `WouldBlock` on write for at least
    /// this long is disconnected by its writer. `None` disables the timer
    /// (the pending-buffer size cap still applies).
    pub write_stall_timeout: Option<Duration>,
    /// Abuse-detection settings.
    pub abuse: BackboneAbuseConfig,
}
68
69impl BackboneServerRuntime {
70    pub fn from_config(config: &BackboneConfig) -> Self {
71        Self {
72            max_connections: config.max_connections,
73            idle_timeout: config.idle_timeout,
74            write_stall_timeout: config.write_stall_timeout,
75            abuse: config.abuse.clone(),
76        }
77    }
78}
79
/// Handle pairing a backbone server's shared runtime settings with the
/// values it was started with.
// NOTE(review): the consumers of this handle are outside this file section;
// presumably `startup` allows resetting `runtime` to its initial values — confirm.
#[derive(Debug, Clone)]
pub struct BackboneRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// Shared live runtime settings.
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Snapshot of the runtime settings taken at startup.
    pub startup: BackboneServerRuntime,
}
86
/// Handle giving access to a backbone server's shared per-IP peer monitor.
#[derive(Debug, Clone)]
pub struct BackbonePeerStateHandle {
    /// Id of the server interface the monitor belongs to.
    pub interface_id: InterfaceId,
    /// Name of the server interface the monitor belongs to.
    pub interface_name: String,
    /// Shared peer monitor (the same `Arc` the poll loop synchronizes with).
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
93
94impl Default for BackboneConfig {
95    fn default() -> Self {
96        let mut config = BackboneConfig {
97            name: String::new(),
98            listen_ip: "0.0.0.0".into(),
99            listen_port: 0,
100            interface_id: InterfaceId(0),
101            max_connections: None,
102            idle_timeout: None,
103            write_stall_timeout: None,
104            abuse: BackboneAbuseConfig::default(),
105            ingress_control: IngressControlConfig::enabled(),
106            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
107                max_connections: None,
108                idle_timeout: None,
109                write_stall_timeout: None,
110                abuse: BackboneAbuseConfig::default(),
111            })),
112            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
113        };
114        let startup = BackboneServerRuntime::from_config(&config);
115        config.runtime = Arc::new(Mutex::new(startup));
116        config
117    }
118}
119
/// Maximum pending buffer size per client (512 KB). Clients exceeding this are
/// disconnected to prevent unbounded memory growth from slow readers.
///
/// The check is made when a write would block: if the bytes already pending
/// plus the unsent remainder exceed this limit, the writer disconnects the
/// peer instead of buffering.
const MAX_PENDING_BYTES: usize = 512 * 1024;
123
/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
struct BackboneWriter {
    /// Clone of the client's socket (created with `try_clone` in the poll loop).
    stream: TcpStream,
    /// Shared runtime settings; `write_stall_timeout` is re-read on every send.
    runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Server interface name, used as the log prefix.
    interface_name: String,
    /// Id of the per-peer interface this writer serves.
    interface_id: InterfaceId,
    /// Channel used to report `InterfaceDown` when the writer disconnects the peer.
    event_tx: EventSender,
    /// Unsent tail of a frame whose write returned `WouldBlock`.
    pending: Vec<u8>,
    /// When the current run of `WouldBlock` results began; reset to `None`
    /// whenever a write makes progress.
    stall_started: Option<Instant>,
    /// Ensures the stall disconnect (log, shutdown, event) happens only once.
    disconnect_notified: bool,
    /// Set to `true` on a stall disconnect so the poll loop can classify the
    /// resulting socket close as a write stall.
    write_stall_flag: Arc<AtomicBool>,
}
136
137impl Writer for BackboneWriter {
138    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
139        let write_stall_timeout =
140            lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
141        if !self.pending.is_empty() {
142            self.flush_pending(write_stall_timeout)?;
143            if !self.pending.is_empty() {
144                return Err(io::Error::new(
145                    io::ErrorKind::WouldBlock,
146                    "backbone writer still stalled",
147                ));
148            }
149        }
150
151        let frame = hdlc::frame(data);
152        self.write_buffer(&frame, write_stall_timeout)
153    }
154}
155
156impl BackboneWriter {
157    fn write_buffer(
158        &mut self,
159        data: &[u8],
160        write_stall_timeout: Option<Duration>,
161    ) -> io::Result<()> {
162        let mut written = 0usize;
163        while written < data.len() {
164            match self.stream.write(&data[written..]) {
165                Ok(0) => {
166                    return Err(io::Error::new(
167                        io::ErrorKind::WriteZero,
168                        "backbone writer wrote zero bytes",
169                    ))
170                }
171                Ok(n) => {
172                    written += n;
173                    self.stall_started = None;
174                }
175                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176                    let now = Instant::now();
177                    let started = self.stall_started.get_or_insert(now);
178                    if let Some(timeout) = write_stall_timeout {
179                        if now.duration_since(*started) >= timeout {
180                            return Err(self.disconnect_for_write_stall(timeout));
181                        }
182                    }
183                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
184                        return Err(self.disconnect_for_write_stall(
185                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
186                        ));
187                    }
188                    self.pending.extend_from_slice(&data[written..]);
189                    return Err(io::Error::new(
190                        io::ErrorKind::WouldBlock,
191                        "backbone writer would block",
192                    ));
193                }
194                Err(e) => return Err(e),
195            }
196        }
197        Ok(())
198    }
199
200    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
201        if self.pending.is_empty() {
202            return Ok(());
203        }
204
205        let pending = std::mem::take(&mut self.pending);
206        match self.write_buffer(&pending, write_stall_timeout) {
207            Ok(()) => Ok(()),
208            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
209            Err(e) => Err(e),
210        }
211    }
212
213    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
214        if !self.disconnect_notified {
215            log::warn!(
216                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
217                self.interface_name,
218                self.interface_id.0,
219                timeout
220            );
221            self.write_stall_flag.store(true, Ordering::Relaxed);
222            let _ = self.stream.shutdown(Shutdown::Both);
223            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
224            self.disconnect_notified = true;
225        }
226        io::Error::new(
227            io::ErrorKind::TimedOut,
228            format!("backbone writer stalled for {:?}", timeout),
229        )
230    }
231}
232
233/// Start a backbone interface. Binds TCP listener, spawns poll thread.
234pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
235    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
236    let listener = TcpListener::bind(&addr)?;
237    listener.set_nonblocking(true)?;
238
239    log::info!(
240        "[{}] backbone server listening on {}",
241        config.name,
242        listener
243            .local_addr()
244            .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
245    );
246
247    let name = config.name.clone();
248    let server_interface_id = config.interface_id;
249    let runtime = Arc::clone(&config.runtime);
250    let peer_state = Arc::clone(&config.peer_state);
251    let ingress_control = config.ingress_control;
252    thread::Builder::new()
253        .name(format!("backbone-poll-{}", config.interface_id.0))
254        .spawn(move || {
255            if let Err(e) = poll_loop(
256                listener,
257                name,
258                server_interface_id,
259                tx,
260                next_id,
261                runtime,
262                peer_state,
263                ingress_control,
264            ) {
265                log::error!("backbone poll loop error: {}", e);
266            }
267        })?;
268
269    Ok(())
270}
271
/// Per-client tracking state.
///
/// One entry per accepted connection, keyed in the poll loop by poller key.
struct ClientState {
    /// Interface id assigned to this peer connection.
    id: InterfaceId,
    /// Remote IP address of the peer.
    peer_ip: IpAddr,
    /// Remote TCP port of the peer.
    peer_port: u16,
    /// The socket registered with the poller; read by the poll loop.
    stream: TcpStream,
    /// HDLC decoder holding partial-frame state between reads.
    decoder: hdlc::Decoder,
    /// When the connection was accepted.
    connected_at: Instant,
    /// Whether any bytes have been read from this peer; peers that have sent
    /// data are exempt from the idle timeout.
    has_received_data: bool,
    /// Shared with the peer's writer, which sets it when it disconnects the
    /// peer for a write stall.
    write_stall_flag: Arc<AtomicBool>,
}
283
/// Behavior bookkeeping kept per peer IP address.
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// Instant until which connections from this IP are rejected, if any.
    blacklisted_until: Option<Instant>,
    /// Human-readable reason recorded with the blacklist entry.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected while blacklisted.
    reject_count: u64,
    /// Number of currently open connections from this IP.
    connected_count: usize,
}

impl PeerBehaviorState {
    /// Creates a clean record: not blacklisted, no rejects, no connections.
    fn new() -> Self {
        PeerBehaviorState {
            connected_count: 0,
            reject_count: 0,
            blacklist_reason: None,
            blacklisted_until: None,
        }
    }
}
302
/// Shared view of per-IP peer behavior for a backbone server.
///
/// The poll loop merges its own peer table into this monitor and pulls
/// blacklist/reject state back out of it on every iteration, so changes made
/// through the public methods reach the poll loop on its next iteration.
#[derive(Debug, Clone, Default)]
pub struct BackbonePeerMonitor {
    /// Tracked state keyed by peer IP address.
    peers: HashMap<IpAddr, PeerBehaviorState>,
}
307
308impl BackbonePeerMonitor {
309    pub fn new() -> Self {
310        Self {
311            peers: HashMap::new(),
312        }
313    }
314
315    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
316        let mut merged = self.peers.clone();
317
318        for (peer_ip, state) in peers {
319            let entry = merged
320                .entry(*peer_ip)
321                .or_insert_with(PeerBehaviorState::new);
322            entry.connected_count = state.connected_count;
323            entry.reject_count = state.reject_count;
324            if state.blacklisted_until.is_some() {
325                entry.blacklisted_until = state.blacklisted_until;
326                entry.blacklist_reason = state.blacklist_reason.clone();
327            }
328        }
329
330        merged.retain(|peer_ip, state| {
331            peers.contains_key(peer_ip)
332                || state.blacklisted_until.is_some()
333                || state.reject_count > 0
334        });
335        self.peers = merged;
336    }
337
338    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
339        for (peer_ip, state) in &self.peers {
340            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
341            entry.blacklisted_until = state.blacklisted_until;
342            entry.blacklist_reason = state.blacklist_reason.clone();
343            entry.reject_count = state.reject_count;
344        }
345
346        peers.retain(|peer_ip, state| {
347            if state.connected_count > 0 {
348                return true;
349            }
350            self.peers.contains_key(peer_ip)
351        });
352    }
353
354    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
355        let now = Instant::now();
356        let mut entries: Vec<BackbonePeerStateEntry> = self
357            .peers
358            .iter()
359            .map(|(peer_ip, state)| BackbonePeerStateEntry {
360                interface_name: interface_name.to_string(),
361                peer_ip: *peer_ip,
362                connected_count: state.connected_count,
363                blacklisted_remaining_secs: state
364                    .blacklisted_until
365                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
366                blacklist_reason: state.blacklist_reason.clone(),
367                reject_count: state.reject_count,
368            })
369            .collect();
370        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
371        entries
372    }
373
374    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
375        self.peers.remove(&peer_ip).is_some()
376    }
377
378    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
379        let state = self
380            .peers
381            .entry(peer_ip)
382            .or_insert_with(PeerBehaviorState::new);
383        state.blacklisted_until = Some(Instant::now() + duration);
384        state.blacklist_reason = Some(reason);
385        true
386    }
387
388    #[cfg(test)]
389    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
390        let mut state = PeerBehaviorState::new();
391        state.connected_count = entry.connected_count;
392        state.reject_count = entry.reject_count;
393        state.blacklist_reason = entry.blacklist_reason;
394        if let Some(remaining) = entry.blacklisted_remaining_secs {
395            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
396        }
397        self.peers.insert(entry.peer_ip, state);
398    }
399}
400
/// Why the poll loop is dropping a client; selects the log message and the
/// follow-up events emitted by `disconnect_client`.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// The read returned end-of-stream or an error.
    RemoteClosed,
    /// The client never sent data within the idle timeout.
    IdleTimeout,
    /// The client's writer shut the socket down after a write stall.
    WriteStall,
}
407
408/// Main poll event loop.
409fn poll_loop(
410    listener: TcpListener,
411    name: String,
412    server_interface_id: InterfaceId,
413    tx: EventSender,
414    next_id: Arc<AtomicU64>,
415    runtime: Arc<Mutex<BackboneServerRuntime>>,
416    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
417    ingress_control: IngressControlConfig,
418) -> io::Result<()> {
419    let poller = Poller::new()?;
420
421    const LISTENER_KEY: usize = 0;
422
423    // SAFETY: listener outlives its registration in the poller.
424    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
425
426    let mut clients: HashMap<usize, ClientState> = HashMap::new();
427    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
428    let mut events = Events::new();
429    let mut next_key: usize = 1;
430
431    loop {
432        let runtime_snapshot = runtime.lock().unwrap().clone();
433        let max_connections = runtime_snapshot.max_connections;
434        let idle_timeout = runtime_snapshot.idle_timeout;
435        cleanup_peer_state(&mut peers);
436        {
437            let mut monitor = peer_state.lock().unwrap();
438            monitor.sync_into(&mut peers);
439            monitor.upsert_snapshot(&peers);
440        }
441
442        events.clear();
443        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
444
445        for ev in events.iter() {
446            if ev.key == LISTENER_KEY {
447                // Accept new connections
448                loop {
449                    match listener.accept() {
450                        Ok((stream, peer_addr)) => {
451                            let peer_ip = peer_addr.ip();
452                            let peer_port = peer_addr.port();
453
454                            if is_ip_blacklisted(&mut peers, peer_ip) {
455                                if let Some(state) = peers.get_mut(&peer_ip) {
456                                    state.reject_count = state.reject_count.saturating_add(1);
457                                }
458                                peer_state.lock().unwrap().upsert_snapshot(&peers);
459                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
460                                drop(stream);
461                                continue;
462                            }
463
464                            if let Some(max) = max_connections {
465                                if clients.len() >= max {
466                                    log::warn!(
467                                        "[{}] max connections ({}) reached, rejecting {}",
468                                        name,
469                                        max,
470                                        peer_addr
471                                    );
472                                    drop(stream);
473                                    continue;
474                                }
475                            }
476
477                            stream.set_nonblocking(true).ok();
478                            stream.set_nodelay(true).ok();
479                            set_tcp_keepalive(&stream).ok();
480
481                            // Prevent SIGPIPE on macOS when writing to broken pipes
482                            #[cfg(target_os = "macos")]
483                            {
484                                let sock = SockRef::from(&stream);
485                                sock.set_nosigpipe(true).ok();
486                            }
487
488                            let key = next_key;
489                            next_key += 1;
490                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
491
492                            log::info!(
493                                "[{}] backbone client connected: {} → id {}",
494                                name,
495                                peer_addr,
496                                client_id.0
497                            );
498
499                            // Register client with poller
500                            // SAFETY: stream is stored in ClientState and outlives registration.
501                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
502                            {
503                                log::warn!("[{}] failed to add client to poller: {}", name, e);
504                                continue; // stream drops, closing socket
505                            }
506
507                            // Create writer via try_clone (cross-platform dup)
508                            let writer_stream = match stream.try_clone() {
509                                Ok(s) => s,
510                                Err(e) => {
511                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
512                                    let _ = poller.delete(&stream);
513                                    continue; // stream drops
514                                }
515                            };
516                            let write_stall_flag = Arc::new(AtomicBool::new(false));
517                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
518                                stream: writer_stream,
519                                runtime: Arc::clone(&runtime),
520                                interface_name: name.clone(),
521                                interface_id: client_id,
522                                event_tx: tx.clone(),
523                                pending: Vec::new(),
524                                stall_started: None,
525                                disconnect_notified: false,
526                                write_stall_flag: Arc::clone(&write_stall_flag),
527                            });
528
529                            clients.insert(
530                                key,
531                                ClientState {
532                                    id: client_id,
533                                    peer_ip,
534                                    peer_port,
535                                    stream,
536                                    decoder: hdlc::Decoder::new(),
537                                    connected_at: Instant::now(),
538                                    has_received_data: false,
539                                    write_stall_flag,
540                                },
541                            );
542                            peers
543                                .entry(peer_ip)
544                                .or_insert_with(PeerBehaviorState::new)
545                                .connected_count += 1;
546                            peer_state.lock().unwrap().upsert_snapshot(&peers);
547                            let _ = tx.send(Event::BackbonePeerConnected {
548                                server_interface_id,
549                                peer_interface_id: client_id,
550                                peer_ip,
551                                peer_port,
552                            });
553
554                            let info = InterfaceInfo {
555                                id: client_id,
556                                name: format!("BackboneInterface/{}", client_id.0),
557                                mode: constants::MODE_FULL,
558                                out_capable: true,
559                                in_capable: true,
560                                bitrate: Some(1_000_000_000), // 1 Gbps guess
561                                airtime_profile: None,
562                                announce_rate_target: None,
563                                announce_rate_grace: 0,
564                                announce_rate_penalty: 0.0,
565                                announce_cap: constants::ANNOUNCE_CAP,
566                                is_local_client: false,
567                                wants_tunnel: false,
568                                tunnel_id: None,
569                                mtu: 65535,
570                                ia_freq: 0.0,
571                                started: 0.0,
572                                ingress_control,
573                            };
574
575                            if tx
576                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
577                                .is_err()
578                            {
579                                // Driver shut down
580                                cleanup(&poller, &clients, &listener);
581                                return Ok(());
582                            }
583                        }
584                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
585                        Err(e) => {
586                            log::warn!("[{}] accept error: {}", name, e);
587                            break;
588                        }
589                    }
590                }
591                // Re-arm listener (oneshot semantics)
592                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
593            } else if clients.contains_key(&ev.key) {
594                let key = ev.key;
595                let mut should_remove = false;
596                let mut client_id = InterfaceId(0);
597
598                let mut buf = [0u8; 4096];
599                let read_result = {
600                    let client = clients.get_mut(&key).unwrap();
601                    client.stream.read(&mut buf)
602                };
603
604                match read_result {
605                    Ok(0) | Err(_) => {
606                        if let Some(c) = clients.get(&key) {
607                            client_id = c.id;
608                        }
609                        should_remove = true;
610                    }
611                    Ok(n) => {
612                        let client = clients.get_mut(&key).unwrap();
613                        client_id = client.id;
614                        client.has_received_data = true;
615                        for frame in client.decoder.feed(&buf[..n]) {
616                            if tx
617                                .send(Event::Frame {
618                                    interface_id: client_id,
619                                    data: frame,
620                                })
621                                .is_err()
622                            {
623                                cleanup(&poller, &clients, &listener);
624                                return Ok(());
625                            }
626                        }
627                    }
628                }
629
630                if should_remove {
631                    let reason = if clients
632                        .get(&key)
633                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
634                    {
635                        DisconnectReason::WriteStall
636                    } else {
637                        DisconnectReason::RemoteClosed
638                    };
639                    disconnect_client(
640                        &poller,
641                        &mut clients,
642                        &mut peers,
643                        &name,
644                        server_interface_id,
645                        &tx,
646                        &peer_state,
647                        key,
648                        client_id,
649                        reason,
650                    );
651                } else if let Some(client) = clients.get(&key) {
652                    // Re-arm client (oneshot semantics)
653                    poller.modify(&client.stream, PollEvent::readable(key))?;
654                }
655            }
656        }
657
658        if let Some(timeout) = idle_timeout {
659            let now = Instant::now();
660            let timed_out: Vec<(usize, InterfaceId)> = clients
661                .iter()
662                .filter_map(|(&key, client)| {
663                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
664                    {
665                        None
666                    } else {
667                        Some((key, client.id))
668                    }
669                })
670                .collect();
671
672            for (key, client_id) in timed_out {
673                disconnect_client(
674                    &poller,
675                    &mut clients,
676                    &mut peers,
677                    &name,
678                    server_interface_id,
679                    &tx,
680                    &peer_state,
681                    key,
682                    client_id,
683                    DisconnectReason::IdleTimeout,
684                );
685            }
686        }
687    }
688}
689
690fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
691    let now = Instant::now();
692    peers.retain(|_, state| {
693        if matches!(state.blacklisted_until, Some(until) if now >= until) {
694            state.blacklisted_until = None;
695            state.blacklist_reason = None;
696        }
697        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
698    });
699}
700
701fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
702    let now = Instant::now();
703    if let Some(state) = peers.get_mut(&peer_ip) {
704        if let Some(until) = state.blacklisted_until {
705            if now < until {
706                return true;
707            }
708            state.blacklisted_until = None;
709        }
710    }
711    false
712}
713
714fn disconnect_client(
715    poller: &Poller,
716    clients: &mut HashMap<usize, ClientState>,
717    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
718    name: &str,
719    server_interface_id: InterfaceId,
720    tx: &EventSender,
721    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
722    key: usize,
723    client_id: InterfaceId,
724    reason: DisconnectReason,
725) {
726    let Some(client) = clients.remove(&key) else {
727        return;
728    };
729
730    match reason {
731        DisconnectReason::RemoteClosed => {
732            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
733        }
734        DisconnectReason::IdleTimeout => {
735            log::info!(
736                "[{}] backbone client {} disconnected due to idle timeout",
737                name,
738                client_id.0
739            );
740        }
741        DisconnectReason::WriteStall => {
742            // Already logged by BackboneWriter::disconnect_for_write_stall
743        }
744    }
745
746    let _ = poller.delete(&client.stream);
747    // client.stream closes on drop
748    let connected_for = client.connected_at.elapsed();
749    let _ = tx.send(Event::BackbonePeerDisconnected {
750        server_interface_id,
751        peer_interface_id: client.id,
752        peer_ip: client.peer_ip,
753        peer_port: client.peer_port,
754        connected_for,
755        had_received_data: client.has_received_data,
756    });
757    match reason {
758        DisconnectReason::IdleTimeout => {
759            let _ = tx.send(Event::BackbonePeerIdleTimeout {
760                server_interface_id,
761                peer_interface_id: client.id,
762                peer_ip: client.peer_ip,
763                peer_port: client.peer_port,
764                connected_for,
765            });
766        }
767        DisconnectReason::WriteStall => {
768            let _ = tx.send(Event::BackbonePeerWriteStall {
769                server_interface_id,
770                peer_interface_id: client.id,
771                peer_ip: client.peer_ip,
772                peer_port: client.peer_port,
773                connected_for,
774            });
775        }
776        DisconnectReason::RemoteClosed => {}
777    }
778
779    if let Some(state) = peers.get_mut(&client.peer_ip) {
780        state.connected_count = state.connected_count.saturating_sub(1);
781    }
782    peer_state.lock().unwrap().upsert_snapshot(peers);
783    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
784    if !matches!(reason, DisconnectReason::WriteStall) {
785        let _ = tx.send(Event::InterfaceDown(client_id));
786    }
787}
788
789fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
790    let sock = SockRef::from(stream);
791    let mut keepalive = TcpKeepalive::new()
792        .with_time(Duration::from_secs(5))
793        .with_interval(Duration::from_secs(2));
794    #[cfg(any(target_os = "linux", target_os = "macos"))]
795    {
796        keepalive = keepalive.with_retries(12);
797    }
798    sock.set_tcp_keepalive(&keepalive)
799}
800
801fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
802    for (_, client) in clients {
803        let _ = poller.delete(&client.stream);
804    }
805    let _ = poller.delete(listener);
806}
807
808// ---------------------------------------------------------------------------
809// Client mode
810// ---------------------------------------------------------------------------
811
/// Configuration for a backbone client interface.
///
/// `reconnect_wait`, `max_reconnect_tries` and `connect_timeout` hold the
/// configured values; the connect and reconnect code reads the copies stored
/// in `runtime`.
#[derive(Debug, Clone)]
pub struct BackboneClientConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Host name or IP address of the backbone server to connect to.
    pub target_host: String,
    /// TCP port of the backbone server.
    pub target_port: u16,
    /// Identifier attached to the events this interface emits.
    pub interface_id: InterfaceId,
    /// Base delay of the exponential reconnect backoff.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` means no limit.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout applied to a TCP connect attempt.
    pub connect_timeout: Duration,
    /// Value of the `transport_identity` config parameter, if present.
    pub transport_identity: Option<String>,
    /// Shared runtime settings consulted on every connect/reconnect attempt.
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
}
825
/// Client settings that are read from the shared `runtime` handle each time
/// the client connects or reconnects.
#[derive(Debug, Clone)]
pub struct BackboneClientRuntime {
    /// Base delay of the exponential reconnect backoff.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` means no limit.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout applied to a TCP connect attempt.
    pub connect_timeout: Duration,
}
832
833impl BackboneClientRuntime {
834    pub fn from_config(config: &BackboneClientConfig) -> Self {
835        Self {
836            reconnect_wait: config.reconnect_wait,
837            max_reconnect_tries: config.max_reconnect_tries,
838            connect_timeout: config.connect_timeout,
839        }
840    }
841}
842
/// Handle bundling a client interface's shared runtime settings with the
/// values it was configured with.
#[derive(Debug, Clone)]
pub struct BackboneClientRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// Shared runtime settings (the same `Arc` the client config holds).
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
    /// Settings derived from the interface's configuration.
    pub startup: BackboneClientRuntime,
}
849
850impl Default for BackboneClientConfig {
851    fn default() -> Self {
852        let mut config = BackboneClientConfig {
853            name: String::new(),
854            target_host: "127.0.0.1".into(),
855            target_port: 4242,
856            interface_id: InterfaceId(0),
857            reconnect_wait: Duration::from_secs(5),
858            max_reconnect_tries: None,
859            connect_timeout: Duration::from_secs(5),
860            transport_identity: None,
861            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
862                reconnect_wait: Duration::from_secs(5),
863                max_reconnect_tries: None,
864                connect_timeout: Duration::from_secs(5),
865            })),
866        };
867        let startup = BackboneClientRuntime::from_config(&config);
868        config.runtime = Arc::new(Mutex::new(startup));
869        config
870    }
871}
872
873/// Writer that sends HDLC-framed data over a TCP stream (client mode).
874struct BackboneClientWriter {
875    stream: TcpStream,
876}
877
878impl Writer for BackboneClientWriter {
879    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
880        self.stream.write_all(&hdlc::frame(data))
881    }
882}
883
884/// Try to connect to the target host:port with timeout.
885fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
886    let runtime = config.runtime.lock().unwrap().clone();
887    let addr_str = format!("{}:{}", config.target_host, config.target_port);
888    let addr = addr_str
889        .to_socket_addrs()?
890        .next()
891        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
892
893    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
894    stream.set_nodelay(true)?;
895    set_tcp_keepalive(&stream).ok();
896
897    // Prevent SIGPIPE on macOS when writing to broken pipes
898    #[cfg(target_os = "macos")]
899    {
900        let sock = SockRef::from(&stream);
901        sock.set_nosigpipe(true).ok();
902    }
903
904    Ok(stream)
905}
906
907/// Connect and start the reader thread. Returns the writer for the driver.
908pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
909    let stream = try_connect_client(&config)?;
910    let reader_stream = stream.try_clone()?;
911    let writer_stream = stream.try_clone()?;
912
913    let id = config.interface_id;
914    log::info!(
915        "[{}] backbone client connected to {}:{}",
916        config.name,
917        config.target_host,
918        config.target_port
919    );
920
921    // Initial connect: writer is None because it's returned directly to the caller
922    let _ = tx.send(Event::InterfaceUp(id, None, None));
923
924    thread::Builder::new()
925        .name(format!("backbone-client-{}", id.0))
926        .spawn(move || {
927            client_reader_loop(reader_stream, config, tx);
928        })?;
929
930    Ok(Box::new(BackboneClientWriter {
931        stream: writer_stream,
932    }))
933}
934
935/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
936/// On disconnect, attempts reconnection.
937fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
938    let id = config.interface_id;
939    let mut decoder = hdlc::Decoder::new();
940    let mut buf = [0u8; 4096];
941
942    loop {
943        match stream.read(&mut buf) {
944            Ok(0) => {
945                log::warn!("[{}] connection closed", config.name);
946                let _ = tx.send(Event::InterfaceDown(id));
947                match client_reconnect(&config, &tx) {
948                    Some(new_stream) => {
949                        stream = new_stream;
950                        decoder = hdlc::Decoder::new();
951                        continue;
952                    }
953                    None => {
954                        log::error!("[{}] reconnection failed, giving up", config.name);
955                        return;
956                    }
957                }
958            }
959            Ok(n) => {
960                for frame in decoder.feed(&buf[..n]) {
961                    if tx
962                        .send(Event::Frame {
963                            interface_id: id,
964                            data: frame,
965                        })
966                        .is_err()
967                    {
968                        return;
969                    }
970                }
971            }
972            Err(e) => {
973                log::warn!("[{}] read error: {}", config.name, e);
974                let _ = tx.send(Event::InterfaceDown(id));
975                match client_reconnect(&config, &tx) {
976                    Some(new_stream) => {
977                        stream = new_stream;
978                        decoder = hdlc::Decoder::new();
979                        continue;
980                    }
981                    None => {
982                        log::error!("[{}] reconnection failed, giving up", config.name);
983                        return;
984                    }
985                }
986            }
987        }
988    }
989}
990
991/// Maximum backoff multiplier: `base_delay * 2^MAX_BACKOFF_SHIFT`.
992/// With a 5 s base this caps at 5 × 2^6 = 320 s ≈ 5 min.
993const MAX_BACKOFF_SHIFT: u32 = 6;
994
995/// Attempt to reconnect with exponential backoff and jitter.
996/// Returns the new reader stream on success.
997/// Sends the new writer to the driver via InterfaceUp event.
998fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
999    let mut attempts = 0u32;
1000    loop {
1001        let runtime = config.runtime.lock().unwrap().clone();
1002
1003        let shift = attempts.min(MAX_BACKOFF_SHIFT);
1004        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1005        // Add ±25 % jitter to avoid thundering-herd reconnects.
1006        let jitter_range = backoff / 4;
1007        let jitter = if jitter_range.as_nanos() > 0 {
1008            let offset = Duration::from_nanos(
1009                (std::hash::RandomState::new().build_hasher().finish()
1010                    % jitter_range.as_nanos() as u64)
1011                    * 2,
1012            );
1013            if offset > jitter_range {
1014                backoff + (offset - jitter_range)
1015            } else {
1016                backoff - (jitter_range - offset)
1017            }
1018        } else {
1019            backoff
1020        };
1021        thread::sleep(jitter);
1022
1023        attempts += 1;
1024
1025        if let Some(max) = runtime.max_reconnect_tries {
1026            if attempts > max {
1027                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1028                return None;
1029            }
1030        }
1031
1032        log::info!(
1033            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1034            config.name,
1035            attempts,
1036            jitter.as_secs_f64(),
1037        );
1038
1039        match try_connect_client(config) {
1040            Ok(new_stream) => {
1041                let writer_stream = match new_stream.try_clone() {
1042                    Ok(s) => s,
1043                    Err(e) => {
1044                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1045                        continue;
1046                    }
1047                };
1048                log::info!(
1049                    "[{}] reconnected after {} attempt(s)",
1050                    config.name,
1051                    attempts
1052                );
1053                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1054                    stream: writer_stream,
1055                });
1056                let _ = tx.send(Event::InterfaceUp(
1057                    config.interface_id,
1058                    Some(new_writer),
1059                    None,
1060                ));
1061                return Some(new_stream);
1062            }
1063            Err(e) => {
1064                log::warn!("[{}] reconnect failed: {}", config.name, e);
1065            }
1066        }
1067    }
1068}
1069
1070// ---------------------------------------------------------------------------
1071// Factory
1072// ---------------------------------------------------------------------------
1073
/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
/// server or client config through the opaque `InterfaceConfigData` channel.
#[derive(Clone)]
pub(crate) enum BackboneMode {
    /// Server-mode (TCP listener) configuration.
    Server(BackboneConfig),
    /// Client-mode (outbound connection) configuration.
    Client(BackboneClientConfig),
}
1081
/// Factory for `BackboneInterface`.
///
/// If the config params contain `"remote"` or `"target_host"` the interface
/// is started in client mode; otherwise it is started as a TCP listener
/// (server mode). In both modes the port falls back to 4242 when no port
/// parameter is given or it fails to parse.
pub struct BackboneInterfaceFactory;
1088
/// Parse `params[key]` as a strictly positive number of seconds.
///
/// Returns `None` when the key is absent, the value is not a number, the
/// number is zero, negative or NaN, it cannot be represented as a
/// [`Duration`] (infinite or too large), or it is so small that it rounds to
/// a zero duration.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    params
        .get(key)
        .and_then(|raw| raw.parse::<f64>().ok())
        // `NaN > 0.0` is false, so this also rejects NaN.
        .filter(|secs| *secs > 0.0)
        // `Duration::from_secs_f64` panics on infinite or overflowing input
        // (e.g. "inf", "1e300"); the fallible variant maps those to `None`.
        .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
        // Sub-nanosecond values collapse to zero, which is not positive.
        .filter(|duration| !duration.is_zero())
}
1096
1097impl InterfaceFactory for BackboneInterfaceFactory {
1098    fn type_name(&self) -> &str {
1099        "BackboneInterface"
1100    }
1101
1102    fn parse_config(
1103        &self,
1104        name: &str,
1105        id: InterfaceId,
1106        params: &HashMap<String, String>,
1107    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1108        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1109            // Client mode
1110            let target_host = target_host.clone();
1111            let target_port = params
1112                .get("target_port")
1113                .or_else(|| params.get("port"))
1114                .and_then(|v| v.parse().ok())
1115                .unwrap_or(4242);
1116            let transport_identity = params.get("transport_identity").cloned();
1117            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1118                name: name.to_string(),
1119                target_host,
1120                target_port,
1121                interface_id: id,
1122                transport_identity,
1123                ..BackboneClientConfig::default()
1124            })))
1125        } else {
1126            // Server mode
1127            let listen_ip = params
1128                .get("listen_ip")
1129                .or_else(|| params.get("device"))
1130                .cloned()
1131                .unwrap_or_else(|| "0.0.0.0".into());
1132            let listen_port = params
1133                .get("listen_port")
1134                .or_else(|| params.get("port"))
1135                .and_then(|v| v.parse().ok())
1136                .unwrap_or(4242);
1137            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1138            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1139            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1140            let abuse = BackboneAbuseConfig {
1141                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1142            };
1143            let mut config = BackboneConfig {
1144                name: name.to_string(),
1145                listen_ip,
1146                listen_port,
1147                interface_id: id,
1148                max_connections,
1149                idle_timeout,
1150                write_stall_timeout,
1151                abuse,
1152                ingress_control: IngressControlConfig::enabled(),
1153                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1154                    max_connections: None,
1155                    idle_timeout: None,
1156                    write_stall_timeout: None,
1157                    abuse: BackboneAbuseConfig::default(),
1158                })),
1159                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1160            };
1161            let startup = BackboneServerRuntime::from_config(&config);
1162            config.runtime = Arc::new(Mutex::new(startup));
1163            Ok(Box::new(BackboneMode::Server(config)))
1164        }
1165    }
1166
1167    fn start(
1168        &self,
1169        config: Box<dyn InterfaceConfigData>,
1170        ctx: StartContext,
1171    ) -> io::Result<StartResult> {
1172        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1173            io::Error::new(
1174                io::ErrorKind::InvalidData,
1175                "wrong config type for BackboneInterface",
1176            )
1177        })?;
1178
1179        match mode {
1180            BackboneMode::Client(cfg) => {
1181                let id = cfg.interface_id;
1182                let name = cfg.name.clone();
1183                let info = InterfaceInfo {
1184                    id,
1185                    name,
1186                    mode: ctx.mode,
1187                    out_capable: true,
1188                    in_capable: true,
1189                    bitrate: Some(1_000_000_000),
1190                    airtime_profile: None,
1191                    announce_rate_target: None,
1192                    announce_rate_grace: 0,
1193                    announce_rate_penalty: 0.0,
1194                    announce_cap: constants::ANNOUNCE_CAP,
1195                    is_local_client: false,
1196                    wants_tunnel: false,
1197                    tunnel_id: None,
1198                    mtu: 65535,
1199                    ingress_control: ctx.ingress_control,
1200                    ia_freq: 0.0,
1201                    started: crate::time::now(),
1202                };
1203                let writer = start_client(cfg, ctx.tx)?;
1204                Ok(StartResult::Simple {
1205                    id,
1206                    info,
1207                    writer,
1208                    interface_type_name: "BackboneInterface".to_string(),
1209                })
1210            }
1211            BackboneMode::Server(mut cfg) => {
1212                cfg.ingress_control = ctx.ingress_control;
1213                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1214                Ok(StartResult::Listener { control: None })
1215            }
1216        }
1217    }
1218}
1219
1220pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1221    match mode {
1222        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1223            interface_name: config.name.clone(),
1224            runtime: Arc::clone(&config.runtime),
1225            startup: BackboneServerRuntime::from_config(config),
1226        }),
1227        BackboneMode::Client(_) => None,
1228    }
1229}
1230
1231pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1232    match mode {
1233        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1234            interface_id: config.interface_id,
1235            interface_name: config.name.clone(),
1236            peer_state: Arc::clone(&config.peer_state),
1237        }),
1238        BackboneMode::Client(_) => None,
1239    }
1240}
1241
1242pub(crate) fn client_runtime_handle_from_mode(
1243    mode: &BackboneMode,
1244) -> Option<BackboneClientRuntimeConfigHandle> {
1245    match mode {
1246        BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1247            interface_name: config.name.clone(),
1248            runtime: Arc::clone(&config.runtime),
1249            startup: BackboneClientRuntime::from_config(config),
1250        }),
1251        BackboneMode::Server(_) => None,
1252    }
1253}
1254
1255pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1256    match mode {
1257        BackboneMode::Client(config) => Some(config.clone()),
1258        BackboneMode::Server(_) => None,
1259    }
1260}
1261
1262#[cfg(test)]
1263mod tests {
1264    use super::*;
1265    use std::sync::mpsc;
1266    use std::time::Duration;
1267
1268    fn find_free_port() -> u16 {
1269        TcpListener::bind("127.0.0.1:0")
1270            .unwrap()
1271            .local_addr()
1272            .unwrap()
1273            .port()
1274    }
1275
1276    fn recv_non_peer_event(
1277        rx: &mpsc::Receiver<Event>,
1278        timeout: Duration,
1279    ) -> Result<Event, mpsc::RecvTimeoutError> {
1280        let deadline = Instant::now() + timeout;
1281        loop {
1282            let remaining = deadline.saturating_duration_since(Instant::now());
1283            if remaining.is_zero() {
1284                return Err(mpsc::RecvTimeoutError::Timeout);
1285            }
1286            let event = rx.recv_timeout(remaining)?;
1287            match event {
1288                Event::BackbonePeerConnected { .. }
1289                | Event::BackbonePeerDisconnected { .. }
1290                | Event::BackbonePeerIdleTimeout { .. }
1291                | Event::BackbonePeerWriteStall { .. }
1292                | Event::BackbonePeerPenalty { .. } => continue,
1293                other => return Ok(other),
1294            }
1295        }
1296    }
1297
1298    fn make_server_config(
1299        port: u16,
1300        interface_id: u64,
1301        max_connections: Option<usize>,
1302        idle_timeout: Option<Duration>,
1303        write_stall_timeout: Option<Duration>,
1304        abuse: BackboneAbuseConfig,
1305    ) -> BackboneConfig {
1306        let mut config = BackboneConfig {
1307            name: "test-backbone".into(),
1308            listen_ip: "127.0.0.1".into(),
1309            listen_port: port,
1310            interface_id: InterfaceId(interface_id),
1311            max_connections,
1312            idle_timeout,
1313            write_stall_timeout,
1314            abuse,
1315            ingress_control: IngressControlConfig::enabled(),
1316            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1317                max_connections: None,
1318                idle_timeout: None,
1319                write_stall_timeout: None,
1320                abuse: BackboneAbuseConfig::default(),
1321            })),
1322            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1323        };
1324        let startup = BackboneServerRuntime::from_config(&config);
1325        config.runtime = Arc::new(Mutex::new(startup));
1326        config
1327    }
1328
1329    #[test]
1330    fn backbone_accept_connection() {
1331        let port = find_free_port();
1332        let (tx, rx) = crate::event::channel();
1333        let next_id = Arc::new(AtomicU64::new(8000));
1334
1335        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1336
1337        start(config, tx, next_id).unwrap();
1338        thread::sleep(Duration::from_millis(50));
1339
1340        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1341
1342        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1343        match event {
1344            Event::InterfaceUp(id, writer, info) => {
1345                assert_eq!(id, InterfaceId(8000));
1346                assert!(writer.is_some());
1347                assert!(info.is_some());
1348                let info = info.unwrap();
1349                assert!(info.out_capable);
1350                assert!(info.in_capable);
1351            }
1352            other => panic!("expected InterfaceUp, got {:?}", other),
1353        }
1354    }
1355
1356    #[test]
1357    fn backbone_receive_frame() {
1358        let port = find_free_port();
1359        let (tx, rx) = crate::event::channel();
1360        let next_id = Arc::new(AtomicU64::new(8100));
1361
1362        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1363
1364        start(config, tx, next_id).unwrap();
1365        thread::sleep(Duration::from_millis(50));
1366
1367        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1368
1369        // Drain InterfaceUp
1370        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1371
1372        // Send HDLC frame (>= 19 bytes)
1373        let payload: Vec<u8> = (0..32).collect();
1374        client.write_all(&hdlc::frame(&payload)).unwrap();
1375
1376        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1377        match event {
1378            Event::Frame { interface_id, data } => {
1379                assert_eq!(interface_id, InterfaceId(8100));
1380                assert_eq!(data, payload);
1381            }
1382            other => panic!("expected Frame, got {:?}", other),
1383        }
1384    }
1385
1386    #[test]
1387    fn backbone_send_to_client() {
1388        let port = find_free_port();
1389        let (tx, rx) = crate::event::channel();
1390        let next_id = Arc::new(AtomicU64::new(8200));
1391
1392        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1393
1394        start(config, tx, next_id).unwrap();
1395        thread::sleep(Duration::from_millis(50));
1396
1397        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1398        client
1399            .set_read_timeout(Some(Duration::from_secs(2)))
1400            .unwrap();
1401
1402        // Get writer from InterfaceUp
1403        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1404        let mut writer = match event {
1405            Event::InterfaceUp(_, Some(w), _) => w,
1406            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1407        };
1408
1409        // Send frame via writer
1410        let payload: Vec<u8> = (0..24).collect();
1411        writer.send_frame(&payload).unwrap();
1412
1413        // Read from client
1414        let mut buf = [0u8; 256];
1415        let n = client.read(&mut buf).unwrap();
1416        let expected = hdlc::frame(&payload);
1417        assert_eq!(&buf[..n], &expected[..]);
1418    }
1419
1420    #[test]
1421    fn backbone_multiple_clients() {
1422        let port = find_free_port();
1423        let (tx, rx) = crate::event::channel();
1424        let next_id = Arc::new(AtomicU64::new(8300));
1425
1426        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1427
1428        start(config, tx, next_id).unwrap();
1429        thread::sleep(Duration::from_millis(50));
1430
1431        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1432        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1433
1434        let mut ids = Vec::new();
1435        for _ in 0..2 {
1436            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1437            match event {
1438                Event::InterfaceUp(id, _, _) => ids.push(id),
1439                other => panic!("expected InterfaceUp, got {:?}", other),
1440            }
1441        }
1442
1443        assert_eq!(ids.len(), 2);
1444        assert_ne!(ids[0], ids[1]);
1445    }
1446
1447    #[test]
1448    fn backbone_client_disconnect() {
1449        let port = find_free_port();
1450        let (tx, rx) = crate::event::channel();
1451        let next_id = Arc::new(AtomicU64::new(8400));
1452
1453        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1454
1455        start(config, tx, next_id).unwrap();
1456        thread::sleep(Duration::from_millis(50));
1457
1458        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1459
1460        // Drain InterfaceUp
1461        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1462
1463        // Disconnect
1464        drop(client);
1465
1466        // Should receive InterfaceDown
1467        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1468        assert!(
1469            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1470            "expected InterfaceDown(8400), got {:?}",
1471            event
1472        );
1473    }
1474
1475    #[test]
1476    fn backbone_epoll_multiplexing() {
1477        let port = find_free_port();
1478        let (tx, rx) = crate::event::channel();
1479        let next_id = Arc::new(AtomicU64::new(8500));
1480
1481        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1482
1483        start(config, tx, next_id).unwrap();
1484        thread::sleep(Duration::from_millis(50));
1485
1486        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1487        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1488
1489        // Drain both InterfaceUp events
1490        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1491        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1492
1493        // Both clients send data simultaneously
1494        let payload1: Vec<u8> = (0..24).collect();
1495        let payload2: Vec<u8> = (100..130).collect();
1496        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1497        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1498
1499        // Should receive both Frame events
1500        let mut received = Vec::new();
1501        for _ in 0..2 {
1502            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1503            match event {
1504                Event::Frame { data, .. } => received.push(data),
1505                other => panic!("expected Frame, got {:?}", other),
1506            }
1507        }
1508        assert!(received.contains(&payload1));
1509        assert!(received.contains(&payload2));
1510    }
1511
1512    #[test]
1513    fn backbone_bind_port() {
1514        let port = find_free_port();
1515        let (tx, _rx) = crate::event::channel();
1516        let next_id = Arc::new(AtomicU64::new(8600));
1517
1518        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1519
1520        // Should not error
1521        start(config, tx, next_id).unwrap();
1522    }
1523
1524    #[test]
1525    fn backbone_hdlc_fragmented() {
1526        let port = find_free_port();
1527        let (tx, rx) = crate::event::channel();
1528        let next_id = Arc::new(AtomicU64::new(8700));
1529
1530        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1531
1532        start(config, tx, next_id).unwrap();
1533        thread::sleep(Duration::from_millis(50));
1534
1535        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1536        client.set_nodelay(true).unwrap();
1537
1538        // Drain InterfaceUp
1539        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1540
1541        // Send HDLC frame in two fragments
1542        let payload: Vec<u8> = (0..32).collect();
1543        let framed = hdlc::frame(&payload);
1544        let mid = framed.len() / 2;
1545
1546        client.write_all(&framed[..mid]).unwrap();
1547        thread::sleep(Duration::from_millis(50));
1548        client.write_all(&framed[mid..]).unwrap();
1549
1550        // Should receive reassembled frame
1551        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1552        match event {
1553            Event::Frame { data, .. } => {
1554                assert_eq!(data, payload);
1555            }
1556            other => panic!("expected Frame, got {:?}", other),
1557        }
1558    }
1559
1560    // -----------------------------------------------------------------------
1561    // Client mode tests
1562    // -----------------------------------------------------------------------
1563
1564    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1565        BackboneClientConfig {
1566            name: format!("test-bb-client-{}", port),
1567            target_host: "127.0.0.1".into(),
1568            target_port: port,
1569            interface_id: InterfaceId(id),
1570            reconnect_wait: Duration::from_millis(100),
1571            max_reconnect_tries: Some(2),
1572            connect_timeout: Duration::from_secs(2),
1573            transport_identity: None,
1574            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1575                reconnect_wait: Duration::from_millis(100),
1576                max_reconnect_tries: Some(2),
1577                connect_timeout: Duration::from_secs(2),
1578            })),
1579        }
1580    }
1581
1582    #[test]
1583    fn backbone_client_connect() {
1584        let port = find_free_port();
1585        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1586        let (tx, rx) = crate::event::channel();
1587
1588        let config = make_client_config(port, 9000);
1589        let _writer = start_client(config, tx).unwrap();
1590
1591        let _server_stream = listener.accept().unwrap();
1592
1593        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1594        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1595    }
1596
1597    #[test]
1598    fn backbone_client_receive_frame() {
1599        let port = find_free_port();
1600        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1601        let (tx, rx) = crate::event::channel();
1602
1603        let config = make_client_config(port, 9100);
1604        let _writer = start_client(config, tx).unwrap();
1605
1606        let (mut server_stream, _) = listener.accept().unwrap();
1607
1608        // Drain InterfaceUp
1609        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1610
1611        // Send HDLC frame from server side (>= 19 bytes payload)
1612        let payload: Vec<u8> = (0..32).collect();
1613        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1614
1615        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1616        match event {
1617            Event::Frame { interface_id, data } => {
1618                assert_eq!(interface_id, InterfaceId(9100));
1619                assert_eq!(data, payload);
1620            }
1621            other => panic!("expected Frame, got {:?}", other),
1622        }
1623    }
1624
1625    #[test]
1626    fn backbone_client_send_frame() {
1627        let port = find_free_port();
1628        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1629        let (tx, _rx) = crate::event::channel();
1630
1631        let config = make_client_config(port, 9200);
1632        let mut writer = start_client(config, tx).unwrap();
1633
1634        let (mut server_stream, _) = listener.accept().unwrap();
1635        server_stream
1636            .set_read_timeout(Some(Duration::from_secs(2)))
1637            .unwrap();
1638
1639        let payload: Vec<u8> = (0..24).collect();
1640        writer.send_frame(&payload).unwrap();
1641
1642        let mut buf = [0u8; 256];
1643        let n = server_stream.read(&mut buf).unwrap();
1644        let expected = hdlc::frame(&payload);
1645        assert_eq!(&buf[..n], &expected[..]);
1646    }
1647
1648    #[test]
1649    fn backbone_max_connections_rejects_excess() {
1650        let port = find_free_port();
1651        let (tx, rx) = crate::event::channel();
1652        let next_id = Arc::new(AtomicU64::new(8800));
1653
1654        let config = make_server_config(
1655            port,
1656            88,
1657            Some(2),
1658            None,
1659            None,
1660            BackboneAbuseConfig::default(),
1661        );
1662
1663        start(config, tx, next_id).unwrap();
1664        thread::sleep(Duration::from_millis(50));
1665
1666        // Connect two clients (at limit)
1667        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1668        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1669
1670        // Drain both InterfaceUp events
1671        for _ in 0..2 {
1672            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1673            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1674        }
1675
1676        // Third connection should be accepted at TCP level but immediately dropped
1677        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1678        client3
1679            .set_read_timeout(Some(Duration::from_millis(500)))
1680            .unwrap();
1681
1682        // Give server time to reject
1683        thread::sleep(Duration::from_millis(100));
1684
1685        // Should NOT receive a third InterfaceUp
1686        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1687        assert!(
1688            result.is_err(),
1689            "expected no InterfaceUp for rejected connection, got {:?}",
1690            result
1691        );
1692    }
1693
1694    #[test]
1695    fn backbone_max_connections_allows_after_disconnect() {
1696        let port = find_free_port();
1697        let (tx, rx) = crate::event::channel();
1698        let next_id = Arc::new(AtomicU64::new(8900));
1699
1700        let config = make_server_config(
1701            port,
1702            89,
1703            Some(1),
1704            None,
1705            None,
1706            BackboneAbuseConfig::default(),
1707        );
1708
1709        start(config, tx, next_id).unwrap();
1710        thread::sleep(Duration::from_millis(50));
1711
1712        // Connect first client
1713        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1714        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1715        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1716
1717        // Disconnect first client
1718        drop(client1);
1719
1720        // Wait for InterfaceDown
1721        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1722        assert!(matches!(event, Event::InterfaceDown(_)));
1723
1724        // Now a new connection should be accepted
1725        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1726        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1727        assert!(
1728            matches!(event, Event::InterfaceUp(_, _, _)),
1729            "expected InterfaceUp after slot freed, got {:?}",
1730            event
1731        );
1732    }
1733
1734    #[test]
1735    fn backbone_client_reconnect() {
1736        let port = find_free_port();
1737        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1738        listener.set_nonblocking(false).unwrap();
1739        let (tx, rx) = crate::event::channel();
1740
1741        let config = make_client_config(port, 9300);
1742        let _writer = start_client(config, tx).unwrap();
1743
1744        // Accept first connection and immediately close it
1745        let (server_stream, _) = listener.accept().unwrap();
1746
1747        // Drain InterfaceUp
1748        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1749
1750        drop(server_stream);
1751
1752        // Should get InterfaceDown
1753        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1754        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1755
1756        // Accept the reconnection
1757        let _server_stream2 = listener.accept().unwrap();
1758
1759        // Should get InterfaceUp again
1760        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1761        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1762    }
1763
1764    #[test]
1765    fn backbone_idle_timeout_disconnects_silent_client() {
1766        let port = find_free_port();
1767        let (tx, rx) = crate::event::channel();
1768        let next_id = Arc::new(AtomicU64::new(9400));
1769
1770        let config = make_server_config(
1771            port,
1772            94,
1773            None,
1774            Some(Duration::from_millis(150)),
1775            None,
1776            BackboneAbuseConfig::default(),
1777        );
1778
1779        start(config, tx, next_id).unwrap();
1780        thread::sleep(Duration::from_millis(50));
1781
1782        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1783
1784        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1785        let client_id = match event {
1786            Event::InterfaceUp(id, _, _) => id,
1787            other => panic!("expected InterfaceUp, got {:?}", other),
1788        };
1789
1790        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1791        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1792    }
1793
1794    #[test]
1795    fn backbone_idle_timeout_ignores_client_after_data() {
1796        let port = find_free_port();
1797        let (tx, rx) = crate::event::channel();
1798        let next_id = Arc::new(AtomicU64::new(9500));
1799
1800        let config = make_server_config(
1801            port,
1802            95,
1803            None,
1804            Some(Duration::from_millis(200)),
1805            None,
1806            BackboneAbuseConfig::default(),
1807        );
1808
1809        start(config, tx, next_id).unwrap();
1810        thread::sleep(Duration::from_millis(50));
1811
1812        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1813
1814        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1815        let client_id = match event {
1816            Event::InterfaceUp(id, _, _) => id,
1817            other => panic!("expected InterfaceUp, got {:?}", other),
1818        };
1819
1820        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1821
1822        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1823        match event {
1824            Event::Frame { interface_id, data } => {
1825                assert_eq!(interface_id, client_id);
1826                assert_eq!(data, vec![1u8; 24]);
1827            }
1828            other => panic!("expected Frame, got {:?}", other),
1829        }
1830
1831        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1832        assert!(
1833            result.is_err(),
1834            "expected no InterfaceDown after client sent data, got {:?}",
1835            result
1836        );
1837    }
1838
1839    #[test]
1840    fn backbone_runtime_idle_timeout_updates_live() {
1841        let port = find_free_port();
1842        let (tx, rx) = crate::event::channel();
1843        let next_id = Arc::new(AtomicU64::new(9650));
1844
1845        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1846        let runtime = Arc::clone(&config.runtime);
1847
1848        start(config, tx, next_id).unwrap();
1849        thread::sleep(Duration::from_millis(50));
1850
1851        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1852        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1853        let client_id = match event {
1854            Event::InterfaceUp(id, _, _) => id,
1855            other => panic!("expected InterfaceUp, got {:?}", other),
1856        };
1857
1858        {
1859            let mut runtime = runtime.lock().unwrap();
1860            runtime.idle_timeout = Some(Duration::from_millis(150));
1861        }
1862
1863        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1864        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1865    }
1866
1867    #[test]
1868    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1869        let port = find_free_port();
1870        let (tx, rx) = crate::event::channel();
1871        let next_id = Arc::new(AtomicU64::new(9660));
1872
1873        let config = make_server_config(
1874            port,
1875            98,
1876            None,
1877            None,
1878            Some(Duration::from_millis(50)),
1879            BackboneAbuseConfig::default(),
1880        );
1881
1882        start(config, tx, next_id).unwrap();
1883        thread::sleep(Duration::from_millis(50));
1884
1885        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1886        client
1887            .set_read_timeout(Some(Duration::from_millis(100)))
1888            .unwrap();
1889        let sock = SockRef::from(&client);
1890        sock.set_recv_buffer_size(4096).ok();
1891
1892        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1893        let (client_id, mut writer) = match event {
1894            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1895            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1896        };
1897
1898        let payload = vec![0x55; 512 * 1024];
1899        let deadline = Instant::now() + Duration::from_secs(3);
1900        let mut stalled = false;
1901        while Instant::now() < deadline {
1902            match writer.send_frame(&payload) {
1903                Ok(()) => {}
1904                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1905                    thread::sleep(Duration::from_millis(10));
1906                }
1907                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1908                    stalled = true;
1909                    break;
1910                }
1911                Err(e) => panic!("unexpected send error: {}", e),
1912            }
1913        }
1914
1915        assert!(stalled, "expected writer to time out on persistent stall");
1916        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1917        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1918    }
1919
1920    /// Drain events matching a predicate, return the first match.
1921    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1922    where
1923        F: FnMut(&Event) -> bool,
1924    {
1925        let deadline = Instant::now() + timeout;
1926        loop {
1927            let remaining = deadline.saturating_duration_since(Instant::now());
1928            if remaining.is_zero() {
1929                return None;
1930            }
1931            match rx.recv_timeout(remaining) {
1932                Ok(event) if pred(&event) => return Some(event),
1933                Ok(_) => continue,
1934                Err(_) => return None,
1935            }
1936        }
1937    }
1938
1939    #[test]
1940    fn backbone_write_stall_emits_peer_events() {
1941        let port = find_free_port();
1942        let (tx, rx) = crate::event::channel();
1943        let next_id = Arc::new(AtomicU64::new(9700));
1944
1945        let config = make_server_config(
1946            port,
1947            97,
1948            None,
1949            None,
1950            Some(Duration::from_millis(50)), // 50ms stall timeout
1951            BackboneAbuseConfig::default(),
1952        );
1953
1954        start(config, tx, next_id).unwrap();
1955        thread::sleep(Duration::from_millis(50));
1956
1957        // Connect a client that won't read (will cause write stall)
1958        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1959        client
1960            .set_read_timeout(Some(Duration::from_millis(100)))
1961            .unwrap();
1962        let sock = SockRef::from(&client);
1963        sock.set_recv_buffer_size(4096).ok();
1964
1965        // Wait for InterfaceUp and grab writer
1966        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1967        let mut writer = match event {
1968            Event::InterfaceUp(_, Some(w), _) => w,
1969            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1970        };
1971
1972        // Flood until stall
1973        let payload = vec![0x55; 512 * 1024];
1974        let deadline = Instant::now() + Duration::from_secs(3);
1975        while Instant::now() < deadline {
1976            match writer.send_frame(&payload) {
1977                Ok(()) | Err(_) => {
1978                    if Instant::now() + Duration::from_millis(10) > deadline {
1979                        break;
1980                    }
1981                    thread::sleep(Duration::from_millis(10));
1982                }
1983            }
1984        }
1985
1986        // Should see BackbonePeerWriteStall event
1987        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1988            matches!(e, Event::BackbonePeerWriteStall { .. })
1989        });
1990        assert!(
1991            stall_event.is_some(),
1992            "expected BackbonePeerWriteStall event"
1993        );
1994    }
1995
1996    #[test]
1997    fn backbone_blacklisted_peer_rejected_on_connect() {
1998        let port = find_free_port();
1999        let (tx, rx) = crate::event::channel();
2000        let next_id = Arc::new(AtomicU64::new(9800));
2001
2002        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2003        let peer_state = config.peer_state.clone();
2004
2005        start(config, tx.clone(), next_id).unwrap();
2006        thread::sleep(Duration::from_millis(50));
2007
2008        // First connection should succeed
2009        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2010        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2011        assert!(
2012            matches!(event, Event::InterfaceUp(_, _, _)),
2013            "first connection should succeed"
2014        );
2015        drop(client1);
2016
2017        // Drain disconnect events
2018        thread::sleep(Duration::from_millis(100));
2019        while rx.try_recv().is_ok() {}
2020
2021        // Blacklist 127.0.0.1 via the peer monitor
2022        peer_state.lock().unwrap().blacklist(
2023            "127.0.0.1".parse().unwrap(),
2024            Duration::from_secs(60),
2025            "test blacklist".into(),
2026        );
2027
2028        // Second connection from same IP should be rejected (no InterfaceUp)
2029        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2030        // Give poll loop time to reject
2031        thread::sleep(Duration::from_millis(200));
2032
2033        // Should NOT get an InterfaceUp — connection should have been rejected
2034        let event = rx.try_recv();
2035        match event {
2036            Ok(Event::InterfaceUp(_, _, _)) => {
2037                panic!("blacklisted peer should not get InterfaceUp")
2038            }
2039            _ => {} // Expected: no InterfaceUp
2040        }
2041    }
2042
2043    #[test]
2044    fn backbone_parse_config_reads_abuse_settings() {
2045        let factory = BackboneInterfaceFactory;
2046        let mut params = HashMap::new();
2047        params.insert("listen_ip".into(), "127.0.0.1".into());
2048        params.insert("listen_port".into(), "4242".into());
2049        params.insert("idle_timeout".into(), "15".into());
2050        params.insert("write_stall_timeout".into(), "45".into());
2051        params.insert("max_penalty_duration".into(), "3600".into());
2052
2053        let config = factory
2054            .parse_config("test-backbone", InterfaceId(97), &params)
2055            .unwrap();
2056        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2057
2058        match mode {
2059            BackboneMode::Server(config) => {
2060                assert_eq!(config.listen_ip, "127.0.0.1");
2061                assert_eq!(config.listen_port, 4242);
2062                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2063                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2064                assert_eq!(
2065                    config.abuse.max_penalty_duration,
2066                    Some(Duration::from_secs(3600))
2067                );
2068            }
2069            BackboneMode::Client(_) => panic!("expected server config"),
2070        }
2071    }
2072}