1use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::{lock_or_recover, Writer};
26
27pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41pub const HW_MTU: usize = 1196;
43
44pub const SCOPE_LINK: &str = "2";
46pub const SCOPE_ADMIN: &str = "4";
48pub const SCOPE_SITE: &str = "5";
50pub const SCOPE_ORGANISATION: &str = "8";
52pub const SCOPE_GLOBAL: &str = "e";
54
55pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60pub const PEERING_TIMEOUT: f64 = 22.0;
62
63pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72pub const BITRATE_GUESS: u64 = 10_000_000;
74
75pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87#[cfg_attr(not(any(target_os = "android", test)), allow(dead_code))]
88const ANDROID_IGNORE_IFS: &[&str] = &[
89 "dummy0", "lo", "tun0", "rmnet0", "rmnet1", "rmnet2", "rmnet3", "rmnet4", "rmnet5", "rmnet6",
90 "rmnet7",
91];
92
93#[derive(Debug, Clone)]
97pub struct AutoConfig {
98 pub name: String,
99 pub group_id: Vec<u8>,
100 pub discovery_scope: String,
101 pub discovery_port: u16,
102 pub data_port: u16,
103 pub multicast_address_type: String,
104 pub allowed_interfaces: Vec<String>,
105 pub ignored_interfaces: Vec<String>,
106 pub configured_bitrate: u64,
107 pub interface_id: InterfaceId,
109 pub ingress_control: rns_core::transport::types::IngressControlConfig,
110 pub runtime: Arc<Mutex<AutoRuntime>>,
111}
112
113#[derive(Debug, Clone)]
114pub struct AutoRuntime {
115 pub announce_interval_secs: f64,
116 pub peer_timeout_secs: f64,
117 pub peer_job_interval_secs: f64,
118}
119
120impl AutoRuntime {
121 pub fn from_config(_config: &AutoConfig) -> Self {
122 Self {
123 announce_interval_secs: ANNOUNCE_INTERVAL,
124 peer_timeout_secs: PEERING_TIMEOUT,
125 peer_job_interval_secs: PEER_JOB_INTERVAL,
126 }
127 }
128}
129
130#[derive(Debug, Clone)]
131pub struct AutoRuntimeConfigHandle {
132 pub interface_name: String,
133 pub runtime: Arc<Mutex<AutoRuntime>>,
134 pub startup: AutoRuntime,
135}
136
137impl Default for AutoConfig {
138 fn default() -> Self {
139 let mut config = AutoConfig {
140 name: String::new(),
141 group_id: DEFAULT_GROUP_ID.to_vec(),
142 discovery_scope: SCOPE_LINK.to_string(),
143 discovery_port: DEFAULT_DISCOVERY_PORT,
144 data_port: DEFAULT_DATA_PORT,
145 multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
146 allowed_interfaces: Vec::new(),
147 ignored_interfaces: Vec::new(),
148 configured_bitrate: BITRATE_GUESS,
149 interface_id: InterfaceId(0),
150 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
151 runtime: Arc::new(Mutex::new(AutoRuntime {
152 announce_interval_secs: ANNOUNCE_INTERVAL,
153 peer_timeout_secs: PEERING_TIMEOUT,
154 peer_job_interval_secs: PEER_JOB_INTERVAL,
155 })),
156 };
157 let startup = AutoRuntime::from_config(&config);
158 config.runtime = Arc::new(Mutex::new(startup));
159 config
160 }
161}
162
163pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
173 let group_hash = rns_crypto::sha256::sha256(group_id);
174 let g = &group_hash;
175
176 let w1 = (g[2] as u16) << 8 | g[3] as u16;
178 let w2 = (g[4] as u16) << 8 | g[5] as u16;
179 let w3 = (g[6] as u16) << 8 | g[7] as u16;
180 let w4 = (g[8] as u16) << 8 | g[9] as u16;
181 let w5 = (g[10] as u16) << 8 | g[11] as u16;
182 let w6 = (g[12] as u16) << 8 | g[13] as u16;
183
184 format!(
185 "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
186 address_type, scope, w1, w2, w3, w4, w5, w6
187 )
188}
189
190pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
192 addr.parse::<Ipv6Addr>().ok()
193}
194
195pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
199 let mut input = group_id.to_vec();
200 input.extend_from_slice(link_local_addr.as_bytes());
201 rns_crypto::sha256::sha256(&input)
202}
203
204#[derive(Debug, Clone)]
208pub struct LocalInterface {
209 pub name: String,
210 pub link_local_addr: String,
211 pub index: u32,
212}
213
214#[cfg(target_os = "android")]
215fn platform_ignored_interfaces() -> &'static [&'static str] {
216 ANDROID_IGNORE_IFS
217}
218
219#[cfg(not(target_os = "android"))]
220fn platform_ignored_interfaces() -> &'static [&'static str] {
221 &[]
222}
223
224fn should_adopt_interface_name(
225 name: &str,
226 allowed: &[String],
227 ignored: &[String],
228 platform_ignored: &[&str],
229) -> bool {
230 let is_allowed = allowed.iter().any(|a| a == name);
231 let is_system_ignored = ALL_IGNORE_IFS.iter().any(|&ig| ig == name)
232 || platform_ignored.iter().any(|&ig| ig == name);
233
234 if is_system_ignored && !is_allowed {
235 return false;
236 }
237
238 if ignored.iter().any(|ig| ig == name) {
239 return false;
240 }
241
242 if !allowed.is_empty() && !is_allowed {
243 return false;
244 }
245
246 true
247}
248
249pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
253 let mut result = Vec::new();
254 let platform_ignored = platform_ignored_interfaces();
255
256 unsafe {
257 let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
258 if libc::getifaddrs(&mut ifaddrs) != 0 {
259 return result;
260 }
261
262 let mut current = ifaddrs;
263 while !current.is_null() {
264 let ifa = &*current;
265 current = ifa.ifa_next;
266
267 if ifa.ifa_addr.is_null() {
269 continue;
270 }
271
272 if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
274 continue;
275 }
276
277 let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
279 Ok(s) => s.to_string(),
280 Err(_) => continue,
281 };
282
283 if !should_adopt_interface_name(&name, allowed, ignored, platform_ignored) {
284 continue;
285 }
286
287 let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
289 let addr_bytes = (*sa6).sin6_addr.s6_addr;
290 let ipv6 = Ipv6Addr::from(addr_bytes);
291
292 let octets = ipv6.octets();
294 if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
295 continue;
296 }
297
298 let addr_str = format!("{}", ipv6);
300
301 let index = libc::if_nametoindex(ifa.ifa_name);
303 if index == 0 {
304 continue;
305 }
306
307 if result.iter().any(|li: &LocalInterface| li.name == name) {
309 continue;
310 }
311
312 result.push(LocalInterface {
313 name,
314 link_local_addr: addr_str,
315 index,
316 });
317 }
318
319 libc::freeifaddrs(ifaddrs);
320 }
321
322 result
323}
324
325struct AutoPeer {
329 interface_id: InterfaceId,
330 #[allow(dead_code)]
331 link_local_addr: String,
332 #[allow(dead_code)]
333 ifname: String,
334 last_heard: f64,
335}
336
337struct UdpWriter {
339 socket: UdpSocket,
340 target: SocketAddrV6,
341}
342
343impl Writer for UdpWriter {
344 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
345 self.socket.send_to(data, self.target)?;
346 Ok(())
347 }
348}
349
350struct SharedState {
352 peers: HashMap<String, AutoPeer>,
354 link_local_addresses: Vec<String>,
356 dedup_deque: VecDeque<([u8; 32], f64)>,
358 online: bool,
360 next_id: Arc<AtomicU64>,
362}
363
364impl SharedState {
365 fn new(next_id: Arc<AtomicU64>) -> Self {
366 SharedState {
367 peers: HashMap::new(),
368 link_local_addresses: Vec::new(),
369 dedup_deque: VecDeque::new(),
370 online: false,
371 next_id,
372 }
373 }
374
375 fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
377 for (h, ts) in &self.dedup_deque {
378 if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
379 return true;
380 }
381 }
382 false
383 }
384
385 fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
387 self.dedup_deque.push_back((hash, now));
388 while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
389 self.dedup_deque.pop_front();
390 }
391 }
392
393 fn refresh_peer(&mut self, addr: &str, now: f64) {
395 if let Some(peer) = self.peers.get_mut(addr) {
396 peer.last_heard = now;
397 }
398 }
399}
400
401pub fn start(
409 config: AutoConfig,
410 tx: EventSender,
411 next_dynamic_id: Arc<AtomicU64>,
412) -> io::Result<()> {
413 let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
414
415 if interfaces.is_empty() {
416 log::warn!(
417 "[{}] No suitable IPv6 link-local interfaces found",
418 config.name,
419 );
420 return Ok(());
421 }
422
423 let group_id = config.group_id.clone();
424 let mcast_addr_str = derive_multicast_address(
425 &group_id,
426 &config.multicast_address_type,
427 &config.discovery_scope,
428 );
429
430 let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
431 Some(ip) => ip,
432 None => {
433 return Err(io::Error::new(
434 io::ErrorKind::InvalidData,
435 format!("Invalid multicast address: {}", mcast_addr_str),
436 ));
437 }
438 };
439
440 let discovery_port = config.discovery_port;
441 let unicast_discovery_port = config.discovery_port + 1;
442 let data_port = config.data_port;
443 let name = config.name.clone();
444 let configured_bitrate = config.configured_bitrate;
445 let ingress_control = config.ingress_control;
446 {
447 let startup = AutoRuntime::from_config(&config);
448 *lock_or_recover(&config.runtime, "auto runtime") = startup;
449 }
450 let runtime = Arc::clone(&config.runtime);
451
452 let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
453 let running = Arc::new(AtomicBool::new(true));
454
455 {
457 let mut state = lock_or_recover(&shared, "auto shared state");
458 for iface in &interfaces {
459 state
460 .link_local_addresses
461 .push(iface.link_local_addr.clone());
462 }
463 }
464
465 log::info!(
466 "[{}] AutoInterface starting with {} local interfaces, multicast {}",
467 name,
468 interfaces.len(),
469 mcast_addr_str,
470 );
471
472 for local_iface in &interfaces {
474 let ifname = local_iface.name.clone();
475 let link_local = local_iface.link_local_addr.clone();
476 let if_index = local_iface.index;
477
478 let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
480
481 let unicast_socket =
483 create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
484
485 {
487 let group_id = group_id.clone();
488 let link_local = link_local.clone();
489 let running = running.clone();
490 let name = name.clone();
491 let runtime = runtime.clone();
492
493 thread::Builder::new()
494 .name(format!("auto-disc-tx-{}", ifname))
495 .spawn(move || {
496 discovery_sender_loop(
497 &group_id,
498 &link_local,
499 &mcast_ip,
500 discovery_port,
501 if_index,
502 runtime,
503 &running,
504 &name,
505 );
506 })?;
507 }
508
509 {
511 let group_id = group_id.clone();
512 let shared = shared.clone();
513 let tx = tx.clone();
514 let running = running.clone();
515 let name = name.clone();
516 let runtime = runtime.clone();
517
518 thread::Builder::new()
519 .name(format!("auto-disc-rx-{}", ifname))
520 .spawn(move || {
521 discovery_receiver_loop(
522 mcast_socket,
523 &group_id,
524 shared,
525 tx,
526 &running,
527 &name,
528 data_port,
529 configured_bitrate,
530 ingress_control,
531 runtime,
532 );
533 })?;
534 }
535
536 {
538 let group_id = group_id.clone();
539 let shared = shared.clone();
540 let tx = tx.clone();
541 let running = running.clone();
542 let name = name.clone();
543 let runtime = runtime.clone();
544 let ingress_control = ingress_control;
545
546 thread::Builder::new()
547 .name(format!("auto-udisc-rx-{}", ifname))
548 .spawn(move || {
549 discovery_receiver_loop(
550 unicast_socket,
551 &group_id,
552 shared,
553 tx,
554 &running,
555 &name,
556 data_port,
557 configured_bitrate,
558 ingress_control,
559 runtime,
560 );
561 })?;
562 }
563
564 {
566 let link_local = local_iface.link_local_addr.clone();
567 let shared = shared.clone();
568 let tx = tx.clone();
569 let running = running.clone();
570 let name = name.clone();
571
572 let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
573
574 thread::Builder::new()
575 .name(format!("auto-data-rx-{}", local_iface.name))
576 .spawn(move || {
577 data_receiver_loop(data_socket, shared, tx, &running, &name);
578 })?;
579 }
580 }
581
582 {
584 let shared = shared.clone();
585 let tx = tx.clone();
586 let running = running.clone();
587 let name = name.clone();
588 let runtime = runtime.clone();
589
590 thread::Builder::new()
591 .name(format!("auto-peer-jobs-{}", name))
592 .spawn(move || {
593 peer_jobs_loop(shared, tx, runtime, &running, &name);
594 })?;
595 }
596
597 let announce_interval = lock_or_recover(&runtime, "auto runtime").announce_interval_secs;
599 let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
600 thread::sleep(peering_wait);
601
602 {
604 let mut state = lock_or_recover(&shared, "auto shared state");
605 state.online = true;
606 }
607
608 log::info!("[{}] AutoInterface online", config.name);
609
610 Ok(())
611}
612
613fn create_multicast_recv_socket(
616 mcast_ip: &Ipv6Addr,
617 port: u16,
618 if_index: u32,
619) -> io::Result<UdpSocket> {
620 let socket = socket2::Socket::new(
621 socket2::Domain::IPV6,
622 socket2::Type::DGRAM,
623 Some(socket2::Protocol::UDP),
624 )?;
625
626 socket.set_reuse_address(true)?;
627 #[cfg(not(target_os = "windows"))]
628 socket.set_reuse_port(true)?;
629
630 let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
632 socket.bind(&bind_addr.into())?;
633
634 socket.join_multicast_v6(mcast_ip, if_index)?;
636
637 socket.set_nonblocking(false)?;
638 let std_socket: UdpSocket = socket.into();
639 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
640 Ok(std_socket)
641}
642
643fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
644 let ip: Ipv6Addr = link_local
645 .parse()
646 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
647
648 let socket = socket2::Socket::new(
649 socket2::Domain::IPV6,
650 socket2::Type::DGRAM,
651 Some(socket2::Protocol::UDP),
652 )?;
653
654 socket.set_reuse_address(true)?;
655 #[cfg(not(target_os = "windows"))]
656 socket.set_reuse_port(true)?;
657
658 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
659 socket.bind(&bind_addr.into())?;
660
661 socket.set_nonblocking(false)?;
662 let std_socket: UdpSocket = socket.into();
663 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
664 Ok(std_socket)
665}
666
667fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
668 let ip: Ipv6Addr = link_local
669 .parse()
670 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
671
672 let socket = socket2::Socket::new(
673 socket2::Domain::IPV6,
674 socket2::Type::DGRAM,
675 Some(socket2::Protocol::UDP),
676 )?;
677
678 socket.set_reuse_address(true)?;
679 #[cfg(not(target_os = "windows"))]
680 socket.set_reuse_port(true)?;
681
682 let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
683 socket.bind(&bind_addr.into())?;
684
685 socket.set_nonblocking(false)?;
686 let std_socket: UdpSocket = socket.into();
687 std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
688 Ok(std_socket)
689}
690
691fn discovery_sender_loop(
695 group_id: &[u8],
696 link_local_addr: &str,
697 mcast_ip: &Ipv6Addr,
698 discovery_port: u16,
699 if_index: u32,
700 runtime: Arc<Mutex<AutoRuntime>>,
701 running: &AtomicBool,
702 name: &str,
703) {
704 let token = compute_discovery_token(group_id, link_local_addr);
705
706 while running.load(Ordering::Relaxed) {
707 if let Ok(socket) = UdpSocket::bind("[::]:0") {
709 let if_bytes = if_index.to_ne_bytes();
711 unsafe {
712 libc::setsockopt(
713 socket_fd(&socket),
714 libc::IPPROTO_IPV6,
715 libc::IPV6_MULTICAST_IF,
716 if_bytes.as_ptr() as *const libc::c_void,
717 4,
718 );
719 }
720
721 let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
722 if let Err(e) = socket.send_to(&token, target) {
723 log::debug!("[{}] multicast send error: {}", name, e);
724 }
725 }
726
727 let sleep_dur = Duration::from_secs_f64(
728 lock_or_recover(&runtime, "auto runtime")
729 .announce_interval_secs
730 .max(0.1),
731 );
732 thread::sleep(sleep_dur);
733 }
734}
735
736fn discovery_receiver_loop(
738 socket: UdpSocket,
739 group_id: &[u8],
740 shared: Arc<Mutex<SharedState>>,
741 tx: EventSender,
742 running: &AtomicBool,
743 name: &str,
744 data_port: u16,
745 configured_bitrate: u64,
746 ingress_control: rns_core::transport::types::IngressControlConfig,
747 runtime: Arc<Mutex<AutoRuntime>>,
748) {
749 let mut buf = [0u8; 1024];
750
751 while running.load(Ordering::Relaxed) {
752 match socket.recv_from(&mut buf) {
753 Ok((n, src)) => {
754 if n < 32 {
755 continue;
756 }
757
758 let src_addr = match src {
760 std::net::SocketAddr::V6(v6) => v6,
761 _ => continue,
762 };
763 let src_ip = format!("{}", src_addr.ip());
764
765 let peering_hash = &buf[..32];
766 let expected = compute_discovery_token(group_id, &src_ip);
767
768 if peering_hash != expected {
769 log::debug!("[{}] invalid peering hash from {}", name, src_ip);
770 continue;
771 }
772
773 let state = lock_or_recover(&shared, "auto shared state");
775 if !state.online {
776 }
779
780 if state.link_local_addresses.contains(&src_ip) {
782 drop(state);
784 continue;
785 }
786
787 if state.peers.contains_key(&src_ip) {
789 let now = crate::time::now();
790 drop(state);
791 let mut state = lock_or_recover(&shared, "auto shared state");
792 state.refresh_peer(&src_ip, now);
793 continue;
794 }
795 drop(state);
796
797 add_peer(
799 &shared,
800 &tx,
801 &src_ip,
802 data_port,
803 name,
804 configured_bitrate,
805 ingress_control,
806 &runtime,
807 );
808 }
809 Err(ref e)
810 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
811 {
812 continue;
814 }
815 Err(e) => {
816 log::warn!("[{}] discovery recv error: {}", name, e);
817 if !running.load(Ordering::Relaxed) {
818 return;
819 }
820 thread::sleep(Duration::from_millis(100));
821 }
822 }
823 }
824}
825
826fn add_peer(
828 shared: &Arc<Mutex<SharedState>>,
829 tx: &EventSender,
830 peer_addr: &str,
831 data_port: u16,
832 name: &str,
833 configured_bitrate: u64,
834 ingress_control: rns_core::transport::types::IngressControlConfig,
835 _runtime: &Arc<Mutex<AutoRuntime>>,
836) {
837 let peer_ip: Ipv6Addr = match peer_addr.parse() {
838 Ok(ip) => ip,
839 Err(_) => return,
840 };
841
842 let send_socket = match UdpSocket::bind("[::]:0") {
844 Ok(s) => s,
845 Err(e) => {
846 log::warn!(
847 "[{}] failed to create writer for peer {}: {}",
848 name,
849 peer_addr,
850 e
851 );
852 return;
853 }
854 };
855
856 let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
857
858 let mut state = lock_or_recover(shared, "auto shared state");
859
860 if state.peers.contains_key(peer_addr) {
862 state.refresh_peer(peer_addr, crate::time::now());
863 return;
864 }
865
866 let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
867
868 let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
870 socket: send_socket,
871 target,
872 });
873
874 let peer_info = rns_core::transport::types::InterfaceInfo {
875 id: peer_id,
876 name: format!("{}:{}", name, peer_addr),
877 mode: rns_core::constants::MODE_FULL,
878 out_capable: true,
879 in_capable: true,
880 bitrate: Some(configured_bitrate),
881 airtime_profile: None,
882 announce_rate_target: None,
883 announce_rate_grace: 0,
884 announce_rate_penalty: 0.0,
885 announce_cap: rns_core::constants::ANNOUNCE_CAP,
886 is_local_client: false,
887 wants_tunnel: false,
888 tunnel_id: None,
889 mtu: 1400,
890 ia_freq: 0.0,
891 started: 0.0,
892 ingress_control,
893 };
894
895 let now = crate::time::now();
896 state.peers.insert(
897 peer_addr.to_string(),
898 AutoPeer {
899 interface_id: peer_id,
900 link_local_addr: peer_addr.to_string(),
901 ifname: String::new(),
902 last_heard: now,
903 },
904 );
905
906 log::info!(
907 "[{}] Peer discovered: {} (id={})",
908 name,
909 peer_addr,
910 peer_id.0
911 );
912
913 let _ = tx.send(Event::InterfaceUp(
915 peer_id,
916 Some(driver_writer),
917 Some(peer_info),
918 ));
919}
920
921fn data_receiver_loop(
923 socket: UdpSocket,
924 shared: Arc<Mutex<SharedState>>,
925 tx: EventSender,
926 running: &AtomicBool,
927 name: &str,
928) {
929 let mut buf = [0u8; HW_MTU + 64]; while running.load(Ordering::Relaxed) {
932 match socket.recv_from(&mut buf) {
933 Ok((n, src)) => {
934 if n == 0 {
935 continue;
936 }
937
938 let src_addr = match src {
939 std::net::SocketAddr::V6(v6) => v6,
940 _ => continue,
941 };
942 let src_ip = format!("{}", src_addr.ip());
943 let data = &buf[..n];
944
945 let now = crate::time::now();
946 let data_hash = rns_crypto::sha256::sha256(data);
947
948 let mut state = lock_or_recover(&shared, "auto shared state");
949
950 if !state.online {
951 continue;
952 }
953
954 if state.is_duplicate(&data_hash, now) {
956 continue;
957 }
958 state.add_dedup(data_hash, now);
959
960 state.refresh_peer(&src_ip, now);
962
963 let iface_id = match state.peers.get(&src_ip) {
965 Some(peer) => peer.interface_id,
966 None => {
967 continue;
969 }
970 };
971
972 drop(state);
973
974 if tx
975 .send(Event::Frame {
976 interface_id: iface_id,
977 data: data.to_vec(),
978 })
979 .is_err()
980 {
981 return;
982 }
983 }
984 Err(ref e)
985 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
986 {
987 continue;
988 }
989 Err(e) => {
990 log::warn!("[{}] data recv error: {}", name, e);
991 if !running.load(Ordering::Relaxed) {
992 return;
993 }
994 thread::sleep(Duration::from_millis(100));
995 }
996 }
997 }
998}
999
1000fn peer_jobs_loop(
1002 shared: Arc<Mutex<SharedState>>,
1003 tx: EventSender,
1004 runtime: Arc<Mutex<AutoRuntime>>,
1005 running: &AtomicBool,
1006 name: &str,
1007) {
1008 while running.load(Ordering::Relaxed) {
1009 let interval = Duration::from_secs_f64(
1010 lock_or_recover(&runtime, "auto runtime")
1011 .peer_job_interval_secs
1012 .max(0.1),
1013 );
1014 thread::sleep(interval);
1015
1016 let now = crate::time::now();
1017 let mut timed_out = Vec::new();
1018 let peer_timeout_secs = lock_or_recover(&runtime, "auto runtime").peer_timeout_secs;
1019
1020 {
1021 let state = lock_or_recover(&shared, "auto shared state");
1022 for (addr, peer) in &state.peers {
1023 if now > peer.last_heard + peer_timeout_secs {
1024 timed_out.push((addr.clone(), peer.interface_id));
1025 }
1026 }
1027 }
1028
1029 for (addr, iface_id) in &timed_out {
1030 log::info!("[{}] Peer timed out: {}", name, addr);
1031 let mut state = lock_or_recover(&shared, "auto shared state");
1032 state.peers.remove(addr.as_str());
1033 let _ = tx.send(Event::InterfaceDown(*iface_id));
1034 }
1035 }
1036}
1037
1038#[cfg(unix)]
1042fn socket_fd(socket: &UdpSocket) -> i32 {
1043 use std::os::unix::io::AsRawFd;
1044 socket.as_raw_fd()
1045}
1046
1047#[cfg(not(unix))]
1048fn socket_fd(_socket: &UdpSocket) -> i32 {
1049 0
1050}
1051
1052use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1055
1056pub struct AutoFactory;
1058
1059impl InterfaceFactory for AutoFactory {
1060 fn type_name(&self) -> &str {
1061 "AutoInterface"
1062 }
1063
1064 fn parse_config(
1065 &self,
1066 name: &str,
1067 id: InterfaceId,
1068 params: &HashMap<String, String>,
1069 ) -> Result<Box<dyn InterfaceConfigData>, String> {
1070 let group_id = params
1071 .get("group_id")
1072 .map(|v| v.as_bytes().to_vec())
1073 .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1074
1075 let discovery_scope = params
1076 .get("discovery_scope")
1077 .map(|v| match v.to_lowercase().as_str() {
1078 "link" => SCOPE_LINK.to_string(),
1079 "admin" => SCOPE_ADMIN.to_string(),
1080 "site" => SCOPE_SITE.to_string(),
1081 "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1082 "global" => SCOPE_GLOBAL.to_string(),
1083 _ => v.clone(),
1084 })
1085 .unwrap_or_else(|| SCOPE_LINK.to_string());
1086
1087 let discovery_port = params
1088 .get("discovery_port")
1089 .and_then(|v| v.parse().ok())
1090 .unwrap_or(DEFAULT_DISCOVERY_PORT);
1091
1092 let data_port = params
1093 .get("data_port")
1094 .and_then(|v| v.parse().ok())
1095 .unwrap_or(DEFAULT_DATA_PORT);
1096
1097 let multicast_address_type = params
1098 .get("multicast_address_type")
1099 .map(|v| match v.to_lowercase().as_str() {
1100 "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1101 "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1102 _ => v.clone(),
1103 })
1104 .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1105
1106 let configured_bitrate = params
1107 .get("configured_bitrate")
1108 .or_else(|| params.get("bitrate"))
1109 .and_then(|v| v.parse().ok())
1110 .unwrap_or(BITRATE_GUESS);
1111
1112 let allowed_interfaces = params
1113 .get("devices")
1114 .or_else(|| params.get("allowed_interfaces"))
1115 .map(|v| {
1116 v.split(',')
1117 .map(|s| s.trim().to_string())
1118 .filter(|s| !s.is_empty())
1119 .collect()
1120 })
1121 .unwrap_or_default();
1122
1123 let ignored_interfaces = params
1124 .get("ignored_devices")
1125 .or_else(|| params.get("ignored_interfaces"))
1126 .map(|v| {
1127 v.split(',')
1128 .map(|s| s.trim().to_string())
1129 .filter(|s| !s.is_empty())
1130 .collect()
1131 })
1132 .unwrap_or_default();
1133
1134 Ok(Box::new(AutoConfig {
1135 name: name.to_string(),
1136 group_id,
1137 discovery_scope,
1138 discovery_port,
1139 data_port,
1140 multicast_address_type,
1141 allowed_interfaces,
1142 ignored_interfaces,
1143 configured_bitrate,
1144 interface_id: id,
1145 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
1146 runtime: Arc::new(Mutex::new(AutoRuntime {
1147 announce_interval_secs: ANNOUNCE_INTERVAL,
1148 peer_timeout_secs: PEERING_TIMEOUT,
1149 peer_job_interval_secs: PEER_JOB_INTERVAL,
1150 })),
1151 }))
1152 }
1153
1154 fn start(
1155 &self,
1156 config: Box<dyn InterfaceConfigData>,
1157 ctx: StartContext,
1158 ) -> std::io::Result<StartResult> {
1159 let mut auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1160 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1161 })?;
1162
1163 auto_config.ingress_control = ctx.ingress_control;
1164 start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1165 Ok(StartResult::Listener { control: None })
1166 }
1167}
1168
1169pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1170 AutoRuntimeConfigHandle {
1171 interface_name: config.name.clone(),
1172 runtime: Arc::clone(&config.runtime),
1173 startup: AutoRuntime::from_config(config),
1174 }
1175}
1176
1177#[cfg(test)]
1180mod tests {
1181 use super::*;
1182
1183 #[test]
1186 fn multicast_address_default_group() {
1187 let addr = derive_multicast_address(
1189 DEFAULT_GROUP_ID,
1190 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1191 SCOPE_LINK,
1192 );
1193 assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1194 }
1195
1196 #[test]
1197 fn multicast_address_custom_group() {
1198 let addr =
1199 derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
1200 assert!(addr.starts_with("ff12:0:"));
1202 assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1204 }
1205
1206 #[test]
1207 fn multicast_address_scope_admin() {
1208 let addr = derive_multicast_address(
1209 DEFAULT_GROUP_ID,
1210 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1211 SCOPE_ADMIN,
1212 );
1213 assert!(addr.starts_with("ff14:0:"));
1214 }
1215
1216 #[test]
1217 fn multicast_address_permanent_type() {
1218 let addr = derive_multicast_address(
1219 DEFAULT_GROUP_ID,
1220 MULTICAST_PERMANENT_ADDRESS_TYPE,
1221 SCOPE_LINK,
1222 );
1223 assert!(addr.starts_with("ff02:0:"));
1224 }
1225
1226 #[test]
1227 fn multicast_address_parseable() {
1228 let addr = derive_multicast_address(
1229 DEFAULT_GROUP_ID,
1230 MULTICAST_TEMPORARY_ADDRESS_TYPE,
1231 SCOPE_LINK,
1232 );
1233 let ip = parse_multicast_addr(&addr);
1234 assert!(ip.is_some());
1235 assert!(ip.unwrap().is_multicast());
1236 }
1237
1238 #[test]
1241 fn discovery_token_interop() {
1242 let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1244 let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1245 let got = token
1246 .iter()
1247 .map(|b| format!("{:02x}", b))
1248 .collect::<String>();
1249 assert_eq!(got, expected);
1250 }
1251
1252 #[test]
1253 fn discovery_token_interop_2() {
1254 let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
1256 let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1257 let got = token
1258 .iter()
1259 .map(|b| format!("{:02x}", b))
1260 .collect::<String>();
1261 assert_eq!(got, expected);
1262 }
1263
1264 #[test]
1265 fn discovery_token_different_groups() {
1266 let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1267 let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1268 assert_ne!(t1, t2);
1269 }
1270
1271 #[test]
1272 fn discovery_token_different_addrs() {
1273 let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1274 let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1275 assert_ne!(t1, t2);
1276 }
1277
1278 #[test]
1281 fn dedup_basic() {
1282 let next_id = Arc::new(AtomicU64::new(1));
1283 let mut state = SharedState::new(next_id);
1284
1285 let hash = [0xAA; 32];
1286 let now = 1000.0;
1287
1288 assert!(!state.is_duplicate(&hash, now));
1289 state.add_dedup(hash, now);
1290 assert!(state.is_duplicate(&hash, now));
1291 }
1292
1293 #[test]
1294 fn dedup_expired() {
1295 let next_id = Arc::new(AtomicU64::new(1));
1296 let mut state = SharedState::new(next_id);
1297
1298 let hash = [0xBB; 32];
1299 state.add_dedup(hash, 1000.0);
1300
1301 assert!(state.is_duplicate(&hash, 1000.5));
1303 assert!(!state.is_duplicate(&hash, 1001.0));
1305 }
1306
1307 #[test]
1308 fn dedup_max_length() {
1309 let next_id = Arc::new(AtomicU64::new(1));
1310 let mut state = SharedState::new(next_id);
1311
1312 for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1314 let mut hash = [0u8; 32];
1315 hash[0] = (i & 0xFF) as u8;
1316 hash[1] = ((i >> 8) & 0xFF) as u8;
1317 state.add_dedup(hash, 1000.0);
1318 }
1319
1320 assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1321 }
1322
1323 #[test]
1326 fn peer_refresh() {
1327 let next_id = Arc::new(AtomicU64::new(100));
1328 let mut state = SharedState::new(next_id);
1329
1330 state.peers.insert(
1331 "fe80::1".to_string(),
1332 AutoPeer {
1333 interface_id: InterfaceId(100),
1334 link_local_addr: "fe80::1".to_string(),
1335 ifname: "eth0".to_string(),
1336 last_heard: 1000.0,
1337 },
1338 );
1339
1340 state.refresh_peer("fe80::1", 2000.0);
1341 assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1342 }
1343
1344 #[test]
1345 fn peer_not_found_refresh() {
1346 let next_id = Arc::new(AtomicU64::new(100));
1347 let mut state = SharedState::new(next_id);
1348 state.refresh_peer("fe80::999", 1000.0);
1350 }
1351
1352 #[test]
1355 fn enumerate_returns_vec() {
1356 let interfaces = enumerate_interfaces(&[], &[]);
1359 for iface in &interfaces {
1361 assert!(!iface.name.is_empty());
1362 assert!(iface.link_local_addr.starts_with("fe80"));
1363 assert!(iface.index > 0);
1364 }
1365 }
1366
1367 #[test]
1368 fn enumerate_with_ignored() {
1369 let interfaces = enumerate_interfaces(
1371 &[],
1372 &[
1373 "lo".to_string(),
1374 "eth0".to_string(),
1375 "wlan0".to_string(),
1376 "enp0s3".to_string(),
1377 "docker0".to_string(),
1378 ],
1379 );
1380 for iface in &interfaces {
1382 assert_ne!(iface.name, "lo");
1383 assert_ne!(iface.name, "eth0");
1384 assert_ne!(iface.name, "wlan0");
1385 }
1386 }
1387
1388 #[test]
1389 fn enumerate_with_allowed_nonexistent() {
1390 let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
1392 assert!(interfaces.is_empty());
1393 }
1394
1395 #[test]
1396 fn filter_skips_android_system_interfaces() {
1397 let allowed = Vec::new();
1398 let ignored = Vec::new();
1399
1400 for name in ["dummy0", "lo", "tun0", "rmnet0", "rmnet7"] {
1401 assert!(
1402 !should_adopt_interface_name(name, &allowed, &ignored, ANDROID_IGNORE_IFS),
1403 "{name} should be skipped by Android AutoInterface defaults"
1404 );
1405 }
1406 }
1407
1408 #[test]
1409 fn filter_does_not_skip_rmnet8_by_android_defaults() {
1410 assert!(should_adopt_interface_name(
1411 "rmnet8",
1412 &[],
1413 &[],
1414 ANDROID_IGNORE_IFS
1415 ));
1416 }
1417
1418 #[test]
1419 fn filter_allowed_overrides_system_ignored_interface() {
1420 assert!(should_adopt_interface_name(
1421 "rmnet0",
1422 &["rmnet0".to_string()],
1423 &[],
1424 ANDROID_IGNORE_IFS
1425 ));
1426
1427 assert!(should_adopt_interface_name(
1428 "lo0",
1429 &["lo0".to_string()],
1430 &[],
1431 &[]
1432 ));
1433 }
1434
1435 #[test]
1436 fn filter_ignored_wins_over_allowed_interface() {
1437 assert!(!should_adopt_interface_name(
1438 "rmnet0",
1439 &["rmnet0".to_string()],
1440 &["rmnet0".to_string()],
1441 ANDROID_IGNORE_IFS
1442 ));
1443 }
1444
1445 #[test]
1446 fn filter_allowed_list_excludes_unlisted_interfaces() {
1447 assert!(!should_adopt_interface_name(
1448 "wlan0",
1449 &["eth0".to_string()],
1450 &[],
1451 ANDROID_IGNORE_IFS
1452 ));
1453
1454 assert!(should_adopt_interface_name(
1455 "wlan0",
1456 &["wlan0".to_string()],
1457 &[],
1458 ANDROID_IGNORE_IFS
1459 ));
1460 }
1461
1462 #[test]
1465 fn config_defaults() {
1466 let config = AutoConfig::default();
1467 assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1468 assert_eq!(config.discovery_scope, SCOPE_LINK);
1469 assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1470 assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1471 assert_eq!(
1472 config.multicast_address_type,
1473 MULTICAST_TEMPORARY_ADDRESS_TYPE
1474 );
1475 assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1476 assert!(config.allowed_interfaces.is_empty());
1477 assert!(config.ignored_interfaces.is_empty());
1478 }
1479
1480 #[test]
1483 fn constants_match_python() {
1484 assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1485 assert_eq!(DEFAULT_DATA_PORT, 42671);
1486 assert_eq!(HW_MTU, 1196);
1487 assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1488 assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1489 assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1490 assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1491 assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1492 assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1493 assert_eq!(BITRATE_GUESS, 10_000_000);
1494 }
1495
1496 #[test]
1497 fn unicast_discovery_port() {
1498 let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1500 assert_eq!(unicast_port, 29717);
1501 }
1502
1503 #[test]
1504 fn reverse_peering_interval() {
1505 let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1506 assert!((interval - 5.2).abs() < 0.01);
1507 }
1508}