1use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41pub const HW_MTU: usize = 1196;
43
44pub const SCOPE_LINK: &str = "2";
46pub const SCOPE_ADMIN: &str = "4";
48pub const SCOPE_SITE: &str = "5";
50pub const SCOPE_ORGANISATION: &str = "8";
52pub const SCOPE_GLOBAL: &str = "e";
54
55pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60pub const PEERING_TIMEOUT: f64 = 22.0;
62
63pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72pub const BITRATE_GUESS: u64 = 10_000_000;
74
75pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87#[derive(Debug, Clone)]
91pub struct AutoConfig {
92 pub name: String,
93 pub group_id: Vec<u8>,
94 pub discovery_scope: String,
95 pub discovery_port: u16,
96 pub data_port: u16,
97 pub multicast_address_type: String,
98 pub allowed_interfaces: Vec<String>,
99 pub ignored_interfaces: Vec<String>,
100 pub configured_bitrate: u64,
101 pub interface_id: InterfaceId,
103 pub ingress_control: rns_core::transport::types::IngressControlConfig,
104 pub runtime: Arc<Mutex<AutoRuntime>>,
105}
106
107#[derive(Debug, Clone)]
108pub struct AutoRuntime {
109 pub announce_interval_secs: f64,
110 pub peer_timeout_secs: f64,
111 pub peer_job_interval_secs: f64,
112}
113
114impl AutoRuntime {
115 pub fn from_config(_config: &AutoConfig) -> Self {
116 Self {
117 announce_interval_secs: ANNOUNCE_INTERVAL,
118 peer_timeout_secs: PEERING_TIMEOUT,
119 peer_job_interval_secs: PEER_JOB_INTERVAL,
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
125pub struct AutoRuntimeConfigHandle {
126 pub interface_name: String,
127 pub runtime: Arc<Mutex<AutoRuntime>>,
128 pub startup: AutoRuntime,
129}
130
131impl Default for AutoConfig {
132 fn default() -> Self {
133 let mut config = AutoConfig {
134 name: String::new(),
135 group_id: DEFAULT_GROUP_ID.to_vec(),
136 discovery_scope: SCOPE_LINK.to_string(),
137 discovery_port: DEFAULT_DISCOVERY_PORT,
138 data_port: DEFAULT_DATA_PORT,
139 multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
140 allowed_interfaces: Vec::new(),
141 ignored_interfaces: Vec::new(),
142 configured_bitrate: BITRATE_GUESS,
143 interface_id: InterfaceId(0),
144 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
145 runtime: Arc::new(Mutex::new(AutoRuntime {
146 announce_interval_secs: ANNOUNCE_INTERVAL,
147 peer_timeout_secs: PEERING_TIMEOUT,
148 peer_job_interval_secs: PEER_JOB_INTERVAL,
149 })),
150 };
151 let startup = AutoRuntime::from_config(&config);
152 config.runtime = Arc::new(Mutex::new(startup));
153 config
154 }
155}
156
157pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
167 let group_hash = rns_crypto::sha256::sha256(group_id);
168 let g = &group_hash;
169
170 let w1 = (g[2] as u16) << 8 | g[3] as u16;
172 let w2 = (g[4] as u16) << 8 | g[5] as u16;
173 let w3 = (g[6] as u16) << 8 | g[7] as u16;
174 let w4 = (g[8] as u16) << 8 | g[9] as u16;
175 let w5 = (g[10] as u16) << 8 | g[11] as u16;
176 let w6 = (g[12] as u16) << 8 | g[13] as u16;
177
178 format!(
179 "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
180 address_type, scope, w1, w2, w3, w4, w5, w6
181 )
182}
183
184pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
186 addr.parse::<Ipv6Addr>().ok()
187}
188
189pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
193 let mut input = group_id.to_vec();
194 input.extend_from_slice(link_local_addr.as_bytes());
195 rns_crypto::sha256::sha256(&input)
196}
197
198#[derive(Debug, Clone)]
202pub struct LocalInterface {
203 pub name: String,
204 pub link_local_addr: String,
205 pub index: u32,
206}
207
208pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
212 let mut result = Vec::new();
213
214 unsafe {
215 let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
216 if libc::getifaddrs(&mut ifaddrs) != 0 {
217 return result;
218 }
219
220 let mut current = ifaddrs;
221 while !current.is_null() {
222 let ifa = &*current;
223 current = ifa.ifa_next;
224
225 if ifa.ifa_addr.is_null() {
227 continue;
228 }
229
230 if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
232 continue;
233 }
234
235 let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
237 Ok(s) => s.to_string(),
238 Err(_) => continue,
239 };
240
241 if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
243 if !allowed.iter().any(|a| a == &name) {
244 continue;
245 }
246 }
247
248 if ignored.iter().any(|ig| ig == &name) {
250 continue;
251 }
252
253 if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
255 continue;
256 }
257
258 let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
260 let addr_bytes = (*sa6).sin6_addr.s6_addr;
261 let ipv6 = Ipv6Addr::from(addr_bytes);
262
263 let octets = ipv6.octets();
265 if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
266 continue;
267 }
268
269 let addr_str = format!("{}", ipv6);
271
272 let index = libc::if_nametoindex(ifa.ifa_name);
274 if index == 0 {
275 continue;
276 }
277
278 if result.iter().any(|li: &LocalInterface| li.name == name) {
280 continue;
281 }
282
283 result.push(LocalInterface {
284 name,
285 link_local_addr: addr_str,
286 index,
287 });
288 }
289
290 libc::freeifaddrs(ifaddrs);
291 }
292
293 result
294}
295
296struct AutoPeer {
300 interface_id: InterfaceId,
301 #[allow(dead_code)]
302 link_local_addr: String,
303 #[allow(dead_code)]
304 ifname: String,
305 last_heard: f64,
306}
307
308struct UdpWriter {
310 socket: UdpSocket,
311 target: SocketAddrV6,
312}
313
314impl Writer for UdpWriter {
315 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
316 self.socket.send_to(data, self.target)?;
317 Ok(())
318 }
319}
320
321struct SharedState {
323 peers: HashMap<String, AutoPeer>,
325 link_local_addresses: Vec<String>,
327 dedup_deque: VecDeque<([u8; 32], f64)>,
329 online: bool,
331 next_id: Arc<AtomicU64>,
333}
334
335impl SharedState {
336 fn new(next_id: Arc<AtomicU64>) -> Self {
337 SharedState {
338 peers: HashMap::new(),
339 link_local_addresses: Vec::new(),
340 dedup_deque: VecDeque::new(),
341 online: false,
342 next_id,
343 }
344 }
345
346 fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
348 for (h, ts) in &self.dedup_deque {
349 if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
350 return true;
351 }
352 }
353 false
354 }
355
356 fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
358 self.dedup_deque.push_back((hash, now));
359 while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
360 self.dedup_deque.pop_front();
361 }
362 }
363
364 fn refresh_peer(&mut self, addr: &str, now: f64) {
366 if let Some(peer) = self.peers.get_mut(addr) {
367 peer.last_heard = now;
368 }
369 }
370}
371
372pub fn start(
380 config: AutoConfig,
381 tx: EventSender,
382 next_dynamic_id: Arc<AtomicU64>,
383) -> io::Result<()> {
384 let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
385
386 if interfaces.is_empty() {
387 log::warn!(
388 "[{}] No suitable IPv6 link-local interfaces found",
389 config.name,
390 );
391 return Ok(());
392 }
393
394 let group_id = config.group_id.clone();
395 let mcast_addr_str = derive_multicast_address(
396 &group_id,
397 &config.multicast_address_type,
398 &config.discovery_scope,
399 );
400
401 let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
402 Some(ip) => ip,
403 None => {
404 return Err(io::Error::new(
405 io::ErrorKind::InvalidData,
406 format!("Invalid multicast address: {}", mcast_addr_str),
407 ));
408 }
409 };
410
411 let discovery_port = config.discovery_port;
412 let unicast_discovery_port = config.discovery_port + 1;
413 let data_port = config.data_port;
414 let name = config.name.clone();
415 let configured_bitrate = config.configured_bitrate;
416 let ingress_control = config.ingress_control;
417 {
418 let startup = AutoRuntime::from_config(&config);
419 *config.runtime.lock().unwrap() = startup;
420 }
421 let runtime = Arc::clone(&config.runtime);
422
423 let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
424 let running = Arc::new(AtomicBool::new(true));
425
426 {
428 let mut state = shared.lock().unwrap();
429 for iface in &interfaces {
430 state
431 .link_local_addresses
432 .push(iface.link_local_addr.clone());
433 }
434 }
435
436 log::info!(
437 "[{}] AutoInterface starting with {} local interfaces, multicast {}",
438 name,
439 interfaces.len(),
440 mcast_addr_str,
441 );
442
443 for local_iface in &interfaces {
445 let ifname = local_iface.name.clone();
446 let link_local = local_iface.link_local_addr.clone();
447 let if_index = local_iface.index;
448
449 let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
451
452 let unicast_socket =
454 create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
455
456 {
458 let group_id = group_id.clone();
459 let link_local = link_local.clone();
460 let running = running.clone();
461 let name = name.clone();
462 let runtime = runtime.clone();
463
464 thread::Builder::new()
465 .name(format!("auto-disc-tx-{}", ifname))
466 .spawn(move || {
467 discovery_sender_loop(
468 &group_id,
469 &link_local,
470 &mcast_ip,
471 discovery_port,
472 if_index,
473 runtime,
474 &running,
475 &name,
476 );
477 })?;
478 }
479
480 {
482 let group_id = group_id.clone();
483 let shared = shared.clone();
484 let tx = tx.clone();
485 let running = running.clone();
486 let name = name.clone();
487 let runtime = runtime.clone();
488
489 thread::Builder::new()
490 .name(format!("auto-disc-rx-{}", ifname))
491 .spawn(move || {
492 discovery_receiver_loop(
493 mcast_socket,
494 &group_id,
495 shared,
496 tx,
497 &running,
498 &name,
499 data_port,
500 configured_bitrate,
501 ingress_control,
502 runtime,
503 );
504 })?;
505 }
506
507 {
509 let group_id = group_id.clone();
510 let shared = shared.clone();
511 let tx = tx.clone();
512 let running = running.clone();
513 let name = name.clone();
514 let runtime = runtime.clone();
515 let ingress_control = ingress_control;
516
517 thread::Builder::new()
518 .name(format!("auto-udisc-rx-{}", ifname))
519 .spawn(move || {
520 discovery_receiver_loop(
521 unicast_socket,
522 &group_id,
523 shared,
524 tx,
525 &running,
526 &name,
527 data_port,
528 configured_bitrate,
529 ingress_control,
530 runtime,
531 );
532 })?;
533 }
534
535 {
537 let link_local = local_iface.link_local_addr.clone();
538 let shared = shared.clone();
539 let tx = tx.clone();
540 let running = running.clone();
541 let name = name.clone();
542
543 let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
544
545 thread::Builder::new()
546 .name(format!("auto-data-rx-{}", local_iface.name))
547 .spawn(move || {
548 data_receiver_loop(data_socket, shared, tx, &running, &name);
549 })?;
550 }
551 }
552
553 {
555 let shared = shared.clone();
556 let tx = tx.clone();
557 let running = running.clone();
558 let name = name.clone();
559 let runtime = runtime.clone();
560
561 thread::Builder::new()
562 .name(format!("auto-peer-jobs-{}", name))
563 .spawn(move || {
564 peer_jobs_loop(shared, tx, runtime, &running, &name);
565 })?;
566 }
567
568 let announce_interval = runtime.lock().unwrap().announce_interval_secs;
570 let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
571 thread::sleep(peering_wait);
572
573 {
575 let mut state = shared.lock().unwrap();
576 state.online = true;
577 }
578
579 log::info!("[{}] AutoInterface online", config.name);
580
581 Ok(())
582}
583
584fn create_multicast_recv_socket(
587 mcast_ip: &Ipv6Addr,
588 port: u16,
589 if_index: u32,
590) -> io::Result<UdpSocket> {
591 let socket = socket2::Socket::new(
592 socket2::Domain::IPV6,
593 socket2::Type::DGRAM,
594 Some(socket2::Protocol::UDP),
595 )?;
596
597 socket.set_reuse_address(true)?;
598 #[cfg(not(target_os = "windows"))]
599 socket.set_reuse_port(true)?;
600
601 let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
603 socket.bind(&bind_addr.into())?;
604
605 socket.join_multicast_v6(mcast_ip, if_index)?;
607
608 socket.set_nonblocking(false)?;
609 let std_socket: UdpSocket = socket.into();
610 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
611 Ok(std_socket)
612}
613
614fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
615 let ip: Ipv6Addr = link_local
616 .parse()
617 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
618
619 let socket = socket2::Socket::new(
620 socket2::Domain::IPV6,
621 socket2::Type::DGRAM,
622 Some(socket2::Protocol::UDP),
623 )?;
624
625 socket.set_reuse_address(true)?;
626 #[cfg(not(target_os = "windows"))]
627 socket.set_reuse_port(true)?;
628
629 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
630 socket.bind(&bind_addr.into())?;
631
632 socket.set_nonblocking(false)?;
633 let std_socket: UdpSocket = socket.into();
634 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
635 Ok(std_socket)
636}
637
638fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
639 let ip: Ipv6Addr = link_local
640 .parse()
641 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
642
643 let socket = socket2::Socket::new(
644 socket2::Domain::IPV6,
645 socket2::Type::DGRAM,
646 Some(socket2::Protocol::UDP),
647 )?;
648
649 socket.set_reuse_address(true)?;
650 #[cfg(not(target_os = "windows"))]
651 socket.set_reuse_port(true)?;
652
653 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
654 socket.bind(&bind_addr.into())?;
655
656 socket.set_nonblocking(false)?;
657 let std_socket: UdpSocket = socket.into();
658 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
659 Ok(std_socket)
660}
661
662fn discovery_sender_loop(
666 group_id: &[u8],
667 link_local_addr: &str,
668 mcast_ip: &Ipv6Addr,
669 discovery_port: u16,
670 if_index: u32,
671 runtime: Arc<Mutex<AutoRuntime>>,
672 running: &AtomicBool,
673 name: &str,
674) {
675 let token = compute_discovery_token(group_id, link_local_addr);
676
677 while running.load(Ordering::Relaxed) {
678 if let Ok(socket) = UdpSocket::bind("[::]:0") {
680 let if_bytes = if_index.to_ne_bytes();
682 unsafe {
683 libc::setsockopt(
684 socket_fd(&socket),
685 libc::IPPROTO_IPV6,
686 libc::IPV6_MULTICAST_IF,
687 if_bytes.as_ptr() as *const libc::c_void,
688 4,
689 );
690 }
691
692 let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
693 if let Err(e) = socket.send_to(&token, target) {
694 log::debug!("[{}] multicast send error: {}", name, e);
695 }
696 }
697
698 let sleep_dur =
699 Duration::from_secs_f64(runtime.lock().unwrap().announce_interval_secs.max(0.1));
700 thread::sleep(sleep_dur);
701 }
702}
703
704fn discovery_receiver_loop(
706 socket: UdpSocket,
707 group_id: &[u8],
708 shared: Arc<Mutex<SharedState>>,
709 tx: EventSender,
710 running: &AtomicBool,
711 name: &str,
712 data_port: u16,
713 configured_bitrate: u64,
714 ingress_control: rns_core::transport::types::IngressControlConfig,
715 runtime: Arc<Mutex<AutoRuntime>>,
716) {
717 let mut buf = [0u8; 1024];
718
719 while running.load(Ordering::Relaxed) {
720 match socket.recv_from(&mut buf) {
721 Ok((n, src)) => {
722 if n < 32 {
723 continue;
724 }
725
726 let src_addr = match src {
728 std::net::SocketAddr::V6(v6) => v6,
729 _ => continue,
730 };
731 let src_ip = format!("{}", src_addr.ip());
732
733 let peering_hash = &buf[..32];
734 let expected = compute_discovery_token(group_id, &src_ip);
735
736 if peering_hash != expected {
737 log::debug!("[{}] invalid peering hash from {}", name, src_ip);
738 continue;
739 }
740
741 let state = shared.lock().unwrap();
743 if !state.online {
744 }
747
748 if state.link_local_addresses.contains(&src_ip) {
750 drop(state);
752 continue;
753 }
754
755 if state.peers.contains_key(&src_ip) {
757 let now = crate::time::now();
758 drop(state);
759 let mut state = shared.lock().unwrap();
760 state.refresh_peer(&src_ip, now);
761 continue;
762 }
763 drop(state);
764
765 add_peer(
767 &shared,
768 &tx,
769 &src_ip,
770 data_port,
771 name,
772 configured_bitrate,
773 ingress_control,
774 &runtime,
775 );
776 }
777 Err(ref e)
778 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
779 {
780 continue;
782 }
783 Err(e) => {
784 log::warn!("[{}] discovery recv error: {}", name, e);
785 if !running.load(Ordering::Relaxed) {
786 return;
787 }
788 thread::sleep(Duration::from_millis(100));
789 }
790 }
791 }
792}
793
794fn add_peer(
796 shared: &Arc<Mutex<SharedState>>,
797 tx: &EventSender,
798 peer_addr: &str,
799 data_port: u16,
800 name: &str,
801 configured_bitrate: u64,
802 ingress_control: rns_core::transport::types::IngressControlConfig,
803 _runtime: &Arc<Mutex<AutoRuntime>>,
804) {
805 let peer_ip: Ipv6Addr = match peer_addr.parse() {
806 Ok(ip) => ip,
807 Err(_) => return,
808 };
809
810 let send_socket = match UdpSocket::bind("[::]:0") {
812 Ok(s) => s,
813 Err(e) => {
814 log::warn!(
815 "[{}] failed to create writer for peer {}: {}",
816 name,
817 peer_addr,
818 e
819 );
820 return;
821 }
822 };
823
824 let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
825
826 let mut state = shared.lock().unwrap();
827
828 if state.peers.contains_key(peer_addr) {
830 state.refresh_peer(peer_addr, crate::time::now());
831 return;
832 }
833
834 let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
835
836 let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
838 socket: send_socket,
839 target,
840 });
841
842 let peer_info = rns_core::transport::types::InterfaceInfo {
843 id: peer_id,
844 name: format!("{}:{}", name, peer_addr),
845 mode: rns_core::constants::MODE_FULL,
846 out_capable: true,
847 in_capable: true,
848 bitrate: Some(configured_bitrate),
849 announce_rate_target: None,
850 announce_rate_grace: 0,
851 announce_rate_penalty: 0.0,
852 announce_cap: rns_core::constants::ANNOUNCE_CAP,
853 is_local_client: false,
854 wants_tunnel: false,
855 tunnel_id: None,
856 mtu: 1400,
857 ia_freq: 0.0,
858 started: 0.0,
859 ingress_control,
860 };
861
862 let now = crate::time::now();
863 state.peers.insert(
864 peer_addr.to_string(),
865 AutoPeer {
866 interface_id: peer_id,
867 link_local_addr: peer_addr.to_string(),
868 ifname: String::new(),
869 last_heard: now,
870 },
871 );
872
873 log::info!(
874 "[{}] Peer discovered: {} (id={})",
875 name,
876 peer_addr,
877 peer_id.0
878 );
879
880 let _ = tx.send(Event::InterfaceUp(
882 peer_id,
883 Some(driver_writer),
884 Some(peer_info),
885 ));
886}
887
888fn data_receiver_loop(
890 socket: UdpSocket,
891 shared: Arc<Mutex<SharedState>>,
892 tx: EventSender,
893 running: &AtomicBool,
894 name: &str,
895) {
896 let mut buf = [0u8; HW_MTU + 64]; while running.load(Ordering::Relaxed) {
899 match socket.recv_from(&mut buf) {
900 Ok((n, src)) => {
901 if n == 0 {
902 continue;
903 }
904
905 let src_addr = match src {
906 std::net::SocketAddr::V6(v6) => v6,
907 _ => continue,
908 };
909 let src_ip = format!("{}", src_addr.ip());
910 let data = &buf[..n];
911
912 let now = crate::time::now();
913 let data_hash = rns_crypto::sha256::sha256(data);
914
915 let mut state = shared.lock().unwrap();
916
917 if !state.online {
918 continue;
919 }
920
921 if state.is_duplicate(&data_hash, now) {
923 continue;
924 }
925 state.add_dedup(data_hash, now);
926
927 state.refresh_peer(&src_ip, now);
929
930 let iface_id = match state.peers.get(&src_ip) {
932 Some(peer) => peer.interface_id,
933 None => {
934 continue;
936 }
937 };
938
939 drop(state);
940
941 if tx
942 .send(Event::Frame {
943 interface_id: iface_id,
944 data: data.to_vec(),
945 })
946 .is_err()
947 {
948 return;
949 }
950 }
951 Err(ref e)
952 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
953 {
954 continue;
955 }
956 Err(e) => {
957 log::warn!("[{}] data recv error: {}", name, e);
958 if !running.load(Ordering::Relaxed) {
959 return;
960 }
961 thread::sleep(Duration::from_millis(100));
962 }
963 }
964 }
965}
966
967fn peer_jobs_loop(
969 shared: Arc<Mutex<SharedState>>,
970 tx: EventSender,
971 runtime: Arc<Mutex<AutoRuntime>>,
972 running: &AtomicBool,
973 name: &str,
974) {
975 while running.load(Ordering::Relaxed) {
976 let interval =
977 Duration::from_secs_f64(runtime.lock().unwrap().peer_job_interval_secs.max(0.1));
978 thread::sleep(interval);
979
980 let now = crate::time::now();
981 let mut timed_out = Vec::new();
982 let peer_timeout_secs = runtime.lock().unwrap().peer_timeout_secs;
983
984 {
985 let state = shared.lock().unwrap();
986 for (addr, peer) in &state.peers {
987 if now > peer.last_heard + peer_timeout_secs {
988 timed_out.push((addr.clone(), peer.interface_id));
989 }
990 }
991 }
992
993 for (addr, iface_id) in &timed_out {
994 log::info!("[{}] Peer timed out: {}", name, addr);
995 let mut state = shared.lock().unwrap();
996 state.peers.remove(addr.as_str());
997 let _ = tx.send(Event::InterfaceDown(*iface_id));
998 }
999 }
1000}
1001
1002#[cfg(unix)]
1006fn socket_fd(socket: &UdpSocket) -> i32 {
1007 use std::os::unix::io::AsRawFd;
1008 socket.as_raw_fd()
1009}
1010
1011#[cfg(not(unix))]
1012fn socket_fd(_socket: &UdpSocket) -> i32 {
1013 0
1014}
1015
1016use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1019
1020pub struct AutoFactory;
1022
1023impl InterfaceFactory for AutoFactory {
1024 fn type_name(&self) -> &str {
1025 "AutoInterface"
1026 }
1027
1028 fn parse_config(
1029 &self,
1030 name: &str,
1031 id: InterfaceId,
1032 params: &HashMap<String, String>,
1033 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1034 let group_id = params
1035 .get("group_id")
1036 .map(|v| v.as_bytes().to_vec())
1037 .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1038
1039 let discovery_scope = params
1040 .get("discovery_scope")
1041 .map(|v| match v.to_lowercase().as_str() {
1042 "link" => SCOPE_LINK.to_string(),
1043 "admin" => SCOPE_ADMIN.to_string(),
1044 "site" => SCOPE_SITE.to_string(),
1045 "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1046 "global" => SCOPE_GLOBAL.to_string(),
1047 _ => v.clone(),
1048 })
1049 .unwrap_or_else(|| SCOPE_LINK.to_string());
1050
1051 let discovery_port = params
1052 .get("discovery_port")
1053 .and_then(|v| v.parse().ok())
1054 .unwrap_or(DEFAULT_DISCOVERY_PORT);
1055
1056 let data_port = params
1057 .get("data_port")
1058 .and_then(|v| v.parse().ok())
1059 .unwrap_or(DEFAULT_DATA_PORT);
1060
1061 let multicast_address_type = params
1062 .get("multicast_address_type")
1063 .map(|v| match v.to_lowercase().as_str() {
1064 "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1065 "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1066 _ => v.clone(),
1067 })
1068 .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1069
1070 let configured_bitrate = params
1071 .get("configured_bitrate")
1072 .or_else(|| params.get("bitrate"))
1073 .and_then(|v| v.parse().ok())
1074 .unwrap_or(BITRATE_GUESS);
1075
1076 let allowed_interfaces = params
1077 .get("devices")
1078 .or_else(|| params.get("allowed_interfaces"))
1079 .map(|v| {
1080 v.split(',')
1081 .map(|s| s.trim().to_string())
1082 .filter(|s| !s.is_empty())
1083 .collect()
1084 })
1085 .unwrap_or_default();
1086
1087 let ignored_interfaces = params
1088 .get("ignored_devices")
1089 .or_else(|| params.get("ignored_interfaces"))
1090 .map(|v| {
1091 v.split(',')
1092 .map(|s| s.trim().to_string())
1093 .filter(|s| !s.is_empty())
1094 .collect()
1095 })
1096 .unwrap_or_default();
1097
1098 Ok(Box::new(AutoConfig {
1099 name: name.to_string(),
1100 group_id,
1101 discovery_scope,
1102 discovery_port,
1103 data_port,
1104 multicast_address_type,
1105 allowed_interfaces,
1106 ignored_interfaces,
1107 configured_bitrate,
1108 interface_id: id,
1109 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
1110 runtime: Arc::new(Mutex::new(AutoRuntime {
1111 announce_interval_secs: ANNOUNCE_INTERVAL,
1112 peer_timeout_secs: PEERING_TIMEOUT,
1113 peer_job_interval_secs: PEER_JOB_INTERVAL,
1114 })),
1115 }))
1116 }
1117
1118 fn start(
1119 &self,
1120 config: Box<dyn InterfaceConfigData>,
1121 ctx: StartContext,
1122 ) -> std::io::Result<StartResult> {
1123 let mut auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1124 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1125 })?;
1126
1127 auto_config.ingress_control = ctx.ingress_control;
1128 start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1129 Ok(StartResult::Listener { control: None })
1130 }
1131}
1132
1133pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1134 AutoRuntimeConfigHandle {
1135 interface_name: config.name.clone(),
1136 runtime: Arc::clone(&config.runtime),
1137 startup: AutoRuntime::from_config(config),
1138 }
1139}
1140
1141#[cfg(test)]
1144mod tests {
1145 use super::*;
1146
1147 #[test]
1150 fn multicast_address_default_group() {
1151 let addr = derive_multicast_address(
1153 DEFAULT_GROUP_ID,
1154 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1155 SCOPE_LINK,
1156 );
1157 assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1158 }
1159
1160 #[test]
1161 fn multicast_address_custom_group() {
1162 let addr =
1163 derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
1164 assert!(addr.starts_with("ff12:0:"));
1166 assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1168 }
1169
1170 #[test]
1171 fn multicast_address_scope_admin() {
1172 let addr = derive_multicast_address(
1173 DEFAULT_GROUP_ID,
1174 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1175 SCOPE_ADMIN,
1176 );
1177 assert!(addr.starts_with("ff14:0:"));
1178 }
1179
1180 #[test]
1181 fn multicast_address_permanent_type() {
1182 let addr = derive_multicast_address(
1183 DEFAULT_GROUP_ID,
1184 MULTICAST_PERMANENT_ADDRESS_TYPE,
1185 SCOPE_LINK,
1186 );
1187 assert!(addr.starts_with("ff02:0:"));
1188 }
1189
1190 #[test]
1191 fn multicast_address_parseable() {
1192 let addr = derive_multicast_address(
1193 DEFAULT_GROUP_ID,
1194 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1195 SCOPE_LINK,
1196 );
1197 let ip = parse_multicast_addr(&addr);
1198 assert!(ip.is_some());
1199 assert!(ip.unwrap().is_multicast());
1200 }
1201
1202 #[test]
1205 fn discovery_token_interop() {
1206 let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1208 let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1209 let got = token
1210 .iter()
1211 .map(|b| format!("{:02x}", b))
1212 .collect::<String>();
1213 assert_eq!(got, expected);
1214 }
1215
1216 #[test]
1217 fn discovery_token_interop_2() {
1218 let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
1220 let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1221 let got = token
1222 .iter()
1223 .map(|b| format!("{:02x}", b))
1224 .collect::<String>();
1225 assert_eq!(got, expected);
1226 }
1227
1228 #[test]
1229 fn discovery_token_different_groups() {
1230 let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1231 let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1232 assert_ne!(t1, t2);
1233 }
1234
1235 #[test]
1236 fn discovery_token_different_addrs() {
1237 let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1238 let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1239 assert_ne!(t1, t2);
1240 }
1241
1242 #[test]
1245 fn dedup_basic() {
1246 let next_id = Arc::new(AtomicU64::new(1));
1247 let mut state = SharedState::new(next_id);
1248
1249 let hash = [0xAA; 32];
1250 let now = 1000.0;
1251
1252 assert!(!state.is_duplicate(&hash, now));
1253 state.add_dedup(hash, now);
1254 assert!(state.is_duplicate(&hash, now));
1255 }
1256
1257 #[test]
1258 fn dedup_expired() {
1259 let next_id = Arc::new(AtomicU64::new(1));
1260 let mut state = SharedState::new(next_id);
1261
1262 let hash = [0xBB; 32];
1263 state.add_dedup(hash, 1000.0);
1264
1265 assert!(state.is_duplicate(&hash, 1000.5));
1267 assert!(!state.is_duplicate(&hash, 1001.0));
1269 }
1270
1271 #[test]
1272 fn dedup_max_length() {
1273 let next_id = Arc::new(AtomicU64::new(1));
1274 let mut state = SharedState::new(next_id);
1275
1276 for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1278 let mut hash = [0u8; 32];
1279 hash[0] = (i & 0xFF) as u8;
1280 hash[1] = ((i >> 8) & 0xFF) as u8;
1281 state.add_dedup(hash, 1000.0);
1282 }
1283
1284 assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1285 }
1286
1287 #[test]
1290 fn peer_refresh() {
1291 let next_id = Arc::new(AtomicU64::new(100));
1292 let mut state = SharedState::new(next_id);
1293
1294 state.peers.insert(
1295 "fe80::1".to_string(),
1296 AutoPeer {
1297 interface_id: InterfaceId(100),
1298 link_local_addr: "fe80::1".to_string(),
1299 ifname: "eth0".to_string(),
1300 last_heard: 1000.0,
1301 },
1302 );
1303
1304 state.refresh_peer("fe80::1", 2000.0);
1305 assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1306 }
1307
1308 #[test]
1309 fn peer_not_found_refresh() {
1310 let next_id = Arc::new(AtomicU64::new(100));
1311 let mut state = SharedState::new(next_id);
1312 state.refresh_peer("fe80::999", 1000.0);
1314 }
1315
1316 #[test]
1319 fn enumerate_returns_vec() {
1320 let interfaces = enumerate_interfaces(&[], &[]);
1323 for iface in &interfaces {
1325 assert!(!iface.name.is_empty());
1326 assert!(iface.link_local_addr.starts_with("fe80"));
1327 assert!(iface.index > 0);
1328 }
1329 }
1330
1331 #[test]
1332 fn enumerate_with_ignored() {
1333 let interfaces = enumerate_interfaces(
1335 &[],
1336 &[
1337 "lo".to_string(),
1338 "eth0".to_string(),
1339 "wlan0".to_string(),
1340 "enp0s3".to_string(),
1341 "docker0".to_string(),
1342 ],
1343 );
1344 for iface in &interfaces {
1346 assert_ne!(iface.name, "lo");
1347 assert_ne!(iface.name, "eth0");
1348 assert_ne!(iface.name, "wlan0");
1349 }
1350 }
1351
1352 #[test]
1353 fn enumerate_with_allowed_nonexistent() {
1354 let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
1356 assert!(interfaces.is_empty());
1357 }
1358
1359 #[test]
1362 fn config_defaults() {
1363 let config = AutoConfig::default();
1364 assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1365 assert_eq!(config.discovery_scope, SCOPE_LINK);
1366 assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1367 assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1368 assert_eq!(
1369 config.multicast_address_type,
1370 MULTICAST_TEMPORARY_ADDRESS_TYPE
1371 );
1372 assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1373 assert!(config.allowed_interfaces.is_empty());
1374 assert!(config.ignored_interfaces.is_empty());
1375 }
1376
1377 #[test]
1380 fn constants_match_python() {
1381 assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1382 assert_eq!(DEFAULT_DATA_PORT, 42671);
1383 assert_eq!(HW_MTU, 1196);
1384 assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1385 assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1386 assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1387 assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1388 assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1389 assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1390 assert_eq!(BITRATE_GUESS, 10_000_000);
1391 }
1392
1393 #[test]
1394 fn unicast_discovery_port() {
1395 let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1397 assert_eq!(unicast_port, 29717);
1398 }
1399
1400 #[test]
1401 fn reverse_peering_interval() {
1402 let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1403 assert!((interval - 5.2).abs() < 0.01);
1404 }
1405}