1use std::collections::HashMap;
13use std::io::{self, Read, Write};
14use std::net::{TcpListener, TcpStream, ToSocketAddrs};
15use std::os::unix::io::{AsRawFd, RawFd};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21use rns_core::constants;
22use rns_core::transport::types::{InterfaceId, InterfaceInfo};
23
24use crate::event::{Event, EventSender};
25use crate::hdlc;
26use crate::interface::Writer;
27
/// Hardware MTU constant for the backbone interface: 1 MiB (1_048_576 bytes).
///
/// NOTE(review): none of the code visible in this file reads the constant,
/// which is why `dead_code` is allowed here. The server currently advertises
/// `mtu: 65535` in the `InterfaceInfo` it builds; confirm which of the two
/// values is the intended one.
#[allow(dead_code)]
const HW_MTU: usize = 1_048_576;
31
32#[derive(Debug, Clone)]
34pub struct BackboneConfig {
35 pub name: String,
36 pub listen_ip: String,
37 pub listen_port: u16,
38 pub interface_id: InterfaceId,
39}
40
41impl Default for BackboneConfig {
42 fn default() -> Self {
43 BackboneConfig {
44 name: String::new(),
45 listen_ip: "0.0.0.0".into(),
46 listen_port: 0,
47 interface_id: InterfaceId(0),
48 }
49 }
50}
51
52struct BackboneWriter {
54 fd: RawFd,
55}
56
57impl Writer for BackboneWriter {
58 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
59 let framed = hdlc::frame(data);
60 let mut offset = 0;
61 while offset < framed.len() {
62 let n = unsafe {
63 libc::send(
64 self.fd,
65 framed[offset..].as_ptr() as *const libc::c_void,
66 framed.len() - offset,
67 libc::MSG_NOSIGNAL,
68 )
69 };
70 if n < 0 {
71 return Err(io::Error::last_os_error());
72 }
73 offset += n as usize;
74 }
75 Ok(())
76 }
77}
78
79impl Drop for BackboneWriter {
81 fn drop(&mut self) {
82 unsafe {
83 libc::close(self.fd);
84 }
85 }
86}
87
88unsafe impl Send for BackboneWriter {}
90
91pub fn start(
93 config: BackboneConfig,
94 tx: EventSender,
95 next_id: Arc<AtomicU64>,
96) -> io::Result<()> {
97 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
98 let listener = TcpListener::bind(&addr)?;
99 listener.set_nonblocking(true)?;
100
101 log::info!(
102 "[{}] backbone server listening on {}",
103 config.name,
104 listener.local_addr().unwrap_or(addr.parse().unwrap())
105 );
106
107 let name = config.name.clone();
108 thread::Builder::new()
109 .name(format!("backbone-epoll-{}", config.interface_id.0))
110 .spawn(move || {
111 if let Err(e) = epoll_loop(listener, name, tx, next_id) {
112 log::error!("backbone epoll loop error: {}", e);
113 }
114 })?;
115
116 Ok(())
117}
118
/// Per-connection bookkeeping kept by the epoll loop, keyed by the client's
/// socket descriptor.
struct ClientState {
    /// Interface id assigned to this client when it was accepted.
    id: InterfaceId,
    /// HDLC decoder fed with the bytes received from this client.
    decoder: hdlc::Decoder,
}
124
125fn epoll_loop(
127 listener: TcpListener,
128 name: String,
129 tx: EventSender,
130 next_id: Arc<AtomicU64>,
131) -> io::Result<()> {
132 let epfd = unsafe { libc::epoll_create1(0) };
133 if epfd < 0 {
134 return Err(io::Error::last_os_error());
135 }
136
137 let listener_fd = listener.as_raw_fd();
139 let mut ev = libc::epoll_event {
140 events: libc::EPOLLIN as u32,
141 u64: listener_fd as u64,
142 };
143 if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, listener_fd, &mut ev) } < 0 {
144 unsafe { libc::close(epfd) };
145 return Err(io::Error::last_os_error());
146 }
147
148 let mut clients: HashMap<RawFd, ClientState> = HashMap::new();
149 let mut events = vec![libc::epoll_event { events: 0, u64: 0 }; 64];
150
151 loop {
152 let nfds = unsafe {
153 libc::epoll_wait(epfd, events.as_mut_ptr(), events.len() as i32, 1000)
154 };
155
156 if nfds < 0 {
157 let err = io::Error::last_os_error();
158 if err.kind() == io::ErrorKind::Interrupted {
159 continue;
160 }
161 for (&fd, _) in &clients {
163 unsafe {
164 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
165 libc::close(fd);
166 }
167 }
168 unsafe { libc::close(epfd) };
169 return Err(err);
170 }
171
172 for i in 0..nfds as usize {
173 let ev = &events[i];
174 let fd = ev.u64 as RawFd;
175
176 if fd == listener_fd {
177 loop {
179 match listener.accept() {
180 Ok((stream, peer_addr)) => {
181 let client_fd = stream.as_raw_fd();
182
183 stream.set_nonblocking(true).ok();
185 stream.set_nodelay(true).ok();
186
187 set_tcp_keepalive(client_fd);
189
190 let client_id =
191 InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
192
193 log::info!(
194 "[{}] backbone client connected: {} → id {}",
195 name,
196 peer_addr,
197 client_id.0
198 );
199
200 let mut cev = libc::epoll_event {
202 events: libc::EPOLLIN as u32,
203 u64: client_fd as u64,
204 };
205 if unsafe {
206 libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, client_fd, &mut cev)
207 } < 0
208 {
209 log::warn!(
210 "[{}] failed to add client to epoll: {}",
211 name,
212 io::Error::last_os_error()
213 );
214 continue;
216 }
217
218 std::mem::forget(stream);
221
222 let writer_fd = unsafe { libc::dup(client_fd) };
224 if writer_fd < 0 {
225 log::warn!("[{}] failed to dup client fd", name);
226 unsafe {
227 libc::epoll_ctl(
228 epfd,
229 libc::EPOLL_CTL_DEL,
230 client_fd,
231 std::ptr::null_mut(),
232 );
233 libc::close(client_fd);
234 }
235 continue;
236 }
237 let writer: Box<dyn Writer> =
238 Box::new(BackboneWriter { fd: writer_fd });
239
240 clients.insert(
241 client_fd,
242 ClientState {
243 id: client_id,
244 decoder: hdlc::Decoder::new(),
245 },
246 );
247
248 let info = InterfaceInfo {
249 id: client_id,
250 name: format!("BackboneInterface/{}", client_fd),
251 mode: constants::MODE_FULL,
252 out_capable: true,
253 in_capable: true,
254 bitrate: Some(1_000_000_000), announce_rate_target: None,
256 announce_rate_grace: 0,
257 announce_rate_penalty: 0.0,
258 announce_cap: constants::ANNOUNCE_CAP,
259 is_local_client: false,
260 wants_tunnel: false,
261 tunnel_id: None,
262 mtu: 65535,
263 ia_freq: 0.0,
264 started: 0.0,
265 ingress_control: true,
266 };
267
268 if tx
269 .send(Event::InterfaceUp(
270 client_id,
271 Some(writer),
272 Some(info),
273 ))
274 .is_err()
275 {
276 cleanup(epfd, &clients, listener_fd);
278 return Ok(());
279 }
280
281 }
282 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
283 Err(e) => {
284 log::warn!("[{}] accept error: {}", name, e);
285 break;
286 }
287 }
288 }
289 } else if clients.contains_key(&fd) {
290 let mut should_remove = false;
292 let mut client_id = InterfaceId(0);
293
294 if ev.events & libc::EPOLLIN as u32 != 0 {
295 let mut buf = [0u8; 4096];
296 let n = unsafe {
297 libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0)
298 };
299
300 if n <= 0 {
301 if let Some(c) = clients.get(&fd) {
302 client_id = c.id;
303 }
304 should_remove = true;
305 } else if let Some(client) = clients.get_mut(&fd) {
306 client_id = client.id;
307 for frame in client.decoder.feed(&buf[..n as usize]) {
308 if tx
309 .send(Event::Frame {
310 interface_id: client_id,
311 data: frame,
312 })
313 .is_err()
314 {
315 cleanup(epfd, &clients, listener_fd);
316 return Ok(());
317 }
318 }
319 }
320 }
321
322 if ev.events & (libc::EPOLLHUP | libc::EPOLLERR) as u32 != 0 {
323 if let Some(c) = clients.get(&fd) {
324 client_id = c.id;
325 }
326 should_remove = true;
327 }
328
329 if should_remove {
330 log::info!(
331 "[{}] backbone client {} disconnected",
332 name,
333 client_id.0
334 );
335 unsafe {
336 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
337 libc::close(fd);
338 }
339 clients.remove(&fd);
340 let _ = tx.send(Event::InterfaceDown(client_id));
341 }
342 }
343 }
344 }
345}
346
347fn set_tcp_keepalive(fd: RawFd) {
348 unsafe {
349 let one: libc::c_int = 1;
350 libc::setsockopt(
351 fd,
352 libc::SOL_SOCKET,
353 libc::SO_KEEPALIVE,
354 &one as *const _ as *const libc::c_void,
355 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
356 );
357 let idle: libc::c_int = 5;
358 libc::setsockopt(
359 fd,
360 libc::IPPROTO_TCP,
361 libc::TCP_KEEPIDLE,
362 &idle as *const _ as *const libc::c_void,
363 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
364 );
365 let interval: libc::c_int = 2;
366 libc::setsockopt(
367 fd,
368 libc::IPPROTO_TCP,
369 libc::TCP_KEEPINTVL,
370 &interval as *const _ as *const libc::c_void,
371 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
372 );
373 let cnt: libc::c_int = 12;
374 libc::setsockopt(
375 fd,
376 libc::IPPROTO_TCP,
377 libc::TCP_KEEPCNT,
378 &cnt as *const _ as *const libc::c_void,
379 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
380 );
381 }
382}
383
384fn cleanup(epfd: RawFd, clients: &HashMap<RawFd, ClientState>, listener_fd: RawFd) {
385 for (&fd, _) in clients {
386 unsafe {
387 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
388 libc::close(fd);
389 }
390 }
391 unsafe {
392 libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, listener_fd, std::ptr::null_mut());
393 libc::close(epfd);
394 }
395}
396
397#[derive(Debug, Clone)]
403pub struct BackboneClientConfig {
404 pub name: String,
405 pub target_host: String,
406 pub target_port: u16,
407 pub interface_id: InterfaceId,
408 pub reconnect_wait: Duration,
409 pub max_reconnect_tries: Option<u32>,
410 pub connect_timeout: Duration,
411 pub transport_identity: Option<String>,
412}
413
414impl Default for BackboneClientConfig {
415 fn default() -> Self {
416 BackboneClientConfig {
417 name: String::new(),
418 target_host: "127.0.0.1".into(),
419 target_port: 4242,
420 interface_id: InterfaceId(0),
421 reconnect_wait: Duration::from_secs(5),
422 max_reconnect_tries: None,
423 connect_timeout: Duration::from_secs(5),
424 transport_identity: None,
425 }
426 }
427}
428
429struct BackboneClientWriter {
431 stream: TcpStream,
432}
433
434impl Writer for BackboneClientWriter {
435 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
436 self.stream.write_all(&hdlc::frame(data))
437 }
438}
439
440fn try_connect_client(config: &BackboneClientConfig) -> io::Result<TcpStream> {
442 let addr_str = format!("{}:{}", config.target_host, config.target_port);
443 let addr = addr_str
444 .to_socket_addrs()?
445 .next()
446 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
447
448 let stream = TcpStream::connect_timeout(&addr, config.connect_timeout)?;
449 stream.set_nodelay(true)?;
450 set_tcp_keepalive(stream.as_raw_fd());
451 Ok(stream)
452}
453
454pub fn start_client(
456 config: BackboneClientConfig,
457 tx: EventSender,
458) -> io::Result<Box<dyn Writer>> {
459 let stream = try_connect_client(&config)?;
460 let reader_stream = stream.try_clone()?;
461 let writer_stream = stream.try_clone()?;
462
463 let id = config.interface_id;
464 log::info!(
465 "[{}] backbone client connected to {}:{}",
466 config.name,
467 config.target_host,
468 config.target_port
469 );
470
471 let _ = tx.send(Event::InterfaceUp(id, None, None));
473
474 thread::Builder::new()
475 .name(format!("backbone-client-{}", id.0))
476 .spawn(move || {
477 client_reader_loop(reader_stream, config, tx);
478 })?;
479
480 Ok(Box::new(BackboneClientWriter { stream: writer_stream }))
481}
482
483fn client_reader_loop(mut stream: TcpStream, config: BackboneClientConfig, tx: EventSender) {
486 let id = config.interface_id;
487 let mut decoder = hdlc::Decoder::new();
488 let mut buf = [0u8; 4096];
489
490 loop {
491 match stream.read(&mut buf) {
492 Ok(0) => {
493 log::warn!("[{}] connection closed", config.name);
494 let _ = tx.send(Event::InterfaceDown(id));
495 match client_reconnect(&config, &tx) {
496 Some(new_stream) => {
497 stream = new_stream;
498 decoder = hdlc::Decoder::new();
499 continue;
500 }
501 None => {
502 log::error!("[{}] reconnection failed, giving up", config.name);
503 return;
504 }
505 }
506 }
507 Ok(n) => {
508 for frame in decoder.feed(&buf[..n]) {
509 if tx
510 .send(Event::Frame {
511 interface_id: id,
512 data: frame,
513 })
514 .is_err()
515 {
516 return;
517 }
518 }
519 }
520 Err(e) => {
521 log::warn!("[{}] read error: {}", config.name, e);
522 let _ = tx.send(Event::InterfaceDown(id));
523 match client_reconnect(&config, &tx) {
524 Some(new_stream) => {
525 stream = new_stream;
526 decoder = hdlc::Decoder::new();
527 continue;
528 }
529 None => {
530 log::error!("[{}] reconnection failed, giving up", config.name);
531 return;
532 }
533 }
534 }
535 }
536 }
537}
538
539fn client_reconnect(config: &BackboneClientConfig, tx: &EventSender) -> Option<TcpStream> {
542 let mut attempts = 0u32;
543 loop {
544 thread::sleep(config.reconnect_wait);
545 attempts += 1;
546
547 if let Some(max) = config.max_reconnect_tries {
548 if attempts > max {
549 let _ = tx.send(Event::InterfaceDown(config.interface_id));
550 return None;
551 }
552 }
553
554 log::info!(
555 "[{}] reconnect attempt {} ...",
556 config.name,
557 attempts
558 );
559
560 match try_connect_client(config) {
561 Ok(new_stream) => {
562 let writer_stream = match new_stream.try_clone() {
563 Ok(s) => s,
564 Err(e) => {
565 log::warn!("[{}] failed to clone stream: {}", config.name, e);
566 continue;
567 }
568 };
569 log::info!("[{}] reconnected", config.name);
570 let new_writer: Box<dyn Writer> =
571 Box::new(BackboneClientWriter { stream: writer_stream });
572 let _ = tx.send(Event::InterfaceUp(config.interface_id, Some(new_writer), None));
573 return Some(new_stream);
574 }
575 Err(e) => {
576 log::warn!("[{}] reconnect failed: {}", config.name, e);
577 }
578 }
579 }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused loopback port.
    ///
    /// The probe listener is closed before returning, so the port could be
    /// taken by someone else in the meantime. Only use this where the code
    /// under test has to bind the port itself.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Binds a loopback listener on an OS-chosen port and keeps it open,
    /// which avoids the bind/drop/rebind race of `find_free_port`.
    fn bind_loopback() -> (TcpListener, u16) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        (listener, port)
    }

    /// Starts a backbone server on loopback whose first client gets
    /// `first_client_id`. `start` binds the socket before returning, so
    /// callers can connect right away without sleeping.
    fn start_server(interface_id: u64, first_client_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = BackboneConfig {
            name: "test-backbone".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };
        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a TCP connection to the loopback server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Waits up to two seconds for the next event.
    fn next_event(rx: &mpsc::Receiver<Event>) -> Event {
        rx.recv_timeout(Duration::from_secs(2))
            .expect("timed out waiting for an event")
    }

    #[test]
    fn backbone_accept_connection() {
        let (port, rx) = start_server(80, 8000);
        let _client = connect(port);

        match next_event(&rx) {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(8000));
                assert!(writer.is_some());
                let info = info.expect("InterfaceUp should carry interface info");
                assert!(info.out_capable);
                assert!(info.in_capable);
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn backbone_receive_frame() {
        let (port, rx) = start_server(81, 8100);
        let mut client = connect(port);

        // Skip the InterfaceUp announcement.
        let _ = next_event(&rx);

        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        match next_event(&rx) {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(8100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn backbone_send_to_client() {
        let (port, rx) = start_server(82, 8200);
        let mut client = connect(port);
        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();

        let mut writer = match next_event(&rx) {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // read_exact: a single read may legally return only part of a frame.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn backbone_multiple_clients() {
        let (port, rx) = start_server(83, 8300);
        let _client1 = connect(port);
        let _client2 = connect(port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            match next_event(&rx) {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn backbone_client_disconnect() {
        let (port, rx) = start_server(84, 8400);
        let client = connect(port);

        // Skip the InterfaceUp announcement.
        let _ = next_event(&rx);

        drop(client);

        let event = next_event(&rx);
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(8400))),
            "expected InterfaceDown(8400), got {:?}",
            event
        );
    }

    #[test]
    fn backbone_epoll_multiplexing() {
        let (port, rx) = start_server(85, 8500);
        let mut client1 = connect(port);
        let mut client2 = connect(port);

        // Skip both InterfaceUp announcements.
        let _ = next_event(&rx);
        let _ = next_event(&rx);

        let payload1: Vec<u8> = (0..24).collect();
        let payload2: Vec<u8> = (100..130).collect();
        client1.write_all(&hdlc::frame(&payload1)).unwrap();
        client2.write_all(&hdlc::frame(&payload2)).unwrap();

        let mut received = Vec::new();
        for _ in 0..2 {
            match next_event(&rx) {
                Event::Frame { data, .. } => received.push(data),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
        assert!(received.contains(&payload1));
        assert!(received.contains(&payload2));
    }

    #[test]
    fn backbone_bind_port() {
        let (port, _rx) = start_server(86, 8600);
        // The port must accept connections as soon as `start` has returned.
        let _client = connect(port);
    }

    #[test]
    fn backbone_hdlc_fragmented() {
        let (port, rx) = start_server(87, 8700);
        let mut client = connect(port);
        client.set_nodelay(true).unwrap();

        // Skip the InterfaceUp announcement.
        let _ = next_event(&rx);

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        let mid = framed.len() / 2;

        // Deliver the frame in two separate segments.
        client.write_all(&framed[..mid]).unwrap();
        thread::sleep(Duration::from_millis(50));
        client.write_all(&framed[mid..]).unwrap();

        match next_event(&rx) {
            Event::Frame { data, .. } => {
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Client configuration with short waits, pointing at loopback `port`.
    fn make_client_config(port: u16, id: u64) -> BackboneClientConfig {
        BackboneClientConfig {
            name: format!("test-bb-client-{}", port),
            target_host: "127.0.0.1".into(),
            target_port: port,
            interface_id: InterfaceId(id),
            reconnect_wait: Duration::from_millis(100),
            max_reconnect_tries: Some(2),
            connect_timeout: Duration::from_secs(2),
            transport_identity: None,
        }
    }

    #[test]
    fn backbone_client_connect() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9000), tx).unwrap();
        let _server_stream = listener.accept().unwrap();

        let event = next_event(&rx);
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9000), _, _)));
    }

    #[test]
    fn backbone_client_receive_frame() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9100), tx).unwrap();
        let (mut server_stream, _) = listener.accept().unwrap();

        // Skip the InterfaceUp announcement.
        let _ = next_event(&rx);

        let payload: Vec<u8> = (0..32).collect();
        server_stream.write_all(&hdlc::frame(&payload)).unwrap();

        match next_event(&rx) {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(9100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn backbone_client_send_frame() {
        let (listener, port) = bind_loopback();
        let (tx, _rx) = mpsc::channel();

        let mut writer = start_client(make_client_config(port, 9200), tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();
        server_stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // read_exact: a single read may legally return only part of a frame.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        server_stream.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn backbone_client_reconnect() {
        let (listener, port) = bind_loopback();
        let (tx, rx) = mpsc::channel();

        let _writer = start_client(make_client_config(port, 9300), tx).unwrap();
        let (server_stream, _) = listener.accept().unwrap();

        // Skip the initial InterfaceUp announcement.
        let _ = next_event(&rx);

        // Closing the server side must be reported as InterfaceDown.
        drop(server_stream);
        let event = next_event(&rx);
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9300))));

        // The client then reconnects and announces the interface again.
        let _server_stream2 = listener.accept().unwrap();
        let event = next_event(&rx);
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(9300), _, _)));
    }
}