Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using Linux epoll.
2//!
3//! Server mode: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single epoll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Client mode: connects to a remote backbone server, single TCP connection
8//! with HDLC framing. Reconnects on disconnect.
9//!
10//! Matches Python `BackboneInterface.py`.
11
12use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
27
28/// HW_MTU: 1 MB (matches Python BackboneInterface.HW_MTU)
29#[allow(dead_code)]
30const HW_MTU: usize = 1_048_576;
31
32/// Configuration for a backbone interface.
33#[derive(Debug, Clone)]
34pub struct BackboneConfig {
35    pub name: String,
36    pub listen_ip: String,
37    pub listen_port: u16,
38    pub interface_id: InterfaceId,
39}
40
41impl Default for BackboneConfig {
42    fn default() -> Self {
43        BackboneConfig {
44            name: String::new(),
45            listen_ip: "0.0.0.0".into(),
46            listen_port: 0,
47            interface_id: InterfaceId(0),
48        }
49    }
50}
51
52/// Writer that sends HDLC-framed data directly via socket write.
53struct BackboneWriter {
54    fd: RawFd,
55}
56
57impl Writer for BackboneWriter {
58    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59        let framed = hdlc::frame(data);
60        let mut offset = 0;
61        while offset < framed.len() {
62            let n = unsafe {
63                libc::send(
64                    self.fd,
65                    framed[offset..].as_ptr() as *const libc::c_void,
66                    framed.len() - offset,
67                    libc::MSG_NOSIGNAL,
68                )
69            };
70            if n < 0 {
71                return Err(io::Error::last_os_error());
72            }
73            offset += n as usize;
74        }
75        Ok(())
76    }
77}
78
79// BackboneWriter's fd is a dup'd copy — we own it
80impl Drop for BackboneWriter {
81    fn drop(&mut self) {
82        unsafe {
83            libc::close(self.fd);
84        }
85    }
86}
87
88/// Safety: the fd is only accessed via send/close which are thread-safe.
89unsafe impl Send for BackboneWriter {}
90
91/// Start a backbone interface. Binds TCP listener, spawns epoll thread.
92pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
93    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
94    let listener = TcpListener::bind(&addr)?;
95    listener.set_nonblocking(true)?;
96
97    log::info!(
98        "[{}] backbone server listening on {}",
99        config.name,
100        listener.local_addr().unwrap_or(addr.parse().unwrap())
101    );
102
103    let name = config.name.clone();
104    thread::Builder::new()
105        .name(format!("backbone-epoll-{}", config.interface_id.0))
106        .spawn(move || {
107            if let Err(e) = epoll_loop(listener, name, tx, next_id) {
108                log::error!("backbone epoll loop error: {}", e);
109            }
110        })?;
111
112    Ok(())
113}
114
115/// Per-client tracking state.
116struct ClientState {
117    id: InterfaceId,
118    decoder: hdlc::Decoder,
119}
120
121/// Main epoll event loop.
122fn epoll_loop(
123    listener: TcpListener,
124    name: String,
125    tx: EventSender,
126    next_id: Arc<AtomicU64>,
127) -> io::Result<()> {
128    let epfd = unsafe { libc::epoll_create1(0) };
129    if epfd < 0 {
130        return Err(io::Error::last_os_error());
131    }
132
133    // Register listener
134    let listener_fd = listener.as_raw_fd();
135    let mut ev = libc::epoll_event {
136        events: libc::EPOLLIN as u32,
137        u64: listener_fd as u64,
138    };
139    if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
140        unsafe { libc::close(epfd) };
141        return Err(io::Error::last_os_error());
142    }
143
144    let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
145    let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
146
147    loop {
148        let nfds =
149            unsafe { libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000) };
150
151        if nfds < 0 {
152            let err = io::Error::last_os_error();
153            if err.kind() == io::ErrorKind::Interrupted {
154                continue;
155            }
156            // Clean up
157            for (&fd, _) in &clients {
158                unsafe {
159                    libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
160                    libc::close(fd);
161                }
162            }
163            unsafe { libc::close(epfd) };
164            return Err(err);
165        }
166
167        for i in 0..nfds as usize {
168            let ev = &events[i];
169            let fd = ev.u64 as RawFd;
170
171            if fd == listener_fd {
172                // Accept new connection
173                loop {
174                    match listener.accept() {
175                        Ok((stream, peer_addr)) => {
176                            let client_fd = stream.as_raw_fd();
177
178                            // Set non-blocking
179                            stream.set_nonblocking(true).ok();
180                            stream.set_nodelay(true).ok();
181
182                            // Set SO_KEEPALIVE and TCP options
183                            set_tcp_keepalive(client_fd);
184
185                            let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
186
187                            log::info!(
188                                "[{}] backbone client connected: {} → id {}",
189                                name,
190                                peer_addr,
191                                client_id.0
192                            );
193
194                            // Register client fd with epoll
195                            let mut cev = libc::epoll_event {
196                                events: libc::EPOLLIN as u32,
197                                u64: client_fd as u64,
198                            };
199                            if unsafe {
200                                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
201                            } < 0
202                            {
203                                log::warn!(
204                                    "[{}] failed to add client to epoll: {}",
205                                    name,
206                                    io::Error::last_os_error()
207                                );
208                                // stream drops here, closing client_fd — correct
209                                continue;
210                            }
211
212                            // Prevent TcpStream from closing the fd on drop.
213                            // From here on, we own client_fd via epoll.
214                            std::mem::forget(stream);
215
216                            // Create writer (dup the fd so writer has independent ownership)
217                            let writer_fd = unsafe { libc::dup(client_fd) };
218                            if writer_fd < 0 {
219                                log::warn!("[{}] failed to dup client fd", name);
220                                unsafe {
221                                    libc::epoll_ctl(
222                                        epfd,
223                                        libc::EPOLL_CTL_DEL,
224                                        client_fd,
225                                        std::ptr::null_mut(),
226                                    );
227                                    libc::close(client_fd);
228                                }
229                                continue;
230                            }
231                            let writer: Box<dyn Writer> =
232                                Box::new(BackboneWriter { fd: writer_fd });
233
234                            clients.insert(
235                                client_fd,
236                                ClientState {
237                                    id: client_id,
238                                    decoder: hdlc::Decoder::new(),
239                                },
240                            );
241
242                            let info = InterfaceInfo {
243                                id: client_id,
244                                name: format!("BackboneInterface/{}", client_fd),
245                                mode: constants::MODE_FULL,
246                                out_capable: true,
247                                in_capable: true,
248                                bitrate: Some(1_000_000_000), // 1 Gbps guess
249                                announce_rate_target: None,
250                                announce_rate_grace: 0,
251                                announce_rate_penalty: 0.0,
252                                announce_cap: constants::ANNOUNCE_CAP,
253                                is_local_client: false,
254                                wants_tunnel: false,
255                                tunnel_id: None,
256                                mtu: 65535,
257                                ia_freq: 0.0,
258                                started: 0.0,
259                                ingress_control: true,
260                            };
261
262                            if tx
263                                .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
264                                .is_err()
265                            {
266                                // Driver shut down
267                                cleanup(epfd, &clients, listener_fd);
268                                return Ok(());
269                            }
270                        }
271                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
272                        Err(e) => {
273                            log::warn!("[{}] accept error: {}", name, e);
274                            break;
275                        }
276                    }
277                }
278            } else if clients.contains_key(&fd) {
279                // Client event
280                let mut should_remove = false;
281                let mut client_id = InterfaceId(0);
282
283                if ev.events & libc::EPOLLIN as u32 != 0 {
284                    let mut buf = [0u8; 4096];
285                    let n = unsafe {
286                        libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
287                    };
288
289                    if n <= 0 {
290                        if let Some(c) = clients.get(&fd) {
291                            client_id = c.id;
292                        }
293                        should_remove = true;
294                    } else if let Some(client) = clients.get_mut(&fd) {
295                        client_id = client.id;
296                        for frame in client.decoder.feed(&buf[..n as usize]) {
297                            if tx
298                                .send(Event::Frame {
299                                    interface_id: client_id,
300                                    data: frame,
301                                })
302                                .is_err()
303                            {
304                                cleanup(epfd, &clients, listener_fd);
305                                return Ok(());
306                            }
307                        }
308                    }
309                }
310
311                if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
312                    if let Some(c) = clients.get(&fd) {
313                        client_id = c.id;
314                    }
315                    should_remove = true;
316                }
317
318                if should_remove {
319                    log::info!("[{}] backbone client {} disconnected", name, client_id.0);
320                    unsafe {
321                        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
322                        libc::close(fd);
323                    }
324                    clients.remove(&fd);
325                    let _ = tx.send(Event::InterfaceDown(client_id));
326                }
327            }
328        }
329    }
330}
331
332fn set_tcp_keepalive(fd: RawFd) {
333    unsafe {
334        let one: libc::c_int = 1;
335        libc::setsockopt(
336            fd,
337            libc::SOL_SOCKET,
338            libc::SO_KEEPALIVE,
339            &one as *const _ as *const libc::c_void,
340            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
341        );
342        let idle: libc::c_int = 5;
343        libc::setsockopt(
344            fd,
345            libc::IPPROTO_TCP,
346            libc::TCP_KEEPIDLE,
347            &idle as *const _ as *const libc::c_void,
348            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
349        );
350        let interval: libc::c_int = 2;
351        libc::setsockopt(
352            fd,
353            libc::IPPROTO_TCP,
354            libc::TCP_KEEPINTVL,
355            &interval as *const _ as *const libc::c_void,
356            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
357        );
358        let cnt: libc::c_int = 12;
359        libc::setsockopt(
360            fd,
361            libc::IPPROTO_TCP,
362            libc::TCP_KEEPCNT,
363            &cnt as *const _ as *const libc::c_void,
364            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
365        );
366    }
367}
368
369fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
370    for (&fd, _) in clients {
371        unsafe {
372            libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
373            libc::close(fd);
374        }
375    }
376    unsafe {
377        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
378        libc::close(epfd);
379    }
380}
381
382// ---------------------------------------------------------------------------
383// Client mode
384// ---------------------------------------------------------------------------
385
386/// Configuration for a backbone client interface.
387#[derive(Debug, Clone)]
388pub struct BackboneClientConfig {
389    pub name: String,
390    pub target_host: String,
391    pub target_port: u16,
392    pub interface_id: InterfaceId,
393    pub reconnect_wait: Duration,
394    pub max_reconnect_tries: Option<u32>,
395    pub connect_timeout: Duration,
396    pub transport_identity: Option<String>,
397}
398
399impl Default for BackboneClientConfig {
400    fn default() -> Self {
401        BackboneClientConfig {
402            name: String::new(),
403            target_host: "127.0.0.1".into(),
404            target_port: 4242,
405            interface_id: InterfaceId(0),
406            reconnect_wait: Duration::from_secs(5),
407            max_reconnect_tries: None,
408            connect_timeout: Duration::from_secs(5),
409            transport_identity: None,
410        }
411    }
412}
413
414/// Writer that sends HDLC-framed data over a TCP stream (client mode).
415struct BackboneClientWriter {
416    stream: TcpStream,
417}
418
419impl Writer for BackboneClientWriter {
420    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
421        self.stream.write_all(&hdlc::frame(data))
422    }
423}
424
425/// Try to connect to the target host:port with timeout.
426fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
427    let addr_str = format!("{}:{}", config.target_host, config.target_port);
428    let addr = addr_str
429        .to_socket_addrs()?
430        .next()
431        .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
432
433    let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
434    stream.set_nodelay(true)?;
435    set_tcp_keepalive(stream.as_raw_fd());
436    Ok(stream)
437}
438
439/// Connect and start the reader thread. Returns the writer for the driver.
440pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
441    let stream = try_connect_client(&config)?;
442    let reader_stream = stream.try_clone()?;
443    let writer_stream = stream.try_clone()?;
444
445    let id = config.interface_id;
446    log::info!(
447        "[{}] backbone client connected to {}:{}",
448        config.name,
449        config.target_host,
450        config.target_port
451    );
452
453    // Initial connect: writer is None because it's returned directly to the caller
454    let _ = tx.send(Event::InterfaceUp(id, None, None));
455
456    thread::Builder::new()
457        .name(format!("backbone-client-{}", id.0))
458        .spawn(move || {
459            client_reader_loop(reader_stream, config, tx);
460        })?;
461
462    Ok(Box::new(BackboneClientWriter {
463        stream: writer_stream,
464    }))
465}
466
467/// Reader thread: reads from socket, HDLC-decodes, sends frames to driver.
468/// On disconnect, attempts reconnection.
469fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
470    let id = config.interface_id;
471    let mut decoder = hdlc::Decoder::new();
472    let mut buf = [0u8; 4096];
473
474    loop {
475        match stream.read(&mut buf) {
476            Ok(0) => {
477                log::warn!("[{}] connection closed", config.name);
478                let _ = tx.send(Event::InterfaceDown(id));
479                match client_reconnect(&config, &tx) {
480                    Some(new_stream) => {
481                        stream = new_stream;
482                        decoder = hdlc::Decoder::new();
483                        continue;
484                    }
485                    None => {
486                        log::error!("[{}] reconnection failed, giving up", config.name);
487                        return;
488                    }
489                }
490            }
491            Ok(n) => {
492                for frame in decoder.feed(&buf[..n]) {
493                    if tx
494                        .send(Event::Frame {
495                            interface_id: id,
496                            data: frame,
497                        })
498                        .is_err()
499                    {
500                        return;
501                    }
502                }
503            }
504            Err(e) => {
505                log::warn!("[{}] read error: {}", config.name, e);
506                let _ = tx.send(Event::InterfaceDown(id));
507                match client_reconnect(&config, &tx) {
508                    Some(new_stream) => {
509                        stream = new_stream;
510                        decoder = hdlc::Decoder::new();
511                        continue;
512                    }
513                    None => {
514                        log::error!("[{}] reconnection failed, giving up", config.name);
515                        return;
516                    }
517                }
518            }
519        }
520    }
521}
522
523/// Attempt to reconnect with retry logic. Returns the new reader stream on success.
524/// Sends the new writer to the driver via InterfaceUp event.
525fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
526    let mut attempts = 0u32;
527    loop {
528        thread::sleep(config.reconnect_wait);
529        attempts += 1;
530
531        if let Some(max) = config.max_reconnect_tries {
532            if attempts > max {
533                let _ = tx.send(Event::InterfaceDown(config.interface_id));
534                return None;
535            }
536        }
537
538        log::info!("[{}] reconnect attempt {} ...", config.name, attempts);
539
540        match try_connect_client(config) {
541            Ok(new_stream) => {
542                let writer_stream = match new_stream.try_clone() {
543                    Ok(s) => s,
544                    Err(e) => {
545                        log::warn!("[{}] failed to clone stream: {}", config.name, e);
546                        continue;
547                    }
548                };
549                log::info!("[{}] reconnected", config.name);
550                let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
551                    stream: writer_stream,
552                });
553                let _ = tx.send(Event::InterfaceUp(
554                    config.interface_id,
555                    Some(new_writer),
556                    None,
557                ));
558                return Some(new_stream);
559            }
560            Err(e) => {
561                log::warn!("[{}] reconnect failed: {}", config.name, e);
562            }
563        }
564    }
565}
566
567// ---------------------------------------------------------------------------
568// Factory
569// ---------------------------------------------------------------------------
570
571/// Internal enum used by [`BackboneInterfaceFactory`] to carry either a
572/// server or client config through the opaque `InterfaceConfigData` channel.
573enum BackboneMode {
574    Server(BackboneConfig),
575    Client(BackboneClientConfig),
576}
577
578/// Factory for `BackboneInterface`.
579///
580/// If the config params contain `"remote"` or `"target_host"` the interface
581/// is started in client mode; otherwise it is started as a TCP listener
582/// (server mode).
583pub struct BackboneInterfaceFactory;
584
585impl InterfaceFactory for BackboneInterfaceFactory {
586    fn type_name(&self) -> &str {
587        "BackboneInterface"
588    }
589
590    fn parse_config(
591        &self,
592        name: &str,
593        id: InterfaceId,
594        params: &HashMap<String, String>,
595    ) -> Result<Box<dyn InterfaceConfigData>, String> {
596        if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
597            // Client mode
598            let target_host = target_host.clone();
599            let target_port = params
600                .get("target_port")
601                .or_else(|| params.get("port"))
602                .and_then(|v| v.parse().ok())
603                .unwrap_or(4242);
604            let transport_identity = params.get("transport_identity").cloned();
605            Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
606                name: name.to_string(),
607                target_host,
608                target_port,
609                interface_id: id,
610                transport_identity,
611                ..BackboneClientConfig::default()
612            })))
613        } else {
614            // Server mode
615            let listen_ip = params
616                .get("listen_ip")
617                .or_else(|| params.get("device"))
618                .cloned()
619                .unwrap_or_else(|| "0.0.0.0".into());
620            let listen_port = params
621                .get("listen_port")
622                .or_else(|| params.get("port"))
623                .and_then(|v| v.parse().ok())
624                .unwrap_or(4242);
625            Ok(Box::new(BackboneMode::Server(BackboneConfig {
626                name: name.to_string(),
627                listen_ip,
628                listen_port,
629                interface_id: id,
630            })))
631        }
632    }
633
634    fn start(
635        &self,
636        config: Box<dyn InterfaceConfigData>,
637        ctx: StartContext,
638    ) -> io::Result<StartResult> {
639        let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
640            io::Error::new(
641                io::ErrorKind::InvalidData,
642                "wrong config type for BackboneInterface",
643            )
644        })?;
645
646        match mode {
647            BackboneMode::Client(cfg) => {
648                let id = cfg.interface_id;
649                let name = cfg.name.clone();
650                let info = InterfaceInfo {
651                    id,
652                    name,
653                    mode: ctx.mode,
654                    out_capable: true,
655                    in_capable: true,
656                    bitrate: Some(1_000_000_000),
657                    announce_rate_target: None,
658                    announce_rate_grace: 0,
659                    announce_rate_penalty: 0.0,
660                    announce_cap: constants::ANNOUNCE_CAP,
661                    is_local_client: false,
662                    wants_tunnel: false,
663                    tunnel_id: None,
664                    mtu: 65535,
665                    ingress_control: true,
666                    ia_freq: 0.0,
667                    started: crate::time::now(),
668                };
669                let writer = start_client(cfg, ctx.tx)?;
670                Ok(StartResult::Simple {
671                    id,
672                    info,
673                    writer,
674                    interface_type_name: "BackboneInterface".to_string(),
675                })
676            }
677            BackboneMode::Server(cfg) => {
678                start(cfg, ctx.tx, ctx.next_dynamic_id)?;
679                Ok(StartResult::Listener)
680            }
681        }
682    }
683}
684
685#[cfg(test)]
686mod tests {
687    use super::*;
688    use std::sync::mpsc;
689    use std::time::Duration;
690
691    fn find_free_port() -> u16 {
692        TcpListener::bind("127.0.0.1:0")
693            .unwrap()
694            .local_addr()
695            .unwrap()
696            .port()
697    }
698
699    #[test]
700    fn backbone_accept_connection() {
701        let port = find_free_port();
702        let (tx, rx) = mpsc::channel();
703        let next_id = Arc::new(AtomicU64::new(8000));
704
705        let config = BackboneConfig {
706            name: "test-backbone".into(),
707            listen_ip: "127.0.0.1".into(),
708            listen_port: port,
709            interface_id: InterfaceId(80),
710        };
711
712        start(config, tx, next_id).unwrap();
713        thread::sleep(Duration::from_millis(50));
714
715        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
716
717        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
718        match event {
719            Event::InterfaceUp(id, writer, info) => {
720                assert_eq!(id, InterfaceId(8000));
721                assert!(writer.is_some());
722                assert!(info.is_some());
723                let info = info.unwrap();
724                assert!(info.out_capable);
725                assert!(info.in_capable);
726            }
727            other => panic!("expected InterfaceUp, got {:?}", other),
728        }
729    }
730
731    #[test]
732    fn backbone_receive_frame() {
733        let port = find_free_port();
734        let (tx, rx) = mpsc::channel();
735        let next_id = Arc::new(AtomicU64::new(8100));
736
737        let config = BackboneConfig {
738            name: "test-backbone".into(),
739            listen_ip: "127.0.0.1".into(),
740            listen_port: port,
741            interface_id: InterfaceId(81),
742        };
743
744        start(config, tx, next_id).unwrap();
745        thread::sleep(Duration::from_millis(50));
746
747        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
748
749        // Drain InterfaceUp
750        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
751
752        // Send HDLC frame (>= 19 bytes)
753        let payload: Vec<u8> = (0..32).collect();
754        client.write_all(&hdlc::frame(&payload)).unwrap();
755
756        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
757        match event {
758            Event::Frame { interface_id, data } => {
759                assert_eq!(interface_id, InterfaceId(8100));
760                assert_eq!(data, payload);
761            }
762            other => panic!("expected Frame, got {:?}", other),
763        }
764    }
765
766    #[test]
767    fn backbone_send_to_client() {
768        let port = find_free_port();
769        let (tx, rx) = mpsc::channel();
770        let next_id = Arc::new(AtomicU64::new(8200));
771
772        let config = BackboneConfig {
773            name: "test-backbone".into(),
774            listen_ip: "127.0.0.1".into(),
775            listen_port: port,
776            interface_id: InterfaceId(82),
777        };
778
779        start(config, tx, next_id).unwrap();
780        thread::sleep(Duration::from_millis(50));
781
782        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
783        client
784            .set_read_timeout(Some(Duration::from_secs(2)))
785            .unwrap();
786
787        // Get writer from InterfaceUp
788        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
789        let mut writer = match event {
790            Event::InterfaceUp(_, Some(w), _) => w,
791            other => panic!("expected InterfaceUp with writer, got {:?}", other),
792        };
793
794        // Send frame via writer
795        let payload: Vec<u8> = (0..24).collect();
796        writer.send_frame(&payload).unwrap();
797
798        // Read from client
799        let mut buf = [0u8; 256];
800        let n = client.read(&mut buf).unwrap();
801        let expected = hdlc::frame(&payload);
802        assert_eq!(&buf[..n], &expected[..]);
803    }
804
805    #[test]
806    fn backbone_multiple_clients() {
807        let port = find_free_port();
808        let (tx, rx) = mpsc::channel();
809        let next_id = Arc::new(AtomicU64::new(8300));
810
811        let config = BackboneConfig {
812            name: "test-backbone".into(),
813            listen_ip: "127.0.0.1".into(),
814            listen_port: port,
815            interface_id: InterfaceId(83),
816        };
817
818        start(config, tx, next_id).unwrap();
819        thread::sleep(Duration::from_millis(50));
820
821        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
822        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
823
824        let mut ids = Vec::new();
825        for _ in 0..2 {
826            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
827            match event {
828                Event::InterfaceUp(id, _, _) => ids.push(id),
829                other => panic!("expected InterfaceUp, got {:?}", other),
830            }
831        }
832
833        assert_eq!(ids.len(), 2);
834        assert_ne!(ids[0], ids[1]);
835    }
836
837    #[test]
838    fn backbone_client_disconnect() {
839        let port = find_free_port();
840        let (tx, rx) = mpsc::channel();
841        let next_id = Arc::new(AtomicU64::new(8400));
842
843        let config = BackboneConfig {
844            name: "test-backbone".into(),
845            listen_ip: "127.0.0.1".into(),
846            listen_port: port,
847            interface_id: InterfaceId(84),
848        };
849
850        start(config, tx, next_id).unwrap();
851        thread::sleep(Duration::from_millis(50));
852
853        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
854
855        // Drain InterfaceUp
856        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
857
858        // Disconnect
859        drop(client);
860
861        // Should receive InterfaceDown
862        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
863        assert!(
864            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
865            "expected InterfaceDown(8400), got {:?}",
866            event
867        );
868    }
869
870    #[test]
871    fn backbone_epoll_multiplexing() {
872        let port = find_free_port();
873        let (tx, rx) = mpsc::channel();
874        let next_id = Arc::new(AtomicU64::new(8500));
875
876        let config = BackboneConfig {
877            name: "test-backbone".into(),
878            listen_ip: "127.0.0.1".into(),
879            listen_port: port,
880            interface_id: InterfaceId(85),
881        };
882
883        start(config, tx, next_id).unwrap();
884        thread::sleep(Duration::from_millis(50));
885
886        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
887        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
888
889        // Drain both InterfaceUp events
890        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
891        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
892
893        // Both clients send data simultaneously
894        let payload1: Vec<u8> = (0..24).collect();
895        let payload2: Vec<u8> = (100..130).collect();
896        client1.write_all(&hdlc::frame(&payload1)).unwrap();
897        client2.write_all(&hdlc::frame(&payload2)).unwrap();
898
899        // Should receive both Frame events
900        let mut received = Vec::new();
901        for _ in 0..2 {
902            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
903            match event {
904                Event::Frame { data, .. } => received.push(data),
905                other => panic!("expected Frame, got {:?}", other),
906            }
907        }
908        assert!(received.contains(&payload1));
909        assert!(received.contains(&payload2));
910    }
911
912    #[test]
913    fn backbone_bind_port() {
914        let port = find_free_port();
915        let (tx, _rx) = mpsc::channel();
916        let next_id = Arc::new(AtomicU64::new(8600));
917
918        let config = BackboneConfig {
919            name: "test-backbone".into(),
920            listen_ip: "127.0.0.1".into(),
921            listen_port: port,
922            interface_id: InterfaceId(86),
923        };
924
925        // Should not error
926        start(config, tx, next_id).unwrap();
927    }
928
929    #[test]
930    fn backbone_hdlc_fragmented() {
931        let port = find_free_port();
932        let (tx, rx) = mpsc::channel();
933        let next_id = Arc::new(AtomicU64::new(8700));
934
935        let config = BackboneConfig {
936            name: "test-backbone".into(),
937            listen_ip: "127.0.0.1".into(),
938            listen_port: port,
939            interface_id: InterfaceId(87),
940        };
941
942        start(config, tx, next_id).unwrap();
943        thread::sleep(Duration::from_millis(50));
944
945        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
946        client.set_nodelay(true).unwrap();
947
948        // Drain InterfaceUp
949        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
950
951        // Send HDLC frame in two fragments
952        let payload: Vec<u8> = (0..32).collect();
953        let framed = hdlc::frame(&payload);
954        let mid = framed.len() / 2;
955
956        client.write_all(&framed[..mid]).unwrap();
957        thread::sleep(Duration::from_millis(50));
958        client.write_all(&framed[mid..]).unwrap();
959
960        // Should receive reassembled frame
961        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
962        match event {
963            Event::Frame { data, .. } => {
964                assert_eq!(data, payload);
965            }
966            other => panic!("expected Frame, got {:?}", other),
967        }
968    }
969
970    // -----------------------------------------------------------------------
971    // Client mode tests
972    // -----------------------------------------------------------------------
973
974    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
975        BackboneClientConfig {
976            name: format!("test-bb-client-{}", port),
977            target_host: "127.0.0.1".into(),
978            target_port: port,
979            interface_id: InterfaceId(id),
980            reconnect_wait: Duration::from_millis(100),
981            max_reconnect_tries: Some(2),
982            connect_timeout: Duration::from_secs(2),
983            transport_identity: None,
984        }
985    }
986
987    #[test]
988    fn backbone_client_connect() {
989        let port = find_free_port();
990        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
991        let (tx, rx) = mpsc::channel();
992
993        let config = make_client_config(port, 9000);
994        let _writer = start_client(config, tx).unwrap();
995
996        let _server_stream = listener.accept().unwrap();
997
998        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
999        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
1000    }
1001
1002    #[test]
1003    fn backbone_client_receive_frame() {
1004        let port = find_free_port();
1005        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1006        let (tx, rx) = mpsc::channel();
1007
1008        let config = make_client_config(port, 9100);
1009        let _writer = start_client(config, tx).unwrap();
1010
1011        let (mut server_stream, _) = listener.accept().unwrap();
1012
1013        // Drain InterfaceUp
1014        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1015
1016        // Send HDLC frame from server side (>= 19 bytes payload)
1017        let payload: Vec<u8> = (0..32).collect();
1018        server_stream.write_all(&hdlc::frame(&payload)).unwrap();
1019
1020        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1021        match event {
1022            Event::Frame { interface_id, data } => {
1023                assert_eq!(interface_id, InterfaceId(9100));
1024                assert_eq!(data, payload);
1025            }
1026            other => panic!("expected Frame, got {:?}", other),
1027        }
1028    }
1029
1030    #[test]
1031    fn backbone_client_send_frame() {
1032        let port = find_free_port();
1033        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1034        let (tx, _rx) = mpsc::channel();
1035
1036        let config = make_client_config(port, 9200);
1037        let mut writer = start_client(config, tx).unwrap();
1038
1039        let (mut server_stream, _) = listener.accept().unwrap();
1040        server_stream
1041            .set_read_timeout(Some(Duration::from_secs(2)))
1042            .unwrap();
1043
1044        let payload: Vec<u8> = (0..24).collect();
1045        writer.send_frame(&payload).unwrap();
1046
1047        let mut buf = [0u8; 256];
1048        let n = server_stream.read(&mut buf).unwrap();
1049        let expected = hdlc::frame(&payload);
1050        assert_eq!(&buf[..n], &expected[..]);
1051    }
1052
1053    #[test]
1054    fn backbone_client_reconnect() {
1055        let port = find_free_port();
1056        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
1057        listener.set_nonblocking(false).unwrap();
1058        let (tx, rx) = mpsc::channel();
1059
1060        let config = make_client_config(port, 9300);
1061        let _writer = start_client(config, tx).unwrap();
1062
1063        // Accept first connection and immediately close it
1064        let (server_stream, _) = listener.accept().unwrap();
1065
1066        // Drain InterfaceUp
1067        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
1068
1069        drop(server_stream);
1070
1071        // Should get InterfaceDown
1072        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1073        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));
1074
1075        // Accept the reconnection
1076        let _server_stream2 = listener.accept().unwrap();
1077
1078        // Should get InterfaceUp again
1079        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
1080        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
1081    }
1082}