Skip to main content

rns_net/interface/
backbone.rs

1//! Backbone TCP mesh interface using Linux epoll.
2//!
3//! Server-only: listens on a TCP port, accepts peer connections, spawns
4//! dynamic per-peer interfaces. Uses a single epoll thread to multiplex
5//! all client sockets. HDLC framing for packet boundaries.
6//!
7//! Matches Python `BackboneInterface.py`.
8
9use std::collections::HashMap;
10use std::io;
11use std::net::TcpListener;
12use std::os::unix::io::{AsRawFd, RawFd};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16
17use rns_core::constants;
18use rns_core::transport::types::{InterfaceId, InterfaceInfo};
19
20use crate::event::{Event, EventSender};
21use crate::hdlc;
22use crate::interface::Writer;
23
/// Hardware MTU in bytes: 1 MiB (1_048_576), matching Python `BackboneInterface.HW_MTU`.
// Nothing in this file reads the constant yet, hence the `dead_code` allowance;
// it is kept to document parity with the Python implementation.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
27
/// Configuration for a backbone interface.
#[derive(Debug, Clone)]
pub struct BackboneConfig {
    /// Label used as the `[name]` prefix in this interface's log messages.
    pub name: String,
    /// Address the TCP listener binds to; joined with `listen_port` as `ip:port`.
    pub listen_ip: String,
    /// TCP port the listener binds to.
    pub listen_port: u16,
    /// Identifier of the listening interface itself; `start` uses it to name
    /// the epoll thread. Per-client interfaces get their own ids.
    pub interface_id: InterfaceId,
}
36
37impl Default for BackboneConfig {
38    fn default() -> Self {
39        BackboneConfig {
40            name: String::new(),
41            listen_ip: "0.0.0.0".into(),
42            listen_port: 0,
43            interface_id: InterfaceId(0),
44        }
45    }
46}
47
/// Writer that sends HDLC-framed data directly via socket write.
struct BackboneWriter {
    /// Socket descriptor owned by this writer: a `dup` of the client fd made
    /// in `epoll_loop`, closed again when the writer is dropped.
    fd: RawFd,
}
52
53impl Writer for BackboneWriter {
54    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
55        let framed = hdlc::frame(data);
56        let mut offset = 0;
57        while offset < framed.len() {
58            let n = unsafe {
59                libc::send(
60                    self.fd,
61                    framed[offset..].as_ptr() as *const libc::c_void,
62                    framed.len() - offset,
63                    libc::MSG_NOSIGNAL,
64                )
65            };
66            if n < 0 {
67                return Err(io::Error::last_os_error());
68            }
69            offset += n as usize;
70        }
71        Ok(())
72    }
73}
74
75// BackboneWriter's fd is a dup'd copy — we own it
76impl Drop for BackboneWriter {
77    fn drop(&mut self) {
78        unsafe {
79            libc::close(self.fd);
80        }
81    }
82}
83
/// Safety: the fd is only accessed via send/close which are thread-safe.
// NOTE(review): the struct's only field is a `RawFd` (a plain integer), which
// is already `Send`, so this explicit impl looks redundant — confirm before
// removing.
unsafe impl Send for BackboneWriter {}
86
87/// Start a backbone interface. Binds TCP listener, spawns epoll thread.
88pub fn start(
89    config: BackboneConfig,
90    tx: EventSender,
91    next_id: Arc<AtomicU64>,
92) -> io::Result<()> {
93    let addr = format!("{}:{}", config.listen_ip, config.listen_port);
94    let listener = TcpListener::bind(&addr)?;
95    listener.set_nonblocking(true)?;
96
97    log::info!(
98        "[{}] backbone server listening on {}",
99        config.name,
100        listener.local_addr().unwrap_or(addr.parse().unwrap())
101    );
102
103    let name = config.name.clone();
104    thread::Builder::new()
105        .name(format!("backbone-epoll-{}", config.interface_id.0))
106        .spawn(move || {
107            if let Err(e) = epoll_loop(listener, name, tx, next_id) {
108                log::error!("backbone epoll loop error: {}", e);
109            }
110        })?;
111
112    Ok(())
113}
114
/// Per-client tracking state, keyed by the client's socket fd in `epoll_loop`.
struct ClientState {
    /// Interface id assigned to this client when its connection was accepted.
    id: InterfaceId,
    /// Stateful HDLC decoder fed with each chunk received from this client.
    decoder: hdlc::Decoder,
}
120
121/// Main epoll event loop.
122fn epoll_loop(
123    listener: TcpListener,
124    name: String,
125    tx: EventSender,
126    next_id: Arc<AtomicU64>,
127) -> io::Result<()> {
128    let epfd = unsafe { libc::epoll_create1(0) };
129    if epfd < 0 {
130        return Err(io::Error::last_os_error());
131    }
132
133    // Register listener
134    let listener_fd = listener.as_raw_fd();
135    let mut ev = libc::epoll_event {
136        events: libc::EPOLLIN as u32,
137        u64: listener_fd as u64,
138    };
139    if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
140        unsafe { libc::close(epfd) };
141        return Err(io::Error::last_os_error());
142    }
143
144    let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
145    let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
146
147    loop {
148        let nfds = unsafe {
149            libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
150        };
151
152        if nfds < 0 {
153            let err = io::Error::last_os_error();
154            if err.kind() == io::ErrorKind::Interrupted {
155                continue;
156            }
157            // Clean up
158            for (&fd, _) in &clients {
159                unsafe {
160                    libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
161                    libc::close(fd);
162                }
163            }
164            unsafe { libc::close(epfd) };
165            return Err(err);
166        }
167
168        for i in 0..nfds as usize {
169            let ev = &events[i];
170            let fd = ev.u64 as RawFd;
171
172            if fd == listener_fd {
173                // Accept new connection
174                loop {
175                    match listener.accept() {
176                        Ok((stream, peer_addr)) => {
177                            let client_fd = stream.as_raw_fd();
178
179                            // Set non-blocking
180                            stream.set_nonblocking(true).ok();
181                            stream.set_nodelay(true).ok();
182
183                            // Set SO_KEEPALIVE and TCP options
184                            set_tcp_keepalive(client_fd);
185
186                            let client_id =
187                                InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
188
189                            log::info!(
190                                "[{}] backbone client connected: {} → id {}",
191                                name,
192                                peer_addr,
193                                client_id.0
194                            );
195
196                            // Register client fd with epoll
197                            let mut cev = libc::epoll_event {
198                                events: libc::EPOLLIN as u32,
199                                u64: client_fd as u64,
200                            };
201                            if unsafe {
202                                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
203                            } < 0
204                            {
205                                log::warn!(
206                                    "[{}] failed to add client to epoll: {}",
207                                    name,
208                                    io::Error::last_os_error()
209                                );
210                                // stream drops here, closing client_fd — correct
211                                continue;
212                            }
213
214                            // Prevent TcpStream from closing the fd on drop.
215                            // From here on, we own client_fd via epoll.
216                            std::mem::forget(stream);
217
218                            // Create writer (dup the fd so writer has independent ownership)
219                            let writer_fd = unsafe { libc::dup(client_fd) };
220                            if writer_fd < 0 {
221                                log::warn!("[{}] failed to dup client fd", name);
222                                unsafe {
223                                    libc::epoll_ctl(
224                                        epfd,
225                                        libc::EPOLL_CTL_DEL,
226                                        client_fd,
227                                        std::ptr::null_mut(),
228                                    );
229                                    libc::close(client_fd);
230                                }
231                                continue;
232                            }
233                            let writer: Box<dyn Writer> =
234                                Box::new(BackboneWriter { fd: writer_fd });
235
236                            clients.insert(
237                                client_fd,
238                                ClientState {
239                                    id: client_id,
240                                    decoder: hdlc::Decoder::new(),
241                                },
242                            );
243
244                            let info = InterfaceInfo {
245                                id: client_id,
246                                name: format!("BackboneInterface/{}", client_fd),
247                                mode: constants::MODE_FULL,
248                                out_capable: true,
249                                in_capable: true,
250                                bitrate: Some(1_000_000_000), // 1 Gbps guess
251                                announce_rate_target: None,
252                                announce_rate_grace: 0,
253                                announce_rate_penalty: 0.0,
254                                announce_cap: constants::ANNOUNCE_CAP,
255                                is_local_client: false,
256                                wants_tunnel: false,
257                                tunnel_id: None,
258                            };
259
260                            if tx
261                                .send(Event::InterfaceUp(
262                                    client_id,
263                                    Some(writer),
264                                    Some(info),
265                                ))
266                                .is_err()
267                            {
268                                // Driver shut down
269                                cleanup(epfd, &clients, listener_fd);
270                                return Ok(());
271                            }
272
273                        }
274                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
275                        Err(e) => {
276                            log::warn!("[{}] accept error: {}", name, e);
277                            break;
278                        }
279                    }
280                }
281            } else if clients.contains_key(&fd) {
282                // Client event
283                let mut should_remove = false;
284                let mut client_id = InterfaceId(0);
285
286                if ev.events & libc::EPOLLIN as u32 != 0 {
287                    let mut buf = [0u8; 4096];
288                    let n = unsafe {
289                        libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
290                    };
291
292                    if n <= 0 {
293                        if let Some(c) = clients.get(&fd) {
294                            client_id = c.id;
295                        }
296                        should_remove = true;
297                    } else if let Some(client) = clients.get_mut(&fd) {
298                        client_id = client.id;
299                        for frame in client.decoder.feed(&buf[..n as usize]) {
300                            if tx
301                                .send(Event::Frame {
302                                    interface_id: client_id,
303                                    data: frame,
304                                })
305                                .is_err()
306                            {
307                                cleanup(epfd, &clients, listener_fd);
308                                return Ok(());
309                            }
310                        }
311                    }
312                }
313
314                if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
315                    if let Some(c) = clients.get(&fd) {
316                        client_id = c.id;
317                    }
318                    should_remove = true;
319                }
320
321                if should_remove {
322                    log::info!(
323                        "[{}] backbone client {} disconnected",
324                        name,
325                        client_id.0
326                    );
327                    unsafe {
328                        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
329                        libc::close(fd);
330                    }
331                    clients.remove(&fd);
332                    let _ = tx.send(Event::InterfaceDown(client_id));
333                }
334            }
335        }
336    }
337}
338
339fn set_tcp_keepalive(fd: RawFd) {
340    unsafe {
341        let one: libc::c_int = 1;
342        libc::setsockopt(
343            fd,
344            libc::SOL_SOCKET,
345            libc::SO_KEEPALIVE,
346            &one as *const _ as *const libc::c_void,
347            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
348        );
349        let idle: libc::c_int = 5;
350        libc::setsockopt(
351            fd,
352            libc::IPPROTO_TCP,
353            libc::TCP_KEEPIDLE,
354            &idle as *const _ as *const libc::c_void,
355            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356        );
357        let interval: libc::c_int = 2;
358        libc::setsockopt(
359            fd,
360            libc::IPPROTO_TCP,
361            libc::TCP_KEEPINTVL,
362            &interval as *const _ as *const libc::c_void,
363            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364        );
365        let cnt: libc::c_int = 12;
366        libc::setsockopt(
367            fd,
368            libc::IPPROTO_TCP,
369            libc::TCP_KEEPCNT,
370            &cnt as *const _ as *const libc::c_void,
371            std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372        );
373    }
374}
375
376fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
377    for (&fd, _) in clients {
378        unsafe {
379            libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
380            libc::close(fd);
381        }
382    }
383    unsafe {
384        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
385        libc::close(epfd);
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::TcpStream;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused loopback port.
    ///
    /// The probe listener is dropped before the port is reused, so another
    /// process could in principle grab it in between.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a backbone server on a free loopback port.
    ///
    /// Returns the port and the receiving end of the event channel. `start`
    /// binds the listener before returning, so callers may connect at once.
    fn start_backbone(interface_id: u64, first_client_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };
        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a client connection to the server under test.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Waits up to two seconds for the next event from the epoll thread.
    fn next_event(rx: &mpsc::Receiver<Event>) -> Event {
        rx.recv_timeout(Duration::from_secs(2)).unwrap()
    }

    #[test]
    fn backbone_accept_connection() {
        let (port, rx) = start_backbone(80, 8000);
        let _client = connect(port);

        match next_event(&rx) {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(8000));
                assert!(writer.is_some());
                let info = info.expect("InterfaceUp should carry interface info");
                assert!(info.out_capable);
                assert!(info.in_capable);
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn backbone_receive_frame() {
        let (port, rx) = start_backbone(81, 8100);
        let mut client = connect(port);

        // Drain InterfaceUp
        let _ = next_event(&rx);

        // Send HDLC frame (>= 19 bytes)
        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        match next_event(&rx) {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(8100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn backbone_send_to_client() {
        let (port, rx) = start_backbone(82, 8200);
        let mut client = connect(port);
        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();

        // Get writer from InterfaceUp
        let mut writer = match next_event(&rx) {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // Send frame via writer
        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // Read exactly one frame; a single `read` may legally return less.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn backbone_multiple_clients() {
        let (port, rx) = start_backbone(83, 8300);
        let _client1 = connect(port);
        let _client2 = connect(port);

        let ids: Vec<InterfaceId> = (0..2)
            .map(|_| match next_event(&rx) {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            })
            .collect();

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn backbone_client_disconnect() {
        let (port, rx) = start_backbone(84, 8400);
        let client = connect(port);

        // Drain InterfaceUp
        let _ = next_event(&rx);

        // Disconnect
        drop(client);

        // Should receive InterfaceDown
        let event = next_event(&rx);
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
            "expected InterfaceDown(8400), got {:?}",
            event
        );
    }

    #[test]
    fn backbone_epoll_multiplexing() {
        let (port, rx) = start_backbone(85, 8500);
        let mut client1 = connect(port);
        let mut client2 = connect(port);

        // Drain both InterfaceUp events
        let _ = next_event(&rx);
        let _ = next_event(&rx);

        // Both clients send data simultaneously
        let payload1: Vec<u8> = (0..24).collect();
        let payload2: Vec<u8> = (100..130).collect();
        client1.write_all(&hdlc::frame(&payload1)).unwrap();
        client2.write_all(&hdlc::frame(&payload2)).unwrap();

        // Should receive both Frame events, in either order
        let mut received = Vec::new();
        for _ in 0..2 {
            match next_event(&rx) {
                Event::Frame { data, .. } => received.push(data),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
        assert!(received.contains(&payload1));
        assert!(received.contains(&payload2));
    }

    #[test]
    fn backbone_bind_port() {
        // Should not error; the receiver is kept alive until the test ends.
        let (_port, _rx) = start_backbone(86, 8600);
    }

    #[test]
    fn backbone_hdlc_fragmented() {
        let (port, rx) = start_backbone(87, 8700);
        let mut client = connect(port);
        client.set_nodelay(true).unwrap();

        // Drain InterfaceUp
        let _ = next_event(&rx);

        // Send HDLC frame in two fragments
        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        let mid = framed.len() / 2;

        client.write_all(&framed[..mid]).unwrap();
        // Pause so the server is likely to see the halves in separate reads.
        thread::sleep(Duration::from_millis(50));
        client.write_all(&framed[mid..]).unwrap();

        // Should receive reassembled frame
        match next_event(&rx) {
            Event::Frame { data, .. } => {
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }
}