Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
32/// HW_MTU: 1 MB (matches Python BackboneInterface.HW_MTU)
33#[allow(dead_code)]
34const HW_MTU: usize = 1_048_576;
35
36/// Configuration for a backbone interface.
37#[derive(Debug, Clone)]
38pub struct BackboneConfig {
39    pub name: String,
40    pub listen_ip: String,
41    pub listen_port: u16,
42    pub interface_id: InterfaceId,
43    pub max_connections: Option<usize>,
44    pub idle_timeout: Option<Duration>,
45    pub write_stall_timeout: Option<Duration>,
46    pub abuse: BackboneAbuseConfig,
47    pub ingress_control: IngressControlConfig,
48    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
49    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
50}
51
52/// Configurable behavior-based abuse detection for inbound peers.
53#[derive(Debug, Clone, Default)]
54pub struct BackboneAbuseConfig {
55    pub max_penalty_duration: Option<Duration>,
56}
57
58/// Live runtime state for a backbone server interface.
59#[derive(Debug, Clone)]
60pub struct BackboneServerRuntime {
61    pub max_connections: Option<usize>,
62    pub idle_timeout: Option<Duration>,
63    pub write_stall_timeout: Option<Duration>,
64    pub abuse: BackboneAbuseConfig,
65}
66
67impl BackboneServerRuntime {
68    pub fn from_config(config: &BackboneConfig) -> Self {
69        Self {
70            max_connections: config.max_connections,
71            idle_timeout: config.idle_timeout,
72            write_stall_timeout: config.write_stall_timeout,
73            abuse: config.abuse.clone(),
74        }
75    }
76}
77
78#[derive(Debug, Clone)]
79pub struct BackboneRuntimeConfigHandle {
80    pub interface_name: String,
81    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
82    pub startup: BackboneServerRuntime,
83}
84
85#[derive(Debug, Clone)]
86pub struct BackbonePeerStateHandle {
87    pub interface_id: InterfaceId,
88    pub interface_name: String,
89    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
90}
91
92impl Default for BackboneConfig {
93    fn default() -> Self {
94        let mut config = BackboneConfig {
95            name: String::new(),
96            listen_ip: "0.0.0.0".into(),
97            listen_port: 0,
98            interface_id: InterfaceId(0),
99            max_connections: None,
100            idle_timeout: None,
101            write_stall_timeout: None,
102            abuse: BackboneAbuseConfig::default(),
103            ingress_control: IngressControlConfig::enabled(),
104            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
105                max_connections: None,
106                idle_timeout: None,
107                write_stall_timeout: None,
108                abuse: BackboneAbuseConfig::default(),
109            })),
110            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
111        };
112        let startup = BackboneServerRuntime::from_config(&config);
113        config.runtime = Arc::new(Mutex::new(startup));
114        config
115    }
116}
117
118/// Maximum pending buffer size per client (512 KB). Clients exceeding this are
119/// disconnected to prevent unbounded memory growth from slow readers.
120const MAX_PENDING_BYTES: usize = 512 * 1024;
121
122/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
123struct BackboneWriter {
124    stream: TcpStream,
125    runtime: Arc<Mutex<BackboneServerRuntime>>,
126    interface_name: String,
127    interface_id: InterfaceId,
128    event_tx: EventSender,
129    pending: Vec<u8>,
130    stall_started: Option<Instant>,
131    disconnect_notified: bool,
132    write_stall_flag: Arc<AtomicBool>,
133}
134
135impl Writer for BackboneWriter {
136    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
137        let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
138        if !self.pending.is_empty() {
139            self.flush_pending(write_stall_timeout)?;
140            if !self.pending.is_empty() {
141                return Err(io::Error::new(
142                    io::ErrorKind::WouldBlock,
143                    "backbone writer still stalled",
144                ));
145            }
146        }
147
148        let frame = hdlc::frame(data);
149        self.write_buffer(&frame, write_stall_timeout)
150    }
151}
152
153impl BackboneWriter {
154    fn write_buffer(
155        &mut self,
156        data: &[u8],
157        write_stall_timeout: Option<Duration>,
158    ) -> io::Result<()> {
159        let mut written = 0usize;
160        while written < data.len() {
161            match self.stream.write(&data[written..]) {
162                Ok(0) => {
163                    return Err(io::Error::new(
164                        io::ErrorKind::WriteZero,
165                        "backbone writer wrote zero bytes",
166                    ))
167                }
168                Ok(n) => {
169                    written += n;
170                    self.stall_started = None;
171                }
172                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
173                    let now = Instant::now();
174                    let started = self.stall_started.get_or_insert(now);
175                    if let Some(timeout) = write_stall_timeout {
176                        if now.duration_since(*started) >= timeout {
177                            return Err(self.disconnect_for_write_stall(timeout));
178                        }
179                    }
180                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
181                        return Err(self.disconnect_for_write_stall(
182                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
183                        ));
184                    }
185                    self.pending.extend_from_slice(&data[written..]);
186                    return Err(io::Error::new(
187                        io::ErrorKind::WouldBlock,
188                        "backbone writer would block",
189                    ));
190                }
191                Err(e) => return Err(e),
192            }
193        }
194        Ok(())
195    }
196
197    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
198        if self.pending.is_empty() {
199            return Ok(());
200        }
201
202        let pending = std::mem::take(&mut self.pending);
203        match self.write_buffer(&pending, write_stall_timeout) {
204            Ok(()) => Ok(()),
205            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
206            Err(e) => Err(e),
207        }
208    }
209
210    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
211        if !self.disconnect_notified {
212            log::warn!(
213                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
214                self.interface_name,
215                self.interface_id.0,
216                timeout
217            );
218            self.write_stall_flag.store(true, Ordering::Relaxed);
219            let _ = self.stream.shutdown(Shutdown::Both);
220            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
221            self.disconnect_notified = true;
222        }
223        io::Error::new(
224            io::ErrorKind::TimedOut,
225            format!("backbone writer stalled for {:?}", timeout),
226        )
227    }
228}
229
230/// Start a backbone interface. Binds TCP listener, spawns poll thread.
231pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
232    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
233    let listener = TcpListener::bind(&addr)?;
234    listener.set_nonblocking(true)?;
235
236    log::info!(
237        "[{}] backbone server listening on {}",
238        config.name,
239        listener.local_addr().unwrap_or(addr.parse().unwrap())
240    );
241
242    let name = config.name.clone();
243    let server_interface_id = config.interface_id;
244    let runtime = Arc::clone(&config.runtime);
245    let peer_state = Arc::clone(&config.peer_state);
246    let ingress_control = config.ingress_control;
247    thread::Builder::new()
248        .name(format!("backbone-poll-{}", config.interface_id.0))
249        .spawn(move || {
250            if let Err(e) = poll_loop(
251                listener,
252                name,
253                server_interface_id,
254                tx,
255                next_id,
256                runtime,
257                peer_state,
258                ingress_control,
259            ) {
260                log::error!("backbone poll loop error: {}", e);
261            }
262        })?;
263
264    Ok(())
265}
266
267/// Per-client tracking state.
268struct ClientState {
269    id: InterfaceId,
270    peer_ip: IpAddr,
271    peer_port: u16,
272    stream: TcpStream,
273    decoder: hdlc::Decoder,
274    connected_at: Instant,
275    has_received_data: bool,
276    write_stall_flag: Arc<AtomicBool>,
277}
278
279#[derive(Debug, Clone)]
280struct PeerBehaviorState {
281    blacklisted_until: Option<Instant>,
282    blacklist_reason: Option<String>,
283    reject_count: u64,
284    connected_count: usize,
285}
286
287impl PeerBehaviorState {
288    fn new() -> Self {
289        Self {
290            blacklisted_until: None,
291            blacklist_reason: None,
292            reject_count: 0,
293            connected_count: 0,
294        }
295    }
296}
297
298#[derive(Debug, Clone, Default)]
299pub struct BackbonePeerMonitor {
300    peers: HashMap<IpAddr, PeerBehaviorState>,
301}
302
303impl BackbonePeerMonitor {
304    pub fn new() -> Self {
305        Self {
306            peers: HashMap::new(),
307        }
308    }
309
310    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
311        let mut merged = self.peers.clone();
312
313        for (peer_ip, state) in peers {
314            let entry = merged
315                .entry(*peer_ip)
316                .or_insert_with(PeerBehaviorState::new);
317            entry.connected_count = state.connected_count;
318            entry.reject_count = state.reject_count;
319            if state.blacklisted_until.is_some() {
320                entry.blacklisted_until = state.blacklisted_until;
321                entry.blacklist_reason = state.blacklist_reason.clone();
322            }
323        }
324
325        merged.retain(|peer_ip, state| {
326            peers.contains_key(peer_ip)
327                || state.blacklisted_until.is_some()
328                || state.reject_count > 0
329        });
330        self.peers = merged;
331    }
332
333    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
334        for (peer_ip, state) in &self.peers {
335            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
336            entry.blacklisted_until = state.blacklisted_until;
337            entry.blacklist_reason = state.blacklist_reason.clone();
338            entry.reject_count = state.reject_count;
339        }
340
341        peers.retain(|peer_ip, state| {
342            if state.connected_count > 0 {
343                return true;
344            }
345            self.peers.contains_key(peer_ip)
346        });
347    }
348
349    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
350        let now = Instant::now();
351        let mut entries: Vec<BackbonePeerStateEntry> = self
352            .peers
353            .iter()
354            .map(|(peer_ip, state)| BackbonePeerStateEntry {
355                interface_name: interface_name.to_string(),
356                peer_ip: *peer_ip,
357                connected_count: state.connected_count,
358                blacklisted_remaining_secs: state
359                    .blacklisted_until
360                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
361                blacklist_reason: state.blacklist_reason.clone(),
362                reject_count: state.reject_count,
363            })
364            .collect();
365        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
366        entries
367    }
368
369    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
370        self.peers.remove(&peer_ip).is_some()
371    }
372
373    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
374        let state = self
375            .peers
376            .entry(peer_ip)
377            .or_insert_with(PeerBehaviorState::new);
378        state.blacklisted_until = Some(Instant::now() + duration);
379        state.blacklist_reason = Some(reason);
380        true
381    }
382
383    #[cfg(test)]
384    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
385        let mut state = PeerBehaviorState::new();
386        state.connected_count = entry.connected_count;
387        state.reject_count = entry.reject_count;
388        state.blacklist_reason = entry.blacklist_reason;
389        if let Some(remaining) = entry.blacklisted_remaining_secs {
390            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
391        }
392        self.peers.insert(entry.peer_ip, state);
393    }
394}
395
396#[derive(Clone, Copy)]
397enum DisconnectReason {
398    RemoteClosed,
399    IdleTimeout,
400    WriteStall,
401}
402
403/// Main poll event loop.
404fn poll_loop(
405    listener: TcpListener,
406    name: String,
407    server_interface_id: InterfaceId,
408    tx: EventSender,
409    next_id: Arc<AtomicU64>,
410    runtime: Arc<Mutex<BackboneServerRuntime>>,
411    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
412    ingress_control: IngressControlConfig,
413) -> io::Result<()> {
414    let poller = Poller::new()?;
415
416    const LISTENER_KEY: usize = 0;
417
418    // SAFETY: listener outlives its registration in the poller.
419    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
420
421    let mut clients: HashMap<usize, ClientState> = HashMap::new();
422    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
423    let mut events = Events::new();
424    let mut next_key: usize = 1;
425
426    loop {
427        let runtime_snapshot = runtime.lock().unwrap().clone();
428        let max_connections = runtime_snapshot.max_connections;
429        let idle_timeout = runtime_snapshot.idle_timeout;
430        cleanup_peer_state(&mut peers);
431        {
432            let mut monitor = peer_state.lock().unwrap();
433            monitor.sync_into(&mut peers);
434            monitor.upsert_snapshot(&peers);
435        }
436
437        events.clear();
438        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
439
440        for ev in events.iter() {
441            if ev.key == LISTENER_KEY {
442                // Accept new connections
443                loop {
444                    match listener.accept() {
445                        Ok((stream, peer_addr)) => {
446                            let peer_ip = peer_addr.ip();
447                            let peer_port = peer_addr.port();
448
449                            if is_ip_blacklisted(&mut peers, peer_ip) {
450                                if let Some(state) = peers.get_mut(&peer_ip) {
451                                    state.reject_count = state.reject_count.saturating_add(1);
452                                }
453                                peer_state.lock().unwrap().upsert_snapshot(&peers);
454                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
455                                drop(stream);
456                                continue;
457                            }
458
459                            if let Some(max) = max_connections {
460                                if clients.len() >= max {
461                                    log::warn!(
462                                        "[{}] max connections ({}) reached, rejecting {}",
463                                        name,
464                                        max,
465                                        peer_addr
466                                    );
467                                    drop(stream);
468                                    continue;
469                                }
470                            }
471
472                            stream.set_nonblocking(true).ok();
473                            stream.set_nodelay(true).ok();
474                            set_tcp_keepalive(&stream).ok();
475
476                            // Prevent SIGPIPE on macOS when writing to broken pipes
477                            #[cfg(target_os = "macos")]
478                            {
479                                let sock = SockRef::from(&stream);
480                                sock.set_nosigpipe(true).ok();
481                            }
482
483                            let key = next_key;
484                            next_key += 1;
485                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
486
487                            log::info!(
488                                "[{}] backbone client connected: {} → id {}",
489                                name,
490                                peer_addr,
491                                client_id.0
492                            );
493
494                            // Register client with poller
495                            // SAFETY: stream is stored in ClientState and outlives registration.
496                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
497                            {
498                                log::warn!("[{}] failed to add client to poller: {}", name, e);
499                                continue; // stream drops, closing socket
500                            }
501
502                            // Create writer via try_clone (cross-platform dup)
503                            let writer_stream = match stream.try_clone() {
504                                Ok(s) => s,
505                                Err(e) => {
506                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
507                                    let _ = poller.delete(&stream);
508                                    continue; // stream drops
509                                }
510                            };
511                            let write_stall_flag = Arc::new(AtomicBool::new(false));
512                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
513                                stream: writer_stream,
514                                runtime: Arc::clone(&runtime),
515                                interface_name: name.clone(),
516                                interface_id: client_id,
517                                event_tx: tx.clone(),
518                                pending: Vec::new(),
519                                stall_started: None,
520                                disconnect_notified: false,
521                                write_stall_flag: Arc::clone(&write_stall_flag),
522                            });
523
524                            clients.insert(
525                                key,
526                                ClientState {
527                                    id: client_id,
528                                    peer_ip,
529                                    peer_port,
530                                    stream,
531                                    decoder: hdlc::Decoder::new(),
532                                    connected_at: Instant::now(),
533                                    has_received_data: false,
534                                    write_stall_flag,
535                                },
536                            );
537                            peers
538                                .entry(peer_ip)
539                                .or_insert_with(PeerBehaviorState::new)
540                                .connected_count += 1;
541                            peer_state.lock().unwrap().upsert_snapshot(&peers);
542                            let _ = tx.send(Event::BackbonePeerConnected {
543                                server_interface_id,
544                                peer_interface_id: client_id,
545                                peer_ip,
546                                peer_port,
547                            });
548
549                            let info = InterfaceInfo {
550                                id: client_id,
551                                name: format!("BackboneInterface/{}", client_id.0),
552                                mode: constants::MODE_FULL,
553                                out_capable: true,
554                                in_capable: true,
555                                bitrate: Some(1_000_000_000), // 1 Gbps guess
556                                announce_rate_target: None,
557                                announce_rate_grace: 0,
558                                announce_rate_penalty: 0.0,
559                                announce_cap: constants::ANNOUNCE_CAP,
560                                is_local_client: false,
561                                wants_tunnel: false,
562                                tunnel_id: None,
563                                mtu: 65535,
564                                ia_freq: 0.0,
565                                started: 0.0,
566                                ingress_control,
567                            };
568
569                            if tx
570                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
571                                .is_err()
572                            {
573                                // Driver shut down
574                                cleanup(&poller, &clients, &listener);
575                                return Ok(());
576                            }
577                        }
578                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
579                        Err(e) => {
580                            log::warn!("[{}] accept error: {}", name, e);
581                            break;
582                        }
583                    }
584                }
585                // Re-arm listener (oneshot semantics)
586                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
587            } else if clients.contains_key(&ev.key) {
588                let key = ev.key;
589                let mut should_remove = false;
590                let mut client_id = InterfaceId(0);
591
592                let mut buf = [0u8; 4096];
593                let read_result = {
594                    let client = clients.get_mut(&key).unwrap();
595                    client.stream.read(&mut buf)
596                };
597
598                match read_result {
599                    Ok(0) | Err(_) => {
600                        if let Some(c) = clients.get(&key) {
601                            client_id = c.id;
602                        }
603                        should_remove = true;
604                    }
605                    Ok(n) => {
606                        let client = clients.get_mut(&key).unwrap();
607                        client_id = client.id;
608                        client.has_received_data = true;
609                        for frame in client.decoder.feed(&buf[..n]) {
610                            if tx
611                                .send(Event::Frame {
612                                    interface_id: client_id,
613                                    data: frame,
614                                })
615                                .is_err()
616                            {
617                                cleanup(&poller, &clients, &listener);
618                                return Ok(());
619                            }
620                        }
621                    }
622                }
623
624                if should_remove {
625                    let reason = if clients
626                        .get(&key)
627                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
628                    {
629                        DisconnectReason::WriteStall
630                    } else {
631                        DisconnectReason::RemoteClosed
632                    };
633                    disconnect_client(
634                        &poller,
635                        &mut clients,
636                        &mut peers,
637                        &name,
638                        server_interface_id,
639                        &tx,
640                        &peer_state,
641                        key,
642                        client_id,
643                        reason,
644                    );
645                } else if let Some(client) = clients.get(&key) {
646                    // Re-arm client (oneshot semantics)
647                    poller.modify(&client.stream, PollEvent::readable(key))?;
648                }
649            }
650        }
651
652        if let Some(timeout) = idle_timeout {
653            let now = Instant::now();
654            let timed_out: Vec<(usize, InterfaceId)> = clients
655                .iter()
656                .filter_map(|(&key, client)| {
657                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
658                    {
659                        None
660                    } else {
661                        Some((key, client.id))
662                    }
663                })
664                .collect();
665
666            for (key, client_id) in timed_out {
667                disconnect_client(
668                    &poller,
669                    &mut clients,
670                    &mut peers,
671                    &name,
672                    server_interface_id,
673                    &tx,
674                    &peer_state,
675                    key,
676                    client_id,
677                    DisconnectReason::IdleTimeout,
678                );
679            }
680        }
681    }
682}
683
684fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
685    let now = Instant::now();
686    peers.retain(|_, state| {
687        if matches!(state.blacklisted_until, Some(until) if now >= until) {
688            state.blacklisted_until = None;
689            state.blacklist_reason = None;
690        }
691        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
692    });
693}
694
695fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
696    let now = Instant::now();
697    if let Some(state) = peers.get_mut(&peer_ip) {
698        if let Some(until) = state.blacklisted_until {
699            if now < until {
700                return true;
701            }
702            state.blacklisted_until = None;
703        }
704    }
705    false
706}
707
708fn disconnect_client(
709    poller: &Poller,
710    clients: &mut HashMap<usize, ClientState>,
711    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
712    name: &str,
713    server_interface_id: InterfaceId,
714    tx: &EventSender,
715    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
716    key: usize,
717    client_id: InterfaceId,
718    reason: DisconnectReason,
719) {
720    let Some(client) = clients.remove(&key) else {
721        return;
722    };
723
724    match reason {
725        DisconnectReason::RemoteClosed => {
726            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
727        }
728        DisconnectReason::IdleTimeout => {
729            log::info!(
730                "[{}] backbone client {} disconnected due to idle timeout",
731                name,
732                client_id.0
733            );
734        }
735        DisconnectReason::WriteStall => {
736            // Already logged by BackboneWriter::disconnect_for_write_stall
737        }
738    }
739
740    let _ = poller.delete(&client.stream);
741    // client.stream closes on drop
742    let connected_for = client.connected_at.elapsed();
743    let _ = tx.send(Event::BackbonePeerDisconnected {
744        server_interface_id,
745        peer_interface_id: client.id,
746        peer_ip: client.peer_ip,
747        peer_port: client.peer_port,
748        connected_for,
749        had_received_data: client.has_received_data,
750    });
751    match reason {
752        DisconnectReason::IdleTimeout => {
753            let _ = tx.send(Event::BackbonePeerIdleTimeout {
754                server_interface_id,
755                peer_interface_id: client.id,
756                peer_ip: client.peer_ip,
757                peer_port: client.peer_port,
758                connected_for,
759            });
760        }
761        DisconnectReason::WriteStall => {
762            let _ = tx.send(Event::BackbonePeerWriteStall {
763                server_interface_id,
764                peer_interface_id: client.id,
765                peer_ip: client.peer_ip,
766                peer_port: client.peer_port,
767                connected_for,
768            });
769        }
770        DisconnectReason::RemoteClosed => {}
771    }
772
773    if let Some(state) = peers.get_mut(&client.peer_ip) {
774        state.connected_count = state.connected_count.saturating_sub(1);
775    }
776    peer_state.lock().unwrap().upsert_snapshot(peers);
777    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
778    if !matches!(reason, DisconnectReason::WriteStall) {
779        let _ = tx.send(Event::InterfaceDown(client_id));
780    }
781}
782
783fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
784    let sock = SockRef::from(stream);
785    let mut keepalive = TcpKeepalive::new()
786        .with_time(Duration::from_secs(5))
787        .with_interval(Duration::from_secs(2));
788    #[cfg(any(target_os = "linux", target_os = "macos"))]
789    {
790        keepalive = keepalive.with_retries(12);
791    }
792    sock.set_tcp_keepalive(&keepalive)
793}
794
795fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
796    for (_, client) in clients {
797        let _ = poller.delete(&client.stream);
798    }
799    let _ = poller.delete(listener);
800}
801
802// ---------------------------------------------------------------------------
803// Client mode
804// ---------------------------------------------------------------------------
805
806/// Configuration for a backbone client interface.
807#[derive(Debug, Clone)]
808pub struct BackboneClientConfig {
809    pub name: String,
810    pub target_host: String,
811    pub target_port: u16,
812    pub interface_id: InterfaceId,
813    pub reconnect_wait: Duration,
814    pub max_reconnect_tries: Option<u32>,
815    pub connect_timeout: Duration,
816    pub transport_identity: Option<String>,
817    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
818}
819
820#[derive(Debug, Clone)]
821pub struct BackboneClientRuntime {
822    pub reconnect_wait: Duration,
823    pub max_reconnect_tries: Option<u32>,
824    pub connect_timeout: Duration,
825}
826
827impl BackboneClientRuntime {
828    pub fn from_config(config: &BackboneClientConfig) -> Self {
829        Self {
830            reconnect_wait: config.reconnect_wait,
831            max_reconnect_tries: config.max_reconnect_tries,
832            connect_timeout: config.connect_timeout,
833        }
834    }
835}
836
837#[derive(Debug, Clone)]
838pub struct BackboneClientRuntimeConfigHandle {
839    pub interface_name: String,
840    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
841    pub startup: BackboneClientRuntime,
842}
843
844impl Default for BackboneClientConfig {
845    fn default() -> Self {
846        let mut config = BackboneClientConfig {
847            name: String::new(),
848            target_host: "127.0.0.1".into(),
849            target_port: 4242,
850            interface_id: InterfaceId(0),
851            reconnect_wait: Duration::from_secs(5),
852            max_reconnect_tries: None,
853            connect_timeout: Duration::from_secs(5),
854            transport_identity: None,
855            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
856                reconnect_wait: Duration::from_secs(5),
857                max_reconnect_tries: None,
858                connect_timeout: Duration::from_secs(5),
859            })),
860        };
861        let startup = BackboneClientRuntime::from_config(&config);
862        config.runtime = Arc::new(Mutex::new(startup));
863        config
864    }
865}
866
867/// Writer that sends HDLC-framed data over a TCP stream (client mode).
868struct BackboneClientWriter {
869    stream: TcpStream,
870}
871
872impl Writer for BackboneClientWriter {
873    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
874        self.stream.write_all(&hdlc::frame(data))
875    }
876}
877
878/// Try to connect to the target host:port with timeout.
879fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
880    let runtime = config.runtime.lock().unwrap().clone();
881    let addr_str = format!("{}:{}", config.target_host, config.target_port);
882    let addr = addr_str
883        .to_socket_addrs()?
884        .next()
885        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
886
887    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
888    stream.set_nodelay(true)?;
889    set_tcp_keepalive(&stream).ok();
890
891    // Prevent SIGPIPE on macOS when writing to broken pipes
892    #[cfg(target_os = "macos")]
893    {
894        let sock = SockRef::from(&stream);
895        sock.set_nosigpipe(true).ok();
896    }
897
898    Ok(stream)
899}
900
901/// Connect and start the reader thread. Returns the writer for the driver.
902pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
903    let stream = try_connect_client(&config)?;
904    let reader_stream = stream.try_clone()?;
905    let writer_stream = stream.try_clone()?;
906
907    let id = config.interface_id;
908    log::info!(
909        "[{}] backbone client connected to {}:{}",
910        config.name,
911        config.target_host,
912        config.target_port
913    );
914
915    // Initial connect: writer is None because it's returned directly to the caller
916    let _ = tx.send(Event::InterfaceUp(id, None, None));
917
918    thread::Builder::new()
919        .name(format!("backbone-client-{}", id.0))
920        .spawn(move || {
921            client_reader_loop(reader_stream, config, tx);
922        })?;
923
924    Ok(Box::new(BackboneClientWriter {
925        stream: writer_stream,
926    }))
927}
928
929/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
930/// On disconnect, attempts reconnection.
931fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
932    let id = config.interface_id;
933    let mut decoder = hdlc::Decoder::new();
934    let mut buf = [0u8; 4096];
935
936    loop {
937        match stream.read(&mut buf) {
938            Ok(0) => {
939                log::warn!("[{}] connection closed", config.name);
940                let _ = tx.send(Event::InterfaceDown(id));
941                match client_reconnect(&config, &tx) {
942                    Some(new_stream) => {
943                        stream = new_stream;
944                        decoder = hdlc::Decoder::new();
945                        continue;
946                    }
947                    None => {
948                        log::error!("[{}] reconnection failed, giving up", config.name);
949                        return;
950                    }
951                }
952            }
953            Ok(n) => {
954                for frame in decoder.feed(&buf[..n]) {
955                    if tx
956                        .send(Event::Frame {
957                            interface_id: id,
958                            data: frame,
959                        })
960                        .is_err()
961                    {
962                        return;
963                    }
964                }
965            }
966            Err(e) => {
967                log::warn!("[{}] read error: {}", config.name, e);
968                let _ = tx.send(Event::InterfaceDown(id));
969                match client_reconnect(&config, &tx) {
970                    Some(new_stream) => {
971                        stream = new_stream;
972                        decoder = hdlc::Decoder::new();
973                        continue;
974                    }
975                    None => {
976                        log::error!("[{}] reconnection failed, giving up", config.name);
977                        return;
978                    }
979                }
980            }
981        }
982    }
983}
984
985/// Maximum backoff multiplier: `base_delay * 2^MAX_BACKOFF_SHIFT`.
986/// With a 5 s base this caps at 5 × 2^6 = 320 s ≈ 5 min.
987const MAX_BACKOFF_SHIFT: u32 = 6;
988
989/// Attempt to reconnect with exponential backoff and jitter.
990/// Returns the new reader stream on success.
991/// Sends the new writer to the driver via InterfaceUp event.
992fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
993    let mut attempts = 0u32;
994    loop {
995        let runtime = config.runtime.lock().unwrap().clone();
996
997        let shift = attempts.min(MAX_BACKOFF_SHIFT);
998        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
999        // Add ±25 % jitter to avoid thundering-herd reconnects.
1000        let jitter_range = backoff / 4;
1001        let jitter = if jitter_range.as_nanos() > 0 {
1002            let offset = Duration::from_nanos(
1003                (std::hash::RandomState::new().build_hasher().finish()
1004                    % jitter_range.as_nanos() as u64)
1005                    * 2,
1006            );
1007            if offset > jitter_range {
1008                backoff + (offset - jitter_range)
1009            } else {
1010                backoff - (jitter_range - offset)
1011            }
1012        } else {
1013            backoff
1014        };
1015        thread::sleep(jitter);
1016
1017        attempts += 1;
1018
1019        if let Some(max) = runtime.max_reconnect_tries {
1020            if attempts > max {
1021                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1022                return None;
1023            }
1024        }
1025
1026        log::info!(
1027            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1028            config.name,
1029            attempts,
1030            jitter.as_secs_f64(),
1031        );
1032
1033        match try_connect_client(config) {
1034            Ok(new_stream) => {
1035                let writer_stream = match new_stream.try_clone() {
1036                    Ok(s) => s,
1037                    Err(e) => {
1038                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1039                        continue;
1040                    }
1041                };
1042                log::info!(
1043                    "[{}] reconnected after {} attempt(s)",
1044                    config.name,
1045                    attempts
1046                );
1047                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1048                    stream: writer_stream,
1049                });
1050                let _ = tx.send(Event::InterfaceUp(
1051                    config.interface_id,
1052                    Some(new_writer),
1053                    None,
1054                ));
1055                return Some(new_stream);
1056            }
1057            Err(e) => {
1058                log::warn!("[{}] reconnect failed: {}", config.name, e);
1059            }
1060        }
1061    }
1062}
1063
1064// ---------------------------------------------------------------------------
1065// Factory
1066// ---------------------------------------------------------------------------
1067
1068/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
1069/// server or client config through the opaque `InterfaceConfigData` channel.
1070#[derive(Clone)]
1071pub(crate) enum BackboneMode {
1072    Server(BackboneConfig),
1073    Client(BackboneClientConfig),
1074}
1075
1076/// Factory for `BackboneInterface`.
1077///
1078/// If the config params contain `"remote"` or `"target_host"` the interface
1079/// is started in client mode; otherwise it is started as a TCP listener
1080/// (server mode).
1081pub struct BackboneInterfaceFactory;
1082
1083fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
1084    params
1085        .get(key)
1086        .and_then(|v| v.parse::<f64>().ok())
1087        .filter(|v| *v > 0.0)
1088        .map(Duration::from_secs_f64)
1089}
1090
1091impl InterfaceFactory for BackboneInterfaceFactory {
1092    fn type_name(&self) -> &str {
1093        "BackboneInterface"
1094    }
1095
1096    fn parse_config(
1097        &self,
1098        name: &str,
1099        id: InterfaceId,
1100        params: &HashMap<String, String>,
1101    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1102        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1103            // Client mode
1104            let target_host = target_host.clone();
1105            let target_port = params
1106                .get("target_port")
1107                .or_else(|| params.get("port"))
1108                .and_then(|v| v.parse().ok())
1109                .unwrap_or(4242);
1110            let transport_identity = params.get("transport_identity").cloned();
1111            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1112                name: name.to_string(),
1113                target_host,
1114                target_port,
1115                interface_id: id,
1116                transport_identity,
1117                ..BackboneClientConfig::default()
1118            })))
1119        } else {
1120            // Server mode
1121            let listen_ip = params
1122                .get("listen_ip")
1123                .or_else(|| params.get("device"))
1124                .cloned()
1125                .unwrap_or_else(|| "0.0.0.0".into());
1126            let listen_port = params
1127                .get("listen_port")
1128                .or_else(|| params.get("port"))
1129                .and_then(|v| v.parse().ok())
1130                .unwrap_or(4242);
1131            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1132            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1133            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1134            let abuse = BackboneAbuseConfig {
1135                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1136            };
1137            let mut config = BackboneConfig {
1138                name: name.to_string(),
1139                listen_ip,
1140                listen_port,
1141                interface_id: id,
1142                max_connections,
1143                idle_timeout,
1144                write_stall_timeout,
1145                abuse,
1146                ingress_control: IngressControlConfig::enabled(),
1147                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1148                    max_connections: None,
1149                    idle_timeout: None,
1150                    write_stall_timeout: None,
1151                    abuse: BackboneAbuseConfig::default(),
1152                })),
1153                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1154            };
1155            let startup = BackboneServerRuntime::from_config(&config);
1156            config.runtime = Arc::new(Mutex::new(startup));
1157            Ok(Box::new(BackboneMode::Server(config)))
1158        }
1159    }
1160
1161    fn start(
1162        &self,
1163        config: Box<dyn InterfaceConfigData>,
1164        ctx: StartContext,
1165    ) -> io::Result<StartResult> {
1166        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1167            io::Error::new(
1168                io::ErrorKind::InvalidData,
1169                "wrong config type for BackboneInterface",
1170            )
1171        })?;
1172
1173        match mode {
1174            BackboneMode::Client(cfg) => {
1175                let id = cfg.interface_id;
1176                let name = cfg.name.clone();
1177                let info = InterfaceInfo {
1178                    id,
1179                    name,
1180                    mode: ctx.mode,
1181                    out_capable: true,
1182                    in_capable: true,
1183                    bitrate: Some(1_000_000_000),
1184                    announce_rate_target: None,
1185                    announce_rate_grace: 0,
1186                    announce_rate_penalty: 0.0,
1187                    announce_cap: constants::ANNOUNCE_CAP,
1188                    is_local_client: false,
1189                    wants_tunnel: false,
1190                    tunnel_id: None,
1191                    mtu: 65535,
1192                    ingress_control: ctx.ingress_control,
1193                    ia_freq: 0.0,
1194                    started: crate::time::now(),
1195                };
1196                let writer = start_client(cfg, ctx.tx)?;
1197                Ok(StartResult::Simple {
1198                    id,
1199                    info,
1200                    writer,
1201                    interface_type_name: "BackboneInterface".to_string(),
1202                })
1203            }
1204            BackboneMode::Server(mut cfg) => {
1205                cfg.ingress_control = ctx.ingress_control;
1206                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1207                Ok(StartResult::Listener { control: None })
1208            }
1209        }
1210    }
1211}
1212
1213pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1214    match mode {
1215        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1216            interface_name: config.name.clone(),
1217            runtime: Arc::clone(&config.runtime),
1218            startup: BackboneServerRuntime::from_config(config),
1219        }),
1220        BackboneMode::Client(_) => None,
1221    }
1222}
1223
1224pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1225    match mode {
1226        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1227            interface_id: config.interface_id,
1228            interface_name: config.name.clone(),
1229            peer_state: Arc::clone(&config.peer_state),
1230        }),
1231        BackboneMode::Client(_) => None,
1232    }
1233}
1234
1235pub(crate) fn client_runtime_handle_from_mode(
1236    mode: &BackboneMode,
1237) -> Option<BackboneClientRuntimeConfigHandle> {
1238    match mode {
1239        BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1240            interface_name: config.name.clone(),
1241            runtime: Arc::clone(&config.runtime),
1242            startup: BackboneClientRuntime::from_config(config),
1243        }),
1244        BackboneMode::Server(_) => None,
1245    }
1246}
1247
1248pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1249    match mode {
1250        BackboneMode::Client(config) => Some(config.clone()),
1251        BackboneMode::Server(_) => None,
1252    }
1253}
1254
1255#[cfg(test)]
1256mod tests {
1257    use super::*;
1258    use std::sync::mpsc;
1259    use std::time::Duration;
1260
1261    fn find_free_port() -> u16 {
1262        TcpListener::bind("127.0.0.1:0")
1263            .unwrap()
1264            .local_addr()
1265            .unwrap()
1266            .port()
1267    }
1268
1269    fn recv_non_peer_event(
1270        rx: &mpsc::Receiver<Event>,
1271        timeout: Duration,
1272    ) -> Result<Event, mpsc::RecvTimeoutError> {
1273        let deadline = Instant::now() + timeout;
1274        loop {
1275            let remaining = deadline.saturating_duration_since(Instant::now());
1276            if remaining.is_zero() {
1277                return Err(mpsc::RecvTimeoutError::Timeout);
1278            }
1279            let event = rx.recv_timeout(remaining)?;
1280            match event {
1281                Event::BackbonePeerConnected { .. }
1282                | Event::BackbonePeerDisconnected { .. }
1283                | Event::BackbonePeerIdleTimeout { .. }
1284                | Event::BackbonePeerWriteStall { .. }
1285                | Event::BackbonePeerPenalty { .. } => continue,
1286                other => return Ok(other),
1287            }
1288        }
1289    }
1290
1291    fn make_server_config(
1292        port: u16,
1293        interface_id: u64,
1294        max_connections: Option<usize>,
1295        idle_timeout: Option<Duration>,
1296        write_stall_timeout: Option<Duration>,
1297        abuse: BackboneAbuseConfig,
1298    ) -> BackboneConfig {
1299        let mut config = BackboneConfig {
1300            name: "test-backbone".into(),
1301            listen_ip: "127.0.0.1".into(),
1302            listen_port: port,
1303            interface_id: InterfaceId(interface_id),
1304            max_connections,
1305            idle_timeout,
1306            write_stall_timeout,
1307            abuse,
1308            ingress_control: IngressControlConfig::enabled(),
1309            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1310                max_connections: None,
1311                idle_timeout: None,
1312                write_stall_timeout: None,
1313                abuse: BackboneAbuseConfig::default(),
1314            })),
1315            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1316        };
1317        let startup = BackboneServerRuntime::from_config(&config);
1318        config.runtime = Arc::new(Mutex::new(startup));
1319        config
1320    }
1321
1322    #[test]
1323    fn backbone_accept_connection() {
1324        let port = find_free_port();
1325        let (tx, rx) = crate::event::channel();
1326        let next_id = Arc::new(AtomicU64::new(8000));
1327
1328        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1329
1330        start(config, tx, next_id).unwrap();
1331        thread::sleep(Duration::from_millis(50));
1332
1333        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1334
1335        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1336        match event {
1337            Event::InterfaceUp(id, writer, info) => {
1338                assert_eq!(id, InterfaceId(8000));
1339                assert!(writer.is_some());
1340                assert!(info.is_some());
1341                let info = info.unwrap();
1342                assert!(info.out_capable);
1343                assert!(info.in_capable);
1344            }
1345            other => panic!("expected InterfaceUp, got {:?}", other),
1346        }
1347    }
1348
1349    #[test]
1350    fn backbone_receive_frame() {
1351        let port = find_free_port();
1352        let (tx, rx) = crate::event::channel();
1353        let next_id = Arc::new(AtomicU64::new(8100));
1354
1355        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1356
1357        start(config, tx, next_id).unwrap();
1358        thread::sleep(Duration::from_millis(50));
1359
1360        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1361
1362        // Drain InterfaceUp
1363        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1364
1365        // Send HDLC frame (>= 19 bytes)
1366        let payload: Vec<u8> = (0..32).collect();
1367        client.write_all(&hdlc::frame(&payload)).unwrap();
1368
1369        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1370        match event {
1371            Event::Frame { interface_id, data } => {
1372                assert_eq!(interface_id, InterfaceId(8100));
1373                assert_eq!(data, payload);
1374            }
1375            other => panic!("expected Frame, got {:?}", other),
1376        }
1377    }
1378
1379    #[test]
1380    fn backbone_send_to_client() {
1381        let port = find_free_port();
1382        let (tx, rx) = crate::event::channel();
1383        let next_id = Arc::new(AtomicU64::new(8200));
1384
1385        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1386
1387        start(config, tx, next_id).unwrap();
1388        thread::sleep(Duration::from_millis(50));
1389
1390        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1391        client
1392            .set_read_timeout(Some(Duration::from_secs(2)))
1393            .unwrap();
1394
1395        // Get writer from InterfaceUp
1396        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1397        let mut writer = match event {
1398            Event::InterfaceUp(_, Some(w), _) => w,
1399            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1400        };
1401
1402        // Send frame via writer
1403        let payload: Vec<u8> = (0..24).collect();
1404        writer.send_frame(&payload).unwrap();
1405
1406        // Read from client
1407        let mut buf = [0u8; 256];
1408        let n = client.read(&mut buf).unwrap();
1409        let expected = hdlc::frame(&payload);
1410        assert_eq!(&buf[..n], &expected[..]);
1411    }
1412
1413    #[test]
1414    fn backbone_multiple_clients() {
1415        let port = find_free_port();
1416        let (tx, rx) = crate::event::channel();
1417        let next_id = Arc::new(AtomicU64::new(8300));
1418
1419        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1420
1421        start(config, tx, next_id).unwrap();
1422        thread::sleep(Duration::from_millis(50));
1423
1424        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1425        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1426
1427        let mut ids = Vec::new();
1428        for _ in 0..2 {
1429            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1430            match event {
1431                Event::InterfaceUp(id, _, _) => ids.push(id),
1432                other => panic!("expected InterfaceUp, got {:?}", other),
1433            }
1434        }
1435
1436        assert_eq!(ids.len(), 2);
1437        assert_ne!(ids[0], ids[1]);
1438    }
1439
1440    #[test]
1441    fn backbone_client_disconnect() {
1442        let port = find_free_port();
1443        let (tx, rx) = crate::event::channel();
1444        let next_id = Arc::new(AtomicU64::new(8400));
1445
1446        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1447
1448        start(config, tx, next_id).unwrap();
1449        thread::sleep(Duration::from_millis(50));
1450
1451        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1452
1453        // Drain InterfaceUp
1454        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1455
1456        // Disconnect
1457        drop(client);
1458
1459        // Should receive InterfaceDown
1460        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1461        assert!(
1462            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1463            "expected InterfaceDown(8400), got {:?}",
1464            event
1465        );
1466    }
1467
1468    #[test]
1469    fn backbone_epoll_multiplexing() {
1470        let port = find_free_port();
1471        let (tx, rx) = crate::event::channel();
1472        let next_id = Arc::new(AtomicU64::new(8500));
1473
1474        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1475
1476        start(config, tx, next_id).unwrap();
1477        thread::sleep(Duration::from_millis(50));
1478
1479        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1480        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1481
1482        // Drain both InterfaceUp events
1483        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1484        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1485
1486        // Both clients send data simultaneously
1487        let payload1: Vec<u8> = (0..24).collect();
1488        let payload2: Vec<u8> = (100..130).collect();
1489        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1490        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1491
1492        // Should receive both Frame events
1493        let mut received = Vec::new();
1494        for _ in 0..2 {
1495            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1496            match event {
1497                Event::Frame { data, .. } => received.push(data),
1498                other => panic!("expected Frame, got {:?}", other),
1499            }
1500        }
1501        assert!(received.contains(&payload1));
1502        assert!(received.contains(&payload2));
1503    }
1504
1505    #[test]
1506    fn backbone_bind_port() {
1507        let port = find_free_port();
1508        let (tx, _rx) = crate::event::channel();
1509        let next_id = Arc::new(AtomicU64::new(8600));
1510
1511        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1512
1513        // Should not error
1514        start(config, tx, next_id).unwrap();
1515    }
1516
1517    #[test]
1518    fn backbone_hdlc_fragmented() {
1519        let port = find_free_port();
1520        let (tx, rx) = crate::event::channel();
1521        let next_id = Arc::new(AtomicU64::new(8700));
1522
1523        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1524
1525        start(config, tx, next_id).unwrap();
1526        thread::sleep(Duration::from_millis(50));
1527
1528        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1529        client.set_nodelay(true).unwrap();
1530
1531        // Drain InterfaceUp
1532        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1533
1534        // Send HDLC frame in two fragments
1535        let payload: Vec<u8> = (0..32).collect();
1536        let framed = hdlc::frame(&payload);
1537        let mid = framed.len() / 2;
1538
1539        client.write_all(&framed[..mid]).unwrap();
1540        thread::sleep(Duration::from_millis(50));
1541        client.write_all(&framed[mid..]).unwrap();
1542
1543        // Should receive reassembled frame
1544        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1545        match event {
1546            Event::Frame { data, .. } => {
1547                assert_eq!(data, payload);
1548            }
1549            other => panic!("expected Frame, got {:?}", other),
1550        }
1551    }
1552
1553    // -----------------------------------------------------------------------
1554    // Client mode tests
1555    // -----------------------------------------------------------------------
1556
1557    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1558        BackboneClientConfig {
1559            name: format!("test-bb-client-{}", port),
1560            target_host: "127.0.0.1".into(),
1561            target_port: port,
1562            interface_id: InterfaceId(id),
1563            reconnect_wait: Duration::from_millis(100),
1564            max_reconnect_tries: Some(2),
1565            connect_timeout: Duration::from_secs(2),
1566            transport_identity: None,
1567            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1568                reconnect_wait: Duration::from_millis(100),
1569                max_reconnect_tries: Some(2),
1570                connect_timeout: Duration::from_secs(2),
1571            })),
1572        }
1573    }
1574
1575    #[test]
1576    fn backbone_client_connect() {
1577        let port = find_free_port();
1578        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1579        let (tx, rx) = crate::event::channel();
1580
1581        let config = make_client_config(port, 9000);
1582        let _writer = start_client(config, tx).unwrap();
1583
1584        let _server_stream = listener.accept().unwrap();
1585
1586        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1587        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1588    }
1589
1590    #[test]
1591    fn backbone_client_receive_frame() {
1592        let port = find_free_port();
1593        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1594        let (tx, rx) = crate::event::channel();
1595
1596        let config = make_client_config(port, 9100);
1597        let _writer = start_client(config, tx).unwrap();
1598
1599        let (mut server_stream, _) = listener.accept().unwrap();
1600
1601        // Drain InterfaceUp
1602        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1603
1604        // Send HDLC frame from server side (>= 19 bytes payload)
1605        let payload: Vec<u8> = (0..32).collect();
1606        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1607
1608        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1609        match event {
1610            Event::Frame { interface_id, data } => {
1611                assert_eq!(interface_id, InterfaceId(9100));
1612                assert_eq!(data, payload);
1613            }
1614            other => panic!("expected Frame, got {:?}", other),
1615        }
1616    }
1617
1618    #[test]
1619    fn backbone_client_send_frame() {
1620        let port = find_free_port();
1621        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1622        let (tx, _rx) = crate::event::channel();
1623
1624        let config = make_client_config(port, 9200);
1625        let mut writer = start_client(config, tx).unwrap();
1626
1627        let (mut server_stream, _) = listener.accept().unwrap();
1628        server_stream
1629            .set_read_timeout(Some(Duration::from_secs(2)))
1630            .unwrap();
1631
1632        let payload: Vec<u8> = (0..24).collect();
1633        writer.send_frame(&payload).unwrap();
1634
1635        let mut buf = [0u8; 256];
1636        let n = server_stream.read(&mut buf).unwrap();
1637        let expected = hdlc::frame(&payload);
1638        assert_eq!(&buf[..n], &expected[..]);
1639    }
1640
1641    #[test]
1642    fn backbone_max_connections_rejects_excess() {
1643        let port = find_free_port();
1644        let (tx, rx) = crate::event::channel();
1645        let next_id = Arc::new(AtomicU64::new(8800));
1646
1647        let config = make_server_config(
1648            port,
1649            88,
1650            Some(2),
1651            None,
1652            None,
1653            BackboneAbuseConfig::default(),
1654        );
1655
1656        start(config, tx, next_id).unwrap();
1657        thread::sleep(Duration::from_millis(50));
1658
1659        // Connect two clients (at limit)
1660        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1661        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1662
1663        // Drain both InterfaceUp events
1664        for _ in 0..2 {
1665            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1666            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1667        }
1668
1669        // Third connection should be accepted at TCP level but immediately dropped
1670        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1671        client3
1672            .set_read_timeout(Some(Duration::from_millis(500)))
1673            .unwrap();
1674
1675        // Give server time to reject
1676        thread::sleep(Duration::from_millis(100));
1677
1678        // Should NOT receive a third InterfaceUp
1679        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1680        assert!(
1681            result.is_err(),
1682            "expected no InterfaceUp for rejected connection, got {:?}",
1683            result
1684        );
1685    }
1686
1687    #[test]
1688    fn backbone_max_connections_allows_after_disconnect() {
1689        let port = find_free_port();
1690        let (tx, rx) = crate::event::channel();
1691        let next_id = Arc::new(AtomicU64::new(8900));
1692
1693        let config = make_server_config(
1694            port,
1695            89,
1696            Some(1),
1697            None,
1698            None,
1699            BackboneAbuseConfig::default(),
1700        );
1701
1702        start(config, tx, next_id).unwrap();
1703        thread::sleep(Duration::from_millis(50));
1704
1705        // Connect first client
1706        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1707        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1708        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1709
1710        // Disconnect first client
1711        drop(client1);
1712
1713        // Wait for InterfaceDown
1714        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1715        assert!(matches!(event, Event::InterfaceDown(_)));
1716
1717        // Now a new connection should be accepted
1718        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1719        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1720        assert!(
1721            matches!(event, Event::InterfaceUp(_, _, _)),
1722            "expected InterfaceUp after slot freed, got {:?}",
1723            event
1724        );
1725    }
1726
1727    #[test]
1728    fn backbone_client_reconnect() {
1729        let port = find_free_port();
1730        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1731        listener.set_nonblocking(false).unwrap();
1732        let (tx, rx) = crate::event::channel();
1733
1734        let config = make_client_config(port, 9300);
1735        let _writer = start_client(config, tx).unwrap();
1736
1737        // Accept first connection and immediately close it
1738        let (server_stream, _) = listener.accept().unwrap();
1739
1740        // Drain InterfaceUp
1741        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1742
1743        drop(server_stream);
1744
1745        // Should get InterfaceDown
1746        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1747        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1748
1749        // Accept the reconnection
1750        let _server_stream2 = listener.accept().unwrap();
1751
1752        // Should get InterfaceUp again
1753        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1754        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1755    }
1756
1757    #[test]
1758    fn backbone_idle_timeout_disconnects_silent_client() {
1759        let port = find_free_port();
1760        let (tx, rx) = crate::event::channel();
1761        let next_id = Arc::new(AtomicU64::new(9400));
1762
1763        let config = make_server_config(
1764            port,
1765            94,
1766            None,
1767            Some(Duration::from_millis(150)),
1768            None,
1769            BackboneAbuseConfig::default(),
1770        );
1771
1772        start(config, tx, next_id).unwrap();
1773        thread::sleep(Duration::from_millis(50));
1774
1775        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1776
1777        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1778        let client_id = match event {
1779            Event::InterfaceUp(id, _, _) => id,
1780            other => panic!("expected InterfaceUp, got {:?}", other),
1781        };
1782
1783        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1784        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1785    }
1786
1787    #[test]
1788    fn backbone_idle_timeout_ignores_client_after_data() {
1789        let port = find_free_port();
1790        let (tx, rx) = crate::event::channel();
1791        let next_id = Arc::new(AtomicU64::new(9500));
1792
1793        let config = make_server_config(
1794            port,
1795            95,
1796            None,
1797            Some(Duration::from_millis(200)),
1798            None,
1799            BackboneAbuseConfig::default(),
1800        );
1801
1802        start(config, tx, next_id).unwrap();
1803        thread::sleep(Duration::from_millis(50));
1804
1805        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1806
1807        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1808        let client_id = match event {
1809            Event::InterfaceUp(id, _, _) => id,
1810            other => panic!("expected InterfaceUp, got {:?}", other),
1811        };
1812
1813        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1814
1815        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1816        match event {
1817            Event::Frame { interface_id, data } => {
1818                assert_eq!(interface_id, client_id);
1819                assert_eq!(data, vec![1u8; 24]);
1820            }
1821            other => panic!("expected Frame, got {:?}", other),
1822        }
1823
1824        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1825        assert!(
1826            result.is_err(),
1827            "expected no InterfaceDown after client sent data, got {:?}",
1828            result
1829        );
1830    }
1831
1832    #[test]
1833    fn backbone_runtime_idle_timeout_updates_live() {
1834        let port = find_free_port();
1835        let (tx, rx) = crate::event::channel();
1836        let next_id = Arc::new(AtomicU64::new(9650));
1837
1838        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1839        let runtime = Arc::clone(&config.runtime);
1840
1841        start(config, tx, next_id).unwrap();
1842        thread::sleep(Duration::from_millis(50));
1843
1844        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1845        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1846        let client_id = match event {
1847            Event::InterfaceUp(id, _, _) => id,
1848            other => panic!("expected InterfaceUp, got {:?}", other),
1849        };
1850
1851        {
1852            let mut runtime = runtime.lock().unwrap();
1853            runtime.idle_timeout = Some(Duration::from_millis(150));
1854        }
1855
1856        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1857        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1858    }
1859
1860    #[test]
1861    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1862        let port = find_free_port();
1863        let (tx, rx) = crate::event::channel();
1864        let next_id = Arc::new(AtomicU64::new(9660));
1865
1866        let config = make_server_config(
1867            port,
1868            98,
1869            None,
1870            None,
1871            Some(Duration::from_millis(50)),
1872            BackboneAbuseConfig::default(),
1873        );
1874
1875        start(config, tx, next_id).unwrap();
1876        thread::sleep(Duration::from_millis(50));
1877
1878        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1879        client
1880            .set_read_timeout(Some(Duration::from_millis(100)))
1881            .unwrap();
1882        let sock = SockRef::from(&client);
1883        sock.set_recv_buffer_size(4096).ok();
1884
1885        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1886        let (client_id, mut writer) = match event {
1887            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1888            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1889        };
1890
1891        let payload = vec![0x55; 512 * 1024];
1892        let deadline = Instant::now() + Duration::from_secs(3);
1893        let mut stalled = false;
1894        while Instant::now() < deadline {
1895            match writer.send_frame(&payload) {
1896                Ok(()) => {}
1897                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1898                    thread::sleep(Duration::from_millis(10));
1899                }
1900                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1901                    stalled = true;
1902                    break;
1903                }
1904                Err(e) => panic!("unexpected send error: {}", e),
1905            }
1906        }
1907
1908        assert!(stalled, "expected writer to time out on persistent stall");
1909        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1910        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1911    }
1912
1913    /// Drain events matching a predicate, return the first match.
1914    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1915    where
1916        F: FnMut(&Event) -> bool,
1917    {
1918        let deadline = Instant::now() + timeout;
1919        loop {
1920            let remaining = deadline.saturating_duration_since(Instant::now());
1921            if remaining.is_zero() {
1922                return None;
1923            }
1924            match rx.recv_timeout(remaining) {
1925                Ok(event) if pred(&event) => return Some(event),
1926                Ok(_) => continue,
1927                Err(_) => return None,
1928            }
1929        }
1930    }
1931
1932    #[test]
1933    fn backbone_write_stall_emits_peer_events() {
1934        let port = find_free_port();
1935        let (tx, rx) = crate::event::channel();
1936        let next_id = Arc::new(AtomicU64::new(9700));
1937
1938        let config = make_server_config(
1939            port,
1940            97,
1941            None,
1942            None,
1943            Some(Duration::from_millis(50)), // 50ms stall timeout
1944            BackboneAbuseConfig::default(),
1945        );
1946
1947        start(config, tx, next_id).unwrap();
1948        thread::sleep(Duration::from_millis(50));
1949
1950        // Connect a client that won't read (will cause write stall)
1951        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1952        client
1953            .set_read_timeout(Some(Duration::from_millis(100)))
1954            .unwrap();
1955        let sock = SockRef::from(&client);
1956        sock.set_recv_buffer_size(4096).ok();
1957
1958        // Wait for InterfaceUp and grab writer
1959        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1960        let mut writer = match event {
1961            Event::InterfaceUp(_, Some(w), _) => w,
1962            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1963        };
1964
1965        // Flood until stall
1966        let payload = vec![0x55; 512 * 1024];
1967        let deadline = Instant::now() + Duration::from_secs(3);
1968        while Instant::now() < deadline {
1969            match writer.send_frame(&payload) {
1970                Ok(()) | Err(_) => {
1971                    if Instant::now() + Duration::from_millis(10) > deadline {
1972                        break;
1973                    }
1974                    thread::sleep(Duration::from_millis(10));
1975                }
1976            }
1977        }
1978
1979        // Should see BackbonePeerWriteStall event
1980        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1981            matches!(e, Event::BackbonePeerWriteStall { .. })
1982        });
1983        assert!(
1984            stall_event.is_some(),
1985            "expected BackbonePeerWriteStall event"
1986        );
1987    }
1988
1989    #[test]
1990    fn backbone_blacklisted_peer_rejected_on_connect() {
1991        let port = find_free_port();
1992        let (tx, rx) = crate::event::channel();
1993        let next_id = Arc::new(AtomicU64::new(9800));
1994
1995        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
1996        let peer_state = config.peer_state.clone();
1997
1998        start(config, tx.clone(), next_id).unwrap();
1999        thread::sleep(Duration::from_millis(50));
2000
2001        // First connection should succeed
2002        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2003        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2004        assert!(
2005            matches!(event, Event::InterfaceUp(_, _, _)),
2006            "first connection should succeed"
2007        );
2008        drop(client1);
2009
2010        // Drain disconnect events
2011        thread::sleep(Duration::from_millis(100));
2012        while rx.try_recv().is_ok() {}
2013
2014        // Blacklist 127.0.0.1 via the peer monitor
2015        peer_state.lock().unwrap().blacklist(
2016            "127.0.0.1".parse().unwrap(),
2017            Duration::from_secs(60),
2018            "test blacklist".into(),
2019        );
2020
2021        // Second connection from same IP should be rejected (no InterfaceUp)
2022        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2023        // Give poll loop time to reject
2024        thread::sleep(Duration::from_millis(200));
2025
2026        // Should NOT get an InterfaceUp — connection should have been rejected
2027        let event = rx.try_recv();
2028        match event {
2029            Ok(Event::InterfaceUp(_, _, _)) => {
2030                panic!("blacklisted peer should not get InterfaceUp")
2031            }
2032            _ => {} // Expected: no InterfaceUp
2033        }
2034    }
2035
2036    #[test]
2037    fn backbone_parse_config_reads_abuse_settings() {
2038        let factory = BackboneInterfaceFactory;
2039        let mut params = HashMap::new();
2040        params.insert("listen_ip".into(), "127.0.0.1".into());
2041        params.insert("listen_port".into(), "4242".into());
2042        params.insert("idle_timeout".into(), "15".into());
2043        params.insert("write_stall_timeout".into(), "45".into());
2044        params.insert("max_penalty_duration".into(), "3600".into());
2045
2046        let config = factory
2047            .parse_config("test-backbone", InterfaceId(97), &params)
2048            .unwrap();
2049        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2050
2051        match mode {
2052            BackboneMode::Server(config) => {
2053                assert_eq!(config.listen_ip, "127.0.0.1");
2054                assert_eq!(config.listen_port, 4242);
2055                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2056                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2057                assert_eq!(
2058                    config.abuse.max_penalty_duration,
2059                    Some(Duration::from_secs(3600))
2060                );
2061            }
2062            BackboneMode::Client(_) => panic!("expected server config"),
2063        }
2064    }
2065}