1use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, Writer};
27
/// Size limit associated with the backbone interface (1_048_576, i.e. 2^20).
///
/// NOTE(review): presumably the hardware MTU in bytes — confirm. Nothing in the
/// visible part of this file reads the constant (hence the `dead_code`
/// allowance); the `InterfaceInfo` values built here use `mtu: 65535` instead.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
31
/// Configuration for a listening (server-side) backbone interface.
#[derive(Debug, Clone)]
pub struct BackboneConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Address the listener binds to (combined with `listen_port` as `ip:port`).
    pub listen_ip: String,
    /// TCP port the listener binds to.
    pub listen_port: u16,
    /// Id of the listener itself; used to name the epoll thread. Accepted
    /// clients get their own ids from the shared counter passed to `start`.
    pub interface_id: InterfaceId,
}
40
41impl Default for BackboneConfig {
42 fn default() -> Self {
43 BackboneConfig {
44 name: String::new(),
45 listen_ip: "0.0.0.0".into(),
46 listen_port: 0,
47 interface_id: InterfaceId(0),
48 }
49 }
50}
51
/// Outbound half of one accepted backbone connection.
///
/// The writer owns `fd` (a `dup` of the client socket made by the epoll loop)
/// and closes it when dropped.
struct BackboneWriter {
    /// Raw socket descriptor that framed data is sent on.
    fd: RawFd,
}
56
57impl Writer for BackboneWriter {
58 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59 let framed = hdlc::frame(data);
60 let mut offset = 0;
61 while offset < framed.len() {
62 let n = unsafe {
63 libc::send(
64 self.fd,
65 framed[offset..].as_ptr() as *const libc::c_void,
66 framed.len() - offset,
67 libc::MSG_NOSIGNAL,
68 )
69 };
70 if n < 0 {
71 return Err(io::Error::last_os_error());
72 }
73 offset += n as usize;
74 }
75 Ok(())
76 }
77}
78
79impl Drop for BackboneWriter {
81 fn drop(&mut self) {
82 unsafe {
83 libc::close(self.fd);
84 }
85 }
86}
87
// SAFETY: the struct's only field is a `RawFd`, a plain integer handle that is
// not tied to the thread that created it.
// NOTE(review): a struct holding only a `RawFd` should already be `Send`
// automatically, so this explicit impl looks redundant — confirm before removing.
unsafe impl Send for BackboneWriter {}
90
91pub fn start(
93 config: BackboneConfig,
94 tx: EventSender,
95 next_id: Arc<AtomicU64>,
96) -> io::Result<()> {
97 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
98 let listener = TcpListener::bind(&addr)?;
99 listener.set_nonblocking(true)?;
100
101 log::info!(
102 "[{}] backbone server listening on {}",
103 config.name,
104 listener.local_addr().unwrap_or(addr.parse().unwrap())
105 );
106
107 let name = config.name.clone();
108 thread::Builder::new()
109 .name(format!("backbone-epoll-{}", config.interface_id.0))
110 .spawn(move || {
111 if let Err(e) = epoll_loop(listener, name, tx, next_id) {
112 log::error!("backbone epoll loop error: {}", e);
113 }
114 })?;
115
116 Ok(())
117}
118
/// Per-connection state kept by the epoll loop, keyed by the client's socket
/// descriptor.
struct ClientState {
    /// Interface id reported for this client in events.
    id: InterfaceId,
    /// HDLC decoder that is fed the bytes received from this client.
    decoder: hdlc::Decoder,
}
124
125fn epoll_loop(
127 listener: TcpListener,
128 name: String,
129 tx: EventSender,
130 next_id: Arc<AtomicU64>,
131) -> io::Result<()> {
132 let epfd = unsafe { libc::epoll_create1(0) };
133 if epfd < 0 {
134 return Err(io::Error::last_os_error());
135 }
136
137 let listener_fd = listener.as_raw_fd();
139 let mut ev = libc::epoll_event {
140 events: libc::EPOLLIN as u32,
141 u64: listener_fd as u64,
142 };
143 if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
144 unsafe { libc::close(epfd) };
145 return Err(io::Error::last_os_error());
146 }
147
148 let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
149 let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
150
151 loop {
152 let nfds = unsafe {
153 libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
154 };
155
156 if nfds < 0 {
157 let err = io::Error::last_os_error();
158 if err.kind() == io::ErrorKind::Interrupted {
159 continue;
160 }
161 for (&fd, _) in &clients {
163 unsafe {
164 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
165 libc::close(fd);
166 }
167 }
168 unsafe { libc::close(epfd) };
169 return Err(err);
170 }
171
172 for i in 0..nfds as usize {
173 let ev = &events[i];
174 let fd = ev.u64 as RawFd;
175
176 if fd == listener_fd {
177 loop {
179 match listener.accept() {
180 Ok((stream, peer_addr)) => {
181 let client_fd = stream.as_raw_fd();
182
183 stream.set_nonblocking(true).ok();
185 stream.set_nodelay(true).ok();
186
187 set_tcp_keepalive(client_fd);
189
190 let client_id =
191 InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
192
193 log::info!(
194 "[{}] backbone client connected: {} → id {}",
195 name,
196 peer_addr,
197 client_id.0
198 );
199
200 let mut cev = libc::epoll_event {
202 events: libc::EPOLLIN as u32,
203 u64: client_fd as u64,
204 };
205 if unsafe {
206 libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
207 } < 0
208 {
209 log::warn!(
210 "[{}] failed to add client to epoll: {}",
211 name,
212 io::Error::last_os_error()
213 );
214 continue;
216 }
217
218 std::mem::forget(stream);
221
222 let writer_fd = unsafe { libc::dup(client_fd) };
224 if writer_fd < 0 {
225 log::warn!("[{}] failed to dup client fd", name);
226 unsafe {
227 libc::epoll_ctl(
228 epfd,
229 libc::EPOLL_CTL_DEL,
230 client_fd,
231 std::ptr::null_mut(),
232 );
233 libc::close(client_fd);
234 }
235 continue;
236 }
237 let writer: Box<dyn Writer> =
238 Box::new(BackboneWriter { fd: writer_fd });
239
240 clients.insert(
241 client_fd,
242 ClientState {
243 id: client_id,
244 decoder: hdlc::Decoder::new(),
245 },
246 );
247
248 let info = InterfaceInfo {
249 id: client_id,
250 name: format!("BackboneInterface/{}", client_fd),
251 mode: constants::MODE_FULL,
252 out_capable: true,
253 in_capable: true,
254 bitrate: Some(1_000_000_000), announce_rate_target: None,
256 announce_rate_grace: 0,
257 announce_rate_penalty: 0.0,
258 announce_cap: constants::ANNOUNCE_CAP,
259 is_local_client: false,
260 wants_tunnel: false,
261 tunnel_id: None,
262 mtu: 65535,
263 ia_freq: 0.0,
264 started: 0.0,
265 ingress_control: true,
266 };
267
268 if tx
269 .send(Event::InterfaceUp(
270 client_id,
271 Some(writer),
272 Some(info),
273 ))
274 .is_err()
275 {
276 cleanup(epfd, &clients, listener_fd);
278 return Ok(());
279 }
280
281 }
282 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
283 Err(e) => {
284 log::warn!("[{}] accept error: {}", name, e);
285 break;
286 }
287 }
288 }
289 } else if clients.contains_key(&fd) {
290 let mut should_remove = false;
292 let mut client_id = InterfaceId(0);
293
294 if ev.events & libc::EPOLLIN as u32 != 0 {
295 let mut buf = [0u8; 4096];
296 let n = unsafe {
297 libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
298 };
299
300 if n <= 0 {
301 if let Some(c) = clients.get(&fd) {
302 client_id = c.id;
303 }
304 should_remove = true;
305 } else if let Some(client) = clients.get_mut(&fd) {
306 client_id = client.id;
307 for frame in client.decoder.feed(&buf[..n as usize]) {
308 if tx
309 .send(Event::Frame {
310 interface_id: client_id,
311 data: frame,
312 })
313 .is_err()
314 {
315 cleanup(epfd, &clients, listener_fd);
316 return Ok(());
317 }
318 }
319 }
320 }
321
322 if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
323 if let Some(c) = clients.get(&fd) {
324 client_id = c.id;
325 }
326 should_remove = true;
327 }
328
329 if should_remove {
330 log::info!(
331 "[{}] backbone client {} disconnected",
332 name,
333 client_id.0
334 );
335 unsafe {
336 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
337 libc::close(fd);
338 }
339 clients.remove(&fd);
340 let _ = tx.send(Event::InterfaceDown(client_id));
341 }
342 }
343 }
344 }
345}
346
347fn set_tcp_keepalive(fd: RawFd) {
348 unsafe {
349 let one: libc::c_int = 1;
350 libc::setsockopt(
351 fd,
352 libc::SOL_SOCKET,
353 libc::SO_KEEPALIVE,
354 &one as *const _ as *const libc::c_void,
355 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356 );
357 let idle: libc::c_int = 5;
358 libc::setsockopt(
359 fd,
360 libc::IPPROTO_TCP,
361 libc::TCP_KEEPIDLE,
362 &idle as *const _ as *const libc::c_void,
363 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364 );
365 let interval: libc::c_int = 2;
366 libc::setsockopt(
367 fd,
368 libc::IPPROTO_TCP,
369 libc::TCP_KEEPINTVL,
370 &interval as *const _ as *const libc::c_void,
371 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372 );
373 let cnt: libc::c_int = 12;
374 libc::setsockopt(
375 fd,
376 libc::IPPROTO_TCP,
377 libc::TCP_KEEPCNT,
378 &cnt as *const _ as *const libc::c_void,
379 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
380 );
381 }
382}
383
384fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
385 for (&fd, _) in clients {
386 unsafe {
387 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
388 libc::close(fd);
389 }
390 }
391 unsafe {
392 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
393 libc::close(epfd);
394 }
395}
396
/// Configuration for an outgoing (client-side) backbone connection.
#[derive(Debug, Clone)]
pub struct BackboneClientConfig {
    /// Interface name; used as the `[name]` prefix in log messages.
    pub name: String,
    /// Host to connect to.
    pub target_host: String,
    /// TCP port to connect to.
    pub target_port: u16,
    /// Interface id reported in the events emitted for this connection.
    pub interface_id: InterfaceId,
    /// Pause before each reconnect attempt.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` retries without limit.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout for establishing the TCP connection.
    pub connect_timeout: Duration,
    /// Value of the `transport_identity` config parameter, if given.
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub transport_identity: Option<String>,
}
413
414impl Default for BackboneClientConfig {
415 fn default() -> Self {
416 BackboneClientConfig {
417 name: String::new(),
418 target_host: "127.0.0.1".into(),
419 target_port: 4242,
420 interface_id: InterfaceId(0),
421 reconnect_wait: Duration::from_secs(5),
422 max_reconnect_tries: None,
423 connect_timeout: Duration::from_secs(5),
424 transport_identity: None,
425 }
426 }
427}
428
/// Outbound half of a client-mode backbone connection.
struct BackboneClientWriter {
    /// Stream handle that framed data is written to.
    stream: TcpStream,
}
433
434impl Writer for BackboneClientWriter {
435 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
436 self.stream.write_all(&hdlc::frame(data))
437 }
438}
439
440fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
442 let addr_str = format!("{}:{}", config.target_host, config.target_port);
443 let addr = addr_str
444 .to_socket_addrs()?
445 .next()
446 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
447
448 let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
449 stream.set_nodelay(true)?;
450 set_tcp_keepalive(stream.as_raw_fd());
451 Ok(stream)
452}
453
454pub fn start_client(
456 config: BackboneClientConfig,
457 tx: EventSender,
458) -> io::Result<Box<dyn Writer>> {
459 let stream = try_connect_client(&config)?;
460 let reader_stream = stream.try_clone()?;
461 let writer_stream = stream.try_clone()?;
462
463 let id = config.interface_id;
464 log::info!(
465 "[{}] backbone client connected to {}:{}",
466 config.name,
467 config.target_host,
468 config.target_port
469 );
470
471 let _ = tx.send(Event::InterfaceUp(id, None, None));
473
474 thread::Builder::new()
475 .name(format!("backbone-client-{}", id.0))
476 .spawn(move || {
477 client_reader_loop(reader_stream, config, tx);
478 })?;
479
480 Ok(Box::new(BackboneClientWriter { stream: writer_stream }))
481}
482
483fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
486 let id = config.interface_id;
487 let mut decoder = hdlc::Decoder::new();
488 let mut buf = [0u8; 4096];
489
490 loop {
491 match stream.read(&mut buf) {
492 Ok(0) => {
493 log::warn!("[{}] connection closed", config.name);
494 let _ = tx.send(Event::InterfaceDown(id));
495 match client_reconnect(&config, &tx) {
496 Some(new_stream) => {
497 stream = new_stream;
498 decoder = hdlc::Decoder::new();
499 continue;
500 }
501 None => {
502 log::error!("[{}] reconnection failed, giving up", config.name);
503 return;
504 }
505 }
506 }
507 Ok(n) => {
508 for frame in decoder.feed(&buf[..n]) {
509 if tx
510 .send(Event::Frame {
511 interface_id: id,
512 data: frame,
513 })
514 .is_err()
515 {
516 return;
517 }
518 }
519 }
520 Err(e) => {
521 log::warn!("[{}] read error: {}", config.name, e);
522 let _ = tx.send(Event::InterfaceDown(id));
523 match client_reconnect(&config, &tx) {
524 Some(new_stream) => {
525 stream = new_stream;
526 decoder = hdlc::Decoder::new();
527 continue;
528 }
529 None => {
530 log::error!("[{}] reconnection failed, giving up", config.name);
531 return;
532 }
533 }
534 }
535 }
536 }
537}
538
539fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
542 let mut attempts = 0u32;
543 loop {
544 thread::sleep(config.reconnect_wait);
545 attempts += 1;
546
547 if let Some(max) = config.max_reconnect_tries {
548 if attempts > max {
549 let _ = tx.send(Event::InterfaceDown(config.interface_id));
550 return None;
551 }
552 }
553
554 log::info!(
555 "[{}] reconnect attempt {} ...",
556 config.name,
557 attempts
558 );
559
560 match try_connect_client(config) {
561 Ok(new_stream) => {
562 let writer_stream = match new_stream.try_clone() {
563 Ok(s) => s,
564 Err(e) => {
565 log::warn!("[{}] failed to clone stream: {}", config.name, e);
566 continue;
567 }
568 };
569 log::info!("[{}] reconnected", config.name);
570 let new_writer: Box<dyn Writer> =
571 Box::new(BackboneClientWriter { stream: writer_stream });
572 let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(new_writer), None));
573 return Some(new_stream);
574 }
575 Err(e) => {
576 log::warn!("[{}] reconnect failed: {}", config.name, e);
577 }
578 }
579 }
580}
581
/// Parsed configuration of a backbone interface, distinguishing the two roles
/// the factory can start.
enum BackboneMode {
    /// Listen for incoming connections.
    Server(BackboneConfig),
    /// Connect out to a remote backbone server.
    Client(BackboneClientConfig),
}
592
/// Factory for interfaces of type `"BackboneInterface"`, in server or client mode.
pub struct BackboneInterfaceFactory;
599
600impl InterfaceFactory for BackboneInterfaceFactory {
601 fn type_name(&self) -> &str { "BackboneInterface" }
602
603 fn parse_config(
604 &self,
605 name: &str,
606 id: InterfaceId,
607 params: &HashMap<String, String>,
608 ) -> Result<Box<dyn InterfaceConfigData>, String> {
609 if let Some(target_host) = params.get("remote").or_else(|| params.get("target_host")) {
610 let target_host = target_host.clone();
612 let target_port = params.get("target_port")
613 .or_else(|| params.get("port"))
614 .and_then(|v| v.parse().ok())
615 .unwrap_or(4242);
616 let transport_identity = params.get("transport_identity").cloned();
617 Ok(Box::new(BackboneMode::Client(BackboneClientConfig {
618 name: name.to_string(),
619 target_host,
620 target_port,
621 interface_id: id,
622 transport_identity,
623 ..BackboneClientConfig::default()
624 })))
625 } else {
626 let listen_ip = params.get("listen_ip")
628 .or_else(|| params.get("device"))
629 .cloned()
630 .unwrap_or_else(|| "0.0.0.0".into());
631 let listen_port = params.get("listen_port")
632 .or_else(|| params.get("port"))
633 .and_then(|v| v.parse().ok())
634 .unwrap_or(4242);
635 Ok(Box::new(BackboneMode::Server(BackboneConfig {
636 name: name.to_string(),
637 listen_ip,
638 listen_port,
639 interface_id: id,
640 })))
641 }
642 }
643
644 fn start(
645 &self,
646 config: Box<dyn InterfaceConfigData>,
647 ctx: StartContext,
648 ) -> io::Result<StartResult> {
649 let mode = *config.into_any()
650 .downcast::<BackboneMode>()
651 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type for BackboneInterface"))?;
652
653 match mode {
654 BackboneMode::Client(cfg) => {
655 let id = cfg.interface_id;
656 let name = cfg.name.clone();
657 let info = InterfaceInfo {
658 id,
659 name,
660 mode: ctx.mode,
661 out_capable: true,
662 in_capable: true,
663 bitrate: Some(1_000_000_000),
664 announce_rate_target: None,
665 announce_rate_grace: 0,
666 announce_rate_penalty: 0.0,
667 announce_cap: constants::ANNOUNCE_CAP,
668 is_local_client: false,
669 wants_tunnel: false,
670 tunnel_id: None,
671 mtu: 65535,
672 ingress_control: true,
673 ia_freq: 0.0,
674 started: crate::time::now(),
675 };
676 let writer = start_client(cfg, ctx.tx)?;
677 Ok(StartResult::Simple {
678 id,
679 info,
680 writer,
681 interface_type_name: "BackboneInterface".to_string(),
682 })
683 }
684 BackboneMode::Server(cfg) => {
685 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
686 Ok(StartResult::Listener)
687 }
688 }
689 }
690}
691
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port. The probe listener is closed
    /// before the port is returned, so another process could in principle take
    /// the port first; `start` needs a concrete port, so this is accepted here.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a backbone server on a free loopback port and returns the port
    /// together with the receiving end of its event channel.
    ///
    /// No settle delay is needed afterwards: `start` binds the listener before
    /// it returns, so connections made right away are queued until the epoll
    /// thread accepts them.
    fn start_test_server(first_client_id: u64, interface_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };
        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a plain TCP connection to the test server.
    fn connect_to(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Binds a loopback listener on an OS-chosen port and returns both, so the
    /// client tests never have to release and re-bind a port.
    fn bind_loopback() -> (TcpListener, u16) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        (listener, port)
    }

    /// Reads exactly one HDLC frame for `payload` from `stream` and checks it.
    /// `read_exact` is used because a single `read` may return only part of it.
    fn assert_receives_frame(stream: &mut TcpStream, payload: &[u8]) {
        let expected = hdlc::frame(payload);
        let mut buf = vec![0u8; expected.len()];
        stream.read_exact(&mut buf).unwrap();
        assert_eq!(&buf[..], &expected[..]);
    }

    #[test]
    fn backbone_accept_connection() {
        let (port, rx) = start_test_server(8000, 80);
        let _client = connect_to(port);

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(8000));
                assert!(writer.is_some());
                assert!(info.is_some());
                let info = info.unwrap();
                assert!(info.out_capable);
                assert!(info.in_capable);
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn backbone_receive_frame() {
        let (port, rx) = start_test_server(8100, 81);
        let mut client = connect_to(port);

        // Discard the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(8100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn backbone_send_to_client() {
        let (port, rx) = start_test_server(8200, 82);
        let mut client = connect_to(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let mut writer = match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        assert_receives_frame(&mut client, &payload);
    }

    #[test]
    fn backbone_multiple_clients() {
        let (port, rx) = start_test_server(8300, 83);
        let _client1 = connect_to(port);
        let _client2 = connect_to(port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn backbone_client_disconnect() {
        let (port, rx) = start_test_server(8400, 84);
        let client = connect_to(port);

        // Discard the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
            "expected InterfaceDown(8400), got {:?}",
            event
        );
    }

    #[test]
    fn backbone_epoll_multiplexing() {
        let (port, rx) = start_test_server(8500, 85);
        let mut client1 = connect_to(port);
        let mut client2 = connect_to(port);

        // Discard both InterfaceUp events.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload1: Vec<u8> = (0..24).collect();
        let payload2: Vec<u8> = (100..130).collect();
        client1.write_all(&hdlc::frame(&payload1)).unwrap();
        client2.write_all(&hdlc::frame(&payload2)).unwrap();

        // The two frames may arrive in either order.
        let mut received = Vec::new();
        for _ in 0..2 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::Frame { data, .. } => received.push(data),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
        assert!(received.contains(&payload1));
        assert!(received.contains(&payload2));
    }

    #[test]
    fn backbone_bind_port() {
        // Succeeds if `start` can bind and spawn without error.
        let (_port, _rx) = start_test_server(8600, 86);
    }

    #[test]
    fn backbone_hdlc_fragmented() {
        let (port, rx) = start_test_server(8700, 87);
        let mut client = connect_to(port);
        client.set_nodelay(true).unwrap();

        // Discard the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        let mid = framed.len() / 2;

        // Deliver the frame in two separate writes with a pause in between, so
        // the decoder has to carry state across reads.
        client.write_all(&framed[..mid]).unwrap();
        thread::sleep(Duration::from_millis(50));
        client.write_all(&framed[mid..]).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { data, .. } => {
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Client configuration aimed at a loopback test listener, with short
    /// reconnect timing so the tests stay fast.
    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
        BackboneClientConfig {
            name: format!("test-bb-client-{}", port),
            target_host: "127.0.0.1".into(),
            target_port: port,
            interface_id: InterfaceId(id),
            reconnect_wait: Duration::from_millis(100),
            max_reconnect_tries: Some(2),
            connect_timeout: Duration::from_secs(2),
            transport_identity: None,
        }
    }

    #[test]
    fn backbone_client_connect() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9000), tx).unwrap();
        let _server_stream = listener.accept().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
    }

    #[test]
    fn backbone_client_receive_frame() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9100), tx).unwrap();
        let (mut server_stream, _) = listener.accept().unwrap();

        // Discard the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        server_stream.write_all(&hdlc::frame(&payload)).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(9100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn backbone_client_send_frame() {
        let (listener, port) = bind_loopback();
        let (tx, _rx) = mpsc::channel();

        let mut writer = start_client(make_client_config(port, 9200), tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();
        server_stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        assert_receives_frame(&mut server_stream, &payload);
    }

    #[test]
    fn backbone_client_reconnect() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9300), tx).unwrap();
        let (server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Closing the server side must be reported as InterfaceDown ...
        drop(server_stream);
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));

        // ... after which the client reconnects and announces itself again.
        let _server_stream2 = listener.accept().unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
    }
}