Skip to main content

rns_net/interface/
auto.rs

1//! AutoInterface: Zero-configuration LAN auto-discovery via IPv6 multicast.
2//!
3//! Matches Python `AutoInterface` from `RNS/Interfaces/AutoInterface.py`.
4//!
5//! Thread model (per adopted network interface):
6//!   - Discovery sender: periodically sends discovery token via multicast
7//!   - Discovery receiver (multicast): validates tokens, adds peers
8//!   - Discovery receiver (unicast): validates reverse-peering tokens
9//!   - Data receiver: UDP server receiving unicast data from peers
10//!
11//! Additionally one shared thread:
12//!   - Peer jobs: periodically culls timed-out peers
13
14use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27// ── Constants (matching Python AutoInterface) ──────────────────────────────
28
29/// Default UDP port for multicast discovery.
30pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32/// Default UDP port for unicast data exchange.
33pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35/// Default group identifier.
36pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38/// Default IFAC size for AutoInterface (bytes).
39pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41/// Hardware MTU for AutoInterface packets.
42pub const HW_MTU: usize = 1196;
43
44/// Multicast scope: link-local.
45pub const SCOPE_LINK: &str = "2";
46/// Multicast scope: admin-local.
47pub const SCOPE_ADMIN: &str = "4";
48/// Multicast scope: site-local.
49pub const SCOPE_SITE: &str = "5";
50/// Multicast scope: organization-local.
51pub const SCOPE_ORGANISATION: &str = "8";
52/// Multicast scope: global.
53pub const SCOPE_GLOBAL: &str = "e";
54
55/// Permanent multicast address type.
56pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57/// Temporary multicast address type.
58pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60/// How long before a peer is considered timed out (seconds).
61pub const PEERING_TIMEOUT: f64 = 22.0;
62
63/// How often to send multicast discovery announcements (seconds).
64pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66/// How often to run peer maintenance jobs (seconds).
67pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69/// Multicast echo timeout (seconds). Used for carrier detection.
70pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72/// Default bitrate guess for AutoInterface (10 Mbps).
73pub const BITRATE_GUESS: u64 = 10_000_000;
74
75/// Deduplication deque size.
76pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78/// Deduplication deque entry TTL (seconds).
79pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81/// Reverse peering interval multiplier (announce_interval * 3.25).
82pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84/// Interfaces always ignored.
85pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87// ── Configuration ──────────────────────────────────────────────────────────
88
89/// Configuration for an AutoInterface.
90#[derive(Debug, Clone)]
91pub struct AutoConfig {
92    pub name: String,
93    pub group_id: Vec<u8>,
94    pub discovery_scope: String,
95    pub discovery_port: u16,
96    pub data_port: u16,
97    pub multicast_address_type: String,
98    pub allowed_interfaces: Vec<String>,
99    pub ignored_interfaces: Vec<String>,
100    pub configured_bitrate: u64,
101    /// Base interface ID. Per-peer IDs will be assigned dynamically.
102    pub interface_id: InterfaceId,
103    pub runtime: Arc<Mutex<AutoRuntime>>,
104}
105
106#[derive(Debug, Clone)]
107pub struct AutoRuntime {
108    pub announce_interval_secs: f64,
109    pub peer_timeout_secs: f64,
110    pub peer_job_interval_secs: f64,
111}
112
113impl AutoRuntime {
114    pub fn from_config(_config: &AutoConfig) -> Self {
115        Self {
116            announce_interval_secs: ANNOUNCE_INTERVAL,
117            peer_timeout_secs: PEERING_TIMEOUT,
118            peer_job_interval_secs: PEER_JOB_INTERVAL,
119        }
120    }
121}
122
123#[derive(Debug, Clone)]
124pub struct AutoRuntimeConfigHandle {
125    pub interface_name: String,
126    pub runtime: Arc<Mutex<AutoRuntime>>,
127    pub startup: AutoRuntime,
128}
129
130impl Default for AutoConfig {
131    fn default() -> Self {
132        let mut config = AutoConfig {
133            name: String::new(),
134            group_id: DEFAULT_GROUP_ID.to_vec(),
135            discovery_scope: SCOPE_LINK.to_string(),
136            discovery_port: DEFAULT_DISCOVERY_PORT,
137            data_port: DEFAULT_DATA_PORT,
138            multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
139            allowed_interfaces: Vec::new(),
140            ignored_interfaces: Vec::new(),
141            configured_bitrate: BITRATE_GUESS,
142            interface_id: InterfaceId(0),
143            runtime: Arc::new(Mutex::new(AutoRuntime {
144                announce_interval_secs: ANNOUNCE_INTERVAL,
145                peer_timeout_secs: PEERING_TIMEOUT,
146                peer_job_interval_secs: PEER_JOB_INTERVAL,
147            })),
148        };
149        let startup = AutoRuntime::from_config(&config);
150        config.runtime = Arc::new(Mutex::new(startup));
151        config
152    }
153}
154
155// ── Multicast address derivation ───────────────────────────────────────────
156
157/// Derive the IPv6 multicast discovery address from group_id, scope, and address type.
158///
159/// Algorithm (matching Python):
160///   1. group_hash = SHA-256(group_id)
161///   2. Build suffix from hash bytes 2..14 as 6 little-endian 16-bit words
162///   3. First word is hardcoded "0"
163///   4. Prefix = "ff" + address_type + scope
164pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
165    let group_hash = rns_crypto::sha256::sha256(group_id);
166    let g = &group_hash;
167
168    // Build 6 LE 16-bit words from bytes 2..14
169    let w1 = (g[2] as u16) << 8 | g[3] as u16;
170    let w2 = (g[4] as u16) << 8 | g[5] as u16;
171    let w3 = (g[6] as u16) << 8 | g[7] as u16;
172    let w4 = (g[8] as u16) << 8 | g[9] as u16;
173    let w5 = (g[10] as u16) << 8 | g[11] as u16;
174    let w6 = (g[12] as u16) << 8 | g[13] as u16;
175
176    format!(
177        "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
178        address_type, scope, w1, w2, w3, w4, w5, w6
179    )
180}
181
182/// Parse a multicast address string into an Ipv6Addr.
183pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
184    addr.parse::<Ipv6Addr>().ok()
185}
186
187// ── Discovery token ────────────────────────────────────────────────────────
188
189/// Compute the discovery token: SHA-256(group_id + link_local_address_string).
190pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
191    let mut input = group_id.to_vec();
192    input.extend_from_slice(link_local_addr.as_bytes());
193    rns_crypto::sha256::sha256(&input)
194}
195
196// ── Network interface enumeration ──────────────────────────────────────────
197
198/// Information about a local network interface with an IPv6 link-local address.
199#[derive(Debug, Clone)]
200pub struct LocalInterface {
201    pub name: String,
202    pub link_local_addr: String,
203    pub index: u32,
204}
205
206/// Enumerate network interfaces that have IPv6 link-local addresses (fe80::/10).
207///
208/// Uses `libc::getifaddrs()`. Filters by allowed/ignored interface lists.
209pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
210    let mut result = Vec::new();
211
212    unsafe {
213        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
214        if libc::getifaddrs(&mut ifaddrs) != 0 {
215            return result;
216        }
217
218        let mut current = ifaddrs;
219        while !current.is_null() {
220            let ifa = &*current;
221            current = ifa.ifa_next;
222
223            // Must have an address
224            if ifa.ifa_addr.is_null() {
225                continue;
226            }
227
228            // Must be AF_INET6
229            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
230                continue;
231            }
232
233            // Get interface name
234            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
235                Ok(s) => s.to_string(),
236                Err(_) => continue,
237            };
238
239            // Check global ignore list
240            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
241                if !allowed.iter().any(|a| a == &name) {
242                    continue;
243                }
244            }
245
246            // Check ignored interfaces
247            if ignored.iter().any(|ig| ig == &name) {
248                continue;
249            }
250
251            // Check allowed interfaces (if non-empty, only allow those)
252            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
253                continue;
254            }
255
256            // Extract IPv6 address
257            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
258            let addr_bytes = (*sa6).sin6_addr.s6_addr;
259            let ipv6 = Ipv6Addr::from(addr_bytes);
260
261            // Must be link-local (fe80::/10)
262            let octets = ipv6.octets();
263            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
264                continue;
265            }
266
267            // Format the address (drop scope ID, matching Python's descope_linklocal)
268            let addr_str = format!("{}", ipv6);
269
270            // Get interface index
271            let index = libc::if_nametoindex(ifa.ifa_name);
272            if index == 0 {
273                continue;
274            }
275
276            // Avoid duplicates (same interface may appear multiple times)
277            if result.iter().any(|li: &LocalInterface| li.name == name) {
278                continue;
279            }
280
281            result.push(LocalInterface {
282                name,
283                link_local_addr: addr_str,
284                index,
285            });
286        }
287
288        libc::freeifaddrs(ifaddrs);
289    }
290
291    result
292}
293
294// ── Peer tracking ──────────────────────────────────────────────────────────
295
296/// A discovered peer.
297struct AutoPeer {
298    interface_id: InterfaceId,
299    #[allow(dead_code)]
300    link_local_addr: String,
301    #[allow(dead_code)]
302    ifname: String,
303    last_heard: f64,
304}
305
306/// Writer that sends UDP unicast data to a peer.
307struct UdpWriter {
308    socket: UdpSocket,
309    target: SocketAddrV6,
310}
311
312impl Writer for UdpWriter {
313    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
314        self.socket.send_to(data, self.target)?;
315        Ok(())
316    }
317}
318
319/// Shared state for the AutoInterface across all threads.
320struct SharedState {
321    /// Known peers: link_local_addr → AutoPeer
322    peers: HashMap<String, AutoPeer>,
323    /// Our own link-local addresses (for echo detection)
324    link_local_addresses: Vec<String>,
325    /// Deduplication deque: (hash, timestamp)
326    dedup_deque: VecDeque<([u8; 32], f64)>,
327    /// Flag set when final_init is done
328    online: bool,
329    /// Next dynamic interface ID
330    next_id: Arc<AtomicU64>,
331}
332
333impl SharedState {
334    fn new(next_id: Arc<AtomicU64>) -> Self {
335        SharedState {
336            peers: HashMap::new(),
337            link_local_addresses: Vec::new(),
338            dedup_deque: VecDeque::new(),
339            online: false,
340            next_id,
341        }
342    }
343
344    /// Check dedup deque for a data hash.
345    fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
346        for (h, ts) in &self.dedup_deque {
347            if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
348                return true;
349            }
350        }
351        false
352    }
353
354    /// Add to dedup deque, trimming to max length.
355    fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
356        self.dedup_deque.push_back((hash, now));
357        while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
358            self.dedup_deque.pop_front();
359        }
360    }
361
362    /// Refresh a peer's last_heard timestamp.
363    fn refresh_peer(&mut self, addr: &str, now: f64) {
364        if let Some(peer) = self.peers.get_mut(addr) {
365            peer.last_heard = now;
366        }
367    }
368}
369
370// ── Start function ─────────────────────────────────────────────────────────
371
372/// Start an AutoInterface. Discovers local IPv6 link-local interfaces,
373/// sets up multicast discovery, and creates UDP data servers.
374///
375/// Returns a vec of (InterfaceId, Writer) for each initial peer (typically empty
376/// since peers are discovered dynamically via InterfaceUp events).
377pub fn start(
378    config: AutoConfig,
379    tx: EventSender,
380    next_dynamic_id: Arc<AtomicU64>,
381) -> io::Result<()> {
382    let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
383
384    if interfaces.is_empty() {
385        log::warn!(
386            "[{}] No suitable IPv6 link-local interfaces found",
387            config.name,
388        );
389        return Ok(());
390    }
391
392    let group_id = config.group_id.clone();
393    let mcast_addr_str = derive_multicast_address(
394        &group_id,
395        &config.multicast_address_type,
396        &config.discovery_scope,
397    );
398
399    let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
400        Some(ip) => ip,
401        None => {
402            return Err(io::Error::new(
403                io::ErrorKind::InvalidData,
404                format!("Invalid multicast address: {}", mcast_addr_str),
405            ));
406        }
407    };
408
409    let discovery_port = config.discovery_port;
410    let unicast_discovery_port = config.discovery_port + 1;
411    let data_port = config.data_port;
412    let name = config.name.clone();
413    let configured_bitrate = config.configured_bitrate;
414    {
415        let startup = AutoRuntime::from_config(&config);
416        *config.runtime.lock().unwrap() = startup;
417    }
418    let runtime = Arc::clone(&config.runtime);
419
420    let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
421    let running = Arc::new(AtomicBool::new(true));
422
423    // Record our own link-local addresses
424    {
425        let mut state = shared.lock().unwrap();
426        for iface in &interfaces {
427            state
428                .link_local_addresses
429                .push(iface.link_local_addr.clone());
430        }
431    }
432
433    log::info!(
434        "[{}] AutoInterface starting with {} local interfaces, multicast {}",
435        name,
436        interfaces.len(),
437        mcast_addr_str,
438    );
439
440    // Per-interface: set up discovery sockets and threads
441    for local_iface in &interfaces {
442        let ifname = local_iface.name.clone();
443        let link_local = local_iface.link_local_addr.clone();
444        let if_index = local_iface.index;
445
446        // ─── Multicast discovery socket ───────────────────────────────
447        let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
448
449        // ─── Unicast discovery socket ─────────────────────────────────
450        let unicast_socket =
451            create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
452
453        // ─── Discovery sender thread ──────────────────────────────────
454        {
455            let group_id = group_id.clone();
456            let link_local = link_local.clone();
457            let running = running.clone();
458            let name = name.clone();
459            let runtime = runtime.clone();
460
461            thread::Builder::new()
462                .name(format!("auto-disc-tx-{}", ifname))
463                .spawn(move || {
464                    discovery_sender_loop(
465                        &group_id,
466                        &link_local,
467                        &mcast_ip,
468                        discovery_port,
469                        if_index,
470                        runtime,
471                        &running,
472                        &name,
473                    );
474                })?;
475        }
476
477        // ─── Multicast discovery receiver thread ──────────────────────
478        {
479            let group_id = group_id.clone();
480            let shared = shared.clone();
481            let tx = tx.clone();
482            let running = running.clone();
483            let name = name.clone();
484            let runtime = runtime.clone();
485
486            thread::Builder::new()
487                .name(format!("auto-disc-rx-{}", ifname))
488                .spawn(move || {
489                    discovery_receiver_loop(
490                        mcast_socket,
491                        &group_id,
492                        shared,
493                        tx,
494                        &running,
495                        &name,
496                        data_port,
497                        configured_bitrate,
498                        runtime,
499                    );
500                })?;
501        }
502
503        // ─── Unicast discovery receiver thread ────────────────────────
504        {
505            let group_id = group_id.clone();
506            let shared = shared.clone();
507            let tx = tx.clone();
508            let running = running.clone();
509            let name = name.clone();
510            let runtime = runtime.clone();
511
512            thread::Builder::new()
513                .name(format!("auto-udisc-rx-{}", ifname))
514                .spawn(move || {
515                    discovery_receiver_loop(
516                        unicast_socket,
517                        &group_id,
518                        shared,
519                        tx,
520                        &running,
521                        &name,
522                        data_port,
523                        configured_bitrate,
524                        runtime,
525                    );
526                })?;
527        }
528
529        // ─── Data receiver thread ─────────────────────────────────────
530        {
531            let link_local = local_iface.link_local_addr.clone();
532            let shared = shared.clone();
533            let tx = tx.clone();
534            let running = running.clone();
535            let name = name.clone();
536
537            let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
538
539            thread::Builder::new()
540                .name(format!("auto-data-rx-{}", local_iface.name))
541                .spawn(move || {
542                    data_receiver_loop(data_socket, shared, tx, &running, &name);
543                })?;
544        }
545    }
546
547    // ─── Peer jobs thread ─────────────────────────────────────────────
548    {
549        let shared = shared.clone();
550        let tx = tx.clone();
551        let running = running.clone();
552        let name = name.clone();
553        let runtime = runtime.clone();
554
555        thread::Builder::new()
556            .name(format!("auto-peer-jobs-{}", name))
557            .spawn(move || {
558                peer_jobs_loop(shared, tx, runtime, &running, &name);
559            })?;
560    }
561
562    // Wait for initial peering
563    let announce_interval = runtime.lock().unwrap().announce_interval_secs;
564    let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
565    thread::sleep(peering_wait);
566
567    // Mark as online
568    {
569        let mut state = shared.lock().unwrap();
570        state.online = true;
571    }
572
573    log::info!("[{}] AutoInterface online", config.name);
574
575    Ok(())
576}
577
578// ── Socket creation helpers ────────────────────────────────────────────────
579
580fn create_multicast_recv_socket(
581    mcast_ip: &Ipv6Addr,
582    port: u16,
583    if_index: u32,
584) -> io::Result<UdpSocket> {
585    let socket = socket2::Socket::new(
586        socket2::Domain::IPV6,
587        socket2::Type::DGRAM,
588        Some(socket2::Protocol::UDP),
589    )?;
590
591    socket.set_reuse_address(true)?;
592    #[cfg(not(target_os = "windows"))]
593    socket.set_reuse_port(true)?;
594
595    // Bind to [::]:port on the specific interface
596    let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
597    socket.bind(&bind_addr.into())?;
598
599    // Join multicast group on the specific interface
600    socket.join_multicast_v6(mcast_ip, if_index)?;
601
602    socket.set_nonblocking(false)?;
603    let std_socket: UdpSocket = socket.into();
604    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
605    Ok(std_socket)
606}
607
608fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
609    let ip: Ipv6Addr = link_local
610        .parse()
611        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
612
613    let socket = socket2::Socket::new(
614        socket2::Domain::IPV6,
615        socket2::Type::DGRAM,
616        Some(socket2::Protocol::UDP),
617    )?;
618
619    socket.set_reuse_address(true)?;
620    #[cfg(not(target_os = "windows"))]
621    socket.set_reuse_port(true)?;
622
623    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
624    socket.bind(&bind_addr.into())?;
625
626    socket.set_nonblocking(false)?;
627    let std_socket: UdpSocket = socket.into();
628    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
629    Ok(std_socket)
630}
631
632fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
633    let ip: Ipv6Addr = link_local
634        .parse()
635        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
636
637    let socket = socket2::Socket::new(
638        socket2::Domain::IPV6,
639        socket2::Type::DGRAM,
640        Some(socket2::Protocol::UDP),
641    )?;
642
643    socket.set_reuse_address(true)?;
644    #[cfg(not(target_os = "windows"))]
645    socket.set_reuse_port(true)?;
646
647    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
648    socket.bind(&bind_addr.into())?;
649
650    socket.set_nonblocking(false)?;
651    let std_socket: UdpSocket = socket.into();
652    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
653    Ok(std_socket)
654}
655
656// ── Thread loops ───────────────────────────────────────────────────────────
657
658/// Discovery sender: periodically sends discovery token via multicast.
659fn discovery_sender_loop(
660    group_id: &[u8],
661    link_local_addr: &str,
662    mcast_ip: &Ipv6Addr,
663    discovery_port: u16,
664    if_index: u32,
665    runtime: Arc<Mutex<AutoRuntime>>,
666    running: &AtomicBool,
667    name: &str,
668) {
669    let token = compute_discovery_token(group_id, link_local_addr);
670
671    while running.load(Ordering::Relaxed) {
672        // Create a fresh socket for each send (matches Python)
673        if let Ok(socket) = UdpSocket::bind("[::]:0") {
674            // Set multicast interface
675            let if_bytes = if_index.to_ne_bytes();
676            unsafe {
677                libc::setsockopt(
678                    socket_fd(&socket),
679                    libc::IPPROTO_IPV6,
680                    libc::IPV6_MULTICAST_IF,
681                    if_bytes.as_ptr() as *const libc::c_void,
682                    4,
683                );
684            }
685
686            let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
687            if let Err(e) = socket.send_to(&token, target) {
688                log::debug!("[{}] multicast send error: {}", name, e);
689            }
690        }
691
692        let sleep_dur =
693            Duration::from_secs_f64(runtime.lock().unwrap().announce_interval_secs.max(0.1));
694        thread::sleep(sleep_dur);
695    }
696}
697
698/// Discovery receiver: listens for discovery tokens and adds peers.
699fn discovery_receiver_loop(
700    socket: UdpSocket,
701    group_id: &[u8],
702    shared: Arc<Mutex<SharedState>>,
703    tx: EventSender,
704    running: &AtomicBool,
705    name: &str,
706    data_port: u16,
707    configured_bitrate: u64,
708    runtime: Arc<Mutex<AutoRuntime>>,
709) {
710    let mut buf = [0u8; 1024];
711
712    while running.load(Ordering::Relaxed) {
713        match socket.recv_from(&mut buf) {
714            Ok((n, src)) => {
715                if n < 32 {
716                    continue;
717                }
718
719                // Extract source IPv6 address
720                let src_addr = match src {
721                    std::net::SocketAddr::V6(v6) => v6,
722                    _ => continue,
723                };
724                let src_ip = format!("{}", src_addr.ip());
725
726                let peering_hash = &buf[..32];
727                let expected = compute_discovery_token(group_id, &src_ip);
728
729                if peering_hash != expected {
730                    log::debug!("[{}] invalid peering hash from {}", name, src_ip);
731                    continue;
732                }
733
734                // Check if online
735                let state = shared.lock().unwrap();
736                if !state.online {
737                    // Not fully initialized yet, but still accept for initial peering
738                    // (Python processes after final_init_done)
739                }
740
741                // Check if it's our own echo
742                if state.link_local_addresses.contains(&src_ip) {
743                    // Multicast echo from ourselves — just record it
744                    drop(state);
745                    continue;
746                }
747
748                // Check if already known
749                if state.peers.contains_key(&src_ip) {
750                    let now = crate::time::now();
751                    drop(state);
752                    let mut state = shared.lock().unwrap();
753                    state.refresh_peer(&src_ip, now);
754                    continue;
755                }
756                drop(state);
757
758                // New peer! Create a data writer to send to them.
759                add_peer(
760                    &shared,
761                    &tx,
762                    &src_ip,
763                    data_port,
764                    name,
765                    configured_bitrate,
766                    &runtime,
767                );
768            }
769            Err(ref e)
770                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
771            {
772                // Timeout, loop again
773                continue;
774            }
775            Err(e) => {
776                log::warn!("[{}] discovery recv error: {}", name, e);
777                if !running.load(Ordering::Relaxed) {
778                    return;
779                }
780                thread::sleep(Duration::from_millis(100));
781            }
782        }
783    }
784}
785
786/// Add a new peer, creating a writer and emitting InterfaceUp.
787fn add_peer(
788    shared: &Arc<Mutex<SharedState>>,
789    tx: &EventSender,
790    peer_addr: &str,
791    data_port: u16,
792    name: &str,
793    configured_bitrate: u64,
794    _runtime: &Arc<Mutex<AutoRuntime>>,
795) {
796    let peer_ip: Ipv6Addr = match peer_addr.parse() {
797        Ok(ip) => ip,
798        Err(_) => return,
799    };
800
801    // Create UDP writer to send data to this peer
802    let send_socket = match UdpSocket::bind("[::]:0") {
803        Ok(s) => s,
804        Err(e) => {
805            log::warn!(
806                "[{}] failed to create writer for peer {}: {}",
807                name,
808                peer_addr,
809                e
810            );
811            return;
812        }
813    };
814
815    let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
816
817    let mut state = shared.lock().unwrap();
818
819    // Double-check not already added (race)
820    if state.peers.contains_key(peer_addr) {
821        state.refresh_peer(peer_addr, crate::time::now());
822        return;
823    }
824
825    let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
826
827    // Create a boxed writer for the driver
828    let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
829        socket: send_socket,
830        target,
831    });
832
833    let peer_info = rns_core::transport::types::InterfaceInfo {
834        id: peer_id,
835        name: format!("{}:{}", name, peer_addr),
836        mode: rns_core::constants::MODE_FULL,
837        out_capable: true,
838        in_capable: true,
839        bitrate: Some(configured_bitrate),
840        announce_rate_target: None,
841        announce_rate_grace: 0,
842        announce_rate_penalty: 0.0,
843        announce_cap: rns_core::constants::ANNOUNCE_CAP,
844        is_local_client: false,
845        wants_tunnel: false,
846        tunnel_id: None,
847        mtu: 1400,
848        ia_freq: 0.0,
849        started: 0.0,
850        ingress_control: true,
851    };
852
853    let now = crate::time::now();
854    state.peers.insert(
855        peer_addr.to_string(),
856        AutoPeer {
857            interface_id: peer_id,
858            link_local_addr: peer_addr.to_string(),
859            ifname: String::new(),
860            last_heard: now,
861        },
862    );
863
864    log::info!(
865        "[{}] Peer discovered: {} (id={})",
866        name,
867        peer_addr,
868        peer_id.0
869    );
870
871    // Notify driver of new dynamic interface
872    let _ = tx.send(Event::InterfaceUp(
873        peer_id,
874        Some(driver_writer),
875        Some(peer_info),
876    ));
877}
878
879/// Data receiver: receives unicast UDP data from peers and dispatches as frames.
880fn data_receiver_loop(
881    socket: UdpSocket,
882    shared: Arc<Mutex<SharedState>>,
883    tx: EventSender,
884    running: &AtomicBool,
885    name: &str,
886) {
887    let mut buf = [0u8; HW_MTU + 64]; // a bit extra
888
889    while running.load(Ordering::Relaxed) {
890        match socket.recv_from(&mut buf) {
891            Ok((n, src)) => {
892                if n == 0 {
893                    continue;
894                }
895
896                let src_addr = match src {
897                    std::net::SocketAddr::V6(v6) => v6,
898                    _ => continue,
899                };
900                let src_ip = format!("{}", src_addr.ip());
901                let data = &buf[..n];
902
903                let now = crate::time::now();
904                let data_hash = rns_crypto::sha256::sha256(data);
905
906                let mut state = shared.lock().unwrap();
907
908                if !state.online {
909                    continue;
910                }
911
912                // Deduplication
913                if state.is_duplicate(&data_hash, now) {
914                    continue;
915                }
916                state.add_dedup(data_hash, now);
917
918                // Refresh peer
919                state.refresh_peer(&src_ip, now);
920
921                // Find the interface ID for this peer
922                let iface_id = match state.peers.get(&src_ip) {
923                    Some(peer) => peer.interface_id,
924                    None => {
925                        // Unknown peer, skip
926                        continue;
927                    }
928                };
929
930                drop(state);
931
932                if tx
933                    .send(Event::Frame {
934                        interface_id: iface_id,
935                        data: data.to_vec(),
936                    })
937                    .is_err()
938                {
939                    return;
940                }
941            }
942            Err(ref e)
943                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
944            {
945                continue;
946            }
947            Err(e) => {
948                log::warn!("[{}] data recv error: {}", name, e);
949                if !running.load(Ordering::Relaxed) {
950                    return;
951                }
952                thread::sleep(Duration::from_millis(100));
953            }
954        }
955    }
956}
957
958/// Peer jobs: periodically cull timed-out peers.
959fn peer_jobs_loop(
960    shared: Arc<Mutex<SharedState>>,
961    tx: EventSender,
962    runtime: Arc<Mutex<AutoRuntime>>,
963    running: &AtomicBool,
964    name: &str,
965) {
966    while running.load(Ordering::Relaxed) {
967        let interval =
968            Duration::from_secs_f64(runtime.lock().unwrap().peer_job_interval_secs.max(0.1));
969        thread::sleep(interval);
970
971        let now = crate::time::now();
972        let mut timed_out = Vec::new();
973        let peer_timeout_secs = runtime.lock().unwrap().peer_timeout_secs;
974
975        {
976            let state = shared.lock().unwrap();
977            for (addr, peer) in &state.peers {
978                if now > peer.last_heard + peer_timeout_secs {
979                    timed_out.push((addr.clone(), peer.interface_id));
980                }
981            }
982        }
983
984        for (addr, iface_id) in &timed_out {
985            log::info!("[{}] Peer timed out: {}", name, addr);
986            let mut state = shared.lock().unwrap();
987            state.peers.remove(addr.as_str());
988            let _ = tx.send(Event::InterfaceDown(*iface_id));
989        }
990    }
991}
992
993// ── Helper ─────────────────────────────────────────────────────────────────
994
995/// Get the raw file descriptor from a UdpSocket (for setsockopt).
996#[cfg(unix)]
997fn socket_fd(socket: &UdpSocket) -> i32 {
998    use std::os::unix::io::AsRawFd;
999    socket.as_raw_fd()
1000}
1001
1002#[cfg(not(unix))]
1003fn socket_fd(_socket: &UdpSocket) -> i32 {
1004    0
1005}
1006
1007// ── Factory implementation ─────────────────────────────────────────────────
1008
1009use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1010
1011/// Factory for `AutoInterface`.
1012pub struct AutoFactory;
1013
1014impl InterfaceFactory for AutoFactory {
1015    fn type_name(&self) -> &str {
1016        "AutoInterface"
1017    }
1018
1019    fn parse_config(
1020        &self,
1021        name: &str,
1022        id: InterfaceId,
1023        params: &HashMap<String, String>,
1024    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1025        let group_id = params
1026            .get("group_id")
1027            .map(|v| v.as_bytes().to_vec())
1028            .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1029
1030        let discovery_scope = params
1031            .get("discovery_scope")
1032            .map(|v| match v.to_lowercase().as_str() {
1033                "link" => SCOPE_LINK.to_string(),
1034                "admin" => SCOPE_ADMIN.to_string(),
1035                "site" => SCOPE_SITE.to_string(),
1036                "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1037                "global" => SCOPE_GLOBAL.to_string(),
1038                _ => v.clone(),
1039            })
1040            .unwrap_or_else(|| SCOPE_LINK.to_string());
1041
1042        let discovery_port = params
1043            .get("discovery_port")
1044            .and_then(|v| v.parse().ok())
1045            .unwrap_or(DEFAULT_DISCOVERY_PORT);
1046
1047        let data_port = params
1048            .get("data_port")
1049            .and_then(|v| v.parse().ok())
1050            .unwrap_or(DEFAULT_DATA_PORT);
1051
1052        let multicast_address_type = params
1053            .get("multicast_address_type")
1054            .map(|v| match v.to_lowercase().as_str() {
1055                "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1056                "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1057                _ => v.clone(),
1058            })
1059            .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1060
1061        let configured_bitrate = params
1062            .get("configured_bitrate")
1063            .or_else(|| params.get("bitrate"))
1064            .and_then(|v| v.parse().ok())
1065            .unwrap_or(BITRATE_GUESS);
1066
1067        let allowed_interfaces = params
1068            .get("devices")
1069            .or_else(|| params.get("allowed_interfaces"))
1070            .map(|v| {
1071                v.split(',')
1072                    .map(|s| s.trim().to_string())
1073                    .filter(|s| !s.is_empty())
1074                    .collect()
1075            })
1076            .unwrap_or_default();
1077
1078        let ignored_interfaces = params
1079            .get("ignored_devices")
1080            .or_else(|| params.get("ignored_interfaces"))
1081            .map(|v| {
1082                v.split(',')
1083                    .map(|s| s.trim().to_string())
1084                    .filter(|s| !s.is_empty())
1085                    .collect()
1086            })
1087            .unwrap_or_default();
1088
1089        Ok(Box::new(AutoConfig {
1090            name: name.to_string(),
1091            group_id,
1092            discovery_scope,
1093            discovery_port,
1094            data_port,
1095            multicast_address_type,
1096            allowed_interfaces,
1097            ignored_interfaces,
1098            configured_bitrate,
1099            interface_id: id,
1100            runtime: Arc::new(Mutex::new(AutoRuntime {
1101                announce_interval_secs: ANNOUNCE_INTERVAL,
1102                peer_timeout_secs: PEERING_TIMEOUT,
1103                peer_job_interval_secs: PEER_JOB_INTERVAL,
1104            })),
1105        }))
1106    }
1107
1108    fn start(
1109        &self,
1110        config: Box<dyn InterfaceConfigData>,
1111        ctx: StartContext,
1112    ) -> std::io::Result<StartResult> {
1113        let auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1114            std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1115        })?;
1116
1117        start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1118        Ok(StartResult::Listener { control: None })
1119    }
1120}
1121
1122pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1123    AutoRuntimeConfigHandle {
1124        interface_name: config.name.clone(),
1125        runtime: Arc::clone(&config.runtime),
1126        startup: AutoRuntime::from_config(config),
1127    }
1128}
1129
1130// ── Tests ──────────────────────────────────────────────────────────────────
1131
1132#[cfg(test)]
1133mod tests {
1134    use super::*;
1135
1136    // ── Multicast address derivation ──────────────────────────────────
1137
1138    #[test]
1139    fn multicast_address_default_group() {
1140        // Python vector: ff12:0:d70b:fb1c:16e4:5e39:485e:31e1
1141        let addr = derive_multicast_address(
1142            DEFAULT_GROUP_ID,
1143            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1144            SCOPE_LINK,
1145        );
1146        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1147    }
1148
1149    #[test]
1150    fn multicast_address_custom_group() {
1151        let addr =
1152            derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
1153        // Just verify format
1154        assert!(addr.starts_with("ff12:0:"));
1155        // Must be different from default
1156        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1157    }
1158
1159    #[test]
1160    fn multicast_address_scope_admin() {
1161        let addr = derive_multicast_address(
1162            DEFAULT_GROUP_ID,
1163            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1164            SCOPE_ADMIN,
1165        );
1166        assert!(addr.starts_with("ff14:0:"));
1167    }
1168
1169    #[test]
1170    fn multicast_address_permanent_type() {
1171        let addr = derive_multicast_address(
1172            DEFAULT_GROUP_ID,
1173            MULTICAST_PERMANENT_ADDRESS_TYPE,
1174            SCOPE_LINK,
1175        );
1176        assert!(addr.starts_with("ff02:0:"));
1177    }
1178
1179    #[test]
1180    fn multicast_address_parseable() {
1181        let addr = derive_multicast_address(
1182            DEFAULT_GROUP_ID,
1183            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1184            SCOPE_LINK,
1185        );
1186        let ip = parse_multicast_addr(&addr);
1187        assert!(ip.is_some());
1188        assert!(ip.unwrap().is_multicast());
1189    }
1190
1191    // ── Discovery token ──────────────────────────────────────────────
1192
1193    #[test]
1194    fn discovery_token_interop() {
1195        // Python vector: fe80::1 → 97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163
1196        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1197        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1198        let got = token
1199            .iter()
1200            .map(|b| format!("{:02x}", b))
1201            .collect::<String>();
1202        assert_eq!(got, expected);
1203    }
1204
1205    #[test]
1206    fn discovery_token_interop_2() {
1207        // Python vector: fe80::dead:beef:1234:5678
1208        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
1209        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1210        let got = token
1211            .iter()
1212            .map(|b| format!("{:02x}", b))
1213            .collect::<String>();
1214        assert_eq!(got, expected);
1215    }
1216
1217    #[test]
1218    fn discovery_token_different_groups() {
1219        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1220        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1221        assert_ne!(t1, t2);
1222    }
1223
1224    #[test]
1225    fn discovery_token_different_addrs() {
1226        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1227        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1228        assert_ne!(t1, t2);
1229    }
1230
1231    // ── Deduplication ────────────────────────────────────────────────
1232
1233    #[test]
1234    fn dedup_basic() {
1235        let next_id = Arc::new(AtomicU64::new(1));
1236        let mut state = SharedState::new(next_id);
1237
1238        let hash = [0xAA; 32];
1239        let now = 1000.0;
1240
1241        assert!(!state.is_duplicate(&hash, now));
1242        state.add_dedup(hash, now);
1243        assert!(state.is_duplicate(&hash, now));
1244    }
1245
1246    #[test]
1247    fn dedup_expired() {
1248        let next_id = Arc::new(AtomicU64::new(1));
1249        let mut state = SharedState::new(next_id);
1250
1251        let hash = [0xBB; 32];
1252        state.add_dedup(hash, 1000.0);
1253
1254        // Within TTL
1255        assert!(state.is_duplicate(&hash, 1000.5));
1256        // Expired
1257        assert!(!state.is_duplicate(&hash, 1001.0));
1258    }
1259
1260    #[test]
1261    fn dedup_max_length() {
1262        let next_id = Arc::new(AtomicU64::new(1));
1263        let mut state = SharedState::new(next_id);
1264
1265        // Fill beyond max
1266        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1267            let mut hash = [0u8; 32];
1268            hash[0] = (i & 0xFF) as u8;
1269            hash[1] = ((i >> 8) & 0xFF) as u8;
1270            state.add_dedup(hash, 1000.0);
1271        }
1272
1273        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1274    }
1275
1276    // ── Peer tracking ────────────────────────────────────────────────
1277
1278    #[test]
1279    fn peer_refresh() {
1280        let next_id = Arc::new(AtomicU64::new(100));
1281        let mut state = SharedState::new(next_id);
1282
1283        state.peers.insert(
1284            "fe80::1".to_string(),
1285            AutoPeer {
1286                interface_id: InterfaceId(100),
1287                link_local_addr: "fe80::1".to_string(),
1288                ifname: "eth0".to_string(),
1289                last_heard: 1000.0,
1290            },
1291        );
1292
1293        state.refresh_peer("fe80::1", 2000.0);
1294        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1295    }
1296
1297    #[test]
1298    fn peer_not_found_refresh() {
1299        let next_id = Arc::new(AtomicU64::new(100));
1300        let mut state = SharedState::new(next_id);
1301        // Should not panic
1302        state.refresh_peer("fe80::999", 1000.0);
1303    }
1304
1305    // ── Network interface enumeration ────────────────────────────────
1306
1307    #[test]
1308    fn enumerate_returns_vec() {
1309        // This test just verifies the function runs without crashing.
1310        // Results depend on the system's network configuration.
1311        let interfaces = enumerate_interfaces(&[], &[]);
1312        // On CI/test machines, we may or may not have IPv6 link-local
1313        for iface in &interfaces {
1314            assert!(!iface.name.is_empty());
1315            assert!(iface.link_local_addr.starts_with("fe80"));
1316            assert!(iface.index > 0);
1317        }
1318    }
1319
1320    #[test]
1321    fn enumerate_with_ignored() {
1322        // Ignore everything
1323        let interfaces = enumerate_interfaces(
1324            &[],
1325            &[
1326                "lo".to_string(),
1327                "eth0".to_string(),
1328                "wlan0".to_string(),
1329                "enp0s3".to_string(),
1330                "docker0".to_string(),
1331            ],
1332        );
1333        // May still have some interfaces, but known ones should be filtered
1334        for iface in &interfaces {
1335            assert_ne!(iface.name, "lo");
1336            assert_ne!(iface.name, "eth0");
1337            assert_ne!(iface.name, "wlan0");
1338        }
1339    }
1340
1341    #[test]
1342    fn enumerate_with_allowed_nonexistent() {
1343        // Only allow an interface that doesn't exist
1344        let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
1345        assert!(interfaces.is_empty());
1346    }
1347
1348    // ── Config defaults ──────────────────────────────────────────────
1349
1350    #[test]
1351    fn config_defaults() {
1352        let config = AutoConfig::default();
1353        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1354        assert_eq!(config.discovery_scope, SCOPE_LINK);
1355        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1356        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1357        assert_eq!(
1358            config.multicast_address_type,
1359            MULTICAST_TEMPORARY_ADDRESS_TYPE
1360        );
1361        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1362        assert!(config.allowed_interfaces.is_empty());
1363        assert!(config.ignored_interfaces.is_empty());
1364    }
1365
1366    // ── Constants ────────────────────────────────────────────────────
1367
1368    #[test]
1369    fn constants_match_python() {
1370        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1371        assert_eq!(DEFAULT_DATA_PORT, 42671);
1372        assert_eq!(HW_MTU, 1196);
1373        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1374        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1375        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1376        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1377        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1378        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1379        assert_eq!(BITRATE_GUESS, 10_000_000);
1380    }
1381
1382    #[test]
1383    fn unicast_discovery_port() {
1384        // Python: unicast_discovery_port = discovery_port + 1
1385        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1386        assert_eq!(unicast_port, 29717);
1387    }
1388
1389    #[test]
1390    fn reverse_peering_interval() {
1391        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1392        assert!((interval - 5.2).abs() < 0.01);
1393    }
1394}