Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{
30    lock_or_recover, InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer,
31};
32use crate::BackbonePeerStateEntry;
33
/// Hardware MTU of the backbone link: 1 MiB (1,048,576 bytes).
///
/// Mirrors `BackboneInterface.HW_MTU` in the Python implementation.
#[allow(dead_code)]
const HW_MTU: usize = 1024 * 1024;
37
38/// Configuration for a backbone interface.
39#[derive(Debug, Clone)]
40pub struct BackboneConfig {
41    pub name: String,
42    pub listen_ip: String,
43    pub listen_port: u16,
44    pub interface_id: InterfaceId,
45    pub mode: u8,
46    pub max_connections: Option<usize>,
47    pub idle_timeout: Option<Duration>,
48    pub write_stall_timeout: Option<Duration>,
49    pub abuse: BackboneAbuseConfig,
50    pub ingress_control: IngressControlConfig,
51    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
52    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
53}
54
/// Tunables for behavior-based abuse detection on inbound peers.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// Optional cap on penalty length; `None` by default.
    // NOTE(review): not consumed in the visible part of this file; confirm
    // the exact semantics against the code that applies penalties.
    pub max_penalty_duration: Option<Duration>,
}
60
61/// Live runtime state for a backbone server interface.
62#[derive(Debug, Clone)]
63pub struct BackboneServerRuntime {
64    pub max_connections: Option<usize>,
65    pub idle_timeout: Option<Duration>,
66    pub write_stall_timeout: Option<Duration>,
67    pub abuse: BackboneAbuseConfig,
68}
69
70impl BackboneServerRuntime {
71    pub fn from_config(config: &BackboneConfig) -> Self {
72        Self {
73            max_connections: config.max_connections,
74            idle_timeout: config.idle_timeout,
75            write_stall_timeout: config.write_stall_timeout,
76            abuse: config.abuse.clone(),
77        }
78    }
79}
80
81#[derive(Debug, Clone)]
82pub struct BackboneRuntimeConfigHandle {
83    pub interface_name: String,
84    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
85    pub startup: BackboneServerRuntime,
86}
87
88#[derive(Debug, Clone)]
89pub struct BackbonePeerStateHandle {
90    pub interface_id: InterfaceId,
91    pub interface_name: String,
92    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
93}
94
95impl Default for BackboneConfig {
96    fn default() -> Self {
97        let mut config = BackboneConfig {
98            name: String::new(),
99            listen_ip: "0.0.0.0".into(),
100            listen_port: 0,
101            interface_id: InterfaceId(0),
102            mode: constants::MODE_FULL,
103            max_connections: None,
104            idle_timeout: None,
105            write_stall_timeout: None,
106            abuse: BackboneAbuseConfig::default(),
107            ingress_control: IngressControlConfig::enabled(),
108            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
109                max_connections: None,
110                idle_timeout: None,
111                write_stall_timeout: None,
112                abuse: BackboneAbuseConfig::default(),
113            })),
114            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
115        };
116        let startup = BackboneServerRuntime::from_config(&config);
117        config.runtime = Arc::new(Mutex::new(startup));
118        config
119    }
120}
121
/// Upper bound, in bytes, on unsent data buffered for one client (512 KiB).
///
/// A client whose backlog would exceed this is disconnected so that a slow
/// reader cannot cause unbounded memory growth.
const MAX_PENDING_BYTES: usize = 512 * 1024;
125
126/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
127struct BackboneWriter {
128    stream: TcpStream,
129    runtime: Arc<Mutex<BackboneServerRuntime>>,
130    interface_name: String,
131    interface_id: InterfaceId,
132    event_tx: EventSender,
133    pending: Vec<u8>,
134    stall_started: Option<Instant>,
135    disconnect_notified: bool,
136    write_stall_flag: Arc<AtomicBool>,
137}
138
139impl Writer for BackboneWriter {
140    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
141        let write_stall_timeout =
142            lock_or_recover(&self.runtime, "backbone runtime").write_stall_timeout;
143        if !self.pending.is_empty() {
144            self.flush_pending(write_stall_timeout)?;
145            if !self.pending.is_empty() {
146                return Err(io::Error::new(
147                    io::ErrorKind::WouldBlock,
148                    "backbone writer still stalled",
149                ));
150            }
151        }
152
153        let frame = hdlc::frame(data);
154        self.write_buffer(&frame, write_stall_timeout)
155    }
156}
157
158impl BackboneWriter {
159    fn write_buffer(
160        &mut self,
161        data: &[u8],
162        write_stall_timeout: Option<Duration>,
163    ) -> io::Result<()> {
164        let mut written = 0usize;
165        while written < data.len() {
166            match self.stream.write(&data[written..]) {
167                Ok(0) => {
168                    return Err(io::Error::new(
169                        io::ErrorKind::WriteZero,
170                        "backbone writer wrote zero bytes",
171                    ))
172                }
173                Ok(n) => {
174                    written += n;
175                    self.stall_started = None;
176                }
177                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
178                    let now = Instant::now();
179                    let started = self.stall_started.get_or_insert(now);
180                    if let Some(timeout) = write_stall_timeout {
181                        if now.duration_since(*started) >= timeout {
182                            return Err(self.disconnect_for_write_stall(timeout));
183                        }
184                    }
185                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
186                        return Err(self.disconnect_for_write_stall(
187                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
188                        ));
189                    }
190                    self.pending.extend_from_slice(&data[written..]);
191                    return Err(io::Error::new(
192                        io::ErrorKind::WouldBlock,
193                        "backbone writer would block",
194                    ));
195                }
196                Err(e) => return Err(e),
197            }
198        }
199        Ok(())
200    }
201
202    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
203        if self.pending.is_empty() {
204            return Ok(());
205        }
206
207        let pending = std::mem::take(&mut self.pending);
208        match self.write_buffer(&pending, write_stall_timeout) {
209            Ok(()) => Ok(()),
210            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
211            Err(e) => Err(e),
212        }
213    }
214
215    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
216        if !self.disconnect_notified {
217            log::warn!(
218                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
219                self.interface_name,
220                self.interface_id.0,
221                timeout
222            );
223            self.write_stall_flag.store(true, Ordering::Relaxed);
224            let _ = self.stream.shutdown(Shutdown::Both);
225            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
226            self.disconnect_notified = true;
227        }
228        io::Error::new(
229            io::ErrorKind::TimedOut,
230            format!("backbone writer stalled for {:?}", timeout),
231        )
232    }
233}
234
235/// Start a backbone interface. Binds TCP listener, spawns poll thread.
236pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
237    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
238    let listener = TcpListener::bind(&addr)?;
239    listener.set_nonblocking(true)?;
240
241    log::info!(
242        "[{}] backbone server listening on {}",
243        config.name,
244        listener
245            .local_addr()
246            .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
247    );
248
249    let name = config.name.clone();
250    let server_interface_id = config.interface_id;
251    let runtime = Arc::clone(&config.runtime);
252    let peer_state = Arc::clone(&config.peer_state);
253    let ingress_control = config.ingress_control;
254    let accepted_peer_mode = config.mode;
255    thread::Builder::new()
256        .name(format!("backbone-poll-{}", config.interface_id.0))
257        .spawn(move || {
258            if let Err(e) = poll_loop(
259                listener,
260                name,
261                server_interface_id,
262                tx,
263                next_id,
264                runtime,
265                peer_state,
266                ingress_control,
267                accepted_peer_mode,
268            ) {
269                log::error!("backbone poll loop error: {}", e);
270            }
271        })?;
272
273    Ok(())
274}
275
276/// Per-client tracking state.
277struct ClientState {
278    id: InterfaceId,
279    peer_ip: IpAddr,
280    peer_port: u16,
281    stream: TcpStream,
282    decoder: hdlc::Decoder,
283    connected_at: Instant,
284    has_received_data: bool,
285    write_stall_flag: Arc<AtomicBool>,
286}
287
/// Bookkeeping kept per remote IP address.
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// Instant until which connections from this IP are rejected.
    blacklisted_until: Option<Instant>,
    /// Human-readable reason recorded with the blacklist entry.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected while blacklisted.
    reject_count: u64,
    /// Number of currently open connections from this IP.
    connected_count: usize,
}
295
296impl PeerBehaviorState {
297    fn new() -> Self {
298        Self {
299            blacklisted_until: None,
300            blacklist_reason: None,
301            reject_count: 0,
302            connected_count: 0,
303        }
304    }
305}
306
307#[derive(Debug, Clone, Default)]
308pub struct BackbonePeerMonitor {
309    peers: HashMap<IpAddr, PeerBehaviorState>,
310}
311
312impl BackbonePeerMonitor {
313    pub fn new() -> Self {
314        Self {
315            peers: HashMap::new(),
316        }
317    }
318
319    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
320        let mut merged = self.peers.clone();
321
322        for (peer_ip, state) in peers {
323            let entry = merged
324                .entry(*peer_ip)
325                .or_insert_with(PeerBehaviorState::new);
326            entry.connected_count = state.connected_count;
327            entry.reject_count = state.reject_count;
328            if state.blacklisted_until.is_some() {
329                entry.blacklisted_until = state.blacklisted_until;
330                entry.blacklist_reason = state.blacklist_reason.clone();
331            }
332        }
333
334        merged.retain(|peer_ip, state| {
335            peers.contains_key(peer_ip)
336                || state.blacklisted_until.is_some()
337                || state.reject_count > 0
338        });
339        self.peers = merged;
340    }
341
342    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
343        for (peer_ip, state) in &self.peers {
344            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
345            entry.blacklisted_until = state.blacklisted_until;
346            entry.blacklist_reason = state.blacklist_reason.clone();
347            entry.reject_count = state.reject_count;
348        }
349
350        peers.retain(|peer_ip, state| {
351            if state.connected_count > 0 {
352                return true;
353            }
354            self.peers.contains_key(peer_ip)
355        });
356    }
357
358    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
359        let now = Instant::now();
360        let mut entries: Vec<BackbonePeerStateEntry> = self
361            .peers
362            .iter()
363            .map(|(peer_ip, state)| BackbonePeerStateEntry {
364                interface_name: interface_name.to_string(),
365                peer_ip: *peer_ip,
366                connected_count: state.connected_count,
367                blacklisted_remaining_secs: state
368                    .blacklisted_until
369                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
370                blacklist_reason: state.blacklist_reason.clone(),
371                reject_count: state.reject_count,
372            })
373            .collect();
374        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
375        entries
376    }
377
378    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
379        self.peers.remove(&peer_ip).is_some()
380    }
381
382    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
383        let state = self
384            .peers
385            .entry(peer_ip)
386            .or_insert_with(PeerBehaviorState::new);
387        state.blacklisted_until = Some(Instant::now() + duration);
388        state.blacklist_reason = Some(reason);
389        true
390    }
391
392    #[cfg(test)]
393    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
394        let mut state = PeerBehaviorState::new();
395        state.connected_count = entry.connected_count;
396        state.reject_count = entry.reject_count;
397        state.blacklist_reason = entry.blacklist_reason;
398        if let Some(remaining) = entry.blacklisted_remaining_secs {
399            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
400        }
401        self.peers.insert(entry.peer_ip, state);
402    }
403}
404
/// Why the poll loop is dropping a client.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// A read on the socket returned end-of-stream or an error.
    RemoteClosed,
    /// The peer never sent data within the idle timeout.
    IdleTimeout,
    /// The peer's writer disconnected it after a write stall.
    WriteStall,
}
411
412/// Main poll event loop.
413fn poll_loop(
414    listener: TcpListener,
415    name: String,
416    server_interface_id: InterfaceId,
417    tx: EventSender,
418    next_id: Arc<AtomicU64>,
419    runtime: Arc<Mutex<BackboneServerRuntime>>,
420    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
421    ingress_control: IngressControlConfig,
422    accepted_peer_mode: u8,
423) -> io::Result<()> {
424    let poller = Poller::new()?;
425
426    const LISTENER_KEY: usize = 0;
427
428    // SAFETY: listener outlives its registration in the poller.
429    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
430
431    let mut clients: HashMap<usize, ClientState> = HashMap::new();
432    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
433    let mut events = Events::new();
434    let mut next_key: usize = 1;
435
436    loop {
437        let runtime_snapshot = runtime.lock().unwrap().clone();
438        let max_connections = runtime_snapshot.max_connections;
439        let idle_timeout = runtime_snapshot.idle_timeout;
440        cleanup_peer_state(&mut peers);
441        {
442            let mut monitor = peer_state.lock().unwrap();
443            monitor.sync_into(&mut peers);
444            monitor.upsert_snapshot(&peers);
445        }
446
447        events.clear();
448        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
449
450        for ev in events.iter() {
451            if ev.key == LISTENER_KEY {
452                // Accept new connections
453                loop {
454                    match listener.accept() {
455                        Ok((stream, peer_addr)) => {
456                            let peer_ip = peer_addr.ip();
457                            let peer_port = peer_addr.port();
458
459                            if is_ip_blacklisted(&mut peers, peer_ip) {
460                                if let Some(state) = peers.get_mut(&peer_ip) {
461                                    state.reject_count = state.reject_count.saturating_add(1);
462                                }
463                                peer_state.lock().unwrap().upsert_snapshot(&peers);
464                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
465                                drop(stream);
466                                continue;
467                            }
468
469                            if let Some(max) = max_connections {
470                                if clients.len() >= max {
471                                    log::warn!(
472                                        "[{}] max connections ({}) reached, rejecting {}",
473                                        name,
474                                        max,
475                                        peer_addr
476                                    );
477                                    drop(stream);
478                                    continue;
479                                }
480                            }
481
482                            stream.set_nonblocking(true).ok();
483                            stream.set_nodelay(true).ok();
484                            set_tcp_keepalive(&stream).ok();
485
486                            // Prevent SIGPIPE on macOS when writing to broken pipes
487                            #[cfg(target_os = "macos")]
488                            {
489                                let sock = SockRef::from(&stream);
490                                sock.set_nosigpipe(true).ok();
491                            }
492
493                            let key = next_key;
494                            next_key += 1;
495                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
496
497                            log::info!(
498                                "[{}] backbone client connected: {} → id {}",
499                                name,
500                                peer_addr,
501                                client_id.0
502                            );
503
504                            // Register client with poller
505                            // SAFETY: stream is stored in ClientState and outlives registration.
506                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
507                            {
508                                log::warn!("[{}] failed to add client to poller: {}", name, e);
509                                continue; // stream drops, closing socket
510                            }
511
512                            // Create writer via try_clone (cross-platform dup)
513                            let writer_stream = match stream.try_clone() {
514                                Ok(s) => s,
515                                Err(e) => {
516                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
517                                    let _ = poller.delete(&stream);
518                                    continue; // stream drops
519                                }
520                            };
521                            let write_stall_flag = Arc::new(AtomicBool::new(false));
522                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
523                                stream: writer_stream,
524                                runtime: Arc::clone(&runtime),
525                                interface_name: name.clone(),
526                                interface_id: client_id,
527                                event_tx: tx.clone(),
528                                pending: Vec::new(),
529                                stall_started: None,
530                                disconnect_notified: false,
531                                write_stall_flag: Arc::clone(&write_stall_flag),
532                            });
533
534                            clients.insert(
535                                key,
536                                ClientState {
537                                    id: client_id,
538                                    peer_ip,
539                                    peer_port,
540                                    stream,
541                                    decoder: hdlc::Decoder::new(),
542                                    connected_at: Instant::now(),
543                                    has_received_data: false,
544                                    write_stall_flag,
545                                },
546                            );
547                            peers
548                                .entry(peer_ip)
549                                .or_insert_with(PeerBehaviorState::new)
550                                .connected_count += 1;
551                            peer_state.lock().unwrap().upsert_snapshot(&peers);
552                            let _ = tx.send(Event::BackbonePeerConnected {
553                                server_interface_id,
554                                peer_interface_id: client_id,
555                                peer_ip,
556                                peer_port,
557                            });
558
559                            let info = InterfaceInfo {
560                                id: client_id,
561                                name: format!("BackboneInterface/{}", client_id.0),
562                                mode: accepted_peer_mode,
563                                out_capable: true,
564                                in_capable: true,
565                                bitrate: Some(1_000_000_000), // 1 Gbps guess
566                                airtime_profile: None,
567                                announce_rate_target: None,
568                                announce_rate_grace: 0,
569                                announce_rate_penalty: 0.0,
570                                announce_cap: constants::ANNOUNCE_CAP,
571                                is_local_client: false,
572                                wants_tunnel: false,
573                                tunnel_id: None,
574                                mtu: 65535,
575                                ia_freq: 0.0,
576                                ip_freq: 0.0,
577                                op_freq: 0.0,
578                                op_samples: 0,
579                                started: 0.0,
580                                ingress_control,
581                            };
582
583                            if tx
584                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
585                                .is_err()
586                            {
587                                // Driver shut down
588                                cleanup(&poller, &clients, &listener);
589                                return Ok(());
590                            }
591                        }
592                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
593                        Err(e) => {
594                            log::warn!("[{}] accept error: {}", name, e);
595                            break;
596                        }
597                    }
598                }
599                // Re-arm listener (oneshot semantics)
600                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
601            } else if clients.contains_key(&ev.key) {
602                let key = ev.key;
603                let mut should_remove = false;
604                let mut client_id = InterfaceId(0);
605
606                let mut buf = [0u8; 4096];
607                let read_result = {
608                    let client = clients.get_mut(&key).unwrap();
609                    client.stream.read(&mut buf)
610                };
611
612                match read_result {
613                    Ok(0) | Err(_) => {
614                        if let Some(c) = clients.get(&key) {
615                            client_id = c.id;
616                        }
617                        should_remove = true;
618                    }
619                    Ok(n) => {
620                        let client = clients.get_mut(&key).unwrap();
621                        client_id = client.id;
622                        client.has_received_data = true;
623                        for frame in client.decoder.feed(&buf[..n]) {
624                            if tx
625                                .send(Event::Frame {
626                                    interface_id: client_id,
627                                    data: frame,
628                                    rssi: None,
629                                    snr: None,
630                                })
631                                .is_err()
632                            {
633                                cleanup(&poller, &clients, &listener);
634                                return Ok(());
635                            }
636                        }
637                    }
638                }
639
640                if should_remove {
641                    let reason = if clients
642                        .get(&key)
643                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
644                    {
645                        DisconnectReason::WriteStall
646                    } else {
647                        DisconnectReason::RemoteClosed
648                    };
649                    disconnect_client(
650                        &poller,
651                        &mut clients,
652                        &mut peers,
653                        &name,
654                        server_interface_id,
655                        &tx,
656                        &peer_state,
657                        key,
658                        client_id,
659                        reason,
660                    );
661                } else if let Some(client) = clients.get(&key) {
662                    // Re-arm client (oneshot semantics)
663                    poller.modify(&client.stream, PollEvent::readable(key))?;
664                }
665            }
666        }
667
668        if let Some(timeout) = idle_timeout {
669            let now = Instant::now();
670            let timed_out: Vec<(usize, InterfaceId)> = clients
671                .iter()
672                .filter_map(|(&key, client)| {
673                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
674                    {
675                        None
676                    } else {
677                        Some((key, client.id))
678                    }
679                })
680                .collect();
681
682            for (key, client_id) in timed_out {
683                disconnect_client(
684                    &poller,
685                    &mut clients,
686                    &mut peers,
687                    &name,
688                    server_interface_id,
689                    &tx,
690                    &peer_state,
691                    key,
692                    client_id,
693                    DisconnectReason::IdleTimeout,
694                );
695            }
696        }
697    }
698}
699
700fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
701    let now = Instant::now();
702    peers.retain(|_, state| {
703        if matches!(state.blacklisted_until, Some(until) if now >= until) {
704            state.blacklisted_until = None;
705            state.blacklist_reason = None;
706        }
707        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
708    });
709}
710
711fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
712    let now = Instant::now();
713    if let Some(state) = peers.get_mut(&peer_ip) {
714        if let Some(until) = state.blacklisted_until {
715            if now < until {
716                return true;
717            }
718            state.blacklisted_until = None;
719        }
720    }
721    false
722}
723
724fn disconnect_client(
725    poller: &Poller,
726    clients: &mut HashMap<usize, ClientState>,
727    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
728    name: &str,
729    server_interface_id: InterfaceId,
730    tx: &EventSender,
731    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
732    key: usize,
733    client_id: InterfaceId,
734    reason: DisconnectReason,
735) {
736    let Some(client) = clients.remove(&key) else {
737        return;
738    };
739
740    match reason {
741        DisconnectReason::RemoteClosed => {
742            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
743        }
744        DisconnectReason::IdleTimeout => {
745            log::info!(
746                "[{}] backbone client {} disconnected due to idle timeout",
747                name,
748                client_id.0
749            );
750        }
751        DisconnectReason::WriteStall => {
752            // Already logged by BackboneWriter::disconnect_for_write_stall
753        }
754    }
755
756    let _ = poller.delete(&client.stream);
757    // client.stream closes on drop
758    let connected_for = client.connected_at.elapsed();
759    let _ = tx.send(Event::BackbonePeerDisconnected {
760        server_interface_id,
761        peer_interface_id: client.id,
762        peer_ip: client.peer_ip,
763        peer_port: client.peer_port,
764        connected_for,
765        had_received_data: client.has_received_data,
766    });
767    match reason {
768        DisconnectReason::IdleTimeout => {
769            let _ = tx.send(Event::BackbonePeerIdleTimeout {
770                server_interface_id,
771                peer_interface_id: client.id,
772                peer_ip: client.peer_ip,
773                peer_port: client.peer_port,
774                connected_for,
775            });
776        }
777        DisconnectReason::WriteStall => {
778            let _ = tx.send(Event::BackbonePeerWriteStall {
779                server_interface_id,
780                peer_interface_id: client.id,
781                peer_ip: client.peer_ip,
782                peer_port: client.peer_port,
783                connected_for,
784            });
785        }
786        DisconnectReason::RemoteClosed => {}
787    }
788
789    if let Some(state) = peers.get_mut(&client.peer_ip) {
790        state.connected_count = state.connected_count.saturating_sub(1);
791    }
792    peer_state.lock().unwrap().upsert_snapshot(peers);
793    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
794    if !matches!(reason, DisconnectReason::WriteStall) {
795        let _ = tx.send(Event::InterfaceDown(client_id));
796    }
797}
798
799fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
800    let sock = SockRef::from(stream);
801    let mut keepalive = TcpKeepalive::new()
802        .with_time(Duration::from_secs(5))
803        .with_interval(Duration::from_secs(2));
804    #[cfg(any(target_os = "linux", target_os = "macos"))]
805    {
806        keepalive = keepalive.with_retries(12);
807    }
808    sock.set_tcp_keepalive(&keepalive)
809}
810
811fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
812    for (_, client) in clients {
813        let _ = poller.delete(&client.stream);
814    }
815    let _ = poller.delete(listener);
816}
817
818// ---------------------------------------------------------------------------
819// Client mode
820// ---------------------------------------------------------------------------
821
822/// Configuration for a backbone client interface.
823#[derive(Debug, Clone)]
824pub struct BackboneClientConfig {
825    pub name: String,
826    pub target_host: String,
827    pub target_port: u16,
828    pub interface_id: InterfaceId,
829    pub reconnect_wait: Duration,
830    pub max_reconnect_tries: Option<u32>,
831    pub connect_timeout: Duration,
832    pub transport_identity: Option<String>,
833    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
834}
835
836#[derive(Debug, Clone)]
837pub struct BackboneClientRuntime {
838    pub reconnect_wait: Duration,
839    pub max_reconnect_tries: Option<u32>,
840    pub connect_timeout: Duration,
841}
842
843impl BackboneClientRuntime {
844    pub fn from_config(config: &BackboneClientConfig) -> Self {
845        Self {
846            reconnect_wait: config.reconnect_wait,
847            max_reconnect_tries: config.max_reconnect_tries,
848            connect_timeout: config.connect_timeout,
849        }
850    }
851}
852
853#[derive(Debug, Clone)]
854pub struct BackboneClientRuntimeConfigHandle {
855    pub interface_name: String,
856    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
857    pub startup: BackboneClientRuntime,
858}
859
860impl Default for BackboneClientConfig {
861    fn default() -> Self {
862        let mut config = BackboneClientConfig {
863            name: String::new(),
864            target_host: "127.0.0.1".into(),
865            target_port: 4242,
866            interface_id: InterfaceId(0),
867            reconnect_wait: Duration::from_secs(5),
868            max_reconnect_tries: None,
869            connect_timeout: Duration::from_secs(5),
870            transport_identity: None,
871            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
872                reconnect_wait: Duration::from_secs(5),
873                max_reconnect_tries: None,
874                connect_timeout: Duration::from_secs(5),
875            })),
876        };
877        let startup = BackboneClientRuntime::from_config(&config);
878        config.runtime = Arc::new(Mutex::new(startup));
879        config
880    }
881}
882
883/// Writer that sends HDLC-framed data over a TCP stream (client mode).
884struct BackboneClientWriter {
885    stream: TcpStream,
886}
887
888impl Writer for BackboneClientWriter {
889    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
890        self.stream.write_all(&hdlc::frame(data))
891    }
892}
893
894/// Try to connect to the target host:port with timeout.
895fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
896    let runtime = config.runtime.lock().unwrap().clone();
897    let addr_str = format!("{}:{}", config.target_host, config.target_port);
898    let addr = addr_str
899        .to_socket_addrs()?
900        .next()
901        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
902
903    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
904    stream.set_nodelay(true)?;
905    set_tcp_keepalive(&stream).ok();
906
907    // Prevent SIGPIPE on macOS when writing to broken pipes
908    #[cfg(target_os = "macos")]
909    {
910        let sock = SockRef::from(&stream);
911        sock.set_nosigpipe(true).ok();
912    }
913
914    Ok(stream)
915}
916
917/// Connect and start the reader thread. Returns the writer for the driver.
918pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
919    let stream = try_connect_client(&config)?;
920    let reader_stream = stream.try_clone()?;
921    let writer_stream = stream.try_clone()?;
922
923    let id = config.interface_id;
924    log::info!(
925        "[{}] backbone client connected to {}:{}",
926        config.name,
927        config.target_host,
928        config.target_port
929    );
930
931    // Initial connect: writer is None because it's returned directly to the caller
932    let _ = tx.send(Event::InterfaceUp(id, None, None));
933
934    thread::Builder::new()
935        .name(format!("backbone-client-{}", id.0))
936        .spawn(move || {
937            client_reader_loop(reader_stream, config, tx);
938        })?;
939
940    Ok(Box::new(BackboneClientWriter {
941        stream: writer_stream,
942    }))
943}
944
945/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
946/// On disconnect, attempts reconnection.
947fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
948    let id = config.interface_id;
949    let mut decoder = hdlc::Decoder::new();
950    let mut buf = [0u8; 4096];
951
952    loop {
953        match stream.read(&mut buf) {
954            Ok(0) => {
955                log::warn!("[{}] connection closed", config.name);
956                let _ = tx.send(Event::InterfaceDown(id));
957                match client_reconnect(&config, &tx) {
958                    Some(new_stream) => {
959                        stream = new_stream;
960                        decoder = hdlc::Decoder::new();
961                        continue;
962                    }
963                    None => {
964                        log::error!("[{}] reconnection failed, giving up", config.name);
965                        return;
966                    }
967                }
968            }
969            Ok(n) => {
970                for frame in decoder.feed(&buf[..n]) {
971                    if tx
972                        .send(Event::Frame {
973                            interface_id: id,
974                            data: frame,
975                            rssi: None,
976                            snr: None,
977                        })
978                        .is_err()
979                    {
980                        return;
981                    }
982                }
983            }
984            Err(e) => {
985                log::warn!("[{}] read error: {}", config.name, e);
986                let _ = tx.send(Event::InterfaceDown(id));
987                match client_reconnect(&config, &tx) {
988                    Some(new_stream) => {
989                        stream = new_stream;
990                        decoder = hdlc::Decoder::new();
991                        continue;
992                    }
993                    None => {
994                        log::error!("[{}] reconnection failed, giving up", config.name);
995                        return;
996                    }
997                }
998            }
999        }
1000    }
1001}
1002
1003/// Maximum backoff multiplier: `base_delay * 2^MAX_BACKOFF_SHIFT`.
1004/// With a 5 s base this caps at 5 × 2^6 = 320 s ≈ 5 min.
1005const MAX_BACKOFF_SHIFT: u32 = 6;
1006
1007/// Attempt to reconnect with exponential backoff and jitter.
1008/// Returns the new reader stream on success.
1009/// Sends the new writer to the driver via InterfaceUp event.
1010fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
1011    let mut attempts = 0u32;
1012    loop {
1013        let runtime = config.runtime.lock().unwrap().clone();
1014
1015        let shift = attempts.min(MAX_BACKOFF_SHIFT);
1016        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
1017        // Add ±25 % jitter to avoid thundering-herd reconnects.
1018        let jitter_range = backoff / 4;
1019        let jitter = if jitter_range.as_nanos() > 0 {
1020            let offset = Duration::from_nanos(
1021                (std::hash::RandomState::new().build_hasher().finish()
1022                    % jitter_range.as_nanos() as u64)
1023                    * 2,
1024            );
1025            if offset > jitter_range {
1026                backoff + (offset - jitter_range)
1027            } else {
1028                backoff - (jitter_range - offset)
1029            }
1030        } else {
1031            backoff
1032        };
1033        thread::sleep(jitter);
1034
1035        attempts += 1;
1036
1037        if let Some(max) = runtime.max_reconnect_tries {
1038            if attempts > max {
1039                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1040                return None;
1041            }
1042        }
1043
1044        log::info!(
1045            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1046            config.name,
1047            attempts,
1048            jitter.as_secs_f64(),
1049        );
1050
1051        match try_connect_client(config) {
1052            Ok(new_stream) => {
1053                let writer_stream = match new_stream.try_clone() {
1054                    Ok(s) => s,
1055                    Err(e) => {
1056                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1057                        continue;
1058                    }
1059                };
1060                log::info!(
1061                    "[{}] reconnected after {} attempt(s)",
1062                    config.name,
1063                    attempts
1064                );
1065                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1066                    stream: writer_stream,
1067                });
1068                let _ = tx.send(Event::InterfaceUp(
1069                    config.interface_id,
1070                    Some(new_writer),
1071                    None,
1072                ));
1073                return Some(new_stream);
1074            }
1075            Err(e) => {
1076                log::warn!("[{}] reconnect failed: {}", config.name, e);
1077            }
1078        }
1079    }
1080}
1081
1082// ---------------------------------------------------------------------------
1083// Factory
1084// ---------------------------------------------------------------------------
1085
1086/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
1087/// server or client config through the opaque `InterfaceConfigData` channel.
1088#[derive(Clone)]
1089pub(crate) enum BackboneMode {
1090    Server(BackboneConfig),
1091    Client(BackboneClientConfig, u8),
1092}
1093
/// Factory for `BackboneInterface`.
///
/// Client mode is selected when the config params contain `"remote"` or
/// `"target_host"`. Without either key the interface is started as a TCP
/// listener (server mode).
pub struct BackboneInterfaceFactory;
1100
/// Read `key` from `params` as a strictly positive number of seconds.
///
/// Returns `None` when the key is missing, the value does not parse as a
/// float, the value is zero, negative or NaN, or the value cannot be
/// represented as a [`Duration`] (infinite or too large). The fallible
/// conversion is used so that such config values are ignored instead of
/// causing a panic.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    let secs = params.get(key)?.parse::<f64>().ok()?;
    if secs > 0.0 {
        Duration::try_from_secs_f64(secs).ok()
    } else {
        // Also reached for NaN, which compares false against everything.
        None
    }
}
1108
1109impl InterfaceFactory for BackboneInterfaceFactory {
1110    fn type_name(&self) -> &str {
1111        "BackboneInterface"
1112    }
1113
1114    fn parse_config(
1115        &self,
1116        name: &str,
1117        id: InterfaceId,
1118        params: &HashMap<String, String>,
1119    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1120        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1121            // Client mode
1122            let target_host = target_host.clone();
1123            let target_port = params
1124                .get("target_port")
1125                .or_else(|| params.get("port"))
1126                .and_then(|v| v.parse().ok())
1127                .unwrap_or(4242);
1128            let transport_identity = params.get("transport_identity").cloned();
1129            let priority = match params.get("priority") {
1130                Some(value) => value.parse::<u8>().map_err(|_| {
1131                    format!(
1132                        "invalid Backbone peer priority '{}' (expected 0..100)",
1133                        value
1134                    )
1135                })?,
1136                None => crate::driver::BACKBONE_PEER_POOL_CONFIGURED_DEFAULT_PRIORITY,
1137            };
1138            if priority > 100 {
1139                return Err(format!(
1140                    "invalid Backbone peer priority '{}' (expected 0..100)",
1141                    priority
1142                ));
1143            }
1144            Ok(Box::new(BackboneMode::Client(
1145                BackboneClientConfig {
1146                    name: name.to_string(),
1147                    target_host,
1148                    target_port,
1149                    interface_id: id,
1150                    transport_identity,
1151                    ..BackboneClientConfig::default()
1152                },
1153                priority,
1154            )))
1155        } else {
1156            // Server mode
1157            let listen_ip = params
1158                .get("listen_ip")
1159                .or_else(|| params.get("device"))
1160                .cloned()
1161                .unwrap_or_else(|| "0.0.0.0".into());
1162            let listen_port = params
1163                .get("listen_port")
1164                .or_else(|| params.get("port"))
1165                .and_then(|v| v.parse().ok())
1166                .unwrap_or(4242);
1167            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1168            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1169            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1170            let abuse = BackboneAbuseConfig {
1171                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1172            };
1173            let mut config = BackboneConfig {
1174                name: name.to_string(),
1175                listen_ip,
1176                listen_port,
1177                interface_id: id,
1178                mode: constants::MODE_FULL,
1179                max_connections,
1180                idle_timeout,
1181                write_stall_timeout,
1182                abuse,
1183                ingress_control: IngressControlConfig::enabled(),
1184                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1185                    max_connections: None,
1186                    idle_timeout: None,
1187                    write_stall_timeout: None,
1188                    abuse: BackboneAbuseConfig::default(),
1189                })),
1190                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1191            };
1192            let startup = BackboneServerRuntime::from_config(&config);
1193            config.runtime = Arc::new(Mutex::new(startup));
1194            Ok(Box::new(BackboneMode::Server(config)))
1195        }
1196    }
1197
1198    fn start(
1199        &self,
1200        config: Box<dyn InterfaceConfigData>,
1201        ctx: StartContext,
1202    ) -> io::Result<StartResult> {
1203        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1204            io::Error::new(
1205                io::ErrorKind::InvalidData,
1206                "wrong config type for BackboneInterface",
1207            )
1208        })?;
1209
1210        match mode {
1211            BackboneMode::Client(cfg, _) => {
1212                let id = cfg.interface_id;
1213                let name = cfg.name.clone();
1214                let info = InterfaceInfo {
1215                    id,
1216                    name,
1217                    mode: ctx.mode,
1218                    out_capable: true,
1219                    in_capable: true,
1220                    bitrate: Some(1_000_000_000),
1221                    airtime_profile: None,
1222                    announce_rate_target: None,
1223                    announce_rate_grace: 0,
1224                    announce_rate_penalty: 0.0,
1225                    announce_cap: constants::ANNOUNCE_CAP,
1226                    is_local_client: false,
1227                    wants_tunnel: false,
1228                    tunnel_id: None,
1229                    mtu: 65535,
1230                    ingress_control: ctx.ingress_control,
1231                    ia_freq: 0.0,
1232                    ip_freq: 0.0,
1233                    op_freq: 0.0,
1234                    op_samples: 0,
1235                    started: crate::time::now(),
1236                };
1237                let writer = start_client(cfg, ctx.tx)?;
1238                Ok(StartResult::Simple {
1239                    id,
1240                    info,
1241                    writer,
1242                    interface_type_name: "BackboneInterface".to_string(),
1243                })
1244            }
1245            BackboneMode::Server(mut cfg) => {
1246                cfg.ingress_control = ctx.ingress_control;
1247                cfg.mode = ctx.mode;
1248                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1249                Ok(StartResult::Listener { control: None })
1250            }
1251        }
1252    }
1253}
1254
1255pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1256    match mode {
1257        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1258            interface_name: config.name.clone(),
1259            runtime: Arc::clone(&config.runtime),
1260            startup: BackboneServerRuntime::from_config(config),
1261        }),
1262        BackboneMode::Client(_, _) => None,
1263    }
1264}
1265
1266pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1267    match mode {
1268        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1269            interface_id: config.interface_id,
1270            interface_name: config.name.clone(),
1271            peer_state: Arc::clone(&config.peer_state),
1272        }),
1273        BackboneMode::Client(_, _) => None,
1274    }
1275}
1276
1277pub(crate) fn client_runtime_handle_from_mode(
1278    mode: &BackboneMode,
1279) -> Option<BackboneClientRuntimeConfigHandle> {
1280    match mode {
1281        BackboneMode::Client(config, _) => Some(BackboneClientRuntimeConfigHandle {
1282            interface_name: config.name.clone(),
1283            runtime: Arc::clone(&config.runtime),
1284            startup: BackboneClientRuntime::from_config(config),
1285        }),
1286        BackboneMode::Server(_) => None,
1287    }
1288}
1289
1290pub(crate) fn client_config_from_mode(mode: &BackboneMode) -> Option<BackboneClientConfig> {
1291    match mode {
1292        BackboneMode::Client(config, _) => Some(config.clone()),
1293        BackboneMode::Server(_) => None,
1294    }
1295}
1296
1297pub(crate) fn client_priority_from_mode(mode: &BackboneMode) -> Option<u8> {
1298    match mode {
1299        BackboneMode::Client(_, priority) => Some(*priority),
1300        BackboneMode::Server(_) => None,
1301    }
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306    use super::*;
1307    use std::sync::mpsc;
1308    use std::time::Duration;
1309
1310    fn find_free_port() -> u16 {
1311        TcpListener::bind("127.0.0.1:0")
1312            .unwrap()
1313            .local_addr()
1314            .unwrap()
1315            .port()
1316    }
1317
1318    fn recv_non_peer_event(
1319        rx: &mpsc::Receiver<Event>,
1320        timeout: Duration,
1321    ) -> Result<Event, mpsc::RecvTimeoutError> {
1322        let deadline = Instant::now() + timeout;
1323        loop {
1324            let remaining = deadline.saturating_duration_since(Instant::now());
1325            if remaining.is_zero() {
1326                return Err(mpsc::RecvTimeoutError::Timeout);
1327            }
1328            let event = rx.recv_timeout(remaining)?;
1329            match event {
1330                Event::BackbonePeerConnected { .. }
1331                | Event::BackbonePeerDisconnected { .. }
1332                | Event::BackbonePeerIdleTimeout { .. }
1333                | Event::BackbonePeerWriteStall { .. }
1334                | Event::BackbonePeerPenalty { .. } => continue,
1335                other => return Ok(other),
1336            }
1337        }
1338    }
1339
1340    fn make_server_config(
1341        port: u16,
1342        interface_id: u64,
1343        max_connections: Option<usize>,
1344        idle_timeout: Option<Duration>,
1345        write_stall_timeout: Option<Duration>,
1346        abuse: BackboneAbuseConfig,
1347    ) -> BackboneConfig {
1348        let mut config = BackboneConfig {
1349            name: "test-backbone".into(),
1350            listen_ip: "127.0.0.1".into(),
1351            listen_port: port,
1352            interface_id: InterfaceId(interface_id),
1353            mode: constants::MODE_FULL,
1354            max_connections,
1355            idle_timeout,
1356            write_stall_timeout,
1357            abuse,
1358            ingress_control: IngressControlConfig::enabled(),
1359            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1360                max_connections: None,
1361                idle_timeout: None,
1362                write_stall_timeout: None,
1363                abuse: BackboneAbuseConfig::default(),
1364            })),
1365            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1366        };
1367        let startup = BackboneServerRuntime::from_config(&config);
1368        config.runtime = Arc::new(Mutex::new(startup));
1369        config
1370    }
1371
1372    #[test]
1373    fn backbone_accept_connection() {
1374        let port = find_free_port();
1375        let (tx, rx) = crate::event::channel();
1376        let next_id = Arc::new(AtomicU64::new(8000));
1377
1378        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1379
1380        start(config, tx, next_id).unwrap();
1381        thread::sleep(Duration::from_millis(50));
1382
1383        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1384
1385        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1386        match event {
1387            Event::InterfaceUp(id, writer, info) => {
1388                assert_eq!(id, InterfaceId(8000));
1389                assert!(writer.is_some());
1390                assert!(info.is_some());
1391                let info = info.unwrap();
1392                assert!(info.out_capable);
1393                assert!(info.in_capable);
1394            }
1395            other => panic!("expected InterfaceUp, got {:?}", other),
1396        }
1397    }
1398
1399    #[test]
1400    fn backbone_accepted_connection_inherits_server_mode() {
1401        let port = find_free_port();
1402        let (tx, rx) = crate::event::channel();
1403        let next_id = Arc::new(AtomicU64::new(8050));
1404
1405        let mut config =
1406            make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1407        config.mode = constants::MODE_GATEWAY;
1408
1409        start(config, tx, next_id).unwrap();
1410        thread::sleep(Duration::from_millis(50));
1411
1412        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1413
1414        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1415        match event {
1416            Event::InterfaceUp(_, _, Some(info)) => {
1417                assert_eq!(info.mode, constants::MODE_GATEWAY);
1418            }
1419            other => panic!("expected InterfaceUp with info, got {:?}", other),
1420        }
1421    }
1422
1423    #[test]
1424    fn backbone_receive_frame() {
1425        let port = find_free_port();
1426        let (tx, rx) = crate::event::channel();
1427        let next_id = Arc::new(AtomicU64::new(8100));
1428
1429        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1430
1431        start(config, tx, next_id).unwrap();
1432        thread::sleep(Duration::from_millis(50));
1433
1434        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1435
1436        // Drain InterfaceUp
1437        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1438
1439        // Send HDLC frame (>= 19 bytes)
1440        let payload: Vec<u8> = (0..32).collect();
1441        client.write_all(&hdlc::frame(&payload)).unwrap();
1442
1443        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1444        match event {
1445            Event::Frame {
1446                interface_id,
1447                data,
1448                rssi: _,
1449                snr: _,
1450            } => {
1451                assert_eq!(interface_id, InterfaceId(8100));
1452                assert_eq!(data, payload);
1453            }
1454            other => panic!("expected Frame, got {:?}", other),
1455        }
1456    }
1457
1458    #[test]
1459    fn backbone_send_to_client() {
1460        let port = find_free_port();
1461        let (tx, rx) = crate::event::channel();
1462        let next_id = Arc::new(AtomicU64::new(8200));
1463
1464        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1465
1466        start(config, tx, next_id).unwrap();
1467        thread::sleep(Duration::from_millis(50));
1468
1469        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1470        client
1471            .set_read_timeout(Some(Duration::from_secs(2)))
1472            .unwrap();
1473
1474        // Get writer from InterfaceUp
1475        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1476        let mut writer = match event {
1477            Event::InterfaceUp(_, Some(w), _) => w,
1478            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1479        };
1480
1481        // Send frame via writer
1482        let payload: Vec<u8> = (0..24).collect();
1483        writer.send_frame(&payload).unwrap();
1484
1485        // Read from client
1486        let mut buf = [0u8; 256];
1487        let n = client.read(&mut buf).unwrap();
1488        let expected = hdlc::frame(&payload);
1489        assert_eq!(&buf[..n], &expected[..]);
1490    }
1491
1492    #[test]
1493    fn backbone_multiple_clients() {
1494        let port = find_free_port();
1495        let (tx, rx) = crate::event::channel();
1496        let next_id = Arc::new(AtomicU64::new(8300));
1497
1498        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1499
1500        start(config, tx, next_id).unwrap();
1501        thread::sleep(Duration::from_millis(50));
1502
1503        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1504        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1505
1506        let mut ids = Vec::new();
1507        for _ in 0..2 {
1508            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1509            match event {
1510                Event::InterfaceUp(id, _, _) => ids.push(id),
1511                other => panic!("expected InterfaceUp, got {:?}", other),
1512            }
1513        }
1514
1515        assert_eq!(ids.len(), 2);
1516        assert_ne!(ids[0], ids[1]);
1517    }
1518
1519    #[test]
1520    fn backbone_client_disconnect() {
1521        let port = find_free_port();
1522        let (tx, rx) = crate::event::channel();
1523        let next_id = Arc::new(AtomicU64::new(8400));
1524
1525        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1526
1527        start(config, tx, next_id).unwrap();
1528        thread::sleep(Duration::from_millis(50));
1529
1530        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1531
1532        // Drain InterfaceUp
1533        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1534
1535        // Disconnect
1536        drop(client);
1537
1538        // Should receive InterfaceDown
1539        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1540        assert!(
1541            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1542            "expected InterfaceDown(8400), got {:?}",
1543            event
1544        );
1545    }
1546
1547    #[test]
1548    fn backbone_epoll_multiplexing() {
1549        let port = find_free_port();
1550        let (tx, rx) = crate::event::channel();
1551        let next_id = Arc::new(AtomicU64::new(8500));
1552
1553        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1554
1555        start(config, tx, next_id).unwrap();
1556        thread::sleep(Duration::from_millis(50));
1557
1558        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1559        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1560
1561        // Drain both InterfaceUp events
1562        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1563        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1564
1565        // Both clients send data simultaneously
1566        let payload1: Vec<u8> = (0..24).collect();
1567        let payload2: Vec<u8> = (100..130).collect();
1568        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1569        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1570
1571        // Should receive both Frame events
1572        let mut received = Vec::new();
1573        for _ in 0..2 {
1574            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1575            match event {
1576                Event::Frame { data, .. } => received.push(data),
1577                other => panic!("expected Frame, got {:?}", other),
1578            }
1579        }
1580        assert!(received.contains(&payload1));
1581        assert!(received.contains(&payload2));
1582    }
1583
1584    #[test]
1585    fn backbone_bind_port() {
1586        let port = find_free_port();
1587        let (tx, _rx) = crate::event::channel();
1588        let next_id = Arc::new(AtomicU64::new(8600));
1589
1590        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1591
1592        // Should not error
1593        start(config, tx, next_id).unwrap();
1594    }
1595
1596    #[test]
1597    fn backbone_hdlc_fragmented() {
1598        let port = find_free_port();
1599        let (tx, rx) = crate::event::channel();
1600        let next_id = Arc::new(AtomicU64::new(8700));
1601
1602        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1603
1604        start(config, tx, next_id).unwrap();
1605        thread::sleep(Duration::from_millis(50));
1606
1607        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1608        client.set_nodelay(true).unwrap();
1609
1610        // Drain InterfaceUp
1611        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1612
1613        // Send HDLC frame in two fragments
1614        let payload: Vec<u8> = (0..32).collect();
1615        let framed = hdlc::frame(&payload);
1616        let mid = framed.len() / 2;
1617
1618        client.write_all(&framed[..mid]).unwrap();
1619        thread::sleep(Duration::from_millis(50));
1620        client.write_all(&framed[mid..]).unwrap();
1621
1622        // Should receive reassembled frame
1623        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1624        match event {
1625            Event::Frame { data, .. } => {
1626                assert_eq!(data, payload);
1627            }
1628            other => panic!("expected Frame, got {:?}", other),
1629        }
1630    }
1631
1632    // -----------------------------------------------------------------------
1633    // Client mode tests
1634    // -----------------------------------------------------------------------
1635
1636    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1637        BackboneClientConfig {
1638            name: format!("test-bb-client-{}", port),
1639            target_host: "127.0.0.1".into(),
1640            target_port: port,
1641            interface_id: InterfaceId(id),
1642            reconnect_wait: Duration::from_millis(100),
1643            max_reconnect_tries: Some(2),
1644            connect_timeout: Duration::from_secs(2),
1645            transport_identity: None,
1646            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1647                reconnect_wait: Duration::from_millis(100),
1648                max_reconnect_tries: Some(2),
1649                connect_timeout: Duration::from_secs(2),
1650            })),
1651        }
1652    }
1653
1654    #[test]
1655    fn backbone_client_connect() {
1656        let port = find_free_port();
1657        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1658        let (tx, rx) = crate::event::channel();
1659
1660        let config = make_client_config(port, 9000);
1661        let _writer = start_client(config, tx).unwrap();
1662
1663        let _server_stream = listener.accept().unwrap();
1664
1665        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1666        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1667    }
1668
1669    #[test]
1670    fn backbone_client_receive_frame() {
1671        let port = find_free_port();
1672        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1673        let (tx, rx) = crate::event::channel();
1674
1675        let config = make_client_config(port, 9100);
1676        let _writer = start_client(config, tx).unwrap();
1677
1678        let (mut server_stream, _) = listener.accept().unwrap();
1679
1680        // Drain InterfaceUp
1681        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1682
1683        // Send HDLC frame from server side (>= 19 bytes payload)
1684        let payload: Vec<u8> = (0..32).collect();
1685        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1686
1687        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1688        match event {
1689            Event::Frame {
1690                interface_id,
1691                data,
1692                rssi: _,
1693                snr: _,
1694            } => {
1695                assert_eq!(interface_id, InterfaceId(9100));
1696                assert_eq!(data, payload);
1697            }
1698            other => panic!("expected Frame, got {:?}", other),
1699        }
1700    }
1701
1702    #[test]
1703    fn backbone_client_send_frame() {
1704        let port = find_free_port();
1705        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1706        let (tx, _rx) = crate::event::channel();
1707
1708        let config = make_client_config(port, 9200);
1709        let mut writer = start_client(config, tx).unwrap();
1710
1711        let (mut server_stream, _) = listener.accept().unwrap();
1712        server_stream
1713            .set_read_timeout(Some(Duration::from_secs(2)))
1714            .unwrap();
1715
1716        let payload: Vec<u8> = (0..24).collect();
1717        writer.send_frame(&payload).unwrap();
1718
1719        let mut buf = [0u8; 256];
1720        let n = server_stream.read(&mut buf).unwrap();
1721        let expected = hdlc::frame(&payload);
1722        assert_eq!(&buf[..n], &expected[..]);
1723    }
1724
1725    #[test]
1726    fn backbone_max_connections_rejects_excess() {
1727        let port = find_free_port();
1728        let (tx, rx) = crate::event::channel();
1729        let next_id = Arc::new(AtomicU64::new(8800));
1730
1731        let config = make_server_config(
1732            port,
1733            88,
1734            Some(2),
1735            None,
1736            None,
1737            BackboneAbuseConfig::default(),
1738        );
1739
1740        start(config, tx, next_id).unwrap();
1741        thread::sleep(Duration::from_millis(50));
1742
1743        // Connect two clients (at limit)
1744        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1745        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1746
1747        // Drain both InterfaceUp events
1748        for _ in 0..2 {
1749            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1750            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1751        }
1752
1753        // Third connection should be accepted at TCP level but immediately dropped
1754        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1755        client3
1756            .set_read_timeout(Some(Duration::from_millis(500)))
1757            .unwrap();
1758
1759        // Give server time to reject
1760        thread::sleep(Duration::from_millis(100));
1761
1762        // Should NOT receive a third InterfaceUp
1763        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1764        assert!(
1765            result.is_err(),
1766            "expected no InterfaceUp for rejected connection, got {:?}",
1767            result
1768        );
1769    }
1770
1771    #[test]
1772    fn backbone_max_connections_allows_after_disconnect() {
1773        let port = find_free_port();
1774        let (tx, rx) = crate::event::channel();
1775        let next_id = Arc::new(AtomicU64::new(8900));
1776
1777        let config = make_server_config(
1778            port,
1779            89,
1780            Some(1),
1781            None,
1782            None,
1783            BackboneAbuseConfig::default(),
1784        );
1785
1786        start(config, tx, next_id).unwrap();
1787        thread::sleep(Duration::from_millis(50));
1788
1789        // Connect first client
1790        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1791        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1792        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1793
1794        // Disconnect first client
1795        drop(client1);
1796
1797        // Wait for InterfaceDown
1798        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1799        assert!(matches!(event, Event::InterfaceDown(_)));
1800
1801        // Now a new connection should be accepted
1802        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1803        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1804        assert!(
1805            matches!(event, Event::InterfaceUp(_, _, _)),
1806            "expected InterfaceUp after slot freed, got {:?}",
1807            event
1808        );
1809    }
1810
1811    #[test]
1812    fn backbone_client_reconnect() {
1813        let port = find_free_port();
1814        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1815        listener.set_nonblocking(false).unwrap();
1816        let (tx, rx) = crate::event::channel();
1817
1818        let config = make_client_config(port, 9300);
1819        let _writer = start_client(config, tx).unwrap();
1820
1821        // Accept first connection and immediately close it
1822        let (server_stream, _) = listener.accept().unwrap();
1823
1824        // Drain InterfaceUp
1825        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1826
1827        drop(server_stream);
1828
1829        // Should get InterfaceDown
1830        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1831        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1832
1833        // Accept the reconnection
1834        let _server_stream2 = listener.accept().unwrap();
1835
1836        // Should get InterfaceUp again
1837        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1838        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1839    }
1840
1841    #[test]
1842    fn backbone_idle_timeout_disconnects_silent_client() {
1843        let port = find_free_port();
1844        let (tx, rx) = crate::event::channel();
1845        let next_id = Arc::new(AtomicU64::new(9400));
1846
1847        let config = make_server_config(
1848            port,
1849            94,
1850            None,
1851            Some(Duration::from_millis(150)),
1852            None,
1853            BackboneAbuseConfig::default(),
1854        );
1855
1856        start(config, tx, next_id).unwrap();
1857        thread::sleep(Duration::from_millis(50));
1858
1859        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1860
1861        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1862        let client_id = match event {
1863            Event::InterfaceUp(id, _, _) => id,
1864            other => panic!("expected InterfaceUp, got {:?}", other),
1865        };
1866
1867        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1868        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1869    }
1870
1871    #[test]
1872    fn backbone_idle_timeout_ignores_client_after_data() {
1873        let port = find_free_port();
1874        let (tx, rx) = crate::event::channel();
1875        let next_id = Arc::new(AtomicU64::new(9500));
1876
1877        let config = make_server_config(
1878            port,
1879            95,
1880            None,
1881            Some(Duration::from_millis(200)),
1882            None,
1883            BackboneAbuseConfig::default(),
1884        );
1885
1886        start(config, tx, next_id).unwrap();
1887        thread::sleep(Duration::from_millis(50));
1888
1889        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1890
1891        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1892        let client_id = match event {
1893            Event::InterfaceUp(id, _, _) => id,
1894            other => panic!("expected InterfaceUp, got {:?}", other),
1895        };
1896
1897        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1898
1899        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1900        match event {
1901            Event::Frame {
1902                interface_id,
1903                data,
1904                rssi: _,
1905                snr: _,
1906            } => {
1907                assert_eq!(interface_id, client_id);
1908                assert_eq!(data, vec![1u8; 24]);
1909            }
1910            other => panic!("expected Frame, got {:?}", other),
1911        }
1912
1913        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1914        assert!(
1915            result.is_err(),
1916            "expected no InterfaceDown after client sent data, got {:?}",
1917            result
1918        );
1919    }
1920
1921    #[test]
1922    fn backbone_runtime_idle_timeout_updates_live() {
1923        let port = find_free_port();
1924        let (tx, rx) = crate::event::channel();
1925        let next_id = Arc::new(AtomicU64::new(9650));
1926
1927        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1928        let runtime = Arc::clone(&config.runtime);
1929
1930        start(config, tx, next_id).unwrap();
1931        thread::sleep(Duration::from_millis(50));
1932
1933        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1934        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1935        let client_id = match event {
1936            Event::InterfaceUp(id, _, _) => id,
1937            other => panic!("expected InterfaceUp, got {:?}", other),
1938        };
1939
1940        {
1941            let mut runtime = runtime.lock().unwrap();
1942            runtime.idle_timeout = Some(Duration::from_millis(150));
1943        }
1944
1945        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1946        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1947    }
1948
1949    #[test]
1950    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1951        let port = find_free_port();
1952        let (tx, rx) = crate::event::channel();
1953        let next_id = Arc::new(AtomicU64::new(9660));
1954
1955        let config = make_server_config(
1956            port,
1957            98,
1958            None,
1959            None,
1960            Some(Duration::from_millis(50)),
1961            BackboneAbuseConfig::default(),
1962        );
1963
1964        start(config, tx, next_id).unwrap();
1965        thread::sleep(Duration::from_millis(50));
1966
1967        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1968        client
1969            .set_read_timeout(Some(Duration::from_millis(100)))
1970            .unwrap();
1971        let sock = SockRef::from(&client);
1972        sock.set_recv_buffer_size(4096).ok();
1973
1974        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1975        let (client_id, mut writer) = match event {
1976            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1977            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1978        };
1979
1980        let payload = vec![0x55; 512 * 1024];
1981        let deadline = Instant::now() + Duration::from_secs(3);
1982        let mut stalled = false;
1983        while Instant::now() < deadline {
1984            match writer.send_frame(&payload) {
1985                Ok(()) => {}
1986                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1987                    thread::sleep(Duration::from_millis(10));
1988                }
1989                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1990                    stalled = true;
1991                    break;
1992                }
1993                Err(e) => panic!("unexpected send error: {}", e),
1994            }
1995        }
1996
1997        assert!(stalled, "expected writer to time out on persistent stall");
1998        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1999        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
2000    }
2001
2002    /// Drain events matching a predicate, return the first match.
2003    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
2004    where
2005        F: FnMut(&Event) -> bool,
2006    {
2007        let deadline = Instant::now() + timeout;
2008        loop {
2009            let remaining = deadline.saturating_duration_since(Instant::now());
2010            if remaining.is_zero() {
2011                return None;
2012            }
2013            match rx.recv_timeout(remaining) {
2014                Ok(event) if pred(&event) => return Some(event),
2015                Ok(_) => continue,
2016                Err(_) => return None,
2017            }
2018        }
2019    }
2020
2021    #[test]
2022    fn backbone_write_stall_emits_peer_events() {
2023        let port = find_free_port();
2024        let (tx, rx) = crate::event::channel();
2025        let next_id = Arc::new(AtomicU64::new(9700));
2026
2027        let config = make_server_config(
2028            port,
2029            97,
2030            None,
2031            None,
2032            Some(Duration::from_millis(50)), // 50ms stall timeout
2033            BackboneAbuseConfig::default(),
2034        );
2035
2036        start(config, tx, next_id).unwrap();
2037        thread::sleep(Duration::from_millis(50));
2038
2039        // Connect a client that won't read (will cause write stall)
2040        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2041        client
2042            .set_read_timeout(Some(Duration::from_millis(100)))
2043            .unwrap();
2044        let sock = SockRef::from(&client);
2045        sock.set_recv_buffer_size(4096).ok();
2046
2047        // Wait for InterfaceUp and grab writer
2048        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2049        let mut writer = match event {
2050            Event::InterfaceUp(_, Some(w), _) => w,
2051            other => panic!("expected InterfaceUp with writer, got {:?}", other),
2052        };
2053
2054        // Flood until stall
2055        let payload = vec![0x55; 512 * 1024];
2056        let deadline = Instant::now() + Duration::from_secs(3);
2057        while Instant::now() < deadline {
2058            match writer.send_frame(&payload) {
2059                Ok(()) | Err(_) => {
2060                    if Instant::now() + Duration::from_millis(10) > deadline {
2061                        break;
2062                    }
2063                    thread::sleep(Duration::from_millis(10));
2064                }
2065            }
2066        }
2067
2068        // Should see BackbonePeerWriteStall event
2069        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
2070            matches!(e, Event::BackbonePeerWriteStall { .. })
2071        });
2072        assert!(
2073            stall_event.is_some(),
2074            "expected BackbonePeerWriteStall event"
2075        );
2076    }
2077
2078    #[test]
2079    fn backbone_blacklisted_peer_rejected_on_connect() {
2080        let port = find_free_port();
2081        let (tx, rx) = crate::event::channel();
2082        let next_id = Arc::new(AtomicU64::new(9800));
2083
2084        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
2085        let peer_state = config.peer_state.clone();
2086
2087        start(config, tx.clone(), next_id).unwrap();
2088        thread::sleep(Duration::from_millis(50));
2089
2090        // First connection should succeed
2091        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2092        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
2093        assert!(
2094            matches!(event, Event::InterfaceUp(_, _, _)),
2095            "first connection should succeed"
2096        );
2097        drop(client1);
2098
2099        // Drain disconnect events
2100        thread::sleep(Duration::from_millis(100));
2101        while rx.try_recv().is_ok() {}
2102
2103        // Blacklist 127.0.0.1 via the peer monitor
2104        peer_state.lock().unwrap().blacklist(
2105            "127.0.0.1".parse().unwrap(),
2106            Duration::from_secs(60),
2107            "test blacklist".into(),
2108        );
2109
2110        // Second connection from same IP should be rejected (no InterfaceUp)
2111        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2112        // Give poll loop time to reject
2113        thread::sleep(Duration::from_millis(200));
2114
2115        // Should NOT get an InterfaceUp — connection should have been rejected
2116        let event = rx.try_recv();
2117        match event {
2118            Ok(Event::InterfaceUp(_, _, _)) => {
2119                panic!("blacklisted peer should not get InterfaceUp")
2120            }
2121            _ => {} // Expected: no InterfaceUp
2122        }
2123    }
2124
2125    #[test]
2126    fn backbone_parse_config_reads_abuse_settings() {
2127        let factory = BackboneInterfaceFactory;
2128        let mut params = HashMap::new();
2129        params.insert("listen_ip".into(), "127.0.0.1".into());
2130        params.insert("listen_port".into(), "4242".into());
2131        params.insert("idle_timeout".into(), "15".into());
2132        params.insert("write_stall_timeout".into(), "45".into());
2133        params.insert("max_penalty_duration".into(), "3600".into());
2134
2135        let config = factory
2136            .parse_config("test-backbone", InterfaceId(97), &params)
2137            .unwrap();
2138        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2139
2140        match mode {
2141            BackboneMode::Server(config) => {
2142                assert_eq!(config.listen_ip, "127.0.0.1");
2143                assert_eq!(config.listen_port, 4242);
2144                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2145                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2146                assert_eq!(
2147                    config.abuse.max_penalty_duration,
2148                    Some(Duration::from_secs(3600))
2149                );
2150            }
2151            BackboneMode::Client(_, _) => panic!("expected server config"),
2152        }
2153    }
2154
2155    #[test]
2156    fn backbone_parse_config_reads_client_priority() {
2157        let factory = BackboneInterfaceFactory;
2158        let mut params = HashMap::new();
2159        params.insert("remote".into(), "example.com".into());
2160        params.insert("target_port".into(), "4242".into());
2161        params.insert("priority".into(), "87".into());
2162
2163        let config = factory
2164            .parse_config("test-backbone-client", InterfaceId(96), &params)
2165            .unwrap();
2166        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2167
2168        match mode {
2169            BackboneMode::Client(_, priority) => assert_eq!(priority, 87),
2170            BackboneMode::Server(_) => panic!("expected client config"),
2171        }
2172    }
2173
2174    #[test]
2175    fn backbone_parse_config_defaults_client_priority() {
2176        let factory = BackboneInterfaceFactory;
2177        let mut params = HashMap::new();
2178        params.insert("remote".into(), "example.com".into());
2179
2180        let config = factory
2181            .parse_config("test-backbone-client", InterfaceId(95), &params)
2182            .unwrap();
2183        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2184
2185        match mode {
2186            BackboneMode::Client(_, priority) => assert_eq!(priority, 60),
2187            BackboneMode::Server(_) => panic!("expected client config"),
2188        }
2189    }
2190
2191    #[test]
2192    fn backbone_parse_config_rejects_invalid_client_priority() {
2193        let factory = BackboneInterfaceFactory;
2194        for value in ["-1", "101", "fast"] {
2195            let mut params = HashMap::new();
2196            params.insert("remote".into(), "example.com".into());
2197            params.insert("priority".into(), value.into());
2198
2199            let err = match factory.parse_config("test-backbone-client", InterfaceId(94), &params) {
2200                Ok(_) => panic!("priority {value} should be rejected"),
2201                Err(err) => err,
2202            };
2203            assert!(
2204                err.contains("priority"),
2205                "unexpected error for {value}: {err}"
2206            );
2207        }
2208    }
2209}