Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using cross-platform polling.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single poll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::hash::{BuildHasher, Hasher};
14use std::io::{self, Read, Write};
15use std::net::{IpAddr, Shutdown, TcpListener, TcpStream, ToSocketAddrs};
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use polling::{Event as PollEvent, Events, Poller};
22use socket2::{SockRef, TcpKeepalive};
23
24use rns_core::constants;
25use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
26
27use crate::event::{Event, EventSender};
28use crate::hdlc;
29use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
30use crate::BackbonePeerStateEntry;
31
32/// HW_MTU: 1 MB (matches Python BackboneInterface.HW_MTU)
33#[allow(dead_code)]
34const HW_MTU: usize = 1_048_576;
35
36/// Configuration for a backbone interface.
37#[derive(Debug, Clone)]
38pub struct BackboneConfig {
39    pub name: String,
40    pub listen_ip: String,
41    pub listen_port: u16,
42    pub interface_id: InterfaceId,
43    pub max_connections: Option<usize>,
44    pub idle_timeout: Option<Duration>,
45    pub write_stall_timeout: Option<Duration>,
46    pub abuse: BackboneAbuseConfig,
47    pub ingress_control: IngressControlConfig,
48    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
49    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
50}
51
52/// Configurable behavior-based abuse detection for inbound peers.
53#[derive(Debug, Clone, Default)]
54pub struct BackboneAbuseConfig {
55    pub max_penalty_duration: Option<Duration>,
56}
57
58/// Live runtime state for a backbone server interface.
59#[derive(Debug, Clone)]
60pub struct BackboneServerRuntime {
61    pub max_connections: Option<usize>,
62    pub idle_timeout: Option<Duration>,
63    pub write_stall_timeout: Option<Duration>,
64    pub abuse: BackboneAbuseConfig,
65}
66
67impl BackboneServerRuntime {
68    pub fn from_config(config: &BackboneConfig) -> Self {
69        Self {
70            max_connections: config.max_connections,
71            idle_timeout: config.idle_timeout,
72            write_stall_timeout: config.write_stall_timeout,
73            abuse: config.abuse.clone(),
74        }
75    }
76}
77
78#[derive(Debug, Clone)]
79pub struct BackboneRuntimeConfigHandle {
80    pub interface_name: String,
81    pub runtime: Arc<Mutex<BackboneServerRuntime>>,
82    pub startup: BackboneServerRuntime,
83}
84
85#[derive(Debug, Clone)]
86pub struct BackbonePeerStateHandle {
87    pub interface_id: InterfaceId,
88    pub interface_name: String,
89    pub peer_state: Arc<Mutex<BackbonePeerMonitor>>,
90}
91
92impl Default for BackboneConfig {
93    fn default() -> Self {
94        let mut config = BackboneConfig {
95            name: String::new(),
96            listen_ip: "0.0.0.0".into(),
97            listen_port: 0,
98            interface_id: InterfaceId(0),
99            max_connections: None,
100            idle_timeout: None,
101            write_stall_timeout: None,
102            abuse: BackboneAbuseConfig::default(),
103            ingress_control: IngressControlConfig::enabled(),
104            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
105                max_connections: None,
106                idle_timeout: None,
107                write_stall_timeout: None,
108                abuse: BackboneAbuseConfig::default(),
109            })),
110            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
111        };
112        let startup = BackboneServerRuntime::from_config(&config);
113        config.runtime = Arc::new(Mutex::new(startup));
114        config
115    }
116}
117
118/// Maximum pending buffer size per client (512 KB). Clients exceeding this are
119/// disconnected to prevent unbounded memory growth from slow readers.
120const MAX_PENDING_BYTES: usize = 512 * 1024;
121
122/// Writer that sends HDLC-framed data over a cloned TCP stream (server mode).
123struct BackboneWriter {
124    stream: TcpStream,
125    runtime: Arc<Mutex<BackboneServerRuntime>>,
126    interface_name: String,
127    interface_id: InterfaceId,
128    event_tx: EventSender,
129    pending: Vec<u8>,
130    stall_started: Option<Instant>,
131    disconnect_notified: bool,
132    write_stall_flag: Arc<AtomicBool>,
133}
134
135impl Writer for BackboneWriter {
136    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
137        let write_stall_timeout = self.runtime.lock().unwrap().write_stall_timeout;
138        if !self.pending.is_empty() {
139            self.flush_pending(write_stall_timeout)?;
140            if !self.pending.is_empty() {
141                return Err(io::Error::new(
142                    io::ErrorKind::WouldBlock,
143                    "backbone writer still stalled",
144                ));
145            }
146        }
147
148        let frame = hdlc::frame(data);
149        self.write_buffer(&frame, write_stall_timeout)
150    }
151}
152
153impl BackboneWriter {
154    fn write_buffer(
155        &mut self,
156        data: &[u8],
157        write_stall_timeout: Option<Duration>,
158    ) -> io::Result<()> {
159        let mut written = 0usize;
160        while written < data.len() {
161            match self.stream.write(&data[written..]) {
162                Ok(0) => {
163                    return Err(io::Error::new(
164                        io::ErrorKind::WriteZero,
165                        "backbone writer wrote zero bytes",
166                    ))
167                }
168                Ok(n) => {
169                    written += n;
170                    self.stall_started = None;
171                }
172                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
173                    let now = Instant::now();
174                    let started = self.stall_started.get_or_insert(now);
175                    if let Some(timeout) = write_stall_timeout {
176                        if now.duration_since(*started) >= timeout {
177                            return Err(self.disconnect_for_write_stall(timeout));
178                        }
179                    }
180                    if self.pending.len() + data[written..].len() > MAX_PENDING_BYTES {
181                        return Err(self.disconnect_for_write_stall(
182                            write_stall_timeout.unwrap_or(Duration::from_secs(30)),
183                        ));
184                    }
185                    self.pending.extend_from_slice(&data[written..]);
186                    return Err(io::Error::new(
187                        io::ErrorKind::WouldBlock,
188                        "backbone writer would block",
189                    ));
190                }
191                Err(e) => return Err(e),
192            }
193        }
194        Ok(())
195    }
196
197    fn flush_pending(&mut self, write_stall_timeout: Option<Duration>) -> io::Result<()> {
198        if self.pending.is_empty() {
199            return Ok(());
200        }
201
202        let pending = std::mem::take(&mut self.pending);
203        match self.write_buffer(&pending, write_stall_timeout) {
204            Ok(()) => Ok(()),
205            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
206            Err(e) => Err(e),
207        }
208    }
209
210    fn disconnect_for_write_stall(&mut self, timeout: Duration) -> io::Error {
211        if !self.disconnect_notified {
212            log::warn!(
213                "[{}] backbone client {} disconnected due to write stall timeout ({:?})",
214                self.interface_name,
215                self.interface_id.0,
216                timeout
217            );
218            self.write_stall_flag.store(true, Ordering::Relaxed);
219            let _ = self.stream.shutdown(Shutdown::Both);
220            let _ = self.event_tx.send(Event::InterfaceDown(self.interface_id));
221            self.disconnect_notified = true;
222        }
223        io::Error::new(
224            io::ErrorKind::TimedOut,
225            format!("backbone writer stalled for {:?}", timeout),
226        )
227    }
228}
229
230/// Start a backbone interface. Binds TCP listener, spawns poll thread.
231pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
232    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
233    let listener = TcpListener::bind(&addr)?;
234    listener.set_nonblocking(true)?;
235
236    log::info!(
237        "[{}] backbone server listening on {}",
238        config.name,
239        listener.local_addr().unwrap_or(addr.parse().unwrap())
240    );
241
242    let name = config.name.clone();
243    let server_interface_id = config.interface_id;
244    let runtime = Arc::clone(&config.runtime);
245    let peer_state = Arc::clone(&config.peer_state);
246    let ingress_control = config.ingress_control;
247    thread::Builder::new()
248        .name(format!("backbone-poll-{}", config.interface_id.0))
249        .spawn(move || {
250            if let Err(e) = poll_loop(
251                listener,
252                name,
253                server_interface_id,
254                tx,
255                next_id,
256                runtime,
257                peer_state,
258                ingress_control,
259            ) {
260                log::error!("backbone poll loop error: {}", e);
261            }
262        })?;
263
264    Ok(())
265}
266
267/// Per-client tracking state.
268struct ClientState {
269    id: InterfaceId,
270    peer_ip: IpAddr,
271    peer_port: u16,
272    stream: TcpStream,
273    decoder: hdlc::Decoder,
274    connected_at: Instant,
275    has_received_data: bool,
276    write_stall_flag: Arc<AtomicBool>,
277}
278
279#[derive(Debug, Clone)]
280struct PeerBehaviorState {
281    blacklisted_until: Option<Instant>,
282    blacklist_reason: Option<String>,
283    reject_count: u64,
284    connected_count: usize,
285}
286
287impl PeerBehaviorState {
288    fn new() -> Self {
289        Self {
290            blacklisted_until: None,
291            blacklist_reason: None,
292            reject_count: 0,
293            connected_count: 0,
294        }
295    }
296}
297
298#[derive(Debug, Clone, Default)]
299pub struct BackbonePeerMonitor {
300    peers: HashMap<IpAddr, PeerBehaviorState>,
301}
302
303impl BackbonePeerMonitor {
304    pub fn new() -> Self {
305        Self {
306            peers: HashMap::new(),
307        }
308    }
309
310    fn upsert_snapshot(&mut self, peers: &HashMap<IpAddr, PeerBehaviorState>) {
311        let mut merged = self.peers.clone();
312
313        for (peer_ip, state) in peers {
314            let entry = merged
315                .entry(*peer_ip)
316                .or_insert_with(PeerBehaviorState::new);
317            entry.connected_count = state.connected_count;
318            entry.reject_count = state.reject_count;
319            if state.blacklisted_until.is_some() {
320                entry.blacklisted_until = state.blacklisted_until;
321                entry.blacklist_reason = state.blacklist_reason.clone();
322            }
323        }
324
325        merged.retain(|peer_ip, state| {
326            peers.contains_key(peer_ip)
327                || state.blacklisted_until.is_some()
328                || state.reject_count > 0
329        });
330        self.peers = merged;
331    }
332
333    fn sync_into(&self, peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
334        for (peer_ip, state) in &self.peers {
335            let entry = peers.entry(*peer_ip).or_insert_with(PeerBehaviorState::new);
336            entry.blacklisted_until = state.blacklisted_until;
337            entry.blacklist_reason = state.blacklist_reason.clone();
338            entry.reject_count = state.reject_count;
339        }
340
341        peers.retain(|peer_ip, state| {
342            if state.connected_count > 0 {
343                return true;
344            }
345            self.peers.contains_key(peer_ip)
346        });
347    }
348
349    pub fn list(&self, interface_name: &str) -> Vec<BackbonePeerStateEntry> {
350        let now = Instant::now();
351        let mut entries: Vec<BackbonePeerStateEntry> = self
352            .peers
353            .iter()
354            .map(|(peer_ip, state)| BackbonePeerStateEntry {
355                interface_name: interface_name.to_string(),
356                peer_ip: *peer_ip,
357                connected_count: state.connected_count,
358                blacklisted_remaining_secs: state
359                    .blacklisted_until
360                    .and_then(|until| (until > now).then(|| (until - now).as_secs_f64())),
361                blacklist_reason: state.blacklist_reason.clone(),
362                reject_count: state.reject_count,
363            })
364            .collect();
365        entries.sort_by(|a, b| a.peer_ip.cmp(&b.peer_ip));
366        entries
367    }
368
369    pub fn clear(&mut self, peer_ip: IpAddr) -> bool {
370        self.peers.remove(&peer_ip).is_some()
371    }
372
373    pub fn blacklist(&mut self, peer_ip: IpAddr, duration: Duration, reason: String) -> bool {
374        let state = self
375            .peers
376            .entry(peer_ip)
377            .or_insert_with(PeerBehaviorState::new);
378        state.blacklisted_until = Some(Instant::now() + duration);
379        state.blacklist_reason = Some(reason);
380        true
381    }
382
383    #[cfg(test)]
384    pub fn seed_entry(&mut self, entry: BackbonePeerStateEntry) {
385        let mut state = PeerBehaviorState::new();
386        state.connected_count = entry.connected_count;
387        state.reject_count = entry.reject_count;
388        state.blacklist_reason = entry.blacklist_reason;
389        if let Some(remaining) = entry.blacklisted_remaining_secs {
390            state.blacklisted_until = Some(Instant::now() + Duration::from_secs_f64(remaining));
391        }
392        self.peers.insert(entry.peer_ip, state);
393    }
394}
395
396#[derive(Clone, Copy)]
397enum DisconnectReason {
398    RemoteClosed,
399    IdleTimeout,
400    WriteStall,
401}
402
403/// Main poll event loop.
404fn poll_loop(
405    listener: TcpListener,
406    name: String,
407    server_interface_id: InterfaceId,
408    tx: EventSender,
409    next_id: Arc<AtomicU64>,
410    runtime: Arc<Mutex<BackboneServerRuntime>>,
411    peer_state: Arc<Mutex<BackbonePeerMonitor>>,
412    ingress_control: IngressControlConfig,
413) -> io::Result<()> {
414    let poller = Poller::new()?;
415
416    const LISTENER_KEY: usize = 0;
417
418    // SAFETY: listener outlives its registration in the poller.
419    unsafe { poller.add(&listener, PollEvent::readable(LISTENER_KEY))? };
420
421    let mut clients: HashMap<usize, ClientState> = HashMap::new();
422    let mut peers: HashMap<IpAddr, PeerBehaviorState> = HashMap::new();
423    let mut events = Events::new();
424    let mut next_key: usize = 1;
425
426    loop {
427        let runtime_snapshot = runtime.lock().unwrap().clone();
428        let max_connections = runtime_snapshot.max_connections;
429        let idle_timeout = runtime_snapshot.idle_timeout;
430        cleanup_peer_state(&mut peers);
431        {
432            let mut monitor = peer_state.lock().unwrap();
433            monitor.sync_into(&mut peers);
434            monitor.upsert_snapshot(&peers);
435        }
436
437        events.clear();
438        poller.wait(&mut events, Some(Duration::from_secs(1)))?;
439
440        for ev in events.iter() {
441            if ev.key == LISTENER_KEY {
442                // Accept new connections
443                loop {
444                    match listener.accept() {
445                        Ok((stream, peer_addr)) => {
446                            let peer_ip = peer_addr.ip();
447                            let peer_port = peer_addr.port();
448
449                            if is_ip_blacklisted(&mut peers, peer_ip) {
450                                if let Some(state) = peers.get_mut(&peer_ip) {
451                                    state.reject_count = state.reject_count.saturating_add(1);
452                                }
453                                peer_state.lock().unwrap().upsert_snapshot(&peers);
454                                log::debug!("[{}] rejecting blacklisted peer {}", name, peer_addr);
455                                drop(stream);
456                                continue;
457                            }
458
459                            if let Some(max) = max_connections {
460                                if clients.len() >= max {
461                                    log::warn!(
462                                        "[{}] max connections ({}) reached, rejecting {}",
463                                        name,
464                                        max,
465                                        peer_addr
466                                    );
467                                    drop(stream);
468                                    continue;
469                                }
470                            }
471
472                            stream.set_nonblocking(true).ok();
473                            stream.set_nodelay(true).ok();
474                            set_tcp_keepalive(&stream).ok();
475
476                            // Prevent SIGPIPE on macOS when writing to broken pipes
477                            #[cfg(target_os = "macos")]
478                            {
479                                let sock = SockRef::from(&stream);
480                                sock.set_nosigpipe(true).ok();
481                            }
482
483                            let key = next_key;
484                            next_key += 1;
485                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
486
487                            log::info!(
488                                "[{}] backbone client connected: {} → id {}",
489                                name,
490                                peer_addr,
491                                client_id.0
492                            );
493
494                            // Register client with poller
495                            // SAFETY: stream is stored in ClientState and outlives registration.
496                            if let Err(e) = unsafe { poller.add(&stream, PollEvent::readable(key)) }
497                            {
498                                log::warn!("[{}] failed to add client to poller: {}", name, e);
499                                continue; // stream drops, closing socket
500                            }
501
502                            // Create writer via try_clone (cross-platform dup)
503                            let writer_stream = match stream.try_clone() {
504                                Ok(s) => s,
505                                Err(e) => {
506                                    log::warn!("[{}] failed to clone client stream: {}", name, e);
507                                    let _ = poller.delete(&stream);
508                                    continue; // stream drops
509                                }
510                            };
511                            let write_stall_flag = Arc::new(AtomicBool::new(false));
512                            let writer: Box<dyn Writer> = Box::new(BackboneWriter {
513                                stream: writer_stream,
514                                runtime: Arc::clone(&runtime),
515                                interface_name: name.clone(),
516                                interface_id: client_id,
517                                event_tx: tx.clone(),
518                                pending: Vec::new(),
519                                stall_started: None,
520                                disconnect_notified: false,
521                                write_stall_flag: Arc::clone(&write_stall_flag),
522                            });
523
524                            clients.insert(
525                                key,
526                                ClientState {
527                                    id: client_id,
528                                    peer_ip,
529                                    peer_port,
530                                    stream,
531                                    decoder: hdlc::Decoder::new(),
532                                    connected_at: Instant::now(),
533                                    has_received_data: false,
534                                    write_stall_flag,
535                                },
536                            );
537                            peers
538                                .entry(peer_ip)
539                                .or_insert_with(PeerBehaviorState::new)
540                                .connected_count += 1;
541                            peer_state.lock().unwrap().upsert_snapshot(&peers);
542                            let _ = tx.send(Event::BackbonePeerConnected {
543                                server_interface_id,
544                                peer_interface_id: client_id,
545                                peer_ip,
546                                peer_port,
547                            });
548
549                            let info = InterfaceInfo {
550                                id: client_id,
551                                name: format!("BackboneInterface/{}", client_id.0),
552                                mode: constants::MODE_FULL,
553                                out_capable: true,
554                                in_capable: true,
555                                bitrate: Some(1_000_000_000), // 1 Gbps guess
556                                announce_rate_target: None,
557                                announce_rate_grace: 0,
558                                announce_rate_penalty: 0.0,
559                                announce_cap: constants::ANNOUNCE_CAP,
560                                is_local_client: false,
561                                wants_tunnel: false,
562                                tunnel_id: None,
563                                mtu: 65535,
564                                ia_freq: 0.0,
565                                started: 0.0,
566                                ingress_control,
567                            };
568
569                            if tx
570                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
571                                .is_err()
572                            {
573                                // Driver shut down
574                                cleanup(&poller, &clients, &listener);
575                                return Ok(());
576                            }
577                        }
578                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
579                        Err(e) => {
580                            log::warn!("[{}] accept error: {}", name, e);
581                            break;
582                        }
583                    }
584                }
585                // Re-arm listener (oneshot semantics)
586                poller.modify(&listener, PollEvent::readable(LISTENER_KEY))?;
587            } else if clients.contains_key(&ev.key) {
588                let key = ev.key;
589                let mut should_remove = false;
590                let mut client_id = InterfaceId(0);
591
592                let mut buf = [0u8; 4096];
593                let read_result = {
594                    let client = clients.get_mut(&key).unwrap();
595                    client.stream.read(&mut buf)
596                };
597
598                match read_result {
599                    Ok(0) | Err(_) => {
600                        if let Some(c) = clients.get(&key) {
601                            client_id = c.id;
602                        }
603                        should_remove = true;
604                    }
605                    Ok(n) => {
606                        let client = clients.get_mut(&key).unwrap();
607                        client_id = client.id;
608                        client.has_received_data = true;
609                        for frame in client.decoder.feed(&buf[..n]) {
610                            if tx
611                                .send(Event::Frame {
612                                    interface_id: client_id,
613                                    data: frame,
614                                })
615                                .is_err()
616                            {
617                                cleanup(&poller, &clients, &listener);
618                                return Ok(());
619                            }
620                        }
621                    }
622                }
623
624                if should_remove {
625                    let reason = if clients
626                        .get(&key)
627                        .is_some_and(|c| c.write_stall_flag.load(Ordering::Relaxed))
628                    {
629                        DisconnectReason::WriteStall
630                    } else {
631                        DisconnectReason::RemoteClosed
632                    };
633                    disconnect_client(
634                        &poller,
635                        &mut clients,
636                        &mut peers,
637                        &name,
638                        server_interface_id,
639                        &tx,
640                        &peer_state,
641                        key,
642                        client_id,
643                        reason,
644                    );
645                } else if let Some(client) = clients.get(&key) {
646                    // Re-arm client (oneshot semantics)
647                    poller.modify(&client.stream, PollEvent::readable(key))?;
648                }
649            }
650        }
651
652        if let Some(timeout) = idle_timeout {
653            let now = Instant::now();
654            let timed_out: Vec<(usize, InterfaceId)> = clients
655                .iter()
656                .filter_map(|(&key, client)| {
657                    if client.has_received_data || now.duration_since(client.connected_at) < timeout
658                    {
659                        None
660                    } else {
661                        Some((key, client.id))
662                    }
663                })
664                .collect();
665
666            for (key, client_id) in timed_out {
667                disconnect_client(
668                    &poller,
669                    &mut clients,
670                    &mut peers,
671                    &name,
672                    server_interface_id,
673                    &tx,
674                    &peer_state,
675                    key,
676                    client_id,
677                    DisconnectReason::IdleTimeout,
678                );
679            }
680        }
681    }
682}
683
684fn cleanup_peer_state(peers: &mut HashMap<IpAddr, PeerBehaviorState>) {
685    let now = Instant::now();
686    peers.retain(|_, state| {
687        if matches!(state.blacklisted_until, Some(until) if now >= until) {
688            state.blacklisted_until = None;
689            state.blacklist_reason = None;
690        }
691        state.blacklisted_until.is_some() || state.connected_count > 0 || state.reject_count > 0
692    });
693}
694
695fn is_ip_blacklisted(peers: &mut HashMap<IpAddr, PeerBehaviorState>, peer_ip: IpAddr) -> bool {
696    let now = Instant::now();
697    if let Some(state) = peers.get_mut(&peer_ip) {
698        if let Some(until) = state.blacklisted_until {
699            if now < until {
700                return true;
701            }
702            state.blacklisted_until = None;
703        }
704    }
705    false
706}
707
708fn disconnect_client(
709    poller: &Poller,
710    clients: &mut HashMap<usize, ClientState>,
711    peers: &mut HashMap<IpAddr, PeerBehaviorState>,
712    name: &str,
713    server_interface_id: InterfaceId,
714    tx: &EventSender,
715    peer_state: &Arc<Mutex<BackbonePeerMonitor>>,
716    key: usize,
717    client_id: InterfaceId,
718    reason: DisconnectReason,
719) {
720    let Some(client) = clients.remove(&key) else {
721        return;
722    };
723
724    match reason {
725        DisconnectReason::RemoteClosed => {
726            log::info!("[{}] backbone client {} disconnected", name, client_id.0);
727        }
728        DisconnectReason::IdleTimeout => {
729            log::info!(
730                "[{}] backbone client {} disconnected due to idle timeout",
731                name,
732                client_id.0
733            );
734        }
735        DisconnectReason::WriteStall => {
736            // Already logged by BackboneWriter::disconnect_for_write_stall
737        }
738    }
739
740    let _ = poller.delete(&client.stream);
741    // client.stream closes on drop
742    let connected_for = client.connected_at.elapsed();
743    let _ = tx.send(Event::BackbonePeerDisconnected {
744        server_interface_id,
745        peer_interface_id: client.id,
746        peer_ip: client.peer_ip,
747        peer_port: client.peer_port,
748        connected_for,
749        had_received_data: client.has_received_data,
750    });
751    match reason {
752        DisconnectReason::IdleTimeout => {
753            let _ = tx.send(Event::BackbonePeerIdleTimeout {
754                server_interface_id,
755                peer_interface_id: client.id,
756                peer_ip: client.peer_ip,
757                peer_port: client.peer_port,
758                connected_for,
759            });
760        }
761        DisconnectReason::WriteStall => {
762            let _ = tx.send(Event::BackbonePeerWriteStall {
763                server_interface_id,
764                peer_interface_id: client.id,
765                peer_ip: client.peer_ip,
766                peer_port: client.peer_port,
767                connected_for,
768            });
769        }
770        DisconnectReason::RemoteClosed => {}
771    }
772
773    if let Some(state) = peers.get_mut(&client.peer_ip) {
774        state.connected_count = state.connected_count.saturating_sub(1);
775    }
776    peer_state.lock().unwrap().upsert_snapshot(peers);
777    // Writer already sent InterfaceDown for write stalls; avoid duplicate.
778    if !matches!(reason, DisconnectReason::WriteStall) {
779        let _ = tx.send(Event::InterfaceDown(client_id));
780    }
781}
782
783fn set_tcp_keepalive(stream: &TcpStream) -> io::Result<()> {
784    let sock = SockRef::from(stream);
785    let mut keepalive = TcpKeepalive::new()
786        .with_time(Duration::from_secs(5))
787        .with_interval(Duration::from_secs(2));
788    #[cfg(any(target_os = "linux", target_os = "macos"))]
789    {
790        keepalive = keepalive.with_retries(12);
791    }
792    sock.set_tcp_keepalive(&keepalive)
793}
794
795fn cleanup(poller: &Poller, clients: &HashMap<usize, ClientState>, listener: &TcpListener) {
796    for (_, client) in clients {
797        let _ = poller.delete(&client.stream);
798    }
799    let _ = poller.delete(listener);
800}
801
802// ---------------------------------------------------------------------------
803// Client mode
804// ---------------------------------------------------------------------------
805
806/// Configuration for a backbone client interface.
807#[derive(Debug, Clone)]
808pub struct BackboneClientConfig {
809    pub name: String,
810    pub target_host: String,
811    pub target_port: u16,
812    pub interface_id: InterfaceId,
813    pub reconnect_wait: Duration,
814    pub max_reconnect_tries: Option<u32>,
815    pub connect_timeout: Duration,
816    pub transport_identity: Option<String>,
817    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
818}
819
820#[derive(Debug, Clone)]
821pub struct BackboneClientRuntime {
822    pub reconnect_wait: Duration,
823    pub max_reconnect_tries: Option<u32>,
824    pub connect_timeout: Duration,
825}
826
827impl BackboneClientRuntime {
828    pub fn from_config(config: &BackboneClientConfig) -> Self {
829        Self {
830            reconnect_wait: config.reconnect_wait,
831            max_reconnect_tries: config.max_reconnect_tries,
832            connect_timeout: config.connect_timeout,
833        }
834    }
835}
836
837#[derive(Debug, Clone)]
838pub struct BackboneClientRuntimeConfigHandle {
839    pub interface_name: String,
840    pub runtime: Arc<Mutex<BackboneClientRuntime>>,
841    pub startup: BackboneClientRuntime,
842}
843
844impl Default for BackboneClientConfig {
845    fn default() -> Self {
846        let mut config = BackboneClientConfig {
847            name: String::new(),
848            target_host: "127.0.0.1".into(),
849            target_port: 4242,
850            interface_id: InterfaceId(0),
851            reconnect_wait: Duration::from_secs(5),
852            max_reconnect_tries: None,
853            connect_timeout: Duration::from_secs(5),
854            transport_identity: None,
855            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
856                reconnect_wait: Duration::from_secs(5),
857                max_reconnect_tries: None,
858                connect_timeout: Duration::from_secs(5),
859            })),
860        };
861        let startup = BackboneClientRuntime::from_config(&config);
862        config.runtime = Arc::new(Mutex::new(startup));
863        config
864    }
865}
866
867/// Writer that sends HDLC-framed data over a TCP stream (client mode).
868struct BackboneClientWriter {
869    stream: TcpStream,
870}
871
872impl Writer for BackboneClientWriter {
873    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
874        self.stream.write_all(&hdlc::frame(data))
875    }
876}
877
878/// Try to connect to the target host:port with timeout.
879fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
880    let runtime = config.runtime.lock().unwrap().clone();
881    let addr_str = format!("{}:{}", config.target_host, config.target_port);
882    let addr = addr_str
883        .to_socket_addrs()?
884        .next()
885        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
886
887    let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
888    stream.set_nodelay(true)?;
889    set_tcp_keepalive(&stream).ok();
890
891    // Prevent SIGPIPE on macOS when writing to broken pipes
892    #[cfg(target_os = "macos")]
893    {
894        let sock = SockRef::from(&stream);
895        sock.set_nosigpipe(true).ok();
896    }
897
898    Ok(stream)
899}
900
901/// Connect and start the reader thread. Returns the writer for the driver.
902pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
903    let stream = try_connect_client(&config)?;
904    let reader_stream = stream.try_clone()?;
905    let writer_stream = stream.try_clone()?;
906
907    let id = config.interface_id;
908    log::info!(
909        "[{}] backbone client connected to {}:{}",
910        config.name,
911        config.target_host,
912        config.target_port
913    );
914
915    // Initial connect: writer is None because it's returned directly to the caller
916    let _ = tx.send(Event::InterfaceUp(id, None, None));
917
918    thread::Builder::new()
919        .name(format!("backbone-client-{}", id.0))
920        .spawn(move || {
921            client_reader_loop(reader_stream, config, tx);
922        })?;
923
924    Ok(Box::new(BackboneClientWriter {
925        stream: writer_stream,
926    }))
927}
928
929/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
930/// On disconnect, attempts reconnection.
931fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
932    let id = config.interface_id;
933    let mut decoder = hdlc::Decoder::new();
934    let mut buf = [0u8; 4096];
935
936    loop {
937        match stream.read(&mut buf) {
938            Ok(0) => {
939                log::warn!("[{}] connection closed", config.name);
940                let _ = tx.send(Event::InterfaceDown(id));
941                match client_reconnect(&config, &tx) {
942                    Some(new_stream) => {
943                        stream = new_stream;
944                        decoder = hdlc::Decoder::new();
945                        continue;
946                    }
947                    None => {
948                        log::error!("[{}] reconnection failed, giving up", config.name);
949                        return;
950                    }
951                }
952            }
953            Ok(n) => {
954                for frame in decoder.feed(&buf[..n]) {
955                    if tx
956                        .send(Event::Frame {
957                            interface_id: id,
958                            data: frame,
959                        })
960                        .is_err()
961                    {
962                        return;
963                    }
964                }
965            }
966            Err(e) => {
967                log::warn!("[{}] read error: {}", config.name, e);
968                let _ = tx.send(Event::InterfaceDown(id));
969                match client_reconnect(&config, &tx) {
970                    Some(new_stream) => {
971                        stream = new_stream;
972                        decoder = hdlc::Decoder::new();
973                        continue;
974                    }
975                    None => {
976                        log::error!("[{}] reconnection failed, giving up", config.name);
977                        return;
978                    }
979                }
980            }
981        }
982    }
983}
984
985/// Maximum backoff multiplier: `base_delay * 2^MAX_BACKOFF_SHIFT`.
986/// With a 5 s base this caps at 5 × 2^6 = 320 s ≈ 5 min.
987const MAX_BACKOFF_SHIFT: u32 = 6;
988
989/// Attempt to reconnect with exponential backoff and jitter.
990/// Returns the new reader stream on success.
991/// Sends the new writer to the driver via InterfaceUp event.
992fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
993    let mut attempts = 0u32;
994    loop {
995        let runtime = config.runtime.lock().unwrap().clone();
996
997        let shift = attempts.min(MAX_BACKOFF_SHIFT);
998        let backoff = runtime.reconnect_wait * 2u32.pow(shift);
999        // Add ±25 % jitter to avoid thundering-herd reconnects.
1000        let jitter_range = backoff / 4;
1001        let jitter = if jitter_range.as_nanos() > 0 {
1002            let offset = Duration::from_nanos(
1003                (std::hash::RandomState::new().build_hasher().finish()
1004                    % jitter_range.as_nanos() as u64)
1005                    * 2,
1006            );
1007            if offset > jitter_range {
1008                backoff + (offset - jitter_range)
1009            } else {
1010                backoff - (jitter_range - offset)
1011            }
1012        } else {
1013            backoff
1014        };
1015        thread::sleep(jitter);
1016
1017        attempts += 1;
1018
1019        if let Some(max) = runtime.max_reconnect_tries {
1020            if attempts > max {
1021                let _ = tx.send(Event::InterfaceDown(config.interface_id));
1022                return None;
1023            }
1024        }
1025
1026        log::info!(
1027            "[{}] reconnect attempt {} (backoff {:.1}s) ...",
1028            config.name,
1029            attempts,
1030            jitter.as_secs_f64(),
1031        );
1032
1033        match try_connect_client(config) {
1034            Ok(new_stream) => {
1035                let writer_stream = match new_stream.try_clone() {
1036                    Ok(s) => s,
1037                    Err(e) => {
1038                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
1039                        continue;
1040                    }
1041                };
1042                log::info!(
1043                    "[{}] reconnected after {} attempt(s)",
1044                    config.name,
1045                    attempts
1046                );
1047                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
1048                    stream: writer_stream,
1049                });
1050                let _ = tx.send(Event::InterfaceUp(
1051                    config.interface_id,
1052                    Some(new_writer),
1053                    None,
1054                ));
1055                return Some(new_stream);
1056            }
1057            Err(e) => {
1058                log::warn!("[{}] reconnect failed: {}", config.name, e);
1059            }
1060        }
1061    }
1062}
1063
1064// ---------------------------------------------------------------------------
1065// Factory
1066// ---------------------------------------------------------------------------
1067
1068/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
1069/// server or client config through the opaque `InterfaceConfigData` channel.
1070pub(crate) enum BackboneMode {
1071    Server(BackboneConfig),
1072    Client(BackboneClientConfig),
1073}
1074
1075/// Factory for `BackboneInterface`.
1076///
1077/// If the config params contain `"remote"` or `"target_host"` the interface
1078/// is started in client mode; otherwise it is started as a TCP listener
1079/// (server mode).
1080pub struct BackboneInterfaceFactory;
1081
1082fn parse_positive_duration_secs(params: &HashMap<String, String>, key: &str) -> Option<Duration> {
1083    params
1084        .get(key)
1085        .and_then(|v| v.parse::<f64>().ok())
1086        .filter(|v| *v > 0.0)
1087        .map(Duration::from_secs_f64)
1088}
1089
1090impl InterfaceFactory for BackboneInterfaceFactory {
1091    fn type_name(&self) -> &str {
1092        "BackboneInterface"
1093    }
1094
1095    fn parse_config(
1096        &self,
1097        name: &str,
1098        id: InterfaceId,
1099        params: &HashMap<String, String>,
1100    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1101        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
1102            // Client mode
1103            let target_host = target_host.clone();
1104            let target_port = params
1105                .get("target_port")
1106                .or_else(|| params.get("port"))
1107                .and_then(|v| v.parse().ok())
1108                .unwrap_or(4242);
1109            let transport_identity = params.get("transport_identity").cloned();
1110            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
1111                name: name.to_string(),
1112                target_host,
1113                target_port,
1114                interface_id: id,
1115                transport_identity,
1116                ..BackboneClientConfig::default()
1117            })))
1118        } else {
1119            // Server mode
1120            let listen_ip = params
1121                .get("listen_ip")
1122                .or_else(|| params.get("device"))
1123                .cloned()
1124                .unwrap_or_else(|| "0.0.0.0".into());
1125            let listen_port = params
1126                .get("listen_port")
1127                .or_else(|| params.get("port"))
1128                .and_then(|v| v.parse().ok())
1129                .unwrap_or(4242);
1130            let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
1131            let idle_timeout = parse_positive_duration_secs(params, "idle_timeout");
1132            let write_stall_timeout = parse_positive_duration_secs(params, "write_stall_timeout");
1133            let abuse = BackboneAbuseConfig {
1134                max_penalty_duration: parse_positive_duration_secs(params, "max_penalty_duration"),
1135            };
1136            let mut config = BackboneConfig {
1137                name: name.to_string(),
1138                listen_ip,
1139                listen_port,
1140                interface_id: id,
1141                max_connections,
1142                idle_timeout,
1143                write_stall_timeout,
1144                abuse,
1145                ingress_control: IngressControlConfig::enabled(),
1146                runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1147                    max_connections: None,
1148                    idle_timeout: None,
1149                    write_stall_timeout: None,
1150                    abuse: BackboneAbuseConfig::default(),
1151                })),
1152                peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1153            };
1154            let startup = BackboneServerRuntime::from_config(&config);
1155            config.runtime = Arc::new(Mutex::new(startup));
1156            Ok(Box::new(BackboneMode::Server(config)))
1157        }
1158    }
1159
1160    fn start(
1161        &self,
1162        config: Box<dyn InterfaceConfigData>,
1163        ctx: StartContext,
1164    ) -> io::Result<StartResult> {
1165        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
1166            io::Error::new(
1167                io::ErrorKind::InvalidData,
1168                "wrong config type for BackboneInterface",
1169            )
1170        })?;
1171
1172        match mode {
1173            BackboneMode::Client(cfg) => {
1174                let id = cfg.interface_id;
1175                let name = cfg.name.clone();
1176                let info = InterfaceInfo {
1177                    id,
1178                    name,
1179                    mode: ctx.mode,
1180                    out_capable: true,
1181                    in_capable: true,
1182                    bitrate: Some(1_000_000_000),
1183                    announce_rate_target: None,
1184                    announce_rate_grace: 0,
1185                    announce_rate_penalty: 0.0,
1186                    announce_cap: constants::ANNOUNCE_CAP,
1187                    is_local_client: false,
1188                    wants_tunnel: false,
1189                    tunnel_id: None,
1190                    mtu: 65535,
1191                    ingress_control: ctx.ingress_control,
1192                    ia_freq: 0.0,
1193                    started: crate::time::now(),
1194                };
1195                let writer = start_client(cfg, ctx.tx)?;
1196                Ok(StartResult::Simple {
1197                    id,
1198                    info,
1199                    writer,
1200                    interface_type_name: "BackboneInterface".to_string(),
1201                })
1202            }
1203            BackboneMode::Server(mut cfg) => {
1204                cfg.ingress_control = ctx.ingress_control;
1205                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
1206                Ok(StartResult::Listener { control: None })
1207            }
1208        }
1209    }
1210}
1211
1212pub(crate) fn runtime_handle_from_mode(mode: &BackboneMode) -> Option<BackboneRuntimeConfigHandle> {
1213    match mode {
1214        BackboneMode::Server(config) => Some(BackboneRuntimeConfigHandle {
1215            interface_name: config.name.clone(),
1216            runtime: Arc::clone(&config.runtime),
1217            startup: BackboneServerRuntime::from_config(config),
1218        }),
1219        BackboneMode::Client(_) => None,
1220    }
1221}
1222
1223pub(crate) fn peer_state_handle_from_mode(mode: &BackboneMode) -> Option<BackbonePeerStateHandle> {
1224    match mode {
1225        BackboneMode::Server(config) => Some(BackbonePeerStateHandle {
1226            interface_id: config.interface_id,
1227            interface_name: config.name.clone(),
1228            peer_state: Arc::clone(&config.peer_state),
1229        }),
1230        BackboneMode::Client(_) => None,
1231    }
1232}
1233
1234pub(crate) fn client_runtime_handle_from_mode(
1235    mode: &BackboneMode,
1236) -> Option<BackboneClientRuntimeConfigHandle> {
1237    match mode {
1238        BackboneMode::Client(config) => Some(BackboneClientRuntimeConfigHandle {
1239            interface_name: config.name.clone(),
1240            runtime: Arc::clone(&config.runtime),
1241            startup: BackboneClientRuntime::from_config(config),
1242        }),
1243        BackboneMode::Server(_) => None,
1244    }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use super::*;
1250    use std::sync::mpsc;
1251    use std::time::Duration;
1252
1253    fn find_free_port() -> u16 {
1254        TcpListener::bind("127.0.0.1:0")
1255            .unwrap()
1256            .local_addr()
1257            .unwrap()
1258            .port()
1259    }
1260
1261    fn recv_non_peer_event(
1262        rx: &mpsc::Receiver<Event>,
1263        timeout: Duration,
1264    ) -> Result<Event, mpsc::RecvTimeoutError> {
1265        let deadline = Instant::now() + timeout;
1266        loop {
1267            let remaining = deadline.saturating_duration_since(Instant::now());
1268            if remaining.is_zero() {
1269                return Err(mpsc::RecvTimeoutError::Timeout);
1270            }
1271            let event = rx.recv_timeout(remaining)?;
1272            match event {
1273                Event::BackbonePeerConnected { .. }
1274                | Event::BackbonePeerDisconnected { .. }
1275                | Event::BackbonePeerIdleTimeout { .. }
1276                | Event::BackbonePeerWriteStall { .. }
1277                | Event::BackbonePeerPenalty { .. } => continue,
1278                other => return Ok(other),
1279            }
1280        }
1281    }
1282
1283    fn make_server_config(
1284        port: u16,
1285        interface_id: u64,
1286        max_connections: Option<usize>,
1287        idle_timeout: Option<Duration>,
1288        write_stall_timeout: Option<Duration>,
1289        abuse: BackboneAbuseConfig,
1290    ) -> BackboneConfig {
1291        let mut config = BackboneConfig {
1292            name: "test-backbone".into(),
1293            listen_ip: "127.0.0.1".into(),
1294            listen_port: port,
1295            interface_id: InterfaceId(interface_id),
1296            max_connections,
1297            idle_timeout,
1298            write_stall_timeout,
1299            abuse,
1300            ingress_control: IngressControlConfig::enabled(),
1301            runtime: Arc::new(Mutex::new(BackboneServerRuntime {
1302                max_connections: None,
1303                idle_timeout: None,
1304                write_stall_timeout: None,
1305                abuse: BackboneAbuseConfig::default(),
1306            })),
1307            peer_state: Arc::new(Mutex::new(BackbonePeerMonitor::new())),
1308        };
1309        let startup = BackboneServerRuntime::from_config(&config);
1310        config.runtime = Arc::new(Mutex::new(startup));
1311        config
1312    }
1313
1314    #[test]
1315    fn backbone_accept_connection() {
1316        let port = find_free_port();
1317        let (tx, rx) = crate::event::channel();
1318        let next_id = Arc::new(AtomicU64::new(8000));
1319
1320        let config = make_server_config(port, 80, None, None, None, BackboneAbuseConfig::default());
1321
1322        start(config, tx, next_id).unwrap();
1323        thread::sleep(Duration::from_millis(50));
1324
1325        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1326
1327        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1328        match event {
1329            Event::InterfaceUp(id, writer, info) => {
1330                assert_eq!(id, InterfaceId(8000));
1331                assert!(writer.is_some());
1332                assert!(info.is_some());
1333                let info = info.unwrap();
1334                assert!(info.out_capable);
1335                assert!(info.in_capable);
1336            }
1337            other => panic!("expected InterfaceUp, got {:?}", other),
1338        }
1339    }
1340
1341    #[test]
1342    fn backbone_receive_frame() {
1343        let port = find_free_port();
1344        let (tx, rx) = crate::event::channel();
1345        let next_id = Arc::new(AtomicU64::new(8100));
1346
1347        let config = make_server_config(port, 81, None, None, None, BackboneAbuseConfig::default());
1348
1349        start(config, tx, next_id).unwrap();
1350        thread::sleep(Duration::from_millis(50));
1351
1352        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1353
1354        // Drain InterfaceUp
1355        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1356
1357        // Send HDLC frame (>= 19 bytes)
1358        let payload: Vec<u8> = (0..32).collect();
1359        client.write_all(&hdlc::frame(&payload)).unwrap();
1360
1361        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1362        match event {
1363            Event::Frame { interface_id, data } => {
1364                assert_eq!(interface_id, InterfaceId(8100));
1365                assert_eq!(data, payload);
1366            }
1367            other => panic!("expected Frame, got {:?}", other),
1368        }
1369    }
1370
1371    #[test]
1372    fn backbone_send_to_client() {
1373        let port = find_free_port();
1374        let (tx, rx) = crate::event::channel();
1375        let next_id = Arc::new(AtomicU64::new(8200));
1376
1377        let config = make_server_config(port, 82, None, None, None, BackboneAbuseConfig::default());
1378
1379        start(config, tx, next_id).unwrap();
1380        thread::sleep(Duration::from_millis(50));
1381
1382        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1383        client
1384            .set_read_timeout(Some(Duration::from_secs(2)))
1385            .unwrap();
1386
1387        // Get writer from InterfaceUp
1388        let event = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1389        let mut writer = match event {
1390            Event::InterfaceUp(_, Some(w), _) => w,
1391            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1392        };
1393
1394        // Send frame via writer
1395        let payload: Vec<u8> = (0..24).collect();
1396        writer.send_frame(&payload).unwrap();
1397
1398        // Read from client
1399        let mut buf = [0u8; 256];
1400        let n = client.read(&mut buf).unwrap();
1401        let expected = hdlc::frame(&payload);
1402        assert_eq!(&buf[..n], &expected[..]);
1403    }
1404
1405    #[test]
1406    fn backbone_multiple_clients() {
1407        let port = find_free_port();
1408        let (tx, rx) = crate::event::channel();
1409        let next_id = Arc::new(AtomicU64::new(8300));
1410
1411        let config = make_server_config(port, 83, None, None, None, BackboneAbuseConfig::default());
1412
1413        start(config, tx, next_id).unwrap();
1414        thread::sleep(Duration::from_millis(50));
1415
1416        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1417        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1418
1419        let mut ids = Vec::new();
1420        for _ in 0..2 {
1421            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1422            match event {
1423                Event::InterfaceUp(id, _, _) => ids.push(id),
1424                other => panic!("expected InterfaceUp, got {:?}", other),
1425            }
1426        }
1427
1428        assert_eq!(ids.len(), 2);
1429        assert_ne!(ids[0], ids[1]);
1430    }
1431
1432    #[test]
1433    fn backbone_client_disconnect() {
1434        let port = find_free_port();
1435        let (tx, rx) = crate::event::channel();
1436        let next_id = Arc::new(AtomicU64::new(8400));
1437
1438        let config = make_server_config(port, 84, None, None, None, BackboneAbuseConfig::default());
1439
1440        start(config, tx, next_id).unwrap();
1441        thread::sleep(Duration::from_millis(50));
1442
1443        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1444
1445        // Drain InterfaceUp
1446        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1447
1448        // Disconnect
1449        drop(client);
1450
1451        // Should receive InterfaceDown
1452        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1453        assert!(
1454            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
1455            "expected InterfaceDown(8400), got {:?}",
1456            event
1457        );
1458    }
1459
1460    #[test]
1461    fn backbone_epoll_multiplexing() {
1462        let port = find_free_port();
1463        let (tx, rx) = crate::event::channel();
1464        let next_id = Arc::new(AtomicU64::new(8500));
1465
1466        let config = make_server_config(port, 85, None, None, None, BackboneAbuseConfig::default());
1467
1468        start(config, tx, next_id).unwrap();
1469        thread::sleep(Duration::from_millis(50));
1470
1471        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1472        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1473
1474        // Drain both InterfaceUp events
1475        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1476        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1477
1478        // Both clients send data simultaneously
1479        let payload1: Vec<u8> = (0..24).collect();
1480        let payload2: Vec<u8> = (100..130).collect();
1481        client1.write_all(&hdlc::frame(&payload1)).unwrap();
1482        client2.write_all(&hdlc::frame(&payload2)).unwrap();
1483
1484        // Should receive both Frame events
1485        let mut received = Vec::new();
1486        for _ in 0..2 {
1487            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1488            match event {
1489                Event::Frame { data, .. } => received.push(data),
1490                other => panic!("expected Frame, got {:?}", other),
1491            }
1492        }
1493        assert!(received.contains(&payload1));
1494        assert!(received.contains(&payload2));
1495    }
1496
1497    #[test]
1498    fn backbone_bind_port() {
1499        let port = find_free_port();
1500        let (tx, _rx) = crate::event::channel();
1501        let next_id = Arc::new(AtomicU64::new(8600));
1502
1503        let config = make_server_config(port, 86, None, None, None, BackboneAbuseConfig::default());
1504
1505        // Should not error
1506        start(config, tx, next_id).unwrap();
1507    }
1508
1509    #[test]
1510    fn backbone_hdlc_fragmented() {
1511        let port = find_free_port();
1512        let (tx, rx) = crate::event::channel();
1513        let next_id = Arc::new(AtomicU64::new(8700));
1514
1515        let config = make_server_config(port, 87, None, None, None, BackboneAbuseConfig::default());
1516
1517        start(config, tx, next_id).unwrap();
1518        thread::sleep(Duration::from_millis(50));
1519
1520        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1521        client.set_nodelay(true).unwrap();
1522
1523        // Drain InterfaceUp
1524        let _ = recv_non_peer_event(&rx, Duration::from_secs(1)).unwrap();
1525
1526        // Send HDLC frame in two fragments
1527        let payload: Vec<u8> = (0..32).collect();
1528        let framed = hdlc::frame(&payload);
1529        let mid = framed.len() / 2;
1530
1531        client.write_all(&framed[..mid]).unwrap();
1532        thread::sleep(Duration::from_millis(50));
1533        client.write_all(&framed[mid..]).unwrap();
1534
1535        // Should receive reassembled frame
1536        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1537        match event {
1538            Event::Frame { data, .. } => {
1539                assert_eq!(data, payload);
1540            }
1541            other => panic!("expected Frame, got {:?}", other),
1542        }
1543    }
1544
1545    // -----------------------------------------------------------------------
1546    // Client mode tests
1547    // -----------------------------------------------------------------------
1548
1549    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
1550        BackboneClientConfig {
1551            name: format!("test-bb-client-{}", port),
1552            target_host: "127.0.0.1".into(),
1553            target_port: port,
1554            interface_id: InterfaceId(id),
1555            reconnect_wait: Duration::from_millis(100),
1556            max_reconnect_tries: Some(2),
1557            connect_timeout: Duration::from_secs(2),
1558            transport_identity: None,
1559            runtime: Arc::new(Mutex::new(BackboneClientRuntime {
1560                reconnect_wait: Duration::from_millis(100),
1561                max_reconnect_tries: Some(2),
1562                connect_timeout: Duration::from_secs(2),
1563            })),
1564        }
1565    }
1566
1567    #[test]
1568    fn backbone_client_connect() {
1569        let port = find_free_port();
1570        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1571        let (tx, rx) = crate::event::channel();
1572
1573        let config = make_client_config(port, 9000);
1574        let _writer = start_client(config, tx).unwrap();
1575
1576        let _server_stream = listener.accept().unwrap();
1577
1578        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1579        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1580    }
1581
1582    #[test]
1583    fn backbone_client_receive_frame() {
1584        let port = find_free_port();
1585        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1586        let (tx, rx) = crate::event::channel();
1587
1588        let config = make_client_config(port, 9100);
1589        let _writer = start_client(config, tx).unwrap();
1590
1591        let (mut server_stream, _) = listener.accept().unwrap();
1592
1593        // Drain InterfaceUp
1594        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1595
1596        // Send HDLC frame from server side (>= 19 bytes payload)
1597        let payload: Vec<u8> = (0..32).collect();
1598        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1599
1600        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1601        match event {
1602            Event::Frame { interface_id, data } => {
1603                assert_eq!(interface_id, InterfaceId(9100));
1604                assert_eq!(data, payload);
1605            }
1606            other => panic!("expected Frame, got {:?}", other),
1607        }
1608    }
1609
1610    #[test]
1611    fn backbone_client_send_frame() {
1612        let port = find_free_port();
1613        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1614        let (tx, _rx) = crate::event::channel();
1615
1616        let config = make_client_config(port, 9200);
1617        let mut writer = start_client(config, tx).unwrap();
1618
1619        let (mut server_stream, _) = listener.accept().unwrap();
1620        server_stream
1621            .set_read_timeout(Some(Duration::from_secs(2)))
1622            .unwrap();
1623
1624        let payload: Vec<u8> = (0..24).collect();
1625        writer.send_frame(&payload).unwrap();
1626
1627        let mut buf = [0u8; 256];
1628        let n = server_stream.read(&mut buf).unwrap();
1629        let expected = hdlc::frame(&payload);
1630        assert_eq!(&buf[..n], &expected[..]);
1631    }
1632
1633    #[test]
1634    fn backbone_max_connections_rejects_excess() {
1635        let port = find_free_port();
1636        let (tx, rx) = crate::event::channel();
1637        let next_id = Arc::new(AtomicU64::new(8800));
1638
1639        let config = make_server_config(
1640            port,
1641            88,
1642            Some(2),
1643            None,
1644            None,
1645            BackboneAbuseConfig::default(),
1646        );
1647
1648        start(config, tx, next_id).unwrap();
1649        thread::sleep(Duration::from_millis(50));
1650
1651        // Connect two clients (at limit)
1652        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1653        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1654
1655        // Drain both InterfaceUp events
1656        for _ in 0..2 {
1657            let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1658            assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1659        }
1660
1661        // Third connection should be accepted at TCP level but immediately dropped
1662        let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1663        client3
1664            .set_read_timeout(Some(Duration::from_millis(500)))
1665            .unwrap();
1666
1667        // Give server time to reject
1668        thread::sleep(Duration::from_millis(100));
1669
1670        // Should NOT receive a third InterfaceUp
1671        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1672        assert!(
1673            result.is_err(),
1674            "expected no InterfaceUp for rejected connection, got {:?}",
1675            result
1676        );
1677    }
1678
1679    #[test]
1680    fn backbone_max_connections_allows_after_disconnect() {
1681        let port = find_free_port();
1682        let (tx, rx) = crate::event::channel();
1683        let next_id = Arc::new(AtomicU64::new(8900));
1684
1685        let config = make_server_config(
1686            port,
1687            89,
1688            Some(1),
1689            None,
1690            None,
1691            BackboneAbuseConfig::default(),
1692        );
1693
1694        start(config, tx, next_id).unwrap();
1695        thread::sleep(Duration::from_millis(50));
1696
1697        // Connect first client
1698        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1699        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1700        assert!(matches!(event, Event::InterfaceUp(_, _, _)));
1701
1702        // Disconnect first client
1703        drop(client1);
1704
1705        // Wait for InterfaceDown
1706        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1707        assert!(matches!(event, Event::InterfaceDown(_)));
1708
1709        // Now a new connection should be accepted
1710        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1711        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1712        assert!(
1713            matches!(event, Event::InterfaceUp(_, _, _)),
1714            "expected InterfaceUp after slot freed, got {:?}",
1715            event
1716        );
1717    }
1718
1719    #[test]
1720    fn backbone_client_reconnect() {
1721        let port = find_free_port();
1722        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1723        listener.set_nonblocking(false).unwrap();
1724        let (tx, rx) = crate::event::channel();
1725
1726        let config = make_client_config(port, 9300);
1727        let _writer = start_client(config, tx).unwrap();
1728
1729        // Accept first connection and immediately close it
1730        let (server_stream, _) = listener.accept().unwrap();
1731
1732        // Drain InterfaceUp
1733        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1734
1735        drop(server_stream);
1736
1737        // Should get InterfaceDown
1738        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1739        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1740
1741        // Accept the reconnection
1742        let _server_stream2 = listener.accept().unwrap();
1743
1744        // Should get InterfaceUp again
1745        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1746        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1747    }
1748
1749    #[test]
1750    fn backbone_idle_timeout_disconnects_silent_client() {
1751        let port = find_free_port();
1752        let (tx, rx) = crate::event::channel();
1753        let next_id = Arc::new(AtomicU64::new(9400));
1754
1755        let config = make_server_config(
1756            port,
1757            94,
1758            None,
1759            Some(Duration::from_millis(150)),
1760            None,
1761            BackboneAbuseConfig::default(),
1762        );
1763
1764        start(config, tx, next_id).unwrap();
1765        thread::sleep(Duration::from_millis(50));
1766
1767        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1768
1769        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1770        let client_id = match event {
1771            Event::InterfaceUp(id, _, _) => id,
1772            other => panic!("expected InterfaceUp, got {:?}", other),
1773        };
1774
1775        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1776        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1777    }
1778
1779    #[test]
1780    fn backbone_idle_timeout_ignores_client_after_data() {
1781        let port = find_free_port();
1782        let (tx, rx) = crate::event::channel();
1783        let next_id = Arc::new(AtomicU64::new(9500));
1784
1785        let config = make_server_config(
1786            port,
1787            95,
1788            None,
1789            Some(Duration::from_millis(200)),
1790            None,
1791            BackboneAbuseConfig::default(),
1792        );
1793
1794        start(config, tx, next_id).unwrap();
1795        thread::sleep(Duration::from_millis(50));
1796
1797        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1798
1799        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1800        let client_id = match event {
1801            Event::InterfaceUp(id, _, _) => id,
1802            other => panic!("expected InterfaceUp, got {:?}", other),
1803        };
1804
1805        client.write_all(&hdlc::frame(&[1u8; 24])).unwrap();
1806
1807        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1808        match event {
1809            Event::Frame { interface_id, data } => {
1810                assert_eq!(interface_id, client_id);
1811                assert_eq!(data, vec![1u8; 24]);
1812            }
1813            other => panic!("expected Frame, got {:?}", other),
1814        }
1815
1816        let result = recv_non_peer_event(&rx, Duration::from_millis(500));
1817        assert!(
1818            result.is_err(),
1819            "expected no InterfaceDown after client sent data, got {:?}",
1820            result
1821        );
1822    }
1823
1824    #[test]
1825    fn backbone_runtime_idle_timeout_updates_live() {
1826        let port = find_free_port();
1827        let (tx, rx) = crate::event::channel();
1828        let next_id = Arc::new(AtomicU64::new(9650));
1829
1830        let config = make_server_config(port, 97, None, None, None, BackboneAbuseConfig::default());
1831        let runtime = Arc::clone(&config.runtime);
1832
1833        start(config, tx, next_id).unwrap();
1834        thread::sleep(Duration::from_millis(50));
1835
1836        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1837        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1838        let client_id = match event {
1839            Event::InterfaceUp(id, _, _) => id,
1840            other => panic!("expected InterfaceUp, got {:?}", other),
1841        };
1842
1843        {
1844            let mut runtime = runtime.lock().unwrap();
1845            runtime.idle_timeout = Some(Duration::from_millis(150));
1846        }
1847
1848        let event = recv_non_peer_event(&rx, Duration::from_secs(4)).unwrap();
1849        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1850    }
1851
1852    #[test]
1853    fn backbone_write_stall_timeout_disconnects_unwritable_client() {
1854        let port = find_free_port();
1855        let (tx, rx) = crate::event::channel();
1856        let next_id = Arc::new(AtomicU64::new(9660));
1857
1858        let config = make_server_config(
1859            port,
1860            98,
1861            None,
1862            None,
1863            Some(Duration::from_millis(50)),
1864            BackboneAbuseConfig::default(),
1865        );
1866
1867        start(config, tx, next_id).unwrap();
1868        thread::sleep(Duration::from_millis(50));
1869
1870        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1871        client
1872            .set_read_timeout(Some(Duration::from_millis(100)))
1873            .unwrap();
1874        let sock = SockRef::from(&client);
1875        sock.set_recv_buffer_size(4096).ok();
1876
1877        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1878        let (client_id, mut writer) = match event {
1879            Event::InterfaceUp(id, Some(writer), _) => (id, writer),
1880            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1881        };
1882
1883        let payload = vec![0x55; 512 * 1024];
1884        let deadline = Instant::now() + Duration::from_secs(3);
1885        let mut stalled = false;
1886        while Instant::now() < deadline {
1887            match writer.send_frame(&payload) {
1888                Ok(()) => {}
1889                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1890                    thread::sleep(Duration::from_millis(10));
1891                }
1892                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
1893                    stalled = true;
1894                    break;
1895                }
1896                Err(e) => panic!("unexpected send error: {}", e),
1897            }
1898        }
1899
1900        assert!(stalled, "expected writer to time out on persistent stall");
1901        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1902        assert!(matches!(event, Event::InterfaceDown(id) if id == client_id));
1903    }
1904
1905    /// Drain events matching a predicate, return the first match.
1906    fn wait_for<F>(rx: &mpsc::Receiver<Event>, timeout: Duration, mut pred: F) -> Option<Event>
1907    where
1908        F: FnMut(&Event) -> bool,
1909    {
1910        let deadline = Instant::now() + timeout;
1911        loop {
1912            let remaining = deadline.saturating_duration_since(Instant::now());
1913            if remaining.is_zero() {
1914                return None;
1915            }
1916            match rx.recv_timeout(remaining) {
1917                Ok(event) if pred(&event) => return Some(event),
1918                Ok(_) => continue,
1919                Err(_) => return None,
1920            }
1921        }
1922    }
1923
1924    #[test]
1925    fn backbone_write_stall_emits_peer_events() {
1926        let port = find_free_port();
1927        let (tx, rx) = crate::event::channel();
1928        let next_id = Arc::new(AtomicU64::new(9700));
1929
1930        let config = make_server_config(
1931            port,
1932            97,
1933            None,
1934            None,
1935            Some(Duration::from_millis(50)), // 50ms stall timeout
1936            BackboneAbuseConfig::default(),
1937        );
1938
1939        start(config, tx, next_id).unwrap();
1940        thread::sleep(Duration::from_millis(50));
1941
1942        // Connect a client that won't read (will cause write stall)
1943        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1944        client
1945            .set_read_timeout(Some(Duration::from_millis(100)))
1946            .unwrap();
1947        let sock = SockRef::from(&client);
1948        sock.set_recv_buffer_size(4096).ok();
1949
1950        // Wait for InterfaceUp and grab writer
1951        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1952        let mut writer = match event {
1953            Event::InterfaceUp(_, Some(w), _) => w,
1954            other => panic!("expected InterfaceUp with writer, got {:?}", other),
1955        };
1956
1957        // Flood until stall
1958        let payload = vec![0x55; 512 * 1024];
1959        let deadline = Instant::now() + Duration::from_secs(3);
1960        while Instant::now() < deadline {
1961            match writer.send_frame(&payload) {
1962                Ok(()) | Err(_) => {
1963                    if Instant::now() + Duration::from_millis(10) > deadline {
1964                        break;
1965                    }
1966                    thread::sleep(Duration::from_millis(10));
1967                }
1968            }
1969        }
1970
1971        // Should see BackbonePeerWriteStall event
1972        let stall_event = wait_for(&rx, Duration::from_secs(3), |e| {
1973            matches!(e, Event::BackbonePeerWriteStall { .. })
1974        });
1975        assert!(
1976            stall_event.is_some(),
1977            "expected BackbonePeerWriteStall event"
1978        );
1979    }
1980
1981    #[test]
1982    fn backbone_blacklisted_peer_rejected_on_connect() {
1983        let port = find_free_port();
1984        let (tx, rx) = crate::event::channel();
1985        let next_id = Arc::new(AtomicU64::new(9800));
1986
1987        let config = make_server_config(port, 98, None, None, None, BackboneAbuseConfig::default());
1988        let peer_state = config.peer_state.clone();
1989
1990        start(config, tx.clone(), next_id).unwrap();
1991        thread::sleep(Duration::from_millis(50));
1992
1993        // First connection should succeed
1994        let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
1995        let event = recv_non_peer_event(&rx, Duration::from_secs(2)).unwrap();
1996        assert!(
1997            matches!(event, Event::InterfaceUp(_, _, _)),
1998            "first connection should succeed"
1999        );
2000        drop(client1);
2001
2002        // Drain disconnect events
2003        thread::sleep(Duration::from_millis(100));
2004        while rx.try_recv().is_ok() {}
2005
2006        // Blacklist 127.0.0.1 via the peer monitor
2007        peer_state.lock().unwrap().blacklist(
2008            "127.0.0.1".parse().unwrap(),
2009            Duration::from_secs(60),
2010            "test blacklist".into(),
2011        );
2012
2013        // Second connection from same IP should be rejected (no InterfaceUp)
2014        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
2015        // Give poll loop time to reject
2016        thread::sleep(Duration::from_millis(200));
2017
2018        // Should NOT get an InterfaceUp — connection should have been rejected
2019        let event = rx.try_recv();
2020        match event {
2021            Ok(Event::InterfaceUp(_, _, _)) => {
2022                panic!("blacklisted peer should not get InterfaceUp")
2023            }
2024            _ => {} // Expected: no InterfaceUp
2025        }
2026    }
2027
2028    #[test]
2029    fn backbone_parse_config_reads_abuse_settings() {
2030        let factory = BackboneInterfaceFactory;
2031        let mut params = HashMap::new();
2032        params.insert("listen_ip".into(), "127.0.0.1".into());
2033        params.insert("listen_port".into(), "4242".into());
2034        params.insert("idle_timeout".into(), "15".into());
2035        params.insert("write_stall_timeout".into(), "45".into());
2036        params.insert("max_penalty_duration".into(), "3600".into());
2037
2038        let config = factory
2039            .parse_config("test-backbone", InterfaceId(97), &params)
2040            .unwrap();
2041        let mode = *config.into_any().downcast::<BackboneMode>().unwrap();
2042
2043        match mode {
2044            BackboneMode::Server(config) => {
2045                assert_eq!(config.listen_ip, "127.0.0.1");
2046                assert_eq!(config.listen_port, 4242);
2047                assert_eq!(config.idle_timeout, Some(Duration::from_secs(15)));
2048                assert_eq!(config.write_stall_timeout, Some(Duration::from_secs(45)));
2049                assert_eq!(
2050                    config.abuse.max_penalty_duration,
2051                    Some(Duration::from_secs(3600))
2052                );
2053            }
2054            BackboneMode::Client(_) => panic!("expected server config"),
2055        }
2056    }
2057}