Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using Linux epoll.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single epoll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::Writer;
27
/// Hardware MTU in bytes: 1 MiB (2^20), mirroring Python's `BackboneInterface.HW_MTU`.
// Nothing in the visible part of this module reads the constant, hence the lint exemption.
#[allow(dead_code)]
const HW_MTU: usize = 1 << 20;
31
32/// Configuration for a backbone interface.
33#[derive(Debug, Clone)]
34pub struct BackboneConfig {
35    pub name: String,
36    pub listen_ip: String,
37    pub listen_port: u16,
38    pub interface_id: InterfaceId,
39}
40
41impl Default for BackboneConfig {
42    fn default() -> Self {
43        BackboneConfig {
44            name: String::new(),
45            listen_ip: "0.0.0.0".into(),
46            listen_port: 0,
47            interface_id: InterfaceId(0),
48        }
49    }
50}
51
52/// Writer that sends HDLC-framed data directly via socket write.
53struct BackboneWriter {
54    fd: RawFd,
55}
56
57impl Writer for BackboneWriter {
58    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59        let framed = hdlc::frame(data);
60        let mut offset = 0;
61        while offset < framed.len() {
62            let n = unsafe {
63                libc::send(
64                    self.fd,
65                    framed[offset..].as_ptr() as *const libc::c_void,
66                    framed.len() - offset,
67                    libc::MSG_NOSIGNAL,
68                )
69            };
70            if n < 0 {
71                return Err(io::Error::last_os_error());
72            }
73            offset += n as usize;
74        }
75        Ok(())
76    }
77}
78
79// BackboneWriter's fd is a dup'd copy — we own it
80impl Drop for BackboneWriter {
81    fn drop(&mut self) {
82        unsafe {
83            libc::close(self.fd);
84        }
85    }
86}
87
88/// Safety: the fd is only accessed via send/close which are thread-safe.
89unsafe impl Send for BackboneWriter {}
90
91/// Start a backbone interface. Binds TCP listener, spawns epoll thread.
92pub fn start(
93    config: BackboneConfig,
94    tx: EventSender,
95    next_id: Arc<AtomicU64>,
96) -> io::Result<()> {
97    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
98    let listener = TcpListener::bind(&addr)?;
99    listener.set_nonblocking(true)?;
100
101    log::info!(
102        "[{}] backbone server listening on {}",
103        config.name,
104        listener.local_addr().unwrap_or(addr.parse().unwrap())
105    );
106
107    let name = config.name.clone();
108    thread::Builder::new()
109        .name(format!("backbone-epoll-{}", config.interface_id.0))
110        .spawn(move || {
111            if let Err(e) = epoll_loop(listener, name, tx, next_id) {
112                log::error!("backbone epoll loop error: {}", e);
113            }
114        })?;
115
116    Ok(())
117}
118
119/// Per-client tracking state.
120struct ClientState {
121    id: InterfaceId,
122    decoder: hdlc::Decoder,
123}
124
125/// Main epoll event loop.
126fn epoll_loop(
127    listener: TcpListener,
128    name: String,
129    tx: EventSender,
130    next_id: Arc<AtomicU64>,
131) -> io::Result<()> {
132    let epfd = unsafe { libc::epoll_create1(0) };
133    if epfd < 0 {
134        return Err(io::Error::last_os_error());
135    }
136
137    // Register listener
138    let listener_fd = listener.as_raw_fd();
139    let mut ev = libc::epoll_event {
140        events: libc::EPOLLIN as u32,
141        u64: listener_fd as u64,
142    };
143    if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
144        unsafe { libc::close(epfd) };
145        return Err(io::Error::last_os_error());
146    }
147
148    let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
149    let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
150
151    loop {
152        let nfds = unsafe {
153            libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
154        };
155
156        if nfds < 0 {
157            let err = io::Error::last_os_error();
158            if err.kind() == io::ErrorKind::Interrupted {
159                continue;
160            }
161            // Clean up
162            for (&fd, _) in &clients {
163                unsafe {
164                    libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
165                    libc::close(fd);
166                }
167            }
168            unsafe { libc::close(epfd) };
169            return Err(err);
170        }
171
172        for i in 0..nfds as usize {
173            let ev = &events[i];
174            let fd = ev.u64 as RawFd;
175
176            if fd == listener_fd {
177                // Accept new connection
178                loop {
179                    match listener.accept() {
180                        Ok((stream, peer_addr)) => {
181                            let client_fd = stream.as_raw_fd();
182
183                            // Set non-blocking
184                            stream.set_nonblocking(true).ok();
185                            stream.set_nodelay(true).ok();
186
187                            // Set SO_KEEPALIVE and TCP options
188                            set_tcp_keepalive(client_fd);
189
190                            let client_id =
191                                InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
192
193                            log::info!(
194                                "[{}] backbone client connected: {} → id {}",
195                                name,
196                                peer_addr,
197                                client_id.0
198                            );
199
200                            // Register client fd with epoll
201                            let mut cev = libc::epoll_event {
202                                events: libc::EPOLLIN as u32,
203                                u64: client_fd as u64,
204                            };
205                            if unsafe {
206                                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
207                            } < 0
208                            {
209                                log::warn!(
210                                    "[{}] failed to add client to epoll: {}",
211                                    name,
212                                    io::Error::last_os_error()
213                                );
214                                // stream drops here, closing client_fd — correct
215                                continue;
216                            }
217
218                            // Prevent TcpStream from closing the fd on drop.
219                            // From here on, we own client_fd via epoll.
220                            std::mem::forget(stream);
221
222                            // Create writer (dup the fd so writer has independent ownership)
223                            let writer_fd = unsafe { libc::dup(client_fd) };
224                            if writer_fd < 0 {
225                                log::warn!("[{}] failed to dup client fd", name);
226                                unsafe {
227                                    libc::epoll_ctl(
228                                        epfd,
229                                        libc::EPOLL_CTL_DEL,
230                                        client_fd,
231                                        std::ptr::null_mut(),
232                                    );
233                                    libc::close(client_fd);
234                                }
235                                continue;
236                            }
237                            let writer: Box<dyn Writer> =
238                                Box::new(BackboneWriter { fd: writer_fd });
239
240                            clients.insert(
241                                client_fd,
242                                ClientState {
243                                    id: client_id,
244                                    decoder: hdlc::Decoder::new(),
245                                },
246                            );
247
248                            let info = InterfaceInfo {
249                                id: client_id,
250                                name: format!("BackboneInterface/{}", client_fd),
251                                mode: constants::MODE_FULL,
252                                out_capable: true,
253                                in_capable: true,
254                                bitrate: Some(1_000_000_000), // 1 Gbps guess
255                                announce_rate_target: None,
256                                announce_rate_grace: 0,
257                                announce_rate_penalty: 0.0,
258                                announce_cap: constants::ANNOUNCE_CAP,
259                                is_local_client: false,
260                                wants_tunnel: false,
261                                tunnel_id: None,
262                                mtu: 65535,
263                                ia_freq: 0.0,
264                                started: 0.0,
265                                ingress_control: true,
266                            };
267
268                            if tx
269                                .send(Event::InterfaceUp(
270                                    client_id,
271                                    Some(writer),
272                                    Some(info),
273                                ))
274                                .is_err()
275                            {
276                                // Driver shut down
277                                cleanup(epfd, &clients, listener_fd);
278                                return Ok(());
279                            }
280
281                        }
282                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
283                        Err(e) => {
284                            log::warn!("[{}] accept error: {}", name, e);
285                            break;
286                        }
287                    }
288                }
289            } else if clients.contains_key(&fd) {
290                // Client event
291                let mut should_remove = false;
292                let mut client_id = InterfaceId(0);
293
294                if ev.events & libc::EPOLLIN as u32 != 0 {
295                    let mut buf = [0u8; 4096];
296                    let n = unsafe {
297                        libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
298                    };
299
300                    if n <= 0 {
301                        if let Some(c) = clients.get(&fd) {
302                            client_id = c.id;
303                        }
304                        should_remove = true;
305                    } else if let Some(client) = clients.get_mut(&fd) {
306                        client_id = client.id;
307                        for frame in client.decoder.feed(&buf[..n as usize]) {
308                            if tx
309                                .send(Event::Frame {
310                                    interface_id: client_id,
311                                    data: frame,
312                                })
313                                .is_err()
314                            {
315                                cleanup(epfd, &clients, listener_fd);
316                                return Ok(());
317                            }
318                        }
319                    }
320                }
321
322                if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
323                    if let Some(c) = clients.get(&fd) {
324                        client_id = c.id;
325                    }
326                    should_remove = true;
327                }
328
329                if should_remove {
330                    log::info!(
331                        "[{}] backbone client {} disconnected",
332                        name,
333                        client_id.0
334                    );
335                    unsafe {
336                        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
337                        libc::close(fd);
338                    }
339                    clients.remove(&fd);
340                    let _ = tx.send(Event::InterfaceDown(client_id));
341                }
342            }
343        }
344    }
345}
346
347fn set_tcp_keepalive(fd: RawFd) {
348    unsafe {
349        let one: libc::c_int = 1;
350        libc::setsockopt(
351            fd,
352            libc::SOL_SOCKET,
353            libc::SO_KEEPALIVE,
354            &one as *const _ as *const libc::c_void,
355            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356        );
357        let idle: libc::c_int = 5;
358        libc::setsockopt(
359            fd,
360            libc::IPPROTO_TCP,
361            libc::TCP_KEEPIDLE,
362            &idle as *const _ as *const libc::c_void,
363            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364        );
365        let interval: libc::c_int = 2;
366        libc::setsockopt(
367            fd,
368            libc::IPPROTO_TCP,
369            libc::TCP_KEEPINTVL,
370            &interval as *const _ as *const libc::c_void,
371            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372        );
373        let cnt: libc::c_int = 12;
374        libc::setsockopt(
375            fd,
376            libc::IPPROTO_TCP,
377            libc::TCP_KEEPCNT,
378            &cnt as *const _ as *const libc::c_void,
379            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
380        );
381    }
382}
383
384fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
385    for (&fd, _) in clients {
386        unsafe {
387            libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
388            libc::close(fd);
389        }
390    }
391    unsafe {
392        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
393        libc::close(epfd);
394    }
395}
396
397// ---------------------------------------------------------------------------
398// Client mode
399// ---------------------------------------------------------------------------
400
401/// Configuration for a backbone client interface.
402#[derive(Debug, Clone)]
403pub struct BackboneClientConfig {
404    pub name: String,
405    pub target_host: String,
406    pub target_port: u16,
407    pub interface_id: InterfaceId,
408    pub reconnect_wait: Duration,
409    pub max_reconnect_tries: Option<u32>,
410    pub connect_timeout: Duration,
411    pub transport_identity: Option<String>,
412}
413
414impl Default for BackboneClientConfig {
415    fn default() -> Self {
416        BackboneClientConfig {
417            name: String::new(),
418            target_host: "127.0.0.1".into(),
419            target_port: 4242,
420            interface_id: InterfaceId(0),
421            reconnect_wait: Duration::from_secs(5),
422            max_reconnect_tries: None,
423            connect_timeout: Duration::from_secs(5),
424            transport_identity: None,
425        }
426    }
427}
428
429/// Writer that sends HDLC-framed data over a TCP stream (client mode).
430struct BackboneClientWriter {
431    stream: TcpStream,
432}
433
434impl Writer for BackboneClientWriter {
435    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
436        self.stream.write_all(&hdlc::frame(data))
437    }
438}
439
440/// Try to connect to the target host:port with timeout.
441fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
442    let addr_str = format!("{}:{}", config.target_host, config.target_port);
443    let addr = addr_str
444        .to_socket_addrs()?
445        .next()
446        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
447
448    let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
449    stream.set_nodelay(true)?;
450    set_tcp_keepalive(stream.as_raw_fd());
451    Ok(stream)
452}
453
454/// Connect and start the reader thread. Returns the writer for the driver.
455pub fn start_client(
456    config: BackboneClientConfig,
457    tx: EventSender,
458) -> io::Result<Box<dyn Writer>> {
459    let stream = try_connect_client(&config)?;
460    let reader_stream = stream.try_clone()?;
461    let writer_stream = stream.try_clone()?;
462
463    let id = config.interface_id;
464    log::info!(
465        "[{}] backbone client connected to {}:{}",
466        config.name,
467        config.target_host,
468        config.target_port
469    );
470
471    // Initial connect: writer is None because it's returned directly to the caller
472    let _ = tx.send(Event::InterfaceUp(id, None, None));
473
474    thread::Builder::new()
475        .name(format!("backbone-client-{}", id.0))
476        .spawn(move || {
477            client_reader_loop(reader_stream, config, tx);
478        })?;
479
480    Ok(Box::new(BackboneClientWriter { stream: writer_stream }))
481}
482
483/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
484/// On disconnect, attempts reconnection.
485fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
486    let id = config.interface_id;
487    let mut decoder = hdlc::Decoder::new();
488    let mut buf = [0u8; 4096];
489
490    loop {
491        match stream.read(&mut buf) {
492            Ok(0) => {
493                log::warn!("[{}] connection closed", config.name);
494                let _ = tx.send(Event::InterfaceDown(id));
495                match client_reconnect(&config, &tx) {
496                    Some(new_stream) => {
497                        stream = new_stream;
498                        decoder = hdlc::Decoder::new();
499                        continue;
500                    }
501                    None => {
502                        log::error!("[{}] reconnection failed, giving up", config.name);
503                        return;
504                    }
505                }
506            }
507            Ok(n) => {
508                for frame in decoder.feed(&buf[..n]) {
509                    if tx
510                        .send(Event::Frame {
511                            interface_id: id,
512                            data: frame,
513                        })
514                        .is_err()
515                    {
516                        return;
517                    }
518                }
519            }
520            Err(e) => {
521                log::warn!("[{}] read error: {}", config.name, e);
522                let _ = tx.send(Event::InterfaceDown(id));
523                match client_reconnect(&config, &tx) {
524                    Some(new_stream) => {
525                        stream = new_stream;
526                        decoder = hdlc::Decoder::new();
527                        continue;
528                    }
529                    None => {
530                        log::error!("[{}] reconnection failed, giving up", config.name);
531                        return;
532                    }
533                }
534            }
535        }
536    }
537}
538
539/// Attempt to reconnect with retry logic. Returns the new reader stream on success.
540/// Sends the new writer to the driver via InterfaceUp event.
541fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
542    let mut attempts = 0u32;
543    loop {
544        thread::sleep(config.reconnect_wait);
545        attempts += 1;
546
547        if let Some(max) = config.max_reconnect_tries {
548            if attempts > max {
549                let _ = tx.send(Event::InterfaceDown(config.interface_id));
550                return None;
551            }
552        }
553
554        log::info!(
555            "[{}] reconnect attempt {} ...",
556            config.name,
557            attempts
558        );
559
560        match try_connect_client(config) {
561            Ok(new_stream) => {
562                let writer_stream = match new_stream.try_clone() {
563                    Ok(s) => s,
564                    Err(e) => {
565                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
566                        continue;
567                    }
568                };
569                log::info!("[{}] reconnected", config.name);
570                let new_writer: Box<dyn Writer> =
571                    Box::new(BackboneClientWriter { stream: writer_stream });
572                let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(new_writer), None));
573                return Some(new_stream);
574            }
575            Err(e) => {
576                log::warn!("[{}] reconnect failed: {}", config.name, e);
577            }
578        }
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use std::sync::mpsc;
586    use std::time::Duration;
587
588    fn find_free_port() -> u16 {
589        TcpListener::bind("127.0.0.1:0")
590            .unwrap()
591            .local_addr()
592            .unwrap()
593            .port()
594    }
595
596    #[test]
597    fn backbone_accept_connection() {
598        let port = find_free_port();
599        let (tx, rx) = mpsc::channel();
600        let next_id = Arc::new(AtomicU64::new(8000));
601
602        let config = BackboneConfig {
603            name: "test-backbone".into(),
604            listen_ip: "127.0.0.1".into(),
605            listen_port: port,
606            interface_id: InterfaceId(80),
607        };
608
609        start(config, tx, next_id).unwrap();
610        thread::sleep(Duration::from_millis(50));
611
612        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
613
614        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
615        match event {
616            Event::InterfaceUp(id, writer, info) => {
617                assert_eq!(id, InterfaceId(8000));
618                assert!(writer.is_some());
619                assert!(info.is_some());
620                let info = info.unwrap();
621                assert!(info.out_capable);
622                assert!(info.in_capable);
623            }
624            other => panic!("expected InterfaceUp, got {:?}", other),
625        }
626    }
627
628    #[test]
629    fn backbone_receive_frame() {
630        let port = find_free_port();
631        let (tx, rx) = mpsc::channel();
632        let next_id = Arc::new(AtomicU64::new(8100));
633
634        let config = BackboneConfig {
635            name: "test-backbone".into(),
636            listen_ip: "127.0.0.1".into(),
637            listen_port: port,
638            interface_id: InterfaceId(81),
639        };
640
641        start(config, tx, next_id).unwrap();
642        thread::sleep(Duration::from_millis(50));
643
644        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
645
646        // Drain InterfaceUp
647        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
648
649        // Send HDLC frame (>= 19 bytes)
650        let payload: Vec<u8> = (0..32).collect();
651        client.write_all(&hdlc::frame(&payload)).unwrap();
652
653        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
654        match event {
655            Event::Frame { interface_id, data } => {
656                assert_eq!(interface_id, InterfaceId(8100));
657                assert_eq!(data, payload);
658            }
659            other => panic!("expected Frame, got {:?}", other),
660        }
661    }
662
663    #[test]
664    fn backbone_send_to_client() {
665        let port = find_free_port();
666        let (tx, rx) = mpsc::channel();
667        let next_id = Arc::new(AtomicU64::new(8200));
668
669        let config = BackboneConfig {
670            name: "test-backbone".into(),
671            listen_ip: "127.0.0.1".into(),
672            listen_port: port,
673            interface_id: InterfaceId(82),
674        };
675
676        start(config, tx, next_id).unwrap();
677        thread::sleep(Duration::from_millis(50));
678
679        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
680        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
681
682        // Get writer from InterfaceUp
683        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
684        let mut writer = match event {
685            Event::InterfaceUp(_, Some(w), _) => w,
686            other => panic!("expected InterfaceUp with writer, got {:?}", other),
687        };
688
689        // Send frame via writer
690        let payload: Vec<u8> = (0..24).collect();
691        writer.send_frame(&payload).unwrap();
692
693        // Read from client
694        let mut buf = [0u8; 256];
695        let n = client.read(&mut buf).unwrap();
696        let expected = hdlc::frame(&payload);
697        assert_eq!(&buf[..n], &expected[..]);
698    }
699
700    #[test]
701    fn backbone_multiple_clients() {
702        let port = find_free_port();
703        let (tx, rx) = mpsc::channel();
704        let next_id = Arc::new(AtomicU64::new(8300));
705
706        let config = BackboneConfig {
707            name: "test-backbone".into(),
708            listen_ip: "127.0.0.1".into(),
709            listen_port: port,
710            interface_id: InterfaceId(83),
711        };
712
713        start(config, tx, next_id).unwrap();
714        thread::sleep(Duration::from_millis(50));
715
716        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
717        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
718
719        let mut ids = Vec::new();
720        for _ in 0..2 {
721            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
722            match event {
723                Event::InterfaceUp(id, _, _) => ids.push(id),
724                other => panic!("expected InterfaceUp, got {:?}", other),
725            }
726        }
727
728        assert_eq!(ids.len(), 2);
729        assert_ne!(ids[0], ids[1]);
730    }
731
732    #[test]
733    fn backbone_client_disconnect() {
734        let port = find_free_port();
735        let (tx, rx) = mpsc::channel();
736        let next_id = Arc::new(AtomicU64::new(8400));
737
738        let config = BackboneConfig {
739            name: "test-backbone".into(),
740            listen_ip: "127.0.0.1".into(),
741            listen_port: port,
742            interface_id: InterfaceId(84),
743        };
744
745        start(config, tx, next_id).unwrap();
746        thread::sleep(Duration::from_millis(50));
747
748        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
749
750        // Drain InterfaceUp
751        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
752
753        // Disconnect
754        drop(client);
755
756        // Should receive InterfaceDown
757        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
758        assert!(
759            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
760            "expected InterfaceDown(8400), got {:?}",
761            event
762        );
763    }
764
765    #[test]
766    fn backbone_epoll_multiplexing() {
767        let port = find_free_port();
768        let (tx, rx) = mpsc::channel();
769        let next_id = Arc::new(AtomicU64::new(8500));
770
771        let config = BackboneConfig {
772            name: "test-backbone".into(),
773            listen_ip: "127.0.0.1".into(),
774            listen_port: port,
775            interface_id: InterfaceId(85),
776        };
777
778        start(config, tx, next_id).unwrap();
779        thread::sleep(Duration::from_millis(50));
780
781        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
782        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
783
784        // Drain both InterfaceUp events
785        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
786        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
787
788        // Both clients send data simultaneously
789        let payload1: Vec<u8> = (0..24).collect();
790        let payload2: Vec<u8> = (100..130).collect();
791        client1.write_all(&hdlc::frame(&payload1)).unwrap();
792        client2.write_all(&hdlc::frame(&payload2)).unwrap();
793
794        // Should receive both Frame events
795        let mut received = Vec::new();
796        for _ in 0..2 {
797            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
798            match event {
799                Event::Frame { data, .. } => received.push(data),
800                other => panic!("expected Frame, got {:?}", other),
801            }
802        }
803        assert!(received.contains(&payload1));
804        assert!(received.contains(&payload2));
805    }
806
807    #[test]
808    fn backbone_bind_port() {
809        let port = find_free_port();
810        let (tx, _rx) = mpsc::channel();
811        let next_id = Arc::new(AtomicU64::new(8600));
812
813        let config = BackboneConfig {
814            name: "test-backbone".into(),
815            listen_ip: "127.0.0.1".into(),
816            listen_port: port,
817            interface_id: InterfaceId(86),
818        };
819
820        // Should not error
821        start(config, tx, next_id).unwrap();
822    }
823
824    #[test]
825    fn backbone_hdlc_fragmented() {
826        let port = find_free_port();
827        let (tx, rx) = mpsc::channel();
828        let next_id = Arc::new(AtomicU64::new(8700));
829
830        let config = BackboneConfig {
831            name: "test-backbone".into(),
832            listen_ip: "127.0.0.1".into(),
833            listen_port: port,
834            interface_id: InterfaceId(87),
835        };
836
837        start(config, tx, next_id).unwrap();
838        thread::sleep(Duration::from_millis(50));
839
840        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
841        client.set_nodelay(true).unwrap();
842
843        // Drain InterfaceUp
844        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
845
846        // Send HDLC frame in two fragments
847        let payload: Vec<u8> = (0..32).collect();
848        let framed = hdlc::frame(&payload);
849        let mid = framed.len() / 2;
850
851        client.write_all(&framed[..mid]).unwrap();
852        thread::sleep(Duration::from_millis(50));
853        client.write_all(&framed[mid..]).unwrap();
854
855        // Should receive reassembled frame
856        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
857        match event {
858            Event::Frame { data, .. } => {
859                assert_eq!(data, payload);
860            }
861            other => panic!("expected Frame, got {:?}", other),
862        }
863    }
864
865    // -----------------------------------------------------------------------
866    // Client mode tests
867    // -----------------------------------------------------------------------
868
869    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
870        BackboneClientConfig {
871            name: format!("test-bb-client-{}", port),
872            target_host: "127.0.0.1".into(),
873            target_port: port,
874            interface_id: InterfaceId(id),
875            reconnect_wait: Duration::from_millis(100),
876            max_reconnect_tries: Some(2),
877            connect_timeout: Duration::from_secs(2),
878            transport_identity: None,
879        }
880    }
881
882    #[test]
883    fn backbone_client_connect() {
884        let port = find_free_port();
885        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
886        let (tx, rx) = mpsc::channel();
887
888        let config = make_client_config(port, 9000);
889        let _writer = start_client(config, tx).unwrap();
890
891        let _server_stream = listener.accept().unwrap();
892
893        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
894        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
895    }
896
897    #[test]
898    fn backbone_client_receive_frame() {
899        let port = find_free_port();
900        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
901        let (tx, rx) = mpsc::channel();
902
903        let config = make_client_config(port, 9100);
904        let _writer = start_client(config, tx).unwrap();
905
906        let (mut server_stream, _) = listener.accept().unwrap();
907
908        // Drain InterfaceUp
909        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
910
911        // Send HDLC frame from server side (>= 19 bytes payload)
912        let payload: Vec<u8> = (0..32).collect();
913        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
914
915        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
916        match event {
917            Event::Frame { interface_id, data } => {
918                assert_eq!(interface_id, InterfaceId(9100));
919                assert_eq!(data, payload);
920            }
921            other => panic!("expected Frame, got {:?}", other),
922        }
923    }
924
925    #[test]
926    fn backbone_client_send_frame() {
927        let port = find_free_port();
928        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
929        let (tx, _rx) = mpsc::channel();
930
931        let config = make_client_config(port, 9200);
932        let mut writer = start_client(config, tx).unwrap();
933
934        let (mut server_stream, _) = listener.accept().unwrap();
935        server_stream
936            .set_read_timeout(Some(Duration::from_secs(2)))
937            .unwrap();
938
939        let payload: Vec<u8> = (0..24).collect();
940        writer.send_frame(&payload).unwrap();
941
942        let mut buf = [0u8; 256];
943        let n = server_stream.read(&mut buf).unwrap();
944        let expected = hdlc::frame(&payload);
945        assert_eq!(&buf[..n], &expected[..]);
946    }
947
948    #[test]
949    fn backbone_client_reconnect() {
950        let port = find_free_port();
951        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
952        listener.set_nonblocking(false).unwrap();
953        let (tx, rx) = mpsc::channel();
954
955        let config = make_client_config(port, 9300);
956        let _writer = start_client(config, tx).unwrap();
957
958        // Accept first connection and immediately close it
959        let (server_stream, _) = listener.accept().unwrap();
960
961        // Drain InterfaceUp
962        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
963
964        drop(server_stream);
965
966        // Should get InterfaceDown
967        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
968        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
969
970        // Accept the reconnection
971        let _server_stream2 = listener.accept().unwrap();
972
973        // Should get InterfaceUp again
974        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
975        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
976    }
977}