Skip to main content

rns_net/interface/
auto.rs

1//! AutoInterface: Zero-configuration LAN auto-discovery via IPv6 multicast.
2//!
3//! Matches Python `AutoInterface` from `RNS/Interfaces/AutoInterface.py`.
4//!
5//! Thread model (per adopted network interface):
6//!   - Discovery sender: periodically sends discovery token via multicast
7//!   - Discovery receiver (multicast): validates tokens, adds peers
8//!   - Discovery receiver (unicast): validates reverse-peering tokens
9//!   - Data receiver: UDP server receiving unicast data from peers
10//!
11//! Additionally one shared thread:
12//!   - Peer jobs: periodically culls timed-out peers
13
14use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27// ── Constants (matching Python AutoInterface) ──────────────────────────────
28
/// UDP port on which multicast discovery tokens are exchanged by default.
pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;

/// UDP port on which unicast payload data is exchanged by default.
pub const DEFAULT_DATA_PORT: u16 = 42671;

/// Group identifier used when the configuration does not supply one.
pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";

/// IFAC size, in bytes, used by AutoInterface by default.
pub const DEFAULT_IFAC_SIZE: usize = 16;

/// Hardware MTU, in bytes, of AutoInterface packets.
pub const HW_MTU: usize = 1196;

/// Scope digit for link-local multicast.
pub const SCOPE_LINK: &str = "2";
/// Scope digit for admin-local multicast.
pub const SCOPE_ADMIN: &str = "4";
/// Scope digit for site-local multicast.
pub const SCOPE_SITE: &str = "5";
/// Scope digit for organization-local multicast.
pub const SCOPE_ORGANISATION: &str = "8";
/// Scope digit for global multicast.
pub const SCOPE_GLOBAL: &str = "e";

/// Address-type digit selecting a permanent multicast address.
pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
/// Address-type digit selecting a temporary multicast address.
pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";

/// Seconds of silence after which a peer counts as timed out.
pub const PEERING_TIMEOUT: f64 = 22.0;

/// Seconds between two multicast discovery announcements.
pub const ANNOUNCE_INTERVAL: f64 = 1.6;

/// Seconds between two runs of the peer maintenance job.
pub const PEER_JOB_INTERVAL: f64 = 4.0;

/// Multicast echo timeout in seconds (intended for carrier detection).
pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;

/// Bitrate assumed for an AutoInterface when nothing else is known (10 Mbps).
pub const BITRATE_GUESS: u64 = 10_000_000;

/// Maximum number of entries kept in the deduplication deque.
pub const MULTI_IF_DEQUE_LEN: usize = 48;

/// Seconds for which a deduplication deque entry is honoured.
pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;

/// Factor applied to the announce interval to obtain the reverse peering
/// interval (announce_interval * 3.25).
pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;

/// Interface names that are skipped unless explicitly allowed.
pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87// ── Configuration ──────────────────────────────────────────────────────────
88
89/// Configuration for an AutoInterface.
90#[derive(Debug, Clone)]
91pub struct AutoConfig {
92    pub name: String,
93    pub group_id: Vec<u8>,
94    pub discovery_scope: String,
95    pub discovery_port: u16,
96    pub data_port: u16,
97    pub multicast_address_type: String,
98    pub allowed_interfaces: Vec<String>,
99    pub ignored_interfaces: Vec<String>,
100    pub configured_bitrate: u64,
101    /// Base interface ID. Per-peer IDs will be assigned dynamically.
102    pub interface_id: InterfaceId,
103}
104
105impl Default for AutoConfig {
106    fn default() -> Self {
107        AutoConfig {
108            name: String::new(),
109            group_id: DEFAULT_GROUP_ID.to_vec(),
110            discovery_scope: SCOPE_LINK.to_string(),
111            discovery_port: DEFAULT_DISCOVERY_PORT,
112            data_port: DEFAULT_DATA_PORT,
113            multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
114            allowed_interfaces: Vec::new(),
115            ignored_interfaces: Vec::new(),
116            configured_bitrate: BITRATE_GUESS,
117            interface_id: InterfaceId(0),
118        }
119    }
120}
121
122// ── Multicast address derivation ───────────────────────────────────────────
123
124/// Derive the IPv6 multicast discovery address from group_id, scope, and address type.
125///
126/// Algorithm (matching Python):
127///   1. group_hash = SHA-256(group_id)
128///   2. Build suffix from hash bytes 2..14 as 6 little-endian 16-bit words
129///   3. First word is hardcoded "0"
130///   4. Prefix = "ff" + address_type + scope
131pub fn derive_multicast_address(
132    group_id: &[u8],
133    address_type: &str,
134    scope: &str,
135) -> String {
136    let group_hash = rns_crypto::sha256::sha256(group_id);
137    let g = &group_hash;
138
139    // Build 6 LE 16-bit words from bytes 2..14
140    let w1 = (g[2] as u16) << 8 | g[3] as u16;
141    let w2 = (g[4] as u16) << 8 | g[5] as u16;
142    let w3 = (g[6] as u16) << 8 | g[7] as u16;
143    let w4 = (g[8] as u16) << 8 | g[9] as u16;
144    let w5 = (g[10] as u16) << 8 | g[11] as u16;
145    let w6 = (g[12] as u16) << 8 | g[13] as u16;
146
147    format!(
148        "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
149        address_type, scope, w1, w2, w3, w4, w5, w6
150    )
151}
152
/// Convert the textual form of an IPv6 address into an [`Ipv6Addr`].
///
/// Returns `None` when `addr` is not a syntactically valid IPv6 address.
/// No check is made that the address is actually a multicast address.
pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
    let parsed: Result<Ipv6Addr, _> = addr.parse();
    parsed.ok()
}
157
158// ── Discovery token ────────────────────────────────────────────────────────
159
160/// Compute the discovery token: SHA-256(group_id + link_local_address_string).
161pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
162    let mut input = group_id.to_vec();
163    input.extend_from_slice(link_local_addr.as_bytes());
164    rns_crypto::sha256::sha256(&input)
165}
166
167// ── Network interface enumeration ──────────────────────────────────────────
168
/// A local network interface that owns an IPv6 link-local address.
#[derive(Debug, Clone)]
pub struct LocalInterface {
    /// Operating-system name of the interface.
    pub name: String,
    /// Link-local address in textual form, without a scope suffix.
    pub link_local_addr: String,
    /// Operating-system interface index.
    pub index: u32,
}
176
177/// Enumerate network interfaces that have IPv6 link-local addresses (fe80::/10).
178///
179/// Uses `libc::getifaddrs()`. Filters by allowed/ignored interface lists.
180pub fn enumerate_interfaces(
181    allowed: &[String],
182    ignored: &[String],
183) -> Vec<LocalInterface> {
184    let mut result = Vec::new();
185
186    unsafe {
187        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
188        if libc::getifaddrs(&mut ifaddrs) != 0 {
189            return result;
190        }
191
192        let mut current = ifaddrs;
193        while !current.is_null() {
194            let ifa = &*current;
195            current = ifa.ifa_next;
196
197            // Must have an address
198            if ifa.ifa_addr.is_null() {
199                continue;
200            }
201
202            // Must be AF_INET6
203            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
204                continue;
205            }
206
207            // Get interface name
208            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
209                Ok(s) => s.to_string(),
210                Err(_) => continue,
211            };
212
213            // Check global ignore list
214            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
215                if !allowed.iter().any(|a| a == &name) {
216                    continue;
217                }
218            }
219
220            // Check ignored interfaces
221            if ignored.iter().any(|ig| ig == &name) {
222                continue;
223            }
224
225            // Check allowed interfaces (if non-empty, only allow those)
226            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
227                continue;
228            }
229
230            // Extract IPv6 address
231            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
232            let addr_bytes = (*sa6).sin6_addr.s6_addr;
233            let ipv6 = Ipv6Addr::from(addr_bytes);
234
235            // Must be link-local (fe80::/10)
236            let octets = ipv6.octets();
237            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
238                continue;
239            }
240
241            // Format the address (drop scope ID, matching Python's descope_linklocal)
242            let addr_str = format!("{}", ipv6);
243
244            // Get interface index
245            let index = libc::if_nametoindex(ifa.ifa_name);
246            if index == 0 {
247                continue;
248            }
249
250            // Avoid duplicates (same interface may appear multiple times)
251            if result.iter().any(|li: &LocalInterface| li.name == name) {
252                continue;
253            }
254
255            result.push(LocalInterface {
256                name,
257                link_local_addr: addr_str,
258                index,
259            });
260        }
261
262        libc::freeifaddrs(ifaddrs);
263    }
264
265    result
266}
267
268// ── Peer tracking ──────────────────────────────────────────────────────────
269
270/// A discovered peer.
271struct AutoPeer {
272    interface_id: InterfaceId,
273    #[allow(dead_code)]
274    link_local_addr: String,
275    #[allow(dead_code)]
276    ifname: String,
277    last_heard: f64,
278    writer: UdpWriter,
279}
280
/// Sends frames to a single peer as unicast UDP datagrams.
struct UdpWriter {
    /// Socket the datagrams are sent from.
    socket: UdpSocket,
    /// Destination address of the peer.
    target: SocketAddrV6,
}
286
287impl Writer for UdpWriter {
288    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
289        self.socket.send_to(data, self.target)?;
290        Ok(())
291    }
292}
293
294/// Shared state for the AutoInterface across all threads.
295struct SharedState {
296    /// Known peers: link_local_addr → AutoPeer
297    peers: HashMap<String, AutoPeer>,
298    /// Our own link-local addresses (for echo detection)
299    link_local_addresses: Vec<String>,
300    /// Deduplication deque: (hash, timestamp)
301    dedup_deque: VecDeque<([u8; 32], f64)>,
302    /// Flag set when final_init is done
303    online: bool,
304    /// Next dynamic interface ID
305    next_id: Arc<AtomicU64>,
306}
307
308impl SharedState {
309    fn new(next_id: Arc<AtomicU64>) -> Self {
310        SharedState {
311            peers: HashMap::new(),
312            link_local_addresses: Vec::new(),
313            dedup_deque: VecDeque::new(),
314            online: false,
315            next_id,
316        }
317    }
318
319    /// Check dedup deque for a data hash.
320    fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
321        for (h, ts) in &self.dedup_deque {
322            if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
323                return true;
324            }
325        }
326        false
327    }
328
329    /// Add to dedup deque, trimming to max length.
330    fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
331        self.dedup_deque.push_back((hash, now));
332        while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
333            self.dedup_deque.pop_front();
334        }
335    }
336
337    /// Refresh a peer's last_heard timestamp.
338    fn refresh_peer(&mut self, addr: &str, now: f64) {
339        if let Some(peer) = self.peers.get_mut(addr) {
340            peer.last_heard = now;
341        }
342    }
343}
344
345// ── Start function ─────────────────────────────────────────────────────────
346
347/// Start an AutoInterface. Discovers local IPv6 link-local interfaces,
348/// sets up multicast discovery, and creates UDP data servers.
349///
350/// Returns a vec of (InterfaceId, Writer) for each initial peer (typically empty
351/// since peers are discovered dynamically via InterfaceUp events).
352pub fn start(
353    config: AutoConfig,
354    tx: EventSender,
355    next_dynamic_id: Arc<AtomicU64>,
356) -> io::Result<()> {
357    let interfaces = enumerate_interfaces(
358        &config.allowed_interfaces,
359        &config.ignored_interfaces,
360    );
361
362    if interfaces.is_empty() {
363        log::warn!(
364            "[{}] No suitable IPv6 link-local interfaces found",
365            config.name,
366        );
367        return Ok(());
368    }
369
370    let group_id = config.group_id.clone();
371    let mcast_addr_str = derive_multicast_address(
372        &group_id,
373        &config.multicast_address_type,
374        &config.discovery_scope,
375    );
376
377    let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
378        Some(ip) => ip,
379        None => {
380            return Err(io::Error::new(
381                io::ErrorKind::InvalidData,
382                format!("Invalid multicast address: {}", mcast_addr_str),
383            ));
384        }
385    };
386
387    let discovery_port = config.discovery_port;
388    let unicast_discovery_port = config.discovery_port + 1;
389    let data_port = config.data_port;
390    let name = config.name.clone();
391    let announce_interval = ANNOUNCE_INTERVAL;
392    let configured_bitrate = config.configured_bitrate;
393
394    let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
395    let running = Arc::new(AtomicBool::new(true));
396
397    // Record our own link-local addresses
398    {
399        let mut state = shared.lock().unwrap();
400        for iface in &interfaces {
401            state.link_local_addresses.push(iface.link_local_addr.clone());
402        }
403    }
404
405    log::info!(
406        "[{}] AutoInterface starting with {} local interfaces, multicast {}",
407        name,
408        interfaces.len(),
409        mcast_addr_str,
410    );
411
412    // Per-interface: set up discovery sockets and threads
413    for local_iface in &interfaces {
414        let ifname = local_iface.name.clone();
415        let link_local = local_iface.link_local_addr.clone();
416        let if_index = local_iface.index;
417
418        // ─── Multicast discovery socket ───────────────────────────────
419        let mcast_socket = create_multicast_recv_socket(
420            &mcast_ip,
421            discovery_port,
422            if_index,
423        )?;
424
425        // ─── Unicast discovery socket ─────────────────────────────────
426        let unicast_socket = create_unicast_recv_socket(
427            &link_local,
428            unicast_discovery_port,
429            if_index,
430        )?;
431
432        // ─── Discovery sender thread ──────────────────────────────────
433        {
434            let group_id = group_id.clone();
435            let link_local = link_local.clone();
436            let running = running.clone();
437            let name = name.clone();
438
439            thread::Builder::new()
440                .name(format!("auto-disc-tx-{}", ifname))
441                .spawn(move || {
442                    discovery_sender_loop(
443                        &group_id,
444                        &link_local,
445                        &mcast_ip,
446                        discovery_port,
447                        if_index,
448                        announce_interval,
449                        &running,
450                        &name,
451                    );
452                })?;
453        }
454
455        // ─── Multicast discovery receiver thread ──────────────────────
456        {
457            let group_id = group_id.clone();
458            let shared = shared.clone();
459            let tx = tx.clone();
460            let running = running.clone();
461            let name = name.clone();
462
463            thread::Builder::new()
464                .name(format!("auto-disc-rx-{}", ifname))
465                .spawn(move || {
466                    discovery_receiver_loop(
467                        mcast_socket,
468                        &group_id,
469                        shared,
470                        tx,
471                        &running,
472                        &name,
473                        data_port,
474                        configured_bitrate,
475                    );
476                })?;
477        }
478
479        // ─── Unicast discovery receiver thread ────────────────────────
480        {
481            let group_id = group_id.clone();
482            let shared = shared.clone();
483            let tx = tx.clone();
484            let running = running.clone();
485            let name = name.clone();
486
487            thread::Builder::new()
488                .name(format!("auto-udisc-rx-{}", ifname))
489                .spawn(move || {
490                    discovery_receiver_loop(
491                        unicast_socket,
492                        &group_id,
493                        shared,
494                        tx,
495                        &running,
496                        &name,
497                        data_port,
498                        configured_bitrate,
499                    );
500                })?;
501        }
502
503        // ─── Data receiver thread ─────────────────────────────────────
504        {
505            let link_local = local_iface.link_local_addr.clone();
506            let shared = shared.clone();
507            let tx = tx.clone();
508            let running = running.clone();
509            let name = name.clone();
510
511            let data_socket = create_data_recv_socket(
512                &link_local,
513                data_port,
514                if_index,
515            )?;
516
517            thread::Builder::new()
518                .name(format!("auto-data-rx-{}", local_iface.name))
519                .spawn(move || {
520                    data_receiver_loop(
521                        data_socket,
522                        shared,
523                        tx,
524                        &running,
525                        &name,
526                    );
527                })?;
528        }
529    }
530
531    // ─── Peer jobs thread ─────────────────────────────────────────────
532    {
533        let shared = shared.clone();
534        let tx = tx.clone();
535        let running = running.clone();
536        let name = name.clone();
537
538        thread::Builder::new()
539            .name(format!("auto-peer-jobs-{}", name))
540            .spawn(move || {
541                peer_jobs_loop(shared, tx, &running, &name);
542            })?;
543    }
544
545    // Wait for initial peering
546    let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
547    thread::sleep(peering_wait);
548
549    // Mark as online
550    {
551        let mut state = shared.lock().unwrap();
552        state.online = true;
553    }
554
555    log::info!("[{}] AutoInterface online", config.name);
556
557    Ok(())
558}
559
560// ── Socket creation helpers ────────────────────────────────────────────────
561
562fn create_multicast_recv_socket(
563    mcast_ip: &Ipv6Addr,
564    port: u16,
565    if_index: u32,
566) -> io::Result<UdpSocket> {
567    let socket = socket2::Socket::new(
568        socket2::Domain::IPV6,
569        socket2::Type::DGRAM,
570        Some(socket2::Protocol::UDP),
571    )?;
572
573    socket.set_reuse_address(true)?;
574    #[cfg(not(target_os = "windows"))]
575    socket.set_reuse_port(true)?;
576
577    // Bind to [::]:port on the specific interface
578    let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
579    socket.bind(&bind_addr.into())?;
580
581    // Join multicast group on the specific interface
582    socket.join_multicast_v6(mcast_ip, if_index)?;
583
584    socket.set_nonblocking(false)?;
585    let std_socket: UdpSocket = socket.into();
586    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
587    Ok(std_socket)
588}
589
590fn create_unicast_recv_socket(
591    link_local: &str,
592    port: u16,
593    if_index: u32,
594) -> io::Result<UdpSocket> {
595    let ip: Ipv6Addr = link_local.parse().map_err(|e| {
596        io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e))
597    })?;
598
599    let socket = socket2::Socket::new(
600        socket2::Domain::IPV6,
601        socket2::Type::DGRAM,
602        Some(socket2::Protocol::UDP),
603    )?;
604
605    socket.set_reuse_address(true)?;
606    #[cfg(not(target_os = "windows"))]
607    socket.set_reuse_port(true)?;
608
609    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
610    socket.bind(&bind_addr.into())?;
611
612    socket.set_nonblocking(false)?;
613    let std_socket: UdpSocket = socket.into();
614    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
615    Ok(std_socket)
616}
617
618fn create_data_recv_socket(
619    link_local: &str,
620    port: u16,
621    if_index: u32,
622) -> io::Result<UdpSocket> {
623    let ip: Ipv6Addr = link_local.parse().map_err(|e| {
624        io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e))
625    })?;
626
627    let socket = socket2::Socket::new(
628        socket2::Domain::IPV6,
629        socket2::Type::DGRAM,
630        Some(socket2::Protocol::UDP),
631    )?;
632
633    socket.set_reuse_address(true)?;
634    #[cfg(not(target_os = "windows"))]
635    socket.set_reuse_port(true)?;
636
637    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
638    socket.bind(&bind_addr.into())?;
639
640    socket.set_nonblocking(false)?;
641    let std_socket: UdpSocket = socket.into();
642    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
643    Ok(std_socket)
644}
645
646// ── Thread loops ───────────────────────────────────────────────────────────
647
648/// Discovery sender: periodically sends discovery token via multicast.
649fn discovery_sender_loop(
650    group_id: &[u8],
651    link_local_addr: &str,
652    mcast_ip: &Ipv6Addr,
653    discovery_port: u16,
654    if_index: u32,
655    interval: f64,
656    running: &AtomicBool,
657    name: &str,
658) {
659    let token = compute_discovery_token(group_id, link_local_addr);
660    let sleep_dur = Duration::from_secs_f64(interval);
661
662    while running.load(Ordering::Relaxed) {
663        // Create a fresh socket for each send (matches Python)
664        if let Ok(socket) = UdpSocket::bind("[::]:0") {
665            // Set multicast interface
666            let if_bytes = if_index.to_ne_bytes();
667            unsafe {
668                libc::setsockopt(
669                    socket_fd(&socket),
670                    libc::IPPROTO_IPV6,
671                    libc::IPV6_MULTICAST_IF,
672                    if_bytes.as_ptr() as *const libc::c_void,
673                    4,
674                );
675            }
676
677            let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
678            if let Err(e) = socket.send_to(&token, target) {
679                log::debug!("[{}] multicast send error: {}", name, e);
680            }
681        }
682
683        thread::sleep(sleep_dur);
684    }
685}
686
687/// Discovery receiver: listens for discovery tokens and adds peers.
688fn discovery_receiver_loop(
689    socket: UdpSocket,
690    group_id: &[u8],
691    shared: Arc<Mutex<SharedState>>,
692    tx: EventSender,
693    running: &AtomicBool,
694    name: &str,
695    data_port: u16,
696    configured_bitrate: u64,
697) {
698    let mut buf = [0u8; 1024];
699
700    while running.load(Ordering::Relaxed) {
701        match socket.recv_from(&mut buf) {
702            Ok((n, src)) => {
703                if n < 32 {
704                    continue;
705                }
706
707                // Extract source IPv6 address
708                let src_addr = match src {
709                    std::net::SocketAddr::V6(v6) => v6,
710                    _ => continue,
711                };
712                let src_ip = format!("{}", src_addr.ip());
713
714                let peering_hash = &buf[..32];
715                let expected = compute_discovery_token(group_id, &src_ip);
716
717                if peering_hash != expected {
718                    log::debug!(
719                        "[{}] invalid peering hash from {}",
720                        name, src_ip
721                    );
722                    continue;
723                }
724
725                // Check if online
726                let state = shared.lock().unwrap();
727                if !state.online {
728                    // Not fully initialized yet, but still accept for initial peering
729                    // (Python processes after final_init_done)
730                }
731
732                // Check if it's our own echo
733                if state.link_local_addresses.contains(&src_ip) {
734                    // Multicast echo from ourselves — just record it
735                    drop(state);
736                    continue;
737                }
738
739                // Check if already known
740                if state.peers.contains_key(&src_ip) {
741                    let now = crate::time::now();
742                    drop(state);
743                    let mut state = shared.lock().unwrap();
744                    state.refresh_peer(&src_ip, now);
745                    continue;
746                }
747                drop(state);
748
749                // New peer! Create a data writer to send to them.
750                add_peer(
751                    &shared,
752                    &tx,
753                    &src_ip,
754                    data_port,
755                    name,
756                    configured_bitrate,
757                );
758            }
759            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock
760                || e.kind() == io::ErrorKind::TimedOut =>
761            {
762                // Timeout, loop again
763                continue;
764            }
765            Err(e) => {
766                log::warn!("[{}] discovery recv error: {}", name, e);
767                if !running.load(Ordering::Relaxed) {
768                    return;
769                }
770                thread::sleep(Duration::from_millis(100));
771            }
772        }
773    }
774}
775
776/// Add a new peer, creating a writer and emitting InterfaceUp.
777fn add_peer(
778    shared: &Arc<Mutex<SharedState>>,
779    tx: &EventSender,
780    peer_addr: &str,
781    data_port: u16,
782    name: &str,
783    configured_bitrate: u64,
784) {
785    let peer_ip: Ipv6Addr = match peer_addr.parse() {
786        Ok(ip) => ip,
787        Err(_) => return,
788    };
789
790    // Create UDP writer to send data to this peer
791    let send_socket = match UdpSocket::bind("[::]:0") {
792        Ok(s) => s,
793        Err(e) => {
794            log::warn!("[{}] failed to create writer for peer {}: {}", name, peer_addr, e);
795            return;
796        }
797    };
798
799    let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
800
801    let mut state = shared.lock().unwrap();
802
803    // Double-check not already added (race)
804    if state.peers.contains_key(peer_addr) {
805        state.refresh_peer(peer_addr, crate::time::now());
806        return;
807    }
808
809    let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
810
811    let writer = UdpWriter {
812        socket: send_socket.try_clone().unwrap_or_else(|_| {
813            UdpSocket::bind("[::]:0").expect("bind udp")
814        }),
815        target,
816    };
817
818    // Create a boxed writer for the driver
819    let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
820        socket: send_socket,
821        target,
822    });
823
824    let peer_info = rns_core::transport::types::InterfaceInfo {
825        id: peer_id,
826        name: format!("{}:{}", name, peer_addr),
827        mode: rns_core::constants::MODE_FULL,
828        out_capable: true,
829        in_capable: true,
830        bitrate: Some(configured_bitrate),
831        announce_rate_target: None,
832        announce_rate_grace: 0,
833        announce_rate_penalty: 0.0,
834        announce_cap: rns_core::constants::ANNOUNCE_CAP,
835        is_local_client: false,
836        wants_tunnel: false,
837        tunnel_id: None,
838    };
839
840    let now = crate::time::now();
841    state.peers.insert(
842        peer_addr.to_string(),
843        AutoPeer {
844            interface_id: peer_id,
845            link_local_addr: peer_addr.to_string(),
846            ifname: String::new(),
847            last_heard: now,
848            writer,
849        },
850    );
851
852    log::info!("[{}] Peer discovered: {} (id={})", name, peer_addr, peer_id.0);
853
854    // Notify driver of new dynamic interface
855    let _ = tx.send(Event::InterfaceUp(
856        peer_id,
857        Some(driver_writer),
858        Some(peer_info),
859    ));
860}
861
862/// Data receiver: receives unicast UDP data from peers and dispatches as frames.
863fn data_receiver_loop(
864    socket: UdpSocket,
865    shared: Arc<Mutex<SharedState>>,
866    tx: EventSender,
867    running: &AtomicBool,
868    name: &str,
869) {
870    let mut buf = [0u8; HW_MTU + 64]; // a bit extra
871
872    while running.load(Ordering::Relaxed) {
873        match socket.recv_from(&mut buf) {
874            Ok((n, src)) => {
875                if n == 0 {
876                    continue;
877                }
878
879                let src_addr = match src {
880                    std::net::SocketAddr::V6(v6) => v6,
881                    _ => continue,
882                };
883                let src_ip = format!("{}", src_addr.ip());
884                let data = &buf[..n];
885
886                let now = crate::time::now();
887                let data_hash = rns_crypto::sha256::sha256(data);
888
889                let mut state = shared.lock().unwrap();
890
891                if !state.online {
892                    continue;
893                }
894
895                // Deduplication
896                if state.is_duplicate(&data_hash, now) {
897                    continue;
898                }
899                state.add_dedup(data_hash, now);
900
901                // Refresh peer
902                state.refresh_peer(&src_ip, now);
903
904                // Find the interface ID for this peer
905                let iface_id = match state.peers.get(&src_ip) {
906                    Some(peer) => peer.interface_id,
907                    None => {
908                        // Unknown peer, skip
909                        continue;
910                    }
911                };
912
913                drop(state);
914
915                if tx
916                    .send(Event::Frame {
917                        interface_id: iface_id,
918                        data: data.to_vec(),
919                    })
920                    .is_err()
921                {
922                    return;
923                }
924            }
925            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock
926                || e.kind() == io::ErrorKind::TimedOut =>
927            {
928                continue;
929            }
930            Err(e) => {
931                log::warn!("[{}] data recv error: {}", name, e);
932                if !running.load(Ordering::Relaxed) {
933                    return;
934                }
935                thread::sleep(Duration::from_millis(100));
936            }
937        }
938    }
939}
940
941/// Peer jobs: periodically cull timed-out peers.
942fn peer_jobs_loop(
943    shared: Arc<Mutex<SharedState>>,
944    tx: EventSender,
945    running: &AtomicBool,
946    name: &str,
947) {
948    let interval = Duration::from_secs_f64(PEER_JOB_INTERVAL);
949
950    while running.load(Ordering::Relaxed) {
951        thread::sleep(interval);
952
953        let now = crate::time::now();
954        let mut timed_out = Vec::new();
955
956        {
957            let state = shared.lock().unwrap();
958            for (addr, peer) in &state.peers {
959                if now > peer.last_heard + PEERING_TIMEOUT {
960                    timed_out.push((addr.clone(), peer.interface_id));
961                }
962            }
963        }
964
965        for (addr, iface_id) in &timed_out {
966            log::info!("[{}] Peer timed out: {}", name, addr);
967            let mut state = shared.lock().unwrap();
968            state.peers.remove(addr.as_str());
969            let _ = tx.send(Event::InterfaceDown(*iface_id));
970        }
971    }
972}
973
974// ── Helper ─────────────────────────────────────────────────────────────────
975
/// Raw OS file descriptor backing `socket`, for handing to `setsockopt`.
#[cfg(unix)]
fn socket_fd(socket: &UdpSocket) -> i32 {
    std::os::unix::io::AsRawFd::as_raw_fd(socket)
}

/// Non-Unix stand-in for `socket_fd`: always yields `0`.
#[cfg(not(unix))]
fn socket_fd(_socket: &UdpSocket) -> i32 {
    0
}
987
988// ── Tests ──────────────────────────────────────────────────────────────────
989
#[cfg(test)]
mod tests {
    use super::*;

    /// Python reference address for the default group (temporary type, link scope).
    const DEFAULT_LINK_MCAST: &str = "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1";

    /// Renders `bytes` as lowercase hex, two digits per byte.
    fn to_hex(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for byte in bytes {
            out.push_str(&format!("{:02x}", byte));
        }
        out
    }

    /// Builds a fresh `SharedState` from an atomic counter starting at `first_id`.
    fn new_state(first_id: u64) -> SharedState {
        SharedState::new(Arc::new(AtomicU64::new(first_id)))
    }

    /// True when `actual` is within `f64::EPSILON` of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    // ── Multicast address derivation ──────────────────────────────────

    #[test]
    fn multicast_address_default_group() {
        // Python vector: ff12:0:d70b:fb1c:16e4:5e39:485e:31e1
        let derived = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert_eq!(derived, DEFAULT_LINK_MCAST);
    }

    #[test]
    fn multicast_address_custom_group() {
        let derived = derive_multicast_address(
            b"testgroup",
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        // Only the prefix is pinned here ...
        assert!(derived.starts_with("ff12:0:"));
        // ... plus the fact that a different group yields a different address.
        assert_ne!(derived, DEFAULT_LINK_MCAST);
    }

    #[test]
    fn multicast_address_scope_admin() {
        let derived = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_ADMIN,
        );
        assert!(derived.starts_with("ff14:0:"));
    }

    #[test]
    fn multicast_address_permanent_type() {
        let derived = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_PERMANENT_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        assert!(derived.starts_with("ff02:0:"));
    }

    #[test]
    fn multicast_address_parseable() {
        let derived = derive_multicast_address(
            DEFAULT_GROUP_ID,
            MULTICAST_TEMPORARY_ADDRESS_TYPE,
            SCOPE_LINK,
        );
        let parsed = parse_multicast_addr(&derived);
        assert!(parsed.is_some());
        assert!(parsed.unwrap().is_multicast());
    }

    // ── Discovery token ──────────────────────────────────────────────

    #[test]
    fn discovery_token_interop() {
        // Python vector: fe80::1 → 97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        assert_eq!(
            to_hex(&token[..]),
            "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163"
        );
    }

    #[test]
    fn discovery_token_interop_2() {
        // Python vector: fe80::dead:beef:1234:5678
        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
        assert_eq!(
            to_hex(&token[..]),
            "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072"
        );
    }

    #[test]
    fn discovery_token_different_groups() {
        let ours = compute_discovery_token(b"reticulum", "fe80::1");
        let theirs = compute_discovery_token(b"othergroup", "fe80::1");
        assert_ne!(ours, theirs);
    }

    #[test]
    fn discovery_token_different_addrs() {
        let first = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
        let second = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
        assert_ne!(first, second);
    }

    // ── Deduplication ────────────────────────────────────────────────

    #[test]
    fn dedup_basic() {
        let mut state = new_state(1);
        let digest = [0xAA; 32];
        let now = 1000.0;

        // Unknown until recorded, known immediately afterwards.
        assert!(!state.is_duplicate(&digest, now));
        state.add_dedup(digest, now);
        assert!(state.is_duplicate(&digest, now));
    }

    #[test]
    fn dedup_expired() {
        let mut state = new_state(1);
        let digest = [0xBB; 32];
        state.add_dedup(digest, 1000.0);

        // Half a second later the entry still counts as a duplicate ...
        assert!(state.is_duplicate(&digest, 1000.5));
        // ... a full second later it no longer does.
        assert!(!state.is_duplicate(&digest, 1001.0));
    }

    #[test]
    fn dedup_max_length() {
        let mut state = new_state(1);

        // Insert ten more distinct hashes than the deque is allowed to hold.
        let total = MULTI_IF_DEQUE_LEN + 10;
        for seq in 0..total {
            let mut digest = [0u8; 32];
            digest[0] = (seq & 0xFF) as u8;
            digest[1] = ((seq >> 8) & 0xFF) as u8;
            state.add_dedup(digest, 1000.0);
        }

        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
    }

    // ── Peer tracking ────────────────────────────────────────────────

    #[test]
    fn peer_refresh() {
        let mut state = new_state(100);
        let peer_addr = "fe80::1";

        let peer = AutoPeer {
            interface_id: InterfaceId(100),
            link_local_addr: peer_addr.to_string(),
            ifname: "eth0".to_string(),
            last_heard: 1000.0,
            writer: UdpWriter {
                socket: UdpSocket::bind("[::]:0").unwrap(),
                target: SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12345, 0, 0),
            },
        };
        state.peers.insert(peer_addr.to_string(), peer);

        state.refresh_peer(peer_addr, 2000.0);
        assert_eq!(state.peers[peer_addr].last_heard, 2000.0);
    }

    #[test]
    fn peer_not_found_refresh() {
        let mut state = new_state(100);
        // Refreshing an address that was never inserted must not panic.
        state.refresh_peer("fe80::999", 1000.0);
    }

    // ── Network interface enumeration ────────────────────────────────

    #[test]
    fn enumerate_returns_vec() {
        // Smoke test: the call must complete; what it returns depends on the
        // host's network configuration (there may be no IPv6 link-local at all).
        let found = enumerate_interfaces(&[], &[]);
        for entry in &found {
            assert!(!entry.name.is_empty());
            assert!(entry.link_local_addr.starts_with("fe80"));
            assert!(entry.index > 0);
        }
    }

    #[test]
    fn enumerate_with_ignored() {
        // Ignore a handful of common interface names.
        let ignored: Vec<String> = ["lo", "eth0", "wlan0", "enp0s3", "docker0"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let found = enumerate_interfaces(&[], &ignored);
        // Other interfaces may remain, but the well-known ones must be gone.
        for entry in &found {
            assert_ne!(entry.name, "lo");
            assert_ne!(entry.name, "eth0");
            assert_ne!(entry.name, "wlan0");
        }
    }

    #[test]
    fn enumerate_with_allowed_nonexistent() {
        // The allow-list names only an interface that does not exist.
        let allowed = ["nonexistent_if_12345".to_string()];
        let found = enumerate_interfaces(&allowed, &[]);
        assert!(found.is_empty());
    }

    // ── Config defaults ──────────────────────────────────────────────

    #[test]
    fn config_defaults() {
        let cfg = AutoConfig::default();
        assert_eq!(cfg.group_id, DEFAULT_GROUP_ID);
        assert_eq!(cfg.discovery_scope, SCOPE_LINK);
        assert_eq!(cfg.discovery_port, DEFAULT_DISCOVERY_PORT);
        assert_eq!(cfg.data_port, DEFAULT_DATA_PORT);
        assert_eq!(cfg.multicast_address_type, MULTICAST_TEMPORARY_ADDRESS_TYPE);
        assert_eq!(cfg.configured_bitrate, BITRATE_GUESS);
        assert!(cfg.allowed_interfaces.is_empty());
        assert!(cfg.ignored_interfaces.is_empty());
    }

    // ── Constants ────────────────────────────────────────────────────

    #[test]
    fn constants_match_python() {
        // Integer-valued constants.
        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
        assert_eq!(DEFAULT_DATA_PORT, 42671);
        assert_eq!(HW_MTU, 1196);
        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
        // Floating-point constants, compared with an epsilon tolerance.
        assert!(approx_eq(MULTI_IF_DEQUE_TTL, 0.75));
        assert!(approx_eq(PEERING_TIMEOUT, 22.0));
        assert!(approx_eq(ANNOUNCE_INTERVAL, 1.6));
        assert!(approx_eq(PEER_JOB_INTERVAL, 4.0));
        assert!(approx_eq(MCAST_ECHO_TIMEOUT, 6.5));
        assert_eq!(BITRATE_GUESS, 10_000_000);
    }

    #[test]
    fn unicast_discovery_port() {
        // Python: unicast_discovery_port = discovery_port + 1
        assert_eq!(DEFAULT_DISCOVERY_PORT + 1, 29717);
    }

    #[test]
    fn reverse_peering_interval() {
        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
        let delta = (interval - 5.2).abs();
        assert!(delta < 0.01);
    }
}