Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30    lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
/// Hardware MTU of the backbone interface: 1 MiB, mirroring Python's
/// `BackboneInterface.HW_MTU`.
// Kept for parity with the Python implementation; the lint opt-out avoids a
// warning while nothing references the constant.
#[allow(dead_code)]
const HW_MTU: usize = 1024 * 1024;
37
38/// Configuration for a backbone interface.
39#[derive(Debug, Clone)]
40pub struct BackboneConfig {
41    pub name: String,
42    pub listen_ip: String,
43    pub listen_port: u16,
44    pub interface_id: InterfaceId,
45    pub max_connections: Option<usize>,
46    pub idle_timeout: Option<Duration>,
47    pub write_stall_timeout: Option<Duration>,
48    pub abuse: BackboneAbuseConfig,
49    pub ingress_control: IngressControlConfig,
50    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
51    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
52}
53
/// Tunables for behavior-based abuse detection on inbound peers.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// Upper bound on the duration of a penalty, if any.
    // NOTE(review): not read by the server code visible in this part of the
    // file; confirm the exact semantics at the use site.
    pub max_penalty_duration: Option<Duration>,
}
59
60/// Live runtime state for a backbone server interface.
61#[derive(Debug, Clone)]
62pub struct BackboneServerRuntime {
63    pub max_connections: Option<usize>,
64    pub idle_timeout: Option<Duration>,
65    pub write_stall_timeout: Option<Duration>,
66    pub abuse: BackboneAbuseConfig,
67}
68
69impl BackboneServerRuntime {
70    pub fn from_config(config: &BackboneConfig) -> Self {
71        Self {
72            max_connections: config.max_connections,
73            idle_timeout: config.idle_timeout,
74            write_stall_timeout: config.write_stall_timeout,
75            abuse: config.abuse.clone(),
76        }
77    }
78}
79
80#[derive(Debug, Clone)]
81pub struct BackboneRuntimeConfigHandle {
82    pub interface_name: String,
83    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
84    pub startup: BackboneServerRuntime,
85}
86
87#[derive(Debug, Clone)]
88pub struct BackbonePeerStateHandle {
89    pub interface_id: InterfaceId,
90    pub interface_name: String,
91    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
92}
93
94impl Default for BackboneConfig {
95    fn default() -> Self {
96        let mut config = BackboneConfig {
97            name: String::new(),
98            listen_ip: "0.0.0.0".into(),
99            listen_port: 0,
100            interface_id: InterfaceId(0),
101            max_connections: None,
102            idle_timeout: None,
103            write_stall_timeout: None,
104            abuse: BackboneAbuseConfig::default(),
105            ingress_control: IngressControlConfig::enabled(),
106            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
107                max_connections: None,
108                idle_timeout: None,
109                write_stall_timeout: None,
110                abuse: BackboneAbuseConfig::default(),
111            })),
112            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
113        };
114        let startup = BackboneServerRuntime::from_config(&config);
115        config.runtime = Arc::new(Mutex::new(startup));
116        config
117    }
118}
119
/// Upper bound on the bytes queued for a single client (512 KiB). A client
/// whose backlog would grow past this is disconnected, so a slow reader
/// cannot cause unbounded memory growth.
const MAX_PENDING_BYTES: usize = 512 << 10;
123
124/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
125struct BackboneWriter {
126    stream: TcpStream,
127    runtime: Arc<Mutex<BackboneServerRuntime>>,
128    interface_name: String,
129    interface_id: InterfaceId,
130    event_tx: EventSender,
131    pending: Vec<u8>,
132    stall_started: Option<Instant>,
133    disconnect_notified: bool,
134    write_stall_flag: Arc<AtomicBool>,
135}
136
137impl Writer for BackboneWriter {
138    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
139        let write_stall_timeout =
140            lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
141        if !self.pending.is_empty() {
142            self.flush_pending(write_stall_timeout)?;
143            if !self.pending.is_empty() {
144                return Err(io::Error::new(
145                    io::ErrorKind::WouldBlock,
146                    "backbone writer still stalled",
147                ));
148            }
149        }
150
151        let frame = hdlc::frame(data);
152        self.write_buffer(&frame, write_stall_timeout)
153    }
154}
155
156impl BackboneWriter {
157    fn write_buffer(
158        &mut self,
159        data: &[u8],
160        write_stall_timeout: Option<Duration>,
161    ) -> io::Result<()> {
162        let mut written = 0usize;
163        while written < data.len() {
164            match self.stream.write(&data[written..]) {
165                Ok(0) => {
166                    return Err(io::Error::new(
167                        io::ErrorKind::WriteZero,
168                        "backbone writer wrote zero bytes",
169                    ))
170                }
171                Ok(n) => {
172                    written += n;
173                    self.stall_started = None;
174                }
175                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176                    let now = Instant::now();
177                    let started = self.stall_started.get_or_insert(now);
178                    if let Some(timeout) = write_stall_timeout {
179                        if now.duration_since(*started) >= timeout {
180                            return Err(self.disconnect_for_write_stall(timeout));
181                        }
182                    }
183                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
184                        return Err(self.disconnect_for_write_stall(
185                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
186                        ));
187                    }
188                    self.pending.extend_from_slice(&data[written..]);
189                    return Err(io::Error::new(
190                        io::ErrorKind::WouldBlock,
191                        "backbone writer would block",
192                    ));
193                }
194                Err(e) => return Err(e),
195            }
196        }
197        Ok(())
198    }
199
200    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
201        if self.pending.is_empty() {
202            return Ok(());
203        }
204
205        let pending = std::mem::take(&mut self.pending);
206        match self.write_buffer(&pending, write_stall_timeout) {
207            Ok(()) => Ok(()),
208            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
209            Err(e) => Err(e),
210        }
211    }
212
213    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
214        if !self.disconnect_notified {
215            log::warn!(
216                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
217                self.interface_name,
218                self.interface_id.0,
219                timeout
220            );
221            self.write_stall_flag.store(true, Ordering::Relaxed);
222            let _ = self.stream.shutdown(Shutdown::Both);
223            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
224            self.disconnect_notified = true;
225        }
226        io::Error::new(
227            io::ErrorKind::TimedOut,
228            format!("backbone writer stalled for {:?}", timeout),
229        )
230    }
231}
232
233/// Start a backbone interface. Binds TCP listener, spawns poll thread.
234pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
235    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
236    let listener = TcpListener::bind(&addr)?;
237    listener.set_nonblocking(true)?;
238
239    log::info!(
240        "[{}] backbone server listening on {}",
241        config.name,
242        listener
243            .local_addr()
244            .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
245    );
246
247    let name = config.name.clone();
248    let server_interface_id = config.interface_id;
249    let runtime = Arc::clone(&config.runtime);
250    let peer_state = Arc::clone(&config.peer_state);
251    let ingress_control = config.ingress_control;
252    thread::Builder::new()
253        .name(format!("backbone-poll-{}", config.interface_id.0))
254        .spawn(move || {
255            if let Err(e) = poll_loop(
256                listener,
257                name,
258                server_interface_id,
259                tx,
260                next_id,
261                runtime,
262                peer_state,
263                ingress_control,
264            ) {
265                log::error!("backbone poll loop error: {}", e);
266            }
267        })?;
268
269    Ok(())
270}
271
272/// Per-client tracking state.
273struct ClientState {
274    id: InterfaceId,
275    peer_ip: IpAddr,
276    peer_port: u16,
277    stream: TcpStream,
278    decoder: hdlc::Decoder,
279    connected_at: Instant,
280    has_received_data: bool,
281    write_stall_flag: Arc<AtomicBool>,
282}
283
/// Bookkeeping kept per remote IP address.
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// End of the blacklist window, or `None` when the IP is not blacklisted.
    blacklisted_until: Option<Instant>,
    /// Why the IP was blacklisted, if a reason was recorded.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected because of the blacklist.
    reject_count: u64,
    /// Number of currently open connections from this IP.
    connected_count: usize,
}

impl PeerBehaviorState {
    /// State of an IP with no history: not blacklisted, nothing rejected,
    /// nothing connected.
    fn new() -> Self {
        Self {
            connected_count: 0,
            reject_count: 0,
            blacklist_reason: None,
            blacklisted_until: None,
        }
    }
}
302
303#[derive(Debug, Clone, Default)]
304pub struct BackbonePeerMonitor {
305    peers: HashMap<IpAddr, PeerBehaviorState>,
306}
307
308impl BackbonePeerMonitor {
309    pub fn new() -> Self {
310        Self {
311            peers: HashMap::new(),
312        }
313    }
314
315    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
316        let mut merged = self.peers.clone();
317
318        for (peer_ip, state) in peers {
319            let entry = merged
320                .entry(*peer_ip)
321                .or_insert_with(PeerBehaviorState::new);
322            entry.connected_count = state.connected_count;
323            entry.reject_count = state.reject_count;
324            if state.blacklisted_until.is_some() {
325                entry.blacklisted_until = state.blacklisted_until;
326                entry.blacklist_reason = state.blacklist_reason.clone();
327            }
328        }
329
330        merged.retain(|peer_ip, state| {
331            peers.contains_key(peer_ip)
332                || state.blacklisted_until.is_some()
333                || state.reject_count > 0
334        });
335        self.peers = merged;
336    }
337
338    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
339        for (peer_ip, state) in &self.peers {
340            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
341            entry.blacklisted_until = state.blacklisted_until;
342            entry.blacklist_reason = state.blacklist_reason.clone();
343            entry.reject_count = state.reject_count;
344        }
345
346        peers.retain(|peer_ip, state| {
347            if state.connected_count > 0 {
348                return true;
349            }
350            self.peers.contains_key(peer_ip)
351        });
352    }
353
354    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
355        let now = Instant::now();
356        let mut entries: Vec<BackbonePeerStateEntry> = self
357            .peers
358            .iter()
359            .map(|(peer_ip, state)| BackbonePeerStateEntry {
360                interface_name: interface_name.to_string(),
361                peer_ip: *peer_ip,
362                connected_count: state.connected_count,
363                blacklisted_remaining_secs: state
364                    .blacklisted_until
365                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
366                blacklist_reason: state.blacklist_reason.clone(),
367                reject_count: state.reject_count,
368            })
369            .collect();
370        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
371        entries
372    }
373
374    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
375        self.peers.remove(&peer_ip).is_some()
376    }
377
378    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
379        let state = self
380            .peers
381            .entry(peer_ip)
382            .or_insert_with(PeerBehaviorState::new);
383        state.blacklisted_until = Some(Instant::now() + duration);
384        state.blacklist_reason = Some(reason);
385        true
386    }
387
388    #[cfg(test)]
389    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
390        let mut state = PeerBehaviorState::new();
391        state.connected_count = entry.connected_count;
392        state.reject_count = entry.reject_count;
393        state.blacklist_reason = entry.blacklist_reason;
394        if let Some(remaining) = entry.blacklisted_remaining_secs {
395            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
396        }
397        self.peers.insert(entry.peer_ip, state);
398    }
399}
400
/// Why the poll loop is dropping a client.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// Reading from the peer returned end-of-stream or an error.
    RemoteClosed,
    /// The peer never sent any data within the idle timeout.
    IdleTimeout,
    /// The peer's writer gave up because the socket stopped accepting data.
    WriteStall,
}
407
408/// Main poll event loop.
409fn poll_loop(
410    listener: TcpListener,
411    name: String,
412    server_interface_id: InterfaceId,
413    tx: EventSender,
414    next_id: Arc<AtomicU64>,
415    runtime: Arc<Mutex<BackboneServerRuntime>>,
416    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
417    ingress_control: IngressControlConfig,
418) -> io::Result<()> {
419    let poller = Poller::new()?;
420
421    const LISTENER_KEY: usize = 0;
422
423    // SAFETY: listener outlives its registration in the poller.
424    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
425
426    let mut clients: HashMap<usize, ClientState> = HashMap::new();
427    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
428    let mut events = Events::new();
429    let mut next_key: usize = 1;
430
431    loop {
432        let runtime_snapshot = runtime.lock().unwrap().clone();
433        let max_connections = runtime_snapshot.max_connections;
434        let idle_timeout = runtime_snapshot.idle_timeout;
435        cleanup_peer_state(&mut peers);
436        {
437            let mut monitor = peer_state.lock().unwrap();
438            monitor.sync_into(&mut peers);
439            monitor.upsert_snapshot(&peers);
440        }
441
442        events.clear();
443        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
444
445        for ev in events.iter() {
446            if ev.key == LISTENER_KEY {
447                // Accept new connections
448                loop {
449                    match listener.accept() {
450                        Ok((stream, peer_addr)) => {
451                            let peer_ip = peer_addr.ip();
452                            let peer_port = peer_addr.port();
453
454                            if is_ip_blacklisted(&mut peers, peer_ip) {
455                                if let Some(state) = peers.get_mut(&peer_ip) {
456                                    state.reject_count = state.reject_count.saturating_add(1);
457                                }
458                                peer_state.lock().unwrap().upsert_snapshot(&peers);
459                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
460                                drop(stream);
461                                continue;
462                            }
463
464                            if let Some(max) = max_connections {
465                                if clients.len() >= max {
466                                    log::warn!(
467                                        "[{}] max connections ({}) reached, rejecting {}",
468                                        name,
469                                        max,
470                                        peer_addr
471                                    );
472                                    drop(stream);
473                                    continue;
474                                }
475                            }
476
477                            stream.set_nonblocking(true).ok();
478                            stream.set_nodelay(true).ok();
479                            set_tcp_keepalive(&stream).ok();
480
481                            // Prevent SIGPIPE on macOS when writing to broken pipes
482                            #[cfg(target_os = "macos")]
483                            {
484                                let sock = SockRef::from(&stream);
485                                sock.set_nosigpipe(true).ok();
486                            }
487
488                            let key = next_key;
489                            next_key += 1;
490                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
491
492                            log::info!(
493                                "[{}] backbone client connected: {} → id {}",
494                                name,
495                                peer_addr,
496                                client_id.0
497                            );
498
499                            // Register client with poller
500                            // SAFETY: stream is stored in ClientState and outlives registration.
501                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
502                            {
503                                log::warn!("[{}] failed to add client to poller: {}", name, e);
504                                continue; // stream drops, closing socket
505                            }
506
507                            // Create writer via try_clone (cross-platform dup)
508                            let writer_stream = match stream.try_clone() {
509                                Ok(s) => s,
510                                Err(e) => {
511                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
512                                    let _ = poller.delete(&stream);
513                                    continue; // stream drops
514                                }
515                            };
516                            let write_stall_flag = Arc::new(AtomicBool::new(false));
517                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
518                                stream: writer_stream,
519                                runtime: Arc::clone(&runtime),
520                                interface_name: name.clone(),
521                                interface_id: client_id,
522                                event_tx: tx.clone(),
523                                pending: Vec::new(),
524                                stall_started: None,
525                                disconnect_notified: false,
526                                write_stall_flag: Arc::clone(&write_stall_flag),
527                            });
528
529                            clients.insert(
530                                key,
531                                ClientState {
532                                    id: client_id,
533                                    peer_ip,
534                                    peer_port,
535                                    stream,
536                                    decoder: hdlc::Decoder::new(),
537                                    connected_at: Instant::now(),
538                                    has_received_data: false,
539                                    write_stall_flag,
540                                },
541                            );
542                            peers
543                                .entry(peer_ip)
544                                .or_insert_with(PeerBehaviorState::new)
545                                .connected_count += 1;
546                            peer_state.lock().unwrap().upsert_snapshot(&peers);
547                            let _ = tx.send(Event::BackbonePeerConnected {
548                                server_interface_id,
549                                peer_interface_id: client_id,
550                                peer_ip,
551                                peer_port,
552                            });
553
554                            let info = InterfaceInfo {
555                                id: client_id,
556                                name: format!("BackboneInterface/{}", client_id.0),
557                                mode: constants::MODE_FULL,
558                                out_capable: true,
559                                in_capable: true,
560                                bitrate: Some(1_000_000_000), // 1 Gbps guess
561                                airtime_profile: None,
562                                announce_rate_target: None,
563                                announce_rate_grace: 0,
564                                announce_rate_penalty: 0.0,
565                                announce_cap: constants::ANNOUNCE_CAP,
566                                is_local_client: false,
567                                wants_tunnel: false,
568                                tunnel_id: None,
569                                mtu: 65535,
570                                ia_freq: 0.0,
571                                ip_freq: 0.0,
572                                op_freq: 0.0,
573                                op_samples: 0,
574                                started: 0.0,
575                                ingress_control,
576                            };
577
578                            if tx
579                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
580                                .is_err()
581                            {
582                                // Driver shut down
583                                cleanup(&poller, &clients, &listener);
584                                return Ok(());
585                            }
586                        }
587                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
588                        Err(e) => {
589                            log::warn!("[{}] accept error: {}", name, e);
590                            break;
591                        }
592                    }
593                }
594                // Re-arm listener (oneshot semantics)
595                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
596            } else if clients.contains_key(&ev.key) {
597                let key = ev.key;
598                let mut should_remove = false;
599                let mut client_id = InterfaceId(0);
600
601                let mut buf = [0u8; 4096];
602                let read_result = {
603                    let client = clients.get_mut(&key).unwrap();
604                    client.stream.read(&mut buf)
605                };
606
607                match read_result {
608                    Ok(0) | Err(_) => {
609                        if let Some(c) = clients.get(&key) {
610                            client_id = c.id;
611                        }
612                        should_remove = true;
613                    }
614                    Ok(n) => {
615                        let client = clients.get_mut(&key).unwrap();
616                        client_id = client.id;
617                        client.has_received_data = true;
618                        for frame in client.decoder.feed(&buf[..n]) {
619                            if tx
620                                .send(Event::Frame {
621                                    interface_id: client_id,
622                                    data: frame,
623                                    rssi: None,
624                                    snr: None,
625                                })
626                                .is_err()
627                            {
628                                cleanup(&poller, &clients, &listener);
629                                return Ok(());
630                            }
631                        }
632                    }
633                }
634
635                if should_remove {
636                    let reason = if clients
637                        .get(&key)
638                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
639                    {
640                        DisconnectReason::WriteStall
641                    } else {
642                        DisconnectReason::RemoteClosed
643                    };
644                    disconnect_client(
645                        &poller,
646                        &mut clients,
647                        &mut peers,
648                        &name,
649                        server_interface_id,
650                        &tx,
651                        &peer_state,
652                        key,
653                        client_id,
654                        reason,
655                    );
656                } else if let Some(client) = clients.get(&key) {
657                    // Re-arm client (oneshot semantics)
658                    poller.modify(&client.stream, PollEvent::readable(key))?;
659                }
660            }
661        }
662
663        if let Some(timeout) = idle_timeout {
664            let now = Instant::now();
665            let timed_out: Vec<(usize, InterfaceId)> = clients
666                .iter()
667                .filter_map(|(&key, client)| {
668                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
669                    {
670                        None
671                    } else {
672                        Some((key, client.id))
673                    }
674                })
675                .collect();
676
677            for (key, client_id) in timed_out {
678                disconnect_client(
679                    &poller,
680                    &mut clients,
681                    &mut peers,
682                    &name,
683                    server_interface_id,
684                    &tx,
685                    &peer_state,
686                    key,
687                    client_id,
688                    DisconnectReason::IdleTimeout,
689                );
690            }
691        }
692    }
693}
694
695fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
696    let now = Instant::now();
697    peers.retain(|_, state| {
698        if matches!(state.blacklisted_until, Some(until) if now >= until) {
699            state.blacklisted_until = None;
700            state.blacklist_reason = None;
701        }
702        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
703    });
704}
705
706fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
707    let now = Instant::now();
708    if let Some(state) = peers.get_mut(&peer_ip) {
709        if let Some(until) = state.blacklisted_until {
710            if now < until {
711                return true;
712            }
713            state.blacklisted_until = None;
714        }
715    }
716    false
717}
718
719fn disconnect_client(
720    poller: &Poller,
721    clients: &mut HashMap<usize, ClientState>,
722    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
723    name: &str,
724    server_interface_id: InterfaceId,
725    tx: &EventSender,
726    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
727    key: usize,
728    client_id: InterfaceId,
729    reason: DisconnectReason,
730) {
731    let Some(client) = clients.remove(&key) else {
732        return;
733    };
734
735    match reason {
736        DisconnectReason::RemoteClosed => {
737            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
738        }
739        DisconnectReason::IdleTimeout => {
740            log::info!(
741                "[{}] backbone client {} disconnected due to idle timeout",
742                name,
743                client_id.0
744            );
745        }
746        DisconnectReason::WriteStall => {
747            // Already logged by BackboneWriter::disconnect_for_write_stall
748        }
749    }
750
751    let _ = poller.delete(&client.stream);
752    // client.stream closes on drop
753    let connected_for = client.connected_at.elapsed();
754    let _ = tx.send(Event::BackbonePeerDisconnected {
755        server_interface_id,
756        peer_interface_id: client.id,
757        peer_ip: client.peer_ip,
758        peer_port: client.peer_port,
759        connected_for,
760        had_received_data: client.has_received_data,
761    });
762    match reason {
763        DisconnectReason::IdleTimeout => {
764            let _ = tx.send(Event::BackbonePeerIdleTimeout {
765                server_interface_id,
766                peer_interface_id: client.id,
767                peer_ip: client.peer_ip,
768                peer_port: client.peer_port,
769                connected_for,
770            });
771        }
772        DisconnectReason::WriteStall => {
773            let _ = tx.send(Event::BackbonePeerWriteStall {
774                server_interface_id,
775                peer_interface_id: client.id,
776                peer_ip: client.peer_ip,
777                peer_port: client.peer_port,
778                connected_for,
779            });
780        }
781        DisconnectReason::RemoteClosed => {}
782    }
783
784    if let Some(state) = peers.get_mut(&client.peer_ip) {
785        state.connected_count = state.connected_count.saturating_sub(1);
786    }
787    peer_state.lock().unwrap().upsert_snapshot(peers);
788    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
789    if !matches!(reason, DisconnectReason::WriteStall) {
790        let _ = tx.send(Event::InterfaceDown(client_id));
791    }
792}
793
794fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
795    let sock = SockRef::from(stream);
796    let mut keepalive = TcpKeepalive::new()
797        .with_time(Duration::from_secs(5))
798        .with_interval(Duration::from_secs(2));
799    #[cfg(any(target_os = "linux", target_os = "macos"))]
800    {
801        keepalive = keepalive.with_retries(12);
802    }
803    sock.set_tcp_keepalive(&keepalive)
804}
805
806fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
807    for (_, client) in clients {
808        let _ = poller.delete(&client.stream);
809    }
810    let _ = poller.delete(listener);
811}
812
813// ---------------------------------------------------------------------------
814// Client mode
815// ---------------------------------------------------------------------------
816
817/// Configuration for a backbone client interface.
818#[derive(Debug, Clone)]
819pub struct BackboneClientConfig {
820    pub name: String,
821    pub target_host: String,
822    pub target_port: u16,
823    pub interface_id: InterfaceId,
824    pub reconnect_wait: Duration,
825    pub max_reconnect_tries: Option<u32>,
826    pub connect_timeout: Duration,
827    pub transport_identity: Option<String>,
828    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
829}
830
831#[derive(Debug, Clone)]
832pub struct BackboneClientRuntime {
833    pub reconnect_wait: Duration,
834    pub max_reconnect_tries: Option<u32>,
835    pub connect_timeout: Duration,
836}
837
838impl BackboneClientRuntime {
839    pub fn from_config(config: &BackboneClientConfig) -> Self {
840        Self {
841            reconnect_wait: config.reconnect_wait,
842            max_reconnect_tries: config.max_reconnect_tries,
843            connect_timeout: config.connect_timeout,
844        }
845    }
846}
847
848#[derive(Debug, Clone)]
849pub struct BackboneClientRuntimeConfigHandle {
850    pub interface_name: String,
851    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
852    pub startup: BackboneClientRuntime,
853}
854
855impl Default for BackboneClientConfig {
856    fn default() -> Self {
857        let mut config = BackboneClientConfig {
858            name: String::new(),
859            target_host: "127.0.0.1".into(),
860            target_port: 4242,
861            interface_id: InterfaceId(0),
862            reconnect_wait: Duration::from_secs(5),
863            max_reconnect_tries: None,
864            connect_timeout: Duration::from_secs(5),
865            transport_identity: None,
866            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
867                reconnect_wait: Duration::from_secs(5),
868                max_reconnect_tries: None,
869                connect_timeout: Duration::from_secs(5),
870            })),
871        };
872        let startup = BackboneClientRuntime::from_config(&config);
873        config.runtime = Arc::new(Mutex::new(startup));
874        config
875    }
876}
877
878/// Writer that sends HDLC-framed data over a TCP stream (client mode).
879struct BackboneClientWriter {
880    stream: TcpStream,
881}
882
883impl Writer for BackboneClientWriter {
884    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
885        self.stream.write_all(&hdlc::frame(data))
886    }
887}
888
889/// Try to connect to the target host:port with timeout.
890fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
891    let runtime = config.runtime.lock().unwrap().clone();
892    let addr_str = format!("{}:{}", config.target_host, config.target_port);
893    let addr = addr_str
894        .to_socket_addrs()?
895        .next()
896        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
897
898    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
899    stream.set_nodelay(true)?;
900    set_tcp_keepalive(&stream).ok();
901
902    // Prevent SIGPIPE on macOS when writing to broken pipes
903    #[cfg(target_os = "macos")]
904    {
905        let sock = SockRef::from(&stream);
906        sock.set_nosigpipe(true).ok();
907    }
908
909    Ok(stream)
910}
911
912/// Connect and start the reader thread. Returns the writer for the driver.
913pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
914    let stream = try_connect_client(&config)?;
915    let reader_stream = stream.try_clone()?;
916    let writer_stream = stream.try_clone()?;
917
918    let id = config.interface_id;
919    log::info!(
920        "[{}] backbone client connected to {}:{}",
921        config.name,
922        config.target_host,
923        config.target_port
924    );
925
926    // Initial connect: writer is None because it's returned directly to the caller
927    let _ = tx.send(Event::InterfaceUp(id, None, None));
928
929    thread::Builder::new()
930        .name(format!("backbone-client-{}", id.0))
931        .spawn(move || {
932            client_reader_loop(reader_stream, config, tx);
933        })?;
934
935    Ok(Box::new(BackboneClientWriter {
936        stream: writer_stream,
937    }))
938}
939
940/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
941/// On disconnect, attempts reconnection.
942fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
943    let id = config.interface_id;
944    let mut decoder = hdlc::Decoder::new();
945    let mut buf = [0u8; 4096];
946
947    loop {
948        match stream.read(&mut buf) {
949            Ok(0) => {
950                log::warn!("[{}] connection closed", config.name);
951                let _ = tx.send(Event::InterfaceDown(id));
952                match client_reconnect(&config, &tx) {
953                    Some(new_stream) => {
954                        stream = new_stream;
955                        decoder = hdlc::Decoder::new();
956                        continue;
957                    }
958                    None => {
959                        log::error!("[{}] reconnection failed, giving up", config.name);
960                        return;
961                    }
962                }
963            }
964            Ok(n) => {
965                for frame in decoder.feed(&buf[..n]) {
966                    if tx
967                        .send(Event::Frame {
968                            interface_id: id,
969                            data: frame,
970                            rssi: None,
971                            snr: None,
972                        })
973                        .is_err()
974                    {
975                        return;
976                    }
977                }
978            }
979            Err(e) => {
980                log::warn!("[{}] read error: {}", config.name, e);
981                let _ = tx.send(Event::InterfaceDown(id));
982                match client_reconnect(&config, &tx) {
983                    Some(new_stream) => {
984                        stream = new_stream;
985                        decoder = hdlc::Decoder::new();
986                        continue;
987                    }
988                    None => {
989                        log::error!("[{}] reconnection failed, giving up", config.name);
990                        return;
991                    }
992                }
993            }
994        }
995    }
996}
997
/// Upper bound on the exponent used for reconnect backoff: the un-jittered
/// delay never exceeds `base_delay * 2^MAX_BACKOFF_SHIFT`.
/// For a 5 s base delay that is 5 × 2^6 = 320 s, a little over 5 minutes.
const MAX_BACKOFF_SHIFT: u32 = 6;
1001
1002/// Attempt to reconnect with exponential backoff and jitter.
1003/// Returns the new reader stream on success.
1004/// Sends the new writer to the driver via InterfaceUp event.
1005fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
1006    let mut attempts = 0u32;
1007    loop {
1008        let runtime = config.runtime.lock().unwrap().clone();
1009
1010        let shift = attempts.min(MAX_BACKOFF_SHIFT);
1011        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1012        // Add ±25 % jitter to avoid thundering-herd reconnects.
1013        let jitter_range = backoff / 4;
1014        let jitter = if jitter_range.as_nanos() > 0 {
1015            let offset = Duration::from_nanos(
1016                (std::hash::RandomState::new().build_hasher().finish()
1017                    % jitter_range.as_nanos() as u64)
1018                    * 2,
1019            );
1020            if offset > jitter_range {
1021                backoff + (offset - jitter_range)
1022            } else {
1023                backoff - (jitter_range - offset)
1024            }
1025        } else {
1026            backoff
1027        };
1028        thread::sleep(jitter);
1029
1030        attempts += 1;
1031
1032        if let Some(max) = runtime.max_reconnect_tries {
1033            if attempts > max {
1034                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1035                return None;
1036            }
1037        }
1038
1039        log::info!(
1040            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1041            config.name,
1042            attempts,
1043            jitter.as_secs_f64(),
1044        );
1045
1046        match try_connect_client(config) {
1047            Ok(new_stream) => {
1048                let writer_stream = match new_stream.try_clone() {
1049                    Ok(s) => s,
1050                    Err(e) => {
1051                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1052                        continue;
1053                    }
1054                };
1055                log::info!(
1056                    "[{}] reconnected after {} attempt(s)",
1057                    config.name,
1058                    attempts
1059                );
1060                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1061                    stream: writer_stream,
1062                });
1063                let _ = tx.send(Event::InterfaceUp(
1064                    config.interface_id,
1065                    Some(new_writer),
1066                    None,
1067                ));
1068                return Some(new_stream);
1069            }
1070            Err(e) => {
1071                log::warn!("[{}] reconnect failed: {}", config.name, e);
1072            }
1073        }
1074    }
1075}
1076
1077// ---------------------------------------------------------------------------
1078// Factory
1079// ---------------------------------------------------------------------------
1080
1081/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
1082/// server or client config through the opaque `InterfaceConfigData` channel.
1083#[derive(Clone)]
1084pub(crate) enum BackboneMode {
1085    Server(BackboneConfig),
1086    Client(BackboneClientConfig, u8),
1087}
1088
/// Factory that builds `BackboneInterface` instances.
///
/// Client mode is selected when the config params contain `"remote"` or
/// `"target_host"`; without either key the interface is started in server
/// mode as a TCP listener.
pub struct BackboneInterfaceFactory;
1095
/// Read `key` from `params` as a strictly positive number of seconds.
///
/// Returns `None` when the key is absent, the value is not a number, the
/// number is zero, negative or NaN, or it cannot be represented as a
/// [`Duration`] (for example `inf` or `1e300`). The unrepresentable case used
/// to panic inside `Duration::from_secs_f64`, which let a config typo crash
/// config parsing.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    let secs = params.get(key)?.parse::<f64>().ok()?;
    if secs > 0.0 {
        // Fallible conversion: rejects infinite and overflowing values.
        Duration::try_from_secs_f64(secs).ok()
    } else {
        None
    }
}
1103
1104impl InterfaceFactory for BackboneInterfaceFactory {
1105    fn type_name(&self) -> &str {
1106        "BackboneInterface"
1107    }
1108
1109    fn parse_config(
1110        &self,
1111        name: &str,
1112        id: InterfaceId,
1113        params: &HashMap<String, String>,
1114    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1115        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1116            // Client mode
1117            let target_host = target_host.clone();
1118            let target_port = params
1119                .get("target_port")
1120                .or_else(|| params.get("port"))
1121                .and_then(|v| v.parse().ok())
1122                .unwrap_or(4242);
1123            let transport_identity = params.get("transport_identity").cloned();
1124            let priority = match params.get("priority") {
1125                Some(value) => value.parse::<u8>().map_err(|_| {
1126                    format!(
1127                        "invalid Backbone peer priority '{}' (expected 0..100)",
1128                        value
1129                    )
1130                })?,
1131                None => crate::driver::BACKBONE_PEER_POOL_CONFIGURED_DEFAULT_PRIORITY,
1132            };
1133            if priority > 100 {
1134                return Err(format!(
1135                    "invalid Backbone peer priority '{}' (expected 0..100)",
1136                    priority
1137                ));
1138            }
1139            Ok(Box::new(BackboneMode::Client(
1140                BackboneClientConfig {
1141                    name: name.to_string(),
1142                    target_host,
1143                    target_port,
1144                    interface_id: id,
1145                    transport_identity,
1146                    ..BackboneClientConfig::default()
1147                },
1148                priority,
1149            )))
1150        } else {
1151            // Server mode
1152            let listen_ip = params
1153                .get("listen_ip")
1154                .or_else(|| params.get("device"))
1155                .cloned()
1156                .unwrap_or_else(|| "0.0.0.0".into());
1157            let listen_port = params
1158                .get("listen_port")
1159                .or_else(|| params.get("port"))
1160                .and_then(|v| v.parse().ok())
1161                .unwrap_or(4242);
1162            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1163            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1164            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1165            let abuse = BackboneAbuseConfig {
1166                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1167            };
1168            let mut config = BackboneConfig {
1169                name: name.to_string(),
1170                listen_ip,
1171                listen_port,
1172                interface_id: id,
1173                max_connections,
1174                idle_timeout,
1175                write_stall_timeout,
1176                abuse,
1177                ingress_control: IngressControlConfig::enabled(),
1178                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1179                    max_connections: None,
1180                    idle_timeout: None,
1181                    write_stall_timeout: None,
1182                    abuse: BackboneAbuseConfig::default(),
1183                })),
1184                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1185            };
1186            let startup = BackboneServerRuntime::from_config(&config);
1187            config.runtime = Arc::new(Mutex::new(startup));
1188            Ok(Box::new(BackboneMode::Server(config)))
1189        }
1190    }
1191
1192    fn start(
1193        &self,
1194        config: Box<dyn InterfaceConfigData>,
1195        ctx: StartContext,
1196    ) -> io::Result<StartResult> {
1197        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1198            io::Error::new(
1199                io::ErrorKind::InvalidData,
1200                "wrong config type for BackboneInterface",
1201            )
1202        })?;
1203
1204        match mode {
1205            BackboneMode::Client(cfg, _) => {
1206                let id = cfg.interface_id;
1207                let name = cfg.name.clone();
1208                let info = InterfaceInfo {
1209                    id,
1210                    name,
1211                    mode: ctx.mode,
1212                    out_capable: true,
1213                    in_capable: true,
1214                    bitrate: Some(1_000_000_000),
1215                    airtime_profile: None,
1216                    announce_rate_target: None,
1217                    announce_rate_grace: 0,
1218                    announce_rate_penalty: 0.0,
1219                    announce_cap: constants::ANNOUNCE_CAP,
1220                    is_local_client: false,
1221                    wants_tunnel: false,
1222                    tunnel_id: None,
1223                    mtu: 65535,
1224                    ingress_control: ctx.ingress_control,
1225                    ia_freq: 0.0,
1226                    ip_freq: 0.0,
1227                    op_freq: 0.0,
1228                    op_samples: 0,
1229                    started: crate::time::now(),
1230                };
1231                let writer = start_client(cfg, ctx.tx)?;
1232                Ok(StartResult::Simple {
1233                    id,
1234                    info,
1235                    writer,
1236                    interface_type_name: "BackboneInterface".to_string(),
1237                })
1238            }
1239            BackboneMode::Server(mut cfg) => {
1240                cfg.ingress_control = ctx.ingress_control;
1241                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1242                Ok(StartResult::Listener { control: None })
1243            }
1244        }
1245    }
1246}
1247
1248pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1249    match mode {
1250        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1251            interface_name: config.name.clone(),
1252            runtime: Arc::clone(&config.runtime),
1253            startup: BackboneServerRuntime::from_config(config),
1254        }),
1255        BackboneMode::Client(_, _) => None,
1256    }
1257}
1258
1259pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1260    match mode {
1261        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1262            interface_id: config.interface_id,
1263            interface_name: config.name.clone(),
1264            peer_state: Arc::clone(&config.peer_state),
1265        }),
1266        BackboneMode::Client(_, _) => None,
1267    }
1268}
1269
1270pub(crate) fn client_runtime_handle_from_mode(
1271    mode: &BackboneMode,
1272) -> Option<BackboneClientRuntimeConfigHandle> {
1273    match mode {
1274        BackboneMode::Client(config, _) => Some(BackboneClientRuntimeConfigHandle {
1275            interface_name: config.name.clone(),
1276            runtime: Arc::clone(&config.runtime),
1277            startup: BackboneClientRuntime::from_config(config),
1278        }),
1279        BackboneMode::Server(_) => None,
1280    }
1281}
1282
1283pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1284    match mode {
1285        BackboneMode::Client(config, _) => Some(config.clone()),
1286        BackboneMode::Server(_) => None,
1287    }
1288}
1289
1290pub(crate) fn client_priority_from_mode(mode: &BackboneMode) -> Option<u8> {
1291    match mode {
1292        BackboneMode::Client(_, priority) => Some(*priority),
1293        BackboneMode::Server(_) => None,
1294    }
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299    use super::*;
1300    use std::sync::mpsc;
1301    use std::time::Duration;
1302
1303    fn find_free_port() -> u16 {
1304        TcpListener::bind("127.0.0.1:0")
1305            .unwrap()
1306            .local_addr()
1307            .unwrap()
1308            .port()
1309    }
1310
1311    fn recv_non_peer_event(
1312        rx: &mpsc::Receiver<Event>,
1313        timeout: Duration,
1314    ) -> Result<Event, mpsc::RecvTimeoutError> {
1315        let deadline = Instant::now() + timeout;
1316        loop {
1317            let remaining = deadline.saturating_duration_since(Instant::now());
1318            if remaining.is_zero() {
1319                return Err(mpsc::RecvTimeoutError::Timeout);
1320            }
1321            let event = rx.recv_timeout(remaining)?;
1322            match event {
1323                Event::BackbonePeerConnected { .. }
1324                | Event::BackbonePeerDisconnected { .. }
1325                | Event::BackbonePeerIdleTimeout { .. }
1326                | Event::BackbonePeerWriteStall { .. }
1327                | Event::BackbonePeerPenalty { .. } => continue,
1328                other => return Ok(other),
1329            }
1330        }
1331    }
1332
1333    fn make_server_config(
1334        port: u16,
1335        interface_id: u64,
1336        max_connections: Option<usize>,
1337        idle_timeout: Option<Duration>,
1338        write_stall_timeout: Option<Duration>,
1339        abuse: BackboneAbuseConfig,
1340    ) -> BackboneConfig {
1341        let mut config = BackboneConfig {
1342            name: "test-backbone".into(),
1343            listen_ip: "127.0.0.1".into(),
1344            listen_port: port,
1345            interface_id: InterfaceId(interface_id),
1346            max_connections,
1347            idle_timeout,
1348            write_stall_timeout,
1349            abuse,
1350            ingress_control: IngressControlConfig::enabled(),
1351            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1352                max_connections: None,
1353                idle_timeout: None,
1354                write_stall_timeout: None,
1355                abuse: BackboneAbuseConfig::default(),
1356            })),
1357            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1358        };
1359        let startup = BackboneServerRuntime::from_config(&config);
1360        config.runtime = Arc::new(Mutex::new(startup));
1361        config
1362    }
1363
1364    #[test]
1365    fn backbone_accept_connection() {
1366        let port = find_free_port();
1367        let (tx, rx) = crate::event::channel();
1368        let next_id = Arc::new(AtomicU64::new(8000));
1369
1370        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1371
1372        start(config, tx, next_id).unwrap();
1373        thread::sleep(Duration::from_millis(50));
1374
1375        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1376
1377        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1378        match event {
1379            Event::InterfaceUp(id, writer, info) => {
1380                assert_eq!(id, InterfaceId(8000));
1381                assert!(writer.is_some());
1382                assert!(info.is_some());
1383                let info = info.unwrap();
1384                assert!(info.out_capable);
1385                assert!(info.in_capable);
1386            }
1387            other => panic!("expected InterfaceUp, got {:?}", other),
1388        }
1389    }
1390
1391    #[test]
1392    fn backbone_receive_frame() {
1393        let port = find_free_port();
1394        let (tx, rx) = crate::event::channel();
1395        let next_id = Arc::new(AtomicU64::new(8100));
1396
1397        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1398
1399        start(config, tx, next_id).unwrap();
1400        thread::sleep(Duration::from_millis(50));
1401
1402        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1403
1404        // Drain InterfaceUp
1405        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1406
1407        // Send HDLC frame (>= 19 bytes)
1408        let payload: Vec<u8> = (0..32).collect();
1409        client.write_all(&hdlc::frame(&payload)).unwrap();
1410
1411        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1412        match event {
1413            Event::Frame {
1414                interface_id,
1415                data,
1416                rssi: _,
1417                snr: _,
1418            } => {
1419                assert_eq!(interface_id, InterfaceId(8100));
1420                assert_eq!(data, payload);
1421            }
1422            other => panic!("expected Frame, got {:?}", other),
1423        }
1424    }
1425
1426    #[test]
1427    fn backbone_send_to_client() {
1428        let port = find_free_port();
1429        let (tx, rx) = crate::event::channel();
1430        let next_id = Arc::new(AtomicU64::new(8200));
1431
1432        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1433
1434        start(config, tx, next_id).unwrap();
1435        thread::sleep(Duration::from_millis(50));
1436
1437        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1438        client
1439            .set_read_timeout(Some(Duration::from_secs(2)))
1440            .unwrap();
1441
1442        // Get writer from InterfaceUp
1443        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1444        let mut writer = match event {
1445            Event::InterfaceUp(_, Some(w), _) => w,
1446            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1447        };
1448
1449        // Send frame via writer
1450        let payload: Vec<u8> = (0..24).collect();
1451        writer.send_frame(&payload).unwrap();
1452
1453        // Read from client
1454        let mut buf = [0u8; 256];
1455        let n = client.read(&mut buf).unwrap();
1456        let expected = hdlc::frame(&payload);
1457        assert_eq!(&buf[..n], &expected[..]);
1458    }
1459
1460    #[test]
1461    fn backbone_multiple_clients() {
1462        let port = find_free_port();
1463        let (tx, rx) = crate::event::channel();
1464        let next_id = Arc::new(AtomicU64::new(8300));
1465
1466        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1467
1468        start(config, tx, next_id).unwrap();
1469        thread::sleep(Duration::from_millis(50));
1470
1471        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1472        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1473
1474        let mut ids = Vec::new();
1475        for _ in 0..2 {
1476            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1477            match event {
1478                Event::InterfaceUp(id, _, _) => ids.push(id),
1479                other => panic!("expected InterfaceUp, got {:?}", other),
1480            }
1481        }
1482
1483        assert_eq!(ids.len(), 2);
1484        assert_ne!(ids[0], ids[1]);
1485    }
1486
1487    #[test]
1488    fn backbone_client_disconnect() {
1489        let port = find_free_port();
1490        let (tx, rx) = crate::event::channel();
1491        let next_id = Arc::new(AtomicU64::new(8400));
1492
1493        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1494
1495        start(config, tx, next_id).unwrap();
1496        thread::sleep(Duration::from_millis(50));
1497
1498        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1499
1500        // Drain InterfaceUp
1501        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1502
1503        // Disconnect
1504        drop(client);
1505
1506        // Should receive InterfaceDown
1507        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1508        assert!(
1509            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1510            "expected InterfaceDown(8400), got {:?}",
1511            event
1512        );
1513    }
1514
1515    #[test]
1516    fn backbone_epoll_multiplexing() {
1517        let port = find_free_port();
1518        let (tx, rx) = crate::event::channel();
1519        let next_id = Arc::new(AtomicU64::new(8500));
1520
1521        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1522
1523        start(config, tx, next_id).unwrap();
1524        thread::sleep(Duration::from_millis(50));
1525
1526        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1527        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1528
1529        // Drain both InterfaceUp events
1530        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1531        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1532
1533        // Both clients send data simultaneously
1534        let payload1: Vec<u8> = (0..24).collect();
1535        let payload2: Vec<u8> = (100..130).collect();
1536        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1537        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1538
1539        // Should receive both Frame events
1540        let mut received = Vec::new();
1541        for _ in 0..2 {
1542            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1543            match event {
1544                Event::Frame { data, .. } => received.push(data),
1545                other => panic!("expected Frame, got {:?}", other),
1546            }
1547        }
1548        assert!(received.contains(&payload1));
1549        assert!(received.contains(&payload2));
1550    }
1551
1552    #[test]
1553    fn backbone_bind_port() {
1554        let port = find_free_port();
1555        let (tx, _rx) = crate::event::channel();
1556        let next_id = Arc::new(AtomicU64::new(8600));
1557
1558        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1559
1560        // Should not error
1561        start(config, tx, next_id).unwrap();
1562    }
1563
1564    #[test]
1565    fn backbone_hdlc_fragmented() {
1566        let port = find_free_port();
1567        let (tx, rx) = crate::event::channel();
1568        let next_id = Arc::new(AtomicU64::new(8700));
1569
1570        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1571
1572        start(config, tx, next_id).unwrap();
1573        thread::sleep(Duration::from_millis(50));
1574
1575        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1576        client.set_nodelay(true).unwrap();
1577
1578        // Drain InterfaceUp
1579        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1580
1581        // Send HDLC frame in two fragments
1582        let payload: Vec<u8> = (0..32).collect();
1583        let framed = hdlc::frame(&payload);
1584        let mid = framed.len() / 2;
1585
1586        client.write_all(&framed[..mid]).unwrap();
1587        thread::sleep(Duration::from_millis(50));
1588        client.write_all(&framed[mid..]).unwrap();
1589
1590        // Should receive reassembled frame
1591        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1592        match event {
1593            Event::Frame { data, .. } => {
1594                assert_eq!(data, payload);
1595            }
1596            other => panic!("expected Frame, got {:?}", other),
1597        }
1598    }
1599
1600    // -----------------------------------------------------------------------
1601    // Client mode tests
1602    // -----------------------------------------------------------------------
1603
1604    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1605        BackboneClientConfig {
1606            name: format!("test-bb-client-{}", port),
1607            target_host: "127.0.0.1".into(),
1608            target_port: port,
1609            interface_id: InterfaceId(id),
1610            reconnect_wait: Duration::from_millis(100),
1611            max_reconnect_tries: Some(2),
1612            connect_timeout: Duration::from_secs(2),
1613            transport_identity: None,
1614            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1615                reconnect_wait: Duration::from_millis(100),
1616                max_reconnect_tries: Some(2),
1617                connect_timeout: Duration::from_secs(2),
1618            })),
1619        }
1620    }
1621
1622    #[test]
1623    fn backbone_client_connect() {
1624        let port = find_free_port();
1625        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1626        let (tx, rx) = crate::event::channel();
1627
1628        let config = make_client_config(port, 9000);
1629        let _writer = start_client(config, tx).unwrap();
1630
1631        let _server_stream = listener.accept().unwrap();
1632
1633        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1634        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1635    }
1636
1637    #[test]
1638    fn backbone_client_receive_frame() {
1639        let port = find_free_port();
1640        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1641        let (tx, rx) = crate::event::channel();
1642
1643        let config = make_client_config(port, 9100);
1644        let _writer = start_client(config, tx).unwrap();
1645
1646        let (mut server_stream, _) = listener.accept().unwrap();
1647
1648        // Drain InterfaceUp
1649        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1650
1651        // Send HDLC frame from server side (>= 19 bytes payload)
1652        let payload: Vec<u8> = (0..32).collect();
1653        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1654
1655        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1656        match event {
1657            Event::Frame {
1658                interface_id,
1659                data,
1660                rssi: _,
1661                snr: _,
1662            } => {
1663                assert_eq!(interface_id, InterfaceId(9100));
1664                assert_eq!(data, payload);
1665            }
1666            other => panic!("expected Frame, got {:?}", other),
1667        }
1668    }
1669
1670    #[test]
1671    fn backbone_client_send_frame() {
1672        let port = find_free_port();
1673        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1674        let (tx, _rx) = crate::event::channel();
1675
1676        let config = make_client_config(port, 9200);
1677        let mut writer = start_client(config, tx).unwrap();
1678
1679        let (mut server_stream, _) = listener.accept().unwrap();
1680        server_stream
1681            .set_read_timeout(Some(Duration::from_secs(2)))
1682            .unwrap();
1683
1684        let payload: Vec<u8> = (0..24).collect();
1685        writer.send_frame(&payload).unwrap();
1686
1687        let mut buf = [0u8; 256];
1688        let n = server_stream.read(&mut buf).unwrap();
1689        let expected = hdlc::frame(&payload);
1690        assert_eq!(&buf[..n], &expected[..]);
1691    }
1692
1693    #[test]
1694    fn backbone_max_connections_rejects_excess() {
1695        let port = find_free_port();
1696        let (tx, rx) = crate::event::channel();
1697        let next_id = Arc::new(AtomicU64::new(8800));
1698
1699        let config = make_server_config(
1700            port,
1701            88,
1702            Some(2),
1703            None,
1704            None,
1705            BackboneAbuseConfig::default(),
1706        );
1707
1708        start(config, tx, next_id).unwrap();
1709        thread::sleep(Duration::from_millis(50));
1710
1711        // Connect two clients (at limit)
1712        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1713        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1714
1715        // Drain both InterfaceUp events
1716        for _ in 0..2 {
1717            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1718            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1719        }
1720
1721        // Third connection should be accepted at TCP level but immediately dropped
1722        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1723        client3
1724            .set_read_timeout(Some(Duration::from_millis(500)))
1725            .unwrap();
1726
1727        // Give server time to reject
1728        thread::sleep(Duration::from_millis(100));
1729
1730        // Should NOT receive a third InterfaceUp
1731        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1732        assert!(
1733            result.is_err(),
1734            "expected no InterfaceUp for rejected connection, got {:?}",
1735            result
1736        );
1737    }
1738
1739    #[test]
1740    fn backbone_max_connections_allows_after_disconnect() {
1741        let port = find_free_port();
1742        let (tx, rx) = crate::event::channel();
1743        let next_id = Arc::new(AtomicU64::new(8900));
1744
1745        let config = make_server_config(
1746            port,
1747            89,
1748            Some(1),
1749            None,
1750            None,
1751            BackboneAbuseConfig::default(),
1752        );
1753
1754        start(config, tx, next_id).unwrap();
1755        thread::sleep(Duration::from_millis(50));
1756
1757        // Connect first client
1758        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1759        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1760        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1761
1762        // Disconnect first client
1763        drop(client1);
1764
1765        // Wait for InterfaceDown
1766        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1767        assert!(matches!(event, Event::InterfaceDown(_)));
1768
1769        // Now a new connection should be accepted
1770        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1771        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1772        assert!(
1773            matches!(event, Event::InterfaceUp(_, _, _)),
1774            "expected InterfaceUp after slot freed, got {:?}",
1775            event
1776        );
1777    }
1778
1779    #[test]
1780    fn backbone_client_reconnect() {
1781        let port = find_free_port();
1782        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1783        listener.set_nonblocking(false).unwrap();
1784        let (tx, rx) = crate::event::channel();
1785
1786        let config = make_client_config(port, 9300);
1787        let _writer = start_client(config, tx).unwrap();
1788
1789        // Accept first connection and immediately close it
1790        let (server_stream, _) = listener.accept().unwrap();
1791
1792        // Drain InterfaceUp
1793        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1794
1795        drop(server_stream);
1796
1797        // Should get InterfaceDown
1798        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1799        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1800
1801        // Accept the reconnection
1802        let _server_stream2 = listener.accept().unwrap();
1803
1804        // Should get InterfaceUp again
1805        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1806        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1807    }
1808
1809    #[test]
1810    fn backbone_idle_timeout_disconnects_silent_client() {
1811        let port = find_free_port();
1812        let (tx, rx) = crate::event::channel();
1813        let next_id = Arc::new(AtomicU64::new(9400));
1814
1815        let config = make_server_config(
1816            port,
1817            94,
1818            None,
1819            Some(Duration::from_millis(150)),
1820            None,
1821            BackboneAbuseConfig::default(),
1822        );
1823
1824        start(config, tx, next_id).unwrap();
1825        thread::sleep(Duration::from_millis(50));
1826
1827        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1828
1829        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1830        let client_id = match event {
1831            Event::InterfaceUp(id, _, _) => id,
1832            other => panic!("expected InterfaceUp, got {:?}", other),
1833        };
1834
1835        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1836        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1837    }
1838
1839    #[test]
1840    fn backbone_idle_timeout_ignores_client_after_data() {
1841        let port = find_free_port();
1842        let (tx, rx) = crate::event::channel();
1843        let next_id = Arc::new(AtomicU64::new(9500));
1844
1845        let config = make_server_config(
1846            port,
1847            95,
1848            None,
1849            Some(Duration::from_millis(200)),
1850            None,
1851            BackboneAbuseConfig::default(),
1852        );
1853
1854        start(config, tx, next_id).unwrap();
1855        thread::sleep(Duration::from_millis(50));
1856
1857        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1858
1859        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1860        let client_id = match event {
1861            Event::InterfaceUp(id, _, _) => id,
1862            other => panic!("expected InterfaceUp, got {:?}", other),
1863        };
1864
1865        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1866
1867        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1868        match event {
1869            Event::Frame {
1870                interface_id,
1871                data,
1872                rssi: _,
1873                snr: _,
1874            } => {
1875                assert_eq!(interface_id, client_id);
1876                assert_eq!(data, vec![1u8; 24]);
1877            }
1878            other => panic!("expected Frame, got {:?}", other),
1879        }
1880
1881        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1882        assert!(
1883            result.is_err(),
1884            "expected no InterfaceDown after client sent data, got {:?}",
1885            result
1886        );
1887    }
1888
1889    #[test]
1890    fn backbone_runtime_idle_timeout_updates_live() {
1891        let port = find_free_port();
1892        let (tx, rx) = crate::event::channel();
1893        let next_id = Arc::new(AtomicU64::new(9650));
1894
1895        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1896        let runtime = Arc::clone(&config.runtime);
1897
1898        start(config, tx, next_id).unwrap();
1899        thread::sleep(Duration::from_millis(50));
1900
1901        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1902        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1903        let client_id = match event {
1904            Event::InterfaceUp(id, _, _) => id,
1905            other => panic!("expected InterfaceUp, got {:?}", other),
1906        };
1907
1908        {
1909            let mut runtime = runtime.lock().unwrap();
1910            runtime.idle_timeout = Some(Duration::from_millis(150));
1911        }
1912
1913        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1914        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1915    }
1916
1917    #[test]
1918    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1919        let port = find_free_port();
1920        let (tx, rx) = crate::event::channel();
1921        let next_id = Arc::new(AtomicU64::new(9660));
1922
1923        let config = make_server_config(
1924            port,
1925            98,
1926            None,
1927            None,
1928            Some(Duration::from_millis(50)),
1929            BackboneAbuseConfig::default(),
1930        );
1931
1932        start(config, tx, next_id).unwrap();
1933        thread::sleep(Duration::from_millis(50));
1934
1935        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1936        client
1937            .set_read_timeout(Some(Duration::from_millis(100)))
1938            .unwrap();
1939        let sock = SockRef::from(&client);
1940        sock.set_recv_buffer_size(4096).ok();
1941
1942        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1943        let (client_id, mut writer) = match event {
1944            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1945            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1946        };
1947
1948        let payload = vec![0x55; 512 * 1024];
1949        let deadline = Instant::now() + Duration::from_secs(3);
1950        let mut stalled = false;
1951        while Instant::now() < deadline {
1952            match writer.send_frame(&payload) {
1953                Ok(()) => {}
1954                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1955                    thread::sleep(Duration::from_millis(10));
1956                }
1957                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1958                    stalled = true;
1959                    break;
1960                }
1961                Err(e) => panic!("unexpected send error: {}", e),
1962            }
1963        }
1964
1965        assert!(stalled, "expected writer to time out on persistent stall");
1966        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1967        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1968    }
1969
1970    /// Drain events matching a predicate, return the first match.
1971    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1972    where
1973        F: FnMut(&Event) -> bool,
1974    {
1975        let deadline = Instant::now() + timeout;
1976        loop {
1977            let remaining = deadline.saturating_duration_since(Instant::now());
1978            if remaining.is_zero() {
1979                return None;
1980            }
1981            match rx.recv_timeout(remaining) {
1982                Ok(event) if pred(&event) => return Some(event),
1983                Ok(_) => continue,
1984                Err(_) => return None,
1985            }
1986        }
1987    }
1988
1989    #[test]
1990    fn backbone_write_stall_emits_peer_events() {
1991        let port = find_free_port();
1992        let (tx, rx) = crate::event::channel();
1993        let next_id = Arc::new(AtomicU64::new(9700));
1994
1995        let config = make_server_config(
1996            port,
1997            97,
1998            None,
1999            None,
2000            Some(Duration::from_millis(50)), // 50ms stall timeout
2001            BackboneAbuseConfig::default(),
2002        );
2003
2004        start(config, tx, next_id).unwrap();
2005        thread::sleep(Duration::from_millis(50));
2006
2007        // Connect a client that won't read (will cause write stall)
2008        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2009        client
2010            .set_read_timeout(Some(Duration::from_millis(100)))
2011            .unwrap();
2012        let sock = SockRef::from(&client);
2013        sock.set_recv_buffer_size(4096).ok();
2014
2015        // Wait for InterfaceUp and grab writer
2016        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2017        let mut writer = match event {
2018            Event::InterfaceUp(_, Some(w), _) => w,
2019            other => panic!("expected InterfaceUp with writer, got {:?}", other),
2020        };
2021
2022        // Flood until stall
2023        let payload = vec![0x55; 512 * 1024];
2024        let deadline = Instant::now() + Duration::from_secs(3);
2025        while Instant::now() < deadline {
2026            match writer.send_frame(&payload) {
2027                Ok(()) | Err(_) => {
2028                    if Instant::now() + Duration::from_millis(10) > deadline {
2029                        break;
2030                    }
2031                    thread::sleep(Duration::from_millis(10));
2032                }
2033            }
2034        }
2035
2036        // Should see BackbonePeerWriteStall event
2037        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
2038            matches!(e, Event::BackbonePeerWriteStall { .. })
2039        });
2040        assert!(
2041            stall_event.is_some(),
2042            "expected BackbonePeerWriteStall event"
2043        );
2044    }
2045
2046    #[test]
2047    fn backbone_blacklisted_peer_rejected_on_connect() {
2048        let port = find_free_port();
2049        let (tx, rx) = crate::event::channel();
2050        let next_id = Arc::new(AtomicU64::new(9800));
2051
2052        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2053        let peer_state = config.peer_state.clone();
2054
2055        start(config, tx.clone(), next_id).unwrap();
2056        thread::sleep(Duration::from_millis(50));
2057
2058        // First connection should succeed
2059        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2060        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2061        assert!(
2062            matches!(event, Event::InterfaceUp(_, _, _)),
2063            "first connection should succeed"
2064        );
2065        drop(client1);
2066
2067        // Drain disconnect events
2068        thread::sleep(Duration::from_millis(100));
2069        while rx.try_recv().is_ok() {}
2070
2071        // Blacklist 127.0.0.1 via the peer monitor
2072        peer_state.lock().unwrap().blacklist(
2073            "127.0.0.1".parse().unwrap(),
2074            Duration::from_secs(60),
2075            "test blacklist".into(),
2076        );
2077
2078        // Second connection from same IP should be rejected (no InterfaceUp)
2079        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2080        // Give poll loop time to reject
2081        thread::sleep(Duration::from_millis(200));
2082
2083        // Should NOT get an InterfaceUp — connection should have been rejected
2084        let event = rx.try_recv();
2085        match event {
2086            Ok(Event::InterfaceUp(_, _, _)) => {
2087                panic!("blacklisted peer should not get InterfaceUp")
2088            }
2089            _ => {} // Expected: no InterfaceUp
2090        }
2091    }
2092
2093    #[test]
2094    fn backbone_parse_config_reads_abuse_settings() {
2095        let factory = BackboneInterfaceFactory;
2096        let mut params = HashMap::new();
2097        params.insert("listen_ip".into(), "127.0.0.1".into());
2098        params.insert("listen_port".into(), "4242".into());
2099        params.insert("idle_timeout".into(), "15".into());
2100        params.insert("write_stall_timeout".into(), "45".into());
2101        params.insert("max_penalty_duration".into(), "3600".into());
2102
2103        let config = factory
2104            .parse_config("test-backbone", InterfaceId(97), &params)
2105            .unwrap();
2106        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2107
2108        match mode {
2109            BackboneMode::Server(config) => {
2110                assert_eq!(config.listen_ip, "127.0.0.1");
2111                assert_eq!(config.listen_port, 4242);
2112                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2113                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2114                assert_eq!(
2115                    config.abuse.max_penalty_duration,
2116                    Some(Duration::from_secs(3600))
2117                );
2118            }
2119            BackboneMode::Client(_, _) => panic!("expected server config"),
2120        }
2121    }
2122
2123    #[test]
2124    fn backbone_parse_config_reads_client_priority() {
2125        let factory = BackboneInterfaceFactory;
2126        let mut params = HashMap::new();
2127        params.insert("remote".into(), "example.com".into());
2128        params.insert("target_port".into(), "4242".into());
2129        params.insert("priority".into(), "87".into());
2130
2131        let config = factory
2132            .parse_config("test-backbone-client", InterfaceId(96), &params)
2133            .unwrap();
2134        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2135
2136        match mode {
2137            BackboneMode::Client(_, priority) => assert_eq!(priority, 87),
2138            BackboneMode::Server(_) => panic!("expected client config"),
2139        }
2140    }
2141
2142    #[test]
2143    fn backbone_parse_config_defaults_client_priority() {
2144        let factory = BackboneInterfaceFactory;
2145        let mut params = HashMap::new();
2146        params.insert("remote".into(), "example.com".into());
2147
2148        let config = factory
2149            .parse_config("test-backbone-client", InterfaceId(95), &params)
2150            .unwrap();
2151        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2152
2153        match mode {
2154            BackboneMode::Client(_, priority) => assert_eq!(priority, 60),
2155            BackboneMode::Server(_) => panic!("expected client config"),
2156        }
2157    }
2158
2159    #[test]
2160    fn backbone_parse_config_rejects_invalid_client_priority() {
2161        let factory = BackboneInterfaceFactory;
2162        for value in ["-1", "101", "fast"] {
2163            let mut params = HashMap::new();
2164            params.insert("remote".into(), "example.com".into());
2165            params.insert("priority".into(), value.into());
2166
2167            let err = match factory.parse_config("test-backbone-client", InterfaceId(94), &params) {
2168                Ok(_) => panic!("priority {value} should be rejected"),
2169                Err(err) => err,
2170            };
2171            assert!(
2172                err.contains("priority"),
2173                "unexpected error for {value}: {err}"
2174            );
2175        }
2176    }
2177}