Skip to main content

rns_net/interface/
auto.rs

1//! AutoInterface: Zero-configuration LAN auto-discovery via IPv6 multicast.
2//!
3//! Matches Python `AutoInterface` from `RNS/Interfaces/AutoInterface.py`.
4//!
5//! Thread model (per adopted network interface):
6//!   - Discovery sender: periodically sends discovery token via multicast
7//!   - Discovery receiver (multicast): validates tokens, adds peers
8//!   - Discovery receiver (unicast): validates reverse-peering tokens
9//!   - Data receiver: UDP server receiving unicast data from peers
10//!
11//! Additionally one shared thread:
12//!   - Peer jobs: periodically culls timed-out peers
13
14use std::collections::{HashMap, VecDeque};
15use std::io;
16use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::Duration;
21
22use rns_core::transport::types::InterfaceId;
23
24use crate::event::{Event, EventSender};
25use crate::interface::Writer;
26
27// ── Constants (matching Python AutoInterface) ──────────────────────────────
28
29/// Default UDP port for multicast discovery.
30pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
31
32/// Default UDP port for unicast data exchange.
33pub const DEFAULT_DATA_PORT: u16 = 42671;
34
35/// Default group identifier.
36pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
37
38/// Default IFAC size for AutoInterface (bytes).
39pub const DEFAULT_IFAC_SIZE: usize = 16;
40
41/// Hardware MTU for AutoInterface packets.
42pub const HW_MTU: usize = 1196;
43
44/// Multicast scope: link-local.
45pub const SCOPE_LINK: &str = "2";
46/// Multicast scope: admin-local.
47pub const SCOPE_ADMIN: &str = "4";
48/// Multicast scope: site-local.
49pub const SCOPE_SITE: &str = "5";
50/// Multicast scope: organization-local.
51pub const SCOPE_ORGANISATION: &str = "8";
52/// Multicast scope: global.
53pub const SCOPE_GLOBAL: &str = "e";
54
55/// Permanent multicast address type.
56pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
57/// Temporary multicast address type.
58pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
59
60/// How long before a peer is considered timed out (seconds).
61pub const PEERING_TIMEOUT: f64 = 22.0;
62
63/// How often to send multicast discovery announcements (seconds).
64pub const ANNOUNCE_INTERVAL: f64 = 1.6;
65
66/// How often to run peer maintenance jobs (seconds).
67pub const PEER_JOB_INTERVAL: f64 = 4.0;
68
69/// Multicast echo timeout (seconds). Used for carrier detection.
70pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
71
72/// Default bitrate guess for AutoInterface (10 Mbps).
73pub const BITRATE_GUESS: u64 = 10_000_000;
74
75/// Deduplication deque size.
76pub const MULTI_IF_DEQUE_LEN: usize = 48;
77
78/// Deduplication deque entry TTL (seconds).
79pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
80
81/// Reverse peering interval multiplier (announce_interval * 3.25).
82pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
83
84/// Interfaces always ignored.
85pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
86
87// ── Configuration ──────────────────────────────────────────────────────────
88
89/// Configuration for an AutoInterface.
90#[derive(Debug, Clone)]
91pub struct AutoConfig {
92    pub name: String,
93    pub group_id: Vec<u8>,
94    pub discovery_scope: String,
95    pub discovery_port: u16,
96    pub data_port: u16,
97    pub multicast_address_type: String,
98    pub allowed_interfaces: Vec<String>,
99    pub ignored_interfaces: Vec<String>,
100    pub configured_bitrate: u64,
101    /// Base interface ID. Per-peer IDs will be assigned dynamically.
102    pub interface_id: InterfaceId,
103    pub ingress_control: rns_core::transport::types::IngressControlConfig,
104    pub runtime: Arc<Mutex<AutoRuntime>>,
105}
106
107#[derive(Debug, Clone)]
108pub struct AutoRuntime {
109    pub announce_interval_secs: f64,
110    pub peer_timeout_secs: f64,
111    pub peer_job_interval_secs: f64,
112}
113
114impl AutoRuntime {
115    pub fn from_config(_config: &AutoConfig) -> Self {
116        Self {
117            announce_interval_secs: ANNOUNCE_INTERVAL,
118            peer_timeout_secs: PEERING_TIMEOUT,
119            peer_job_interval_secs: PEER_JOB_INTERVAL,
120        }
121    }
122}
123
124#[derive(Debug, Clone)]
125pub struct AutoRuntimeConfigHandle {
126    pub interface_name: String,
127    pub runtime: Arc<Mutex<AutoRuntime>>,
128    pub startup: AutoRuntime,
129}
130
131impl Default for AutoConfig {
132    fn default() -> Self {
133        let mut config = AutoConfig {
134            name: String::new(),
135            group_id: DEFAULT_GROUP_ID.to_vec(),
136            discovery_scope: SCOPE_LINK.to_string(),
137            discovery_port: DEFAULT_DISCOVERY_PORT,
138            data_port: DEFAULT_DATA_PORT,
139            multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
140            allowed_interfaces: Vec::new(),
141            ignored_interfaces: Vec::new(),
142            configured_bitrate: BITRATE_GUESS,
143            interface_id: InterfaceId(0),
144            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
145            runtime: Arc::new(Mutex::new(AutoRuntime {
146                announce_interval_secs: ANNOUNCE_INTERVAL,
147                peer_timeout_secs: PEERING_TIMEOUT,
148                peer_job_interval_secs: PEER_JOB_INTERVAL,
149            })),
150        };
151        let startup = AutoRuntime::from_config(&config);
152        config.runtime = Arc::new(Mutex::new(startup));
153        config
154    }
155}
156
157// ── Multicast address derivation ───────────────────────────────────────────
158
159/// Derive the IPv6 multicast discovery address from group_id, scope, and address type.
160///
161/// Algorithm (matching Python):
162///   1. group_hash = SHA-256(group_id)
163///   2. Build suffix from hash bytes 2..14 as 6 little-endian 16-bit words
164///   3. First word is hardcoded "0"
165///   4. Prefix = "ff" + address_type + scope
166pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
167    let group_hash = rns_crypto::sha256::sha256(group_id);
168    let g = &group_hash;
169
170    // Build 6 LE 16-bit words from bytes 2..14
171    let w1 = (g[2] as u16) << 8 | g[3] as u16;
172    let w2 = (g[4] as u16) << 8 | g[5] as u16;
173    let w3 = (g[6] as u16) << 8 | g[7] as u16;
174    let w4 = (g[8] as u16) << 8 | g[9] as u16;
175    let w5 = (g[10] as u16) << 8 | g[11] as u16;
176    let w6 = (g[12] as u16) << 8 | g[13] as u16;
177
178    format!(
179        "ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
180        address_type, scope, w1, w2, w3, w4, w5, w6
181    )
182}
183
184/// Parse a multicast address string into an Ipv6Addr.
185pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
186    addr.parse::<Ipv6Addr>().ok()
187}
188
189// ── Discovery token ────────────────────────────────────────────────────────
190
191/// Compute the discovery token: SHA-256(group_id + link_local_address_string).
192pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
193    let mut input = group_id.to_vec();
194    input.extend_from_slice(link_local_addr.as_bytes());
195    rns_crypto::sha256::sha256(&input)
196}
197
198// ── Network interface enumeration ──────────────────────────────────────────
199
200/// Information about a local network interface with an IPv6 link-local address.
201#[derive(Debug, Clone)]
202pub struct LocalInterface {
203    pub name: String,
204    pub link_local_addr: String,
205    pub index: u32,
206}
207
208/// Enumerate network interfaces that have IPv6 link-local addresses (fe80::/10).
209///
210/// Uses `libc::getifaddrs()`. Filters by allowed/ignored interface lists.
211pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
212    let mut result = Vec::new();
213
214    unsafe {
215        let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
216        if libc::getifaddrs(&mut ifaddrs) != 0 {
217            return result;
218        }
219
220        let mut current = ifaddrs;
221        while !current.is_null() {
222            let ifa = &*current;
223            current = ifa.ifa_next;
224
225            // Must have an address
226            if ifa.ifa_addr.is_null() {
227                continue;
228            }
229
230            // Must be AF_INET6
231            if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
232                continue;
233            }
234
235            // Get interface name
236            let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
237                Ok(s) => s.to_string(),
238                Err(_) => continue,
239            };
240
241            // Check global ignore list
242            if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
243                if !allowed.iter().any(|a| a == &name) {
244                    continue;
245                }
246            }
247
248            // Check ignored interfaces
249            if ignored.iter().any(|ig| ig == &name) {
250                continue;
251            }
252
253            // Check allowed interfaces (if non-empty, only allow those)
254            if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
255                continue;
256            }
257
258            // Extract IPv6 address
259            let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
260            let addr_bytes = (*sa6).sin6_addr.s6_addr;
261            let ipv6 = Ipv6Addr::from(addr_bytes);
262
263            // Must be link-local (fe80::/10)
264            let octets = ipv6.octets();
265            if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
266                continue;
267            }
268
269            // Format the address (drop scope ID, matching Python's descope_linklocal)
270            let addr_str = format!("{}", ipv6);
271
272            // Get interface index
273            let index = libc::if_nametoindex(ifa.ifa_name);
274            if index == 0 {
275                continue;
276            }
277
278            // Avoid duplicates (same interface may appear multiple times)
279            if result.iter().any(|li: &LocalInterface| li.name == name) {
280                continue;
281            }
282
283            result.push(LocalInterface {
284                name,
285                link_local_addr: addr_str,
286                index,
287            });
288        }
289
290        libc::freeifaddrs(ifaddrs);
291    }
292
293    result
294}
295
296// ── Peer tracking ──────────────────────────────────────────────────────────
297
298/// A discovered peer.
299struct AutoPeer {
300    interface_id: InterfaceId,
301    #[allow(dead_code)]
302    link_local_addr: String,
303    #[allow(dead_code)]
304    ifname: String,
305    last_heard: f64,
306}
307
308/// Writer that sends UDP unicast data to a peer.
309struct UdpWriter {
310    socket: UdpSocket,
311    target: SocketAddrV6,
312}
313
314impl Writer for UdpWriter {
315    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
316        self.socket.send_to(data, self.target)?;
317        Ok(())
318    }
319}
320
321/// Shared state for the AutoInterface across all threads.
322struct SharedState {
323    /// Known peers: link_local_addr → AutoPeer
324    peers: HashMap<String, AutoPeer>,
325    /// Our own link-local addresses (for echo detection)
326    link_local_addresses: Vec<String>,
327    /// Deduplication deque: (hash, timestamp)
328    dedup_deque: VecDeque<([u8; 32], f64)>,
329    /// Flag set when final_init is done
330    online: bool,
331    /// Next dynamic interface ID
332    next_id: Arc<AtomicU64>,
333}
334
335impl SharedState {
336    fn new(next_id: Arc<AtomicU64>) -> Self {
337        SharedState {
338            peers: HashMap::new(),
339            link_local_addresses: Vec::new(),
340            dedup_deque: VecDeque::new(),
341            online: false,
342            next_id,
343        }
344    }
345
346    /// Check dedup deque for a data hash.
347    fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
348        for (h, ts) in &self.dedup_deque {
349            if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
350                return true;
351            }
352        }
353        false
354    }
355
356    /// Add to dedup deque, trimming to max length.
357    fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
358        self.dedup_deque.push_back((hash, now));
359        while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
360            self.dedup_deque.pop_front();
361        }
362    }
363
364    /// Refresh a peer's last_heard timestamp.
365    fn refresh_peer(&mut self, addr: &str, now: f64) {
366        if let Some(peer) = self.peers.get_mut(addr) {
367            peer.last_heard = now;
368        }
369    }
370}
371
372// ── Start function ─────────────────────────────────────────────────────────
373
374/// Start an AutoInterface. Discovers local IPv6 link-local interfaces,
375/// sets up multicast discovery, and creates UDP data servers.
376///
377/// Returns a vec of (InterfaceId, Writer) for each initial peer (typically empty
378/// since peers are discovered dynamically via InterfaceUp events).
379pub fn start(
380    config: AutoConfig,
381    tx: EventSender,
382    next_dynamic_id: Arc<AtomicU64>,
383) -> io::Result<()> {
384    let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
385
386    if interfaces.is_empty() {
387        log::warn!(
388            "[{}] No suitable IPv6 link-local interfaces found",
389            config.name,
390        );
391        return Ok(());
392    }
393
394    let group_id = config.group_id.clone();
395    let mcast_addr_str = derive_multicast_address(
396        &group_id,
397        &config.multicast_address_type,
398        &config.discovery_scope,
399    );
400
401    let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
402        Some(ip) => ip,
403        None => {
404            return Err(io::Error::new(
405                io::ErrorKind::InvalidData,
406                format!("Invalid multicast address: {}", mcast_addr_str),
407            ));
408        }
409    };
410
411    let discovery_port = config.discovery_port;
412    let unicast_discovery_port = config.discovery_port + 1;
413    let data_port = config.data_port;
414    let name = config.name.clone();
415    let configured_bitrate = config.configured_bitrate;
416    let ingress_control = config.ingress_control;
417    {
418        let startup = AutoRuntime::from_config(&config);
419        *config.runtime.lock().unwrap() = startup;
420    }
421    let runtime = Arc::clone(&config.runtime);
422
423    let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
424    let running = Arc::new(AtomicBool::new(true));
425
426    // Record our own link-local addresses
427    {
428        let mut state = shared.lock().unwrap();
429        for iface in &interfaces {
430            state
431                .link_local_addresses
432                .push(iface.link_local_addr.clone());
433        }
434    }
435
436    log::info!(
437        "[{}] AutoInterface starting with {} local interfaces, multicast {}",
438        name,
439        interfaces.len(),
440        mcast_addr_str,
441    );
442
443    // Per-interface: set up discovery sockets and threads
444    for local_iface in &interfaces {
445        let ifname = local_iface.name.clone();
446        let link_local = local_iface.link_local_addr.clone();
447        let if_index = local_iface.index;
448
449        // ─── Multicast discovery socket ───────────────────────────────
450        let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
451
452        // ─── Unicast discovery socket ─────────────────────────────────
453        let unicast_socket =
454            create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
455
456        // ─── Discovery sender thread ──────────────────────────────────
457        {
458            let group_id = group_id.clone();
459            let link_local = link_local.clone();
460            let running = running.clone();
461            let name = name.clone();
462            let runtime = runtime.clone();
463
464            thread::Builder::new()
465                .name(format!("auto-disc-tx-{}", ifname))
466                .spawn(move || {
467                    discovery_sender_loop(
468                        &group_id,
469                        &link_local,
470                        &mcast_ip,
471                        discovery_port,
472                        if_index,
473                        runtime,
474                        &running,
475                        &name,
476                    );
477                })?;
478        }
479
480        // ─── Multicast discovery receiver thread ──────────────────────
481        {
482            let group_id = group_id.clone();
483            let shared = shared.clone();
484            let tx = tx.clone();
485            let running = running.clone();
486            let name = name.clone();
487            let runtime = runtime.clone();
488
489            thread::Builder::new()
490                .name(format!("auto-disc-rx-{}", ifname))
491                .spawn(move || {
492                    discovery_receiver_loop(
493                        mcast_socket,
494                        &group_id,
495                        shared,
496                        tx,
497                        &running,
498                        &name,
499                        data_port,
500                        configured_bitrate,
501                        ingress_control,
502                        runtime,
503                    );
504                })?;
505        }
506
507        // ─── Unicast discovery receiver thread ────────────────────────
508        {
509            let group_id = group_id.clone();
510            let shared = shared.clone();
511            let tx = tx.clone();
512            let running = running.clone();
513            let name = name.clone();
514            let runtime = runtime.clone();
515            let ingress_control = ingress_control;
516
517            thread::Builder::new()
518                .name(format!("auto-udisc-rx-{}", ifname))
519                .spawn(move || {
520                    discovery_receiver_loop(
521                        unicast_socket,
522                        &group_id,
523                        shared,
524                        tx,
525                        &running,
526                        &name,
527                        data_port,
528                        configured_bitrate,
529                        ingress_control,
530                        runtime,
531                    );
532                })?;
533        }
534
535        // ─── Data receiver thread ─────────────────────────────────────
536        {
537            let link_local = local_iface.link_local_addr.clone();
538            let shared = shared.clone();
539            let tx = tx.clone();
540            let running = running.clone();
541            let name = name.clone();
542
543            let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
544
545            thread::Builder::new()
546                .name(format!("auto-data-rx-{}", local_iface.name))
547                .spawn(move || {
548                    data_receiver_loop(data_socket, shared, tx, &running, &name);
549                })?;
550        }
551    }
552
553    // ─── Peer jobs thread ─────────────────────────────────────────────
554    {
555        let shared = shared.clone();
556        let tx = tx.clone();
557        let running = running.clone();
558        let name = name.clone();
559        let runtime = runtime.clone();
560
561        thread::Builder::new()
562            .name(format!("auto-peer-jobs-{}", name))
563            .spawn(move || {
564                peer_jobs_loop(shared, tx, runtime, &running, &name);
565            })?;
566    }
567
568    // Wait for initial peering
569    let announce_interval = runtime.lock().unwrap().announce_interval_secs;
570    let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
571    thread::sleep(peering_wait);
572
573    // Mark as online
574    {
575        let mut state = shared.lock().unwrap();
576        state.online = true;
577    }
578
579    log::info!("[{}] AutoInterface online", config.name);
580
581    Ok(())
582}
583
584// ── Socket creation helpers ────────────────────────────────────────────────
585
586fn create_multicast_recv_socket(
587    mcast_ip: &Ipv6Addr,
588    port: u16,
589    if_index: u32,
590) -> io::Result<UdpSocket> {
591    let socket = socket2::Socket::new(
592        socket2::Domain::IPV6,
593        socket2::Type::DGRAM,
594        Some(socket2::Protocol::UDP),
595    )?;
596
597    socket.set_reuse_address(true)?;
598    #[cfg(not(target_os = "windows"))]
599    socket.set_reuse_port(true)?;
600
601    // Bind to [::]:port on the specific interface
602    let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
603    socket.bind(&bind_addr.into())?;
604
605    // Join multicast group on the specific interface
606    socket.join_multicast_v6(mcast_ip, if_index)?;
607
608    socket.set_nonblocking(false)?;
609    let std_socket: UdpSocket = socket.into();
610    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
611    Ok(std_socket)
612}
613
614fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
615    let ip: Ipv6Addr = link_local
616        .parse()
617        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
618
619    let socket = socket2::Socket::new(
620        socket2::Domain::IPV6,
621        socket2::Type::DGRAM,
622        Some(socket2::Protocol::UDP),
623    )?;
624
625    socket.set_reuse_address(true)?;
626    #[cfg(not(target_os = "windows"))]
627    socket.set_reuse_port(true)?;
628
629    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
630    socket.bind(&bind_addr.into())?;
631
632    socket.set_nonblocking(false)?;
633    let std_socket: UdpSocket = socket.into();
634    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
635    Ok(std_socket)
636}
637
638fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
639    let ip: Ipv6Addr = link_local
640        .parse()
641        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
642
643    let socket = socket2::Socket::new(
644        socket2::Domain::IPV6,
645        socket2::Type::DGRAM,
646        Some(socket2::Protocol::UDP),
647    )?;
648
649    socket.set_reuse_address(true)?;
650    #[cfg(not(target_os = "windows"))]
651    socket.set_reuse_port(true)?;
652
653    let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
654    socket.bind(&bind_addr.into())?;
655
656    socket.set_nonblocking(false)?;
657    let std_socket: UdpSocket = socket.into();
658    std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
659    Ok(std_socket)
660}
661
662// ── Thread loops ───────────────────────────────────────────────────────────
663
664/// Discovery sender: periodically sends discovery token via multicast.
665fn discovery_sender_loop(
666    group_id: &[u8],
667    link_local_addr: &str,
668    mcast_ip: &Ipv6Addr,
669    discovery_port: u16,
670    if_index: u32,
671    runtime: Arc<Mutex<AutoRuntime>>,
672    running: &AtomicBool,
673    name: &str,
674) {
675    let token = compute_discovery_token(group_id, link_local_addr);
676
677    while running.load(Ordering::Relaxed) {
678        // Create a fresh socket for each send (matches Python)
679        if let Ok(socket) = UdpSocket::bind("[::]:0") {
680            // Set multicast interface
681            let if_bytes = if_index.to_ne_bytes();
682            unsafe {
683                libc::setsockopt(
684                    socket_fd(&socket),
685                    libc::IPPROTO_IPV6,
686                    libc::IPV6_MULTICAST_IF,
687                    if_bytes.as_ptr() as *const libc::c_void,
688                    4,
689                );
690            }
691
692            let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
693            if let Err(e) = socket.send_to(&token, target) {
694                log::debug!("[{}] multicast send error: {}", name, e);
695            }
696        }
697
698        let sleep_dur =
699            Duration::from_secs_f64(runtime.lock().unwrap().announce_interval_secs.max(0.1));
700        thread::sleep(sleep_dur);
701    }
702}
703
704/// Discovery receiver: listens for discovery tokens and adds peers.
705fn discovery_receiver_loop(
706    socket: UdpSocket,
707    group_id: &[u8],
708    shared: Arc<Mutex<SharedState>>,
709    tx: EventSender,
710    running: &AtomicBool,
711    name: &str,
712    data_port: u16,
713    configured_bitrate: u64,
714    ingress_control: rns_core::transport::types::IngressControlConfig,
715    runtime: Arc<Mutex<AutoRuntime>>,
716) {
717    let mut buf = [0u8; 1024];
718
719    while running.load(Ordering::Relaxed) {
720        match socket.recv_from(&mut buf) {
721            Ok((n, src)) => {
722                if n < 32 {
723                    continue;
724                }
725
726                // Extract source IPv6 address
727                let src_addr = match src {
728                    std::net::SocketAddr::V6(v6) => v6,
729                    _ => continue,
730                };
731                let src_ip = format!("{}", src_addr.ip());
732
733                let peering_hash = &buf[..32];
734                let expected = compute_discovery_token(group_id, &src_ip);
735
736                if peering_hash != expected {
737                    log::debug!("[{}] invalid peering hash from {}", name, src_ip);
738                    continue;
739                }
740
741                // Check if online
742                let state = shared.lock().unwrap();
743                if !state.online {
744                    // Not fully initialized yet, but still accept for initial peering
745                    // (Python processes after final_init_done)
746                }
747
748                // Check if it's our own echo
749                if state.link_local_addresses.contains(&src_ip) {
750                    // Multicast echo from ourselves — just record it
751                    drop(state);
752                    continue;
753                }
754
755                // Check if already known
756                if state.peers.contains_key(&src_ip) {
757                    let now = crate::time::now();
758                    drop(state);
759                    let mut state = shared.lock().unwrap();
760                    state.refresh_peer(&src_ip, now);
761                    continue;
762                }
763                drop(state);
764
765                // New peer! Create a data writer to send to them.
766                add_peer(
767                    &shared,
768                    &tx,
769                    &src_ip,
770                    data_port,
771                    name,
772                    configured_bitrate,
773                    ingress_control,
774                    &runtime,
775                );
776            }
777            Err(ref e)
778                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
779            {
780                // Timeout, loop again
781                continue;
782            }
783            Err(e) => {
784                log::warn!("[{}] discovery recv error: {}", name, e);
785                if !running.load(Ordering::Relaxed) {
786                    return;
787                }
788                thread::sleep(Duration::from_millis(100));
789            }
790        }
791    }
792}
793
794/// Add a new peer, creating a writer and emitting InterfaceUp.
795fn add_peer(
796    shared: &Arc<Mutex<SharedState>>,
797    tx: &EventSender,
798    peer_addr: &str,
799    data_port: u16,
800    name: &str,
801    configured_bitrate: u64,
802    ingress_control: rns_core::transport::types::IngressControlConfig,
803    _runtime: &Arc<Mutex<AutoRuntime>>,
804) {
805    let peer_ip: Ipv6Addr = match peer_addr.parse() {
806        Ok(ip) => ip,
807        Err(_) => return,
808    };
809
810    // Create UDP writer to send data to this peer
811    let send_socket = match UdpSocket::bind("[::]:0") {
812        Ok(s) => s,
813        Err(e) => {
814            log::warn!(
815                "[{}] failed to create writer for peer {}: {}",
816                name,
817                peer_addr,
818                e
819            );
820            return;
821        }
822    };
823
824    let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
825
826    let mut state = shared.lock().unwrap();
827
828    // Double-check not already added (race)
829    if state.peers.contains_key(peer_addr) {
830        state.refresh_peer(peer_addr, crate::time::now());
831        return;
832    }
833
834    let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
835
836    // Create a boxed writer for the driver
837    let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
838        socket: send_socket,
839        target,
840    });
841
842    let peer_info = rns_core::transport::types::InterfaceInfo {
843        id: peer_id,
844        name: format!("{}:{}", name, peer_addr),
845        mode: rns_core::constants::MODE_FULL,
846        out_capable: true,
847        in_capable: true,
848        bitrate: Some(configured_bitrate),
849        announce_rate_target: None,
850        announce_rate_grace: 0,
851        announce_rate_penalty: 0.0,
852        announce_cap: rns_core::constants::ANNOUNCE_CAP,
853        is_local_client: false,
854        wants_tunnel: false,
855        tunnel_id: None,
856        mtu: 1400,
857        ia_freq: 0.0,
858        started: 0.0,
859        ingress_control,
860    };
861
862    let now = crate::time::now();
863    state.peers.insert(
864        peer_addr.to_string(),
865        AutoPeer {
866            interface_id: peer_id,
867            link_local_addr: peer_addr.to_string(),
868            ifname: String::new(),
869            last_heard: now,
870        },
871    );
872
873    log::info!(
874        "[{}] Peer discovered: {} (id={})",
875        name,
876        peer_addr,
877        peer_id.0
878    );
879
880    // Notify driver of new dynamic interface
881    let _ = tx.send(Event::InterfaceUp(
882        peer_id,
883        Some(driver_writer),
884        Some(peer_info),
885    ));
886}
887
888/// Data receiver: receives unicast UDP data from peers and dispatches as frames.
889fn data_receiver_loop(
890    socket: UdpSocket,
891    shared: Arc<Mutex<SharedState>>,
892    tx: EventSender,
893    running: &AtomicBool,
894    name: &str,
895) {
896    let mut buf = [0u8; HW_MTU + 64]; // a bit extra
897
898    while running.load(Ordering::Relaxed) {
899        match socket.recv_from(&mut buf) {
900            Ok((n, src)) => {
901                if n == 0 {
902                    continue;
903                }
904
905                let src_addr = match src {
906                    std::net::SocketAddr::V6(v6) => v6,
907                    _ => continue,
908                };
909                let src_ip = format!("{}", src_addr.ip());
910                let data = &buf[..n];
911
912                let now = crate::time::now();
913                let data_hash = rns_crypto::sha256::sha256(data);
914
915                let mut state = shared.lock().unwrap();
916
917                if !state.online {
918                    continue;
919                }
920
921                // Deduplication
922                if state.is_duplicate(&data_hash, now) {
923                    continue;
924                }
925                state.add_dedup(data_hash, now);
926
927                // Refresh peer
928                state.refresh_peer(&src_ip, now);
929
930                // Find the interface ID for this peer
931                let iface_id = match state.peers.get(&src_ip) {
932                    Some(peer) => peer.interface_id,
933                    None => {
934                        // Unknown peer, skip
935                        continue;
936                    }
937                };
938
939                drop(state);
940
941                if tx
942                    .send(Event::Frame {
943                        interface_id: iface_id,
944                        data: data.to_vec(),
945                    })
946                    .is_err()
947                {
948                    return;
949                }
950            }
951            Err(ref e)
952                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
953            {
954                continue;
955            }
956            Err(e) => {
957                log::warn!("[{}] data recv error: {}", name, e);
958                if !running.load(Ordering::Relaxed) {
959                    return;
960                }
961                thread::sleep(Duration::from_millis(100));
962            }
963        }
964    }
965}
966
967/// Peer jobs: periodically cull timed-out peers.
968fn peer_jobs_loop(
969    shared: Arc<Mutex<SharedState>>,
970    tx: EventSender,
971    runtime: Arc<Mutex<AutoRuntime>>,
972    running: &AtomicBool,
973    name: &str,
974) {
975    while running.load(Ordering::Relaxed) {
976        let interval =
977            Duration::from_secs_f64(runtime.lock().unwrap().peer_job_interval_secs.max(0.1));
978        thread::sleep(interval);
979
980        let now = crate::time::now();
981        let mut timed_out = Vec::new();
982        let peer_timeout_secs = runtime.lock().unwrap().peer_timeout_secs;
983
984        {
985            let state = shared.lock().unwrap();
986            for (addr, peer) in &state.peers {
987                if now > peer.last_heard + peer_timeout_secs {
988                    timed_out.push((addr.clone(), peer.interface_id));
989                }
990            }
991        }
992
993        for (addr, iface_id) in &timed_out {
994            log::info!("[{}] Peer timed out: {}", name, addr);
995            let mut state = shared.lock().unwrap();
996            state.peers.remove(addr.as_str());
997            let _ = tx.send(Event::InterfaceDown(*iface_id));
998        }
999    }
1000}
1001
1002// ── Helper ─────────────────────────────────────────────────────────────────
1003
1004/// Get the raw file descriptor from a UdpSocket (for setsockopt).
1005#[cfg(unix)]
1006fn socket_fd(socket: &UdpSocket) -> i32 {
1007    use std::os::unix::io::AsRawFd;
1008    socket.as_raw_fd()
1009}
1010
1011#[cfg(not(unix))]
1012fn socket_fd(_socket: &UdpSocket) -> i32 {
1013    0
1014}
1015
1016// ── Factory implementation ─────────────────────────────────────────────────
1017
1018use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
1019
1020/// Factory for `AutoInterface`.
1021pub struct AutoFactory;
1022
1023impl InterfaceFactory for AutoFactory {
1024    fn type_name(&self) -> &str {
1025        "AutoInterface"
1026    }
1027
1028    fn parse_config(
1029        &self,
1030        name: &str,
1031        id: InterfaceId,
1032        params: &HashMap<String, String>,
1033    ) -> Result<Box<dyn InterfaceConfigData>, String> {
1034        let group_id = params
1035            .get("group_id")
1036            .map(|v| v.as_bytes().to_vec())
1037            .unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
1038
1039        let discovery_scope = params
1040            .get("discovery_scope")
1041            .map(|v| match v.to_lowercase().as_str() {
1042                "link" => SCOPE_LINK.to_string(),
1043                "admin" => SCOPE_ADMIN.to_string(),
1044                "site" => SCOPE_SITE.to_string(),
1045                "organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
1046                "global" => SCOPE_GLOBAL.to_string(),
1047                _ => v.clone(),
1048            })
1049            .unwrap_or_else(|| SCOPE_LINK.to_string());
1050
1051        let discovery_port = params
1052            .get("discovery_port")
1053            .and_then(|v| v.parse().ok())
1054            .unwrap_or(DEFAULT_DISCOVERY_PORT);
1055
1056        let data_port = params
1057            .get("data_port")
1058            .and_then(|v| v.parse().ok())
1059            .unwrap_or(DEFAULT_DATA_PORT);
1060
1061        let multicast_address_type = params
1062            .get("multicast_address_type")
1063            .map(|v| match v.to_lowercase().as_str() {
1064                "permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
1065                "temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
1066                _ => v.clone(),
1067            })
1068            .unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
1069
1070        let configured_bitrate = params
1071            .get("configured_bitrate")
1072            .or_else(|| params.get("bitrate"))
1073            .and_then(|v| v.parse().ok())
1074            .unwrap_or(BITRATE_GUESS);
1075
1076        let allowed_interfaces = params
1077            .get("devices")
1078            .or_else(|| params.get("allowed_interfaces"))
1079            .map(|v| {
1080                v.split(',')
1081                    .map(|s| s.trim().to_string())
1082                    .filter(|s| !s.is_empty())
1083                    .collect()
1084            })
1085            .unwrap_or_default();
1086
1087        let ignored_interfaces = params
1088            .get("ignored_devices")
1089            .or_else(|| params.get("ignored_interfaces"))
1090            .map(|v| {
1091                v.split(',')
1092                    .map(|s| s.trim().to_string())
1093                    .filter(|s| !s.is_empty())
1094                    .collect()
1095            })
1096            .unwrap_or_default();
1097
1098        Ok(Box::new(AutoConfig {
1099            name: name.to_string(),
1100            group_id,
1101            discovery_scope,
1102            discovery_port,
1103            data_port,
1104            multicast_address_type,
1105            allowed_interfaces,
1106            ignored_interfaces,
1107            configured_bitrate,
1108            interface_id: id,
1109            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
1110            runtime: Arc::new(Mutex::new(AutoRuntime {
1111                announce_interval_secs: ANNOUNCE_INTERVAL,
1112                peer_timeout_secs: PEERING_TIMEOUT,
1113                peer_job_interval_secs: PEER_JOB_INTERVAL,
1114            })),
1115        }))
1116    }
1117
1118    fn start(
1119        &self,
1120        config: Box<dyn InterfaceConfigData>,
1121        ctx: StartContext,
1122    ) -> std::io::Result<StartResult> {
1123        let mut auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
1124            std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
1125        })?;
1126
1127        auto_config.ingress_control = ctx.ingress_control;
1128        start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
1129        Ok(StartResult::Listener { control: None })
1130    }
1131}
1132
1133pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
1134    AutoRuntimeConfigHandle {
1135        interface_name: config.name.clone(),
1136        runtime: Arc::clone(&config.runtime),
1137        startup: AutoRuntime::from_config(config),
1138    }
1139}
1140
1141// ── Tests ──────────────────────────────────────────────────────────────────
1142
1143#[cfg(test)]
1144mod tests {
1145    use super::*;
1146
1147    // ── Multicast address derivation ──────────────────────────────────
1148
1149    #[test]
1150    fn multicast_address_default_group() {
1151        // Python vector: ff12:0:d70b:fb1c:16e4:5e39:485e:31e1
1152        let addr = derive_multicast_address(
1153            DEFAULT_GROUP_ID,
1154            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1155            SCOPE_LINK,
1156        );
1157        assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1158    }
1159
1160    #[test]
1161    fn multicast_address_custom_group() {
1162        let addr =
1163            derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
1164        // Just verify format
1165        assert!(addr.starts_with("ff12:0:"));
1166        // Must be different from default
1167        assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
1168    }
1169
1170    #[test]
1171    fn multicast_address_scope_admin() {
1172        let addr = derive_multicast_address(
1173            DEFAULT_GROUP_ID,
1174            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1175            SCOPE_ADMIN,
1176        );
1177        assert!(addr.starts_with("ff14:0:"));
1178    }
1179
1180    #[test]
1181    fn multicast_address_permanent_type() {
1182        let addr = derive_multicast_address(
1183            DEFAULT_GROUP_ID,
1184            MULTICAST_PERMANENT_ADDRESS_TYPE,
1185            SCOPE_LINK,
1186        );
1187        assert!(addr.starts_with("ff02:0:"));
1188    }
1189
1190    #[test]
1191    fn multicast_address_parseable() {
1192        let addr = derive_multicast_address(
1193            DEFAULT_GROUP_ID,
1194            MULTICAST_TEMPORARY_ADDRESS_TYPE,
1195            SCOPE_LINK,
1196        );
1197        let ip = parse_multicast_addr(&addr);
1198        assert!(ip.is_some());
1199        assert!(ip.unwrap().is_multicast());
1200    }
1201
1202    // ── Discovery token ──────────────────────────────────────────────
1203
1204    #[test]
1205    fn discovery_token_interop() {
1206        // Python vector: fe80::1 → 97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163
1207        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1208        let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
1209        let got = token
1210            .iter()
1211            .map(|b| format!("{:02x}", b))
1212            .collect::<String>();
1213        assert_eq!(got, expected);
1214    }
1215
1216    #[test]
1217    fn discovery_token_interop_2() {
1218        // Python vector: fe80::dead:beef:1234:5678
1219        let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
1220        let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
1221        let got = token
1222            .iter()
1223            .map(|b| format!("{:02x}", b))
1224            .collect::<String>();
1225        assert_eq!(got, expected);
1226    }
1227
1228    #[test]
1229    fn discovery_token_different_groups() {
1230        let t1 = compute_discovery_token(b"reticulum", "fe80::1");
1231        let t2 = compute_discovery_token(b"othergroup", "fe80::1");
1232        assert_ne!(t1, t2);
1233    }
1234
1235    #[test]
1236    fn discovery_token_different_addrs() {
1237        let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
1238        let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
1239        assert_ne!(t1, t2);
1240    }
1241
1242    // ── Deduplication ────────────────────────────────────────────────
1243
1244    #[test]
1245    fn dedup_basic() {
1246        let next_id = Arc::new(AtomicU64::new(1));
1247        let mut state = SharedState::new(next_id);
1248
1249        let hash = [0xAA; 32];
1250        let now = 1000.0;
1251
1252        assert!(!state.is_duplicate(&hash, now));
1253        state.add_dedup(hash, now);
1254        assert!(state.is_duplicate(&hash, now));
1255    }
1256
1257    #[test]
1258    fn dedup_expired() {
1259        let next_id = Arc::new(AtomicU64::new(1));
1260        let mut state = SharedState::new(next_id);
1261
1262        let hash = [0xBB; 32];
1263        state.add_dedup(hash, 1000.0);
1264
1265        // Within TTL
1266        assert!(state.is_duplicate(&hash, 1000.5));
1267        // Expired
1268        assert!(!state.is_duplicate(&hash, 1001.0));
1269    }
1270
1271    #[test]
1272    fn dedup_max_length() {
1273        let next_id = Arc::new(AtomicU64::new(1));
1274        let mut state = SharedState::new(next_id);
1275
1276        // Fill beyond max
1277        for i in 0..MULTI_IF_DEQUE_LEN + 10 {
1278            let mut hash = [0u8; 32];
1279            hash[0] = (i & 0xFF) as u8;
1280            hash[1] = ((i >> 8) & 0xFF) as u8;
1281            state.add_dedup(hash, 1000.0);
1282        }
1283
1284        assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
1285    }
1286
1287    // ── Peer tracking ────────────────────────────────────────────────
1288
1289    #[test]
1290    fn peer_refresh() {
1291        let next_id = Arc::new(AtomicU64::new(100));
1292        let mut state = SharedState::new(next_id);
1293
1294        state.peers.insert(
1295            "fe80::1".to_string(),
1296            AutoPeer {
1297                interface_id: InterfaceId(100),
1298                link_local_addr: "fe80::1".to_string(),
1299                ifname: "eth0".to_string(),
1300                last_heard: 1000.0,
1301            },
1302        );
1303
1304        state.refresh_peer("fe80::1", 2000.0);
1305        assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
1306    }
1307
1308    #[test]
1309    fn peer_not_found_refresh() {
1310        let next_id = Arc::new(AtomicU64::new(100));
1311        let mut state = SharedState::new(next_id);
1312        // Should not panic
1313        state.refresh_peer("fe80::999", 1000.0);
1314    }
1315
1316    // ── Network interface enumeration ────────────────────────────────
1317
1318    #[test]
1319    fn enumerate_returns_vec() {
1320        // This test just verifies the function runs without crashing.
1321        // Results depend on the system's network configuration.
1322        let interfaces = enumerate_interfaces(&[], &[]);
1323        // On CI/test machines, we may or may not have IPv6 link-local
1324        for iface in &interfaces {
1325            assert!(!iface.name.is_empty());
1326            assert!(iface.link_local_addr.starts_with("fe80"));
1327            assert!(iface.index > 0);
1328        }
1329    }
1330
1331    #[test]
1332    fn enumerate_with_ignored() {
1333        // Ignore everything
1334        let interfaces = enumerate_interfaces(
1335            &[],
1336            &[
1337                "lo".to_string(),
1338                "eth0".to_string(),
1339                "wlan0".to_string(),
1340                "enp0s3".to_string(),
1341                "docker0".to_string(),
1342            ],
1343        );
1344        // May still have some interfaces, but known ones should be filtered
1345        for iface in &interfaces {
1346            assert_ne!(iface.name, "lo");
1347            assert_ne!(iface.name, "eth0");
1348            assert_ne!(iface.name, "wlan0");
1349        }
1350    }
1351
1352    #[test]
1353    fn enumerate_with_allowed_nonexistent() {
1354        // Only allow an interface that doesn't exist
1355        let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
1356        assert!(interfaces.is_empty());
1357    }
1358
1359    // ── Config defaults ──────────────────────────────────────────────
1360
1361    #[test]
1362    fn config_defaults() {
1363        let config = AutoConfig::default();
1364        assert_eq!(config.group_id, DEFAULT_GROUP_ID);
1365        assert_eq!(config.discovery_scope, SCOPE_LINK);
1366        assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
1367        assert_eq!(config.data_port, DEFAULT_DATA_PORT);
1368        assert_eq!(
1369            config.multicast_address_type,
1370            MULTICAST_TEMPORARY_ADDRESS_TYPE
1371        );
1372        assert_eq!(config.configured_bitrate, BITRATE_GUESS);
1373        assert!(config.allowed_interfaces.is_empty());
1374        assert!(config.ignored_interfaces.is_empty());
1375    }
1376
1377    // ── Constants ────────────────────────────────────────────────────
1378
1379    #[test]
1380    fn constants_match_python() {
1381        assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
1382        assert_eq!(DEFAULT_DATA_PORT, 42671);
1383        assert_eq!(HW_MTU, 1196);
1384        assert_eq!(MULTI_IF_DEQUE_LEN, 48);
1385        assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
1386        assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
1387        assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
1388        assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
1389        assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
1390        assert_eq!(BITRATE_GUESS, 10_000_000);
1391    }
1392
1393    #[test]
1394    fn unicast_discovery_port() {
1395        // Python: unicast_discovery_port = discovery_port + 1
1396        let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
1397        assert_eq!(unicast_port, 29717);
1398    }
1399
1400    #[test]
1401    fn reverse_peering_interval() {
1402        let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
1403        assert!((interval - 5.2).abs() < 0.01);
1404    }
1405}