Skip to main content

rns_net/interface/
auto.rs

1//! AutoInterface: Zero-configuration LAN auto-discovery via IPv6 multicast.
2//!
3//! Matches Python `AutoInterface` from `RNS/Interfaces/AutoInterface.py`.
4//!
5//! Thread model (per adopted network interface):
6//!   - Discovery sender: periodically sends discovery token via multicast
7//!   - Discovery receiver (multicast): validates tokens, adds peers
8//!   - Discovery receiver (unicast): validates reverse-peering tokens
9//!   - Data receiver: UDP server receiving unicast data from peers
10//!
11//! Additionally one shared thread:
12//!   - Peer jobs: periodically culls timed-out peers
13
14use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27// ── Constants (matching Python AutoInterface) ──────────────────────────────
28
29/// Default UDP port for multicast discovery.
30pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32/// Default UDP port for unicast data exchange.
33pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35/// Default group identifier.
36pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38/// Default IFAC size for AutoInterface (bytes).
39pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41/// Hardware MTU for AutoInterface packets.
42pub const HW_MTU: usize = 1196;
43
44/// Multicast scope: link-local.
45pub const SCOPE_LINK: &str = "2";
46/// Multicast scope: admin-local.
47pub const SCOPE_ADMIN: &str = "4";
48/// Multicast scope: site-local.
49pub const SCOPE_SITE: &str = "5";
50/// Multicast scope: organization-local.
51pub const SCOPE_ORGANISATION: &str = "8";
52/// Multicast scope: global.
53pub const SCOPE_GLOBAL: &str = "e";
54
55/// Permanent multicast address type.
56pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57/// Temporary multicast address type.
58pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60/// How long before a peer is considered timed out (seconds).
61pub const PEERING_TIMEOUT: f64 = 22.0;
62
63/// How often to send multicast discovery announcements (seconds).
64pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66/// How often to run peer maintenance jobs (seconds).
67pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69/// Multicast echo timeout (seconds). Used for carrier detection.
70pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72/// Default bitrate guess for AutoInterface (10 Mbps).
73pub const BITRATE_GUESS: u64 = 10_000_000;
74
75/// Deduplication deque size.
76pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78/// Deduplication deque entry TTL (seconds).
79pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81/// Reverse peering interval multiplier (announce_interval * 3.25).
82pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84/// Interfaces always ignored.
85pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87// ── Configuration ──────────────────────────────────────────────────────────
88
89/// Configuration for an AutoInterface.
90#[derive(Debug, Clone)]
91pub struct AutoConfig {
92    pub name: String,
93    pub group_id: Vec<u8>,
94    pub discovery_scope: String,
95    pub discovery_port: u16,
96    pub data_port: u16,
97    pub multicast_address_type: String,
98    pub allowed_interfaces: Vec<String>,
99    pub ignored_interfaces: Vec<String>,
100    pub configured_bitrate: u64,
101    /// Base interface ID. Per-peer IDs will be assigned dynamically.
102    pub interface_id: InterfaceId,
103}
104
105impl Default for AutoConfig {
106    fn default() -> Self {
107        AutoConfig {
108            name: String::new(),
109            group_id: DEFAULT_GROUP_ID.to_vec(),
110            discovery_scope: SCOPE_LINK.to_string(),
111            discovery_port: DEFAULT_DISCOVERY_PORT,
112            data_port: DEFAULT_DATA_PORT,
113            multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
114            allowed_interfaces: Vec::new(),
115            ignored_interfaces: Vec::new(),
116            configured_bitrate: BITRATE_GUESS,
117            interface_id: InterfaceId(0),
118        }
119    }
120}
121
122// ── Multicast address derivation ───────────────────────────────────────────
123
124/// Derive the IPv6 multicast discovery address from group_id, scope, and address type.
125///
126/// Algorithm (matching Python):
127///   1. group_hash = SHA-256(group_id)
128///   2. Build suffix from hash bytes 2..14 as 6 little-endian 16-bit words
129///   3. First word is hardcoded "0"
130///   4. Prefix = "ff" + address_type + scope
131pub fn derive_multicast_address(
132    group_id: &[u8],
133    address_type: &str,
134    scope: &str,
135) -> String {
136    let group_hash = rns_crypto::sha256::sha256(group_id);
137    let g = &group_hash;
138
139    // Build 6 LE 16-bit words from bytes 2..14
140    let w1 = (g[2] as u16) << 8 | g[3] as u16;
141    let w2 = (g[4] as u16) << 8 | g[5] as u16;
142    let w3 = (g[6] as u16) << 8 | g[7] as u16;
143    let w4 = (g[8] as u16) << 8 | g[9] as u16;
144    let w5 = (g[10] as u16) << 8 | g[11] as u16;
145    let w6 = (g[12] as u16) << 8 | g[13] as u16;
146
147    format!(
148        "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
149        address_type, scope, w1, w2, w3, w4, w5, w6
150    )
151}
152
153/// Parse a multicast address string into an Ipv6Addr.
154pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
155    addr.parse::<Ipv6Addr>().ok()
156}
157
158// ── Discovery token ────────────────────────────────────────────────────────
159
160/// Compute the discovery token: SHA-256(group_id + link_local_address_string).
161pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
162    let mut input = group_id.to_vec();
163    input.extend_from_slice(link_local_addr.as_bytes());
164    rns_crypto::sha256::sha256(&input)
165}
166
167// ── Network interface enumeration ──────────────────────────────────────────
168
169/// Information about a local network interface with an IPv6 link-local address.
170#[derive(Debug, Clone)]
171pub struct LocalInterface {
172    pub name: String,
173    pub link_local_addr: String,
174    pub index: u32,
175}
176
177/// Enumerate network interfaces that have IPv6 link-local addresses (fe80::/10).
178///
179/// Uses `libc::getifaddrs()`. Filters by allowed/ignored interface lists.
180pub fn enumerate_interfaces(
181    allowed: &[String],
182    ignored: &[String],
183) -> Vec<LocalInterface> {
184    let mut result = Vec::new();
185
186    unsafe {
187        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
188        if libc::getifaddrs(&mut ifaddrs) != 0 {
189            return result;
190        }
191
192        let mut current = ifaddrs;
193        while !current.is_null() {
194            let ifa = &*current;
195            current = ifa.ifa_next;
196
197            // Must have an address
198            if ifa.ifa_addr.is_null() {
199                continue;
200            }
201
202            // Must be AF_INET6
203            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
204                continue;
205            }
206
207            // Get interface name
208            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
209                Ok(s) => s.to_string(),
210                Err(_) => continue,
211            };
212
213            // Check global ignore list
214            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
215                if !allowed.iter().any(|a| a == &name) {
216                    continue;
217                }
218            }
219
220            // Check ignored interfaces
221            if ignored.iter().any(|ig| ig == &name) {
222                continue;
223            }
224
225            // Check allowed interfaces (if non-empty, only allow those)
226            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
227                continue;
228            }
229
230            // Extract IPv6 address
231            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
232            let addr_bytes = (*sa6).sin6_addr.s6_addr;
233            let ipv6 = Ipv6Addr::from(addr_bytes);
234
235            // Must be link-local (fe80::/10)
236            let octets = ipv6.octets();
237            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
238                continue;
239            }
240
241            // Format the address (drop scope ID, matching Python's descope_linklocal)
242            let addr_str = format!("{}", ipv6);
243
244            // Get interface index
245            let index = libc::if_nametoindex(ifa.ifa_name);
246            if index == 0 {
247                continue;
248            }
249
250            // Avoid duplicates (same interface may appear multiple times)
251            if result.iter().any(|li: &LocalInterface| li.name == name) {
252                continue;
253            }
254
255            result.push(LocalInterface {
256                name,
257                link_local_addr: addr_str,
258                index,
259            });
260        }
261
262        libc::freeifaddrs(ifaddrs);
263    }
264
265    result
266}
267
268// ── Peer tracking ──────────────────────────────────────────────────────────
269
270/// A discovered peer.
271struct AutoPeer {
272    interface_id: InterfaceId,
273    #[allow(dead_code)]
274    link_local_addr: String,
275    #[allow(dead_code)]
276    ifname: String,
277    last_heard: f64,
278}
279
280/// Writer that sends UDP unicast data to a peer.
281struct UdpWriter {
282    socket: UdpSocket,
283    target: SocketAddrV6,
284}
285
286impl Writer for UdpWriter {
287    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
288        self.socket.send_to(data, self.target)?;
289        Ok(())
290    }
291}
292
293/// Shared state for the AutoInterface across all threads.
294struct SharedState {
295    /// Known peers: link_local_addr → AutoPeer
296    peers: HashMap<String, AutoPeer>,
297    /// Our own link-local addresses (for echo detection)
298    link_local_addresses: Vec<String>,
299    /// Deduplication deque: (hash, timestamp)
300    dedup_deque: VecDeque<([u8; 32], f64)>,
301    /// Flag set when final_init is done
302    online: bool,
303    /// Next dynamic interface ID
304    next_id: Arc<AtomicU64>,
305}
306
307impl SharedState {
308    fn new(next_id: Arc<AtomicU64>) -> Self {
309        SharedState {
310            peers: HashMap::new(),
311            link_local_addresses: Vec::new(),
312            dedup_deque: VecDeque::new(),
313            online: false,
314            next_id,
315        }
316    }
317
318    /// Check dedup deque for a data hash.
319    fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
320        for (h, ts) in &self.dedup_deque {
321            if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
322                return true;
323            }
324        }
325        false
326    }
327
328    /// Add to dedup deque, trimming to max length.
329    fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
330        self.dedup_deque.push_back((hash, now));
331        while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
332            self.dedup_deque.pop_front();
333        }
334    }
335
336    /// Refresh a peer's last_heard timestamp.
337    fn refresh_peer(&mut self, addr: &str, now: f64) {
338        if let Some(peer) = self.peers.get_mut(addr) {
339            peer.last_heard = now;
340        }
341    }
342}
343
344// ── Start function ─────────────────────────────────────────────────────────
345
346/// Start an AutoInterface. Discovers local IPv6 link-local interfaces,
347/// sets up multicast discovery, and creates UDP data servers.
348///
349/// Returns a vec of (InterfaceId, Writer) for each initial peer (typically empty
350/// since peers are discovered dynamically via InterfaceUp events).
351pub fn start(
352    config: AutoConfig,
353    tx: EventSender,
354    next_dynamic_id: Arc<AtomicU64>,
355) -> io::Result<()> {
356    let interfaces = enumerate_interfaces(
357        &config.allowed_interfaces,
358        &config.ignored_interfaces,
359    );
360
361    if interfaces.is_empty() {
362        log::warn!(
363            "[{}] No suitable IPv6 link-local interfaces found",
364            config.name,
365        );
366        return Ok(());
367    }
368
369    let group_id = config.group_id.clone();
370    let mcast_addr_str = derive_multicast_address(
371        &group_id,
372        &config.multicast_address_type,
373        &config.discovery_scope,
374    );
375
376    let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
377        Some(ip) => ip,
378        None => {
379            return Err(io::Error::new(
380                io::ErrorKind::InvalidData,
381                format!("Invalid multicast address: {}", mcast_addr_str),
382            ));
383        }
384    };
385
386    let discovery_port = config.discovery_port;
387    let unicast_discovery_port = config.discovery_port + 1;
388    let data_port = config.data_port;
389    let name = config.name.clone();
390    let announce_interval = ANNOUNCE_INTERVAL;
391    let configured_bitrate = config.configured_bitrate;
392
393    let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
394    let running = Arc::new(AtomicBool::new(true));
395
396    // Record our own link-local addresses
397    {
398        let mut state = shared.lock().unwrap();
399        for iface in &interfaces {
400            state.link_local_addresses.push(iface.link_local_addr.clone());
401        }
402    }
403
404    log::info!(
405        "[{}] AutoInterface starting with {} local interfaces, multicast {}",
406        name,
407        interfaces.len(),
408        mcast_addr_str,
409    );
410
411    // Per-interface: set up discovery sockets and threads
412    for local_iface in &interfaces {
413        let ifname = local_iface.name.clone();
414        let link_local = local_iface.link_local_addr.clone();
415        let if_index = local_iface.index;
416
417        // ─── Multicast discovery socket ───────────────────────────────
418        let mcast_socket = create_multicast_recv_socket(
419            &mcast_ip,
420            discovery_port,
421            if_index,
422        )?;
423
424        // ─── Unicast discovery socket ─────────────────────────────────
425        let unicast_socket = create_unicast_recv_socket(
426            &link_local,
427            unicast_discovery_port,
428            if_index,
429        )?;
430
431        // ─── Discovery sender thread ──────────────────────────────────
432        {
433            let group_id = group_id.clone();
434            let link_local = link_local.clone();
435            let running = running.clone();
436            let name = name.clone();
437
438            thread::Builder::new()
439                .name(format!("auto-disc-tx-{}", ifname))
440                .spawn(move || {
441                    discovery_sender_loop(
442                        &group_id,
443                        &link_local,
444                        &mcast_ip,
445                        discovery_port,
446                        if_index,
447                        announce_interval,
448                        &running,
449                        &name,
450                    );
451                })?;
452        }
453
454        // ─── Multicast discovery receiver thread ──────────────────────
455        {
456            let group_id = group_id.clone();
457            let shared = shared.clone();
458            let tx = tx.clone();
459            let running = running.clone();
460            let name = name.clone();
461
462            thread::Builder::new()
463                .name(format!("auto-disc-rx-{}", ifname))
464                .spawn(move || {
465                    discovery_receiver_loop(
466                        mcast_socket,
467                        &group_id,
468                        shared,
469                        tx,
470                        &running,
471                        &name,
472                        data_port,
473                        configured_bitrate,
474                    );
475                })?;
476        }
477
478        // ─── Unicast discovery receiver thread ────────────────────────
479        {
480            let group_id = group_id.clone();
481            let shared = shared.clone();
482            let tx = tx.clone();
483            let running = running.clone();
484            let name = name.clone();
485
486            thread::Builder::new()
487                .name(format!("auto-udisc-rx-{}", ifname))
488                .spawn(move || {
489                    discovery_receiver_loop(
490                        unicast_socket,
491                        &group_id,
492                        shared,
493                        tx,
494                        &running,
495                        &name,
496                        data_port,
497                        configured_bitrate,
498                    );
499                })?;
500        }
501
502        // ─── Data receiver thread ─────────────────────────────────────
503        {
504            let link_local = local_iface.link_local_addr.clone();
505            let shared = shared.clone();
506            let tx = tx.clone();
507            let running = running.clone();
508            let name = name.clone();
509
510            let data_socket = create_data_recv_socket(
511                &link_local,
512                data_port,
513                if_index,
514            )?;
515
516            thread::Builder::new()
517                .name(format!("auto-data-rx-{}", local_iface.name))
518                .spawn(move || {
519                    data_receiver_loop(
520                        data_socket,
521                        shared,
522                        tx,
523                        &running,
524                        &name,
525                    );
526                })?;
527        }
528    }
529
530    // ─── Peer jobs thread ─────────────────────────────────────────────
531    {
532        let shared = shared.clone();
533        let tx = tx.clone();
534        let running = running.clone();
535        let name = name.clone();
536
537        thread::Builder::new()
538            .name(format!("auto-peer-jobs-{}", name))
539            .spawn(move || {
540                peer_jobs_loop(shared, tx, &running, &name);
541            })?;
542    }
543
544    // Wait for initial peering
545    let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
546    thread::sleep(peering_wait);
547
548    // Mark as online
549    {
550        let mut state = shared.lock().unwrap();
551        state.online = true;
552    }
553
554    log::info!("[{}] AutoInterface online", config.name);
555
556    Ok(())
557}
558
559// ── Socket creation helpers ────────────────────────────────────────────────
560
561fn create_multicast_recv_socket(
562    mcast_ip: &Ipv6Addr,
563    port: u16,
564    if_index: u32,
565) -> io::Result<UdpSocket> {
566    let socket = socket2::Socket::new(
567        socket2::Domain::IPV6,
568        socket2::Type::DGRAM,
569        Some(socket2::Protocol::UDP),
570    )?;
571
572    socket.set_reuse_address(true)?;
573    #[cfg(not(target_os = "windows"))]
574    socket.set_reuse_port(true)?;
575
576    // Bind to [::]:port on the specific interface
577    let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
578    socket.bind(&bind_addr.into())?;
579
580    // Join multicast group on the specific interface
581    socket.join_multicast_v6(mcast_ip, if_index)?;
582
583    socket.set_nonblocking(false)?;
584    let std_socket: UdpSocket = socket.into();
585    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
586    Ok(std_socket)
587}
588
589fn create_unicast_recv_socket(
590    link_local: &str,
591    port: u16,
592    if_index: u32,
593) -> io::Result<UdpSocket> {
594    let ip: Ipv6Addr = link_local.parse().map_err(|e| {
595        io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e))
596    })?;
597
598    let socket = socket2::Socket::new(
599        socket2::Domain::IPV6,
600        socket2::Type::DGRAM,
601        Some(socket2::Protocol::UDP),
602    )?;
603
604    socket.set_reuse_address(true)?;
605    #[cfg(not(target_os = "windows"))]
606    socket.set_reuse_port(true)?;
607
608    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
609    socket.bind(&bind_addr.into())?;
610
611    socket.set_nonblocking(false)?;
612    let std_socket: UdpSocket = socket.into();
613    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
614    Ok(std_socket)
615}
616
617fn create_data_recv_socket(
618    link_local: &str,
619    port: u16,
620    if_index: u32,
621) -> io::Result<UdpSocket> {
622    let ip: Ipv6Addr = link_local.parse().map_err(|e| {
623        io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e))
624    })?;
625
626    let socket = socket2::Socket::new(
627        socket2::Domain::IPV6,
628        socket2::Type::DGRAM,
629        Some(socket2::Protocol::UDP),
630    )?;
631
632    socket.set_reuse_address(true)?;
633    #[cfg(not(target_os = "windows"))]
634    socket.set_reuse_port(true)?;
635
636    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
637    socket.bind(&bind_addr.into())?;
638
639    socket.set_nonblocking(false)?;
640    let std_socket: UdpSocket = socket.into();
641    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
642    Ok(std_socket)
643}
644
645// ── Thread loops ───────────────────────────────────────────────────────────
646
647/// Discovery sender: periodically sends discovery token via multicast.
648fn discovery_sender_loop(
649    group_id: &[u8],
650    link_local_addr: &str,
651    mcast_ip: &Ipv6Addr,
652    discovery_port: u16,
653    if_index: u32,
654    interval: f64,
655    running: &AtomicBool,
656    name: &str,
657) {
658    let token = compute_discovery_token(group_id, link_local_addr);
659    let sleep_dur = Duration::from_secs_f64(interval);
660
661    while running.load(Ordering::Relaxed) {
662        // Create a fresh socket for each send (matches Python)
663        if let Ok(socket) = UdpSocket::bind("[::]:0") {
664            // Set multicast interface
665            let if_bytes = if_index.to_ne_bytes();
666            unsafe {
667                libc::setsockopt(
668                    socket_fd(&socket),
669                    libc::IPPROTO_IPV6,
670                    libc::IPV6_MULTICAST_IF,
671                    if_bytes.as_ptr() as *const libc::c_void,
672                    4,
673                );
674            }
675
676            let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
677            if let Err(e) = socket.send_to(&token, target) {
678                log::debug!("[{}] multicast send error: {}", name, e);
679            }
680        }
681
682        thread::sleep(sleep_dur);
683    }
684}
685
686/// Discovery receiver: listens for discovery tokens and adds peers.
687fn discovery_receiver_loop(
688    socket: UdpSocket,
689    group_id: &[u8],
690    shared: Arc<Mutex<SharedState>>,
691    tx: EventSender,
692    running: &AtomicBool,
693    name: &str,
694    data_port: u16,
695    configured_bitrate: u64,
696) {
697    let mut buf = [0u8; 1024];
698
699    while running.load(Ordering::Relaxed) {
700        match socket.recv_from(&mut buf) {
701            Ok((n, src)) => {
702                if n < 32 {
703                    continue;
704                }
705
706                // Extract source IPv6 address
707                let src_addr = match src {
708                    std::net::SocketAddr::V6(v6) => v6,
709                    _ => continue,
710                };
711                let src_ip = format!("{}", src_addr.ip());
712
713                let peering_hash = &buf[..32];
714                let expected = compute_discovery_token(group_id, &src_ip);
715
716                if peering_hash != expected {
717                    log::debug!(
718                        "[{}] invalid peering hash from {}",
719                        name, src_ip
720                    );
721                    continue;
722                }
723
724                // Check if online
725                let state = shared.lock().unwrap();
726                if !state.online {
727                    // Not fully initialized yet, but still accept for initial peering
728                    // (Python processes after final_init_done)
729                }
730
731                // Check if it's our own echo
732                if state.link_local_addresses.contains(&src_ip) {
733                    // Multicast echo from ourselves — just record it
734                    drop(state);
735                    continue;
736                }
737
738                // Check if already known
739                if state.peers.contains_key(&src_ip) {
740                    let now = crate::time::now();
741                    drop(state);
742                    let mut state = shared.lock().unwrap();
743                    state.refresh_peer(&src_ip, now);
744                    continue;
745                }
746                drop(state);
747
748                // New peer! Create a data writer to send to them.
749                add_peer(
750                    &shared,
751                    &tx,
752                    &src_ip,
753                    data_port,
754                    name,
755                    configured_bitrate,
756                );
757            }
758            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock
759                || e.kind() == io::ErrorKind::TimedOut =>
760            {
761                // Timeout, loop again
762                continue;
763            }
764            Err(e) => {
765                log::warn!("[{}] discovery recv error: {}", name, e);
766                if !running.load(Ordering::Relaxed) {
767                    return;
768                }
769                thread::sleep(Duration::from_millis(100));
770            }
771        }
772    }
773}
774
775/// Add a new peer, creating a writer and emitting InterfaceUp.
776fn add_peer(
777    shared: &Arc<Mutex<SharedState>>,
778    tx: &EventSender,
779    peer_addr: &str,
780    data_port: u16,
781    name: &str,
782    configured_bitrate: u64,
783) {
784    let peer_ip: Ipv6Addr = match peer_addr.parse() {
785        Ok(ip) => ip,
786        Err(_) => return,
787    };
788
789    // Create UDP writer to send data to this peer
790    let send_socket = match UdpSocket::bind("[::]:0") {
791        Ok(s) => s,
792        Err(e) => {
793            log::warn!("[{}] failed to create writer for peer {}: {}", name, peer_addr, e);
794            return;
795        }
796    };
797
798    let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
799
800    let mut state = shared.lock().unwrap();
801
802    // Double-check not already added (race)
803    if state.peers.contains_key(peer_addr) {
804        state.refresh_peer(peer_addr, crate::time::now());
805        return;
806    }
807
808    let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
809
810    // Create a boxed writer for the driver
811    let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
812        socket: send_socket,
813        target,
814    });
815
816    let peer_info = rns_core::transport::types::InterfaceInfo {
817        id: peer_id,
818        name: format!("{}:{}", name, peer_addr),
819        mode: rns_core::constants::MODE_FULL,
820        out_capable: true,
821        in_capable: true,
822        bitrate: Some(configured_bitrate),
823        announce_rate_target: None,
824        announce_rate_grace: 0,
825        announce_rate_penalty: 0.0,
826        announce_cap: rns_core::constants::ANNOUNCE_CAP,
827        is_local_client: false,
828        wants_tunnel: false,
829        tunnel_id: None,
830        mtu: 1400,
831        ia_freq: 0.0,
832        started: 0.0,
833        ingress_control: true,
834    };
835
836    let now = crate::time::now();
837    state.peers.insert(
838        peer_addr.to_string(),
839        AutoPeer {
840            interface_id: peer_id,
841            link_local_addr: peer_addr.to_string(),
842            ifname: String::new(),
843            last_heard: now,
844        },
845    );
846
847    log::info!("[{}] Peer discovered: {} (id={})", name, peer_addr, peer_id.0);
848
849    // Notify driver of new dynamic interface
850    let _ = tx.send(Event::InterfaceUp(
851        peer_id,
852        Some(driver_writer),
853        Some(peer_info),
854    ));
855}
856
857/// Data receiver: receives unicast UDP data from peers and dispatches as frames.
858fn data_receiver_loop(
859    socket: UdpSocket,
860    shared: Arc<Mutex<SharedState>>,
861    tx: EventSender,
862    running: &AtomicBool,
863    name: &str,
864) {
865    let mut buf = [0u8; HW_MTU + 64]; // a bit extra
866
867    while running.load(Ordering::Relaxed) {
868        match socket.recv_from(&mut buf) {
869            Ok((n, src)) => {
870                if n == 0 {
871                    continue;
872                }
873
874                let src_addr = match src {
875                    std::net::SocketAddr::V6(v6) => v6,
876                    _ => continue,
877                };
878                let src_ip = format!("{}", src_addr.ip());
879                let data = &buf[..n];
880
881                let now = crate::time::now();
882                let data_hash = rns_crypto::sha256::sha256(data);
883
884                let mut state = shared.lock().unwrap();
885
886                if !state.online {
887                    continue;
888                }
889
890                // Deduplication
891                if state.is_duplicate(&data_hash, now) {
892                    continue;
893                }
894                state.add_dedup(data_hash, now);
895
896                // Refresh peer
897                state.refresh_peer(&src_ip, now);
898
899                // Find the interface ID for this peer
900                let iface_id = match state.peers.get(&src_ip) {
901                    Some(peer) => peer.interface_id,
902                    None => {
903                        // Unknown peer, skip
904                        continue;
905                    }
906                };
907
908                drop(state);
909
910                if tx
911                    .send(Event::Frame {
912                        interface_id: iface_id,
913                        data: data.to_vec(),
914                    })
915                    .is_err()
916                {
917                    return;
918                }
919            }
920            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock
921                || e.kind() == io::ErrorKind::TimedOut =>
922            {
923                continue;
924            }
925            Err(e) => {
926                log::warn!("[{}] data recv error: {}", name, e);
927                if !running.load(Ordering::Relaxed) {
928                    return;
929                }
930                thread::sleep(Duration::from_millis(100));
931            }
932        }
933    }
934}
935
936/// Peer jobs: periodically cull timed-out peers.
937fn peer_jobs_loop(
938    shared: Arc<Mutex<SharedState>>,
939    tx: EventSender,
940    running: &AtomicBool,
941    name: &str,
942) {
943    let interval = Duration::from_secs_f64(PEER_JOB_INTERVAL);
944
945    while running.load(Ordering::Relaxed) {
946        thread::sleep(interval);
947
948        let now = crate::time::now();
949        let mut timed_out = Vec::new();
950
951        {
952            let state = shared.lock().unwrap();
953            for (addr, peer) in &state.peers {
954                if now > peer.last_heard + PEERING_TIMEOUT {
955                    timed_out.push((addr.clone(), peer.interface_id));
956                }
957            }
958        }
959
960        for (addr, iface_id) in &timed_out {
961            log::info!("[{}] Peer timed out: {}", name, addr);
962            let mut state = shared.lock().unwrap();
963            state.peers.remove(addr.as_str());
964            let _ = tx.send(Event::InterfaceDown(*iface_id));
965        }
966    }
967}
968
969// ── Helper ─────────────────────────────────────────────────────────────────
970
971/// Get the raw file descriptor from a UdpSocket (for setsockopt).
972#[cfg(unix)]
973fn socket_fd(socket: &UdpSocket) -> i32 {
974    use std::os::unix::io::AsRawFd;
975    socket.as_raw_fd()
976}
977
978#[cfg(not(unix))]
979fn socket_fd(_socket: &UdpSocket) -> i32 {
980    0
981}
982
983// ── Factory implementation ─────────────────────────────────────────────────
984
985use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
986
987/// Factory for `AutoInterface`.
988pub struct AutoFactory;
989
990impl InterfaceFactory for AutoFactory {
991    fn type_name(&self) -> &str { "AutoInterface" }
992
993    fn parse_config(
994        &self,
995        name: &str,
996        id: InterfaceId,
997        params: &HashMap<String, String>,
998    ) -> Result<Box<dyn InterfaceConfigData>, String> {
999        let group_id = params
1000            .get("group_id")
1001            .map(|v| v.as_bytes().to_vec())
1002            .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1003
1004        let discovery_scope = params
1005            .get("discovery_scope")
1006            .map(|v| match v.to_lowercase().as_str() {
1007                "link"                        => SCOPE_LINK.to_string(),
1008                "admin"                       => SCOPE_ADMIN.to_string(),
1009                "site"                        => SCOPE_SITE.to_string(),
1010                "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1011                "global"                      => SCOPE_GLOBAL.to_string(),
1012                _                             => v.clone(),
1013            })
1014            .unwrap_or_else(|| SCOPE_LINK.to_string());
1015
1016        let discovery_port = params
1017            .get("discovery_port")
1018            .and_then(|v| v.parse().ok())
1019            .unwrap_or(DEFAULT_DISCOVERY_PORT);
1020
1021        let data_port = params
1022            .get("data_port")
1023            .and_then(|v| v.parse().ok())
1024            .unwrap_or(DEFAULT_DATA_PORT);
1025
1026        let multicast_address_type = params
1027            .get("multicast_address_type")
1028            .map(|v| match v.to_lowercase().as_str() {
1029                "permanent"  => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1030                "temporary"  => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1031                _            => v.clone(),
1032            })
1033            .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1034
1035        let configured_bitrate = params
1036            .get("configured_bitrate")
1037            .or_else(|| params.get("bitrate"))
1038            .and_then(|v| v.parse().ok())
1039            .unwrap_or(BITRATE_GUESS);
1040
1041        let allowed_interfaces = params
1042            .get("devices")
1043            .or_else(|| params.get("allowed_interfaces"))
1044            .map(|v| {
1045                v.split(',')
1046                    .map(|s| s.trim().to_string())
1047                    .filter(|s| !s.is_empty())
1048                    .collect()
1049            })
1050            .unwrap_or_default();
1051
1052        let ignored_interfaces = params
1053            .get("ignored_devices")
1054            .or_else(|| params.get("ignored_interfaces"))
1055            .map(|v| {
1056                v.split(',')
1057                    .map(|s| s.trim().to_string())
1058                    .filter(|s| !s.is_empty())
1059                    .collect()
1060            })
1061            .unwrap_or_default();
1062
1063        Ok(Box::new(AutoConfig {
1064            name: name.to_string(),
1065            group_id,
1066            discovery_scope,
1067            discovery_port,
1068            data_port,
1069            multicast_address_type,
1070            allowed_interfaces,
1071            ignored_interfaces,
1072            configured_bitrate,
1073            interface_id: id,
1074        }))
1075    }
1076
1077    fn start(
1078        &self,
1079        config: Box<dyn InterfaceConfigData>,
1080        ctx: StartContext,
1081    ) -> std::io::Result<StartResult> {
1082        let auto_config = *config.into_any().downcast::<AutoConfig>()
1083            .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type"))?;
1084
1085        start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1086        Ok(StartResult::Listener)
1087    }
1088}
1089
1090// ── Tests ──────────────────────────────────────────────────────────────────
1091
1092#[cfg(test)]
1093mod tests {
1094    use super::*;
1095
1096    // ── Multicast address derivation ──────────────────────────────────
1097
1098    #[test]
1099    fn multicast_address_default_group() {
1100        // Python vector: ff12:0:d70b:fb1c:16e4:5e39:485e:31e1
1101        let addr = derive_multicast_address(
1102            DEFAULT_GROUP_ID,
1103            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1104            SCOPE_LINK,
1105        );
1106        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1107    }
1108
1109    #[test]
1110    fn multicast_address_custom_group() {
1111        let addr = derive_multicast_address(
1112            b"testgroup",
1113            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1114            SCOPE_LINK,
1115        );
1116        // Just verify format
1117        assert!(addr.starts_with("ff12:0:"));
1118        // Must be different from default
1119        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1120    }
1121
1122    #[test]
1123    fn multicast_address_scope_admin() {
1124        let addr = derive_multicast_address(
1125            DEFAULT_GROUP_ID,
1126            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1127            SCOPE_ADMIN,
1128        );
1129        assert!(addr.starts_with("ff14:0:"));
1130    }
1131
1132    #[test]
1133    fn multicast_address_permanent_type() {
1134        let addr = derive_multicast_address(
1135            DEFAULT_GROUP_ID,
1136            MULTICAST_PERMANENT_ADDRESS_TYPE,
1137            SCOPE_LINK,
1138        );
1139        assert!(addr.starts_with("ff02:0:"));
1140    }
1141
1142    #[test]
1143    fn multicast_address_parseable() {
1144        let addr = derive_multicast_address(
1145            DEFAULT_GROUP_ID,
1146            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1147            SCOPE_LINK,
1148        );
1149        let ip = parse_multicast_addr(&addr);
1150        assert!(ip.is_some());
1151        assert!(ip.unwrap().is_multicast());
1152    }
1153
1154    // ── Discovery token ──────────────────────────────────────────────
1155
1156    #[test]
1157    fn discovery_token_interop() {
1158        // Python vector: fe80::1 → 97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163
1159        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1160        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1161        let got = token.iter().map(|b| format!("{:02x}", b)).collect::<String>();
1162        assert_eq!(got, expected);
1163    }
1164
1165    #[test]
1166    fn discovery_token_interop_2() {
1167        // Python vector: fe80::dead:beef:1234:5678
1168        let token = compute_discovery_token(
1169            DEFAULT_GROUP_ID,
1170            "fe80::dead:beef:1234:5678",
1171        );
1172        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1173        let got = token.iter().map(|b| format!("{:02x}", b)).collect::<String>();
1174        assert_eq!(got, expected);
1175    }
1176
1177    #[test]
1178    fn discovery_token_different_groups() {
1179        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1180        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1181        assert_ne!(t1, t2);
1182    }
1183
1184    #[test]
1185    fn discovery_token_different_addrs() {
1186        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1187        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1188        assert_ne!(t1, t2);
1189    }
1190
1191    // ── Deduplication ────────────────────────────────────────────────
1192
1193    #[test]
1194    fn dedup_basic() {
1195        let next_id = Arc::new(AtomicU64::new(1));
1196        let mut state = SharedState::new(next_id);
1197
1198        let hash = [0xAA; 32];
1199        let now = 1000.0;
1200
1201        assert!(!state.is_duplicate(&hash, now));
1202        state.add_dedup(hash, now);
1203        assert!(state.is_duplicate(&hash, now));
1204    }
1205
1206    #[test]
1207    fn dedup_expired() {
1208        let next_id = Arc::new(AtomicU64::new(1));
1209        let mut state = SharedState::new(next_id);
1210
1211        let hash = [0xBB; 32];
1212        state.add_dedup(hash, 1000.0);
1213
1214        // Within TTL
1215        assert!(state.is_duplicate(&hash, 1000.5));
1216        // Expired
1217        assert!(!state.is_duplicate(&hash, 1001.0));
1218    }
1219
1220    #[test]
1221    fn dedup_max_length() {
1222        let next_id = Arc::new(AtomicU64::new(1));
1223        let mut state = SharedState::new(next_id);
1224
1225        // Fill beyond max
1226        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1227            let mut hash = [0u8; 32];
1228            hash[0] = (i & 0xFF) as u8;
1229            hash[1] = ((i >> 8) & 0xFF) as u8;
1230            state.add_dedup(hash, 1000.0);
1231        }
1232
1233        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1234    }
1235
1236    // ── Peer tracking ────────────────────────────────────────────────
1237
1238    #[test]
1239    fn peer_refresh() {
1240        let next_id = Arc::new(AtomicU64::new(100));
1241        let mut state = SharedState::new(next_id);
1242
1243        state.peers.insert("fe80::1".to_string(), AutoPeer {
1244            interface_id: InterfaceId(100),
1245            link_local_addr: "fe80::1".to_string(),
1246            ifname: "eth0".to_string(),
1247            last_heard: 1000.0,
1248        });
1249
1250        state.refresh_peer("fe80::1", 2000.0);
1251        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1252    }
1253
1254    #[test]
1255    fn peer_not_found_refresh() {
1256        let next_id = Arc::new(AtomicU64::new(100));
1257        let mut state = SharedState::new(next_id);
1258        // Should not panic
1259        state.refresh_peer("fe80::999", 1000.0);
1260    }
1261
1262    // ── Network interface enumeration ────────────────────────────────
1263
1264    #[test]
1265    fn enumerate_returns_vec() {
1266        // This test just verifies the function runs without crashing.
1267        // Results depend on the system's network configuration.
1268        let interfaces = enumerate_interfaces(&[], &[]);
1269        // On CI/test machines, we may or may not have IPv6 link-local
1270        for iface in &interfaces {
1271            assert!(!iface.name.is_empty());
1272            assert!(iface.link_local_addr.starts_with("fe80"));
1273            assert!(iface.index > 0);
1274        }
1275    }
1276
1277    #[test]
1278    fn enumerate_with_ignored() {
1279        // Ignore everything
1280        let interfaces = enumerate_interfaces(&[], &["lo".to_string(), "eth0".to_string(), "wlan0".to_string(), "enp0s3".to_string(), "docker0".to_string()]);
1281        // May still have some interfaces, but known ones should be filtered
1282        for iface in &interfaces {
1283            assert_ne!(iface.name, "lo");
1284            assert_ne!(iface.name, "eth0");
1285            assert_ne!(iface.name, "wlan0");
1286        }
1287    }
1288
1289    #[test]
1290    fn enumerate_with_allowed_nonexistent() {
1291        // Only allow an interface that doesn't exist
1292        let interfaces = enumerate_interfaces(
1293            &["nonexistent_if_12345".to_string()],
1294            &[],
1295        );
1296        assert!(interfaces.is_empty());
1297    }
1298
1299    // ── Config defaults ──────────────────────────────────────────────
1300
1301    #[test]
1302    fn config_defaults() {
1303        let config = AutoConfig::default();
1304        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1305        assert_eq!(config.discovery_scope, SCOPE_LINK);
1306        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1307        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1308        assert_eq!(config.multicast_address_type, MULTICAST_TEMPORARY_ADDRESS_TYPE);
1309        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1310        assert!(config.allowed_interfaces.is_empty());
1311        assert!(config.ignored_interfaces.is_empty());
1312    }
1313
1314    // ── Constants ────────────────────────────────────────────────────
1315
1316    #[test]
1317    fn constants_match_python() {
1318        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1319        assert_eq!(DEFAULT_DATA_PORT, 42671);
1320        assert_eq!(HW_MTU, 1196);
1321        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1322        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1323        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1324        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1325        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1326        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1327        assert_eq!(BITRATE_GUESS, 10_000_000);
1328    }
1329
1330    #[test]
1331    fn unicast_discovery_port() {
1332        // Python: unicast_discovery_port = discovery_port + 1
1333        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1334        assert_eq!(unicast_port, 29717);
1335    }
1336
1337    #[test]
1338    fn reverse_peering_interval() {
1339        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1340        assert!((interval - 5.2).abs() < 0.01);
1341    }
1342}