1use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
/// Default UDP port for multicast discovery; `start` uses the next port up for unicast discovery.
pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;

/// Default UDP port on which data frames are received from peers.
pub const DEFAULT_DATA_PORT: u16 = 42671;

/// Default group identifier; it is hashed to derive the multicast address and discovery tokens.
pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";

/// Default IFAC size. Not referenced elsewhere in this file.
// NOTE(review): presumably a size in bytes — confirm against the IFAC implementation.
pub const DEFAULT_IFAC_SIZE: usize = 16;

/// Hardware MTU of this interface type; the data receive buffer is sized as `HW_MTU + 64`.
pub const HW_MTU: usize = 1196;

/// Multicast scope digit for link-local scope, as inserted by `derive_multicast_address`.
pub const SCOPE_LINK: &str = "2";
/// Multicast scope digit selected by the `admin` discovery scope.
pub const SCOPE_ADMIN: &str = "4";
/// Multicast scope digit selected by the `site` discovery scope.
pub const SCOPE_SITE: &str = "5";
/// Multicast scope digit selected by the `organisation` / `organization` discovery scope.
pub const SCOPE_ORGANISATION: &str = "8";
/// Multicast scope digit selected by the `global` discovery scope.
pub const SCOPE_GLOBAL: &str = "e";

/// Multicast address-type digit selected by the `permanent` setting.
pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
/// Multicast address-type digit selected by the `temporary` setting (the default).
pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";

/// Default number of seconds a peer may stay silent before `peer_jobs_loop` drops it.
pub const PEERING_TIMEOUT: f64 = 22.0;

/// Default number of seconds between two discovery announcements.
pub const ANNOUNCE_INTERVAL: f64 = 1.6;

/// Default number of seconds between two runs of the peer timeout check.
pub const PEER_JOB_INTERVAL: f64 = 4.0;

/// Multicast echo timeout. Only referenced by the unit tests in this file.
// NOTE(review): presumably seconds, like the other timing constants — confirm.
pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;

/// Bitrate assumed for a peer when the configuration does not provide one.
// NOTE(review): presumably bits per second — confirm against the consumer of `bitrate`.
pub const BITRATE_GUESS: u64 = 10_000_000;

/// Maximum number of frame hashes kept in the de-duplication deque.
pub const MULTI_IF_DEQUE_LEN: usize = 48;

/// Number of seconds during which a remembered frame hash counts as a duplicate.
pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;

/// Multiplier applied to `ANNOUNCE_INTERVAL`; only referenced by the unit tests in this file.
pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;

/// Interface names that are skipped during enumeration unless explicitly allowed.
pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
/// Configuration for one auto-discovery interface instance.
#[derive(Debug, Clone)]
pub struct AutoConfig {
    /// Interface name; used as the log prefix and as the prefix of spawned peer names.
    pub name: String,
    /// Group identifier hashed into the multicast address and the discovery tokens.
    pub group_id: Vec<u8>,
    /// Scope digit placed into the multicast address (see the `SCOPE_*` constants).
    pub discovery_scope: String,
    /// UDP port for multicast discovery; unicast discovery uses this port plus one.
    pub discovery_port: u16,
    /// UDP port on which data frames are exchanged with peers.
    pub data_port: u16,
    /// Address-type digit placed into the multicast address.
    pub multicast_address_type: String,
    /// When non-empty, only these network interfaces are used.
    pub allowed_interfaces: Vec<String>,
    /// Network interfaces that are never used.
    pub ignored_interfaces: Vec<String>,
    /// Bitrate reported for every peer interface spawned by this instance.
    pub configured_bitrate: u64,
    /// Identifier assigned to this interface when the configuration was parsed.
    pub interface_id: InterfaceId,
    /// Shared timing parameters read by the worker threads while they run.
    pub runtime: Arc<Mutex<AutoRuntime>>,
}
105
/// Timing parameters that the worker threads re-read on every iteration.
#[derive(Debug, Clone)]
pub struct AutoRuntime {
    /// Seconds between two discovery announcements.
    pub announce_interval_secs: f64,
    /// Seconds of silence after which a peer is considered gone.
    pub peer_timeout_secs: f64,
    /// Seconds between two runs of the peer timeout check.
    pub peer_job_interval_secs: f64,
}
112
impl AutoRuntime {
    /// Builds the start-up runtime values for `_config`.
    ///
    /// The configuration is currently not consulted: every field is taken
    /// from the module-level default constants.
    pub fn from_config(_config: &AutoConfig) -> Self {
        Self {
            announce_interval_secs: ANNOUNCE_INTERVAL,
            peer_timeout_secs: PEERING_TIMEOUT,
            peer_job_interval_secs: PEER_JOB_INTERVAL,
        }
    }
}
122
/// Handle that exposes the live runtime parameters of one auto interface.
#[derive(Debug, Clone)]
pub struct AutoRuntimeConfigHandle {
    /// Name of the interface the handle belongs to.
    pub interface_name: String,
    /// The same shared runtime state the interface's worker threads read.
    pub runtime: Arc<Mutex<AutoRuntime>>,
    /// Snapshot of the values the runtime is initialised with at start-up.
    pub startup: AutoRuntime,
}
129
130impl Default for AutoConfig {
131 fn default() -> Self {
132 let mut config = AutoConfig {
133 name: String::new(),
134 group_id: DEFAULT_GROUP_ID.to_vec(),
135 discovery_scope: SCOPE_LINK.to_string(),
136 discovery_port: DEFAULT_DISCOVERY_PORT,
137 data_port: DEFAULT_DATA_PORT,
138 multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
139 allowed_interfaces: Vec::new(),
140 ignored_interfaces: Vec::new(),
141 configured_bitrate: BITRATE_GUESS,
142 interface_id: InterfaceId(0),
143 runtime: Arc::new(Mutex::new(AutoRuntime {
144 announce_interval_secs: ANNOUNCE_INTERVAL,
145 peer_timeout_secs: PEERING_TIMEOUT,
146 peer_job_interval_secs: PEER_JOB_INTERVAL,
147 })),
148 };
149 let startup = AutoRuntime::from_config(&config);
150 config.runtime = Arc::new(Mutex::new(startup));
151 config
152 }
153}
154
155pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
165 let group_hash = rns_crypto::sha256::sha256(group_id);
166 let g = &group_hash;
167
168 let w1 = (g[2] as u16) << 8 | g[3] as u16;
170 let w2 = (g[4] as u16) << 8 | g[5] as u16;
171 let w3 = (g[6] as u16) << 8 | g[7] as u16;
172 let w4 = (g[8] as u16) << 8 | g[9] as u16;
173 let w5 = (g[10] as u16) << 8 | g[11] as u16;
174 let w6 = (g[12] as u16) << 8 | g[13] as u16;
175
176 format!(
177 "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
178 address_type, scope, w1, w2, w3, w4, w5, w6
179 )
180}
181
/// Parses `addr` as an IPv6 address, returning `None` when it is not valid.
///
/// Only the syntax is checked; the address is not required to be multicast.
pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
    let parsed: Result<Ipv6Addr, _> = addr.parse();
    match parsed {
        Ok(ip) => Some(ip),
        Err(_) => None,
    }
}
186
187pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
191 let mut input = group_id.to_vec();
192 input.extend_from_slice(link_local_addr.as_bytes());
193 rns_crypto::sha256::sha256(&input)
194}
195
/// A local network interface that carries an IPv6 link-local address.
#[derive(Debug, Clone)]
pub struct LocalInterface {
    /// Operating-system name of the interface.
    pub name: String,
    /// Textual form of the interface's link-local address.
    pub link_local_addr: String,
    /// Interface index as returned by `if_nametoindex`; never zero.
    pub index: u32,
}
205
/// Lists the local interfaces that have an IPv6 link-local address.
///
/// * `allowed` — when non-empty, only interfaces named here are returned;
///   naming an interface here also overrides `ALL_IGNORE_IFS`.
/// * `ignored` — interfaces named here are never returned.
///
/// At most one entry is returned per interface name (the first link-local
/// address reported for it). An empty vector is returned when `getifaddrs`
/// fails.
pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
    let mut result = Vec::new();

    // SAFETY: the list returned by `getifaddrs` is only dereferenced between
    // the successful call and the matching `freeifaddrs`; every node pointer
    // is checked for null before use and no reference into the list outlives
    // this block (names and addresses are copied out).
    unsafe {
        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
        if libc::getifaddrs(&mut ifaddrs) != 0 {
            return result;
        }

        let mut current = ifaddrs;
        while !current.is_null() {
            let ifa = &*current;
            // Advance first so every `continue` below moves to the next node.
            current = ifa.ifa_next;

            // Entries without an address cannot be inspected.
            if ifa.ifa_addr.is_null() {
                continue;
            }

            // Only IPv6 addresses are of interest.
            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
                continue;
            }

            // Skip interfaces whose name is not valid UTF-8.
            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
                Ok(s) => s.to_string(),
                Err(_) => continue,
            };

            // Globally ignored interfaces are skipped unless explicitly allowed.
            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
                if !allowed.iter().any(|a| a == &name) {
                    continue;
                }
            }

            // The caller's ignore list always wins.
            if ignored.iter().any(|ig| ig == &name) {
                continue;
            }

            // A non-empty allow list restricts the result to its members.
            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
                continue;
            }

            // The family check above established that this is a sockaddr_in6.
            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
            let addr_bytes = (*sa6).sin6_addr.s6_addr;
            let ipv6 = Ipv6Addr::from(addr_bytes);

            // Keep only addresses inside fe80::/10 (link-local).
            let octets = ipv6.octets();
            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
                continue;
            }

            let addr_str = format!("{}", ipv6);

            // An index of zero means the name could not be resolved.
            let index = libc::if_nametoindex(ifa.ifa_name);
            if index == 0 {
                continue;
            }

            // Report each interface once, using its first link-local address.
            if result.iter().any(|li: &LocalInterface| li.name == name) {
                continue;
            }

            result.push(LocalInterface {
                name,
                link_local_addr: addr_str,
                index,
            });
        }

        libc::freeifaddrs(ifaddrs);
    }

    result
}
293
/// Book-keeping entry for one discovered peer.
struct AutoPeer {
    /// Dynamic interface id under which the peer was announced to the event loop.
    interface_id: InterfaceId,
    /// Textual link-local address of the peer (also the key in `SharedState::peers`).
    #[allow(dead_code)]
    link_local_addr: String,
    /// Name of the local interface associated with the peer; `add_peer` leaves it empty.
    #[allow(dead_code)]
    ifname: String,
    /// Timestamp (from `crate::time::now`) at which the peer was last heard.
    last_heard: f64,
}
305
/// Writer that sends every frame as one UDP datagram to a fixed peer address.
struct UdpWriter {
    /// Socket used for sending.
    socket: UdpSocket,
    /// Destination address and port of the peer.
    target: SocketAddrV6,
}
311
312impl Writer for UdpWriter {
313 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
314 self.socket.send_to(data, self.target)?;
315 Ok(())
316 }
317}
318
/// State shared between all worker threads of one auto interface.
struct SharedState {
    /// Known peers, keyed by their textual link-local address.
    peers: HashMap<String, AutoPeer>,
    /// Link-local addresses of this host, used to ignore our own announcements.
    link_local_addresses: Vec<String>,
    /// Recently seen frame hashes together with the time they were recorded.
    dedup_deque: VecDeque<([u8; 32], f64)>,
    /// Set once start-up has finished; data frames are dropped while it is false.
    online: bool,
    /// Counter from which ids for newly discovered peers are allocated.
    next_id: Arc<AtomicU64>,
}
332
333impl SharedState {
334 fn new(next_id: Arc<AtomicU64>) -> Self {
335 SharedState {
336 peers: HashMap::new(),
337 link_local_addresses: Vec::new(),
338 dedup_deque: VecDeque::new(),
339 online: false,
340 next_id,
341 }
342 }
343
344 fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
346 for (h, ts) in &self.dedup_deque {
347 if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
348 return true;
349 }
350 }
351 false
352 }
353
354 fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
356 self.dedup_deque.push_back((hash, now));
357 while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
358 self.dedup_deque.pop_front();
359 }
360 }
361
362 fn refresh_peer(&mut self, addr: &str, now: f64) {
364 if let Some(peer) = self.peers.get_mut(addr) {
365 peer.last_heard = now;
366 }
367 }
368}
369
370pub fn start(
378 config: AutoConfig,
379 tx: EventSender,
380 next_dynamic_id: Arc<AtomicU64>,
381) -> io::Result<()> {
382 let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
383
384 if interfaces.is_empty() {
385 log::warn!(
386 "[{}] No suitable IPv6 link-local interfaces found",
387 config.name,
388 );
389 return Ok(());
390 }
391
392 let group_id = config.group_id.clone();
393 let mcast_addr_str = derive_multicast_address(
394 &group_id,
395 &config.multicast_address_type,
396 &config.discovery_scope,
397 );
398
399 let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
400 Some(ip) => ip,
401 None => {
402 return Err(io::Error::new(
403 io::ErrorKind::InvalidData,
404 format!("Invalid multicast address: {}", mcast_addr_str),
405 ));
406 }
407 };
408
409 let discovery_port = config.discovery_port;
410 let unicast_discovery_port = config.discovery_port + 1;
411 let data_port = config.data_port;
412 let name = config.name.clone();
413 let configured_bitrate = config.configured_bitrate;
414 {
415 let startup = AutoRuntime::from_config(&config);
416 *config.runtime.lock().unwrap() = startup;
417 }
418 let runtime = Arc::clone(&config.runtime);
419
420 let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
421 let running = Arc::new(AtomicBool::new(true));
422
423 {
425 let mut state = shared.lock().unwrap();
426 for iface in &interfaces {
427 state
428 .link_local_addresses
429 .push(iface.link_local_addr.clone());
430 }
431 }
432
433 log::info!(
434 "[{}] AutoInterface starting with {} local interfaces, multicast {}",
435 name,
436 interfaces.len(),
437 mcast_addr_str,
438 );
439
440 for local_iface in &interfaces {
442 let ifname = local_iface.name.clone();
443 let link_local = local_iface.link_local_addr.clone();
444 let if_index = local_iface.index;
445
446 let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
448
449 let unicast_socket =
451 create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
452
453 {
455 let group_id = group_id.clone();
456 let link_local = link_local.clone();
457 let running = running.clone();
458 let name = name.clone();
459 let runtime = runtime.clone();
460
461 thread::Builder::new()
462 .name(format!("auto-disc-tx-{}", ifname))
463 .spawn(move || {
464 discovery_sender_loop(
465 &group_id,
466 &link_local,
467 &mcast_ip,
468 discovery_port,
469 if_index,
470 runtime,
471 &running,
472 &name,
473 );
474 })?;
475 }
476
477 {
479 let group_id = group_id.clone();
480 let shared = shared.clone();
481 let tx = tx.clone();
482 let running = running.clone();
483 let name = name.clone();
484 let runtime = runtime.clone();
485
486 thread::Builder::new()
487 .name(format!("auto-disc-rx-{}", ifname))
488 .spawn(move || {
489 discovery_receiver_loop(
490 mcast_socket,
491 &group_id,
492 shared,
493 tx,
494 &running,
495 &name,
496 data_port,
497 configured_bitrate,
498 runtime,
499 );
500 })?;
501 }
502
503 {
505 let group_id = group_id.clone();
506 let shared = shared.clone();
507 let tx = tx.clone();
508 let running = running.clone();
509 let name = name.clone();
510 let runtime = runtime.clone();
511
512 thread::Builder::new()
513 .name(format!("auto-udisc-rx-{}", ifname))
514 .spawn(move || {
515 discovery_receiver_loop(
516 unicast_socket,
517 &group_id,
518 shared,
519 tx,
520 &running,
521 &name,
522 data_port,
523 configured_bitrate,
524 runtime,
525 );
526 })?;
527 }
528
529 {
531 let link_local = local_iface.link_local_addr.clone();
532 let shared = shared.clone();
533 let tx = tx.clone();
534 let running = running.clone();
535 let name = name.clone();
536
537 let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
538
539 thread::Builder::new()
540 .name(format!("auto-data-rx-{}", local_iface.name))
541 .spawn(move || {
542 data_receiver_loop(data_socket, shared, tx, &running, &name);
543 })?;
544 }
545 }
546
547 {
549 let shared = shared.clone();
550 let tx = tx.clone();
551 let running = running.clone();
552 let name = name.clone();
553 let runtime = runtime.clone();
554
555 thread::Builder::new()
556 .name(format!("auto-peer-jobs-{}", name))
557 .spawn(move || {
558 peer_jobs_loop(shared, tx, runtime, &running, &name);
559 })?;
560 }
561
562 let announce_interval = runtime.lock().unwrap().announce_interval_secs;
564 let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
565 thread::sleep(peering_wait);
566
567 {
569 let mut state = shared.lock().unwrap();
570 state.online = true;
571 }
572
573 log::info!("[{}] AutoInterface online", config.name);
574
575 Ok(())
576}
577
578fn create_multicast_recv_socket(
581 mcast_ip: &Ipv6Addr,
582 port: u16,
583 if_index: u32,
584) -> io::Result<UdpSocket> {
585 let socket = socket2::Socket::new(
586 socket2::Domain::IPV6,
587 socket2::Type::DGRAM,
588 Some(socket2::Protocol::UDP),
589 )?;
590
591 socket.set_reuse_address(true)?;
592 #[cfg(not(target_os = "windows"))]
593 socket.set_reuse_port(true)?;
594
595 let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
597 socket.bind(&bind_addr.into())?;
598
599 socket.join_multicast_v6(mcast_ip, if_index)?;
601
602 socket.set_nonblocking(false)?;
603 let std_socket: UdpSocket = socket.into();
604 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
605 Ok(std_socket)
606}
607
608fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
609 let ip: Ipv6Addr = link_local
610 .parse()
611 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
612
613 let socket = socket2::Socket::new(
614 socket2::Domain::IPV6,
615 socket2::Type::DGRAM,
616 Some(socket2::Protocol::UDP),
617 )?;
618
619 socket.set_reuse_address(true)?;
620 #[cfg(not(target_os = "windows"))]
621 socket.set_reuse_port(true)?;
622
623 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
624 socket.bind(&bind_addr.into())?;
625
626 socket.set_nonblocking(false)?;
627 let std_socket: UdpSocket = socket.into();
628 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
629 Ok(std_socket)
630}
631
632fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
633 let ip: Ipv6Addr = link_local
634 .parse()
635 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
636
637 let socket = socket2::Socket::new(
638 socket2::Domain::IPV6,
639 socket2::Type::DGRAM,
640 Some(socket2::Protocol::UDP),
641 )?;
642
643 socket.set_reuse_address(true)?;
644 #[cfg(not(target_os = "windows"))]
645 socket.set_reuse_port(true)?;
646
647 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
648 socket.bind(&bind_addr.into())?;
649
650 socket.set_nonblocking(false)?;
651 let std_socket: UdpSocket = socket.into();
652 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
653 Ok(std_socket)
654}
655
656fn discovery_sender_loop(
660 group_id: &[u8],
661 link_local_addr: &str,
662 mcast_ip: &Ipv6Addr,
663 discovery_port: u16,
664 if_index: u32,
665 runtime: Arc<Mutex<AutoRuntime>>,
666 running: &AtomicBool,
667 name: &str,
668) {
669 let token = compute_discovery_token(group_id, link_local_addr);
670
671 while running.load(Ordering::Relaxed) {
672 if let Ok(socket) = UdpSocket::bind("[::]:0") {
674 let if_bytes = if_index.to_ne_bytes();
676 unsafe {
677 libc::setsockopt(
678 socket_fd(&socket),
679 libc::IPPROTO_IPV6,
680 libc::IPV6_MULTICAST_IF,
681 if_bytes.as_ptr() as *const libc::c_void,
682 4,
683 );
684 }
685
686 let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
687 if let Err(e) = socket.send_to(&token, target) {
688 log::debug!("[{}] multicast send error: {}", name, e);
689 }
690 }
691
692 let sleep_dur =
693 Duration::from_secs_f64(runtime.lock().unwrap().announce_interval_secs.max(0.1));
694 thread::sleep(sleep_dur);
695 }
696}
697
698fn discovery_receiver_loop(
700 socket: UdpSocket,
701 group_id: &[u8],
702 shared: Arc<Mutex<SharedState>>,
703 tx: EventSender,
704 running: &AtomicBool,
705 name: &str,
706 data_port: u16,
707 configured_bitrate: u64,
708 runtime: Arc<Mutex<AutoRuntime>>,
709) {
710 let mut buf = [0u8; 1024];
711
712 while running.load(Ordering::Relaxed) {
713 match socket.recv_from(&mut buf) {
714 Ok((n, src)) => {
715 if n < 32 {
716 continue;
717 }
718
719 let src_addr = match src {
721 std::net::SocketAddr::V6(v6) => v6,
722 _ => continue,
723 };
724 let src_ip = format!("{}", src_addr.ip());
725
726 let peering_hash = &buf[..32];
727 let expected = compute_discovery_token(group_id, &src_ip);
728
729 if peering_hash != expected {
730 log::debug!("[{}] invalid peering hash from {}", name, src_ip);
731 continue;
732 }
733
734 let state = shared.lock().unwrap();
736 if !state.online {
737 }
740
741 if state.link_local_addresses.contains(&src_ip) {
743 drop(state);
745 continue;
746 }
747
748 if state.peers.contains_key(&src_ip) {
750 let now = crate::time::now();
751 drop(state);
752 let mut state = shared.lock().unwrap();
753 state.refresh_peer(&src_ip, now);
754 continue;
755 }
756 drop(state);
757
758 add_peer(
760 &shared,
761 &tx,
762 &src_ip,
763 data_port,
764 name,
765 configured_bitrate,
766 &runtime,
767 );
768 }
769 Err(ref e)
770 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
771 {
772 continue;
774 }
775 Err(e) => {
776 log::warn!("[{}] discovery recv error: {}", name, e);
777 if !running.load(Ordering::Relaxed) {
778 return;
779 }
780 thread::sleep(Duration::from_millis(100));
781 }
782 }
783 }
784}
785
786fn add_peer(
788 shared: &Arc<Mutex<SharedState>>,
789 tx: &EventSender,
790 peer_addr: &str,
791 data_port: u16,
792 name: &str,
793 configured_bitrate: u64,
794 _runtime: &Arc<Mutex<AutoRuntime>>,
795) {
796 let peer_ip: Ipv6Addr = match peer_addr.parse() {
797 Ok(ip) => ip,
798 Err(_) => return,
799 };
800
801 let send_socket = match UdpSocket::bind("[::]:0") {
803 Ok(s) => s,
804 Err(e) => {
805 log::warn!(
806 "[{}] failed to create writer for peer {}: {}",
807 name,
808 peer_addr,
809 e
810 );
811 return;
812 }
813 };
814
815 let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
816
817 let mut state = shared.lock().unwrap();
818
819 if state.peers.contains_key(peer_addr) {
821 state.refresh_peer(peer_addr, crate::time::now());
822 return;
823 }
824
825 let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
826
827 let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
829 socket: send_socket,
830 target,
831 });
832
833 let peer_info = rns_core::transport::types::InterfaceInfo {
834 id: peer_id,
835 name: format!("{}:{}", name, peer_addr),
836 mode: rns_core::constants::MODE_FULL,
837 out_capable: true,
838 in_capable: true,
839 bitrate: Some(configured_bitrate),
840 announce_rate_target: None,
841 announce_rate_grace: 0,
842 announce_rate_penalty: 0.0,
843 announce_cap: rns_core::constants::ANNOUNCE_CAP,
844 is_local_client: false,
845 wants_tunnel: false,
846 tunnel_id: None,
847 mtu: 1400,
848 ia_freq: 0.0,
849 started: 0.0,
850 ingress_control: true,
851 };
852
853 let now = crate::time::now();
854 state.peers.insert(
855 peer_addr.to_string(),
856 AutoPeer {
857 interface_id: peer_id,
858 link_local_addr: peer_addr.to_string(),
859 ifname: String::new(),
860 last_heard: now,
861 },
862 );
863
864 log::info!(
865 "[{}] Peer discovered: {} (id={})",
866 name,
867 peer_addr,
868 peer_id.0
869 );
870
871 let _ = tx.send(Event::InterfaceUp(
873 peer_id,
874 Some(driver_writer),
875 Some(peer_info),
876 ));
877}
878
879fn data_receiver_loop(
881 socket: UdpSocket,
882 shared: Arc<Mutex<SharedState>>,
883 tx: EventSender,
884 running: &AtomicBool,
885 name: &str,
886) {
887 let mut buf = [0u8; HW_MTU + 64]; while running.load(Ordering::Relaxed) {
890 match socket.recv_from(&mut buf) {
891 Ok((n, src)) => {
892 if n == 0 {
893 continue;
894 }
895
896 let src_addr = match src {
897 std::net::SocketAddr::V6(v6) => v6,
898 _ => continue,
899 };
900 let src_ip = format!("{}", src_addr.ip());
901 let data = &buf[..n];
902
903 let now = crate::time::now();
904 let data_hash = rns_crypto::sha256::sha256(data);
905
906 let mut state = shared.lock().unwrap();
907
908 if !state.online {
909 continue;
910 }
911
912 if state.is_duplicate(&data_hash, now) {
914 continue;
915 }
916 state.add_dedup(data_hash, now);
917
918 state.refresh_peer(&src_ip, now);
920
921 let iface_id = match state.peers.get(&src_ip) {
923 Some(peer) => peer.interface_id,
924 None => {
925 continue;
927 }
928 };
929
930 drop(state);
931
932 if tx
933 .send(Event::Frame {
934 interface_id: iface_id,
935 data: data.to_vec(),
936 })
937 .is_err()
938 {
939 return;
940 }
941 }
942 Err(ref e)
943 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
944 {
945 continue;
946 }
947 Err(e) => {
948 log::warn!("[{}] data recv error: {}", name, e);
949 if !running.load(Ordering::Relaxed) {
950 return;
951 }
952 thread::sleep(Duration::from_millis(100));
953 }
954 }
955 }
956}
957
958fn peer_jobs_loop(
960 shared: Arc<Mutex<SharedState>>,
961 tx: EventSender,
962 runtime: Arc<Mutex<AutoRuntime>>,
963 running: &AtomicBool,
964 name: &str,
965) {
966 while running.load(Ordering::Relaxed) {
967 let interval =
968 Duration::from_secs_f64(runtime.lock().unwrap().peer_job_interval_secs.max(0.1));
969 thread::sleep(interval);
970
971 let now = crate::time::now();
972 let mut timed_out = Vec::new();
973 let peer_timeout_secs = runtime.lock().unwrap().peer_timeout_secs;
974
975 {
976 let state = shared.lock().unwrap();
977 for (addr, peer) in &state.peers {
978 if now > peer.last_heard + peer_timeout_secs {
979 timed_out.push((addr.clone(), peer.interface_id));
980 }
981 }
982 }
983
984 for (addr, iface_id) in &timed_out {
985 log::info!("[{}] Peer timed out: {}", name, addr);
986 let mut state = shared.lock().unwrap();
987 state.peers.remove(addr.as_str());
988 let _ = tx.send(Event::InterfaceDown(*iface_id));
989 }
990 }
991}
992
/// Returns the raw file descriptor behind `socket`.
#[cfg(unix)]
fn socket_fd(socket: &UdpSocket) -> i32 {
    std::os::unix::io::AsRawFd::as_raw_fd(socket)
}

/// Fallback for platforms without Unix file descriptors; always returns 0.
#[cfg(not(unix))]
fn socket_fd(_socket: &UdpSocket) -> i32 {
    0
}
1006
1007use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1010
1011pub struct AutoFactory;
1013
1014impl InterfaceFactory for AutoFactory {
1015 fn type_name(&self) -> &str {
1016 "AutoInterface"
1017 }
1018
1019 fn parse_config(
1020 &self,
1021 name: &str,
1022 id: InterfaceId,
1023 params: &HashMap<String, String>,
1024 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1025 let group_id = params
1026 .get("group_id")
1027 .map(|v| v.as_bytes().to_vec())
1028 .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1029
1030 let discovery_scope = params
1031 .get("discovery_scope")
1032 .map(|v| match v.to_lowercase().as_str() {
1033 "link" => SCOPE_LINK.to_string(),
1034 "admin" => SCOPE_ADMIN.to_string(),
1035 "site" => SCOPE_SITE.to_string(),
1036 "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1037 "global" => SCOPE_GLOBAL.to_string(),
1038 _ => v.clone(),
1039 })
1040 .unwrap_or_else(|| SCOPE_LINK.to_string());
1041
1042 let discovery_port = params
1043 .get("discovery_port")
1044 .and_then(|v| v.parse().ok())
1045 .unwrap_or(DEFAULT_DISCOVERY_PORT);
1046
1047 let data_port = params
1048 .get("data_port")
1049 .and_then(|v| v.parse().ok())
1050 .unwrap_or(DEFAULT_DATA_PORT);
1051
1052 let multicast_address_type = params
1053 .get("multicast_address_type")
1054 .map(|v| match v.to_lowercase().as_str() {
1055 "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1056 "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1057 _ => v.clone(),
1058 })
1059 .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1060
1061 let configured_bitrate = params
1062 .get("configured_bitrate")
1063 .or_else(|| params.get("bitrate"))
1064 .and_then(|v| v.parse().ok())
1065 .unwrap_or(BITRATE_GUESS);
1066
1067 let allowed_interfaces = params
1068 .get("devices")
1069 .or_else(|| params.get("allowed_interfaces"))
1070 .map(|v| {
1071 v.split(',')
1072 .map(|s| s.trim().to_string())
1073 .filter(|s| !s.is_empty())
1074 .collect()
1075 })
1076 .unwrap_or_default();
1077
1078 let ignored_interfaces = params
1079 .get("ignored_devices")
1080 .or_else(|| params.get("ignored_interfaces"))
1081 .map(|v| {
1082 v.split(',')
1083 .map(|s| s.trim().to_string())
1084 .filter(|s| !s.is_empty())
1085 .collect()
1086 })
1087 .unwrap_or_default();
1088
1089 Ok(Box::new(AutoConfig {
1090 name: name.to_string(),
1091 group_id,
1092 discovery_scope,
1093 discovery_port,
1094 data_port,
1095 multicast_address_type,
1096 allowed_interfaces,
1097 ignored_interfaces,
1098 configured_bitrate,
1099 interface_id: id,
1100 runtime: Arc::new(Mutex::new(AutoRuntime {
1101 announce_interval_secs: ANNOUNCE_INTERVAL,
1102 peer_timeout_secs: PEERING_TIMEOUT,
1103 peer_job_interval_secs: PEER_JOB_INTERVAL,
1104 })),
1105 }))
1106 }
1107
1108 fn start(
1109 &self,
1110 config: Box<dyn InterfaceConfigData>,
1111 ctx: StartContext,
1112 ) -> std::io::Result<StartResult> {
1113 let auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1114 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1115 })?;
1116
1117 start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1118 Ok(StartResult::Listener { control: None })
1119 }
1120}
1121
1122pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1123 AutoRuntimeConfigHandle {
1124 interface_name: config.name.clone(),
1125 runtime: Arc::clone(&config.runtime),
1126 startup: AutoRuntime::from_config(config),
1127 }
1128}
1129
/// Unit tests for address derivation, discovery tokens, de-duplication,
/// peer book-keeping, interface enumeration and the default constants.
#[cfg(test)]
mod tests {
    use super::*;

    // --- Multicast address derivation ---

    /// The default group, temporary type and link scope yield a fixed, known address.
    #[test]
    fn multicast_address_default_group() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
    }

    /// A different group keeps the prefix but changes the hashed part.
    #[test]
    fn multicast_address_custom_group() {
        let addr =
            derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
        assert!(addr.starts_with("ff12:0:"));
        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
    }

    /// The scope digit ends up as the fourth hex digit of the address.
    #[test]
    fn multicast_address_scope_admin() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_ADMIN,
        );
        assert!(addr.starts_with("ff14:0:"));
    }

    /// The address-type digit ends up as the third hex digit of the address.
    #[test]
    fn multicast_address_permanent_type() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_PERMANENT_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert!(addr.starts_with("ff02:0:"));
    }

    /// The derived text parses as an IPv6 multicast address.
    #[test]
    fn multicast_address_parseable() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        let ip = parse_multicast_addr(&addr);
        assert!(ip.is_some());
        assert!(ip.unwrap().is_multicast());
    }

    // --- Discovery tokens ---

    /// Fixed vector for the default group and `fe80::1`.
    // NOTE(review): per the test name, the expected value presumably comes
    // from another implementation — confirm its origin before changing it.
    #[test]
    fn discovery_token_interop() {
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
        let got = token
            .iter()
            .map(|b| format!("{:02x}", b))
            .collect::<String>();
        assert_eq!(got, expected);
    }

    /// Second fixed vector, using a longer link-local address.
    #[test]
    fn discovery_token_interop_2() {
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
        let got = token
            .iter()
            .map(|b| format!("{:02x}", b))
            .collect::<String>();
        assert_eq!(got, expected);
    }

    /// Tokens depend on the group id.
    #[test]
    fn discovery_token_different_groups() {
        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
        assert_ne!(t1, t2);
    }

    /// Tokens depend on the address.
    #[test]
    fn discovery_token_different_addrs() {
        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
        assert_ne!(t1, t2);
    }

    // --- De-duplication ---

    /// A hash is only reported as duplicate after it has been recorded.
    #[test]
    fn dedup_basic() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        let hash = [0xAA; 32];
        let now = 1000.0;

        assert!(!state.is_duplicate(&hash, now));
        state.add_dedup(hash, now);
        assert!(state.is_duplicate(&hash, now));
    }

    /// Entries stop counting as duplicates once `MULTI_IF_DEQUE_TTL` has passed.
    #[test]
    fn dedup_expired() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        let hash = [0xBB; 32];
        state.add_dedup(hash, 1000.0);

        // 0.5 s later: still inside the 0.75 s window.
        assert!(state.is_duplicate(&hash, 1000.5));
        // 1.0 s later: outside the window.
        assert!(!state.is_duplicate(&hash, 1001.0));
    }

    /// The deque never grows beyond `MULTI_IF_DEQUE_LEN` entries.
    #[test]
    fn dedup_max_length() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
            // Make every hash distinct by encoding the counter in its first bytes.
            let mut hash = [0u8; 32];
            hash[0] = (i & 0xFF) as u8;
            hash[1] = ((i >> 8) & 0xFF) as u8;
            state.add_dedup(hash, 1000.0);
        }

        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
    }

    // --- Peer book-keeping ---

    /// Refreshing a known peer updates its last-heard time.
    #[test]
    fn peer_refresh() {
        let next_id = Arc::new(AtomicU64::new(100));
        let mut state = SharedState::new(next_id);

        state.peers.insert(
            "fe80::1".to_string(),
            AutoPeer {
                interface_id: InterfaceId(100),
                link_local_addr: "fe80::1".to_string(),
                ifname: "eth0".to_string(),
                last_heard: 1000.0,
            },
        );

        state.refresh_peer("fe80::1", 2000.0);
        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
    }

    /// Refreshing an unknown peer must not panic.
    #[test]
    fn peer_not_found_refresh() {
        let next_id = Arc::new(AtomicU64::new(100));
        let mut state = SharedState::new(next_id);
        state.refresh_peer("fe80::999", 1000.0);
    }

    // --- Interface enumeration (results depend on the host) ---

    /// Whatever is returned must be well-formed; the list may be empty.
    #[test]
    fn enumerate_returns_vec() {
        let interfaces = enumerate_interfaces(&[], &[]);
        for iface in &interfaces {
            assert!(!iface.name.is_empty());
            assert!(iface.link_local_addr.starts_with("fe80"));
            assert!(iface.index > 0);
        }
    }

    /// Ignored interfaces never appear in the result.
    #[test]
    fn enumerate_with_ignored() {
        let interfaces = enumerate_interfaces(
            &[],
            &[
                "lo".to_string(),
                "eth0".to_string(),
                "wlan0".to_string(),
                "enp0s3".to_string(),
                "docker0".to_string(),
            ],
        );
        for iface in &interfaces {
            assert_ne!(iface.name, "lo");
            assert_ne!(iface.name, "eth0");
            assert_ne!(iface.name, "wlan0");
        }
    }

    /// An allow list naming only a non-existent interface yields nothing.
    #[test]
    fn enumerate_with_allowed_nonexistent() {
        let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
        assert!(interfaces.is_empty());
    }

    // --- Configuration defaults and constants ---

    /// `AutoConfig::default()` is populated from the module constants.
    #[test]
    fn config_defaults() {
        let config = AutoConfig::default();
        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
        assert_eq!(config.discovery_scope, SCOPE_LINK);
        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
        assert_eq!(
            config.multicast_address_type,
            MULTICAST_TEMPORARY_ADDRESS_TYPE
        );
        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
        assert!(config.allowed_interfaces.is_empty());
        assert!(config.ignored_interfaces.is_empty());
    }

    /// Pins the constant values.
    // NOTE(review): per the test name these are expected to equal the values
    // of a Python implementation — confirm against that source.
    #[test]
    fn constants_match_python() {
        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
        assert_eq!(DEFAULT_DATA_PORT, 42671);
        assert_eq!(HW_MTU, 1196);
        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
        assert_eq!(BITRATE_GUESS, 10_000_000);
    }

    /// The unicast discovery port is the discovery port plus one.
    #[test]
    fn unicast_discovery_port() {
        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
        assert_eq!(unicast_port, 29717);
    }

    /// Announce interval times the reverse-peering multiplier is about 5.2 s.
    #[test]
    fn reverse_peering_interval() {
        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
        assert!((interval - 5.2).abs() < 0.01);
    }
}