Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
/// HW_MTU: 1 MiB (1_048_576 bytes), matching Python `BackboneInterface.HW_MTU`.
///
/// Nothing in the visible part of this file reads the constant, hence the
/// `dead_code` allowance below.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
35
/// Configuration for a backbone interface.
///
/// Consumed by [`start`], which binds the listener and spawns the poll thread.
#[derive(Debug, Clone)]
pub struct BackboneConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Host part of the listen address; joined with `listen_port` as `ip:port`.
    pub listen_ip: String,
    /// TCP port to listen on.
    pub listen_port: u16,
    /// Id of the server interface itself; reported as `server_interface_id`
    /// in peer events and used in the poll thread's name.
    pub interface_id: InterfaceId,
    /// Startup value for the connection limit (`None` = unlimited). The poll
    /// loop reads the live value from `runtime`, not from this field.
    pub max_connections: Option<usize>,
    /// Startup value for the idle timeout; the live value comes from `runtime`.
    pub idle_timeout: Option<Duration>,
    /// Startup value for the write-stall timeout; the live value comes from `runtime`.
    pub write_stall_timeout: Option<Duration>,
    /// Startup abuse-detection settings.
    pub abuse: BackboneAbuseConfig,
    /// Shared live settings, re-read by the poll loop on every iteration and
    /// by each per-peer writer on every send.
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Shared per-peer bookkeeping that the poll loop keeps in sync.
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
50
/// Configurable behavior-based abuse detection for inbound peers.
#[derive(Debug, Clone, Default)]
pub struct BackboneAbuseConfig {
    /// Upper bound on a penalty's duration (`None` by default).
    /// NOTE(review): not read anywhere in the visible part of this file —
    /// confirm where it is enforced.
    pub max_penalty_duration: Option<Duration>,
}
56
/// Live runtime state for a backbone server interface.
///
/// Held behind `Arc<Mutex<..>>`; the poll loop clones a snapshot at the top of
/// every iteration, so changes apply without restarting the interface.
#[derive(Debug, Clone)]
pub struct BackboneServerRuntime {
    /// When `Some(max)`, new connections are rejected while `max` or more
    /// clients are connected.
    pub max_connections: Option<usize>,
    /// When `Some`, clients that have not sent any data within this time of
    /// connecting are disconnected.
    pub idle_timeout: Option<Duration>,
    /// When `Some`, a writer whose socket has been blocked for this long
    /// disconnects its peer.
    pub write_stall_timeout: Option<Duration>,
    /// Abuse-detection settings (not consulted in the visible part of this file).
    pub abuse: BackboneAbuseConfig,
}
65
66impl BackboneServerRuntime {
67    pub fn from_config(config: &BackboneConfig) -> Self {
68        Self {
69            max_connections: config.max_connections,
70            idle_timeout: config.idle_timeout,
71            write_stall_timeout: config.write_stall_timeout,
72            abuse: config.abuse.clone(),
73        }
74    }
75}
76
/// Handle pairing a server interface's shared live settings with a separate
/// `startup` copy.
///
/// NOTE(review): presumably `startup` keeps the values the interface was
/// started with so they can be compared with or restored over `runtime` —
/// confirm against the code that creates and consumes this handle.
#[derive(Debug, Clone)]
pub struct BackboneRuntimeConfigHandle {
    /// Name of the interface these settings belong to.
    pub interface_name: String,
    /// Shared live settings (same `Arc` type as `BackboneConfig::runtime`).
    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Owned copy of the settings, independent of later changes to `runtime`.
    pub startup: BackboneServerRuntime,
}
83
/// Handle giving access to a server interface's shared peer monitor.
#[derive(Debug, Clone)]
pub struct BackbonePeerStateHandle {
    /// Id of the interface the monitor belongs to.
    pub interface_id: InterfaceId,
    /// Name of that interface.
    pub interface_name: String,
    /// Shared peer bookkeeping (same `Arc` type as `BackboneConfig::peer_state`).
    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
}
90
91impl Default for BackboneConfig {
92    fn default() -> Self {
93        let mut config = BackboneConfig {
94            name: String::new(),
95            listen_ip: "0.0.0.0".into(),
96            listen_port: 0,
97            interface_id: InterfaceId(0),
98            max_connections: None,
99            idle_timeout: None,
100            write_stall_timeout: None,
101            abuse: BackboneAbuseConfig::default(),
102            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
103                max_connections: None,
104                idle_timeout: None,
105                write_stall_timeout: None,
106                abuse: BackboneAbuseConfig::default(),
107            })),
108            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
109        };
110        let startup = BackboneServerRuntime::from_config(&config);
111        config.runtime = Arc::new(Mutex::new(startup));
112        config
113    }
114}
115
/// Maximum pending buffer size per client (512 KiB). Clients exceeding this are
/// disconnected to prevent unbounded memory growth from slow readers.
///
/// Checked in `BackboneWriter::write_buffer` before unsent bytes are queued.
const MAX_PENDING_BYTES: usize = 512 * 1024;
119
/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
///
/// One instance is created per accepted client and handed out through
/// `Event::InterfaceUp`.
struct BackboneWriter {
    /// `try_clone` of the client's socket; the poll loop keeps the original.
    stream: TcpStream,
    /// Shared live settings; `write_stall_timeout` is re-read on every send.
    runtime: Arc<Mutex<BackboneServerRuntime>>,
    /// Server interface name, used as the log prefix.
    interface_name: String,
    /// Id of this per-peer interface; reported in `Event::InterfaceDown`.
    interface_id: InterfaceId,
    /// Channel used to announce a stall-triggered disconnect.
    event_tx: EventSender,
    /// Bytes accepted for sending that the socket has not taken yet.
    pending: Vec<u8>,
    /// Time of the first `WouldBlock` in the current stall; cleared whenever
    /// a write makes progress.
    stall_started: Option<Instant>,
    /// Set once the stall disconnect has been performed, so it happens only once.
    disconnect_notified: bool,
    /// Shared with the poll loop's `ClientState` so the resulting disconnect
    /// can be attributed to a write stall.
    write_stall_flag: Arc<AtomicBool>,
}
132
133impl Writer for BackboneWriter {
134    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
135        let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
136        if !self.pending.is_empty() {
137            self.flush_pending(write_stall_timeout)?;
138            if !self.pending.is_empty() {
139                return Err(io::Error::new(
140                    io::ErrorKind::WouldBlock,
141                    "backbone writer still stalled",
142                ));
143            }
144        }
145
146        let frame = hdlc::frame(data);
147        self.write_buffer(&frame, write_stall_timeout)
148    }
149}
150
151impl BackboneWriter {
152    fn write_buffer(
153        &mut self,
154        data: &[u8],
155        write_stall_timeout: Option<Duration>,
156    ) -> io::Result<()> {
157        let mut written = 0usize;
158        while written < data.len() {
159            match self.stream.write(&data[written..]) {
160                Ok(0) => {
161                    return Err(io::Error::new(
162                        io::ErrorKind::WriteZero,
163                        "backbone writer wrote zero bytes",
164                    ))
165                }
166                Ok(n) => {
167                    written += n;
168                    self.stall_started = None;
169                }
170                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
171                    let now = Instant::now();
172                    let started = self.stall_started.get_or_insert(now);
173                    if let Some(timeout) = write_stall_timeout {
174                        if now.duration_since(*started) >= timeout {
175                            return Err(self.disconnect_for_write_stall(timeout));
176                        }
177                    }
178                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
179                        return Err(self.disconnect_for_write_stall(
180                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
181                        ));
182                    }
183                    self.pending.extend_from_slice(&data[written..]);
184                    return Err(io::Error::new(
185                        io::ErrorKind::WouldBlock,
186                        "backbone writer would block",
187                    ));
188                }
189                Err(e) => return Err(e),
190            }
191        }
192        Ok(())
193    }
194
195    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
196        if self.pending.is_empty() {
197            return Ok(());
198        }
199
200        let pending = std::mem::take(&mut self.pending);
201        match self.write_buffer(&pending, write_stall_timeout) {
202            Ok(()) => Ok(()),
203            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
204            Err(e) => Err(e),
205        }
206    }
207
208    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
209        if !self.disconnect_notified {
210            log::warn!(
211                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
212                self.interface_name,
213                self.interface_id.0,
214                timeout
215            );
216            self.write_stall_flag.store(true, Ordering::Relaxed);
217            let _ = self.stream.shutdown(Shutdown::Both);
218            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
219            self.disconnect_notified = true;
220        }
221        io::Error::new(
222            io::ErrorKind::TimedOut,
223            format!("backbone writer stalled for {:?}", timeout),
224        )
225    }
226}
227
228/// Start a backbone interface. Binds TCP listener, spawns poll thread.
229pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
230    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
231    let listener = TcpListener::bind(&addr)?;
232    listener.set_nonblocking(true)?;
233
234    log::info!(
235        "[{}] backbone server listening on {}",
236        config.name,
237        listener.local_addr().unwrap_or(addr.parse().unwrap())
238    );
239
240    let name = config.name.clone();
241    let server_interface_id = config.interface_id;
242    let runtime = Arc::clone(&config.runtime);
243    let peer_state = Arc::clone(&config.peer_state);
244    thread::Builder::new()
245        .name(format!("backbone-poll-{}", config.interface_id.0))
246        .spawn(move || {
247            if let Err(e) = poll_loop(
248                listener,
249                name,
250                server_interface_id,
251                tx,
252                next_id,
253                runtime,
254                peer_state,
255            ) {
256                log::error!("backbone poll loop error: {}", e);
257            }
258        })?;
259
260    Ok(())
261}
262
/// Per-client tracking state.
///
/// Owned by the poll loop, keyed by the client's poller key.
struct ClientState {
    /// Interface id allocated for this client when it was accepted.
    id: InterfaceId,
    /// Remote IP address, as reported by `accept`.
    peer_ip: IpAddr,
    /// Remote TCP port, as reported by `accept`.
    peer_port: u16,
    /// The accepted socket; registered with the poller and used for reads.
    stream: TcpStream,
    /// HDLC decoder fed with every chunk read from `stream`.
    decoder: hdlc::Decoder,
    /// When the client was accepted; basis for the idle timeout and for
    /// `connected_for` in disconnect events.
    connected_at: Instant,
    /// True once at least one byte has been read from the client.
    has_received_data: bool,
    /// Raised by the client's `BackboneWriter` when it disconnects the peer
    /// for a write stall.
    write_stall_flag: Arc<AtomicBool>,
}
274
/// Bookkeeping kept per remote IP address (not per connection).
#[derive(Debug, Clone)]
struct PeerBehaviorState {
    /// While this instant is in the future, new connections from the IP are rejected.
    blacklisted_until: Option<Instant>,
    /// Free-form explanation recorded together with the blacklist entry.
    blacklist_reason: Option<String>,
    /// Number of connection attempts rejected because the IP was blacklisted.
    reject_count: u64,
    /// Number of currently connected clients from this IP.
    connected_count: usize,
}
282
283impl PeerBehaviorState {
284    fn new() -> Self {
285        Self {
286            blacklisted_until: None,
287            blacklist_reason: None,
288            reject_count: 0,
289            connected_count: 0,
290        }
291    }
292}
293
/// Shared, lockable view of per-IP peer state for one backbone server.
///
/// The poll loop merges its own bookkeeping into this monitor and pulls
/// changes made through the public methods (`blacklist`, `clear`) back out.
#[derive(Debug, Clone, Default)]
pub struct BackbonePeerMonitor {
    /// State per remote IP address.
    peers: HashMap<IpAddr, PeerBehaviorState>,
}
298
299impl BackbonePeerMonitor {
300    pub fn new() -> Self {
301        Self {
302            peers: HashMap::new(),
303        }
304    }
305
306    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
307        let mut merged = self.peers.clone();
308
309        for (peer_ip, state) in peers {
310            let entry = merged
311                .entry(*peer_ip)
312                .or_insert_with(PeerBehaviorState::new);
313            entry.connected_count = state.connected_count;
314            entry.reject_count = state.reject_count;
315            if state.blacklisted_until.is_some() {
316                entry.blacklisted_until = state.blacklisted_until;
317                entry.blacklist_reason = state.blacklist_reason.clone();
318            }
319        }
320
321        merged.retain(|peer_ip, state| {
322            peers.contains_key(peer_ip)
323                || state.blacklisted_until.is_some()
324                || state.reject_count > 0
325        });
326        self.peers = merged;
327    }
328
329    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
330        for (peer_ip, state) in &self.peers {
331            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
332            entry.blacklisted_until = state.blacklisted_until;
333            entry.blacklist_reason = state.blacklist_reason.clone();
334            entry.reject_count = state.reject_count;
335        }
336
337        peers.retain(|peer_ip, state| {
338            if state.connected_count > 0 {
339                return true;
340            }
341            self.peers.contains_key(peer_ip)
342        });
343    }
344
345    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
346        let now = Instant::now();
347        let mut entries: Vec<BackbonePeerStateEntry> = self
348            .peers
349            .iter()
350            .map(|(peer_ip, state)| BackbonePeerStateEntry {
351                interface_name: interface_name.to_string(),
352                peer_ip: *peer_ip,
353                connected_count: state.connected_count,
354                blacklisted_remaining_secs: state
355                    .blacklisted_until
356                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
357                blacklist_reason: state.blacklist_reason.clone(),
358                reject_count: state.reject_count,
359            })
360            .collect();
361        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
362        entries
363    }
364
365    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
366        self.peers.remove(&peer_ip).is_some()
367    }
368
369    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
370        let state = self
371            .peers
372            .entry(peer_ip)
373            .or_insert_with(PeerBehaviorState::new);
374        state.blacklisted_until = Some(Instant::now() + duration);
375        state.blacklist_reason = Some(reason);
376        true
377    }
378
379    #[cfg(test)]
380    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
381        let mut state = PeerBehaviorState::new();
382        state.connected_count = entry.connected_count;
383        state.reject_count = entry.reject_count;
384        state.blacklist_reason = entry.blacklist_reason;
385        if let Some(remaining) = entry.blacklisted_remaining_secs {
386            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
387        }
388        self.peers.insert(entry.peer_ip, state);
389    }
390}
391
/// Why the poll loop is dropping a client; selects the log line and the
/// follow-up events emitted by `disconnect_client`.
#[derive(Clone, Copy)]
enum DisconnectReason {
    /// The read side reported end-of-stream or an error.
    RemoteClosed,
    /// The client sent no data within the configured idle timeout.
    IdleTimeout,
    /// The client's writer shut the socket down after a write stall.
    WriteStall,
}
398
399/// Main poll event loop.
400fn poll_loop(
401    listener: TcpListener,
402    name: String,
403    server_interface_id: InterfaceId,
404    tx: EventSender,
405    next_id: Arc<AtomicU64>,
406    runtime: Arc<Mutex<BackboneServerRuntime>>,
407    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
408) -> io::Result<()> {
409    let poller = Poller::new()?;
410
411    const LISTENER_KEY: usize = 0;
412
413    // SAFETY: listener outlives its registration in the poller.
414    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
415
416    let mut clients: HashMap<usize, ClientState> = HashMap::new();
417    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
418    let mut events = Events::new();
419    let mut next_key: usize = 1;
420
421    loop {
422        let runtime_snapshot = runtime.lock().unwrap().clone();
423        let max_connections = runtime_snapshot.max_connections;
424        let idle_timeout = runtime_snapshot.idle_timeout;
425        cleanup_peer_state(&mut peers);
426        {
427            let mut monitor = peer_state.lock().unwrap();
428            monitor.sync_into(&mut peers);
429            monitor.upsert_snapshot(&peers);
430        }
431
432        events.clear();
433        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
434
435        for ev in events.iter() {
436            if ev.key == LISTENER_KEY {
437                // Accept new connections
438                loop {
439                    match listener.accept() {
440                        Ok((stream, peer_addr)) => {
441                            let peer_ip = peer_addr.ip();
442                            let peer_port = peer_addr.port();
443
444                            if is_ip_blacklisted(&mut peers, peer_ip) {
445                                if let Some(state) = peers.get_mut(&peer_ip) {
446                                    state.reject_count = state.reject_count.saturating_add(1);
447                                }
448                                peer_state.lock().unwrap().upsert_snapshot(&peers);
449                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
450                                drop(stream);
451                                continue;
452                            }
453
454                            if let Some(max) = max_connections {
455                                if clients.len() >= max {
456                                    log::warn!(
457                                        "[{}] max connections ({}) reached, rejecting {}",
458                                        name,
459                                        max,
460                                        peer_addr
461                                    );
462                                    drop(stream);
463                                    continue;
464                                }
465                            }
466
467                            stream.set_nonblocking(true).ok();
468                            stream.set_nodelay(true).ok();
469                            set_tcp_keepalive(&stream).ok();
470
471                            // Prevent SIGPIPE on macOS when writing to broken pipes
472                            #[cfg(target_os = "macos")]
473                            {
474                                let sock = SockRef::from(&stream);
475                                sock.set_nosigpipe(true).ok();
476                            }
477
478                            let key = next_key;
479                            next_key += 1;
480                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
481
482                            log::info!(
483                                "[{}] backbone client connected: {} → id {}",
484                                name,
485                                peer_addr,
486                                client_id.0
487                            );
488
489                            // Register client with poller
490                            // SAFETY: stream is stored in ClientState and outlives registration.
491                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
492                            {
493                                log::warn!("[{}] failed to add client to poller: {}", name, e);
494                                continue; // stream drops, closing socket
495                            }
496
497                            // Create writer via try_clone (cross-platform dup)
498                            let writer_stream = match stream.try_clone() {
499                                Ok(s) => s,
500                                Err(e) => {
501                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
502                                    let _ = poller.delete(&stream);
503                                    continue; // stream drops
504                                }
505                            };
506                            let write_stall_flag = Arc::new(AtomicBool::new(false));
507                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
508                                stream: writer_stream,
509                                runtime: Arc::clone(&runtime),
510                                interface_name: name.clone(),
511                                interface_id: client_id,
512                                event_tx: tx.clone(),
513                                pending: Vec::new(),
514                                stall_started: None,
515                                disconnect_notified: false,
516                                write_stall_flag: Arc::clone(&write_stall_flag),
517                            });
518
519                            clients.insert(
520                                key,
521                                ClientState {
522                                    id: client_id,
523                                    peer_ip,
524                                    peer_port,
525                                    stream,
526                                    decoder: hdlc::Decoder::new(),
527                                    connected_at: Instant::now(),
528                                    has_received_data: false,
529                                    write_stall_flag,
530                                },
531                            );
532                            peers
533                                .entry(peer_ip)
534                                .or_insert_with(PeerBehaviorState::new)
535                                .connected_count += 1;
536                            peer_state.lock().unwrap().upsert_snapshot(&peers);
537                            let _ = tx.send(Event::BackbonePeerConnected {
538                                server_interface_id,
539                                peer_interface_id: client_id,
540                                peer_ip,
541                                peer_port,
542                            });
543
544                            let info = InterfaceInfo {
545                                id: client_id,
546                                name: format!("BackboneInterface/{}", client_id.0),
547                                mode: constants::MODE_FULL,
548                                out_capable: true,
549                                in_capable: true,
550                                bitrate: Some(1_000_000_000), // 1 Gbps guess
551                                announce_rate_target: None,
552                                announce_rate_grace: 0,
553                                announce_rate_penalty: 0.0,
554                                announce_cap: constants::ANNOUNCE_CAP,
555                                is_local_client: false,
556                                wants_tunnel: false,
557                                tunnel_id: None,
558                                mtu: 65535,
559                                ia_freq: 0.0,
560                                started: 0.0,
561                                ingress_control: true,
562                            };
563
564                            if tx
565                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
566                                .is_err()
567                            {
568                                // Driver shut down
569                                cleanup(&poller, &clients, &listener);
570                                return Ok(());
571                            }
572                        }
573                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
574                        Err(e) => {
575                            log::warn!("[{}] accept error: {}", name, e);
576                            break;
577                        }
578                    }
579                }
580                // Re-arm listener (oneshot semantics)
581                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
582            } else if clients.contains_key(&ev.key) {
583                let key = ev.key;
584                let mut should_remove = false;
585                let mut client_id = InterfaceId(0);
586
587                let mut buf = [0u8; 4096];
588                let read_result = {
589                    let client = clients.get_mut(&key).unwrap();
590                    client.stream.read(&mut buf)
591                };
592
593                match read_result {
594                    Ok(0) | Err(_) => {
595                        if let Some(c) = clients.get(&key) {
596                            client_id = c.id;
597                        }
598                        should_remove = true;
599                    }
600                    Ok(n) => {
601                        let client = clients.get_mut(&key).unwrap();
602                        client_id = client.id;
603                        client.has_received_data = true;
604                        for frame in client.decoder.feed(&buf[..n]) {
605                            if tx
606                                .send(Event::Frame {
607                                    interface_id: client_id,
608                                    data: frame,
609                                })
610                                .is_err()
611                            {
612                                cleanup(&poller, &clients, &listener);
613                                return Ok(());
614                            }
615                        }
616                    }
617                }
618
619                if should_remove {
620                    let reason = if clients
621                        .get(&key)
622                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
623                    {
624                        DisconnectReason::WriteStall
625                    } else {
626                        DisconnectReason::RemoteClosed
627                    };
628                    disconnect_client(
629                        &poller,
630                        &mut clients,
631                        &mut peers,
632                        &name,
633                        server_interface_id,
634                        &tx,
635                        &peer_state,
636                        key,
637                        client_id,
638                        reason,
639                    );
640                } else if let Some(client) = clients.get(&key) {
641                    // Re-arm client (oneshot semantics)
642                    poller.modify(&client.stream, PollEvent::readable(key))?;
643                }
644            }
645        }
646
647        if let Some(timeout) = idle_timeout {
648            let now = Instant::now();
649            let timed_out: Vec<(usize, InterfaceId)> = clients
650                .iter()
651                .filter_map(|(&key, client)| {
652                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
653                    {
654                        None
655                    } else {
656                        Some((key, client.id))
657                    }
658                })
659                .collect();
660
661            for (key, client_id) in timed_out {
662                disconnect_client(
663                    &poller,
664                    &mut clients,
665                    &mut peers,
666                    &name,
667                    server_interface_id,
668                    &tx,
669                    &peer_state,
670                    key,
671                    client_id,
672                    DisconnectReason::IdleTimeout,
673                );
674            }
675        }
676    }
677}
678
679fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
680    let now = Instant::now();
681    peers.retain(|_, state| {
682        if matches!(state.blacklisted_until, Some(until) if now >= until) {
683            state.blacklisted_until = None;
684            state.blacklist_reason = None;
685        }
686        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
687    });
688}
689
690fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
691    let now = Instant::now();
692    if let Some(state) = peers.get_mut(&peer_ip) {
693        if let Some(until) = state.blacklisted_until {
694            if now < until {
695                return true;
696            }
697            state.blacklisted_until = None;
698        }
699    }
700    false
701}
702
703fn disconnect_client(
704    poller: &Poller,
705    clients: &mut HashMap<usize, ClientState>,
706    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
707    name: &str,
708    server_interface_id: InterfaceId,
709    tx: &EventSender,
710    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
711    key: usize,
712    client_id: InterfaceId,
713    reason: DisconnectReason,
714) {
715    let Some(client) = clients.remove(&key) else {
716        return;
717    };
718
719    match reason {
720        DisconnectReason::RemoteClosed => {
721            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
722        }
723        DisconnectReason::IdleTimeout => {
724            log::info!(
725                "[{}] backbone client {} disconnected due to idle timeout",
726                name,
727                client_id.0
728            );
729        }
730        DisconnectReason::WriteStall => {
731            // Already logged by BackboneWriter::disconnect_for_write_stall
732        }
733    }
734
735    let _ = poller.delete(&client.stream);
736    // client.stream closes on drop
737    let connected_for = client.connected_at.elapsed();
738    let _ = tx.send(Event::BackbonePeerDisconnected {
739        server_interface_id,
740        peer_interface_id: client.id,
741        peer_ip: client.peer_ip,
742        peer_port: client.peer_port,
743        connected_for,
744        had_received_data: client.has_received_data,
745    });
746    match reason {
747        DisconnectReason::IdleTimeout => {
748            let _ = tx.send(Event::BackbonePeerIdleTimeout {
749                server_interface_id,
750                peer_interface_id: client.id,
751                peer_ip: client.peer_ip,
752                peer_port: client.peer_port,
753                connected_for,
754            });
755        }
756        DisconnectReason::WriteStall => {
757            let _ = tx.send(Event::BackbonePeerWriteStall {
758                server_interface_id,
759                peer_interface_id: client.id,
760                peer_ip: client.peer_ip,
761                peer_port: client.peer_port,
762                connected_for,
763            });
764        }
765        DisconnectReason::RemoteClosed => {}
766    }
767
768    if let Some(state) = peers.get_mut(&client.peer_ip) {
769        state.connected_count = state.connected_count.saturating_sub(1);
770    }
771    peer_state.lock().unwrap().upsert_snapshot(peers);
772    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
773    if !matches!(reason, DisconnectReason::WriteStall) {
774        let _ = tx.send(Event::InterfaceDown(client_id));
775    }
776}
777
778fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
779    let sock = SockRef::from(stream);
780    let mut keepalive = TcpKeepalive::new()
781        .with_time(Duration::from_secs(5))
782        .with_interval(Duration::from_secs(2));
783    #[cfg(any(target_os = "linux", target_os = "macos"))]
784    {
785        keepalive = keepalive.with_retries(12);
786    }
787    sock.set_tcp_keepalive(&keepalive)
788}
789
790fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
791    for (_, client) in clients {
792        let _ = poller.delete(&client.stream);
793    }
794    let _ = poller.delete(listener);
795}
796
797// ---------------------------------------------------------------------------
798// Client mode
799// ---------------------------------------------------------------------------
800
/// Configuration for a backbone client interface.
///
/// NOTE(review): the code that consumes this config lies outside the visible
/// part of the file; field notes marked "presumably" are inferred from the
/// module docs and names and should be confirmed there.
#[derive(Debug, Clone)]
pub struct BackboneClientConfig {
    /// Interface name.
    pub name: String,
    /// Presumably the host of the remote backbone server to connect to.
    pub target_host: String,
    /// Presumably the TCP port of the remote backbone server.
    pub target_port: u16,
    /// Id of this client interface.
    pub interface_id: InterfaceId,
    /// Startup value copied into `BackboneClientRuntime::reconnect_wait`.
    pub reconnect_wait: Duration,
    /// Startup value copied into `BackboneClientRuntime::max_reconnect_tries`.
    pub max_reconnect_tries: Option<u32>,
    /// Startup value copied into `BackboneClientRuntime::connect_timeout`.
    pub connect_timeout: Duration,
    /// Optional transport identity (`None` by default); not used in the
    /// visible part of this file.
    pub transport_identity: Option<String>,
    /// Shared live settings for the client.
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
}
814
815#[derive(Debug, Clone)]
816pub struct BackboneClientRuntime {
817    pub reconnect_wait: Duration,
818    pub max_reconnect_tries: Option<u32>,
819    pub connect_timeout: Duration,
820}
821
822impl BackboneClientRuntime {
823    pub fn from_config(config: &BackboneClientConfig) -> Self {
824        Self {
825            reconnect_wait: config.reconnect_wait,
826            max_reconnect_tries: config.max_reconnect_tries,
827            connect_timeout: config.connect_timeout,
828        }
829    }
830}
831
/// Handle onto a client interface's runtime settings, as produced by
/// `client_runtime_handle_from_mode`.
#[derive(Debug, Clone)]
pub struct BackboneClientRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// Shared settings; the same `Arc` the interface's config holds.
    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
    /// Settings derived from the configuration when the handle was created.
    pub startup: BackboneClientRuntime,
}
838
839impl Default for BackboneClientConfig {
840    fn default() -> Self {
841        let mut config = BackboneClientConfig {
842            name: String::new(),
843            target_host: "127.0.0.1".into(),
844            target_port: 4242,
845            interface_id: InterfaceId(0),
846            reconnect_wait: Duration::from_secs(5),
847            max_reconnect_tries: None,
848            connect_timeout: Duration::from_secs(5),
849            transport_identity: None,
850            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
851                reconnect_wait: Duration::from_secs(5),
852                max_reconnect_tries: None,
853                connect_timeout: Duration::from_secs(5),
854            })),
855        };
856        let startup = BackboneClientRuntime::from_config(&config);
857        config.runtime = Arc::new(Mutex::new(startup));
858        config
859    }
860}
861
862/// Writer that sends HDLC-framed data over a TCP stream (client mode).
863struct BackboneClientWriter {
864    stream: TcpStream,
865}
866
867impl Writer for BackboneClientWriter {
868    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
869        self.stream.write_all(&hdlc::frame(data))
870    }
871}
872
873/// Try to connect to the target host:port with timeout.
874fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
875    let runtime = config.runtime.lock().unwrap().clone();
876    let addr_str = format!("{}:{}", config.target_host, config.target_port);
877    let addr = addr_str
878        .to_socket_addrs()?
879        .next()
880        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
881
882    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
883    stream.set_nodelay(true)?;
884    set_tcp_keepalive(&stream).ok();
885
886    // Prevent SIGPIPE on macOS when writing to broken pipes
887    #[cfg(target_os = "macos")]
888    {
889        let sock = SockRef::from(&stream);
890        sock.set_nosigpipe(true).ok();
891    }
892
893    Ok(stream)
894}
895
896/// Connect and start the reader thread. Returns the writer for the driver.
897pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
898    let stream = try_connect_client(&config)?;
899    let reader_stream = stream.try_clone()?;
900    let writer_stream = stream.try_clone()?;
901
902    let id = config.interface_id;
903    log::info!(
904        "[{}] backbone client connected to {}:{}",
905        config.name,
906        config.target_host,
907        config.target_port
908    );
909
910    // Initial connect: writer is None because it's returned directly to the caller
911    let _ = tx.send(Event::InterfaceUp(id, None, None));
912
913    thread::Builder::new()
914        .name(format!("backbone-client-{}", id.0))
915        .spawn(move || {
916            client_reader_loop(reader_stream, config, tx);
917        })?;
918
919    Ok(Box::new(BackboneClientWriter {
920        stream: writer_stream,
921    }))
922}
923
924/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
925/// On disconnect, attempts reconnection.
926fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
927    let id = config.interface_id;
928    let mut decoder = hdlc::Decoder::new();
929    let mut buf = [0u8; 4096];
930
931    loop {
932        match stream.read(&mut buf) {
933            Ok(0) => {
934                log::warn!("[{}] connection closed", config.name);
935                let _ = tx.send(Event::InterfaceDown(id));
936                match client_reconnect(&config, &tx) {
937                    Some(new_stream) => {
938                        stream = new_stream;
939                        decoder = hdlc::Decoder::new();
940                        continue;
941                    }
942                    None => {
943                        log::error!("[{}] reconnection failed, giving up", config.name);
944                        return;
945                    }
946                }
947            }
948            Ok(n) => {
949                for frame in decoder.feed(&buf[..n]) {
950                    if tx
951                        .send(Event::Frame {
952                            interface_id: id,
953                            data: frame,
954                        })
955                        .is_err()
956                    {
957                        return;
958                    }
959                }
960            }
961            Err(e) => {
962                log::warn!("[{}] read error: {}", config.name, e);
963                let _ = tx.send(Event::InterfaceDown(id));
964                match client_reconnect(&config, &tx) {
965                    Some(new_stream) => {
966                        stream = new_stream;
967                        decoder = hdlc::Decoder::new();
968                        continue;
969                    }
970                    None => {
971                        log::error!("[{}] reconnection failed, giving up", config.name);
972                        return;
973                    }
974                }
975            }
976        }
977    }
978}
979
980/// Maximum backoff multiplier: `base_delay * 2^MAX_BACKOFF_SHIFT`.
981/// With a 5 s base this caps at 5 × 2^6 = 320 s ≈ 5 min.
982const MAX_BACKOFF_SHIFT: u32 = 6;
983
984/// Attempt to reconnect with exponential backoff and jitter.
985/// Returns the new reader stream on success.
986/// Sends the new writer to the driver via InterfaceUp event.
987fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
988    let mut attempts = 0u32;
989    loop {
990        let runtime = config.runtime.lock().unwrap().clone();
991
992        let shift = attempts.min(MAX_BACKOFF_SHIFT);
993        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
994        // Add ±25 % jitter to avoid thundering-herd reconnects.
995        let jitter_range = backoff / 4;
996        let jitter = if jitter_range.as_nanos() > 0 {
997            let offset = Duration::from_nanos(
998                (std::hash::RandomState::new().build_hasher().finish()
999                    % jitter_range.as_nanos() as u64)
1000                    * 2,
1001            );
1002            if offset > jitter_range {
1003                backoff + (offset - jitter_range)
1004            } else {
1005                backoff - (jitter_range - offset)
1006            }
1007        } else {
1008            backoff
1009        };
1010        thread::sleep(jitter);
1011
1012        attempts += 1;
1013
1014        if let Some(max) = runtime.max_reconnect_tries {
1015            if attempts > max {
1016                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1017                return None;
1018            }
1019        }
1020
1021        log::info!(
1022            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1023            config.name,
1024            attempts,
1025            jitter.as_secs_f64(),
1026        );
1027
1028        match try_connect_client(config) {
1029            Ok(new_stream) => {
1030                let writer_stream = match new_stream.try_clone() {
1031                    Ok(s) => s,
1032                    Err(e) => {
1033                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1034                        continue;
1035                    }
1036                };
1037                log::info!(
1038                    "[{}] reconnected after {} attempt(s)",
1039                    config.name,
1040                    attempts
1041                );
1042                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1043                    stream: writer_stream,
1044                });
1045                let _ = tx.send(Event::InterfaceUp(
1046                    config.interface_id,
1047                    Some(new_writer),
1048                    None,
1049                ));
1050                return Some(new_stream);
1051            }
1052            Err(e) => {
1053                log::warn!("[{}] reconnect failed: {}", config.name, e);
1054            }
1055        }
1056    }
1057}
1058
1059// ---------------------------------------------------------------------------
1060// Factory
1061// ---------------------------------------------------------------------------
1062
/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
/// server or client config through the opaque `InterfaceConfigData` channel.
///
/// `parse_config` boxes one of these and `start` downcasts it back.
pub(crate) enum BackboneMode {
    /// Listen for inbound peers using the given server configuration.
    Server(BackboneConfig),
    /// Connect out to a remote backbone server.
    Client(BackboneClientConfig),
}
1069
/// Factory for `BackboneInterface`.
///
/// If the config params contain `"remote"` or `"target_host"` the interface
/// is started in client mode (`"remote"` wins when both are present);
/// otherwise it is started as a TCP listener (server mode).
pub struct BackboneInterfaceFactory;
1076
/// Read `params[key]` as a strictly positive number of seconds.
///
/// Returns `None` when the key is absent, the value does not parse as a
/// float, the value is zero, negative or NaN, or the value cannot be
/// represented as a [`Duration`] (infinite or too large). The last case used
/// to panic inside `Duration::from_secs_f64`; a bad config value is now
/// treated like any other invalid entry.
fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
    let secs = params.get(key)?.parse::<f64>().ok()?;
    if secs > 0.0 {
        // Fallible conversion: rejects `inf` and out-of-range values.
        Duration::try_from_secs_f64(secs).ok()
    } else {
        None
    }
}
1084
1085impl InterfaceFactory for BackboneInterfaceFactory {
1086    fn type_name(&self) -> &str {
1087        "BackboneInterface"
1088    }
1089
1090    fn parse_config(
1091        &self,
1092        name: &str,
1093        id: InterfaceId,
1094        params: &HashMap<String, String>,
1095    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1096        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1097            // Client mode
1098            let target_host = target_host.clone();
1099            let target_port = params
1100                .get("target_port")
1101                .or_else(|| params.get("port"))
1102                .and_then(|v| v.parse().ok())
1103                .unwrap_or(4242);
1104            let transport_identity = params.get("transport_identity").cloned();
1105            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1106                name: name.to_string(),
1107                target_host,
1108                target_port,
1109                interface_id: id,
1110                transport_identity,
1111                ..BackboneClientConfig::default()
1112            })))
1113        } else {
1114            // Server mode
1115            let listen_ip = params
1116                .get("listen_ip")
1117                .or_else(|| params.get("device"))
1118                .cloned()
1119                .unwrap_or_else(|| "0.0.0.0".into());
1120            let listen_port = params
1121                .get("listen_port")
1122                .or_else(|| params.get("port"))
1123                .and_then(|v| v.parse().ok())
1124                .unwrap_or(4242);
1125            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1126            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1127            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1128            let abuse = BackboneAbuseConfig {
1129                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1130            };
1131            let mut config = BackboneConfig {
1132                name: name.to_string(),
1133                listen_ip,
1134                listen_port,
1135                interface_id: id,
1136                max_connections,
1137                idle_timeout,
1138                write_stall_timeout,
1139                abuse,
1140                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1141                    max_connections: None,
1142                    idle_timeout: None,
1143                    write_stall_timeout: None,
1144                    abuse: BackboneAbuseConfig::default(),
1145                })),
1146                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1147            };
1148            let startup = BackboneServerRuntime::from_config(&config);
1149            config.runtime = Arc::new(Mutex::new(startup));
1150            Ok(Box::new(BackboneMode::Server(config)))
1151        }
1152    }
1153
1154    fn start(
1155        &self,
1156        config: Box<dyn InterfaceConfigData>,
1157        ctx: StartContext,
1158    ) -> io::Result<StartResult> {
1159        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1160            io::Error::new(
1161                io::ErrorKind::InvalidData,
1162                "wrong config type for BackboneInterface",
1163            )
1164        })?;
1165
1166        match mode {
1167            BackboneMode::Client(cfg) => {
1168                let id = cfg.interface_id;
1169                let name = cfg.name.clone();
1170                let info = InterfaceInfo {
1171                    id,
1172                    name,
1173                    mode: ctx.mode,
1174                    out_capable: true,
1175                    in_capable: true,
1176                    bitrate: Some(1_000_000_000),
1177                    announce_rate_target: None,
1178                    announce_rate_grace: 0,
1179                    announce_rate_penalty: 0.0,
1180                    announce_cap: constants::ANNOUNCE_CAP,
1181                    is_local_client: false,
1182                    wants_tunnel: false,
1183                    tunnel_id: None,
1184                    mtu: 65535,
1185                    ingress_control: true,
1186                    ia_freq: 0.0,
1187                    started: crate::time::now(),
1188                };
1189                let writer = start_client(cfg, ctx.tx)?;
1190                Ok(StartResult::Simple {
1191                    id,
1192                    info,
1193                    writer,
1194                    interface_type_name: "BackboneInterface".to_string(),
1195                })
1196            }
1197            BackboneMode::Server(cfg) => {
1198                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1199                Ok(StartResult::Listener { control: None })
1200            }
1201        }
1202    }
1203}
1204
1205pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1206    match mode {
1207        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1208            interface_name: config.name.clone(),
1209            runtime: Arc::clone(&config.runtime),
1210            startup: BackboneServerRuntime::from_config(config),
1211        }),
1212        BackboneMode::Client(_) => None,
1213    }
1214}
1215
1216pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1217    match mode {
1218        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1219            interface_id: config.interface_id,
1220            interface_name: config.name.clone(),
1221            peer_state: Arc::clone(&config.peer_state),
1222        }),
1223        BackboneMode::Client(_) => None,
1224    }
1225}
1226
1227pub(crate) fn client_runtime_handle_from_mode(
1228    mode: &BackboneMode,
1229) -> Option<BackboneClientRuntimeConfigHandle> {
1230    match mode {
1231        BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1232            interface_name: config.name.clone(),
1233            runtime: Arc::clone(&config.runtime),
1234            startup: BackboneClientRuntime::from_config(config),
1235        }),
1236        BackboneMode::Server(_) => None,
1237    }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242    use super::*;
1243    use std::sync::mpsc;
1244    use std::time::Duration;
1245
1246    fn find_free_port() -> u16 {
1247        TcpListener::bind("127.0.0.1:0")
1248            .unwrap()
1249            .local_addr()
1250            .unwrap()
1251            .port()
1252    }
1253
1254    fn recv_non_peer_event(
1255        rx: &mpsc::Receiver<Event>,
1256        timeout: Duration,
1257    ) -> Result<Event, mpsc::RecvTimeoutError> {
1258        let deadline = Instant::now() + timeout;
1259        loop {
1260            let remaining = deadline.saturating_duration_since(Instant::now());
1261            if remaining.is_zero() {
1262                return Err(mpsc::RecvTimeoutError::Timeout);
1263            }
1264            let event = rx.recv_timeout(remaining)?;
1265            match event {
1266                Event::BackbonePeerConnected { .. }
1267                | Event::BackbonePeerDisconnected { .. }
1268                | Event::BackbonePeerIdleTimeout { .. }
1269                | Event::BackbonePeerWriteStall { .. }
1270                | Event::BackbonePeerPenalty { .. } => continue,
1271                other => return Ok(other),
1272            }
1273        }
1274    }
1275
1276    fn make_server_config(
1277        port: u16,
1278        interface_id: u64,
1279        max_connections: Option<usize>,
1280        idle_timeout: Option<Duration>,
1281        write_stall_timeout: Option<Duration>,
1282        abuse: BackboneAbuseConfig,
1283    ) -> BackboneConfig {
1284        let mut config = BackboneConfig {
1285            name: "test-backbone".into(),
1286            listen_ip: "127.0.0.1".into(),
1287            listen_port: port,
1288            interface_id: InterfaceId(interface_id),
1289            max_connections,
1290            idle_timeout,
1291            write_stall_timeout,
1292            abuse,
1293            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1294                max_connections: None,
1295                idle_timeout: None,
1296                write_stall_timeout: None,
1297                abuse: BackboneAbuseConfig::default(),
1298            })),
1299            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1300        };
1301        let startup = BackboneServerRuntime::from_config(&config);
1302        config.runtime = Arc::new(Mutex::new(startup));
1303        config
1304    }
1305
1306    #[test]
1307    fn backbone_accept_connection() {
1308        let port = find_free_port();
1309        let (tx, rx) = crate::event::channel();
1310        let next_id = Arc::new(AtomicU64::new(8000));
1311
1312        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1313
1314        start(config, tx, next_id).unwrap();
1315        thread::sleep(Duration::from_millis(50));
1316
1317        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1318
1319        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1320        match event {
1321            Event::InterfaceUp(id, writer, info) => {
1322                assert_eq!(id, InterfaceId(8000));
1323                assert!(writer.is_some());
1324                assert!(info.is_some());
1325                let info = info.unwrap();
1326                assert!(info.out_capable);
1327                assert!(info.in_capable);
1328            }
1329            other => panic!("expected InterfaceUp, got {:?}", other),
1330        }
1331    }
1332
1333    #[test]
1334    fn backbone_receive_frame() {
1335        let port = find_free_port();
1336        let (tx, rx) = crate::event::channel();
1337        let next_id = Arc::new(AtomicU64::new(8100));
1338
1339        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1340
1341        start(config, tx, next_id).unwrap();
1342        thread::sleep(Duration::from_millis(50));
1343
1344        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1345
1346        // Drain InterfaceUp
1347        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1348
1349        // Send HDLC frame (>= 19 bytes)
1350        let payload: Vec<u8> = (0..32).collect();
1351        client.write_all(&hdlc::frame(&payload)).unwrap();
1352
1353        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1354        match event {
1355            Event::Frame { interface_id, data } => {
1356                assert_eq!(interface_id, InterfaceId(8100));
1357                assert_eq!(data, payload);
1358            }
1359            other => panic!("expected Frame, got {:?}", other),
1360        }
1361    }
1362
1363    #[test]
1364    fn backbone_send_to_client() {
1365        let port = find_free_port();
1366        let (tx, rx) = crate::event::channel();
1367        let next_id = Arc::new(AtomicU64::new(8200));
1368
1369        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1370
1371        start(config, tx, next_id).unwrap();
1372        thread::sleep(Duration::from_millis(50));
1373
1374        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1375        client
1376            .set_read_timeout(Some(Duration::from_secs(2)))
1377            .unwrap();
1378
1379        // Get writer from InterfaceUp
1380        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1381        let mut writer = match event {
1382            Event::InterfaceUp(_, Some(w), _) => w,
1383            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1384        };
1385
1386        // Send frame via writer
1387        let payload: Vec<u8> = (0..24).collect();
1388        writer.send_frame(&payload).unwrap();
1389
1390        // Read from client
1391        let mut buf = [0u8; 256];
1392        let n = client.read(&mut buf).unwrap();
1393        let expected = hdlc::frame(&payload);
1394        assert_eq!(&buf[..n], &expected[..]);
1395    }
1396
1397    #[test]
1398    fn backbone_multiple_clients() {
1399        let port = find_free_port();
1400        let (tx, rx) = crate::event::channel();
1401        let next_id = Arc::new(AtomicU64::new(8300));
1402
1403        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1404
1405        start(config, tx, next_id).unwrap();
1406        thread::sleep(Duration::from_millis(50));
1407
1408        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1409        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1410
1411        let mut ids = Vec::new();
1412        for _ in 0..2 {
1413            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1414            match event {
1415                Event::InterfaceUp(id, _, _) => ids.push(id),
1416                other => panic!("expected InterfaceUp, got {:?}", other),
1417            }
1418        }
1419
1420        assert_eq!(ids.len(), 2);
1421        assert_ne!(ids[0], ids[1]);
1422    }
1423
1424    #[test]
1425    fn backbone_client_disconnect() {
1426        let port = find_free_port();
1427        let (tx, rx) = crate::event::channel();
1428        let next_id = Arc::new(AtomicU64::new(8400));
1429
1430        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1431
1432        start(config, tx, next_id).unwrap();
1433        thread::sleep(Duration::from_millis(50));
1434
1435        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1436
1437        // Drain InterfaceUp
1438        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1439
1440        // Disconnect
1441        drop(client);
1442
1443        // Should receive InterfaceDown
1444        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1445        assert!(
1446            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1447            "expected InterfaceDown(8400), got {:?}",
1448            event
1449        );
1450    }
1451
1452    #[test]
1453    fn backbone_epoll_multiplexing() {
1454        let port = find_free_port();
1455        let (tx, rx) = crate::event::channel();
1456        let next_id = Arc::new(AtomicU64::new(8500));
1457
1458        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1459
1460        start(config, tx, next_id).unwrap();
1461        thread::sleep(Duration::from_millis(50));
1462
1463        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1464        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1465
1466        // Drain both InterfaceUp events
1467        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1468        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1469
1470        // Both clients send data simultaneously
1471        let payload1: Vec<u8> = (0..24).collect();
1472        let payload2: Vec<u8> = (100..130).collect();
1473        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1474        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1475
1476        // Should receive both Frame events
1477        let mut received = Vec::new();
1478        for _ in 0..2 {
1479            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1480            match event {
1481                Event::Frame { data, .. } => received.push(data),
1482                other => panic!("expected Frame, got {:?}", other),
1483            }
1484        }
1485        assert!(received.contains(&payload1));
1486        assert!(received.contains(&payload2));
1487    }
1488
1489    #[test]
1490    fn backbone_bind_port() {
1491        let port = find_free_port();
1492        let (tx, _rx) = crate::event::channel();
1493        let next_id = Arc::new(AtomicU64::new(8600));
1494
1495        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1496
1497        // Should not error
1498        start(config, tx, next_id).unwrap();
1499    }
1500
1501    #[test]
1502    fn backbone_hdlc_fragmented() {
1503        let port = find_free_port();
1504        let (tx, rx) = crate::event::channel();
1505        let next_id = Arc::new(AtomicU64::new(8700));
1506
1507        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1508
1509        start(config, tx, next_id).unwrap();
1510        thread::sleep(Duration::from_millis(50));
1511
1512        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1513        client.set_nodelay(true).unwrap();
1514
1515        // Drain InterfaceUp
1516        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1517
1518        // Send HDLC frame in two fragments
1519        let payload: Vec<u8> = (0..32).collect();
1520        let framed = hdlc::frame(&payload);
1521        let mid = framed.len() / 2;
1522
1523        client.write_all(&framed[..mid]).unwrap();
1524        thread::sleep(Duration::from_millis(50));
1525        client.write_all(&framed[mid..]).unwrap();
1526
1527        // Should receive reassembled frame
1528        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1529        match event {
1530            Event::Frame { data, .. } => {
1531                assert_eq!(data, payload);
1532            }
1533            other => panic!("expected Frame, got {:?}", other),
1534        }
1535    }
1536
1537    // -----------------------------------------------------------------------
1538    // Client mode tests
1539    // -----------------------------------------------------------------------
1540
1541    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1542        BackboneClientConfig {
1543            name: format!("test-bb-client-{}", port),
1544            target_host: "127.0.0.1".into(),
1545            target_port: port,
1546            interface_id: InterfaceId(id),
1547            reconnect_wait: Duration::from_millis(100),
1548            max_reconnect_tries: Some(2),
1549            connect_timeout: Duration::from_secs(2),
1550            transport_identity: None,
1551            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1552                reconnect_wait: Duration::from_millis(100),
1553                max_reconnect_tries: Some(2),
1554                connect_timeout: Duration::from_secs(2),
1555            })),
1556        }
1557    }
1558
1559    #[test]
1560    fn backbone_client_connect() {
1561        let port = find_free_port();
1562        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1563        let (tx, rx) = crate::event::channel();
1564
1565        let config = make_client_config(port, 9000);
1566        let _writer = start_client(config, tx).unwrap();
1567
1568        let _server_stream = listener.accept().unwrap();
1569
1570        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1571        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1572    }
1573
1574    #[test]
1575    fn backbone_client_receive_frame() {
1576        let port = find_free_port();
1577        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1578        let (tx, rx) = crate::event::channel();
1579
1580        let config = make_client_config(port, 9100);
1581        let _writer = start_client(config, tx).unwrap();
1582
1583        let (mut server_stream, _) = listener.accept().unwrap();
1584
1585        // Drain InterfaceUp
1586        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1587
1588        // Send HDLC frame from server side (>= 19 bytes payload)
1589        let payload: Vec<u8> = (0..32).collect();
1590        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1591
1592        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1593        match event {
1594            Event::Frame { interface_id, data } => {
1595                assert_eq!(interface_id, InterfaceId(9100));
1596                assert_eq!(data, payload);
1597            }
1598            other => panic!("expected Frame, got {:?}", other),
1599        }
1600    }
1601
1602    #[test]
1603    fn backbone_client_send_frame() {
1604        let port = find_free_port();
1605        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1606        let (tx, _rx) = crate::event::channel();
1607
1608        let config = make_client_config(port, 9200);
1609        let mut writer = start_client(config, tx).unwrap();
1610
1611        let (mut server_stream, _) = listener.accept().unwrap();
1612        server_stream
1613            .set_read_timeout(Some(Duration::from_secs(2)))
1614            .unwrap();
1615
1616        let payload: Vec<u8> = (0..24).collect();
1617        writer.send_frame(&payload).unwrap();
1618
1619        let mut buf = [0u8; 256];
1620        let n = server_stream.read(&mut buf).unwrap();
1621        let expected = hdlc::frame(&payload);
1622        assert_eq!(&buf[..n], &expected[..]);
1623    }
1624
1625    #[test]
1626    fn backbone_max_connections_rejects_excess() {
1627        let port = find_free_port();
1628        let (tx, rx) = crate::event::channel();
1629        let next_id = Arc::new(AtomicU64::new(8800));
1630
1631        let config = make_server_config(
1632            port,
1633            88,
1634            Some(2),
1635            None,
1636            None,
1637            BackboneAbuseConfig::default(),
1638        );
1639
1640        start(config, tx, next_id).unwrap();
1641        thread::sleep(Duration::from_millis(50));
1642
1643        // Connect two clients (at limit)
1644        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1645        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1646
1647        // Drain both InterfaceUp events
1648        for _ in 0..2 {
1649            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1650            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1651        }
1652
1653        // Third connection should be accepted at TCP level but immediately dropped
1654        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1655        client3
1656            .set_read_timeout(Some(Duration::from_millis(500)))
1657            .unwrap();
1658
1659        // Give server time to reject
1660        thread::sleep(Duration::from_millis(100));
1661
1662        // Should NOT receive a third InterfaceUp
1663        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1664        assert!(
1665            result.is_err(),
1666            "expected no InterfaceUp for rejected connection, got {:?}",
1667            result
1668        );
1669    }
1670
1671    #[test]
1672    fn backbone_max_connections_allows_after_disconnect() {
1673        let port = find_free_port();
1674        let (tx, rx) = crate::event::channel();
1675        let next_id = Arc::new(AtomicU64::new(8900));
1676
1677        let config = make_server_config(
1678            port,
1679            89,
1680            Some(1),
1681            None,
1682            None,
1683            BackboneAbuseConfig::default(),
1684        );
1685
1686        start(config, tx, next_id).unwrap();
1687        thread::sleep(Duration::from_millis(50));
1688
1689        // Connect first client
1690        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1691        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1692        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1693
1694        // Disconnect first client
1695        drop(client1);
1696
1697        // Wait for InterfaceDown
1698        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1699        assert!(matches!(event, Event::InterfaceDown(_)));
1700
1701        // Now a new connection should be accepted
1702        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1703        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1704        assert!(
1705            matches!(event, Event::InterfaceUp(_, _, _)),
1706            "expected InterfaceUp after slot freed, got {:?}",
1707            event
1708        );
1709    }
1710
1711    #[test]
1712    fn backbone_client_reconnect() {
1713        let port = find_free_port();
1714        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1715        listener.set_nonblocking(false).unwrap();
1716        let (tx, rx) = crate::event::channel();
1717
1718        let config = make_client_config(port, 9300);
1719        let _writer = start_client(config, tx).unwrap();
1720
1721        // Accept first connection and immediately close it
1722        let (server_stream, _) = listener.accept().unwrap();
1723
1724        // Drain InterfaceUp
1725        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1726
1727        drop(server_stream);
1728
1729        // Should get InterfaceDown
1730        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1731        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1732
1733        // Accept the reconnection
1734        let _server_stream2 = listener.accept().unwrap();
1735
1736        // Should get InterfaceUp again
1737        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1738        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1739    }
1740
1741    #[test]
1742    fn backbone_idle_timeout_disconnects_silent_client() {
1743        let port = find_free_port();
1744        let (tx, rx) = crate::event::channel();
1745        let next_id = Arc::new(AtomicU64::new(9400));
1746
1747        let config = make_server_config(
1748            port,
1749            94,
1750            None,
1751            Some(Duration::from_millis(150)),
1752            None,
1753            BackboneAbuseConfig::default(),
1754        );
1755
1756        start(config, tx, next_id).unwrap();
1757        thread::sleep(Duration::from_millis(50));
1758
1759        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1760
1761        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1762        let client_id = match event {
1763            Event::InterfaceUp(id, _, _) => id,
1764            other => panic!("expected InterfaceUp, got {:?}", other),
1765        };
1766
1767        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1768        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1769    }
1770
1771    #[test]
1772    fn backbone_idle_timeout_ignores_client_after_data() {
1773        let port = find_free_port();
1774        let (tx, rx) = crate::event::channel();
1775        let next_id = Arc::new(AtomicU64::new(9500));
1776
1777        let config = make_server_config(
1778            port,
1779            95,
1780            None,
1781            Some(Duration::from_millis(200)),
1782            None,
1783            BackboneAbuseConfig::default(),
1784        );
1785
1786        start(config, tx, next_id).unwrap();
1787        thread::sleep(Duration::from_millis(50));
1788
1789        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1790
1791        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1792        let client_id = match event {
1793            Event::InterfaceUp(id, _, _) => id,
1794            other => panic!("expected InterfaceUp, got {:?}", other),
1795        };
1796
1797        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1798
1799        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1800        match event {
1801            Event::Frame { interface_id, data } => {
1802                assert_eq!(interface_id, client_id);
1803                assert_eq!(data, vec![1u8; 24]);
1804            }
1805            other => panic!("expected Frame, got {:?}", other),
1806        }
1807
1808        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1809        assert!(
1810            result.is_err(),
1811            "expected no InterfaceDown after client sent data, got {:?}",
1812            result
1813        );
1814    }
1815
1816    #[test]
1817    fn backbone_runtime_idle_timeout_updates_live() {
1818        let port = find_free_port();
1819        let (tx, rx) = crate::event::channel();
1820        let next_id = Arc::new(AtomicU64::new(9650));
1821
1822        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1823        let runtime = Arc::clone(&config.runtime);
1824
1825        start(config, tx, next_id).unwrap();
1826        thread::sleep(Duration::from_millis(50));
1827
1828        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1829        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1830        let client_id = match event {
1831            Event::InterfaceUp(id, _, _) => id,
1832            other => panic!("expected InterfaceUp, got {:?}", other),
1833        };
1834
1835        {
1836            let mut runtime = runtime.lock().unwrap();
1837            runtime.idle_timeout = Some(Duration::from_millis(150));
1838        }
1839
1840        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1841        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1842    }
1843
1844    #[test]
1845    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1846        let port = find_free_port();
1847        let (tx, rx) = crate::event::channel();
1848        let next_id = Arc::new(AtomicU64::new(9660));
1849
1850        let config = make_server_config(
1851            port,
1852            98,
1853            None,
1854            None,
1855            Some(Duration::from_millis(50)),
1856            BackboneAbuseConfig::default(),
1857        );
1858
1859        start(config, tx, next_id).unwrap();
1860        thread::sleep(Duration::from_millis(50));
1861
1862        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1863        client
1864            .set_read_timeout(Some(Duration::from_millis(100)))
1865            .unwrap();
1866        let sock = SockRef::from(&client);
1867        sock.set_recv_buffer_size(4096).ok();
1868
1869        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1870        let (client_id, mut writer) = match event {
1871            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1872            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1873        };
1874
1875        let payload = vec![0x55; 512 * 1024];
1876        let deadline = Instant::now() + Duration::from_secs(3);
1877        let mut stalled = false;
1878        while Instant::now() < deadline {
1879            match writer.send_frame(&payload) {
1880                Ok(()) => {}
1881                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1882                    thread::sleep(Duration::from_millis(10));
1883                }
1884                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1885                    stalled = true;
1886                    break;
1887                }
1888                Err(e) => panic!("unexpected send error: {}", e),
1889            }
1890        }
1891
1892        assert!(stalled, "expected writer to time out on persistent stall");
1893        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1894        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1895    }
1896
1897    /// Drain events matching a predicate, return the first match.
1898    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1899    where
1900        F: FnMut(&Event) -> bool,
1901    {
1902        let deadline = Instant::now() + timeout;
1903        loop {
1904            let remaining = deadline.saturating_duration_since(Instant::now());
1905            if remaining.is_zero() {
1906                return None;
1907            }
1908            match rx.recv_timeout(remaining) {
1909                Ok(event) if pred(&event) => return Some(event),
1910                Ok(_) => continue,
1911                Err(_) => return None,
1912            }
1913        }
1914    }
1915
1916    #[test]
1917    fn backbone_write_stall_emits_peer_events() {
1918        let port = find_free_port();
1919        let (tx, rx) = crate::event::channel();
1920        let next_id = Arc::new(AtomicU64::new(9700));
1921
1922        let config = make_server_config(
1923            port,
1924            97,
1925            None,
1926            None,
1927            Some(Duration::from_millis(50)), // 50ms stall timeout
1928            BackboneAbuseConfig::default(),
1929        );
1930
1931        start(config, tx, next_id).unwrap();
1932        thread::sleep(Duration::from_millis(50));
1933
1934        // Connect a client that won't read (will cause write stall)
1935        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1936        client
1937            .set_read_timeout(Some(Duration::from_millis(100)))
1938            .unwrap();
1939        let sock = SockRef::from(&client);
1940        sock.set_recv_buffer_size(4096).ok();
1941
1942        // Wait for InterfaceUp and grab writer
1943        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1944        let mut writer = match event {
1945            Event::InterfaceUp(_, Some(w), _) => w,
1946            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1947        };
1948
1949        // Flood until stall
1950        let payload = vec![0x55; 512 * 1024];
1951        let deadline = Instant::now() + Duration::from_secs(3);
1952        while Instant::now() < deadline {
1953            match writer.send_frame(&payload) {
1954                Ok(()) | Err(_) => {
1955                    if Instant::now() + Duration::from_millis(10) > deadline {
1956                        break;
1957                    }
1958                    thread::sleep(Duration::from_millis(10));
1959                }
1960            }
1961        }
1962
1963        // Should see BackbonePeerWriteStall event
1964        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1965            matches!(e, Event::BackbonePeerWriteStall { .. })
1966        });
1967        assert!(
1968            stall_event.is_some(),
1969            "expected BackbonePeerWriteStall event"
1970        );
1971    }
1972
1973    #[test]
1974    fn backbone_blacklisted_peer_rejected_on_connect() {
1975        let port = find_free_port();
1976        let (tx, rx) = crate::event::channel();
1977        let next_id = Arc::new(AtomicU64::new(9800));
1978
1979        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
1980        let peer_state = config.peer_state.clone();
1981
1982        start(config, tx.clone(), next_id).unwrap();
1983        thread::sleep(Duration::from_millis(50));
1984
1985        // First connection should succeed
1986        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1987        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1988        assert!(
1989            matches!(event, Event::InterfaceUp(_, _, _)),
1990            "first connection should succeed"
1991        );
1992        drop(client1);
1993
1994        // Drain disconnect events
1995        thread::sleep(Duration::from_millis(100));
1996        while rx.try_recv().is_ok() {}
1997
1998        // Blacklist 127.0.0.1 via the peer monitor
1999        peer_state.lock().unwrap().blacklist(
2000            "127.0.0.1".parse().unwrap(),
2001            Duration::from_secs(60),
2002            "test blacklist".into(),
2003        );
2004
2005        // Second connection from same IP should be rejected (no InterfaceUp)
2006        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2007        // Give poll loop time to reject
2008        thread::sleep(Duration::from_millis(200));
2009
2010        // Should NOT get an InterfaceUp — connection should have been rejected
2011        let event = rx.try_recv();
2012        match event {
2013            Ok(Event::InterfaceUp(_, _, _)) => {
2014                panic!("blacklisted peer should not get InterfaceUp")
2015            }
2016            _ => {} // Expected: no InterfaceUp
2017        }
2018    }
2019
2020    #[test]
2021    fn backbone_parse_config_reads_abuse_settings() {
2022        let factory = BackboneInterfaceFactory;
2023        let mut params = HashMap::new();
2024        params.insert("listen_ip".into(), "127.0.0.1".into());
2025        params.insert("listen_port".into(), "4242".into());
2026        params.insert("idle_timeout".into(), "15".into());
2027        params.insert("write_stall_timeout".into(), "45".into());
2028        params.insert("max_penalty_duration".into(), "3600".into());
2029
2030        let config = factory
2031            .parse_config("test-backbone", InterfaceId(97), &params)
2032            .unwrap();
2033        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2034
2035        match mode {
2036            BackboneMode::Server(config) => {
2037                assert_eq!(config.listen_ip, "127.0.0.1");
2038                assert_eq!(config.listen_port, 4242);
2039                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2040                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2041                assert_eq!(
2042                    config.abuse.max_penalty_duration,
2043                    Some(Duration::from_secs(3600))
2044                );
2045            }
2046            BackboneMode::Client(_) => panic!("expected server config"),
2047        }
2048    }
2049}