1use std::collections::HashMap;
10use std::io;
11use std::net::TcpListener;
12use std::os::unix::io::{AsRawFd, RawFd};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16
17use rns_core::constants;
18use rns_core::transport::types::{InterfaceId, InterfaceInfo};
19
20use crate::event::{Event, EventSender};
21use crate::hdlc;
22use crate::interface::Writer;
23
24#[allow(dead_code)]
26const HW_MTU: usize = 1_048_576;
27
28#[derive(Debug, Clone)]
30pub struct BackboneConfig {
31 pub name: String,
32 pub listen_ip: String,
33 pub listen_port: u16,
34 pub interface_id: InterfaceId,
35}
36
37impl Default for BackboneConfig {
38 fn default() -> Self {
39 BackboneConfig {
40 name: String::new(),
41 listen_ip: "0.0.0.0".into(),
42 listen_port: 0,
43 interface_id: InterfaceId(0),
44 }
45 }
46}
47
48struct BackboneWriter {
50 fd: RawFd,
51}
52
53impl Writer for BackboneWriter {
54 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
55 let framed = hdlc::frame(data);
56 let mut offset = 0;
57 while offset < framed.len() {
58 let n = unsafe {
59 libc::send(
60 self.fd,
61 framed[offset..].as_ptr() as *const libc::c_void,
62 framed.len() - offset,
63 libc::MSG_NOSIGNAL,
64 )
65 };
66 if n < 0 {
67 return Err(io::Error::last_os_error());
68 }
69 offset += n as usize;
70 }
71 Ok(())
72 }
73}
74
75impl Drop for BackboneWriter {
77 fn drop(&mut self) {
78 unsafe {
79 libc::close(self.fd);
80 }
81 }
82}
83
84unsafe impl Send for BackboneWriter {}
86
87pub fn start(
89 config: BackboneConfig,
90 tx: EventSender,
91 next_id: Arc<AtomicU64>,
92) -> io::Result<()> {
93 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
94 let listener = TcpListener::bind(&addr)?;
95 listener.set_nonblocking(true)?;
96
97 log::info!(
98 "[{}] backbone server listening on {}",
99 config.name,
100 listener.local_addr().unwrap_or(addr.parse().unwrap())
101 );
102
103 let name = config.name.clone();
104 thread::Builder::new()
105 .name(format!("backbone-epoll-{}", config.interface_id.0))
106 .spawn(move || {
107 if let Err(e) = epoll_loop(listener, name, tx, next_id) {
108 log::error!("backbone epoll loop error: {}", e);
109 }
110 })?;
111
112 Ok(())
113}
114
115struct ClientState {
117 id: InterfaceId,
118 decoder: hdlc::Decoder,
119}
120
121fn epoll_loop(
123 listener: TcpListener,
124 name: String,
125 tx: EventSender,
126 next_id: Arc<AtomicU64>,
127) -> io::Result<()> {
128 let epfd = unsafe { libc::epoll_create1(0) };
129 if epfd < 0 {
130 return Err(io::Error::last_os_error());
131 }
132
133 let listener_fd = listener.as_raw_fd();
135 let mut ev = libc::epoll_event {
136 events: libc::EPOLLIN as u32,
137 u64: listener_fd as u64,
138 };
139 if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
140 unsafe { libc::close(epfd) };
141 return Err(io::Error::last_os_error());
142 }
143
144 let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
145 let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
146
147 loop {
148 let nfds = unsafe {
149 libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
150 };
151
152 if nfds < 0 {
153 let err = io::Error::last_os_error();
154 if err.kind() == io::ErrorKind::Interrupted {
155 continue;
156 }
157 for (&fd, _) in &clients {
159 unsafe {
160 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
161 libc::close(fd);
162 }
163 }
164 unsafe { libc::close(epfd) };
165 return Err(err);
166 }
167
168 for i in 0..nfds as usize {
169 let ev = &events[i];
170 let fd = ev.u64 as RawFd;
171
172 if fd == listener_fd {
173 loop {
175 match listener.accept() {
176 Ok((stream, peer_addr)) => {
177 let client_fd = stream.as_raw_fd();
178
179 stream.set_nonblocking(true).ok();
181 stream.set_nodelay(true).ok();
182
183 set_tcp_keepalive(client_fd);
185
186 let client_id =
187 InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
188
189 log::info!(
190 "[{}] backbone client connected: {} → id {}",
191 name,
192 peer_addr,
193 client_id.0
194 );
195
196 let mut cev = libc::epoll_event {
198 events: libc::EPOLLIN as u32,
199 u64: client_fd as u64,
200 };
201 if unsafe {
202 libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
203 } < 0
204 {
205 log::warn!(
206 "[{}] failed to add client to epoll: {}",
207 name,
208 io::Error::last_os_error()
209 );
210 continue;
212 }
213
214 std::mem::forget(stream);
217
218 let writer_fd = unsafe { libc::dup(client_fd) };
220 if writer_fd < 0 {
221 log::warn!("[{}] failed to dup client fd", name);
222 unsafe {
223 libc::epoll_ctl(
224 epfd,
225 libc::EPOLL_CTL_DEL,
226 client_fd,
227 std::ptr::null_mut(),
228 );
229 libc::close(client_fd);
230 }
231 continue;
232 }
233 let writer: Box<dyn Writer> =
234 Box::new(BackboneWriter { fd: writer_fd });
235
236 clients.insert(
237 client_fd,
238 ClientState {
239 id: client_id,
240 decoder: hdlc::Decoder::new(),
241 },
242 );
243
244 let info = InterfaceInfo {
245 id: client_id,
246 name: format!("BackboneInterface/{}", client_fd),
247 mode: constants::MODE_FULL,
248 out_capable: true,
249 in_capable: true,
250 bitrate: Some(1_000_000_000), announce_rate_target: None,
252 announce_rate_grace: 0,
253 announce_rate_penalty: 0.0,
254 announce_cap: constants::ANNOUNCE_CAP,
255 is_local_client: false,
256 wants_tunnel: false,
257 tunnel_id: None,
258 };
259
260 if tx
261 .send(Event::InterfaceUp(
262 client_id,
263 Some(writer),
264 Some(info),
265 ))
266 .is_err()
267 {
268 cleanup(epfd, &clients, listener_fd);
270 return Ok(());
271 }
272
273 }
274 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
275 Err(e) => {
276 log::warn!("[{}] accept error: {}", name, e);
277 break;
278 }
279 }
280 }
281 } else if clients.contains_key(&fd) {
282 let mut should_remove = false;
284 let mut client_id = InterfaceId(0);
285
286 if ev.events & libc::EPOLLIN as u32 != 0 {
287 let mut buf = [0u8; 4096];
288 let n = unsafe {
289 libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
290 };
291
292 if n <= 0 {
293 if let Some(c) = clients.get(&fd) {
294 client_id = c.id;
295 }
296 should_remove = true;
297 } else if let Some(client) = clients.get_mut(&fd) {
298 client_id = client.id;
299 for frame in client.decoder.feed(&buf[..n as usize]) {
300 if tx
301 .send(Event::Frame {
302 interface_id: client_id,
303 data: frame,
304 })
305 .is_err()
306 {
307 cleanup(epfd, &clients, listener_fd);
308 return Ok(());
309 }
310 }
311 }
312 }
313
314 if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
315 if let Some(c) = clients.get(&fd) {
316 client_id = c.id;
317 }
318 should_remove = true;
319 }
320
321 if should_remove {
322 log::info!(
323 "[{}] backbone client {} disconnected",
324 name,
325 client_id.0
326 );
327 unsafe {
328 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
329 libc::close(fd);
330 }
331 clients.remove(&fd);
332 let _ = tx.send(Event::InterfaceDown(client_id));
333 }
334 }
335 }
336 }
337}
338
339fn set_tcp_keepalive(fd: RawFd) {
340 unsafe {
341 let one: libc::c_int = 1;
342 libc::setsockopt(
343 fd,
344 libc::SOL_SOCKET,
345 libc::SO_KEEPALIVE,
346 &one as *const _ as *const libc::c_void,
347 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
348 );
349 let idle: libc::c_int = 5;
350 libc::setsockopt(
351 fd,
352 libc::IPPROTO_TCP,
353 libc::TCP_KEEPIDLE,
354 &idle as *const _ as *const libc::c_void,
355 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356 );
357 let interval: libc::c_int = 2;
358 libc::setsockopt(
359 fd,
360 libc::IPPROTO_TCP,
361 libc::TCP_KEEPINTVL,
362 &interval as *const _ as *const libc::c_void,
363 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364 );
365 let cnt: libc::c_int = 12;
366 libc::setsockopt(
367 fd,
368 libc::IPPROTO_TCP,
369 libc::TCP_KEEPCNT,
370 &cnt as *const _ as *const libc::c_void,
371 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372 );
373 }
374}
375
376fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
377 for (&fd, _) in clients {
378 unsafe {
379 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
380 libc::close(fd);
381 }
382 }
383 unsafe {
384 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
385 libc::close(epfd);
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use std::io::{Read, Write};
393 use std::net::TcpStream;
394 use std::sync::mpsc;
395 use std::time::Duration;
396
397 fn find_free_port() -> u16 {
398 TcpListener::bind("127.0.0.1:0")
399 .unwrap()
400 .local_addr()
401 .unwrap()
402 .port()
403 }
404
405 #[test]
406 fn backbone_accept_connection() {
407 let port = find_free_port();
408 let (tx, rx) = mpsc::channel();
409 let next_id = Arc::new(AtomicU64::new(8000));
410
411 let config = BackboneConfig {
412 name: "test-backbone".into(),
413 listen_ip: "127.0.0.1".into(),
414 listen_port: port,
415 interface_id: InterfaceId(80),
416 };
417
418 start(config, tx, next_id).unwrap();
419 thread::sleep(Duration::from_millis(50));
420
421 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
422
423 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
424 match event {
425 Event::InterfaceUp(id, writer, info) => {
426 assert_eq!(id, InterfaceId(8000));
427 assert!(writer.is_some());
428 assert!(info.is_some());
429 let info = info.unwrap();
430 assert!(info.out_capable);
431 assert!(info.in_capable);
432 }
433 other => panic!("expected InterfaceUp, got {:?}", other),
434 }
435 }
436
437 #[test]
438 fn backbone_receive_frame() {
439 let port = find_free_port();
440 let (tx, rx) = mpsc::channel();
441 let next_id = Arc::new(AtomicU64::new(8100));
442
443 let config = BackboneConfig {
444 name: "test-backbone".into(),
445 listen_ip: "127.0.0.1".into(),
446 listen_port: port,
447 interface_id: InterfaceId(81),
448 };
449
450 start(config, tx, next_id).unwrap();
451 thread::sleep(Duration::from_millis(50));
452
453 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
454
455 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
457
458 let payload: Vec<u8> = (0..32).collect();
460 client.write_all(&hdlc::frame(&payload)).unwrap();
461
462 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
463 match event {
464 Event::Frame { interface_id, data } => {
465 assert_eq!(interface_id, InterfaceId(8100));
466 assert_eq!(data, payload);
467 }
468 other => panic!("expected Frame, got {:?}", other),
469 }
470 }
471
472 #[test]
473 fn backbone_send_to_client() {
474 let port = find_free_port();
475 let (tx, rx) = mpsc::channel();
476 let next_id = Arc::new(AtomicU64::new(8200));
477
478 let config = BackboneConfig {
479 name: "test-backbone".into(),
480 listen_ip: "127.0.0.1".into(),
481 listen_port: port,
482 interface_id: InterfaceId(82),
483 };
484
485 start(config, tx, next_id).unwrap();
486 thread::sleep(Duration::from_millis(50));
487
488 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
489 client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
490
491 let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
493 let mut writer = match event {
494 Event::InterfaceUp(_, Some(w), _) => w,
495 other => panic!("expected InterfaceUp with writer, got {:?}", other),
496 };
497
498 let payload: Vec<u8> = (0..24).collect();
500 writer.send_frame(&payload).unwrap();
501
502 let mut buf = [0u8; 256];
504 let n = client.read(&mut buf).unwrap();
505 let expected = hdlc::frame(&payload);
506 assert_eq!(&buf[..n], &expected[..]);
507 }
508
509 #[test]
510 fn backbone_multiple_clients() {
511 let port = find_free_port();
512 let (tx, rx) = mpsc::channel();
513 let next_id = Arc::new(AtomicU64::new(8300));
514
515 let config = BackboneConfig {
516 name: "test-backbone".into(),
517 listen_ip: "127.0.0.1".into(),
518 listen_port: port,
519 interface_id: InterfaceId(83),
520 };
521
522 start(config, tx, next_id).unwrap();
523 thread::sleep(Duration::from_millis(50));
524
525 let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
526 let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
527
528 let mut ids = Vec::new();
529 for _ in 0..2 {
530 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
531 match event {
532 Event::InterfaceUp(id, _, _) => ids.push(id),
533 other => panic!("expected InterfaceUp, got {:?}", other),
534 }
535 }
536
537 assert_eq!(ids.len(), 2);
538 assert_ne!(ids[0], ids[1]);
539 }
540
541 #[test]
542 fn backbone_client_disconnect() {
543 let port = find_free_port();
544 let (tx, rx) = mpsc::channel();
545 let next_id = Arc::new(AtomicU64::new(8400));
546
547 let config = BackboneConfig {
548 name: "test-backbone".into(),
549 listen_ip: "127.0.0.1".into(),
550 listen_port: port,
551 interface_id: InterfaceId(84),
552 };
553
554 start(config, tx, next_id).unwrap();
555 thread::sleep(Duration::from_millis(50));
556
557 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
558
559 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
561
562 drop(client);
564
565 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
567 assert!(
568 matches!(event, Event::InterfaceDown(InterfaceId(8400))),
569 "expected InterfaceDown(8400), got {:?}",
570 event
571 );
572 }
573
574 #[test]
575 fn backbone_epoll_multiplexing() {
576 let port = find_free_port();
577 let (tx, rx) = mpsc::channel();
578 let next_id = Arc::new(AtomicU64::new(8500));
579
580 let config = BackboneConfig {
581 name: "test-backbone".into(),
582 listen_ip: "127.0.0.1".into(),
583 listen_port: port,
584 interface_id: InterfaceId(85),
585 };
586
587 start(config, tx, next_id).unwrap();
588 thread::sleep(Duration::from_millis(50));
589
590 let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
591 let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
592
593 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
595 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
596
597 let payload1: Vec<u8> = (0..24).collect();
599 let payload2: Vec<u8> = (100..130).collect();
600 client1.write_all(&hdlc::frame(&payload1)).unwrap();
601 client2.write_all(&hdlc::frame(&payload2)).unwrap();
602
603 let mut received = Vec::new();
605 for _ in 0..2 {
606 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
607 match event {
608 Event::Frame { data, .. } => received.push(data),
609 other => panic!("expected Frame, got {:?}", other),
610 }
611 }
612 assert!(received.contains(&payload1));
613 assert!(received.contains(&payload2));
614 }
615
616 #[test]
617 fn backbone_bind_port() {
618 let port = find_free_port();
619 let (tx, _rx) = mpsc::channel();
620 let next_id = Arc::new(AtomicU64::new(8600));
621
622 let config = BackboneConfig {
623 name: "test-backbone".into(),
624 listen_ip: "127.0.0.1".into(),
625 listen_port: port,
626 interface_id: InterfaceId(86),
627 };
628
629 start(config, tx, next_id).unwrap();
631 }
632
633 #[test]
634 fn backbone_hdlc_fragmented() {
635 let port = find_free_port();
636 let (tx, rx) = mpsc::channel();
637 let next_id = Arc::new(AtomicU64::new(8700));
638
639 let config = BackboneConfig {
640 name: "test-backbone".into(),
641 listen_ip: "127.0.0.1".into(),
642 listen_port: port,
643 interface_id: InterfaceId(87),
644 };
645
646 start(config, tx, next_id).unwrap();
647 thread::sleep(Duration::from_millis(50));
648
649 let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
650 client.set_nodelay(true).unwrap();
651
652 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
654
655 let payload: Vec<u8> = (0..32).collect();
657 let framed = hdlc::frame(&payload);
658 let mid = framed.len() / 2;
659
660 client.write_all(&framed[..mid]).unwrap();
661 thread::sleep(Duration::from_millis(50));
662 client.write_all(&framed[mid..]).unwrap();
663
664 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
666 match event {
667 Event::Frame { data, .. } => {
668 assert_eq!(data, payload);
669 }
670 other => panic!("expected Frame, got {:?}", other),
671 }
672 }
673}