1use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
27
/// 1 MiB (1_048_576 bytes).
///
/// NOTE(review): presumably the hardware MTU of a backbone link, judging by the name — confirm.
/// Nothing in the visible code reads this constant, which is why the `dead_code` lint is silenced.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
31
/// Settings for the listening (server) side of the backbone interface, consumed by [`start`].
#[derive(Debug, Clone)]
pub struct BackboneConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Address to bind; joined with `listen_port` as `"{ip}:{port}"` before binding.
    pub listen_ip: String,
    /// TCP port to bind.
    pub listen_port: u16,
    /// Id of the listener itself; `start` uses it to name the epoll thread.
    pub interface_id: InterfaceId,
}
40
41impl Default for BackboneConfig {
42 fn default() -> Self {
43 BackboneConfig {
44 name: String::new(),
45 listen_ip: "0.0.0.0".into(),
46 listen_port: 0,
47 interface_id: InterfaceId(0),
48 }
49 }
50}
51
/// Write half of a connection accepted by the backbone server.
///
/// Holds a raw socket descriptor (created with `dup` in `epoll_loop`) and closes it on drop.
struct BackboneWriter {
    /// Descriptor that `send_frame` writes to.
    fd: RawFd,
}
56
57impl Writer for BackboneWriter {
58 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59 let framed = hdlc::frame(data);
60 let mut offset = 0;
61 while offset < framed.len() {
62 let n = unsafe {
63 libc::send(
64 self.fd,
65 framed[offset..].as_ptr() as *const libc::c_void,
66 framed.len() - offset,
67 libc::MSG_NOSIGNAL,
68 )
69 };
70 if n < 0 {
71 return Err(io::Error::last_os_error());
72 }
73 offset += n as usize;
74 }
75 Ok(())
76 }
77}
78
79impl Drop for BackboneWriter {
81 fn drop(&mut self) {
82 unsafe {
83 libc::close(self.fd);
84 }
85 }
86}
87
// SAFETY: the only field is a `RawFd`, which is a plain integer with no thread affinity.
// NOTE(review): a struct containing only a `RawFd` should already be `Send` automatically,
// so this explicit impl looks redundant — confirm before removing.
unsafe impl Send for BackboneWriter {}
90
91pub fn start(config: BackboneConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
93 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
94 let listener = TcpListener::bind(&addr)?;
95 listener.set_nonblocking(true)?;
96
97 log::info!(
98 "[{}] backbone server listening on {}",
99 config.name,
100 listener.local_addr().unwrap_or(addr.parse().unwrap())
101 );
102
103 let name = config.name.clone();
104 thread::Builder::new()
105 .name(format!("backbone-epoll-{}", config.interface_id.0))
106 .spawn(move || {
107 if let Err(e) = epoll_loop(listener, name, tx, next_id) {
108 log::error!("backbone epoll loop error: {}", e);
109 }
110 })?;
111
112 Ok(())
113}
114
/// Per-connection state kept by `epoll_loop`, keyed by the client's socket descriptor.
struct ClientState {
    /// Interface id assigned to this client when it was accepted.
    id: InterfaceId,
    /// Decoder fed with the bytes received from this client; it yields the decoded frames.
    decoder: hdlc::Decoder,
}
120
121fn epoll_loop(
123 listener: TcpListener,
124 name: String,
125 tx: EventSender,
126 next_id: Arc<AtomicU64>,
127) -> io::Result<()> {
128 let epfd = unsafe { libc::epoll_create1(0) };
129 if epfd < 0 {
130 return Err(io::Error::last_os_error());
131 }
132
133 let listener_fd = listener.as_raw_fd();
135 let mut ev = libc::epoll_event {
136 events: libc::EPOLLIN as u32,
137 u64: listener_fd as u64,
138 };
139 if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
140 unsafe { libc::close(epfd) };
141 return Err(io::Error::last_os_error());
142 }
143
144 let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
145 let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
146
147 loop {
148 let nfds =
149 unsafe { libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000) };
150
151 if nfds < 0 {
152 let err = io::Error::last_os_error();
153 if err.kind() == io::ErrorKind::Interrupted {
154 continue;
155 }
156 for (&fd, _) in &clients {
158 unsafe {
159 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
160 libc::close(fd);
161 }
162 }
163 unsafe { libc::close(epfd) };
164 return Err(err);
165 }
166
167 for i in 0..nfds as usize {
168 let ev = &events[i];
169 let fd = ev.u64 as RawFd;
170
171 if fd == listener_fd {
172 loop {
174 match listener.accept() {
175 Ok((stream, peer_addr)) => {
176 let client_fd = stream.as_raw_fd();
177
178 stream.set_nonblocking(true).ok();
180 stream.set_nodelay(true).ok();
181
182 set_tcp_keepalive(client_fd);
184
185 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
186
187 log::info!(
188 "[{}] backbone client connected: {} → id {}",
189 name,
190 peer_addr,
191 client_id.0
192 );
193
194 let mut cev = libc::epoll_event {
196 events: libc::EPOLLIN as u32,
197 u64: client_fd as u64,
198 };
199 if unsafe {
200 libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
201 } < 0
202 {
203 log::warn!(
204 "[{}] failed to add client to epoll: {}",
205 name,
206 io::Error::last_os_error()
207 );
208 continue;
210 }
211
212 std::mem::forget(stream);
215
216 let writer_fd = unsafe { libc::dup(client_fd) };
218 if writer_fd < 0 {
219 log::warn!("[{}] failed to dup client fd", name);
220 unsafe {
221 libc::epoll_ctl(
222 epfd,
223 libc::EPOLL_CTL_DEL,
224 client_fd,
225 std::ptr::null_mut(),
226 );
227 libc::close(client_fd);
228 }
229 continue;
230 }
231 let writer: Box<dyn Writer> =
232 Box::new(BackboneWriter { fd: writer_fd });
233
234 clients.insert(
235 client_fd,
236 ClientState {
237 id: client_id,
238 decoder: hdlc::Decoder::new(),
239 },
240 );
241
242 let info = InterfaceInfo {
243 id: client_id,
244 name: format!("BackboneInterface/{}", client_fd),
245 mode: constants::MODE_FULL,
246 out_capable: true,
247 in_capable: true,
248 bitrate: Some(1_000_000_000), announce_rate_target: None,
250 announce_rate_grace: 0,
251 announce_rate_penalty: 0.0,
252 announce_cap: constants::ANNOUNCE_CAP,
253 is_local_client: false,
254 wants_tunnel: false,
255 tunnel_id: None,
256 mtu: 65535,
257 ia_freq: 0.0,
258 started: 0.0,
259 ingress_control: true,
260 };
261
262 if tx
263 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
264 .is_err()
265 {
266 cleanup(epfd, &clients, listener_fd);
268 return Ok(());
269 }
270 }
271 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
272 Err(e) => {
273 log::warn!("[{}] accept error: {}", name, e);
274 break;
275 }
276 }
277 }
278 } else if clients.contains_key(&fd) {
279 let mut should_remove = false;
281 let mut client_id = InterfaceId(0);
282
283 if ev.events & libc::EPOLLIN as u32 != 0 {
284 let mut buf = [0u8; 4096];
285 let n = unsafe {
286 libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
287 };
288
289 if n <= 0 {
290 if let Some(c) = clients.get(&fd) {
291 client_id = c.id;
292 }
293 should_remove = true;
294 } else if let Some(client) = clients.get_mut(&fd) {
295 client_id = client.id;
296 for frame in client.decoder.feed(&buf[..n as usize]) {
297 if tx
298 .send(Event::Frame {
299 interface_id: client_id,
300 data: frame,
301 })
302 .is_err()
303 {
304 cleanup(epfd, &clients, listener_fd);
305 return Ok(());
306 }
307 }
308 }
309 }
310
311 if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
312 if let Some(c) = clients.get(&fd) {
313 client_id = c.id;
314 }
315 should_remove = true;
316 }
317
318 if should_remove {
319 log::info!("[{}] backbone client {} disconnected", name, client_id.0);
320 unsafe {
321 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
322 libc::close(fd);
323 }
324 clients.remove(&fd);
325 let _ = tx.send(Event::InterfaceDown(client_id));
326 }
327 }
328 }
329 }
330}
331
332fn set_tcp_keepalive(fd: RawFd) {
333 unsafe {
334 let one: libc::c_int = 1;
335 libc::setsockopt(
336 fd,
337 libc::SOL_SOCKET,
338 libc::SO_KEEPALIVE,
339 &one as *const _ as *const libc::c_void,
340 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
341 );
342 let idle: libc::c_int = 5;
343 libc::setsockopt(
344 fd,
345 libc::IPPROTO_TCP,
346 libc::TCP_KEEPIDLE,
347 &idle as *const _ as *const libc::c_void,
348 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
349 );
350 let interval: libc::c_int = 2;
351 libc::setsockopt(
352 fd,
353 libc::IPPROTO_TCP,
354 libc::TCP_KEEPINTVL,
355 &interval as *const _ as *const libc::c_void,
356 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
357 );
358 let cnt: libc::c_int = 12;
359 libc::setsockopt(
360 fd,
361 libc::IPPROTO_TCP,
362 libc::TCP_KEEPCNT,
363 &cnt as *const _ as *const libc::c_void,
364 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
365 );
366 }
367}
368
369fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
370 for (&fd, _) in clients {
371 unsafe {
372 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
373 libc::close(fd);
374 }
375 }
376 unsafe {
377 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
378 libc::close(epfd);
379 }
380}
381
/// Settings for the outbound (client) side of the backbone interface.
#[derive(Debug, Clone)]
pub struct BackboneClientConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Host to connect to; joined with `target_port` as `"{host}:{port}"` and resolved.
    pub target_host: String,
    /// TCP port to connect to.
    pub target_port: u16,
    /// Id reported in every event emitted for this connection.
    pub interface_id: InterfaceId,
    /// Pause before each reconnect attempt.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` retries without limit.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout applied to each TCP connect.
    pub connect_timeout: Duration,
    /// Copied from the `transport_identity` config parameter; not read by the code visible here.
    pub transport_identity: Option<String>,
}
398
399impl Default for BackboneClientConfig {
400 fn default() -> Self {
401 BackboneClientConfig {
402 name: String::new(),
403 target_host: "127.0.0.1".into(),
404 target_port: 4242,
405 interface_id: InterfaceId(0),
406 reconnect_wait: Duration::from_secs(5),
407 max_reconnect_tries: None,
408 connect_timeout: Duration::from_secs(5),
409 transport_identity: None,
410 }
411 }
412}
413
/// Write half of an outbound backbone connection.
struct BackboneClientWriter {
    /// Stream that `send_frame` writes to.
    stream: TcpStream,
}
418
419impl Writer for BackboneClientWriter {
420 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
421 self.stream.write_all(&hdlc::frame(data))
422 }
423}
424
425fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
427 let addr_str = format!("{}:{}", config.target_host, config.target_port);
428 let addr = addr_str
429 .to_socket_addrs()?
430 .next()
431 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
432
433 let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
434 stream.set_nodelay(true)?;
435 set_tcp_keepalive(stream.as_raw_fd());
436 Ok(stream)
437}
438
439pub fn start_client(config: BackboneClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
441 let stream = try_connect_client(&config)?;
442 let reader_stream = stream.try_clone()?;
443 let writer_stream = stream.try_clone()?;
444
445 let id = config.interface_id;
446 log::info!(
447 "[{}] backbone client connected to {}:{}",
448 config.name,
449 config.target_host,
450 config.target_port
451 );
452
453 let _ = tx.send(Event::InterfaceUp(id, None, None));
455
456 thread::Builder::new()
457 .name(format!("backbone-client-{}", id.0))
458 .spawn(move || {
459 client_reader_loop(reader_stream, config, tx);
460 })?;
461
462 Ok(Box::new(BackboneClientWriter {
463 stream: writer_stream,
464 }))
465}
466
467fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
470 let id = config.interface_id;
471 let mut decoder = hdlc::Decoder::new();
472 let mut buf = [0u8; 4096];
473
474 loop {
475 match stream.read(&mut buf) {
476 Ok(0) => {
477 log::warn!("[{}] connection closed", config.name);
478 let _ = tx.send(Event::InterfaceDown(id));
479 match client_reconnect(&config, &tx) {
480 Some(new_stream) => {
481 stream = new_stream;
482 decoder = hdlc::Decoder::new();
483 continue;
484 }
485 None => {
486 log::error!("[{}] reconnection failed, giving up", config.name);
487 return;
488 }
489 }
490 }
491 Ok(n) => {
492 for frame in decoder.feed(&buf[..n]) {
493 if tx
494 .send(Event::Frame {
495 interface_id: id,
496 data: frame,
497 })
498 .is_err()
499 {
500 return;
501 }
502 }
503 }
504 Err(e) => {
505 log::warn!("[{}] read error: {}", config.name, e);
506 let _ = tx.send(Event::InterfaceDown(id));
507 match client_reconnect(&config, &tx) {
508 Some(new_stream) => {
509 stream = new_stream;
510 decoder = hdlc::Decoder::new();
511 continue;
512 }
513 None => {
514 log::error!("[{}] reconnection failed, giving up", config.name);
515 return;
516 }
517 }
518 }
519 }
520 }
521}
522
523fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
526 let mut attempts = 0u32;
527 loop {
528 thread::sleep(config.reconnect_wait);
529 attempts += 1;
530
531 if let Some(max) = config.max_reconnect_tries {
532 if attempts > max {
533 let _ = tx.send(Event::InterfaceDown(config.interface_id));
534 return None;
535 }
536 }
537
538 log::info!("[{}] reconnect attempt {} ...", config.name, attempts);
539
540 match try_connect_client(config) {
541 Ok(new_stream) => {
542 let writer_stream = match new_stream.try_clone() {
543 Ok(s) => s,
544 Err(e) => {
545 log::warn!("[{}] failed to clone stream: {}", config.name, e);
546 continue;
547 }
548 };
549 log::info!("[{}] reconnected", config.name);
550 let new_writer: Box<dyn Writer> = Box::new(BackboneClientWriter {
551 stream: writer_stream,
552 });
553 let _ = tx.send(Event::InterfaceUp(
554 config.interface_id,
555 Some(new_writer),
556 None,
557 ));
558 return Some(new_stream);
559 }
560 Err(e) => {
561 log::warn!("[{}] reconnect failed: {}", config.name, e);
562 }
563 }
564 }
565}
566
/// Parsed configuration for a backbone interface: either a listening server or an outbound
/// client. Produced by `parse_config` and consumed by the factory's `start`.
enum BackboneMode {
    /// Listen for incoming connections.
    Server(BackboneConfig),
    /// Connect out to a remote backbone server.
    Client(BackboneClientConfig),
}
577
/// Factory for interfaces of type `"BackboneInterface"`; it carries no state of its own.
pub struct BackboneInterfaceFactory;
584
585impl InterfaceFactory for BackboneInterfaceFactory {
586 fn type_name(&self) -> &str {
587 "BackboneInterface"
588 }
589
590 fn parse_config(
591 &self,
592 name: &str,
593 id: InterfaceId,
594 params: &HashMap<String, String>,
595 ) -> Result<Box<dyn InterfaceConfigData>, String> {
596 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
597 let target_host = target_host.clone();
599 let target_port = params
600 .get("target_port")
601 .or_else(|| params.get("port"))
602 .and_then(|v| v.parse().ok())
603 .unwrap_or(4242);
604 let transport_identity = params.get("transport_identity").cloned();
605 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
606 name: name.to_string(),
607 target_host,
608 target_port,
609 interface_id: id,
610 transport_identity,
611 ..BackboneClientConfig::default()
612 })))
613 } else {
614 let listen_ip = params
616 .get("listen_ip")
617 .or_else(|| params.get("device"))
618 .cloned()
619 .unwrap_or_else(|| "0.0.0.0".into());
620 let listen_port = params
621 .get("listen_port")
622 .or_else(|| params.get("port"))
623 .and_then(|v| v.parse().ok())
624 .unwrap_or(4242);
625 Ok(Box::new(BackboneMode::Server(BackboneConfig {
626 name: name.to_string(),
627 listen_ip,
628 listen_port,
629 interface_id: id,
630 })))
631 }
632 }
633
634 fn start(
635 &self,
636 config: Box<dyn InterfaceConfigData>,
637 ctx: StartContext,
638 ) -> io::Result<StartResult> {
639 let mode = *config.into_any().downcast::<BackboneMode>().map_err(|_| {
640 io::Error::new(
641 io::ErrorKind::InvalidData,
642 "wrong config type for BackboneInterface",
643 )
644 })?;
645
646 match mode {
647 BackboneMode::Client(cfg) => {
648 let id = cfg.interface_id;
649 let name = cfg.name.clone();
650 let info = InterfaceInfo {
651 id,
652 name,
653 mode: ctx.mode,
654 out_capable: true,
655 in_capable: true,
656 bitrate: Some(1_000_000_000),
657 announce_rate_target: None,
658 announce_rate_grace: 0,
659 announce_rate_penalty: 0.0,
660 announce_cap: constants::ANNOUNCE_CAP,
661 is_local_client: false,
662 wants_tunnel: false,
663 tunnel_id: None,
664 mtu: 65535,
665 ingress_control: true,
666 ia_freq: 0.0,
667 started: crate::time::now(),
668 };
669 let writer = start_client(cfg, ctx.tx)?;
670 Ok(StartResult::Simple {
671 id,
672 info,
673 writer,
674 interface_type_name: "BackboneInterface".to_string(),
675 })
676 }
677 BackboneMode::Server(cfg) => {
678 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
679 Ok(StartResult::Listener)
680 }
681 }
682 }
683}
684
// Integration-style tests: each one talks over real loopback TCP sockets.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port by binding to port 0, then releases it.
    /// The port is free only at the moment of return; another process could grab it afterwards.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Connecting to the server yields `InterfaceUp` with the first dynamic id, a writer and info.
    #[test]
    fn backbone_accept_connection() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8000));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(80),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(8000));
                assert!(writer.is_some());
                assert!(info.is_some());
                let info = info.unwrap();
                assert!(info.out_capable);
                assert!(info.in_capable);
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    /// An HDLC frame written by a client arrives as `Event::Frame` with the original payload.
    #[test]
    fn backbone_receive_frame() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8100));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(81),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Discard the InterfaceUp event for this connection.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(8100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer delivered with `InterfaceUp` sends HDLC-framed bytes to the client socket.
    #[test]
    fn backbone_send_to_client() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8200));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(82),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // A single read is expected to return the whole (small) frame.
        let mut buf = [0u8; 256];
        let n = client.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    /// Two simultaneous clients each get their own, distinct interface id.
    #[test]
    fn backbone_multiple_clients() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8300));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(83),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    /// Dropping the client socket makes the server emit `InterfaceDown` for that client's id.
    #[test]
    fn backbone_client_disconnect() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8400));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(84),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Discard the InterfaceUp event for this connection.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
            "expected InterfaceDown(8400), got {:?}",
            event
        );
    }

    /// Frames from two clients are both delivered; arrival order is not asserted.
    #[test]
    fn backbone_epoll_multiplexing() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8500));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(85),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let mut client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();

        // Discard the two InterfaceUp events.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload1: Vec<u8> = (0..24).collect();
        let payload2: Vec<u8> = (100..130).collect();
        client1.write_all(&hdlc::frame(&payload1)).unwrap();
        client2.write_all(&hdlc::frame(&payload2)).unwrap();

        let mut received = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => received.push(data),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
        assert!(received.contains(&payload1));
        assert!(received.contains(&payload2));
    }

    /// `start` succeeds when binding a free port; nothing connects in this test.
    #[test]
    fn backbone_bind_port() {
        let port = find_free_port();
        let (tx, _rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8600));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(86),
        };

        start(config, tx, next_id).unwrap();
    }

    /// A frame split across two TCP writes is reassembled into a single `Event::Frame`.
    #[test]
    fn backbone_hdlc_fragmented() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(8700));

        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(87),
        };

        start(config, tx, next_id).unwrap();
        thread::sleep(Duration::from_millis(50));

        let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        client.set_nodelay(true).unwrap();

        // Discard the InterfaceUp event for this connection.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        let mid = framed.len() / 2;

        // The pause makes it likely that the two halves reach the server separately.
        client.write_all(&framed[..mid]).unwrap();
        thread::sleep(Duration::from_millis(50));
        client.write_all(&framed[mid..]).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => {
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Client config aimed at loopback `port`, with a short reconnect wait and two retries.
    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
        BackboneClientConfig {
            name: format!("test-bb-client-{}", port),
            target_host: "127.0.0.1".into(),
            target_port: port,
            interface_id: InterfaceId(id),
            reconnect_wait: Duration::from_millis(100),
            max_reconnect_tries: Some(2),
            connect_timeout: Duration::from_secs(2),
            transport_identity: None,
        }
    }

    /// `start_client` connects to a plain listener and reports `InterfaceUp` with its own id.
    #[test]
    fn backbone_client_connect() {
        let port = find_free_port();
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
        let (tx, rx) = mpsc::channel();

        let config = make_client_config(port, 9000);
        let _writer = start_client(config, tx).unwrap();

        let _server_stream = listener.accept().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
    }

    /// A frame written by the server side reaches the client's reader as `Event::Frame`.
    #[test]
    fn backbone_client_receive_frame() {
        let port = find_free_port();
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
        let (tx, rx) = mpsc::channel();

        let config = make_client_config(port, 9100);
        let _writer = start_client(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();

        // Discard the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        server_stream.write_all(&hdlc::frame(&payload)).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(9100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer returned by `start_client` sends HDLC-framed bytes to the server side.
    #[test]
    fn backbone_client_send_frame() {
        let port = find_free_port();
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
        let (tx, _rx) = mpsc::channel();

        let config = make_client_config(port, 9200);
        let mut writer = start_client(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();
        server_stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let n = server_stream.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    /// After the server drops the connection the client reports `InterfaceDown`, reconnects to
    /// the same listener, and reports `InterfaceUp` again.
    #[test]
    fn backbone_client_reconnect() {
        let port = find_free_port();
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
        listener.set_nonblocking(false).unwrap();
        let (tx, rx) = mpsc::channel();

        let config = make_client_config(port, 9300);
        let _writer = start_client(config, tx).unwrap();

        let (server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(server_stream);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));

        // Blocks until the client's reconnect attempt arrives.
        let _server_stream2 = listener.accept().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
    }
}