1pub use crate::common::discovery::*;
13
14use std::fs;
15use std::io;
16use std::path::PathBuf;
17use std::sync::{Mutex, MutexGuard};
18
19use rns_core::msgpack::{self, Value};
20use rns_core::stamp::{stamp_valid, stamp_workblock};
21use rns_crypto::sha256::sha256;
22
23use crate::time;
24
25static DISCOVERY_STORAGE_LOCK: Mutex<()> = Mutex::new(());
30
31pub struct DiscoveredInterfaceStorage {
33 base_path: PathBuf,
34}
35
36impl DiscoveredInterfaceStorage {
37 pub fn new(base_path: PathBuf) -> Self {
39 Self { base_path }
40 }
41
42 pub fn store(&self, iface: &DiscoveredInterface) -> io::Result<()> {
44 let _guard = discovery_storage_guard();
45 self.store_unlocked(iface)
46 }
47
48 fn store_unlocked(&self, iface: &DiscoveredInterface) -> io::Result<()> {
49 let filename = hex_encode(&iface.discovery_hash);
50 let filepath = self.base_path.join(filename);
51
52 let data = self.serialize_interface(iface)?;
53 fs::write(&filepath, &data)
54 }
55
56 pub fn store_received(&self, iface: &mut DiscoveredInterface) -> io::Result<()> {
58 let _guard = discovery_storage_guard();
59 match self.load_unlocked(&iface.discovery_hash) {
60 Ok(Some(existing)) => {
61 iface.discovered = existing.discovered;
62 iface.heard_count = existing.heard_count.saturating_add(1);
63 }
64 Ok(None) => {
65 iface.discovered = iface.last_heard;
66 iface.heard_count = 1;
67 }
68 Err(err) => {
69 log::error!(
70 "Error while reading existing data for discovered interface, re-creating data: {}",
71 err
72 );
73 iface.discovered = iface.last_heard;
74 iface.heard_count = 1;
75 }
76 }
77
78 self.store_unlocked(iface)
79 }
80
81 pub fn load(&self, discovery_hash: &[u8; 32]) -> io::Result<Option<DiscoveredInterface>> {
83 let _guard = discovery_storage_guard();
84 self.load_unlocked(discovery_hash)
85 }
86
87 fn load_unlocked(&self, discovery_hash: &[u8; 32]) -> io::Result<Option<DiscoveredInterface>> {
88 let filename = hex_encode(discovery_hash);
89 let filepath = self.base_path.join(filename);
90
91 if !filepath.exists() {
92 return Ok(None);
93 }
94
95 let data = fs::read(&filepath)?;
96 self.deserialize_interface(&data).map(Some)
97 }
98
99 pub fn list(&self) -> io::Result<Vec<DiscoveredInterface>> {
101 let _guard = discovery_storage_guard();
102 self.list_unlocked()
103 }
104
105 fn list_unlocked(&self) -> io::Result<Vec<DiscoveredInterface>> {
106 let mut interfaces = Vec::new();
107
108 let entries = match fs::read_dir(&self.base_path) {
109 Ok(e) => e,
110 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(interfaces),
111 Err(e) => return Err(e),
112 };
113
114 for entry in entries {
115 let entry = entry?;
116 let path = entry.path();
117
118 if !path.is_file() {
119 continue;
120 }
121
122 match fs::read(&path) {
123 Ok(data) => {
124 if let Ok(iface) = self.deserialize_interface(&data) {
125 interfaces.push(iface);
126 }
127 }
128 Err(_) => continue,
129 }
130 }
131
132 Ok(interfaces)
133 }
134
135 pub fn remove(&self, discovery_hash: &[u8; 32]) -> io::Result<()> {
137 let _guard = discovery_storage_guard();
138 self.remove_unlocked(discovery_hash)
139 }
140
141 fn remove_unlocked(&self, discovery_hash: &[u8; 32]) -> io::Result<()> {
142 let filename = hex_encode(discovery_hash);
143 let filepath = self.base_path.join(filename);
144
145 if filepath.exists() {
146 fs::remove_file(&filepath)?;
147 }
148 Ok(())
149 }
150
151 pub fn cleanup(&self) -> io::Result<usize> {
154 let _guard = discovery_storage_guard();
155 let mut removed = 0;
156 let now = time::now();
157
158 let interfaces = self.list_unlocked()?;
159 for iface in interfaces {
160 let invalid_reachable_on = iface
161 .reachable_on
162 .as_ref()
163 .map(|reachable_on| !(is_ip_address(reachable_on) || is_hostname(reachable_on)))
164 .unwrap_or(false);
165
166 if !is_discoverable_type(&iface.interface_type)
167 || invalid_reachable_on
168 || now - iface.last_heard > THRESHOLD_REMOVE
169 {
170 self.remove_unlocked(&iface.discovery_hash)?;
171 removed += 1;
172 }
173 }
174
175 Ok(removed)
176 }
177
178 fn serialize_interface(&self, iface: &DiscoveredInterface) -> io::Result<Vec<u8>> {
180 let mut entries: Vec<(Value, Value)> = Vec::new();
181
182 entries.push((
183 Value::Str("type".into()),
184 Value::Str(iface.interface_type.clone()),
185 ));
186 entries.push((Value::Str("transport".into()), Value::Bool(iface.transport)));
187 entries.push((Value::Str("name".into()), Value::Str(iface.name.clone())));
188 entries.push((
189 Value::Str("discovered".into()),
190 Value::Float(iface.discovered),
191 ));
192 entries.push((
193 Value::Str("last_heard".into()),
194 Value::Float(iface.last_heard),
195 ));
196 entries.push((
197 Value::Str("heard_count".into()),
198 Value::UInt(iface.heard_count as u64),
199 ));
200 entries.push((
201 Value::Str("status".into()),
202 Value::Str(iface.status.as_str().into()),
203 ));
204 entries.push((Value::Str("stamp".into()), Value::Bin(iface.stamp.clone())));
205 entries.push((
206 Value::Str("value".into()),
207 Value::UInt(iface.stamp_value as u64),
208 ));
209 entries.push((
210 Value::Str("transport_id".into()),
211 Value::Bin(iface.transport_id.to_vec()),
212 ));
213 entries.push((
214 Value::Str("network_id".into()),
215 Value::Bin(iface.network_id.to_vec()),
216 ));
217 entries.push((Value::Str("hops".into()), Value::UInt(iface.hops as u64)));
218
219 if let Some(v) = iface.latitude {
220 entries.push((Value::Str("latitude".into()), Value::Float(v)));
221 }
222 if let Some(v) = iface.longitude {
223 entries.push((Value::Str("longitude".into()), Value::Float(v)));
224 }
225 if let Some(v) = iface.height {
226 entries.push((Value::Str("height".into()), Value::Float(v)));
227 }
228 if let Some(ref v) = iface.reachable_on {
229 entries.push((Value::Str("reachable_on".into()), Value::Str(v.clone())));
230 }
231 if let Some(v) = iface.port {
232 entries.push((Value::Str("port".into()), Value::UInt(v as u64)));
233 }
234 if let Some(v) = iface.frequency {
235 entries.push((Value::Str("frequency".into()), Value::UInt(v as u64)));
236 }
237 if let Some(v) = iface.bandwidth {
238 entries.push((Value::Str("bandwidth".into()), Value::UInt(v as u64)));
239 }
240 if let Some(v) = iface.spreading_factor {
241 entries.push((Value::Str("sf".into()), Value::UInt(v as u64)));
242 }
243 if let Some(v) = iface.coding_rate {
244 entries.push((Value::Str("cr".into()), Value::UInt(v as u64)));
245 }
246 if let Some(ref v) = iface.modulation {
247 entries.push((Value::Str("modulation".into()), Value::Str(v.clone())));
248 }
249 if let Some(v) = iface.channel {
250 entries.push((Value::Str("channel".into()), Value::UInt(v as u64)));
251 }
252 if let Some(ref v) = iface.ifac_netname {
253 entries.push((Value::Str("ifac_netname".into()), Value::Str(v.clone())));
254 }
255 if let Some(ref v) = iface.ifac_netkey {
256 entries.push((Value::Str("ifac_netkey".into()), Value::Str(v.clone())));
257 }
258 if let Some(ref v) = iface.config_entry {
259 entries.push((Value::Str("config_entry".into()), Value::Str(v.clone())));
260 }
261
262 entries.push((
263 Value::Str("discovery_hash".into()),
264 Value::Bin(iface.discovery_hash.to_vec()),
265 ));
266
267 Ok(msgpack::pack(&Value::Map(entries)))
268 }
269
270 fn deserialize_interface(&self, data: &[u8]) -> io::Result<DiscoveredInterface> {
272 let (value, _) = msgpack::unpack(data).map_err(|e| {
273 io::Error::new(io::ErrorKind::InvalidData, format!("msgpack error: {}", e))
274 })?;
275
276 let get_str = |v: &Value, key: &str| -> io::Result<String> {
278 v.map_get(key)
279 .and_then(|val| val.as_str())
280 .map(|s| s.to_string())
281 .ok_or_else(|| {
282 io::Error::new(io::ErrorKind::InvalidData, format!("{} not a string", key))
283 })
284 };
285
286 let get_opt_str = |v: &Value, key: &str| -> Option<String> {
287 v.map_get(key)
288 .and_then(|val| val.as_str().map(|s| s.to_string()))
289 };
290
291 let get_bool = |v: &Value, key: &str| -> io::Result<bool> {
292 v.map_get(key).and_then(|val| val.as_bool()).ok_or_else(|| {
293 io::Error::new(io::ErrorKind::InvalidData, format!("{} not a bool", key))
294 })
295 };
296
297 let get_float = |v: &Value, key: &str| -> io::Result<f64> {
298 v.map_get(key)
299 .and_then(|val| val.as_float())
300 .ok_or_else(|| {
301 io::Error::new(io::ErrorKind::InvalidData, format!("{} not a float", key))
302 })
303 };
304
305 let get_opt_float =
306 |v: &Value, key: &str| -> Option<f64> { v.map_get(key).and_then(|val| val.as_float()) };
307
308 let get_uint = |v: &Value, key: &str| -> io::Result<u64> {
309 v.map_get(key).and_then(|val| val.as_uint()).ok_or_else(|| {
310 io::Error::new(io::ErrorKind::InvalidData, format!("{} not a uint", key))
311 })
312 };
313
314 let get_opt_uint =
315 |v: &Value, key: &str| -> Option<u64> { v.map_get(key).and_then(|val| val.as_uint()) };
316
317 let get_bytes = |v: &Value, key: &str| -> io::Result<Vec<u8>> {
318 v.map_get(key)
319 .and_then(|val| val.as_bin())
320 .map(|b| b.to_vec())
321 .ok_or_else(|| {
322 io::Error::new(io::ErrorKind::InvalidData, format!("{} not bytes", key))
323 })
324 };
325
326 let fixed_bytes = |key: &str, expected_len: usize| -> io::Result<Vec<u8>> {
327 let bytes = get_bytes(&value, key)?;
328 if bytes.len() != expected_len {
329 return Err(io::Error::new(
330 io::ErrorKind::InvalidData,
331 format!("{} must be {} bytes", key, expected_len),
332 ));
333 }
334 Ok(bytes)
335 };
336
337 let transport_id_bytes = fixed_bytes("transport_id", 16)?;
338 let mut transport_id = [0u8; 16];
339 transport_id.copy_from_slice(&transport_id_bytes);
340
341 let network_id_bytes = fixed_bytes("network_id", 16)?;
342 let mut network_id = [0u8; 16];
343 network_id.copy_from_slice(&network_id_bytes);
344
345 let discovery_hash_bytes = fixed_bytes("discovery_hash", 32)?;
346 let mut discovery_hash = [0u8; 32];
347 discovery_hash.copy_from_slice(&discovery_hash_bytes);
348
349 let status_str = get_str(&value, "status")?;
350 let status = match status_str.as_str() {
351 "available" => DiscoveredStatus::Available,
352 "unknown" => DiscoveredStatus::Unknown,
353 "stale" => DiscoveredStatus::Stale,
354 _ => DiscoveredStatus::Unknown,
355 };
356
357 let interface_type = get_str(&value, "type")?;
358 let raw_name = get_str(&value, "name")?;
359 let name = sanitize_discovered_name(&raw_name)
360 .unwrap_or_else(|| format!("Discovered {}", interface_type));
361
362 Ok(DiscoveredInterface {
363 interface_type,
364 transport: get_bool(&value, "transport")?,
365 name,
366 discovered: get_float(&value, "discovered")?,
367 last_heard: get_float(&value, "last_heard")?,
368 heard_count: get_uint(&value, "heard_count")? as u32,
369 status,
370 stamp: get_bytes(&value, "stamp")?,
371 stamp_value: get_uint(&value, "value")? as u32,
372 transport_id,
373 network_id,
374 hops: get_uint(&value, "hops")? as u8,
375 latitude: get_opt_float(&value, "latitude"),
376 longitude: get_opt_float(&value, "longitude"),
377 height: get_opt_float(&value, "height"),
378 reachable_on: get_opt_str(&value, "reachable_on"),
379 port: get_opt_uint(&value, "port").map(|v| v as u16),
380 frequency: get_opt_uint(&value, "frequency").map(|v| v as u32),
381 bandwidth: get_opt_uint(&value, "bandwidth").map(|v| v as u32),
382 spreading_factor: get_opt_uint(&value, "sf").map(|v| v as u8),
383 coding_rate: get_opt_uint(&value, "cr").map(|v| v as u8),
384 modulation: get_opt_str(&value, "modulation"),
385 channel: get_opt_uint(&value, "channel").map(|v| v as u8),
386 ifac_netname: get_opt_str(&value, "ifac_netname"),
387 ifac_netkey: get_opt_str(&value, "ifac_netkey"),
388 config_entry: get_opt_str(&value, "config_entry"),
389 discovery_hash,
390 })
391 }
392}
393
394fn discovery_storage_guard() -> MutexGuard<'static, ()> {
395 match DISCOVERY_STORAGE_LOCK.lock() {
396 Ok(guard) => guard,
397 Err(poisoned) => {
398 log::error!("recovering from poisoned discovery storage lock");
399 poisoned.into_inner()
400 }
401 }
402}
403
404pub fn generate_discovery_stamp(packed_data: &[u8], stamp_cost: u8) -> ([u8; STAMP_SIZE], u32) {
412 use rns_crypto::{OsRng, Rng};
413 use std::sync::atomic::{AtomicBool, Ordering};
414 use std::sync::{Arc, Mutex};
415
416 let infohash = sha256(packed_data);
417 let workblock = stamp_workblock(&infohash, WORKBLOCK_EXPAND_ROUNDS);
418
419 let found: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
420 let result: Arc<Mutex<Option<[u8; STAMP_SIZE]>>> = Arc::new(Mutex::new(None));
421
422 let num_threads = rayon::current_num_threads();
423
424 rayon::scope(|s| {
425 for _ in 0..num_threads {
426 let found = found.clone();
427 let result = result.clone();
428 let workblock = &workblock;
429 s.spawn(move |_| {
430 let mut rng = OsRng;
431 let mut nonce = [0u8; STAMP_SIZE];
432 loop {
433 if found.load(Ordering::Relaxed) {
434 return;
435 }
436 rng.fill_bytes(&mut nonce);
437 if stamp_valid(&nonce, stamp_cost, workblock) {
438 let mut r = match result.lock() {
439 Ok(guard) => guard,
440 Err(poisoned) => {
441 log::error!(
442 "recovering from poisoned discovery stamp result buffer"
443 );
444 poisoned.into_inner()
445 }
446 };
447 if r.is_none() {
448 *r = Some(nonce);
449 }
450 found.store(true, Ordering::Relaxed);
451 return;
452 }
453 }
454 });
455 }
456 });
457
458 let stamp = match result.lock() {
459 Ok(mut guard) => guard.take(),
460 Err(poisoned) => {
461 log::error!("recovering from poisoned discovery stamp result buffer");
462 poisoned.into_inner().take()
463 }
464 }
465 .unwrap_or_else(|| {
466 log::error!("parallel discovery stamp search returned no result; retrying synchronously");
467 let mut rng = OsRng;
468 let mut nonce = [0u8; STAMP_SIZE];
469 loop {
470 rng.fill_bytes(&mut nonce);
471 if stamp_valid(&nonce, stamp_cost, &workblock) {
472 return nonce;
473 }
474 }
475 });
476 let value = rns_core::stamp::stamp_value(&workblock, &stamp);
477 (stamp, value)
478}
479
480#[derive(Debug, Clone)]
486pub struct DiscoverableInterface {
487 pub interface_name: String,
489 pub config: DiscoveryConfig,
490 pub transport_enabled: bool,
492 pub ifac_netname: Option<String>,
494 pub ifac_netkey: Option<String>,
496}
497
498pub struct StampResult {
500 pub interface_name: String,
502 pub app_data: Vec<u8>,
504}
505
506pub struct InterfaceAnnouncer {
512 transport_id: [u8; 16],
514 interfaces: Vec<DiscoverableInterface>,
516 last_announced: Vec<f64>,
518 stamp_rx: std::sync::mpsc::Receiver<StampResult>,
520 stamp_tx: std::sync::mpsc::Sender<StampResult>,
522 stamp_pending: bool,
524}
525
526impl InterfaceAnnouncer {
527 pub fn new(transport_id: [u8; 16], interfaces: Vec<DiscoverableInterface>) -> Self {
529 let n = interfaces.len();
530 let (stamp_tx, stamp_rx) = std::sync::mpsc::channel();
531 InterfaceAnnouncer {
532 transport_id,
533 interfaces,
534 last_announced: vec![0.0; n],
535 stamp_rx,
536 stamp_tx,
537 stamp_pending: false,
538 }
539 }
540
541 pub fn maybe_start(&mut self, now: f64) {
545 if self.stamp_pending {
546 return;
547 }
548 let due_index = self.interfaces.iter().enumerate().find_map(|(i, iface)| {
549 let elapsed = now - self.last_announced[i];
550 if elapsed >= iface.config.announce_interval as f64 {
551 Some(i)
552 } else {
553 None
554 }
555 });
556
557 if let Some(idx) = due_index {
558 let packed = self.pack_interface_info(idx);
559 let stamp_cost = self.interfaces[idx].config.stamp_value;
560 let name = self.interfaces[idx].config.discovery_name.clone();
561 let interface_name = self.interfaces[idx].interface_name.clone();
562 let tx = self.stamp_tx.clone();
563
564 log::info!(
565 "Spawning discovery stamp generation (cost={}) for '{}'...",
566 stamp_cost,
567 name,
568 );
569
570 self.stamp_pending = true;
571 self.last_announced[idx] = now;
572
573 std::thread::spawn(move || {
574 let (stamp, value) = generate_discovery_stamp(&packed, stamp_cost);
575 log::info!("Discovery stamp generated (value={}) for '{}'", value, name,);
576
577 let flags: u8 = 0x00; let mut app_data = Vec::with_capacity(1 + packed.len() + STAMP_SIZE);
579 app_data.push(flags);
580 app_data.extend_from_slice(&packed);
581 app_data.extend_from_slice(&stamp);
582
583 let _ = tx.send(StampResult {
584 interface_name,
585 app_data,
586 });
587 });
588 }
589 }
590
591 pub fn poll_ready(&mut self) -> Option<StampResult> {
594 match self.stamp_rx.try_recv() {
595 Ok(result) => {
596 self.stamp_pending = false;
597 Some(result)
598 }
599 Err(_) => None,
600 }
601 }
602
603 pub fn contains_interface(&self, interface_name: &str) -> bool {
605 self.interfaces
606 .iter()
607 .any(|iface| iface.interface_name == interface_name)
608 }
609
610 pub fn upsert_interface(&mut self, iface: DiscoverableInterface) {
612 if let Some(index) = self
613 .interfaces
614 .iter()
615 .position(|existing| existing.interface_name == iface.interface_name)
616 {
617 self.interfaces[index] = iface;
618 return;
619 }
620 self.interfaces.push(iface);
621 self.last_announced.push(0.0);
622 }
623
624 pub fn remove_interface(&mut self, interface_name: &str) -> bool {
626 if let Some(index) = self
627 .interfaces
628 .iter()
629 .position(|iface| iface.interface_name == interface_name)
630 {
631 self.interfaces.remove(index);
632 self.last_announced.remove(index);
633 true
634 } else {
635 false
636 }
637 }
638
639 pub fn is_empty(&self) -> bool {
641 self.interfaces.is_empty()
642 }
643
644 fn pack_interface_info(&self, index: usize) -> Vec<u8> {
646 let iface = &self.interfaces[index];
647 let mut entries: Vec<(msgpack::Value, msgpack::Value)> = Vec::new();
648
649 entries.push((
650 msgpack::Value::UInt(INTERFACE_TYPE as u64),
651 msgpack::Value::Str(iface.config.interface_type.clone()),
652 ));
653 entries.push((
654 msgpack::Value::UInt(TRANSPORT as u64),
655 msgpack::Value::Bool(iface.transport_enabled),
656 ));
657 entries.push((
658 msgpack::Value::UInt(NAME as u64),
659 msgpack::Value::Str(iface.config.discovery_name.clone()),
660 ));
661 entries.push((
662 msgpack::Value::UInt(TRANSPORT_ID as u64),
663 msgpack::Value::Bin(self.transport_id.to_vec()),
664 ));
665 if let Some(ref reachable) = iface.config.reachable_on {
666 entries.push((
667 msgpack::Value::UInt(REACHABLE_ON as u64),
668 msgpack::Value::Str(reachable.clone()),
669 ));
670 }
671 if let Some(port) = iface.config.listen_port {
672 entries.push((
673 msgpack::Value::UInt(PORT as u64),
674 msgpack::Value::UInt(port as u64),
675 ));
676 }
677 if let Some(lat) = iface.config.latitude {
678 entries.push((
679 msgpack::Value::UInt(LATITUDE as u64),
680 msgpack::Value::Float(lat),
681 ));
682 }
683 if let Some(lon) = iface.config.longitude {
684 entries.push((
685 msgpack::Value::UInt(LONGITUDE as u64),
686 msgpack::Value::Float(lon),
687 ));
688 }
689 if let Some(h) = iface.config.height {
690 entries.push((
691 msgpack::Value::UInt(HEIGHT as u64),
692 msgpack::Value::Float(h),
693 ));
694 }
695 if let Some(ref netname) = iface.ifac_netname {
696 entries.push((
697 msgpack::Value::UInt(IFAC_NETNAME as u64),
698 msgpack::Value::Str(netname.clone()),
699 ));
700 }
701 if let Some(ref netkey) = iface.ifac_netkey {
702 entries.push((
703 msgpack::Value::UInt(IFAC_NETKEY as u64),
704 msgpack::Value::Str(netkey.clone()),
705 ));
706 }
707
708 msgpack::pack(&msgpack::Value::Map(entries))
709 }
710}
711
712#[cfg(test)]
717mod tests {
718 use super::*;
719
720 #[test]
721 fn test_hex_encode() {
722 assert_eq!(hex_encode(&[0x00, 0xff, 0x12]), "00ff12");
723 assert_eq!(hex_encode(&[]), "");
724 }
725
726 #[test]
727 fn test_compute_discovery_hash() {
728 let transport_id = [0x42u8; 16];
729 let name = "TestInterface";
730 let hash = compute_discovery_hash(&transport_id, name);
731
732 let hash2 = compute_discovery_hash(&transport_id, name);
734 assert_eq!(hash, hash2);
735
736 let hash3 = compute_discovery_hash(&transport_id, "OtherInterface");
738 assert_ne!(hash, hash3);
739 }
740
741 #[test]
742 fn test_is_ip_address() {
743 assert!(is_ip_address("192.168.1.1"));
744 assert!(is_ip_address("::1"));
745 assert!(is_ip_address("2001:db8::1"));
746 assert!(!is_ip_address("not-an-ip"));
747 assert!(!is_ip_address("hostname.example.com"));
748 }
749
750 #[test]
751 fn test_is_hostname() {
752 assert!(is_hostname("example.com"));
753 assert!(is_hostname("sub.example.com"));
754 assert!(is_hostname("my-node"));
755 assert!(is_hostname("my-node.example.com"));
756 assert!(!is_hostname(""));
757 assert!(!is_hostname("-invalid"));
758 assert!(!is_hostname("invalid-"));
759 assert!(!is_hostname("a".repeat(300).as_str()));
760 }
761
762 #[test]
763 fn test_discovered_status() {
764 let now = time::now();
765
766 let mut iface = DiscoveredInterface {
767 interface_type: "TestInterface".into(),
768 transport: true,
769 name: "Test".into(),
770 discovered: now,
771 last_heard: now,
772 heard_count: 0,
773 status: DiscoveredStatus::Available,
774 stamp: vec![],
775 stamp_value: 14,
776 transport_id: [0u8; 16],
777 network_id: [0u8; 16],
778 hops: 0,
779 latitude: None,
780 longitude: None,
781 height: None,
782 reachable_on: None,
783 port: None,
784 frequency: None,
785 bandwidth: None,
786 spreading_factor: None,
787 coding_rate: None,
788 modulation: None,
789 channel: None,
790 ifac_netname: None,
791 ifac_netkey: None,
792 config_entry: None,
793 discovery_hash: [0u8; 32],
794 };
795
796 assert_eq!(iface.compute_status(), DiscoveredStatus::Available);
798
799 iface.last_heard = now - THRESHOLD_UNKNOWN - 3600.0;
801 assert_eq!(iface.compute_status(), DiscoveredStatus::Unknown);
802
803 iface.last_heard = now - THRESHOLD_STALE - 3600.0;
805 assert_eq!(iface.compute_status(), DiscoveredStatus::Stale);
806 }
807
808 fn test_discovered_interface(name: &str) -> DiscoveredInterface {
809 DiscoveredInterface {
810 interface_type: "BackboneInterface".into(),
811 transport: true,
812 name: name.into(),
813 discovered: 1700000000.0,
814 last_heard: 1700001000.0,
815 heard_count: 5,
816 status: DiscoveredStatus::Available,
817 stamp: vec![0x42u8; 64],
818 stamp_value: 18,
819 transport_id: [0x01u8; 16],
820 network_id: [0x02u8; 16],
821 hops: 2,
822 latitude: Some(45.0),
823 longitude: Some(9.0),
824 height: Some(100.0),
825 reachable_on: Some("example.com".into()),
826 port: Some(4242),
827 frequency: None,
828 bandwidth: None,
829 spreading_factor: None,
830 coding_rate: None,
831 modulation: None,
832 channel: None,
833 ifac_netname: Some("mynetwork".into()),
834 ifac_netkey: Some("secretkey".into()),
835 config_entry: Some("test config".into()),
836 discovery_hash: compute_discovery_hash(&[0x01u8; 16], name),
837 }
838 }
839
840 #[test]
841 fn test_storage_roundtrip() {
842 use std::sync::atomic::{AtomicU64, Ordering};
843 static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
844
845 let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
846 let dir =
847 std::env::temp_dir().join(format!("rns-discovery-test-{}-{}", std::process::id(), id));
848 let _ = fs::remove_dir_all(&dir);
849 fs::create_dir_all(&dir).unwrap();
850
851 let storage = DiscoveredInterfaceStorage::new(dir.clone());
852
853 let iface = test_discovered_interface("TestNode");
854
855 storage.store(&iface).unwrap();
857
858 let loaded = storage.load(&iface.discovery_hash).unwrap().unwrap();
860
861 assert_eq!(loaded.interface_type, iface.interface_type);
862 assert_eq!(loaded.name, iface.name);
863 assert_eq!(loaded.stamp_value, iface.stamp_value);
864 assert_eq!(loaded.transport_id, iface.transport_id);
865 assert_eq!(loaded.hops, iface.hops);
866 assert_eq!(loaded.latitude, iface.latitude);
867 assert_eq!(loaded.reachable_on, iface.reachable_on);
868 assert_eq!(loaded.port, iface.port);
869
870 let list = storage.list().unwrap();
872 assert_eq!(list.len(), 1);
873
874 storage.remove(&iface.discovery_hash).unwrap();
876 let list = storage.list().unwrap();
877 assert!(list.is_empty());
878
879 let _ = fs::remove_dir_all(&dir);
880 }
881
882 #[test]
883 fn storage_load_sanitizes_cached_interface_names() {
884 let dir = std::env::temp_dir().join(format!(
885 "rns-discovery-sanitize-test-{}",
886 std::process::id()
887 ));
888 let _ = fs::remove_dir_all(&dir);
889 fs::create_dir_all(&dir).unwrap();
890 let storage = DiscoveredInterfaceStorage::new(dir.clone());
891 let iface = test_discovered_interface("\t**Cached Name!!!\n");
892
893 storage.store(&iface).unwrap();
894
895 let loaded = storage.load(&iface.discovery_hash).unwrap().unwrap();
896 let listed = storage.list().unwrap();
897
898 assert_eq!(loaded.name, "Cached Name");
899 assert_eq!(listed[0].name, "Cached Name");
900
901 let _ = fs::remove_dir_all(&dir);
902 }
903
904 #[test]
905 fn storage_rejects_cached_transport_id_with_invalid_length() {
906 let storage = DiscoveredInterfaceStorage::new(std::env::temp_dir());
907 let iface = test_discovered_interface("BadTransportId");
908 let mut data = storage.serialize_interface(&iface).unwrap();
909 let (mut value, _) = msgpack::unpack(&data).unwrap();
910 if let Value::Map(ref mut entries) = value {
911 for (key, val) in entries {
912 if key.as_str() == Some("transport_id") {
913 *val = Value::Bin(vec![0x01; 15]);
914 }
915 }
916 }
917 data = msgpack::pack(&value);
918
919 let err = storage.deserialize_interface(&data).unwrap_err();
920
921 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
922 assert!(err.to_string().contains("transport_id"));
923 }
924
925 #[test]
926 fn store_received_preserves_existing_first_seen_and_increments_heard_count() {
927 let dir = std::env::temp_dir().join(format!(
928 "rns-discovery-received-preserve-test-{}",
929 std::process::id()
930 ));
931 let _ = fs::remove_dir_all(&dir);
932 fs::create_dir_all(&dir).unwrap();
933 let storage = DiscoveredInterfaceStorage::new(dir.clone());
934
935 let mut existing = test_discovered_interface("ExistingDiscovery");
936 existing.discovered = 1000.0;
937 existing.last_heard = 1100.0;
938 existing.heard_count = 7;
939 storage.store(&existing).unwrap();
940
941 let mut received = existing.clone();
942 received.discovered = 2000.0;
943 received.last_heard = 3000.0;
944 received.heard_count = 0;
945 storage.store_received(&mut received).unwrap();
946
947 let loaded = storage.load(&received.discovery_hash).unwrap().unwrap();
948 assert_eq!(received.discovered, 1000.0);
949 assert_eq!(received.last_heard, 3000.0);
950 assert_eq!(received.heard_count, 8);
951 assert_eq!(loaded.discovered, 1000.0);
952 assert_eq!(loaded.last_heard, 3000.0);
953 assert_eq!(loaded.heard_count, 8);
954
955 let _ = fs::remove_dir_all(&dir);
956 }
957
958 #[test]
959 fn store_received_serializes_concurrent_counter_updates() {
960 use std::sync::{Arc, Barrier};
961 use std::thread;
962
963 let dir = std::env::temp_dir().join(format!(
964 "rns-discovery-concurrent-received-test-{}",
965 std::process::id()
966 ));
967 let _ = fs::remove_dir_all(&dir);
968 fs::create_dir_all(&dir).unwrap();
969 let storage = Arc::new(DiscoveredInterfaceStorage::new(dir.clone()));
970
971 let mut existing = test_discovered_interface("ConcurrentDiscovery");
972 existing.discovered = 1000.0;
973 existing.last_heard = 1000.0;
974 existing.heard_count = 0;
975 storage.store(&existing).unwrap();
976
977 let threads = 16;
978 let updates_per_thread = 25;
979 let barrier = Arc::new(Barrier::new(threads));
980 let mut handles = Vec::new();
981 for thread_id in 0..threads {
982 let storage = Arc::clone(&storage);
983 let barrier = Arc::clone(&barrier);
984 let template = existing.clone();
985 handles.push(thread::spawn(move || {
986 barrier.wait();
987 for update in 0..updates_per_thread {
988 let mut received = template.clone();
989 received.last_heard = 2000.0 + (thread_id * updates_per_thread + update) as f64;
990 storage.store_received(&mut received).unwrap();
991 }
992 }));
993 }
994
995 for handle in handles {
996 handle.join().unwrap();
997 }
998
999 let loaded = storage.load(&existing.discovery_hash).unwrap().unwrap();
1000 assert_eq!(loaded.discovered, 1000.0);
1001 assert_eq!(loaded.heard_count as usize, threads * updates_per_thread);
1002
1003 let _ = fs::remove_dir_all(&dir);
1004 }
1005
1006 #[test]
1007 fn store_received_recreates_corrupt_cache_with_received_time_as_first_seen() {
1008 let dir = std::env::temp_dir().join(format!(
1009 "rns-discovery-corrupt-recreate-test-{}",
1010 std::process::id()
1011 ));
1012 let _ = fs::remove_dir_all(&dir);
1013 fs::create_dir_all(&dir).unwrap();
1014 let storage = DiscoveredInterfaceStorage::new(dir.clone());
1015
1016 let mut received = test_discovered_interface("CorruptDiscovery");
1017 received.discovered = 1234.0;
1018 received.last_heard = 5678.0;
1019 received.heard_count = 0;
1020 let filepath = dir.join(hex_encode(&received.discovery_hash));
1021 fs::write(&filepath, b"not msgpack").unwrap();
1022
1023 storage.store_received(&mut received).unwrap();
1024
1025 let loaded = storage.load(&received.discovery_hash).unwrap().unwrap();
1026 assert_eq!(received.discovered, 5678.0);
1027 assert_eq!(received.heard_count, 1);
1028 assert_eq!(loaded.discovered, 5678.0);
1029 assert_eq!(loaded.last_heard, 5678.0);
1030 assert_eq!(loaded.heard_count, 1);
1031 assert_eq!(loaded.name, "CorruptDiscovery");
1032
1033 let _ = fs::remove_dir_all(&dir);
1034 }
1035
1036 #[test]
1037 fn test_filter_and_sort() {
1038 let now = time::now();
1039
1040 let ifaces = vec![
1041 DiscoveredInterface {
1042 interface_type: "BackboneInterface".into(),
1043 transport: true,
1044 name: "high-value-stale".into(),
1045 discovered: now,
1046 last_heard: now - THRESHOLD_STALE - 100.0, heard_count: 0,
1048 status: DiscoveredStatus::Stale,
1049 stamp: vec![],
1050 stamp_value: 20,
1051 transport_id: [0u8; 16],
1052 network_id: [0u8; 16],
1053 hops: 0,
1054 latitude: None,
1055 longitude: None,
1056 height: None,
1057 reachable_on: None,
1058 port: None,
1059 frequency: None,
1060 bandwidth: None,
1061 spreading_factor: None,
1062 coding_rate: None,
1063 modulation: None,
1064 channel: None,
1065 ifac_netname: None,
1066 ifac_netkey: None,
1067 config_entry: None,
1068 discovery_hash: [0u8; 32],
1069 },
1070 DiscoveredInterface {
1071 interface_type: "TCPServerInterface".into(),
1072 transport: true,
1073 name: "low-value-available".into(),
1074 discovered: now,
1075 last_heard: now - 10.0, heard_count: 0,
1077 status: DiscoveredStatus::Available,
1078 stamp: vec![],
1079 stamp_value: 10,
1080 transport_id: [0u8; 16],
1081 network_id: [0u8; 16],
1082 hops: 0,
1083 latitude: None,
1084 longitude: None,
1085 height: None,
1086 reachable_on: None,
1087 port: None,
1088 frequency: None,
1089 bandwidth: None,
1090 spreading_factor: None,
1091 coding_rate: None,
1092 modulation: None,
1093 channel: None,
1094 ifac_netname: None,
1095 ifac_netkey: None,
1096 config_entry: None,
1097 discovery_hash: [1u8; 32],
1098 },
1099 DiscoveredInterface {
1100 interface_type: "I2PInterface".into(),
1101 transport: false,
1102 name: "high-value-available".into(),
1103 discovered: now,
1104 last_heard: now - 10.0, heard_count: 0,
1106 status: DiscoveredStatus::Available,
1107 stamp: vec![],
1108 stamp_value: 20,
1109 transport_id: [0u8; 16],
1110 network_id: [0u8; 16],
1111 hops: 0,
1112 latitude: None,
1113 longitude: None,
1114 height: None,
1115 reachable_on: None,
1116 port: None,
1117 frequency: None,
1118 bandwidth: None,
1119 spreading_factor: None,
1120 coding_rate: None,
1121 modulation: None,
1122 channel: None,
1123 ifac_netname: None,
1124 ifac_netkey: None,
1125 config_entry: None,
1126 discovery_hash: [2u8; 32],
1127 },
1128 ];
1129
1130 let mut result = ifaces.clone();
1132 filter_and_sort_interfaces(&mut result, false, false);
1133 assert_eq!(result.len(), 3);
1134 assert_eq!(result[0].name, "high-value-available");
1136 assert_eq!(result[1].name, "low-value-available");
1137 assert_eq!(result[2].name, "high-value-stale");
1138
1139 let mut result = ifaces.clone();
1141 filter_and_sort_interfaces(&mut result, true, false);
1142 assert_eq!(result.len(), 2); let mut result = ifaces.clone();
1146 filter_and_sort_interfaces(&mut result, false, true);
1147 assert_eq!(result.len(), 2); }
1149
1150 #[test]
1151 fn test_discovery_name_hash_deterministic() {
1152 let h1 = discovery_name_hash();
1153 let h2 = discovery_name_hash();
1154 assert_eq!(h1, h2);
1155 assert_ne!(h1, [0u8; 10]); }
1157}