Skip to main content

rns_net/
discovery.rs

1//! Interface Discovery protocol implementation.
2//!
3//! Handles receiving, validating, and storing discovered interface announcements
4//! from other Reticulum nodes on the network.
5//!
6//! Pure types and parsing live in `common::discovery`; this module contains
7//! I/O storage and background-threaded stamp generation / announcing.
8//!
9//! Python reference: RNS/Discovery.py
10
11// Re-export everything from common::discovery so existing `crate::discovery::X` paths work.
12pub use crate::common::discovery::*;
13
14use std::fs;
15use std::io;
16use std::path::PathBuf;
17use std::sync::{Mutex, MutexGuard};
18
19use rns_core::msgpack::{self, Value};
20use rns_core::stamp::{stamp_valid, stamp_workblock};
21use rns_crypto::sha256::sha256;
22
23use crate::time;
24
25// ============================================================================
26// Storage
27// ============================================================================
28
/// Lock guarding the on-disk discovery storage.
///
/// It is a `static`, so it is shared by every `DiscoveredInterfaceStorage`
/// instance in the process rather than being per-instance. It is acquired
/// through `discovery_storage_guard()`.
static DISCOVERY_STORAGE_LOCK: Mutex<()> = Mutex::new(());
30
/// Persistent storage for discovered interfaces.
///
/// Each interface is kept as one msgpack-encoded file inside `base_path`,
/// named after the hex encoding of its discovery hash.
pub struct DiscoveredInterfaceStorage {
    /// Directory that holds one file per stored interface.
    base_path: PathBuf,
}
35
36impl DiscoveredInterfaceStorage {
37    /// Create a new storage instance
38    pub fn new(base_path: PathBuf) -> Self {
39        Self { base_path }
40    }
41
42    /// Store a discovered interface
43    pub fn store(&self, iface: &DiscoveredInterface) -> io::Result<()> {
44        let _guard = discovery_storage_guard();
45        self.store_unlocked(iface)
46    }
47
48    fn store_unlocked(&self, iface: &DiscoveredInterface) -> io::Result<()> {
49        let filename = hex_encode(&iface.discovery_hash);
50        let filepath = self.base_path.join(filename);
51
52        let data = self.serialize_interface(iface)?;
53        fs::write(&filepath, &data)
54    }
55
56    /// Store a newly received interface announce, preserving persistent counters.
57    pub fn store_received(&self, iface: &mut DiscoveredInterface) -> io::Result<()> {
58        let _guard = discovery_storage_guard();
59        match self.load_unlocked(&iface.discovery_hash) {
60            Ok(Some(existing)) => {
61                iface.discovered = existing.discovered;
62                iface.heard_count = existing.heard_count.saturating_add(1);
63            }
64            Ok(None) => {
65                iface.discovered = iface.last_heard;
66                iface.heard_count = 1;
67            }
68            Err(err) => {
69                log::error!(
70                    "Error while reading existing data for discovered interface, re-creating data: {}",
71                    err
72                );
73                iface.discovered = iface.last_heard;
74                iface.heard_count = 1;
75            }
76        }
77
78        self.store_unlocked(iface)
79    }
80
81    /// Load a discovered interface by its discovery hash
82    pub fn load(&self, discovery_hash: &[u8; 32]) -> io::Result<Option<DiscoveredInterface>> {
83        let _guard = discovery_storage_guard();
84        self.load_unlocked(discovery_hash)
85    }
86
87    fn load_unlocked(&self, discovery_hash: &[u8; 32]) -> io::Result<Option<DiscoveredInterface>> {
88        let filename = hex_encode(discovery_hash);
89        let filepath = self.base_path.join(filename);
90
91        if !filepath.exists() {
92            return Ok(None);
93        }
94
95        let data = fs::read(&filepath)?;
96        self.deserialize_interface(&data).map(Some)
97    }
98
99    /// List all discovered interfaces
100    pub fn list(&self) -> io::Result<Vec<DiscoveredInterface>> {
101        let _guard = discovery_storage_guard();
102        self.list_unlocked()
103    }
104
105    fn list_unlocked(&self) -> io::Result<Vec<DiscoveredInterface>> {
106        let mut interfaces = Vec::new();
107
108        let entries = match fs::read_dir(&self.base_path) {
109            Ok(e) => e,
110            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(interfaces),
111            Err(e) => return Err(e),
112        };
113
114        for entry in entries {
115            let entry = entry?;
116            let path = entry.path();
117
118            if !path.is_file() {
119                continue;
120            }
121
122            match fs::read(&path) {
123                Ok(data) => {
124                    if let Ok(iface) = self.deserialize_interface(&data) {
125                        interfaces.push(iface);
126                    }
127                }
128                Err(_) => continue,
129            }
130        }
131
132        Ok(interfaces)
133    }
134
135    /// Remove a discovered interface by its discovery hash
136    pub fn remove(&self, discovery_hash: &[u8; 32]) -> io::Result<()> {
137        let _guard = discovery_storage_guard();
138        self.remove_unlocked(discovery_hash)
139    }
140
141    fn remove_unlocked(&self, discovery_hash: &[u8; 32]) -> io::Result<()> {
142        let filename = hex_encode(discovery_hash);
143        let filepath = self.base_path.join(filename);
144
145        if filepath.exists() {
146            fs::remove_file(&filepath)?;
147        }
148        Ok(())
149    }
150
151    /// Clean up stale entries (older than THRESHOLD_REMOVE)
152    /// Returns the number of entries removed
153    pub fn cleanup(&self) -> io::Result<usize> {
154        let _guard = discovery_storage_guard();
155        let mut removed = 0;
156        let now = time::now();
157
158        let interfaces = self.list_unlocked()?;
159        for iface in interfaces {
160            let invalid_reachable_on = iface
161                .reachable_on
162                .as_ref()
163                .map(|reachable_on| !(is_ip_address(reachable_on) || is_hostname(reachable_on)))
164                .unwrap_or(false);
165
166            if !is_discoverable_type(&iface.interface_type)
167                || invalid_reachable_on
168                || now - iface.last_heard > THRESHOLD_REMOVE
169            {
170                self.remove_unlocked(&iface.discovery_hash)?;
171                removed += 1;
172            }
173        }
174
175        Ok(removed)
176    }
177
178    /// Serialize an interface to msgpack
179    fn serialize_interface(&self, iface: &DiscoveredInterface) -> io::Result<Vec<u8>> {
180        let mut entries: Vec<(Value, Value)> = Vec::new();
181
182        entries.push((
183            Value::Str("type".into()),
184            Value::Str(iface.interface_type.clone()),
185        ));
186        entries.push((Value::Str("transport".into()), Value::Bool(iface.transport)));
187        entries.push((Value::Str("name".into()), Value::Str(iface.name.clone())));
188        entries.push((
189            Value::Str("discovered".into()),
190            Value::Float(iface.discovered),
191        ));
192        entries.push((
193            Value::Str("last_heard".into()),
194            Value::Float(iface.last_heard),
195        ));
196        entries.push((
197            Value::Str("heard_count".into()),
198            Value::UInt(iface.heard_count as u64),
199        ));
200        entries.push((
201            Value::Str("status".into()),
202            Value::Str(iface.status.as_str().into()),
203        ));
204        entries.push((Value::Str("stamp".into()), Value::Bin(iface.stamp.clone())));
205        entries.push((
206            Value::Str("value".into()),
207            Value::UInt(iface.stamp_value as u64),
208        ));
209        entries.push((
210            Value::Str("transport_id".into()),
211            Value::Bin(iface.transport_id.to_vec()),
212        ));
213        entries.push((
214            Value::Str("network_id".into()),
215            Value::Bin(iface.network_id.to_vec()),
216        ));
217        entries.push((Value::Str("hops".into()), Value::UInt(iface.hops as u64)));
218
219        if let Some(v) = iface.latitude {
220            entries.push((Value::Str("latitude".into()), Value::Float(v)));
221        }
222        if let Some(v) = iface.longitude {
223            entries.push((Value::Str("longitude".into()), Value::Float(v)));
224        }
225        if let Some(v) = iface.height {
226            entries.push((Value::Str("height".into()), Value::Float(v)));
227        }
228        if let Some(ref v) = iface.reachable_on {
229            entries.push((Value::Str("reachable_on".into()), Value::Str(v.clone())));
230        }
231        if let Some(v) = iface.port {
232            entries.push((Value::Str("port".into()), Value::UInt(v as u64)));
233        }
234        if let Some(v) = iface.frequency {
235            entries.push((Value::Str("frequency".into()), Value::UInt(v as u64)));
236        }
237        if let Some(v) = iface.bandwidth {
238            entries.push((Value::Str("bandwidth".into()), Value::UInt(v as u64)));
239        }
240        if let Some(v) = iface.spreading_factor {
241            entries.push((Value::Str("sf".into()), Value::UInt(v as u64)));
242        }
243        if let Some(v) = iface.coding_rate {
244            entries.push((Value::Str("cr".into()), Value::UInt(v as u64)));
245        }
246        if let Some(ref v) = iface.modulation {
247            entries.push((Value::Str("modulation".into()), Value::Str(v.clone())));
248        }
249        if let Some(v) = iface.channel {
250            entries.push((Value::Str("channel".into()), Value::UInt(v as u64)));
251        }
252        if let Some(ref v) = iface.ifac_netname {
253            entries.push((Value::Str("ifac_netname".into()), Value::Str(v.clone())));
254        }
255        if let Some(ref v) = iface.ifac_netkey {
256            entries.push((Value::Str("ifac_netkey".into()), Value::Str(v.clone())));
257        }
258        if let Some(ref v) = iface.config_entry {
259            entries.push((Value::Str("config_entry".into()), Value::Str(v.clone())));
260        }
261
262        entries.push((
263            Value::Str("discovery_hash".into()),
264            Value::Bin(iface.discovery_hash.to_vec()),
265        ));
266
267        Ok(msgpack::pack(&Value::Map(entries)))
268    }
269
270    /// Deserialize an interface from msgpack
271    fn deserialize_interface(&self, data: &[u8]) -> io::Result<DiscoveredInterface> {
272        let (value, _) = msgpack::unpack(data).map_err(|e| {
273            io::Error::new(io::ErrorKind::InvalidData, format!("msgpack error: {}", e))
274        })?;
275
276        // Helper functions using map_get
277        let get_str = |v: &Value, key: &str| -> io::Result<String> {
278            v.map_get(key)
279                .and_then(|val| val.as_str())
280                .map(|s| s.to_string())
281                .ok_or_else(|| {
282                    io::Error::new(io::ErrorKind::InvalidData, format!("{} not a string", key))
283                })
284        };
285
286        let get_opt_str = |v: &Value, key: &str| -> Option<String> {
287            v.map_get(key)
288                .and_then(|val| val.as_str().map(|s| s.to_string()))
289        };
290
291        let get_bool = |v: &Value, key: &str| -> io::Result<bool> {
292            v.map_get(key).and_then(|val| val.as_bool()).ok_or_else(|| {
293                io::Error::new(io::ErrorKind::InvalidData, format!("{} not a bool", key))
294            })
295        };
296
297        let get_float = |v: &Value, key: &str| -> io::Result<f64> {
298            v.map_get(key)
299                .and_then(|val| val.as_float())
300                .ok_or_else(|| {
301                    io::Error::new(io::ErrorKind::InvalidData, format!("{} not a float", key))
302                })
303        };
304
305        let get_opt_float =
306            |v: &Value, key: &str| -> Option<f64> { v.map_get(key).and_then(|val| val.as_float()) };
307
308        let get_uint = |v: &Value, key: &str| -> io::Result<u64> {
309            v.map_get(key).and_then(|val| val.as_uint()).ok_or_else(|| {
310                io::Error::new(io::ErrorKind::InvalidData, format!("{} not a uint", key))
311            })
312        };
313
314        let get_opt_uint =
315            |v: &Value, key: &str| -> Option<u64> { v.map_get(key).and_then(|val| val.as_uint()) };
316
317        let get_bytes = |v: &Value, key: &str| -> io::Result<Vec<u8>> {
318            v.map_get(key)
319                .and_then(|val| val.as_bin())
320                .map(|b| b.to_vec())
321                .ok_or_else(|| {
322                    io::Error::new(io::ErrorKind::InvalidData, format!("{} not bytes", key))
323                })
324        };
325
326        let fixed_bytes = |key: &str, expected_len: usize| -> io::Result<Vec<u8>> {
327            let bytes = get_bytes(&value, key)?;
328            if bytes.len() != expected_len {
329                return Err(io::Error::new(
330                    io::ErrorKind::InvalidData,
331                    format!("{} must be {} bytes", key, expected_len),
332                ));
333            }
334            Ok(bytes)
335        };
336
337        let transport_id_bytes = fixed_bytes("transport_id", 16)?;
338        let mut transport_id = [0u8; 16];
339        transport_id.copy_from_slice(&transport_id_bytes);
340
341        let network_id_bytes = fixed_bytes("network_id", 16)?;
342        let mut network_id = [0u8; 16];
343        network_id.copy_from_slice(&network_id_bytes);
344
345        let discovery_hash_bytes = fixed_bytes("discovery_hash", 32)?;
346        let mut discovery_hash = [0u8; 32];
347        discovery_hash.copy_from_slice(&discovery_hash_bytes);
348
349        let status_str = get_str(&value, "status")?;
350        let status = match status_str.as_str() {
351            "available" => DiscoveredStatus::Available,
352            "unknown" => DiscoveredStatus::Unknown,
353            "stale" => DiscoveredStatus::Stale,
354            _ => DiscoveredStatus::Unknown,
355        };
356
357        let interface_type = get_str(&value, "type")?;
358        let raw_name = get_str(&value, "name")?;
359        let name = sanitize_discovered_name(&raw_name)
360            .unwrap_or_else(|| format!("Discovered {}", interface_type));
361
362        Ok(DiscoveredInterface {
363            interface_type,
364            transport: get_bool(&value, "transport")?,
365            name,
366            discovered: get_float(&value, "discovered")?,
367            last_heard: get_float(&value, "last_heard")?,
368            heard_count: get_uint(&value, "heard_count")? as u32,
369            status,
370            stamp: get_bytes(&value, "stamp")?,
371            stamp_value: get_uint(&value, "value")? as u32,
372            transport_id,
373            network_id,
374            hops: get_uint(&value, "hops")? as u8,
375            latitude: get_opt_float(&value, "latitude"),
376            longitude: get_opt_float(&value, "longitude"),
377            height: get_opt_float(&value, "height"),
378            reachable_on: get_opt_str(&value, "reachable_on"),
379            port: get_opt_uint(&value, "port").map(|v| v as u16),
380            frequency: get_opt_uint(&value, "frequency").map(|v| v as u32),
381            bandwidth: get_opt_uint(&value, "bandwidth").map(|v| v as u32),
382            spreading_factor: get_opt_uint(&value, "sf").map(|v| v as u8),
383            coding_rate: get_opt_uint(&value, "cr").map(|v| v as u8),
384            modulation: get_opt_str(&value, "modulation"),
385            channel: get_opt_uint(&value, "channel").map(|v| v as u8),
386            ifac_netname: get_opt_str(&value, "ifac_netname"),
387            ifac_netkey: get_opt_str(&value, "ifac_netkey"),
388            config_entry: get_opt_str(&value, "config_entry"),
389            discovery_hash,
390        })
391    }
392}
393
394fn discovery_storage_guard() -> MutexGuard<'static, ()> {
395    match DISCOVERY_STORAGE_LOCK.lock() {
396        Ok(guard) => guard,
397        Err(poisoned) => {
398            log::error!("recovering from poisoned discovery storage lock");
399            poisoned.into_inner()
400        }
401    }
402}
403
404// ============================================================================
405// Stamp Generation (parallel PoW search)
406// ============================================================================
407
/// Generate a discovery stamp with the given cost by testing random nonces on
/// the rayon thread pool.
///
/// The work block is derived from the SHA-256 of `packed_data`. One task per
/// rayon thread repeatedly fills a nonce with random bytes until some task
/// finds one that `stamp_valid` accepts for `stamp_cost`.
///
/// Returns `(stamp, value)`, where `value` is computed by
/// `rns_core::stamp::stamp_value` for the winning nonce.
///
/// This is a blocking, CPU-intensive operation: it has no timeout and does not
/// return until a valid nonce has been found.
pub fn generate_discovery_stamp(packed_data: &[u8], stamp_cost: u8) -> ([u8; STAMP_SIZE], u32) {
    use rns_crypto::{OsRng, Rng};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};

    let infohash = sha256(packed_data);
    let workblock = stamp_workblock(&infohash, WORKBLOCK_EXPAND_ROUNDS);

    // `found` tells the other tasks to stop; `result` holds the first winning nonce.
    let found: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
    let result: Arc<Mutex<Option<[u8; STAMP_SIZE]>>> = Arc::new(Mutex::new(None));

    let num_threads = rayon::current_num_threads();

    // `rayon::scope` returns only after every spawned task has finished, which
    // is what allows the tasks to borrow `workblock`.
    rayon::scope(|s| {
        for _ in 0..num_threads {
            let found = found.clone();
            let result = result.clone();
            let workblock = &workblock;
            s.spawn(move |_| {
                let mut rng = OsRng;
                let mut nonce = [0u8; STAMP_SIZE];
                loop {
                    // Another task already succeeded; stop searching.
                    if found.load(Ordering::Relaxed) {
                        return;
                    }
                    rng.fill_bytes(&mut nonce);
                    if stamp_valid(&nonce, stamp_cost, workblock) {
                        let mut r = match result.lock() {
                            Ok(guard) => guard,
                            Err(poisoned) => {
                                log::error!(
                                    "recovering from poisoned discovery stamp result buffer"
                                );
                                poisoned.into_inner()
                            }
                        };
                        // Keep the first winner if two tasks succeed at about the same time.
                        if r.is_none() {
                            *r = Some(nonce);
                        }
                        found.store(true, Ordering::Relaxed);
                        return;
                    }
                }
            });
        }
    });

    let stamp = match result.lock() {
        Ok(mut guard) => guard.take(),
        Err(poisoned) => {
            log::error!("recovering from poisoned discovery stamp result buffer");
            poisoned.into_inner().take()
        }
    }
    // Fallback for the case where the scope ended without storing a result:
    // repeat the same search on the calling thread.
    .unwrap_or_else(|| {
        log::error!("parallel discovery stamp search returned no result; retrying synchronously");
        let mut rng = OsRng;
        let mut nonce = [0u8; STAMP_SIZE];
        loop {
            rng.fill_bytes(&mut nonce);
            if stamp_valid(&nonce, stamp_cost, &workblock) {
                return nonce;
            }
        }
    });
    let value = rns_core::stamp::stamp_value(&workblock, &stamp);
    (stamp, value)
}
479
480// ============================================================================
481// Interface Announcer
482// ============================================================================
483
/// Info about a single discoverable interface, ready for announcing.
#[derive(Debug, Clone)]
pub struct DiscoverableInterface {
    /// Configured interface name used for runtime targeting.
    pub interface_name: String,
    /// Discovery settings for this interface. The announcer reads the interface
    /// type, discovery name, announce interval, stamp cost and the optional
    /// reachability / location fields from it.
    pub config: DiscoveryConfig,
    /// Whether the node has transport enabled.
    pub transport_enabled: bool,
    /// IFAC network name, if configured.
    pub ifac_netname: Option<String>,
    /// IFAC passphrase, if configured.
    pub ifac_netkey: Option<String>,
}
497
/// Result of a completed background stamp generation.
///
/// Produced on the background thread and handed to the announcer through its
/// result channel.
pub struct StampResult {
    /// Configured interface name this stamp was generated for.
    pub interface_name: String,
    /// The complete app_data: [flags][packed][stamp], i.e. one flags byte, the
    /// msgpack-packed interface info, then the stamp bytes.
    pub app_data: Vec<u8>,
}
505
/// Manages periodic announcing of discoverable interfaces.
///
/// Stamp generation (PoW) runs on a background thread so it never blocks the
/// driver event loop.  The driver calls `poll_ready()` each tick to collect
/// finished results. At most one stamp job is in flight at a time.
pub struct InterfaceAnnouncer {
    /// Transport identity hash (16 bytes).
    transport_id: [u8; 16],
    /// Discoverable interfaces with their configs.
    interfaces: Vec<DiscoverableInterface>,
    /// Time the most recent stamp job was started for each interface (indexed
    /// same as `interfaces`). Entries start at `0.0`.
    last_announced: Vec<f64>,
    /// Receiver for completed stamp results from background threads.
    stamp_rx: std::sync::mpsc::Receiver<StampResult>,
    /// Sender cloned into background threads. Because the announcer keeps this
    /// sender, the channel stays connected for the announcer's whole lifetime.
    stamp_tx: std::sync::mpsc::Sender<StampResult>,
    /// Whether a background stamp job is currently running.
    stamp_pending: bool,
}
525
526impl InterfaceAnnouncer {
527    /// Create a new announcer.
528    pub fn new(transport_id: [u8; 16], interfaces: Vec<DiscoverableInterface>) -> Self {
529        let n = interfaces.len();
530        let (stamp_tx, stamp_rx) = std::sync::mpsc::channel();
531        InterfaceAnnouncer {
532            transport_id,
533            interfaces,
534            last_announced: vec![0.0; n],
535            stamp_rx,
536            stamp_tx,
537            stamp_pending: false,
538        }
539    }
540
541    /// If any interface is due for an announce and no stamp job is already
542    /// running, spawns a background thread for PoW.  The result will be
543    /// available via `poll_ready()`.
544    pub fn maybe_start(&mut self, now: f64) {
545        if self.stamp_pending {
546            return;
547        }
548        let due_index = self.interfaces.iter().enumerate().find_map(|(i, iface)| {
549            let elapsed = now - self.last_announced[i];
550            if elapsed >= iface.config.announce_interval as f64 {
551                Some(i)
552            } else {
553                None
554            }
555        });
556
557        if let Some(idx) = due_index {
558            let packed = self.pack_interface_info(idx);
559            let stamp_cost = self.interfaces[idx].config.stamp_value;
560            let name = self.interfaces[idx].config.discovery_name.clone();
561            let interface_name = self.interfaces[idx].interface_name.clone();
562            let tx = self.stamp_tx.clone();
563
564            log::info!(
565                "Spawning discovery stamp generation (cost={}) for '{}'...",
566                stamp_cost,
567                name,
568            );
569
570            self.stamp_pending = true;
571            self.last_announced[idx] = now;
572
573            std::thread::spawn(move || {
574                let (stamp, value) = generate_discovery_stamp(&packed, stamp_cost);
575                log::info!("Discovery stamp generated (value={}) for '{}'", value, name,);
576
577                let flags: u8 = 0x00; // no encryption
578                let mut app_data = Vec::with_capacity(1 + packed.len() + STAMP_SIZE);
579                app_data.push(flags);
580                app_data.extend_from_slice(&packed);
581                app_data.extend_from_slice(&stamp);
582
583                let _ = tx.send(StampResult {
584                    interface_name,
585                    app_data,
586                });
587            });
588        }
589    }
590
591    /// Non-blocking poll: returns completed app_data if a background stamp
592    /// job has finished.
593    pub fn poll_ready(&mut self) -> Option<StampResult> {
594        match self.stamp_rx.try_recv() {
595            Ok(result) => {
596                self.stamp_pending = false;
597                Some(result)
598            }
599            Err(_) => None,
600        }
601    }
602
603    /// Returns true if the announcer currently tracks a discoverable interface by name.
604    pub fn contains_interface(&self, interface_name: &str) -> bool {
605        self.interfaces
606            .iter()
607            .any(|iface| iface.interface_name == interface_name)
608    }
609
610    /// Insert or update a discoverable interface by configured name.
611    pub fn upsert_interface(&mut self, iface: DiscoverableInterface) {
612        if let Some(index) = self
613            .interfaces
614            .iter()
615            .position(|existing| existing.interface_name == iface.interface_name)
616        {
617            self.interfaces[index] = iface;
618            return;
619        }
620        self.interfaces.push(iface);
621        self.last_announced.push(0.0);
622    }
623
624    /// Remove a discoverable interface by configured name.
625    pub fn remove_interface(&mut self, interface_name: &str) -> bool {
626        if let Some(index) = self
627            .interfaces
628            .iter()
629            .position(|iface| iface.interface_name == interface_name)
630        {
631            self.interfaces.remove(index);
632            self.last_announced.remove(index);
633            true
634        } else {
635            false
636        }
637    }
638
    /// Returns true if no discoverable interfaces remain.
    ///
    /// Only the interface list is consulted; a stamp job that is still pending
    /// does not affect the result.
    pub fn is_empty(&self) -> bool {
        self.interfaces.is_empty()
    }
643
644    /// Pack interface metadata as msgpack map with integer keys.
645    fn pack_interface_info(&self, index: usize) -> Vec<u8> {
646        let iface = &self.interfaces[index];
647        let mut entries: Vec<(msgpack::Value, msgpack::Value)> = Vec::new();
648
649        entries.push((
650            msgpack::Value::UInt(INTERFACE_TYPE as u64),
651            msgpack::Value::Str(iface.config.interface_type.clone()),
652        ));
653        entries.push((
654            msgpack::Value::UInt(TRANSPORT as u64),
655            msgpack::Value::Bool(iface.transport_enabled),
656        ));
657        entries.push((
658            msgpack::Value::UInt(NAME as u64),
659            msgpack::Value::Str(iface.config.discovery_name.clone()),
660        ));
661        entries.push((
662            msgpack::Value::UInt(TRANSPORT_ID as u64),
663            msgpack::Value::Bin(self.transport_id.to_vec()),
664        ));
665        if let Some(ref reachable) = iface.config.reachable_on {
666            entries.push((
667                msgpack::Value::UInt(REACHABLE_ON as u64),
668                msgpack::Value::Str(reachable.clone()),
669            ));
670        }
671        if let Some(port) = iface.config.listen_port {
672            entries.push((
673                msgpack::Value::UInt(PORT as u64),
674                msgpack::Value::UInt(port as u64),
675            ));
676        }
677        if let Some(lat) = iface.config.latitude {
678            entries.push((
679                msgpack::Value::UInt(LATITUDE as u64),
680                msgpack::Value::Float(lat),
681            ));
682        }
683        if let Some(lon) = iface.config.longitude {
684            entries.push((
685                msgpack::Value::UInt(LONGITUDE as u64),
686                msgpack::Value::Float(lon),
687            ));
688        }
689        if let Some(h) = iface.config.height {
690            entries.push((
691                msgpack::Value::UInt(HEIGHT as u64),
692                msgpack::Value::Float(h),
693            ));
694        }
695        if let Some(ref netname) = iface.ifac_netname {
696            entries.push((
697                msgpack::Value::UInt(IFAC_NETNAME as u64),
698                msgpack::Value::Str(netname.clone()),
699            ));
700        }
701        if let Some(ref netkey) = iface.ifac_netkey {
702            entries.push((
703                msgpack::Value::UInt(IFAC_NETKEY as u64),
704                msgpack::Value::Str(netkey.clone()),
705            ));
706        }
707
708        msgpack::pack(&msgpack::Value::Map(entries))
709    }
710}
711
712// ============================================================================
713// Tests
714// ============================================================================
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719
720    #[test]
721    fn test_hex_encode() {
722        assert_eq!(hex_encode(&[0x00, 0xff, 0x12]), "00ff12");
723        assert_eq!(hex_encode(&[]), "");
724    }
725
726    #[test]
727    fn test_compute_discovery_hash() {
728        let transport_id = [0x42u8; 16];
729        let name = "TestInterface";
730        let hash = compute_discovery_hash(&transport_id, name);
731
732        // Should be deterministic
733        let hash2 = compute_discovery_hash(&transport_id, name);
734        assert_eq!(hash, hash2);
735
736        // Different name should give different hash
737        let hash3 = compute_discovery_hash(&transport_id, "OtherInterface");
738        assert_ne!(hash, hash3);
739    }
740
741    #[test]
742    fn test_is_ip_address() {
743        assert!(is_ip_address("192.168.1.1"));
744        assert!(is_ip_address("::1"));
745        assert!(is_ip_address("2001:db8::1"));
746        assert!(!is_ip_address("not-an-ip"));
747        assert!(!is_ip_address("hostname.example.com"));
748    }
749
750    #[test]
751    fn test_is_hostname() {
752        assert!(is_hostname("example.com"));
753        assert!(is_hostname("sub.example.com"));
754        assert!(is_hostname("my-node"));
755        assert!(is_hostname("my-node.example.com"));
756        assert!(!is_hostname(""));
757        assert!(!is_hostname("-invalid"));
758        assert!(!is_hostname("invalid-"));
759        assert!(!is_hostname("a".repeat(300).as_str()));
760    }
761
    /// `compute_status` moves from Available to Unknown to Stale as
    /// `last_heard` falls behind the corresponding thresholds.
    #[test]
    fn test_discovered_status() {
        let now = time::now();

        let mut iface = DiscoveredInterface {
            interface_type: "TestInterface".into(),
            transport: true,
            name: "Test".into(),
            discovered: now,
            last_heard: now,
            heard_count: 0,
            status: DiscoveredStatus::Available,
            stamp: vec![],
            stamp_value: 14,
            transport_id: [0u8; 16],
            network_id: [0u8; 16],
            hops: 0,
            latitude: None,
            longitude: None,
            height: None,
            reachable_on: None,
            port: None,
            frequency: None,
            bandwidth: None,
            spreading_factor: None,
            coding_rate: None,
            modulation: None,
            channel: None,
            ifac_netname: None,
            ifac_netkey: None,
            config_entry: None,
            discovery_hash: [0u8; 32],
        };

        // Fresh interface should be available
        assert_eq!(iface.compute_status(), DiscoveredStatus::Available);

        // One hour (3600 s) past THRESHOLD_UNKNOWN should be unknown
        iface.last_heard = now - THRESHOLD_UNKNOWN - 3600.0;
        assert_eq!(iface.compute_status(), DiscoveredStatus::Unknown);

        // One hour (3600 s) past THRESHOLD_STALE should be stale
        iface.last_heard = now - THRESHOLD_STALE - 3600.0;
        assert_eq!(iface.compute_status(), DiscoveredStatus::Stale);
    }
807
    /// Test fixture: a fully populated `BackboneInterface` record named `name`.
    ///
    /// The radio-specific optional fields are left as `None`; the discovery
    /// hash is derived from the fixture's transport id (`[0x01; 16]`) and `name`.
    fn test_discovered_interface(name: &str) -> DiscoveredInterface {
        DiscoveredInterface {
            interface_type: "BackboneInterface".into(),
            transport: true,
            name: name.into(),
            discovered: 1700000000.0,
            last_heard: 1700001000.0,
            heard_count: 5,
            status: DiscoveredStatus::Available,
            stamp: vec![0x42u8; 64],
            stamp_value: 18,
            transport_id: [0x01u8; 16],
            network_id: [0x02u8; 16],
            hops: 2,
            latitude: Some(45.0),
            longitude: Some(9.0),
            height: Some(100.0),
            reachable_on: Some("example.com".into()),
            port: Some(4242),
            frequency: None,
            bandwidth: None,
            spreading_factor: None,
            coding_rate: None,
            modulation: None,
            channel: None,
            ifac_netname: Some("mynetwork".into()),
            ifac_netkey: Some("secretkey".into()),
            config_entry: Some("test config".into()),
            // Must use the same transport id as the `transport_id` field above.
            discovery_hash: compute_discovery_hash(&[0x01u8; 16], name),
        }
    }
839
840    #[test]
841    fn test_storage_roundtrip() {
842        use std::sync::atomic::{AtomicU64, Ordering};
843        static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
844
845        let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
846        let dir =
847            std::env::temp_dir().join(format!("rns-discovery-test-{}-{}", std::process::id(), id));
848        let _ = fs::remove_dir_all(&dir);
849        fs::create_dir_all(&dir).unwrap();
850
851        let storage = DiscoveredInterfaceStorage::new(dir.clone());
852
853        let iface = test_discovered_interface("TestNode");
854
855        // Store
856        storage.store(&iface).unwrap();
857
858        // Load
859        let loaded = storage.load(&iface.discovery_hash).unwrap().unwrap();
860
861        assert_eq!(loaded.interface_type, iface.interface_type);
862        assert_eq!(loaded.name, iface.name);
863        assert_eq!(loaded.stamp_value, iface.stamp_value);
864        assert_eq!(loaded.transport_id, iface.transport_id);
865        assert_eq!(loaded.hops, iface.hops);
866        assert_eq!(loaded.latitude, iface.latitude);
867        assert_eq!(loaded.reachable_on, iface.reachable_on);
868        assert_eq!(loaded.port, iface.port);
869
870        // List
871        let list = storage.list().unwrap();
872        assert_eq!(list.len(), 1);
873
874        // Remove
875        storage.remove(&iface.discovery_hash).unwrap();
876        let list = storage.list().unwrap();
877        assert!(list.is_empty());
878
879        let _ = fs::remove_dir_all(&dir);
880    }
881
882    #[test]
883    fn storage_load_sanitizes_cached_interface_names() {
884        let dir = std::env::temp_dir().join(format!(
885            "rns-discovery-sanitize-test-{}",
886            std::process::id()
887        ));
888        let _ = fs::remove_dir_all(&dir);
889        fs::create_dir_all(&dir).unwrap();
890        let storage = DiscoveredInterfaceStorage::new(dir.clone());
891        let iface = test_discovered_interface("\t**Cached     Name!!!\n");
892
893        storage.store(&iface).unwrap();
894
895        let loaded = storage.load(&iface.discovery_hash).unwrap().unwrap();
896        let listed = storage.list().unwrap();
897
898        assert_eq!(loaded.name, "Cached Name");
899        assert_eq!(listed[0].name, "Cached Name");
900
901        let _ = fs::remove_dir_all(&dir);
902    }
903
904    #[test]
905    fn storage_rejects_cached_transport_id_with_invalid_length() {
906        let storage = DiscoveredInterfaceStorage::new(std::env::temp_dir());
907        let iface = test_discovered_interface("BadTransportId");
908        let mut data = storage.serialize_interface(&iface).unwrap();
909        let (mut value, _) = msgpack::unpack(&data).unwrap();
910        if let Value::Map(ref mut entries) = value {
911            for (key, val) in entries {
912                if key.as_str() == Some("transport_id") {
913                    *val = Value::Bin(vec![0x01; 15]);
914                }
915            }
916        }
917        data = msgpack::pack(&value);
918
919        let err = storage.deserialize_interface(&data).unwrap_err();
920
921        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
922        assert!(err.to_string().contains("transport_id"));
923    }
924
925    #[test]
926    fn store_received_preserves_existing_first_seen_and_increments_heard_count() {
927        let dir = std::env::temp_dir().join(format!(
928            "rns-discovery-received-preserve-test-{}",
929            std::process::id()
930        ));
931        let _ = fs::remove_dir_all(&dir);
932        fs::create_dir_all(&dir).unwrap();
933        let storage = DiscoveredInterfaceStorage::new(dir.clone());
934
935        let mut existing = test_discovered_interface("ExistingDiscovery");
936        existing.discovered = 1000.0;
937        existing.last_heard = 1100.0;
938        existing.heard_count = 7;
939        storage.store(&existing).unwrap();
940
941        let mut received = existing.clone();
942        received.discovered = 2000.0;
943        received.last_heard = 3000.0;
944        received.heard_count = 0;
945        storage.store_received(&mut received).unwrap();
946
947        let loaded = storage.load(&received.discovery_hash).unwrap().unwrap();
948        assert_eq!(received.discovered, 1000.0);
949        assert_eq!(received.last_heard, 3000.0);
950        assert_eq!(received.heard_count, 8);
951        assert_eq!(loaded.discovered, 1000.0);
952        assert_eq!(loaded.last_heard, 3000.0);
953        assert_eq!(loaded.heard_count, 8);
954
955        let _ = fs::remove_dir_all(&dir);
956    }
957
958    #[test]
959    fn store_received_serializes_concurrent_counter_updates() {
960        use std::sync::{Arc, Barrier};
961        use std::thread;
962
963        let dir = std::env::temp_dir().join(format!(
964            "rns-discovery-concurrent-received-test-{}",
965            std::process::id()
966        ));
967        let _ = fs::remove_dir_all(&dir);
968        fs::create_dir_all(&dir).unwrap();
969        let storage = Arc::new(DiscoveredInterfaceStorage::new(dir.clone()));
970
971        let mut existing = test_discovered_interface("ConcurrentDiscovery");
972        existing.discovered = 1000.0;
973        existing.last_heard = 1000.0;
974        existing.heard_count = 0;
975        storage.store(&existing).unwrap();
976
977        let threads = 16;
978        let updates_per_thread = 25;
979        let barrier = Arc::new(Barrier::new(threads));
980        let mut handles = Vec::new();
981        for thread_id in 0..threads {
982            let storage = Arc::clone(&storage);
983            let barrier = Arc::clone(&barrier);
984            let template = existing.clone();
985            handles.push(thread::spawn(move || {
986                barrier.wait();
987                for update in 0..updates_per_thread {
988                    let mut received = template.clone();
989                    received.last_heard = 2000.0 + (thread_id * updates_per_thread + update) as f64;
990                    storage.store_received(&mut received).unwrap();
991                }
992            }));
993        }
994
995        for handle in handles {
996            handle.join().unwrap();
997        }
998
999        let loaded = storage.load(&existing.discovery_hash).unwrap().unwrap();
1000        assert_eq!(loaded.discovered, 1000.0);
1001        assert_eq!(loaded.heard_count as usize, threads * updates_per_thread);
1002
1003        let _ = fs::remove_dir_all(&dir);
1004    }
1005
1006    #[test]
1007    fn store_received_recreates_corrupt_cache_with_received_time_as_first_seen() {
1008        let dir = std::env::temp_dir().join(format!(
1009            "rns-discovery-corrupt-recreate-test-{}",
1010            std::process::id()
1011        ));
1012        let _ = fs::remove_dir_all(&dir);
1013        fs::create_dir_all(&dir).unwrap();
1014        let storage = DiscoveredInterfaceStorage::new(dir.clone());
1015
1016        let mut received = test_discovered_interface("CorruptDiscovery");
1017        received.discovered = 1234.0;
1018        received.last_heard = 5678.0;
1019        received.heard_count = 0;
1020        let filepath = dir.join(hex_encode(&received.discovery_hash));
1021        fs::write(&filepath, b"not msgpack").unwrap();
1022
1023        storage.store_received(&mut received).unwrap();
1024
1025        let loaded = storage.load(&received.discovery_hash).unwrap().unwrap();
1026        assert_eq!(received.discovered, 5678.0);
1027        assert_eq!(received.heard_count, 1);
1028        assert_eq!(loaded.discovered, 5678.0);
1029        assert_eq!(loaded.last_heard, 5678.0);
1030        assert_eq!(loaded.heard_count, 1);
1031        assert_eq!(loaded.name, "CorruptDiscovery");
1032
1033        let _ = fs::remove_dir_all(&dir);
1034    }
1035
1036    #[test]
1037    fn test_filter_and_sort() {
1038        let now = time::now();
1039
1040        let ifaces = vec![
1041            DiscoveredInterface {
1042                interface_type: "BackboneInterface".into(),
1043                transport: true,
1044                name: "high-value-stale".into(),
1045                discovered: now,
1046                last_heard: now - THRESHOLD_STALE - 100.0, // Stale
1047                heard_count: 0,
1048                status: DiscoveredStatus::Stale,
1049                stamp: vec![],
1050                stamp_value: 20,
1051                transport_id: [0u8; 16],
1052                network_id: [0u8; 16],
1053                hops: 0,
1054                latitude: None,
1055                longitude: None,
1056                height: None,
1057                reachable_on: None,
1058                port: None,
1059                frequency: None,
1060                bandwidth: None,
1061                spreading_factor: None,
1062                coding_rate: None,
1063                modulation: None,
1064                channel: None,
1065                ifac_netname: None,
1066                ifac_netkey: None,
1067                config_entry: None,
1068                discovery_hash: [0u8; 32],
1069            },
1070            DiscoveredInterface {
1071                interface_type: "TCPServerInterface".into(),
1072                transport: true,
1073                name: "low-value-available".into(),
1074                discovered: now,
1075                last_heard: now - 10.0, // Available
1076                heard_count: 0,
1077                status: DiscoveredStatus::Available,
1078                stamp: vec![],
1079                stamp_value: 10,
1080                transport_id: [0u8; 16],
1081                network_id: [0u8; 16],
1082                hops: 0,
1083                latitude: None,
1084                longitude: None,
1085                height: None,
1086                reachable_on: None,
1087                port: None,
1088                frequency: None,
1089                bandwidth: None,
1090                spreading_factor: None,
1091                coding_rate: None,
1092                modulation: None,
1093                channel: None,
1094                ifac_netname: None,
1095                ifac_netkey: None,
1096                config_entry: None,
1097                discovery_hash: [1u8; 32],
1098            },
1099            DiscoveredInterface {
1100                interface_type: "I2PInterface".into(),
1101                transport: false,
1102                name: "high-value-available".into(),
1103                discovered: now,
1104                last_heard: now - 10.0, // Available
1105                heard_count: 0,
1106                status: DiscoveredStatus::Available,
1107                stamp: vec![],
1108                stamp_value: 20,
1109                transport_id: [0u8; 16],
1110                network_id: [0u8; 16],
1111                hops: 0,
1112                latitude: None,
1113                longitude: None,
1114                height: None,
1115                reachable_on: None,
1116                port: None,
1117                frequency: None,
1118                bandwidth: None,
1119                spreading_factor: None,
1120                coding_rate: None,
1121                modulation: None,
1122                channel: None,
1123                ifac_netname: None,
1124                ifac_netkey: None,
1125                config_entry: None,
1126                discovery_hash: [2u8; 32],
1127            },
1128        ];
1129
1130        // Test no filter — all included, sorted by status then value
1131        let mut result = ifaces.clone();
1132        filter_and_sort_interfaces(&mut result, false, false);
1133        assert_eq!(result.len(), 3);
1134        // Available ones should come first (higher status code)
1135        assert_eq!(result[0].name, "high-value-available");
1136        assert_eq!(result[1].name, "low-value-available");
1137        assert_eq!(result[2].name, "high-value-stale");
1138
1139        // Test only_available filter
1140        let mut result = ifaces.clone();
1141        filter_and_sort_interfaces(&mut result, true, false);
1142        assert_eq!(result.len(), 2); // stale one filtered out
1143
1144        // Test only_transport filter
1145        let mut result = ifaces.clone();
1146        filter_and_sort_interfaces(&mut result, false, true);
1147        assert_eq!(result.len(), 2); // non-transport one filtered out
1148    }
1149
1150    #[test]
1151    fn test_discovery_name_hash_deterministic() {
1152        let h1 = discovery_name_hash();
1153        let h2 = discovery_name_hash();
1154        assert_eq!(h1, h2);
1155        assert_ne!(h1, [0u8; 10]); // not all zeros
1156    }
1157}