Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using Linux epoll.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single epoll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
27
/// HW_MTU: 1 MB, i.e. 1_048_576 bytes (matches Python BackboneInterface.HW_MTU).
// None of the non-test code visible in this file reads the constant, which is why
// the `dead_code` lint is silenced here.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
31
32/// Configuration for a backbone interface.
33#[derive(Debug, Clone)]
34pub struct BackboneConfig {
35    pub name: String,
36    pub listen_ip: String,
37    pub listen_port: u16,
38    pub interface_id: InterfaceId,
39}
40
41impl Default for BackboneConfig {
42    fn default() -> Self {
43        BackboneConfig {
44            name: String::new(),
45            listen_ip: "0.0.0.0".into(),
46            listen_port: 0,
47            interface_id: InterfaceId(0),
48        }
49    }
50}
51
52/// Writer that sends HDLC-framed data directly via socket write.
53struct BackboneWriter {
54    fd: RawFd,
55}
56
57impl Writer for BackboneWriter {
58    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59        let framed = hdlc::frame(data);
60        let mut offset = 0;
61        while offset < framed.len() {
62            let n = unsafe {
63                libc::send(
64                    self.fd,
65                    framed[offset..].as_ptr() as *const libc::c_void,
66                    framed.len() - offset,
67                    libc::MSG_NOSIGNAL,
68                )
69            };
70            if n < 0 {
71                return Err(io::Error::last_os_error());
72            }
73            offset += n as usize;
74        }
75        Ok(())
76    }
77}
78
79// BackboneWriter's fd is a dup'd copy — we own it
80impl Drop for BackboneWriter {
81    fn drop(&mut self) {
82        unsafe {
83            libc::close(self.fd);
84        }
85    }
86}
87
88/// Safety: the fd is only accessed via send/close which are thread-safe.
89unsafe impl Send for BackboneWriter {}
90
91/// Start a backbone interface. Binds TCP listener, spawns epoll thread.
92pub fn start(
93    config: BackboneConfig,
94    tx: EventSender,
95    next_id: Arc<AtomicU64>,
96) -> io::Result<()> {
97    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
98    let listener = TcpListener::bind(&addr)?;
99    listener.set_nonblocking(true)?;
100
101    log::info!(
102        "[{}] backbone server listening on {}",
103        config.name,
104        listener.local_addr().unwrap_or(addr.parse().unwrap())
105    );
106
107    let name = config.name.clone();
108    thread::Builder::new()
109        .name(format!("backbone-epoll-{}", config.interface_id.0))
110        .spawn(move || {
111            if let Err(e) = epoll_loop(listener, name, tx, next_id) {
112                log::error!("backbone epoll loop error: {}", e);
113            }
114        })?;
115
116    Ok(())
117}
118
/// Per-client tracking state.
///
/// The epoll loop keeps one entry per accepted connection in a map keyed by the
/// client's socket descriptor.
struct ClientState {
    /// Interface id drawn from the shared counter when the connection was accepted.
    id: InterfaceId,
    /// HDLC decoder that is fed the bytes received from this client.
    decoder: hdlc::Decoder,
}
124
125/// Main epoll event loop.
126fn epoll_loop(
127    listener: TcpListener,
128    name: String,
129    tx: EventSender,
130    next_id: Arc<AtomicU64>,
131) -> io::Result<()> {
132    let epfd = unsafe { libc::epoll_create1(0) };
133    if epfd < 0 {
134        return Err(io::Error::last_os_error());
135    }
136
137    // Register listener
138    let listener_fd = listener.as_raw_fd();
139    let mut ev = libc::epoll_event {
140        events: libc::EPOLLIN as u32,
141        u64: listener_fd as u64,
142    };
143    if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
144        unsafe { libc::close(epfd) };
145        return Err(io::Error::last_os_error());
146    }
147
148    let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
149    let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
150
151    loop {
152        let nfds = unsafe {
153            libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
154        };
155
156        if nfds < 0 {
157            let err = io::Error::last_os_error();
158            if err.kind() == io::ErrorKind::Interrupted {
159                continue;
160            }
161            // Clean up
162            for (&fd, _) in &clients {
163                unsafe {
164                    libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
165                    libc::close(fd);
166                }
167            }
168            unsafe { libc::close(epfd) };
169            return Err(err);
170        }
171
172        for i in 0..nfds as usize {
173            let ev = &events[i];
174            let fd = ev.u64 as RawFd;
175
176            if fd == listener_fd {
177                // Accept new connection
178                loop {
179                    match listener.accept() {
180                        Ok((stream, peer_addr)) => {
181                            let client_fd = stream.as_raw_fd();
182
183                            // Set non-blocking
184                            stream.set_nonblocking(true).ok();
185                            stream.set_nodelay(true).ok();
186
187                            // Set SO_KEEPALIVE and TCP options
188                            set_tcp_keepalive(client_fd);
189
190                            let client_id =
191                                InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
192
193                            log::info!(
194                                "[{}] backbone client connected: {} → id {}",
195                                name,
196                                peer_addr,
197                                client_id.0
198                            );
199
200                            // Register client fd with epoll
201                            let mut cev = libc::epoll_event {
202                                events: libc::EPOLLIN as u32,
203                                u64: client_fd as u64,
204                            };
205                            if unsafe {
206                                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
207                            } < 0
208                            {
209                                log::warn!(
210                                    "[{}] failed to add client to epoll: {}",
211                                    name,
212                                    io::Error::last_os_error()
213                                );
214                                // stream drops here, closing client_fd — correct
215                                continue;
216                            }
217
218                            // Prevent TcpStream from closing the fd on drop.
219                            // From here on, we own client_fd via epoll.
220                            std::mem::forget(stream);
221
222                            // Create writer (dup the fd so writer has independent ownership)
223                            let writer_fd = unsafe { libc::dup(client_fd) };
224                            if writer_fd < 0 {
225                                log::warn!("[{}] failed to dup client fd", name);
226                                unsafe {
227                                    libc::epoll_ctl(
228                                        epfd,
229                                        libc::EPOLL_CTL_DEL,
230                                        client_fd,
231                                        std::ptr::null_mut(),
232                                    );
233                                    libc::close(client_fd);
234                                }
235                                continue;
236                            }
237                            let writer: Box<dyn Writer> =
238                                Box::new(BackboneWriter { fd: writer_fd });
239
240                            clients.insert(
241                                client_fd,
242                                ClientState {
243                                    id: client_id,
244                                    decoder: hdlc::Decoder::new(),
245                                },
246                            );
247
248                            let info = InterfaceInfo {
249                                id: client_id,
250                                name: format!("BackboneInterface/{}", client_fd),
251                                mode: constants::MODE_FULL,
252                                out_capable: true,
253                                in_capable: true,
254                                bitrate: Some(1_000_000_000), // 1 Gbps guess
255                                announce_rate_target: None,
256                                announce_rate_grace: 0,
257                                announce_rate_penalty: 0.0,
258                                announce_cap: constants::ANNOUNCE_CAP,
259                                is_local_client: false,
260                                wants_tunnel: false,
261                                tunnel_id: None,
262                                mtu: 65535,
263                                ia_freq: 0.0,
264                                started: 0.0,
265                                ingress_control: true,
266                            };
267
268                            if tx
269                                .send(Event::InterfaceUp(
270                                    client_id,
271                                    Some(writer),
272                                    Some(info),
273                                ))
274                                .is_err()
275                            {
276                                // Driver shut down
277                                cleanup(epfd, &clients, listener_fd);
278                                return Ok(());
279                            }
280
281                        }
282                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
283                        Err(e) => {
284                            log::warn!("[{}] accept error: {}", name, e);
285                            break;
286                        }
287                    }
288                }
289            } else if clients.contains_key(&fd) {
290                // Client event
291                let mut should_remove = false;
292                let mut client_id = InterfaceId(0);
293
294                if ev.events & libc::EPOLLIN as u32 != 0 {
295                    let mut buf = [0u8; 4096];
296                    let n = unsafe {
297                        libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
298                    };
299
300                    if n <= 0 {
301                        if let Some(c) = clients.get(&fd) {
302                            client_id = c.id;
303                        }
304                        should_remove = true;
305                    } else if let Some(client) = clients.get_mut(&fd) {
306                        client_id = client.id;
307                        for frame in client.decoder.feed(&buf[..n as usize]) {
308                            if tx
309                                .send(Event::Frame {
310                                    interface_id: client_id,
311                                    data: frame,
312                                })
313                                .is_err()
314                            {
315                                cleanup(epfd, &clients, listener_fd);
316                                return Ok(());
317                            }
318                        }
319                    }
320                }
321
322                if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
323                    if let Some(c) = clients.get(&fd) {
324                        client_id = c.id;
325                    }
326                    should_remove = true;
327                }
328
329                if should_remove {
330                    log::info!(
331                        "[{}] backbone client {} disconnected",
332                        name,
333                        client_id.0
334                    );
335                    unsafe {
336                        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
337                        libc::close(fd);
338                    }
339                    clients.remove(&fd);
340                    let _ = tx.send(Event::InterfaceDown(client_id));
341                }
342            }
343        }
344    }
345}
346
347fn set_tcp_keepalive(fd: RawFd) {
348    unsafe {
349        let one: libc::c_int = 1;
350        libc::setsockopt(
351            fd,
352            libc::SOL_SOCKET,
353            libc::SO_KEEPALIVE,
354            &one as *const _ as *const libc::c_void,
355            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356        );
357        let idle: libc::c_int = 5;
358        libc::setsockopt(
359            fd,
360            libc::IPPROTO_TCP,
361            libc::TCP_KEEPIDLE,
362            &idle as *const _ as *const libc::c_void,
363            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364        );
365        let interval: libc::c_int = 2;
366        libc::setsockopt(
367            fd,
368            libc::IPPROTO_TCP,
369            libc::TCP_KEEPINTVL,
370            &interval as *const _ as *const libc::c_void,
371            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372        );
373        let cnt: libc::c_int = 12;
374        libc::setsockopt(
375            fd,
376            libc::IPPROTO_TCP,
377            libc::TCP_KEEPCNT,
378            &cnt as *const _ as *const libc::c_void,
379            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
380        );
381    }
382}
383
384fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
385    for (&fd, _) in clients {
386        unsafe {
387            libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
388            libc::close(fd);
389        }
390    }
391    unsafe {
392        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
393        libc::close(epfd);
394    }
395}
396
397// ---------------------------------------------------------------------------
398// Client mode
399// ---------------------------------------------------------------------------
400
401/// Configuration for a backbone client interface.
402#[derive(Debug, Clone)]
403pub struct BackboneClientConfig {
404    pub name: String,
405    pub target_host: String,
406    pub target_port: u16,
407    pub interface_id: InterfaceId,
408    pub reconnect_wait: Duration,
409    pub max_reconnect_tries: Option<u32>,
410    pub connect_timeout: Duration,
411    pub transport_identity: Option<String>,
412}
413
414impl Default for BackboneClientConfig {
415    fn default() -> Self {
416        BackboneClientConfig {
417            name: String::new(),
418            target_host: "127.0.0.1".into(),
419            target_port: 4242,
420            interface_id: InterfaceId(0),
421            reconnect_wait: Duration::from_secs(5),
422            max_reconnect_tries: None,
423            connect_timeout: Duration::from_secs(5),
424            transport_identity: None,
425        }
426    }
427}
428
429/// Writer that sends HDLC-framed data over a TCP stream (client mode).
430struct BackboneClientWriter {
431    stream: TcpStream,
432}
433
434impl Writer for BackboneClientWriter {
435    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
436        self.stream.write_all(&hdlc::frame(data))
437    }
438}
439
440/// Try to connect to the target host:port with timeout.
441fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
442    let addr_str = format!("{}:{}", config.target_host, config.target_port);
443    let addr = addr_str
444        .to_socket_addrs()?
445        .next()
446        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
447
448    let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
449    stream.set_nodelay(true)?;
450    set_tcp_keepalive(stream.as_raw_fd());
451    Ok(stream)
452}
453
454/// Connect and start the reader thread. Returns the writer for the driver.
455pub fn start_client(
456    config: BackboneClientConfig,
457    tx: EventSender,
458) -> io::Result<Box<dyn Writer>> {
459    let stream = try_connect_client(&config)?;
460    let reader_stream = stream.try_clone()?;
461    let writer_stream = stream.try_clone()?;
462
463    let id = config.interface_id;
464    log::info!(
465        "[{}] backbone client connected to {}:{}",
466        config.name,
467        config.target_host,
468        config.target_port
469    );
470
471    // Initial connect: writer is None because it's returned directly to the caller
472    let _ = tx.send(Event::InterfaceUp(id, None, None));
473
474    thread::Builder::new()
475        .name(format!("backbone-client-{}", id.0))
476        .spawn(move || {
477            client_reader_loop(reader_stream, config, tx);
478        })?;
479
480    Ok(Box::new(BackboneClientWriter { stream: writer_stream }))
481}
482
483/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
484/// On disconnect, attempts reconnection.
485fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
486    let id = config.interface_id;
487    let mut decoder = hdlc::Decoder::new();
488    let mut buf = [0u8; 4096];
489
490    loop {
491        match stream.read(&mut buf) {
492            Ok(0) => {
493                log::warn!("[{}] connection closed", config.name);
494                let _ = tx.send(Event::InterfaceDown(id));
495                match client_reconnect(&config, &tx) {
496                    Some(new_stream) => {
497                        stream = new_stream;
498                        decoder = hdlc::Decoder::new();
499                        continue;
500                    }
501                    None => {
502                        log::error!("[{}] reconnection failed, giving up", config.name);
503                        return;
504                    }
505                }
506            }
507            Ok(n) => {
508                for frame in decoder.feed(&buf[..n]) {
509                    if tx
510                        .send(Event::Frame {
511                            interface_id: id,
512                            data: frame,
513                        })
514                        .is_err()
515                    {
516                        return;
517                    }
518                }
519            }
520            Err(e) => {
521                log::warn!("[{}] read error: {}", config.name, e);
522                let _ = tx.send(Event::InterfaceDown(id));
523                match client_reconnect(&config, &tx) {
524                    Some(new_stream) => {
525                        stream = new_stream;
526                        decoder = hdlc::Decoder::new();
527                        continue;
528                    }
529                    None => {
530                        log::error!("[{}] reconnection failed, giving up", config.name);
531                        return;
532                    }
533                }
534            }
535        }
536    }
537}
538
539/// Attempt to reconnect with retry logic. Returns the new reader stream on success.
540/// Sends the new writer to the driver via InterfaceUp event.
541fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
542    let mut attempts = 0u32;
543    loop {
544        thread::sleep(config.reconnect_wait);
545        attempts += 1;
546
547        if let Some(max) = config.max_reconnect_tries {
548            if attempts > max {
549                let _ = tx.send(Event::InterfaceDown(config.interface_id));
550                return None;
551            }
552        }
553
554        log::info!(
555            "[{}] reconnect attempt {} ...",
556            config.name,
557            attempts
558        );
559
560        match try_connect_client(config) {
561            Ok(new_stream) => {
562                let writer_stream = match new_stream.try_clone() {
563                    Ok(s) => s,
564                    Err(e) => {
565                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
566                        continue;
567                    }
568                };
569                log::info!("[{}] reconnected", config.name);
570                let new_writer: Box<dyn Writer> =
571                    Box::new(BackboneClientWriter { stream: writer_stream });
572                let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(new_writer), None));
573                return Some(new_stream);
574            }
575            Err(e) => {
576                log::warn!("[{}] reconnect failed: {}", config.name, e);
577            }
578        }
579    }
580}
581
582// ---------------------------------------------------------------------------
583// Factory
584// ---------------------------------------------------------------------------
585
/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
/// server or client config through the opaque `InterfaceConfigData` channel.
///
/// `parse_config` boxes one of these and `start` downcasts it back.
enum BackboneMode {
    /// Listener (server) mode, with its bind settings.
    Server(BackboneConfig),
    /// Outbound (client) mode, with its target and reconnect settings.
    Client(BackboneClientConfig),
}
592
593/// Factory for `BackboneInterface`.
594///
595/// If the config params contain `"remote"` or `"target_host"` the interface
596/// is started in client mode; otherwise it is started as a TCP listener
597/// (server mode).
598pub struct BackboneInterfaceFactory;
599
600impl InterfaceFactory for BackboneInterfaceFactory {
601    fn type_name(&self) -> &str { "BackboneInterface" }
602
603    fn parse_config(
604        &self,
605        name: &str,
606        id: InterfaceId,
607        params: &HashMap<String, String>,
608    ) -> Result<Box<dyn InterfaceConfigData>, String> {
609        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
610            // Client mode
611            let target_host = target_host.clone();
612            let target_port = params.get("target_port")
613                .or_else(|| params.get("port"))
614                .and_then(|v| v.parse().ok())
615                .unwrap_or(4242);
616            let transport_identity = params.get("transport_identity").cloned();
617            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
618                name: name.to_string(),
619                target_host,
620                target_port,
621                interface_id: id,
622                transport_identity,
623                ..BackboneClientConfig::default()
624            })))
625        } else {
626            // Server mode
627            let listen_ip = params.get("listen_ip")
628                .or_else(|| params.get("device"))
629                .cloned()
630                .unwrap_or_else(|| "0.0.0.0".into());
631            let listen_port = params.get("listen_port")
632                .or_else(|| params.get("port"))
633                .and_then(|v| v.parse().ok())
634                .unwrap_or(4242);
635            Ok(Box::new(BackboneMode::Server(BackboneConfig {
636                name: name.to_string(),
637                listen_ip,
638                listen_port,
639                interface_id: id,
640            })))
641        }
642    }
643
644    fn start(
645        &self,
646        config: Box<dyn InterfaceConfigData>,
647        ctx: StartContext,
648    ) -> io::Result<StartResult> {
649        let mode = *config.into_any()
650            .downcast::<BackboneMode>()
651            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type for BackboneInterface"))?;
652
653        match mode {
654            BackboneMode::Client(cfg) => {
655                let id = cfg.interface_id;
656                let name = cfg.name.clone();
657                let info = InterfaceInfo {
658                    id,
659                    name,
660                    mode: ctx.mode,
661                    out_capable: true,
662                    in_capable: true,
663                    bitrate: Some(1_000_000_000),
664                    announce_rate_target: None,
665                    announce_rate_grace: 0,
666                    announce_rate_penalty: 0.0,
667                    announce_cap: constants::ANNOUNCE_CAP,
668                    is_local_client: false,
669                    wants_tunnel: false,
670                    tunnel_id: None,
671                    mtu: 65535,
672                    ingress_control: true,
673                    ia_freq: 0.0,
674                    started: crate::time::now(),
675                };
676                let writer = start_client(cfg, ctx.tx)?;
677                Ok(StartResult::Simple {
678                    id,
679                    info,
680                    writer,
681                    interface_type_name: "BackboneInterface".to_string(),
682                })
683            }
684            BackboneMode::Server(cfg) => {
685                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
686                Ok(StartResult::Listener)
687            }
688        }
689    }
690}
691
692#[cfg(test)]
693mod tests {
694    use super::*;
695    use std::sync::mpsc;
696    use std::time::Duration;
697
698    fn find_free_port() -> u16 {
699        TcpListener::bind("127.0.0.1:0")
700            .unwrap()
701            .local_addr()
702            .unwrap()
703            .port()
704    }
705
706    #[test]
707    fn backbone_accept_connection() {
708        let port = find_free_port();
709        let (tx, rx) = mpsc::channel();
710        let next_id = Arc::new(AtomicU64::new(8000));
711
712        let config = BackboneConfig {
713            name: "test-backbone".into(),
714            listen_ip: "127.0.0.1".into(),
715            listen_port: port,
716            interface_id: InterfaceId(80),
717        };
718
719        start(config, tx, next_id).unwrap();
720        thread::sleep(Duration::from_millis(50));
721
722        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
723
724        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
725        match event {
726            Event::InterfaceUp(id, writer, info) => {
727                assert_eq!(id, InterfaceId(8000));
728                assert!(writer.is_some());
729                assert!(info.is_some());
730                let info = info.unwrap();
731                assert!(info.out_capable);
732                assert!(info.in_capable);
733            }
734            other => panic!("expected InterfaceUp, got {:?}", other),
735        }
736    }
737
738    #[test]
739    fn backbone_receive_frame() {
740        let port = find_free_port();
741        let (tx, rx) = mpsc::channel();
742        let next_id = Arc::new(AtomicU64::new(8100));
743
744        let config = BackboneConfig {
745            name: "test-backbone".into(),
746            listen_ip: "127.0.0.1".into(),
747            listen_port: port,
748            interface_id: InterfaceId(81),
749        };
750
751        start(config, tx, next_id).unwrap();
752        thread::sleep(Duration::from_millis(50));
753
754        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
755
756        // Drain InterfaceUp
757        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
758
759        // Send HDLC frame (>= 19 bytes)
760        let payload: Vec<u8> = (0..32).collect();
761        client.write_all(&hdlc::frame(&payload)).unwrap();
762
763        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
764        match event {
765            Event::Frame { interface_id, data } => {
766                assert_eq!(interface_id, InterfaceId(8100));
767                assert_eq!(data, payload);
768            }
769            other => panic!("expected Frame, got {:?}", other),
770        }
771    }
772
773    #[test]
774    fn backbone_send_to_client() {
775        let port = find_free_port();
776        let (tx, rx) = mpsc::channel();
777        let next_id = Arc::new(AtomicU64::new(8200));
778
779        let config = BackboneConfig {
780            name: "test-backbone".into(),
781            listen_ip: "127.0.0.1".into(),
782            listen_port: port,
783            interface_id: InterfaceId(82),
784        };
785
786        start(config, tx, next_id).unwrap();
787        thread::sleep(Duration::from_millis(50));
788
789        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
790        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
791
792        // Get writer from InterfaceUp
793        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
794        let mut writer = match event {
795            Event::InterfaceUp(_, Some(w), _) => w,
796            other => panic!("expected InterfaceUp with writer, got {:?}", other),
797        };
798
799        // Send frame via writer
800        let payload: Vec<u8> = (0..24).collect();
801        writer.send_frame(&payload).unwrap();
802
803        // Read from client
804        let mut buf = [0u8; 256];
805        let n = client.read(&mut buf).unwrap();
806        let expected = hdlc::frame(&payload);
807        assert_eq!(&buf[..n], &expected[..]);
808    }
809
810    #[test]
811    fn backbone_multiple_clients() {
812        let port = find_free_port();
813        let (tx, rx) = mpsc::channel();
814        let next_id = Arc::new(AtomicU64::new(8300));
815
816        let config = BackboneConfig {
817            name: "test-backbone".into(),
818            listen_ip: "127.0.0.1".into(),
819            listen_port: port,
820            interface_id: InterfaceId(83),
821        };
822
823        start(config, tx, next_id).unwrap();
824        thread::sleep(Duration::from_millis(50));
825
826        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
827        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
828
829        let mut ids = Vec::new();
830        for _ in 0..2 {
831            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
832            match event {
833                Event::InterfaceUp(id, _, _) => ids.push(id),
834                other => panic!("expected InterfaceUp, got {:?}", other),
835            }
836        }
837
838        assert_eq!(ids.len(), 2);
839        assert_ne!(ids[0], ids[1]);
840    }
841
842    #[test]
843    fn backbone_client_disconnect() {
844        let port = find_free_port();
845        let (tx, rx) = mpsc::channel();
846        let next_id = Arc::new(AtomicU64::new(8400));
847
848        let config = BackboneConfig {
849            name: "test-backbone".into(),
850            listen_ip: "127.0.0.1".into(),
851            listen_port: port,
852            interface_id: InterfaceId(84),
853        };
854
855        start(config, tx, next_id).unwrap();
856        thread::sleep(Duration::from_millis(50));
857
858        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
859
860        // Drain InterfaceUp
861        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
862
863        // Disconnect
864        drop(client);
865
866        // Should receive InterfaceDown
867        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
868        assert!(
869            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
870            "expected InterfaceDown(8400), got {:?}",
871            event
872        );
873    }
874
875    #[test]
876    fn backbone_epoll_multiplexing() {
877        let port = find_free_port();
878        let (tx, rx) = mpsc::channel();
879        let next_id = Arc::new(AtomicU64::new(8500));
880
881        let config = BackboneConfig {
882            name: "test-backbone".into(),
883            listen_ip: "127.0.0.1".into(),
884            listen_port: port,
885            interface_id: InterfaceId(85),
886        };
887
888        start(config, tx, next_id).unwrap();
889        thread::sleep(Duration::from_millis(50));
890
891        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
892        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
893
894        // Drain both InterfaceUp events
895        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
896        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
897
898        // Both clients send data simultaneously
899        let payload1: Vec<u8> = (0..24).collect();
900        let payload2: Vec<u8> = (100..130).collect();
901        client1.write_all(&hdlc::frame(&payload1)).unwrap();
902        client2.write_all(&hdlc::frame(&payload2)).unwrap();
903
904        // Should receive both Frame events
905        let mut received = Vec::new();
906        for _ in 0..2 {
907            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
908            match event {
909                Event::Frame { data, .. } => received.push(data),
910                other => panic!("expected Frame, got {:?}", other),
911            }
912        }
913        assert!(received.contains(&payload1));
914        assert!(received.contains(&payload2));
915    }
916
917    #[test]
918    fn backbone_bind_port() {
919        let port = find_free_port();
920        let (tx, _rx) = mpsc::channel();
921        let next_id = Arc::new(AtomicU64::new(8600));
922
923        let config = BackboneConfig {
924            name: "test-backbone".into(),
925            listen_ip: "127.0.0.1".into(),
926            listen_port: port,
927            interface_id: InterfaceId(86),
928        };
929
930        // Should not error
931        start(config, tx, next_id).unwrap();
932    }
933
934    #[test]
935    fn backbone_hdlc_fragmented() {
936        let port = find_free_port();
937        let (tx, rx) = mpsc::channel();
938        let next_id = Arc::new(AtomicU64::new(8700));
939
940        let config = BackboneConfig {
941            name: "test-backbone".into(),
942            listen_ip: "127.0.0.1".into(),
943            listen_port: port,
944            interface_id: InterfaceId(87),
945        };
946
947        start(config, tx, next_id).unwrap();
948        thread::sleep(Duration::from_millis(50));
949
950        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
951        client.set_nodelay(true).unwrap();
952
953        // Drain InterfaceUp
954        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
955
956        // Send HDLC frame in two fragments
957        let payload: Vec<u8> = (0..32).collect();
958        let framed = hdlc::frame(&payload);
959        let mid = framed.len() / 2;
960
961        client.write_all(&framed[..mid]).unwrap();
962        thread::sleep(Duration::from_millis(50));
963        client.write_all(&framed[mid..]).unwrap();
964
965        // Should receive reassembled frame
966        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
967        match event {
968            Event::Frame { data, .. } => {
969                assert_eq!(data, payload);
970            }
971            other => panic!("expected Frame, got {:?}", other),
972        }
973    }
974
975    // -----------------------------------------------------------------------
976    // Client mode tests
977    // -----------------------------------------------------------------------
978
979    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
980        BackboneClientConfig {
981            name: format!("test-bb-client-{}", port),
982            target_host: "127.0.0.1".into(),
983            target_port: port,
984            interface_id: InterfaceId(id),
985            reconnect_wait: Duration::from_millis(100),
986            max_reconnect_tries: Some(2),
987            connect_timeout: Duration::from_secs(2),
988            transport_identity: None,
989        }
990    }
991
992    #[test]
993    fn backbone_client_connect() {
994        let port = find_free_port();
995        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
996        let (tx, rx) = mpsc::channel();
997
998        let config = make_client_config(port, 9000);
999        let _writer = start_client(config, tx).unwrap();
1000
1001        let _server_stream = listener.accept().unwrap();
1002
1003        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1004        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1005    }
1006
1007    #[test]
1008    fn backbone_client_receive_frame() {
1009        let port = find_free_port();
1010        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1011        let (tx, rx) = mpsc::channel();
1012
1013        let config = make_client_config(port, 9100);
1014        let _writer = start_client(config, tx).unwrap();
1015
1016        let (mut server_stream, _) = listener.accept().unwrap();
1017
1018        // Drain InterfaceUp
1019        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1020
1021        // Send HDLC frame from server side (>= 19 bytes payload)
1022        let payload: Vec<u8> = (0..32).collect();
1023        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1024
1025        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1026        match event {
1027            Event::Frame { interface_id, data } => {
1028                assert_eq!(interface_id, InterfaceId(9100));
1029                assert_eq!(data, payload);
1030            }
1031            other => panic!("expected Frame, got {:?}", other),
1032        }
1033    }
1034
1035    #[test]
1036    fn backbone_client_send_frame() {
1037        let port = find_free_port();
1038        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1039        let (tx, _rx) = mpsc::channel();
1040
1041        let config = make_client_config(port, 9200);
1042        let mut writer = start_client(config, tx).unwrap();
1043
1044        let (mut server_stream, _) = listener.accept().unwrap();
1045        server_stream
1046            .set_read_timeout(Some(Duration::from_secs(2)))
1047            .unwrap();
1048
1049        let payload: Vec<u8> = (0..24).collect();
1050        writer.send_frame(&payload).unwrap();
1051
1052        let mut buf = [0u8; 256];
1053        let n = server_stream.read(&mut buf).unwrap();
1054        let expected = hdlc::frame(&payload);
1055        assert_eq!(&buf[..n], &expected[..]);
1056    }
1057
1058    #[test]
1059    fn backbone_client_reconnect() {
1060        let port = find_free_port();
1061        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1062        listener.set_nonblocking(false).unwrap();
1063        let (tx, rx) = mpsc::channel();
1064
1065        let config = make_client_config(port, 9300);
1066        let _writer = start_client(config, tx).unwrap();
1067
1068        // Accept first connection and immediately close it
1069        let (server_stream, _) = listener.accept().unwrap();
1070
1071        // Drain InterfaceUp
1072        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1073
1074        drop(server_stream);
1075
1076        // Should get InterfaceDown
1077        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1078        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1079
1080        // Accept the reconnection
1081        let _server_stream2 = listener.accept().unwrap();
1082
1083        // Should get InterfaceUp again
1084        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1085        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1086    }
1087}