1use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::{lock_or_recover, Writer};
26
/// Default UDP port on which multicast discovery tokens are sent and received.
pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;

/// Default UDP port on which peers exchange data frames.
pub const DEFAULT_DATA_PORT: u16 = 42671;

/// Default group identifier; it is hashed to derive the multicast address and
/// mixed into every discovery token.
pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";

/// Not referenced in this file.
/// NOTE(review): presumably the default IFAC size in bytes — confirm against callers.
pub const DEFAULT_IFAC_SIZE: usize = 16;

/// Hardware MTU; the data receive buffer is sized as `HW_MTU + 64` bytes.
pub const HW_MTU: usize = 1196;

/// Scope digits placed directly after the flag digit in the derived multicast
/// address (`ff<type><scope>:...`).
pub const SCOPE_LINK: &str = "2";
pub const SCOPE_ADMIN: &str = "4";
pub const SCOPE_SITE: &str = "5";
pub const SCOPE_ORGANISATION: &str = "8";
pub const SCOPE_GLOBAL: &str = "e";

/// Address-type (flag) digits placed directly after `ff` in the derived
/// multicast address.
pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";

/// Default for `AutoRuntime::peer_timeout_secs`: a peer not heard from for
/// longer than this is removed by the peer job loop.
pub const PEERING_TIMEOUT: f64 = 22.0;

/// Default for `AutoRuntime::announce_interval_secs`: pause between discovery
/// announcements.
pub const ANNOUNCE_INTERVAL: f64 = 1.6;

/// Default for `AutoRuntime::peer_job_interval_secs`: pause between peer
/// timeout sweeps.
pub const PEER_JOB_INTERVAL: f64 = 4.0;

/// Not referenced outside the tests in this file.
/// NOTE(review): presumably the multicast echo timeout in seconds — confirm.
pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;

/// Default value for `AutoConfig::configured_bitrate`.
/// NOTE(review): presumably bits per second — confirm.
pub const BITRATE_GUESS: u64 = 10_000_000;

/// Maximum number of frame hashes kept in the de-duplication deque.
pub const MULTI_IF_DEQUE_LEN: usize = 48;

/// How long a recorded frame hash counts as a duplicate, in the unit returned
/// by `crate::time::now()` (presumably seconds — confirm).
pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;

/// Only used by the tests in this file, where it is multiplied with
/// `ANNOUNCE_INTERVAL`.
/// NOTE(review): presumably scales the reverse-peering interval — confirm.
pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;

/// Interface names skipped on every platform unless explicitly allowed.
pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];

/// Interface names skipped by default on Android unless explicitly allowed.
/// Only referenced on Android and in tests, hence the conditional `dead_code` allow.
#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
const ANDROID_IGNORE_IFS: &[&str] = &[
    "dummy0", "lo", "tun0", "rmnet0", "rmnet1", "rmnet2", "rmnet3", "rmnet4", "rmnet5", "rmnet6",
    "rmnet7",
];
92
/// Configuration for one AutoInterface instance.
#[derive(Debug, Clone)]
pub struct AutoConfig {
    /// Interface name; used as the log prefix and in spawned peer interface names.
    pub name: String,
    /// Group identifier hashed into the multicast address and discovery tokens.
    pub group_id: Vec<u8>,
    /// Scope digit of the multicast address (see the `SCOPE_*` constants).
    pub discovery_scope: String,
    /// UDP port for multicast discovery; unicast discovery uses the next port.
    pub discovery_port: u16,
    /// UDP port for data frames.
    pub data_port: u16,
    /// Flag digit of the multicast address (see `MULTICAST_*_ADDRESS_TYPE`).
    pub multicast_address_type: String,
    /// If non-empty, only these interface names are used.
    pub allowed_interfaces: Vec<String>,
    /// Interface names that are never used.
    pub ignored_interfaces: Vec<String>,
    /// Bitrate reported for every discovered peer interface.
    pub configured_bitrate: u64,
    /// Identifier assigned to this interface by the caller.
    pub interface_id: InterfaceId,
    /// Ingress-control settings copied into every discovered peer interface.
    pub ingress_control: rns_core::transport::types::IngressControlConfig,
    /// Timing parameters shared with the worker threads; reset on start.
    pub runtime: Arc<Mutex<AutoRuntime>>,
}
112
113#[derive(Debug, Clone)]
114pub struct AutoRuntime {
115 pub announce_interval_secs: f64,
116 pub peer_timeout_secs: f64,
117 pub peer_job_interval_secs: f64,
118}
119
120impl AutoRuntime {
121 pub fn from_config(_config: &AutoConfig) -> Self {
122 Self {
123 announce_interval_secs: ANNOUNCE_INTERVAL,
124 peer_timeout_secs: PEERING_TIMEOUT,
125 peer_job_interval_secs: PEER_JOB_INTERVAL,
126 }
127 }
128}
129
/// Handle to the timing parameters of one AutoInterface.
#[derive(Debug, Clone)]
pub struct AutoRuntimeConfigHandle {
    /// Name of the interface the handle belongs to.
    pub interface_name: String,
    /// Shared, mutable timing parameters read by the worker threads.
    pub runtime: Arc<Mutex<AutoRuntime>>,
    /// Snapshot of the startup values produced by `AutoRuntime::from_config`.
    pub startup: AutoRuntime,
}
136
137impl Default for AutoConfig {
138 fn default() -> Self {
139 let mut config = AutoConfig {
140 name: String::new(),
141 group_id: DEFAULT_GROUP_ID.to_vec(),
142 discovery_scope: SCOPE_LINK.to_string(),
143 discovery_port: DEFAULT_DISCOVERY_PORT,
144 data_port: DEFAULT_DATA_PORT,
145 multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
146 allowed_interfaces: Vec::new(),
147 ignored_interfaces: Vec::new(),
148 configured_bitrate: BITRATE_GUESS,
149 interface_id: InterfaceId(0),
150 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
151 runtime: Arc::new(Mutex::new(AutoRuntime {
152 announce_interval_secs: ANNOUNCE_INTERVAL,
153 peer_timeout_secs: PEERING_TIMEOUT,
154 peer_job_interval_secs: PEER_JOB_INTERVAL,
155 })),
156 };
157 let startup = AutoRuntime::from_config(&config);
158 config.runtime = Arc::new(Mutex::new(startup));
159 config
160 }
161}
162
163pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
173 let group_hash = rns_crypto::sha256::sha256(group_id);
174 let g = &group_hash;
175
176 let w1 = (g[2] as u16) << 8 | g[3] as u16;
178 let w2 = (g[4] as u16) << 8 | g[5] as u16;
179 let w3 = (g[6] as u16) << 8 | g[7] as u16;
180 let w4 = (g[8] as u16) << 8 | g[9] as u16;
181 let w5 = (g[10] as u16) << 8 | g[11] as u16;
182 let w6 = (g[12] as u16) << 8 | g[13] as u16;
183
184 format!(
185 "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
186 address_type, scope, w1, w2, w3, w4, w5, w6
187 )
188}
189
/// Parses a textual IPv6 address, returning `None` when `addr` is not valid.
pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
    match addr.parse::<Ipv6Addr>() {
        Ok(ip) => Some(ip),
        Err(_) => None,
    }
}
194
195pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
199 let mut input = group_id.to_vec();
200 input.extend_from_slice(link_local_addr.as_bytes());
201 rns_crypto::sha256::sha256(&input)
202}
203
/// A local network interface that carries an IPv6 link-local address.
#[derive(Debug, Clone)]
pub struct LocalInterface {
    /// Interface name as reported by the operating system.
    pub name: String,
    /// Link-local address rendered with `Ipv6Addr`'s `Display` formatting.
    pub link_local_addr: String,
    /// Interface index as returned by `if_nametoindex`.
    pub index: u32,
}
213
/// Returns the interface names ignored by default on this platform
/// (Android build: the `ANDROID_IGNORE_IFS` list).
#[cfg(target_os = "android")]
fn platform_ignored_interfaces() -> &'static [&'static str] {
    ANDROID_IGNORE_IFS
}

/// Returns the interface names ignored by default on this platform
/// (non-Android build: none).
#[cfg(not(target_os = "android"))]
fn platform_ignored_interfaces() -> &'static [&'static str] {
    &[]
}
223
224fn should_adopt_interface_name(
225 name: &str,
226 allowed: &[String],
227 ignored: &[String],
228 platform_ignored: &[&str],
229) -> bool {
230 let is_allowed = allowed.iter().any(|a| a == name);
231 let is_system_ignored = ALL_IGNORE_IFS.iter().any(|&ig| ig == name)
232 || platform_ignored.iter().any(|&ig| ig == name);
233
234 if is_system_ignored && !is_allowed {
235 return false;
236 }
237
238 if ignored.iter().any(|ig| ig == name) {
239 return false;
240 }
241
242 if !allowed.is_empty() && !is_allowed {
243 return false;
244 }
245
246 true
247}
248
/// Enumerates local interfaces that carry an IPv6 link-local address.
///
/// Walks the list returned by `getifaddrs`, keeps `AF_INET6` entries whose
/// interface name passes `should_adopt_interface_name` and whose address lies
/// in `fe80::/10`, and returns at most one entry per interface name (the first
/// matching address encountered). Returns an empty vector when `getifaddrs`
/// fails.
pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
    let mut result = Vec::new();
    let platform_ignored = platform_ignored_interfaces();

    // The unsafe block covers the raw libc calls and the dereferences of the
    // list nodes they return. The list is released with `freeifaddrs` after
    // the walk, and every string/address is copied out before that point.
    unsafe {
        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
        if libc::getifaddrs(&mut ifaddrs) != 0 {
            return result;
        }

        let mut current = ifaddrs;
        while !current.is_null() {
            let ifa = &*current;
            // Advance first so every `continue` below moves to the next node.
            current = ifa.ifa_next;

            // Entries without an address cannot be classified.
            if ifa.ifa_addr.is_null() {
                continue;
            }

            // Only IPv6 entries are of interest.
            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
                continue;
            }

            // NOTE(review): `ifa_name` is assumed to be a valid, non-null C
            // string here — confirm for all supported platforms.
            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
                Ok(s) => s.to_string(),
                Err(_) => continue,
            };

            if !should_adopt_interface_name(&name, allowed, ignored, platform_ignored) {
                continue;
            }

            // The family check above is what justifies reading the address
            // through a `sockaddr_in6` pointer.
            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
            let addr_bytes = (*sa6).sin6_addr.s6_addr;
            let ipv6 = Ipv6Addr::from(addr_bytes);

            // Keep only link-local addresses (fe80::/10).
            let octets = ipv6.octets();
            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
                continue;
            }

            let addr_str = format!("{}", ipv6);

            // An index of 0 means the name could not be resolved.
            let index = libc::if_nametoindex(ifa.ifa_name);
            if index == 0 {
                continue;
            }

            // One entry per interface: the first link-local address wins.
            if result.iter().any(|li: &LocalInterface| li.name == name) {
                continue;
            }

            result.push(LocalInterface {
                name,
                link_local_addr: addr_str,
                index,
            });
        }

        libc::freeifaddrs(ifaddrs);
    }

    result
}
324
/// Bookkeeping for one discovered peer.
struct AutoPeer {
    /// Identifier of the dynamic interface announced for this peer.
    interface_id: InterfaceId,
    /// Peer address as text; stored but not read in this file.
    #[allow(dead_code)]
    link_local_addr: String,
    /// Name of the local interface the peer was found on; stored but not read
    /// in this file.
    #[allow(dead_code)]
    ifname: String,
    /// Timestamp (from `crate::time::now()`) of the last discovery token or
    /// data frame received from the peer.
    last_heard: f64,
}
336
337struct UdpWriter {
339 socket: UdpSocket,
340 target: SocketAddrV6,
341}
342
343impl Writer for UdpWriter {
344 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
345 self.socket.send_to(data, self.target)?;
346 Ok(())
347 }
348}
349
350struct SharedState {
352 peers: HashMap<String, AutoPeer>,
354 link_local_addresses: Vec<String>,
356 dedup_deque: VecDeque<([u8; 32], f64)>,
358 online: bool,
360 next_id: Arc<AtomicU64>,
362}
363
364impl SharedState {
365 fn new(next_id: Arc<AtomicU64>) -> Self {
366 SharedState {
367 peers: HashMap::new(),
368 link_local_addresses: Vec::new(),
369 dedup_deque: VecDeque::new(),
370 online: false,
371 next_id,
372 }
373 }
374
375 fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
377 for (h, ts) in &self.dedup_deque {
378 if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
379 return true;
380 }
381 }
382 false
383 }
384
385 fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
387 self.dedup_deque.push_back((hash, now));
388 while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
389 self.dedup_deque.pop_front();
390 }
391 }
392
393 fn refresh_peer(&mut self, addr: &str, now: f64) {
395 if let Some(peer) = self.peers.get_mut(addr) {
396 peer.last_heard = now;
397 }
398 }
399}
400
401pub fn start(
409 config: AutoConfig,
410 tx: EventSender,
411 next_dynamic_id: Arc<AtomicU64>,
412) -> io::Result<()> {
413 let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
414
415 if interfaces.is_empty() {
416 log::warn!(
417 "[{}] No suitable IPv6 link-local interfaces found",
418 config.name,
419 );
420 return Ok(());
421 }
422
423 let group_id = config.group_id.clone();
424 let mcast_addr_str = derive_multicast_address(
425 &group_id,
426 &config.multicast_address_type,
427 &config.discovery_scope,
428 );
429
430 let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
431 Some(ip) => ip,
432 None => {
433 return Err(io::Error::new(
434 io::ErrorKind::InvalidData,
435 format!("Invalid multicast address: {}", mcast_addr_str),
436 ));
437 }
438 };
439
440 let discovery_port = config.discovery_port;
441 let unicast_discovery_port = config.discovery_port + 1;
442 let data_port = config.data_port;
443 let name = config.name.clone();
444 let configured_bitrate = config.configured_bitrate;
445 let ingress_control = config.ingress_control;
446 {
447 let startup = AutoRuntime::from_config(&config);
448 *lock_or_recover(&config.runtime, "auto runtime") = startup;
449 }
450 let runtime = Arc::clone(&config.runtime);
451
452 let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
453 let running = Arc::new(AtomicBool::new(true));
454
455 {
457 let mut state = lock_or_recover(&shared, "auto shared state");
458 for iface in &interfaces {
459 state
460 .link_local_addresses
461 .push(iface.link_local_addr.clone());
462 }
463 }
464
465 log::info!(
466 "[{}] AutoInterface starting with {} local interfaces, multicast {}",
467 name,
468 interfaces.len(),
469 mcast_addr_str,
470 );
471
472 for local_iface in &interfaces {
474 let ifname = local_iface.name.clone();
475 let link_local = local_iface.link_local_addr.clone();
476 let if_index = local_iface.index;
477
478 let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
480
481 let unicast_socket =
483 create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
484
485 {
487 let group_id = group_id.clone();
488 let link_local = link_local.clone();
489 let running = running.clone();
490 let name = name.clone();
491 let runtime = runtime.clone();
492
493 thread::Builder::new()
494 .name(format!("auto-disc-tx-{}", ifname))
495 .spawn(move || {
496 discovery_sender_loop(
497 &group_id,
498 &link_local,
499 &mcast_ip,
500 discovery_port,
501 if_index,
502 runtime,
503 &running,
504 &name,
505 );
506 })?;
507 }
508
509 {
511 let group_id = group_id.clone();
512 let shared = shared.clone();
513 let tx = tx.clone();
514 let running = running.clone();
515 let name = name.clone();
516 let runtime = runtime.clone();
517
518 thread::Builder::new()
519 .name(format!("auto-disc-rx-{}", ifname))
520 .spawn(move || {
521 discovery_receiver_loop(
522 mcast_socket,
523 &group_id,
524 shared,
525 tx,
526 &running,
527 &name,
528 data_port,
529 configured_bitrate,
530 ingress_control,
531 runtime,
532 );
533 })?;
534 }
535
536 {
538 let group_id = group_id.clone();
539 let shared = shared.clone();
540 let tx = tx.clone();
541 let running = running.clone();
542 let name = name.clone();
543 let runtime = runtime.clone();
544 let ingress_control = ingress_control;
545
546 thread::Builder::new()
547 .name(format!("auto-udisc-rx-{}", ifname))
548 .spawn(move || {
549 discovery_receiver_loop(
550 unicast_socket,
551 &group_id,
552 shared,
553 tx,
554 &running,
555 &name,
556 data_port,
557 configured_bitrate,
558 ingress_control,
559 runtime,
560 );
561 })?;
562 }
563
564 {
566 let link_local = local_iface.link_local_addr.clone();
567 let shared = shared.clone();
568 let tx = tx.clone();
569 let running = running.clone();
570 let name = name.clone();
571
572 let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
573
574 thread::Builder::new()
575 .name(format!("auto-data-rx-{}", local_iface.name))
576 .spawn(move || {
577 data_receiver_loop(data_socket, shared, tx, &running, &name);
578 })?;
579 }
580 }
581
582 {
584 let shared = shared.clone();
585 let tx = tx.clone();
586 let running = running.clone();
587 let name = name.clone();
588 let runtime = runtime.clone();
589
590 thread::Builder::new()
591 .name(format!("auto-peer-jobs-{}", name))
592 .spawn(move || {
593 peer_jobs_loop(shared, tx, runtime, &running, &name);
594 })?;
595 }
596
597 let announce_interval = lock_or_recover(&runtime, "auto runtime").announce_interval_secs;
599 let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
600 thread::sleep(peering_wait);
601
602 {
604 let mut state = lock_or_recover(&shared, "auto shared state");
605 state.online = true;
606 }
607
608 log::info!("[{}] AutoInterface online", config.name);
609
610 Ok(())
611}
612
613fn create_multicast_recv_socket(
616 mcast_ip: &Ipv6Addr,
617 port: u16,
618 if_index: u32,
619) -> io::Result<UdpSocket> {
620 let socket = socket2::Socket::new(
621 socket2::Domain::IPV6,
622 socket2::Type::DGRAM,
623 Some(socket2::Protocol::UDP),
624 )?;
625
626 socket.set_reuse_address(true)?;
627 #[cfg(not(target_os = "windows"))]
628 socket.set_reuse_port(true)?;
629
630 let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
632 socket.bind(&bind_addr.into())?;
633
634 socket.join_multicast_v6(mcast_ip, if_index)?;
636
637 socket.set_nonblocking(false)?;
638 let std_socket: UdpSocket = socket.into();
639 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
640 Ok(std_socket)
641}
642
643fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
644 let ip: Ipv6Addr = link_local
645 .parse()
646 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
647
648 let socket = socket2::Socket::new(
649 socket2::Domain::IPV6,
650 socket2::Type::DGRAM,
651 Some(socket2::Protocol::UDP),
652 )?;
653
654 socket.set_reuse_address(true)?;
655 #[cfg(not(target_os = "windows"))]
656 socket.set_reuse_port(true)?;
657
658 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
659 socket.bind(&bind_addr.into())?;
660
661 socket.set_nonblocking(false)?;
662 let std_socket: UdpSocket = socket.into();
663 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
664 Ok(std_socket)
665}
666
667fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
668 let ip: Ipv6Addr = link_local
669 .parse()
670 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
671
672 let socket = socket2::Socket::new(
673 socket2::Domain::IPV6,
674 socket2::Type::DGRAM,
675 Some(socket2::Protocol::UDP),
676 )?;
677
678 socket.set_reuse_address(true)?;
679 #[cfg(not(target_os = "windows"))]
680 socket.set_reuse_port(true)?;
681
682 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
683 socket.bind(&bind_addr.into())?;
684
685 socket.set_nonblocking(false)?;
686 let std_socket: UdpSocket = socket.into();
687 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
688 Ok(std_socket)
689}
690
691fn discovery_sender_loop(
695 group_id: &[u8],
696 link_local_addr: &str,
697 mcast_ip: &Ipv6Addr,
698 discovery_port: u16,
699 if_index: u32,
700 runtime: Arc<Mutex<AutoRuntime>>,
701 running: &AtomicBool,
702 name: &str,
703) {
704 let token = compute_discovery_token(group_id, link_local_addr);
705
706 while running.load(Ordering::Relaxed) {
707 if let Ok(socket) = UdpSocket::bind("[::]:0") {
709 let if_bytes = if_index.to_ne_bytes();
711 unsafe {
712 libc::setsockopt(
713 socket_fd(&socket),
714 libc::IPPROTO_IPV6,
715 libc::IPV6_MULTICAST_IF,
716 if_bytes.as_ptr() as *const libc::c_void,
717 4,
718 );
719 }
720
721 let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
722 if let Err(e) = socket.send_to(&token, target) {
723 log::debug!("[{}] multicast send error: {}", name, e);
724 }
725 }
726
727 let sleep_dur = Duration::from_secs_f64(
728 lock_or_recover(&runtime, "auto runtime")
729 .announce_interval_secs
730 .max(0.1),
731 );
732 thread::sleep(sleep_dur);
733 }
734}
735
736fn discovery_receiver_loop(
738 socket: UdpSocket,
739 group_id: &[u8],
740 shared: Arc<Mutex<SharedState>>,
741 tx: EventSender,
742 running: &AtomicBool,
743 name: &str,
744 data_port: u16,
745 configured_bitrate: u64,
746 ingress_control: rns_core::transport::types::IngressControlConfig,
747 runtime: Arc<Mutex<AutoRuntime>>,
748) {
749 let mut buf = [0u8; 1024];
750
751 while running.load(Ordering::Relaxed) {
752 match socket.recv_from(&mut buf) {
753 Ok((n, src)) => {
754 if n < 32 {
755 continue;
756 }
757
758 let src_addr = match src {
760 std::net::SocketAddr::V6(v6) => v6,
761 _ => continue,
762 };
763 let src_ip = format!("{}", src_addr.ip());
764
765 let peering_hash = &buf[..32];
766 let expected = compute_discovery_token(group_id, &src_ip);
767
768 if peering_hash != expected {
769 log::debug!("[{}] invalid peering hash from {}", name, src_ip);
770 continue;
771 }
772
773 let state = lock_or_recover(&shared, "auto shared state");
775 if !state.online {
776 }
779
780 if state.link_local_addresses.contains(&src_ip) {
782 drop(state);
784 continue;
785 }
786
787 if state.peers.contains_key(&src_ip) {
789 let now = crate::time::now();
790 drop(state);
791 let mut state = lock_or_recover(&shared, "auto shared state");
792 state.refresh_peer(&src_ip, now);
793 continue;
794 }
795 drop(state);
796
797 add_peer(
799 &shared,
800 &tx,
801 &src_ip,
802 data_port,
803 name,
804 configured_bitrate,
805 ingress_control,
806 &runtime,
807 );
808 }
809 Err(ref e)
810 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
811 {
812 continue;
814 }
815 Err(e) => {
816 log::warn!("[{}] discovery recv error: {}", name, e);
817 if !running.load(Ordering::Relaxed) {
818 return;
819 }
820 thread::sleep(Duration::from_millis(100));
821 }
822 }
823 }
824}
825
826fn add_peer(
828 shared: &Arc<Mutex<SharedState>>,
829 tx: &EventSender,
830 peer_addr: &str,
831 data_port: u16,
832 name: &str,
833 configured_bitrate: u64,
834 ingress_control: rns_core::transport::types::IngressControlConfig,
835 _runtime: &Arc<Mutex<AutoRuntime>>,
836) {
837 let peer_ip: Ipv6Addr = match peer_addr.parse() {
838 Ok(ip) => ip,
839 Err(_) => return,
840 };
841
842 let send_socket = match UdpSocket::bind("[::]:0") {
844 Ok(s) => s,
845 Err(e) => {
846 log::warn!(
847 "[{}] failed to create writer for peer {}: {}",
848 name,
849 peer_addr,
850 e
851 );
852 return;
853 }
854 };
855
856 let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
857
858 let mut state = lock_or_recover(shared, "auto shared state");
859
860 if state.peers.contains_key(peer_addr) {
862 state.refresh_peer(peer_addr, crate::time::now());
863 return;
864 }
865
866 let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
867
868 let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
870 socket: send_socket,
871 target,
872 });
873
874 let peer_info = rns_core::transport::types::InterfaceInfo {
875 id: peer_id,
876 name: format!("{}:{}", name, peer_addr),
877 mode: rns_core::constants::MODE_FULL,
878 out_capable: true,
879 in_capable: true,
880 bitrate: Some(configured_bitrate),
881 airtime_profile: None,
882 announce_rate_target: None,
883 announce_rate_grace: 0,
884 announce_rate_penalty: 0.0,
885 announce_cap: rns_core::constants::ANNOUNCE_CAP,
886 is_local_client: false,
887 wants_tunnel: false,
888 tunnel_id: None,
889 mtu: 1400,
890 ia_freq: 0.0,
891 ip_freq: 0.0,
892 op_freq: 0.0,
893 op_samples: 0,
894 started: 0.0,
895 ingress_control,
896 };
897
898 let now = crate::time::now();
899 state.peers.insert(
900 peer_addr.to_string(),
901 AutoPeer {
902 interface_id: peer_id,
903 link_local_addr: peer_addr.to_string(),
904 ifname: String::new(),
905 last_heard: now,
906 },
907 );
908
909 log::info!(
910 "[{}] Peer discovered: {} (id={})",
911 name,
912 peer_addr,
913 peer_id.0
914 );
915
916 let _ = tx.send(Event::InterfaceUp(
918 peer_id,
919 Some(driver_writer),
920 Some(peer_info),
921 ));
922}
923
924fn data_receiver_loop(
926 socket: UdpSocket,
927 shared: Arc<Mutex<SharedState>>,
928 tx: EventSender,
929 running: &AtomicBool,
930 name: &str,
931) {
932 let mut buf = [0u8; HW_MTU + 64]; while running.load(Ordering::Relaxed) {
935 match socket.recv_from(&mut buf) {
936 Ok((n, src)) => {
937 if n == 0 {
938 continue;
939 }
940
941 let src_addr = match src {
942 std::net::SocketAddr::V6(v6) => v6,
943 _ => continue,
944 };
945 let src_ip = format!("{}", src_addr.ip());
946 let data = &buf[..n];
947
948 let now = crate::time::now();
949 let data_hash = rns_crypto::sha256::sha256(data);
950
951 let mut state = lock_or_recover(&shared, "auto shared state");
952
953 if !state.online {
954 continue;
955 }
956
957 if state.is_duplicate(&data_hash, now) {
959 continue;
960 }
961 state.add_dedup(data_hash, now);
962
963 state.refresh_peer(&src_ip, now);
965
966 let iface_id = match state.peers.get(&src_ip) {
968 Some(peer) => peer.interface_id,
969 None => {
970 continue;
972 }
973 };
974
975 drop(state);
976
977 if tx
978 .send(Event::Frame {
979 interface_id: iface_id,
980 data: data.to_vec(),
981 rssi: None,
982 snr: None,
983 })
984 .is_err()
985 {
986 return;
987 }
988 }
989 Err(ref e)
990 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
991 {
992 continue;
993 }
994 Err(e) => {
995 log::warn!("[{}] data recv error: {}", name, e);
996 if !running.load(Ordering::Relaxed) {
997 return;
998 }
999 thread::sleep(Duration::from_millis(100));
1000 }
1001 }
1002 }
1003}
1004
1005fn peer_jobs_loop(
1007 shared: Arc<Mutex<SharedState>>,
1008 tx: EventSender,
1009 runtime: Arc<Mutex<AutoRuntime>>,
1010 running: &AtomicBool,
1011 name: &str,
1012) {
1013 while running.load(Ordering::Relaxed) {
1014 let interval = Duration::from_secs_f64(
1015 lock_or_recover(&runtime, "auto runtime")
1016 .peer_job_interval_secs
1017 .max(0.1),
1018 );
1019 thread::sleep(interval);
1020
1021 let now = crate::time::now();
1022 let mut timed_out = Vec::new();
1023 let peer_timeout_secs = lock_or_recover(&runtime, "auto runtime").peer_timeout_secs;
1024
1025 {
1026 let state = lock_or_recover(&shared, "auto shared state");
1027 for (addr, peer) in &state.peers {
1028 if now > peer.last_heard + peer_timeout_secs {
1029 timed_out.push((addr.clone(), peer.interface_id));
1030 }
1031 }
1032 }
1033
1034 for (addr, iface_id) in &timed_out {
1035 log::info!("[{}] Peer timed out: {}", name, addr);
1036 let mut state = lock_or_recover(&shared, "auto shared state");
1037 state.peers.remove(addr.as_str());
1038 let _ = tx.send(Event::InterfaceDown(*iface_id));
1039 }
1040 }
1041}
1042
/// Returns the raw file descriptor of `socket` (Unix targets).
#[cfg(unix)]
fn socket_fd(socket: &UdpSocket) -> i32 {
    std::os::unix::io::AsRawFd::as_raw_fd(socket)
}

/// Placeholder for non-Unix targets; always returns 0.
#[cfg(not(unix))]
fn socket_fd(_socket: &UdpSocket) -> i32 {
    0
}
1056
1057use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1060
1061pub struct AutoFactory;
1063
1064impl InterfaceFactory for AutoFactory {
1065 fn type_name(&self) -> &str {
1066 "AutoInterface"
1067 }
1068
1069 fn parse_config(
1070 &self,
1071 name: &str,
1072 id: InterfaceId,
1073 params: &HashMap<String, String>,
1074 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1075 let group_id = params
1076 .get("group_id")
1077 .map(|v| v.as_bytes().to_vec())
1078 .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1079
1080 let discovery_scope = params
1081 .get("discovery_scope")
1082 .map(|v| match v.to_lowercase().as_str() {
1083 "link" => SCOPE_LINK.to_string(),
1084 "admin" => SCOPE_ADMIN.to_string(),
1085 "site" => SCOPE_SITE.to_string(),
1086 "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1087 "global" => SCOPE_GLOBAL.to_string(),
1088 _ => v.clone(),
1089 })
1090 .unwrap_or_else(|| SCOPE_LINK.to_string());
1091
1092 let discovery_port = params
1093 .get("discovery_port")
1094 .and_then(|v| v.parse().ok())
1095 .unwrap_or(DEFAULT_DISCOVERY_PORT);
1096
1097 let data_port = params
1098 .get("data_port")
1099 .and_then(|v| v.parse().ok())
1100 .unwrap_or(DEFAULT_DATA_PORT);
1101
1102 let multicast_address_type = params
1103 .get("multicast_address_type")
1104 .map(|v| match v.to_lowercase().as_str() {
1105 "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1106 "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1107 _ => v.clone(),
1108 })
1109 .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1110
1111 let configured_bitrate = params
1112 .get("configured_bitrate")
1113 .or_else(|| params.get("bitrate"))
1114 .and_then(|v| v.parse().ok())
1115 .unwrap_or(BITRATE_GUESS);
1116
1117 let allowed_interfaces = params
1118 .get("devices")
1119 .or_else(|| params.get("allowed_interfaces"))
1120 .map(|v| {
1121 v.split(',')
1122 .map(|s| s.trim().to_string())
1123 .filter(|s| !s.is_empty())
1124 .collect()
1125 })
1126 .unwrap_or_default();
1127
1128 let ignored_interfaces = params
1129 .get("ignored_devices")
1130 .or_else(|| params.get("ignored_interfaces"))
1131 .map(|v| {
1132 v.split(',')
1133 .map(|s| s.trim().to_string())
1134 .filter(|s| !s.is_empty())
1135 .collect()
1136 })
1137 .unwrap_or_default();
1138
1139 Ok(Box::new(AutoConfig {
1140 name: name.to_string(),
1141 group_id,
1142 discovery_scope,
1143 discovery_port,
1144 data_port,
1145 multicast_address_type,
1146 allowed_interfaces,
1147 ignored_interfaces,
1148 configured_bitrate,
1149 interface_id: id,
1150 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
1151 runtime: Arc::new(Mutex::new(AutoRuntime {
1152 announce_interval_secs: ANNOUNCE_INTERVAL,
1153 peer_timeout_secs: PEERING_TIMEOUT,
1154 peer_job_interval_secs: PEER_JOB_INTERVAL,
1155 })),
1156 }))
1157 }
1158
1159 fn start(
1160 &self,
1161 config: Box<dyn InterfaceConfigData>,
1162 ctx: StartContext,
1163 ) -> std::io::Result<StartResult> {
1164 let mut auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1165 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1166 })?;
1167
1168 auto_config.ingress_control = ctx.ingress_control;
1169 start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1170 Ok(StartResult::Listener { control: None })
1171 }
1172}
1173
1174pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1175 AutoRuntimeConfigHandle {
1176 interface_name: config.name.clone(),
1177 runtime: Arc::clone(&config.runtime),
1178 startup: AutoRuntime::from_config(config),
1179 }
1180}
1181
// Unit tests for address derivation, discovery tokens, de-duplication, peer
// bookkeeping, interface filtering and the default constants.
#[cfg(test)]
mod tests {
    use super::*;

    // --- Multicast address derivation ---

    // The default group with temporary type and link scope yields a fixed,
    // known address.
    #[test]
    fn multicast_address_default_group() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
    }

    // A different group keeps the prefix but changes the hash-derived part.
    #[test]
    fn multicast_address_custom_group() {
        let addr =
            derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
        assert!(addr.starts_with("ff12:0:"));
        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
    }

    // The scope digit is reflected in the address prefix.
    #[test]
    fn multicast_address_scope_admin() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_ADMIN,
        );
        assert!(addr.starts_with("ff14:0:"));
    }

    // The address-type digit is reflected in the address prefix.
    #[test]
    fn multicast_address_permanent_type() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_PERMANENT_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert!(addr.starts_with("ff02:0:"));
    }

    // The derived string parses as an IPv6 multicast address.
    #[test]
    fn multicast_address_parseable() {
        let addr = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        let ip = parse_multicast_addr(&addr);
        assert!(ip.is_some());
        assert!(ip.unwrap().is_multicast());
    }

    // --- Discovery tokens ---

    // Fixed expected token for the default group and `fe80::1`.
    #[test]
    fn discovery_token_interop() {
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
        let got = token
            .iter()
            .map(|b| format!("{:02x}", b))
            .collect::<String>();
        assert_eq!(got, expected);
    }

    // Fixed expected token for a second address.
    #[test]
    fn discovery_token_interop_2() {
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
        let got = token
            .iter()
            .map(|b| format!("{:02x}", b))
            .collect::<String>();
        assert_eq!(got, expected);
    }

    // Tokens depend on the group id.
    #[test]
    fn discovery_token_different_groups() {
        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
        assert_ne!(t1, t2);
    }

    // Tokens depend on the address.
    #[test]
    fn discovery_token_different_addrs() {
        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
        assert_ne!(t1, t2);
    }

    // --- De-duplication ---

    // A hash is a duplicate only after it has been recorded.
    #[test]
    fn dedup_basic() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        let hash = [0xAA; 32];
        let now = 1000.0;

        assert!(!state.is_duplicate(&hash, now));
        state.add_dedup(hash, now);
        assert!(state.is_duplicate(&hash, now));
    }

    // A recorded hash stops counting as a duplicate after the TTL (0.75).
    #[test]
    fn dedup_expired() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        let hash = [0xBB; 32];
        state.add_dedup(hash, 1000.0);

        // Inside the TTL window.
        assert!(state.is_duplicate(&hash, 1000.5));
        // Outside the TTL window.
        assert!(!state.is_duplicate(&hash, 1001.0));
    }

    // The deque never grows beyond MULTI_IF_DEQUE_LEN entries.
    #[test]
    fn dedup_max_length() {
        let next_id = Arc::new(AtomicU64::new(1));
        let mut state = SharedState::new(next_id);

        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
            // Give every entry a distinct hash.
            let mut hash = [0u8; 32];
            hash[0] = (i & 0xFF) as u8;
            hash[1] = ((i >> 8) & 0xFF) as u8;
            state.add_dedup(hash, 1000.0);
        }

        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
    }

    // --- Peer bookkeeping ---

    // Refreshing a known peer updates its last-heard time.
    #[test]
    fn peer_refresh() {
        let next_id = Arc::new(AtomicU64::new(100));
        let mut state = SharedState::new(next_id);

        state.peers.insert(
            "fe80::1".to_string(),
            AutoPeer {
                interface_id: InterfaceId(100),
                link_local_addr: "fe80::1".to_string(),
                ifname: "eth0".to_string(),
                last_heard: 1000.0,
            },
        );

        state.refresh_peer("fe80::1", 2000.0);
        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
    }

    // Refreshing an unknown peer must not panic.
    #[test]
    fn peer_not_found_refresh() {
        let next_id = Arc::new(AtomicU64::new(100));
        let mut state = SharedState::new(next_id);
        state.refresh_peer("fe80::999", 1000.0);
    }

    // --- Interface enumeration (depends on the host's interfaces) ---

    // Whatever the host reports must be well formed; the list may be empty.
    #[test]
    fn enumerate_returns_vec() {
        let interfaces = enumerate_interfaces(&[], &[]);
        for iface in &interfaces {
            assert!(!iface.name.is_empty());
            assert!(iface.link_local_addr.starts_with("fe80"));
            assert!(iface.index > 0);
        }
    }

    // Ignored names never appear in the result.
    #[test]
    fn enumerate_with_ignored() {
        let interfaces = enumerate_interfaces(
            &[],
            &[
                "lo".to_string(),
                "eth0".to_string(),
                "wlan0".to_string(),
                "enp0s3".to_string(),
                "docker0".to_string(),
            ],
        );
        for iface in &interfaces {
            assert_ne!(iface.name, "lo");
            assert_ne!(iface.name, "eth0");
            assert_ne!(iface.name, "wlan0");
        }
    }

    // An allow list naming only a nonexistent interface yields nothing.
    #[test]
    fn enumerate_with_allowed_nonexistent() {
        let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
        assert!(interfaces.is_empty());
    }

    // --- Interface name filter ---

    // Names on the Android ignore list are skipped by default.
    #[test]
    fn filter_skips_android_system_interfaces() {
        let allowed = Vec::new();
        let ignored = Vec::new();

        for name in ["dummy0", "lo", "tun0", "rmnet0", "rmnet7"] {
            assert!(
                !should_adopt_interface_name(name, &allowed, &ignored, ANDROID_IGNORE_IFS),
                "{name} should be skipped by Android AutoInterface defaults"
            );
        }
    }

    // The Android ignore list is exact; `rmnet8` is not on it.
    #[test]
    fn filter_does_not_skip_rmnet8_by_android_defaults() {
        assert!(should_adopt_interface_name(
            "rmnet8",
            &[],
            &[],
            ANDROID_IGNORE_IFS
        ));
    }

    // An explicit allow entry overrides the system ignore lists.
    #[test]
    fn filter_allowed_overrides_system_ignored_interface() {
        assert!(should_adopt_interface_name(
            "rmnet0",
            &["rmnet0".to_string()],
            &[],
            ANDROID_IGNORE_IFS
        ));

        assert!(should_adopt_interface_name(
            "lo0",
            &["lo0".to_string()],
            &[],
            &[]
        ));
    }

    // An explicit ignore entry wins over an explicit allow entry.
    #[test]
    fn filter_ignored_wins_over_allowed_interface() {
        assert!(!should_adopt_interface_name(
            "rmnet0",
            &["rmnet0".to_string()],
            &["rmnet0".to_string()],
            ANDROID_IGNORE_IFS
        ));
    }

    // A non-empty allow list excludes every name that is not on it.
    #[test]
    fn filter_allowed_list_excludes_unlisted_interfaces() {
        assert!(!should_adopt_interface_name(
            "wlan0",
            &["eth0".to_string()],
            &[],
            ANDROID_IGNORE_IFS
        ));

        assert!(should_adopt_interface_name(
            "wlan0",
            &["wlan0".to_string()],
            &[],
            ANDROID_IGNORE_IFS
        ));
    }

    // --- Configuration and constants ---

    // `AutoConfig::default()` is populated from the default constants.
    #[test]
    fn config_defaults() {
        let config = AutoConfig::default();
        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
        assert_eq!(config.discovery_scope, SCOPE_LINK);
        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
        assert_eq!(
            config.multicast_address_type,
            MULTICAST_TEMPORARY_ADDRESS_TYPE
        );
        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
        assert!(config.allowed_interfaces.is_empty());
        assert!(config.ignored_interfaces.is_empty());
    }

    // Pins the constant values (per the test name, those of the Python
    // implementation).
    #[test]
    fn constants_match_python() {
        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
        assert_eq!(DEFAULT_DATA_PORT, 42671);
        assert_eq!(HW_MTU, 1196);
        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
        assert_eq!(BITRATE_GUESS, 10_000_000);
    }

    // The unicast discovery port is the discovery port plus one.
    #[test]
    fn unicast_discovery_port() {
        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
        assert_eq!(unicast_port, 29717);
    }

    // ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER is about 5.2.
    #[test]
    fn reverse_peering_interval() {
        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
        assert!((interval - 5.2).abs() < 0.01);
    }
}