Skip to main content

rns_net/interface/
auto.rs

1//! AutoInterface: Zero-configuration LAN auto-discovery via IPv6 multicast.
2//!
3//! Matches Python `AutoInterface` from `RNS/Interfaces/AutoInterface.py`.
4//!
5//! Thread model (per adopted network interface):
6//!   - Discovery sender: periodically sends discovery token via multicast
7//!   - Discovery receiver (multicast): validates tokens, adds peers
8//!   - Discovery receiver (unicast): validates reverse-peering tokens
9//!   - Data receiver: UDP server receiving unicast data from peers
10//!
11//! Additionally one shared thread:
12//!   - Peer jobs: periodically culls timed-out peers
13
14use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27// ── Constants (matching Python AutoInterface) ──────────────────────────────
28
29/// Default UDP port for multicast discovery.
30pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32/// Default UDP port for unicast data exchange.
33pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35/// Default group identifier.
36pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38/// Default IFAC size for AutoInterface (bytes).
39pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41/// Hardware MTU for AutoInterface packets.
42pub const HW_MTU: usize = 1196;
43
44/// Multicast scope: link-local.
45pub const SCOPE_LINK: &str = "2";
46/// Multicast scope: admin-local.
47pub const SCOPE_ADMIN: &str = "4";
48/// Multicast scope: site-local.
49pub const SCOPE_SITE: &str = "5";
50/// Multicast scope: organization-local.
51pub const SCOPE_ORGANISATION: &str = "8";
52/// Multicast scope: global.
53pub const SCOPE_GLOBAL: &str = "e";
54
55/// Permanent multicast address type.
56pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57/// Temporary multicast address type.
58pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60/// How long before a peer is considered timed out (seconds).
61pub const PEERING_TIMEOUT: f64 = 22.0;
62
63/// How often to send multicast discovery announcements (seconds).
64pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66/// How often to run peer maintenance jobs (seconds).
67pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69/// Multicast echo timeout (seconds). Used for carrier detection.
70pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72/// Default bitrate guess for AutoInterface (10 Mbps).
73pub const BITRATE_GUESS: u64 = 10_000_000;
74
75/// Deduplication deque size.
76pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78/// Deduplication deque entry TTL (seconds).
79pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81/// Reverse peering interval multiplier (announce_interval * 3.25).
82pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84/// Interfaces always ignored.
85pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87// ── Configuration ──────────────────────────────────────────────────────────
88
89/// Configuration for an AutoInterface.
90#[derive(Debug, Clone)]
91pub struct AutoConfig {
92    pub name: String,
93    pub group_id: Vec<u8>,
94    pub discovery_scope: String,
95    pub discovery_port: u16,
96    pub data_port: u16,
97    pub multicast_address_type: String,
98    pub allowed_interfaces: Vec<String>,
99    pub ignored_interfaces: Vec<String>,
100    pub configured_bitrate: u64,
101    /// Base interface ID. Per-peer IDs will be assigned dynamically.
102    pub interface_id: InterfaceId,
103}
104
105impl Default for AutoConfig {
106    fn default() -> Self {
107        AutoConfig {
108            name: String::new(),
109            group_id: DEFAULT_GROUP_ID.to_vec(),
110            discovery_scope: SCOPE_LINK.to_string(),
111            discovery_port: DEFAULT_DISCOVERY_PORT,
112            data_port: DEFAULT_DATA_PORT,
113            multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
114            allowed_interfaces: Vec::new(),
115            ignored_interfaces: Vec::new(),
116            configured_bitrate: BITRATE_GUESS,
117            interface_id: InterfaceId(0),
118        }
119    }
120}
121
122// ── Multicast address derivation ───────────────────────────────────────────
123
124/// Derive the IPv6 multicast discovery address from group_id, scope, and address type.
125///
126/// Algorithm (matching Python):
127///   1. group_hash = SHA-256(group_id)
128///   2. Build suffix from hash bytes 2..14 as 6 little-endian 16-bit words
129///   3. First word is hardcoded "0"
130///   4. Prefix = "ff" + address_type + scope
131pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
132    let group_hash = rns_crypto::sha256::sha256(group_id);
133    let g = &group_hash;
134
135    // Build 6 LE 16-bit words from bytes 2..14
136    let w1 = (g[2] as u16) << 8 | g[3] as u16;
137    let w2 = (g[4] as u16) << 8 | g[5] as u16;
138    let w3 = (g[6] as u16) << 8 | g[7] as u16;
139    let w4 = (g[8] as u16) << 8 | g[9] as u16;
140    let w5 = (g[10] as u16) << 8 | g[11] as u16;
141    let w6 = (g[12] as u16) << 8 | g[13] as u16;
142
143    format!(
144        "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
145        address_type, scope, w1, w2, w3, w4, w5, w6
146    )
147}
148
149/// Parse a multicast address string into an Ipv6Addr.
150pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
151    addr.parse::<Ipv6Addr>().ok()
152}
153
154// ── Discovery token ────────────────────────────────────────────────────────
155
156/// Compute the discovery token: SHA-256(group_id + link_local_address_string).
157pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
158    let mut input = group_id.to_vec();
159    input.extend_from_slice(link_local_addr.as_bytes());
160    rns_crypto::sha256::sha256(&input)
161}
162
163// ── Network interface enumeration ──────────────────────────────────────────
164
165/// Information about a local network interface with an IPv6 link-local address.
166#[derive(Debug, Clone)]
167pub struct LocalInterface {
168    pub name: String,
169    pub link_local_addr: String,
170    pub index: u32,
171}
172
173/// Enumerate network interfaces that have IPv6 link-local addresses (fe80::/10).
174///
175/// Uses `libc::getifaddrs()`. Filters by allowed/ignored interface lists.
176pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
177    let mut result = Vec::new();
178
179    unsafe {
180        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
181        if libc::getifaddrs(&mut ifaddrs) != 0 {
182            return result;
183        }
184
185        let mut current = ifaddrs;
186        while !current.is_null() {
187            let ifa = &*current;
188            current = ifa.ifa_next;
189
190            // Must have an address
191            if ifa.ifa_addr.is_null() {
192                continue;
193            }
194
195            // Must be AF_INET6
196            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
197                continue;
198            }
199
200            // Get interface name
201            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
202                Ok(s) => s.to_string(),
203                Err(_) => continue,
204            };
205
206            // Check global ignore list
207            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
208                if !allowed.iter().any(|a| a == &name) {
209                    continue;
210                }
211            }
212
213            // Check ignored interfaces
214            if ignored.iter().any(|ig| ig == &name) {
215                continue;
216            }
217
218            // Check allowed interfaces (if non-empty, only allow those)
219            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
220                continue;
221            }
222
223            // Extract IPv6 address
224            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
225            let addr_bytes = (*sa6).sin6_addr.s6_addr;
226            let ipv6 = Ipv6Addr::from(addr_bytes);
227
228            // Must be link-local (fe80::/10)
229            let octets = ipv6.octets();
230            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
231                continue;
232            }
233
234            // Format the address (drop scope ID, matching Python's descope_linklocal)
235            let addr_str = format!("{}", ipv6);
236
237            // Get interface index
238            let index = libc::if_nametoindex(ifa.ifa_name);
239            if index == 0 {
240                continue;
241            }
242
243            // Avoid duplicates (same interface may appear multiple times)
244            if result.iter().any(|li: &LocalInterface| li.name == name) {
245                continue;
246            }
247
248            result.push(LocalInterface {
249                name,
250                link_local_addr: addr_str,
251                index,
252            });
253        }
254
255        libc::freeifaddrs(ifaddrs);
256    }
257
258    result
259}
260
261// ── Peer tracking ──────────────────────────────────────────────────────────
262
263/// A discovered peer.
264struct AutoPeer {
265    interface_id: InterfaceId,
266    #[allow(dead_code)]
267    link_local_addr: String,
268    #[allow(dead_code)]
269    ifname: String,
270    last_heard: f64,
271}
272
273/// Writer that sends UDP unicast data to a peer.
274struct UdpWriter {
275    socket: UdpSocket,
276    target: SocketAddrV6,
277}
278
279impl Writer for UdpWriter {
280    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
281        self.socket.send_to(data, self.target)?;
282        Ok(())
283    }
284}
285
286/// Shared state for the AutoInterface across all threads.
287struct SharedState {
288    /// Known peers: link_local_addr → AutoPeer
289    peers: HashMap<String, AutoPeer>,
290    /// Our own link-local addresses (for echo detection)
291    link_local_addresses: Vec<String>,
292    /// Deduplication deque: (hash, timestamp)
293    dedup_deque: VecDeque<([u8; 32], f64)>,
294    /// Flag set when final_init is done
295    online: bool,
296    /// Next dynamic interface ID
297    next_id: Arc<AtomicU64>,
298}
299
300impl SharedState {
301    fn new(next_id: Arc<AtomicU64>) -> Self {
302        SharedState {
303            peers: HashMap::new(),
304            link_local_addresses: Vec::new(),
305            dedup_deque: VecDeque::new(),
306            online: false,
307            next_id,
308        }
309    }
310
311    /// Check dedup deque for a data hash.
312    fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
313        for (h, ts) in &self.dedup_deque {
314            if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
315                return true;
316            }
317        }
318        false
319    }
320
321    /// Add to dedup deque, trimming to max length.
322    fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
323        self.dedup_deque.push_back((hash, now));
324        while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
325            self.dedup_deque.pop_front();
326        }
327    }
328
329    /// Refresh a peer's last_heard timestamp.
330    fn refresh_peer(&mut self, addr: &str, now: f64) {
331        if let Some(peer) = self.peers.get_mut(addr) {
332            peer.last_heard = now;
333        }
334    }
335}
336
337// ── Start function ─────────────────────────────────────────────────────────
338
339/// Start an AutoInterface. Discovers local IPv6 link-local interfaces,
340/// sets up multicast discovery, and creates UDP data servers.
341///
342/// Returns a vec of (InterfaceId, Writer) for each initial peer (typically empty
343/// since peers are discovered dynamically via InterfaceUp events).
344pub fn start(
345    config: AutoConfig,
346    tx: EventSender,
347    next_dynamic_id: Arc<AtomicU64>,
348) -> io::Result<()> {
349    let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
350
351    if interfaces.is_empty() {
352        log::warn!(
353            "[{}] No suitable IPv6 link-local interfaces found",
354            config.name,
355        );
356        return Ok(());
357    }
358
359    let group_id = config.group_id.clone();
360    let mcast_addr_str = derive_multicast_address(
361        &group_id,
362        &config.multicast_address_type,
363        &config.discovery_scope,
364    );
365
366    let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
367        Some(ip) => ip,
368        None => {
369            return Err(io::Error::new(
370                io::ErrorKind::InvalidData,
371                format!("Invalid multicast address: {}", mcast_addr_str),
372            ));
373        }
374    };
375
376    let discovery_port = config.discovery_port;
377    let unicast_discovery_port = config.discovery_port + 1;
378    let data_port = config.data_port;
379    let name = config.name.clone();
380    let announce_interval = ANNOUNCE_INTERVAL;
381    let configured_bitrate = config.configured_bitrate;
382
383    let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
384    let running = Arc::new(AtomicBool::new(true));
385
386    // Record our own link-local addresses
387    {
388        let mut state = shared.lock().unwrap();
389        for iface in &interfaces {
390            state
391                .link_local_addresses
392                .push(iface.link_local_addr.clone());
393        }
394    }
395
396    log::info!(
397        "[{}] AutoInterface starting with {} local interfaces, multicast {}",
398        name,
399        interfaces.len(),
400        mcast_addr_str,
401    );
402
403    // Per-interface: set up discovery sockets and threads
404    for local_iface in &interfaces {
405        let ifname = local_iface.name.clone();
406        let link_local = local_iface.link_local_addr.clone();
407        let if_index = local_iface.index;
408
409        // ─── Multicast discovery socket ───────────────────────────────
410        let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
411
412        // ─── Unicast discovery socket ─────────────────────────────────
413        let unicast_socket =
414            create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
415
416        // ─── Discovery sender thread ──────────────────────────────────
417        {
418            let group_id = group_id.clone();
419            let link_local = link_local.clone();
420            let running = running.clone();
421            let name = name.clone();
422
423            thread::Builder::new()
424                .name(format!("auto-disc-tx-{}", ifname))
425                .spawn(move || {
426                    discovery_sender_loop(
427                        &group_id,
428                        &link_local,
429                        &mcast_ip,
430                        discovery_port,
431                        if_index,
432                        announce_interval,
433                        &running,
434                        &name,
435                    );
436                })?;
437        }
438
439        // ─── Multicast discovery receiver thread ──────────────────────
440        {
441            let group_id = group_id.clone();
442            let shared = shared.clone();
443            let tx = tx.clone();
444            let running = running.clone();
445            let name = name.clone();
446
447            thread::Builder::new()
448                .name(format!("auto-disc-rx-{}", ifname))
449                .spawn(move || {
450                    discovery_receiver_loop(
451                        mcast_socket,
452                        &group_id,
453                        shared,
454                        tx,
455                        &running,
456                        &name,
457                        data_port,
458                        configured_bitrate,
459                    );
460                })?;
461        }
462
463        // ─── Unicast discovery receiver thread ────────────────────────
464        {
465            let group_id = group_id.clone();
466            let shared = shared.clone();
467            let tx = tx.clone();
468            let running = running.clone();
469            let name = name.clone();
470
471            thread::Builder::new()
472                .name(format!("auto-udisc-rx-{}", ifname))
473                .spawn(move || {
474                    discovery_receiver_loop(
475                        unicast_socket,
476                        &group_id,
477                        shared,
478                        tx,
479                        &running,
480                        &name,
481                        data_port,
482                        configured_bitrate,
483                    );
484                })?;
485        }
486
487        // ─── Data receiver thread ─────────────────────────────────────
488        {
489            let link_local = local_iface.link_local_addr.clone();
490            let shared = shared.clone();
491            let tx = tx.clone();
492            let running = running.clone();
493            let name = name.clone();
494
495            let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
496
497            thread::Builder::new()
498                .name(format!("auto-data-rx-{}", local_iface.name))
499                .spawn(move || {
500                    data_receiver_loop(data_socket, shared, tx, &running, &name);
501                })?;
502        }
503    }
504
505    // ─── Peer jobs thread ─────────────────────────────────────────────
506    {
507        let shared = shared.clone();
508        let tx = tx.clone();
509        let running = running.clone();
510        let name = name.clone();
511
512        thread::Builder::new()
513            .name(format!("auto-peer-jobs-{}", name))
514            .spawn(move || {
515                peer_jobs_loop(shared, tx, &running, &name);
516            })?;
517    }
518
519    // Wait for initial peering
520    let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
521    thread::sleep(peering_wait);
522
523    // Mark as online
524    {
525        let mut state = shared.lock().unwrap();
526        state.online = true;
527    }
528
529    log::info!("[{}] AutoInterface online", config.name);
530
531    Ok(())
532}
533
534// ── Socket creation helpers ────────────────────────────────────────────────
535
536fn create_multicast_recv_socket(
537    mcast_ip: &Ipv6Addr,
538    port: u16,
539    if_index: u32,
540) -> io::Result<UdpSocket> {
541    let socket = socket2::Socket::new(
542        socket2::Domain::IPV6,
543        socket2::Type::DGRAM,
544        Some(socket2::Protocol::UDP),
545    )?;
546
547    socket.set_reuse_address(true)?;
548    #[cfg(not(target_os = "windows"))]
549    socket.set_reuse_port(true)?;
550
551    // Bind to [::]:port on the specific interface
552    let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
553    socket.bind(&bind_addr.into())?;
554
555    // Join multicast group on the specific interface
556    socket.join_multicast_v6(mcast_ip, if_index)?;
557
558    socket.set_nonblocking(false)?;
559    let std_socket: UdpSocket = socket.into();
560    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
561    Ok(std_socket)
562}
563
564fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
565    let ip: Ipv6Addr = link_local
566        .parse()
567        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
568
569    let socket = socket2::Socket::new(
570        socket2::Domain::IPV6,
571        socket2::Type::DGRAM,
572        Some(socket2::Protocol::UDP),
573    )?;
574
575    socket.set_reuse_address(true)?;
576    #[cfg(not(target_os = "windows"))]
577    socket.set_reuse_port(true)?;
578
579    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
580    socket.bind(&bind_addr.into())?;
581
582    socket.set_nonblocking(false)?;
583    let std_socket: UdpSocket = socket.into();
584    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
585    Ok(std_socket)
586}
587
588fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
589    let ip: Ipv6Addr = link_local
590        .parse()
591        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
592
593    let socket = socket2::Socket::new(
594        socket2::Domain::IPV6,
595        socket2::Type::DGRAM,
596        Some(socket2::Protocol::UDP),
597    )?;
598
599    socket.set_reuse_address(true)?;
600    #[cfg(not(target_os = "windows"))]
601    socket.set_reuse_port(true)?;
602
603    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
604    socket.bind(&bind_addr.into())?;
605
606    socket.set_nonblocking(false)?;
607    let std_socket: UdpSocket = socket.into();
608    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
609    Ok(std_socket)
610}
611
612// ── Thread loops ───────────────────────────────────────────────────────────
613
614/// Discovery sender: periodically sends discovery token via multicast.
615fn discovery_sender_loop(
616    group_id: &[u8],
617    link_local_addr: &str,
618    mcast_ip: &Ipv6Addr,
619    discovery_port: u16,
620    if_index: u32,
621    interval: f64,
622    running: &AtomicBool,
623    name: &str,
624) {
625    let token = compute_discovery_token(group_id, link_local_addr);
626    let sleep_dur = Duration::from_secs_f64(interval);
627
628    while running.load(Ordering::Relaxed) {
629        // Create a fresh socket for each send (matches Python)
630        if let Ok(socket) = UdpSocket::bind("[::]:0") {
631            // Set multicast interface
632            let if_bytes = if_index.to_ne_bytes();
633            unsafe {
634                libc::setsockopt(
635                    socket_fd(&socket),
636                    libc::IPPROTO_IPV6,
637                    libc::IPV6_MULTICAST_IF,
638                    if_bytes.as_ptr() as *const libc::c_void,
639                    4,
640                );
641            }
642
643            let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
644            if let Err(e) = socket.send_to(&token, target) {
645                log::debug!("[{}] multicast send error: {}", name, e);
646            }
647        }
648
649        thread::sleep(sleep_dur);
650    }
651}
652
653/// Discovery receiver: listens for discovery tokens and adds peers.
654fn discovery_receiver_loop(
655    socket: UdpSocket,
656    group_id: &[u8],
657    shared: Arc<Mutex<SharedState>>,
658    tx: EventSender,
659    running: &AtomicBool,
660    name: &str,
661    data_port: u16,
662    configured_bitrate: u64,
663) {
664    let mut buf = [0u8; 1024];
665
666    while running.load(Ordering::Relaxed) {
667        match socket.recv_from(&mut buf) {
668            Ok((n, src)) => {
669                if n < 32 {
670                    continue;
671                }
672
673                // Extract source IPv6 address
674                let src_addr = match src {
675                    std::net::SocketAddr::V6(v6) => v6,
676                    _ => continue,
677                };
678                let src_ip = format!("{}", src_addr.ip());
679
680                let peering_hash = &buf[..32];
681                let expected = compute_discovery_token(group_id, &src_ip);
682
683                if peering_hash != expected {
684                    log::debug!("[{}] invalid peering hash from {}", name, src_ip);
685                    continue;
686                }
687
688                // Check if online
689                let state = shared.lock().unwrap();
690                if !state.online {
691                    // Not fully initialized yet, but still accept for initial peering
692                    // (Python processes after final_init_done)
693                }
694
695                // Check if it's our own echo
696                if state.link_local_addresses.contains(&src_ip) {
697                    // Multicast echo from ourselves — just record it
698                    drop(state);
699                    continue;
700                }
701
702                // Check if already known
703                if state.peers.contains_key(&src_ip) {
704                    let now = crate::time::now();
705                    drop(state);
706                    let mut state = shared.lock().unwrap();
707                    state.refresh_peer(&src_ip, now);
708                    continue;
709                }
710                drop(state);
711
712                // New peer! Create a data writer to send to them.
713                add_peer(&shared, &tx, &src_ip, data_port, name, configured_bitrate);
714            }
715            Err(ref e)
716                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
717            {
718                // Timeout, loop again
719                continue;
720            }
721            Err(e) => {
722                log::warn!("[{}] discovery recv error: {}", name, e);
723                if !running.load(Ordering::Relaxed) {
724                    return;
725                }
726                thread::sleep(Duration::from_millis(100));
727            }
728        }
729    }
730}
731
732/// Add a new peer, creating a writer and emitting InterfaceUp.
733fn add_peer(
734    shared: &Arc<Mutex<SharedState>>,
735    tx: &EventSender,
736    peer_addr: &str,
737    data_port: u16,
738    name: &str,
739    configured_bitrate: u64,
740) {
741    let peer_ip: Ipv6Addr = match peer_addr.parse() {
742        Ok(ip) => ip,
743        Err(_) => return,
744    };
745
746    // Create UDP writer to send data to this peer
747    let send_socket = match UdpSocket::bind("[::]:0") {
748        Ok(s) => s,
749        Err(e) => {
750            log::warn!(
751                "[{}] failed to create writer for peer {}: {}",
752                name,
753                peer_addr,
754                e
755            );
756            return;
757        }
758    };
759
760    let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
761
762    let mut state = shared.lock().unwrap();
763
764    // Double-check not already added (race)
765    if state.peers.contains_key(peer_addr) {
766        state.refresh_peer(peer_addr, crate::time::now());
767        return;
768    }
769
770    let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
771
772    // Create a boxed writer for the driver
773    let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
774        socket: send_socket,
775        target,
776    });
777
778    let peer_info = rns_core::transport::types::InterfaceInfo {
779        id: peer_id,
780        name: format!("{}:{}", name, peer_addr),
781        mode: rns_core::constants::MODE_FULL,
782        out_capable: true,
783        in_capable: true,
784        bitrate: Some(configured_bitrate),
785        announce_rate_target: None,
786        announce_rate_grace: 0,
787        announce_rate_penalty: 0.0,
788        announce_cap: rns_core::constants::ANNOUNCE_CAP,
789        is_local_client: false,
790        wants_tunnel: false,
791        tunnel_id: None,
792        mtu: 1400,
793        ia_freq: 0.0,
794        started: 0.0,
795        ingress_control: true,
796    };
797
798    let now = crate::time::now();
799    state.peers.insert(
800        peer_addr.to_string(),
801        AutoPeer {
802            interface_id: peer_id,
803            link_local_addr: peer_addr.to_string(),
804            ifname: String::new(),
805            last_heard: now,
806        },
807    );
808
809    log::info!(
810        "[{}] Peer discovered: {} (id={})",
811        name,
812        peer_addr,
813        peer_id.0
814    );
815
816    // Notify driver of new dynamic interface
817    let _ = tx.send(Event::InterfaceUp(
818        peer_id,
819        Some(driver_writer),
820        Some(peer_info),
821    ));
822}
823
824/// Data receiver: receives unicast UDP data from peers and dispatches as frames.
825fn data_receiver_loop(
826    socket: UdpSocket,
827    shared: Arc<Mutex<SharedState>>,
828    tx: EventSender,
829    running: &AtomicBool,
830    name: &str,
831) {
832    let mut buf = [0u8; HW_MTU + 64]; // a bit extra
833
834    while running.load(Ordering::Relaxed) {
835        match socket.recv_from(&mut buf) {
836            Ok((n, src)) => {
837                if n == 0 {
838                    continue;
839                }
840
841                let src_addr = match src {
842                    std::net::SocketAddr::V6(v6) => v6,
843                    _ => continue,
844                };
845                let src_ip = format!("{}", src_addr.ip());
846                let data = &buf[..n];
847
848                let now = crate::time::now();
849                let data_hash = rns_crypto::sha256::sha256(data);
850
851                let mut state = shared.lock().unwrap();
852
853                if !state.online {
854                    continue;
855                }
856
857                // Deduplication
858                if state.is_duplicate(&data_hash, now) {
859                    continue;
860                }
861                state.add_dedup(data_hash, now);
862
863                // Refresh peer
864                state.refresh_peer(&src_ip, now);
865
866                // Find the interface ID for this peer
867                let iface_id = match state.peers.get(&src_ip) {
868                    Some(peer) => peer.interface_id,
869                    None => {
870                        // Unknown peer, skip
871                        continue;
872                    }
873                };
874
875                drop(state);
876
877                if tx
878                    .send(Event::Frame {
879                        interface_id: iface_id,
880                        data: data.to_vec(),
881                    })
882                    .is_err()
883                {
884                    return;
885                }
886            }
887            Err(ref e)
888                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
889            {
890                continue;
891            }
892            Err(e) => {
893                log::warn!("[{}] data recv error: {}", name, e);
894                if !running.load(Ordering::Relaxed) {
895                    return;
896                }
897                thread::sleep(Duration::from_millis(100));
898            }
899        }
900    }
901}
902
903/// Peer jobs: periodically cull timed-out peers.
904fn peer_jobs_loop(
905    shared: Arc<Mutex<SharedState>>,
906    tx: EventSender,
907    running: &AtomicBool,
908    name: &str,
909) {
910    let interval = Duration::from_secs_f64(PEER_JOB_INTERVAL);
911
912    while running.load(Ordering::Relaxed) {
913        thread::sleep(interval);
914
915        let now = crate::time::now();
916        let mut timed_out = Vec::new();
917
918        {
919            let state = shared.lock().unwrap();
920            for (addr, peer) in &state.peers {
921                if now > peer.last_heard + PEERING_TIMEOUT {
922                    timed_out.push((addr.clone(), peer.interface_id));
923                }
924            }
925        }
926
927        for (addr, iface_id) in &timed_out {
928            log::info!("[{}] Peer timed out: {}", name, addr);
929            let mut state = shared.lock().unwrap();
930            state.peers.remove(addr.as_str());
931            let _ = tx.send(Event::InterfaceDown(*iface_id));
932        }
933    }
934}
935
936// ── Helper ─────────────────────────────────────────────────────────────────
937
938/// Get the raw file descriptor from a UdpSocket (for setsockopt).
939#[cfg(unix)]
940fn socket_fd(socket: &UdpSocket) -> i32 {
941    use std::os::unix::io::AsRawFd;
942    socket.as_raw_fd()
943}
944
945#[cfg(not(unix))]
946fn socket_fd(_socket: &UdpSocket) -> i32 {
947    0
948}
949
950// ── Factory implementation ─────────────────────────────────────────────────
951
952use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
953
954/// Factory for `AutoInterface`.
955pub struct AutoFactory;
956
957impl InterfaceFactory for AutoFactory {
958    fn type_name(&self) -> &str {
959        "AutoInterface"
960    }
961
962    fn parse_config(
963        &self,
964        name: &str,
965        id: InterfaceId,
966        params: &HashMap<String, String>,
967    ) -> Result<Box<dyn InterfaceConfigData>, String> {
968        let group_id = params
969            .get("group_id")
970            .map(|v| v.as_bytes().to_vec())
971            .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
972
973        let discovery_scope = params
974            .get("discovery_scope")
975            .map(|v| match v.to_lowercase().as_str() {
976                "link" => SCOPE_LINK.to_string(),
977                "admin" => SCOPE_ADMIN.to_string(),
978                "site" => SCOPE_SITE.to_string(),
979                "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
980                "global" => SCOPE_GLOBAL.to_string(),
981                _ => v.clone(),
982            })
983            .unwrap_or_else(|| SCOPE_LINK.to_string());
984
985        let discovery_port = params
986            .get("discovery_port")
987            .and_then(|v| v.parse().ok())
988            .unwrap_or(DEFAULT_DISCOVERY_PORT);
989
990        let data_port = params
991            .get("data_port")
992            .and_then(|v| v.parse().ok())
993            .unwrap_or(DEFAULT_DATA_PORT);
994
995        let multicast_address_type = params
996            .get("multicast_address_type")
997            .map(|v| match v.to_lowercase().as_str() {
998                "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
999                "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1000                _ => v.clone(),
1001            })
1002            .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1003
1004        let configured_bitrate = params
1005            .get("configured_bitrate")
1006            .or_else(|| params.get("bitrate"))
1007            .and_then(|v| v.parse().ok())
1008            .unwrap_or(BITRATE_GUESS);
1009
1010        let allowed_interfaces = params
1011            .get("devices")
1012            .or_else(|| params.get("allowed_interfaces"))
1013            .map(|v| {
1014                v.split(',')
1015                    .map(|s| s.trim().to_string())
1016                    .filter(|s| !s.is_empty())
1017                    .collect()
1018            })
1019            .unwrap_or_default();
1020
1021        let ignored_interfaces = params
1022            .get("ignored_devices")
1023            .or_else(|| params.get("ignored_interfaces"))
1024            .map(|v| {
1025                v.split(',')
1026                    .map(|s| s.trim().to_string())
1027                    .filter(|s| !s.is_empty())
1028                    .collect()
1029            })
1030            .unwrap_or_default();
1031
1032        Ok(Box::new(AutoConfig {
1033            name: name.to_string(),
1034            group_id,
1035            discovery_scope,
1036            discovery_port,
1037            data_port,
1038            multicast_address_type,
1039            allowed_interfaces,
1040            ignored_interfaces,
1041            configured_bitrate,
1042            interface_id: id,
1043        }))
1044    }
1045
1046    fn start(
1047        &self,
1048        config: Box<dyn InterfaceConfigData>,
1049        ctx: StartContext,
1050    ) -> std::io::Result<StartResult> {
1051        let auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1052            std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1053        })?;
1054
1055        start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1056        Ok(StartResult::Listener)
1057    }
1058}
1059
1060// ── Tests ──────────────────────────────────────────────────────────────────
1061
1062#[cfg(test)]
1063mod tests {
1064    use super::*;
1065
1066    // ── Multicast address derivation ──────────────────────────────────
1067
1068    #[test]
1069    fn multicast_address_default_group() {
1070        // Python vector: ff12:0:d70b:fb1c:16e4:5e39:485e:31e1
1071        let addr = derive_multicast_address(
1072            DEFAULT_GROUP_ID,
1073            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1074            SCOPE_LINK,
1075        );
1076        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1077    }
1078
1079    #[test]
1080    fn multicast_address_custom_group() {
1081        let addr =
1082            derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
1083        // Just verify format
1084        assert!(addr.starts_with("ff12:0:"));
1085        // Must be different from default
1086        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1087    }
1088
1089    #[test]
1090    fn multicast_address_scope_admin() {
1091        let addr = derive_multicast_address(
1092            DEFAULT_GROUP_ID,
1093            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1094            SCOPE_ADMIN,
1095        );
1096        assert!(addr.starts_with("ff14:0:"));
1097    }
1098
1099    #[test]
1100    fn multicast_address_permanent_type() {
1101        let addr = derive_multicast_address(
1102            DEFAULT_GROUP_ID,
1103            MULTICAST_PERMANENT_ADDRESS_TYPE,
1104            SCOPE_LINK,
1105        );
1106        assert!(addr.starts_with("ff02:0:"));
1107    }
1108
1109    #[test]
1110    fn multicast_address_parseable() {
1111        let addr = derive_multicast_address(
1112            DEFAULT_GROUP_ID,
1113            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1114            SCOPE_LINK,
1115        );
1116        let ip = parse_multicast_addr(&addr);
1117        assert!(ip.is_some());
1118        assert!(ip.unwrap().is_multicast());
1119    }
1120
1121    // ── Discovery token ──────────────────────────────────────────────
1122
1123    #[test]
1124    fn discovery_token_interop() {
1125        // Python vector: fe80::1 → 97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163
1126        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1127        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1128        let got = token
1129            .iter()
1130            .map(|b| format!("{:02x}", b))
1131            .collect::<String>();
1132        assert_eq!(got, expected);
1133    }
1134
1135    #[test]
1136    fn discovery_token_interop_2() {
1137        // Python vector: fe80::dead:beef:1234:5678
1138        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
1139        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1140        let got = token
1141            .iter()
1142            .map(|b| format!("{:02x}", b))
1143            .collect::<String>();
1144        assert_eq!(got, expected);
1145    }
1146
1147    #[test]
1148    fn discovery_token_different_groups() {
1149        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1150        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1151        assert_ne!(t1, t2);
1152    }
1153
1154    #[test]
1155    fn discovery_token_different_addrs() {
1156        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1157        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1158        assert_ne!(t1, t2);
1159    }
1160
1161    // ── Deduplication ────────────────────────────────────────────────
1162
1163    #[test]
1164    fn dedup_basic() {
1165        let next_id = Arc::new(AtomicU64::new(1));
1166        let mut state = SharedState::new(next_id);
1167
1168        let hash = [0xAA; 32];
1169        let now = 1000.0;
1170
1171        assert!(!state.is_duplicate(&hash, now));
1172        state.add_dedup(hash, now);
1173        assert!(state.is_duplicate(&hash, now));
1174    }
1175
1176    #[test]
1177    fn dedup_expired() {
1178        let next_id = Arc::new(AtomicU64::new(1));
1179        let mut state = SharedState::new(next_id);
1180
1181        let hash = [0xBB; 32];
1182        state.add_dedup(hash, 1000.0);
1183
1184        // Within TTL
1185        assert!(state.is_duplicate(&hash, 1000.5));
1186        // Expired
1187        assert!(!state.is_duplicate(&hash, 1001.0));
1188    }
1189
1190    #[test]
1191    fn dedup_max_length() {
1192        let next_id = Arc::new(AtomicU64::new(1));
1193        let mut state = SharedState::new(next_id);
1194
1195        // Fill beyond max
1196        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1197            let mut hash = [0u8; 32];
1198            hash[0] = (i & 0xFF) as u8;
1199            hash[1] = ((i >> 8) & 0xFF) as u8;
1200            state.add_dedup(hash, 1000.0);
1201        }
1202
1203        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1204    }
1205
1206    // ── Peer tracking ────────────────────────────────────────────────
1207
1208    #[test]
1209    fn peer_refresh() {
1210        let next_id = Arc::new(AtomicU64::new(100));
1211        let mut state = SharedState::new(next_id);
1212
1213        state.peers.insert(
1214            "fe80::1".to_string(),
1215            AutoPeer {
1216                interface_id: InterfaceId(100),
1217                link_local_addr: "fe80::1".to_string(),
1218                ifname: "eth0".to_string(),
1219                last_heard: 1000.0,
1220            },
1221        );
1222
1223        state.refresh_peer("fe80::1", 2000.0);
1224        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1225    }
1226
1227    #[test]
1228    fn peer_not_found_refresh() {
1229        let next_id = Arc::new(AtomicU64::new(100));
1230        let mut state = SharedState::new(next_id);
1231        // Should not panic
1232        state.refresh_peer("fe80::999", 1000.0);
1233    }
1234
1235    // ── Network interface enumeration ────────────────────────────────
1236
1237    #[test]
1238    fn enumerate_returns_vec() {
1239        // This test just verifies the function runs without crashing.
1240        // Results depend on the system's network configuration.
1241        let interfaces = enumerate_interfaces(&[], &[]);
1242        // On CI/test machines, we may or may not have IPv6 link-local
1243        for iface in &interfaces {
1244            assert!(!iface.name.is_empty());
1245            assert!(iface.link_local_addr.starts_with("fe80"));
1246            assert!(iface.index > 0);
1247        }
1248    }
1249
1250    #[test]
1251    fn enumerate_with_ignored() {
1252        // Ignore everything
1253        let interfaces = enumerate_interfaces(
1254            &[],
1255            &[
1256                "lo".to_string(),
1257                "eth0".to_string(),
1258                "wlan0".to_string(),
1259                "enp0s3".to_string(),
1260                "docker0".to_string(),
1261            ],
1262        );
1263        // May still have some interfaces, but known ones should be filtered
1264        for iface in &interfaces {
1265            assert_ne!(iface.name, "lo");
1266            assert_ne!(iface.name, "eth0");
1267            assert_ne!(iface.name, "wlan0");
1268        }
1269    }
1270
1271    #[test]
1272    fn enumerate_with_allowed_nonexistent() {
1273        // Only allow an interface that doesn't exist
1274        let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
1275        assert!(interfaces.is_empty());
1276    }
1277
1278    // ── Config defaults ──────────────────────────────────────────────
1279
1280    #[test]
1281    fn config_defaults() {
1282        let config = AutoConfig::default();
1283        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1284        assert_eq!(config.discovery_scope, SCOPE_LINK);
1285        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1286        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1287        assert_eq!(
1288            config.multicast_address_type,
1289            MULTICAST_TEMPORARY_ADDRESS_TYPE
1290        );
1291        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1292        assert!(config.allowed_interfaces.is_empty());
1293        assert!(config.ignored_interfaces.is_empty());
1294    }
1295
1296    // ── Constants ────────────────────────────────────────────────────
1297
1298    #[test]
1299    fn constants_match_python() {
1300        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1301        assert_eq!(DEFAULT_DATA_PORT, 42671);
1302        assert_eq!(HW_MTU, 1196);
1303        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1304        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1305        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1306        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1307        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1308        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1309        assert_eq!(BITRATE_GUESS, 10_000_000);
1310    }
1311
1312    #[test]
1313    fn unicast_discovery_port() {
1314        // Python: unicast_discovery_port = discovery_port + 1
1315        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1316        assert_eq!(unicast_port, 29717);
1317    }
1318
1319    #[test]
1320    fn reverse_peering_interval() {
1321        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1322        assert!((interval - 5.2).abs() < 0.01);
1323    }
1324}