Skip to main content

hashtree_fips_transport/
lib.rs

1//! Hashtree blob exchange over FIPS endpoint bytes.
2//!
3//! FIPS owns peer discovery, signaling, and underlay transports. This crate
4//! keeps the Hashtree side to verified blob request/response frames carried as
5//! app-owned endpoint bytes.
6
7use async_trait::async_trait;
8use fips_core::config::{NostrDiscoveryPolicy, PeerAddress, RoutingMode, TransportInstances};
9use hashtree_core::{Hash, MemoryStore, Store, StoreError};
10pub use hashtree_network::PubsubPublishStats;
11use hashtree_network::{
12    transport::{PeerLink, PeerLinkFactory, SignalingTransport, TransportError},
13    MeshRouter, MeshRoutingConfig, MeshStoreCore, PoolSettings, PubsubDeliveryMode,
14    SignalingMessage,
15};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use std::collections::{HashMap, VecDeque};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::{broadcast, mpsc, oneshot, Mutex, Notify, RwLock};
23use tokio::task::JoinHandle;
24use tokio::time::{timeout, Duration};
25
26pub const DEFAULT_FIPS_DISCOVERY_SCOPE: &str = "hashtree-v1";
27pub const DEFAULT_FIPS_REQUEST_TIMEOUT: Duration = Duration::from_millis(5_500);
28pub const DEFAULT_FIPS_REQUEST_RETRY_INTERVAL: Duration = Duration::from_millis(750);
29pub const DEFAULT_FIPS_REQUEST_MAX_ATTEMPTS: usize = 4;
30pub const FIPS_RESPONSE_FRAGMENT_SIZE: usize = 1024;
31pub const FIPS_APP_FRAGMENT_SIZE: usize = 768;
32pub const MAX_HTL: u8 = 10;
33pub const DEFAULT_FIPS_WEBRTC_MAX_CONNECTIONS: usize = 512;
34
35const MSG_TYPE_REQUEST: u8 = 0x00;
36const MSG_TYPE_RESPONSE: u8 = 0x01;
37const MSG_TYPE_APP: u8 = 0x7f;
38const MAX_RESPONSE_FRAGMENTS: u32 = 16_384;
39const FIPS_MESH_SIGNALING_TOPIC: &str = "hashtree/fips/mesh/signaling/v1";
40const FIPS_MESH_DATA_TOPIC: &str = "hashtree/fips/mesh/data/v1";
41const FIPS_MESH_PUMP_INTERVAL: Duration = Duration::from_millis(20);
42const FIPS_MESH_HELLO_INTERVAL: Duration = Duration::from_secs(2);
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct FipsEndpointPacket {
46    pub peer_id: String,
47    pub data: Vec<u8>,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct FipsRelayStatus {
52    pub url: String,
53    pub status: String,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57pub struct FipsPeerStatus {
58    pub npub: String,
59    pub transport_addr: Option<String>,
60    pub transport_type: Option<String>,
61    pub srtt_ms: Option<u64>,
62    pub packets_sent: u64,
63    pub packets_recv: u64,
64    pub bytes_sent: u64,
65    pub bytes_recv: u64,
66}
67
68#[derive(Debug, Error)]
69pub enum FipsTransportError {
70    #[error("endpoint failed: {0}")]
71    Endpoint(String),
72    #[error("endpoint send failed: {0}")]
73    Send(String),
74    #[error("wire decode failed: {0}")]
75    Wire(String),
76    #[error("store error: {0}")]
77    Store(#[from] StoreError),
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct FipsAppMessage {
82    pub peer_id: String,
83    pub topic: String,
84    pub data: Vec<u8>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct FipsPeerConfig {
89    pub npub: String,
90    pub udp_addresses: Vec<String>,
91}
92
93impl FipsPeerConfig {
94    pub fn new(npub: impl Into<String>) -> Self {
95        Self {
96            npub: npub.into(),
97            udp_addresses: Vec::new(),
98        }
99    }
100}
101
102#[async_trait]
103pub trait FipsEndpointIo: Send + Sync {
104    async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError>;
105    async fn recv(&self) -> Option<FipsEndpointPacket>;
106    async fn set_peer_configs(
107        &self,
108        peer_configs: Vec<FipsPeerConfig>,
109    ) -> Result<(), FipsTransportError> {
110        self.set_peer_ids(peer_configs.into_iter().map(|peer| peer.npub).collect())
111            .await
112    }
113    async fn set_peer_ids(&self, _peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
114        Ok(())
115    }
116    async fn peer_ids(&self) -> Vec<String> {
117        Vec::new()
118    }
119    async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
120        Vec::new()
121    }
122    async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
123        Vec::new()
124    }
125    fn local_peer_id(&self) -> Option<String> {
126        None
127    }
128}
129
130#[derive(Debug, Clone)]
131pub struct FipsEndpointOptions {
132    pub identity_nsec: String,
133    pub discovery_scope: String,
134    pub relays: Vec<String>,
135    pub enable_udp: bool,
136    pub enable_webrtc: bool,
137    pub udp_bind_addr: Option<String>,
138    pub udp_public: bool,
139    pub udp_external_addr: Option<String>,
140    pub webrtc_auto_connect: bool,
141    pub webrtc_max_connections: usize,
142    pub open_discovery_max_pending: usize,
143    pub packet_channel_capacity: usize,
144}
145
146impl FipsEndpointOptions {
147    pub fn new(identity_nsec: impl Into<String>) -> Self {
148        Self {
149            identity_nsec: identity_nsec.into(),
150            discovery_scope: DEFAULT_FIPS_DISCOVERY_SCOPE.to_string(),
151            relays: Vec::new(),
152            enable_udp: true,
153            enable_webrtc: true,
154            udp_bind_addr: None,
155            udp_public: false,
156            udp_external_addr: None,
157            webrtc_auto_connect: false,
158            webrtc_max_connections: DEFAULT_FIPS_WEBRTC_MAX_CONNECTIONS,
159            open_discovery_max_pending: 0,
160            packet_channel_capacity: 1024,
161        }
162    }
163}
164
165pub struct BoundFipsEndpoint {
166    pub endpoint: Arc<dyn FipsEndpointIo>,
167    pub local_peer_id: String,
168    pub discovery_scope: String,
169}
170
171pub async fn bind_fips_endpoint(
172    options: FipsEndpointOptions,
173) -> Result<BoundFipsEndpoint, FipsTransportError> {
174    if !options.enable_udp && !options.enable_webrtc {
175        return Err(FipsTransportError::Endpoint(
176            "at least one FIPS transport must be enabled".to_string(),
177        ));
178    }
179
180    let discovery_scope = if options.discovery_scope.trim().is_empty() {
181        DEFAULT_FIPS_DISCOVERY_SCOPE.to_string()
182    } else {
183        options.discovery_scope.trim().to_string()
184    };
185    let packet_channel_capacity = options.packet_channel_capacity;
186    let config = fips_endpoint_config(options, &discovery_scope);
187
188    let endpoint = fips_core::FipsEndpoint::builder()
189        .config(config)
190        .discovery_scope(discovery_scope.clone())
191        .without_system_tun()
192        .packet_channel_capacity(packet_channel_capacity)
193        .bind()
194        .await
195        .map_err(|err| FipsTransportError::Endpoint(err.to_string()))?;
196    let local_peer_id = endpoint.npub().to_string();
197
198    Ok(BoundFipsEndpoint {
199        endpoint: Arc::new(endpoint),
200        local_peer_id,
201        discovery_scope,
202    })
203}
204
205fn fips_endpoint_config(options: FipsEndpointOptions, discovery_scope: &str) -> fips_core::Config {
206    let mut config = fips_core::Config::new();
207    config.node.identity = fips_core::IdentityConfig {
208        nsec: Some(options.identity_nsec),
209        persistent: false,
210    };
211    config.node.routing.mode = RoutingMode::ReplyLearned;
212    config.node.limits.max_peers = options.webrtc_max_connections.max(1);
213    config.node.limits.max_links = options.webrtc_max_connections.saturating_mul(2).max(1);
214    config.node.limits.max_connections = options.webrtc_max_connections.saturating_mul(2).max(1);
215    config.node.limits.max_pending_inbound =
216        options.webrtc_max_connections.saturating_mul(4).max(1);
217    config.tun.enabled = false;
218    config.dns.enabled = false;
219    config.node.system_files_enabled = false;
220    config.node.discovery.lan.scope = Some(discovery_scope.to_string());
221    config.node.discovery.nostr.enabled = true;
222    config.node.discovery.nostr.advertise = true;
223    config.node.discovery.nostr.policy = if options.open_discovery_max_pending == 0 {
224        NostrDiscoveryPolicy::ConfiguredOnly
225    } else {
226        NostrDiscoveryPolicy::Open
227    };
228    config.node.discovery.nostr.open_discovery_max_pending = options.open_discovery_max_pending;
229    config.node.discovery.nostr.share_local_candidates = true;
230    config.node.discovery.nostr.app = discovery_scope.to_string();
231    if !options.relays.is_empty() {
232        config.node.discovery.nostr.advert_relays = options.relays.clone();
233        config.node.discovery.nostr.dm_relays = options.relays;
234    }
235
236    if options.enable_udp {
237        config.transports.udp = TransportInstances::Single(fips_core::UdpConfig {
238            bind_addr: Some(
239                options
240                    .udp_bind_addr
241                    .filter(|addr| !addr.trim().is_empty())
242                    .unwrap_or_else(|| "0.0.0.0:0".to_string()),
243            ),
244            advertise_on_nostr: Some(true),
245            public: Some(options.udp_public),
246            external_addr: options
247                .udp_external_addr
248                .filter(|addr| !addr.trim().is_empty()),
249            outbound_only: Some(false),
250            accept_connections: Some(true),
251            ..Default::default()
252        });
253    }
254
255    if options.enable_webrtc {
256        config.transports.webrtc = TransportInstances::Single(fips_core::WebRtcConfig {
257            advertise_on_nostr: Some(true),
258            auto_connect: Some(options.webrtc_auto_connect),
259            accept_connections: Some(true),
260            max_connections: Some(options.webrtc_max_connections.max(1)),
261            ..Default::default()
262        });
263    }
264
265    // Some shared bootstrap peers expose tcp:443 for UDP-hostile networks.
266    // Binding stays disabled by default, so this is outbound-only.
267    config.transports.tcp = TransportInstances::Single(Default::default());
268
269    config
270}
271
272fn peer_address_from_configured_addr(raw: &str) -> Option<PeerAddress> {
273    let trimmed = raw.trim();
274    if trimmed.is_empty() {
275        return None;
276    }
277    let (transport, addr) = split_configured_transport_addr(trimmed);
278    Some(PeerAddress::new(transport, addr))
279}
280
281fn split_configured_transport_addr(value: &str) -> (&str, &str) {
282    let Some((transport, addr)) = value.split_once(':') else {
283        return ("udp", value);
284    };
285    match transport.to_ascii_lowercase().as_str() {
286        "udp" | "tcp" | "webrtc" | "tor" | "ethernet" | "ble" => (transport, addr),
287        _ => ("udp", value),
288    }
289}
290
291fn sanitize_peer_configs(
292    local: Option<&str>,
293    peers: Vec<FipsPeerConfig>,
294    seen: &mut std::collections::HashSet<String>,
295) -> Vec<FipsPeerConfig> {
296    let mut out = Vec::new();
297    for peer in peers {
298        let npub = peer.npub.trim().to_string();
299        if npub.is_empty() || Some(npub.as_str()) == local || !seen.insert(npub.clone()) {
300            continue;
301        }
302        let udp_addresses = peer
303            .udp_addresses
304            .into_iter()
305            .map(|addr| addr.trim().to_string())
306            .filter(|addr| !addr.is_empty())
307            .collect();
308        out.push(FipsPeerConfig {
309            npub,
310            udp_addresses,
311        });
312    }
313    out
314}
315
316#[async_trait]
317impl FipsEndpointIo for fips_core::FipsEndpoint {
318    async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError> {
319        self.send(peer_id.to_string(), data)
320            .await
321            .map_err(|err| FipsTransportError::Send(err.to_string()))
322    }
323
324    async fn recv(&self) -> Option<FipsEndpointPacket> {
325        loop {
326            let message = fips_core::FipsEndpoint::recv(self).await?;
327            if let Some(peer_id) = message.source_npub {
328                return Some(FipsEndpointPacket {
329                    peer_id,
330                    data: message.data,
331                });
332            }
333        }
334    }
335
336    async fn peer_ids(&self) -> Vec<String> {
337        match self.peers().await {
338            Ok(peers) => peers.into_iter().map(|peer| peer.npub).collect(),
339            Err(_) => Vec::new(),
340        }
341    }
342
343    async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
344        match self.peers().await {
345            Ok(peers) => peers
346                .into_iter()
347                .map(|peer| FipsPeerStatus {
348                    npub: peer.npub,
349                    transport_addr: peer.transport_addr,
350                    transport_type: peer.transport_type,
351                    srtt_ms: peer.srtt_ms,
352                    packets_sent: peer.packets_sent,
353                    packets_recv: peer.packets_recv,
354                    bytes_sent: peer.bytes_sent,
355                    bytes_recv: peer.bytes_recv,
356                })
357                .collect(),
358            Err(_) => Vec::new(),
359        }
360    }
361
362    async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
363        match fips_core::FipsEndpoint::relay_statuses(self).await {
364            Ok(statuses) => statuses
365                .into_iter()
366                .map(|status| FipsRelayStatus {
367                    url: status.url,
368                    status: status.status,
369                })
370                .collect(),
371            Err(_) => Vec::new(),
372        }
373    }
374
375    async fn set_peer_ids(&self, peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
376        self.set_peer_configs(
377            peer_ids
378                .into_iter()
379                .map(FipsPeerConfig::new)
380                .collect::<Vec<_>>(),
381        )
382        .await
383    }
384
385    async fn set_peer_configs(
386        &self,
387        peer_configs: Vec<FipsPeerConfig>,
388    ) -> Result<(), FipsTransportError> {
389        let peers: Vec<fips_core::config::PeerConfig> = peer_configs
390            .into_iter()
391            .map(|peer| fips_core::config::PeerConfig {
392                npub: peer.npub,
393                addresses: peer
394                    .udp_addresses
395                    .into_iter()
396                    .filter_map(|addr| peer_address_from_configured_addr(&addr))
397                    .collect(),
398                ..Default::default()
399            })
400            .collect();
401        let peer_count = peers.len();
402        match self.update_peers(peers).await {
403            Ok(outcome) => {
404                tracing::info!(
405                    peer_count,
406                    added = outcome.added,
407                    removed = outcome.removed,
408                    updated = outcome.updated,
409                    unchanged = outcome.unchanged,
410                    "updated FIPS endpoint peer configs"
411                );
412                Ok(())
413            }
414            Err(err) => {
415                tracing::warn!(
416                    peer_count,
417                    error = %err,
418                    "failed to update FIPS endpoint peer configs"
419                );
420                Err(FipsTransportError::Endpoint(err.to_string()))
421            }
422        }
423    }
424
425    fn local_peer_id(&self) -> Option<String> {
426        Some(self.npub().to_string())
427    }
428}
429
430#[derive(Debug, Clone, Serialize, Deserialize)]
431struct DataRequest {
432    #[serde(with = "serde_bytes")]
433    h: Vec<u8>,
434    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
435    htl: u8,
436}
437
438#[derive(Debug, Clone, Serialize, Deserialize)]
439struct DataResponse {
440    #[serde(with = "serde_bytes")]
441    h: Vec<u8>,
442    #[serde(with = "serde_bytes")]
443    d: Vec<u8>,
444    #[serde(default, skip_serializing_if = "Option::is_none")]
445    i: Option<u32>,
446    #[serde(default, skip_serializing_if = "Option::is_none")]
447    n: Option<u32>,
448}
449
450#[derive(Debug, Clone, Serialize, Deserialize)]
451struct AppPacket {
452    t: String,
453    #[serde(with = "serde_bytes")]
454    d: Vec<u8>,
455    #[serde(default, skip_serializing_if = "Option::is_none")]
456    id: Option<String>,
457    #[serde(default, skip_serializing_if = "Option::is_none")]
458    i: Option<u32>,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    n: Option<u32>,
461}
462
463enum Message {
464    Request(DataRequest),
465    Response(DataResponse),
466    App(AppPacket),
467}
468
469fn default_htl() -> u8 {
470    MAX_HTL
471}
472
473fn is_max_htl(htl: &u8) -> bool {
474    *htl == MAX_HTL
475}
476
477fn hash_key(hash: &Hash) -> String {
478    hex::encode(hash)
479}
480
481fn bytes_to_hash(bytes: &[u8]) -> Option<Hash> {
482    if bytes.len() != 32 {
483        return None;
484    }
485    let mut hash = [0u8; 32];
486    hash.copy_from_slice(bytes);
487    Some(hash)
488}
489
490fn verify_hash(data: &[u8], hash: &Hash) -> bool {
491    let digest = Sha256::digest(data);
492    digest.as_slice() == hash
493}
494
495fn remaining_until(deadline: tokio::time::Instant) -> Duration {
496    deadline
497        .checked_duration_since(tokio::time::Instant::now())
498        .unwrap_or_default()
499}
500
501fn encode_request(hash: &Hash, htl: u8) -> Result<Vec<u8>, FipsTransportError> {
502    let body = rmp_serde::to_vec_named(&DataRequest {
503        h: hash.to_vec(),
504        htl,
505    })
506    .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
507    let mut out = Vec::with_capacity(1 + body.len());
508    out.push(MSG_TYPE_REQUEST);
509    out.extend(body);
510    Ok(out)
511}
512
513fn encode_response(hash: &Hash, data: &[u8]) -> Result<Vec<u8>, FipsTransportError> {
514    let body = rmp_serde::to_vec_named(&DataResponse {
515        h: hash.to_vec(),
516        d: data.to_vec(),
517        i: None,
518        n: None,
519    })
520    .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
521    let mut out = Vec::with_capacity(1 + body.len());
522    out.push(MSG_TYPE_RESPONSE);
523    out.extend(body);
524    Ok(out)
525}
526
527fn encode_fragment_response(
528    hash: &Hash,
529    data: &[u8],
530    index: u32,
531    total: u32,
532) -> Result<Vec<u8>, FipsTransportError> {
533    let body = rmp_serde::to_vec_named(&DataResponse {
534        h: hash.to_vec(),
535        d: data.to_vec(),
536        i: Some(index),
537        n: Some(total),
538    })
539    .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
540    let mut out = Vec::with_capacity(1 + body.len());
541    out.push(MSG_TYPE_RESPONSE);
542    out.extend(body);
543    Ok(out)
544}
545
546fn encode_app_message(topic: &str, data: &[u8]) -> Result<Vec<u8>, FipsTransportError> {
547    encode_app_packet(topic, data, None, None, None)
548}
549
550fn encode_app_packet(
551    topic: &str,
552    data: &[u8],
553    id: Option<String>,
554    index: Option<u32>,
555    total: Option<u32>,
556) -> Result<Vec<u8>, FipsTransportError> {
557    let topic = topic.trim();
558    if topic.is_empty() {
559        return Err(FipsTransportError::Wire(
560            "application message topic is empty".to_string(),
561        ));
562    }
563    let body = rmp_serde::to_vec_named(&AppPacket {
564        t: topic.to_string(),
565        d: data.to_vec(),
566        id,
567        i: index,
568        n: total,
569    })
570    .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
571    let mut out = Vec::with_capacity(1 + body.len());
572    out.push(MSG_TYPE_APP);
573    out.extend(body);
574    Ok(out)
575}
576
577fn encode_app_messages(topic: &str, data: &[u8]) -> Result<Vec<Vec<u8>>, FipsTransportError> {
578    if data.len() <= FIPS_APP_FRAGMENT_SIZE {
579        return encode_app_message(topic, data).map(|packet| vec![packet]);
580    }
581
582    let total = data.len().div_ceil(FIPS_APP_FRAGMENT_SIZE);
583    if total > MAX_RESPONSE_FRAGMENTS as usize {
584        return Err(FipsTransportError::Wire(format!(
585            "application message has too many fragments: {total}"
586        )));
587    }
588
589    let mut hasher = Sha256::new();
590    hasher.update(topic.trim().as_bytes());
591    hasher.update([0]);
592    hasher.update(data);
593    let id = hex::encode(hasher.finalize());
594    let mut out = Vec::with_capacity(total);
595    for (index, chunk) in data.chunks(FIPS_APP_FRAGMENT_SIZE).enumerate() {
596        out.push(encode_app_packet(
597            topic,
598            chunk,
599            Some(id.clone()),
600            Some(index as u32),
601            Some(total as u32),
602        )?);
603    }
604    Ok(out)
605}
606
607fn parse_message(data: &[u8]) -> Result<Option<Message>, FipsTransportError> {
608    let Some((&kind, body)) = data.split_first() else {
609        return Ok(None);
610    };
611    match kind {
612        MSG_TYPE_REQUEST => rmp_serde::from_slice::<DataRequest>(body)
613            .map(|req| Some(Message::Request(req)))
614            .map_err(|err| FipsTransportError::Wire(err.to_string())),
615        MSG_TYPE_RESPONSE => rmp_serde::from_slice::<DataResponse>(body)
616            .map(|resp| Some(Message::Response(resp)))
617            .map_err(|err| FipsTransportError::Wire(err.to_string())),
618        MSG_TYPE_APP => rmp_serde::from_slice::<AppPacket>(body)
619            .map(|packet| Some(Message::App(packet)))
620            .map_err(|err| FipsTransportError::Wire(err.to_string())),
621        _ => Ok(None),
622    }
623}
624
625struct PendingRequest {
626    resolve: oneshot::Sender<Option<Vec<u8>>>,
627}
628
629struct ResponseReassembly {
630    total: u32,
631    fragments: HashMap<u32, Vec<u8>>,
632    received_bytes: usize,
633}
634
635struct AppReassembly {
636    total: u32,
637    fragments: HashMap<u32, Vec<u8>>,
638    received_bytes: usize,
639    topic: String,
640}
641
642pub struct HashtreeFipsTransport<S: Store + Send + Sync + 'static = MemoryStore> {
643    endpoint: Arc<dyn FipsEndpointIo>,
644    local_store: Arc<S>,
645    peers: Arc<RwLock<Vec<String>>>,
646    peer_filter_configured: Arc<RwLock<bool>>,
647    unconfigured_app_message_topics: Vec<String>,
648    pending: Arc<Mutex<HashMap<String, Vec<PendingRequest>>>>,
649    response_fragments: Arc<Mutex<HashMap<String, ResponseReassembly>>>,
650    app_fragments: Arc<Mutex<HashMap<String, AppReassembly>>>,
651    app_messages: broadcast::Sender<FipsAppMessage>,
652    request_timeout: Duration,
653    request_retry_interval: Duration,
654    request_max_attempts: usize,
655    request_htl: u8,
656    cache_responses: bool,
657}
658
659impl HashtreeFipsTransport<MemoryStore> {
660    pub fn in_memory(endpoint: Arc<dyn FipsEndpointIo>) -> Self {
661        Self::new(endpoint, Arc::new(MemoryStore::new()))
662    }
663}
664
665impl<S: Store + Send + Sync + 'static> HashtreeFipsTransport<S> {
666    pub fn new(endpoint: Arc<dyn FipsEndpointIo>, local_store: Arc<S>) -> Self {
667        let (app_messages, _) = broadcast::channel(256);
668        Self {
669            endpoint,
670            local_store,
671            peers: Arc::new(RwLock::new(Vec::new())),
672            peer_filter_configured: Arc::new(RwLock::new(false)),
673            unconfigured_app_message_topics: Vec::new(),
674            pending: Arc::new(Mutex::new(HashMap::new())),
675            response_fragments: Arc::new(Mutex::new(HashMap::new())),
676            app_fragments: Arc::new(Mutex::new(HashMap::new())),
677            app_messages,
678            request_timeout: DEFAULT_FIPS_REQUEST_TIMEOUT,
679            request_retry_interval: DEFAULT_FIPS_REQUEST_RETRY_INTERVAL,
680            request_max_attempts: DEFAULT_FIPS_REQUEST_MAX_ATTEMPTS,
681            request_htl: MAX_HTL,
682            cache_responses: true,
683        }
684    }
685
686    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
687        self.request_timeout = timeout;
688        self
689    }
690
691    pub fn with_request_retry_interval(mut self, interval: Duration) -> Self {
692        self.request_retry_interval = if interval.is_zero() {
693            Duration::from_millis(1)
694        } else {
695            interval
696        };
697        self
698    }
699
700    pub fn with_request_max_attempts(mut self, attempts: usize) -> Self {
701        self.request_max_attempts = attempts.max(1);
702        self
703    }
704
705    pub fn with_request_htl(mut self, htl: u8) -> Self {
706        self.request_htl = htl;
707        self
708    }
709
710    pub fn with_cache_responses(mut self, cache_responses: bool) -> Self {
711        self.cache_responses = cache_responses;
712        self
713    }
714
715    pub fn with_unconfigured_app_message_topics<T, I>(mut self, topics: I) -> Self
716    where
717        T: Into<String>,
718        I: IntoIterator<Item = T>,
719    {
720        self.unconfigured_app_message_topics = topics.into_iter().map(Into::into).collect();
721        self
722    }
723
724    pub async fn set_peers(&self, peers: Vec<String>) {
725        self.set_peer_configs(peers.into_iter().map(FipsPeerConfig::new).collect())
726            .await;
727    }
728
729    pub async fn set_peer_configs(&self, peers: Vec<FipsPeerConfig>) {
730        self.set_peer_configs_with_routing_peers(peers, Vec::new())
731            .await;
732    }
733
734    pub async fn set_peer_configs_with_routing_peers(
735        &self,
736        application_peers: Vec<FipsPeerConfig>,
737        routing_peers: Vec<FipsPeerConfig>,
738    ) {
739        let local = self.endpoint.local_peer_id();
740        let mut seen = std::collections::HashSet::new();
741        let app_out = sanitize_peer_configs(local.as_deref(), application_peers, &mut seen);
742        let routing_out = sanitize_peer_configs(local.as_deref(), routing_peers, &mut seen);
743        let mut endpoint_out = app_out.clone();
744        endpoint_out.extend(routing_out.clone());
745        let configured_count = endpoint_out.len();
746        let application_count = app_out.len();
747        let routing_only_count = routing_out.len();
748        let udp_hint_count: usize = endpoint_out
749            .iter()
750            .map(|peer| peer.udp_addresses.len())
751            .sum();
752        match self.endpoint.set_peer_configs(endpoint_out.clone()).await {
753            Ok(()) => {
754                tracing::info!(
755                    configured_count,
756                    application_count,
757                    routing_only_count,
758                    udp_hint_count,
759                    "configured Hashtree FIPS peers"
760                );
761                *self.peers.write().await = app_out.into_iter().map(|peer| peer.npub).collect();
762                *self.peer_filter_configured.write().await = true;
763            }
764            Err(error) => {
765                tracing::warn!(
766                    configured_count,
767                    application_count,
768                    routing_only_count,
769                    udp_hint_count,
770                    error = %error,
771                    "failed to configure Hashtree FIPS peers"
772                );
773            }
774        }
775    }
776
777    pub async fn peer_ids(&self) -> Vec<String> {
778        let configured = self.peers.read().await.clone();
779        if configured.is_empty() && !*self.peer_filter_configured.read().await {
780            self.endpoint.peer_ids().await
781        } else {
782            configured
783        }
784    }
785
786    pub async fn configured_peer_ids(&self) -> Vec<String> {
787        self.peers.read().await.clone()
788    }
789
790    pub async fn connected_peer_ids(&self) -> Vec<String> {
791        self.endpoint.peer_ids().await
792    }
793
794    pub async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
795        self.endpoint.peer_statuses().await
796    }
797
798    pub async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
799        self.endpoint.relay_statuses().await
800    }
801
802    pub fn subscribe_app_messages(&self) -> broadcast::Receiver<FipsAppMessage> {
803        self.app_messages.subscribe()
804    }
805
806    pub async fn send_app_message(
807        &self,
808        peer_id: &str,
809        topic: &str,
810        data: Vec<u8>,
811    ) -> Result<(), FipsTransportError> {
812        for packet in encode_app_messages(topic, &data)? {
813            self.endpoint.send(peer_id, packet).await?;
814        }
815        Ok(())
816    }
817
818    pub async fn broadcast_app_message(
819        &self,
820        topic: &str,
821        data: Vec<u8>,
822    ) -> Result<usize, FipsTransportError> {
823        let packets = encode_app_messages(topic, &data)?;
824        let mut sent = 0usize;
825        for peer in self.peer_ids().await {
826            let mut peer_sent = true;
827            for packet in &packets {
828                if self.endpoint.send(&peer, packet.clone()).await.is_err() {
829                    peer_sent = false;
830                    break;
831                }
832            }
833            if peer_sent {
834                sent += 1;
835            }
836        }
837        Ok(sent)
838    }
839
840    pub fn start(self: &Arc<Self>) -> JoinHandle<()> {
841        let this = self.clone();
842        tokio::spawn(async move {
843            while let Some(packet) = this.endpoint.recv().await {
844                let _ = this.handle_packet(packet).await;
845            }
846        })
847    }
848
849    pub async fn get_from_peers(
850        &self,
851        hash: &Hash,
852        peers: &[String],
853    ) -> Result<Option<Vec<u8>>, FipsTransportError> {
854        if let Some(data) = self.local_store.get(hash).await? {
855            if verify_hash(&data, hash) {
856                return Ok(Some(data));
857            }
858        }
859        if peers.is_empty() {
860            return Ok(None);
861        }
862
863        let key = hash_key(hash);
864        let (tx, rx) = oneshot::channel();
865        self.pending
866            .lock()
867            .await
868            .entry(key.clone())
869            .or_default()
870            .push(PendingRequest { resolve: tx });
871
872        let payload = encode_request(hash, self.request_htl)?;
873        let deadline = tokio::time::Instant::now() + self.request_timeout;
874        let mut rx = rx;
875        for attempt in 0..self.request_max_attempts {
876            if attempt > 0 && remaining_until(deadline).is_zero() {
877                break;
878            }
879            let sent = self.send_request_to_peers(peers, &payload).await;
880            if sent == 0 && attempt == 0 {
881                self.resolve_pending(&key, None).await;
882                return Ok(None);
883            }
884            if attempt + 1 >= self.request_max_attempts {
885                break;
886            }
887            let remaining = remaining_until(deadline);
888            if remaining.is_zero() {
889                break;
890            }
891            match timeout(self.request_retry_interval.min(remaining), &mut rx).await {
892                Ok(Ok(result)) => return Ok(result),
893                Ok(Err(_)) => return Ok(None),
894                Err(_) => {
895                    if remaining_until(deadline).is_zero() {
896                        break;
897                    }
898                }
899            }
900        }
901
902        match timeout(remaining_until(deadline), rx).await {
903            Ok(Ok(result)) => Ok(result),
904            _ => {
905                self.remove_pending_sender(&key).await;
906                Ok(None)
907            }
908        }
909    }
910
911    async fn send_request_to_peers(&self, peers: &[String], payload: &[u8]) -> usize {
912        let mut sent = 0usize;
913        for peer in peers {
914            if self.endpoint.send(peer, payload.to_vec()).await.is_ok() {
915                sent += 1;
916            }
917        }
918        sent
919    }
920
921    async fn handle_packet(&self, packet: FipsEndpointPacket) -> Result<(), FipsTransportError> {
922        let Some(message) = parse_message(&packet.data)? else {
923            return Ok(());
924        };
925        let unconfigured_app_message_allowed = match &message {
926            Message::App(app) => self
927                .unconfigured_app_message_topics
928                .iter()
929                .any(|topic| topic == &app.t),
930            _ => false,
931        };
932        let is_application_peer = self.is_application_peer(&packet.peer_id).await;
933        if !is_application_peer && !unconfigured_app_message_allowed {
934            return Ok(());
935        }
936        match message {
937            Message::Request(req) => {
938                let Some(hash) = bytes_to_hash(&req.h) else {
939                    return Ok(());
940                };
941                let Some(data) = self.local_store.get(&hash).await? else {
942                    return Ok(());
943                };
944                if !verify_hash(&data, &hash) {
945                    return Ok(());
946                }
947                self.send_response(&packet.peer_id, &hash, &data).await?;
948            }
949            Message::Response(resp) => {
950                let Some(hash) = bytes_to_hash(&resp.h) else {
951                    return Ok(());
952                };
953                let key = hash_key(&hash);
954                if !self.pending.lock().await.contains_key(&key) {
955                    return Ok(());
956                }
957                let Some(data) = self.response_data_from_message(&key, resp).await else {
958                    return Ok(());
959                };
960                if !verify_hash(&data, &hash) {
961                    return Ok(());
962                }
963                if self.cache_responses {
964                    let _ = self.local_store.put(hash, data.clone()).await;
965                }
966                self.resolve_pending(&key, Some(data)).await;
967            }
968            Message::App(app) => {
969                if app.t.trim().is_empty() {
970                    return Ok(());
971                }
972                if let Some((topic, data)) = self.app_data_from_message(&packet.peer_id, app).await
973                {
974                    let _ = self.app_messages.send(FipsAppMessage {
975                        peer_id: packet.peer_id,
976                        topic,
977                        data,
978                    });
979                }
980            }
981        }
982        Ok(())
983    }
984
985    async fn is_application_peer(&self, peer_id: &str) -> bool {
986        let peers = self.peers.read().await;
987        if peers.is_empty() && !*self.peer_filter_configured.read().await {
988            return true;
989        }
990        peers.iter().any(|peer| peer == peer_id)
991    }
992
993    async fn send_response(
994        &self,
995        peer_id: &str,
996        hash: &Hash,
997        data: &[u8],
998    ) -> Result<(), FipsTransportError> {
999        if data.len() <= FIPS_RESPONSE_FRAGMENT_SIZE {
1000            self.endpoint
1001                .send(peer_id, encode_response(hash, data)?)
1002                .await?;
1003            return Ok(());
1004        }
1005
1006        let total = data.len().div_ceil(FIPS_RESPONSE_FRAGMENT_SIZE) as u32;
1007        for index in 0..total {
1008            let start = index as usize * FIPS_RESPONSE_FRAGMENT_SIZE;
1009            let end = (start + FIPS_RESPONSE_FRAGMENT_SIZE).min(data.len());
1010            self.endpoint
1011                .send(
1012                    peer_id,
1013                    encode_fragment_response(hash, &data[start..end], index, total)?,
1014                )
1015                .await?;
1016        }
1017        Ok(())
1018    }
1019
1020    async fn response_data_from_message(&self, key: &str, resp: DataResponse) -> Option<Vec<u8>> {
1021        match (resp.i, resp.n) {
1022            (Some(index), Some(total)) => {
1023                self.reassemble_response_fragment(key, resp.d, index, total)
1024                    .await
1025            }
1026            (None, None) => Some(resp.d),
1027            _ => None,
1028        }
1029    }
1030
1031    async fn reassemble_response_fragment(
1032        &self,
1033        key: &str,
1034        data: Vec<u8>,
1035        index: u32,
1036        total: u32,
1037    ) -> Option<Vec<u8>> {
1038        if total == 0 || total > MAX_RESPONSE_FRAGMENTS || index >= total {
1039            return None;
1040        }
1041
1042        let mut fragments = self.response_fragments.lock().await;
1043        let entry = fragments
1044            .entry(key.to_string())
1045            .or_insert_with(|| ResponseReassembly {
1046                total,
1047                fragments: HashMap::new(),
1048                received_bytes: 0,
1049            });
1050        if entry.total != total {
1051            *entry = ResponseReassembly {
1052                total,
1053                fragments: HashMap::new(),
1054                received_bytes: 0,
1055            };
1056        }
1057
1058        if let std::collections::hash_map::Entry::Vacant(slot) = entry.fragments.entry(index) {
1059            entry.received_bytes += data.len();
1060            slot.insert(data);
1061        }
1062
1063        if entry.fragments.len() != entry.total as usize {
1064            return None;
1065        }
1066
1067        let mut assembled = Vec::with_capacity(entry.received_bytes);
1068        for fragment_index in 0..entry.total {
1069            let fragment = entry.fragments.get(&fragment_index)?;
1070            assembled.extend_from_slice(fragment);
1071        }
1072        fragments.remove(key);
1073        Some(assembled)
1074    }
1075
1076    async fn app_data_from_message(
1077        &self,
1078        peer_id: &str,
1079        app: AppPacket,
1080    ) -> Option<(String, Vec<u8>)> {
1081        match (app.id, app.i, app.n) {
1082            (None, None, None) => Some((app.t, app.d)),
1083            (Some(id), Some(index), Some(total)) => {
1084                self.reassemble_app_fragment(peer_id, app.t, id, app.d, index, total)
1085                    .await
1086            }
1087            _ => None,
1088        }
1089    }
1090
1091    async fn reassemble_app_fragment(
1092        &self,
1093        peer_id: &str,
1094        topic: String,
1095        id: String,
1096        data: Vec<u8>,
1097        index: u32,
1098        total: u32,
1099    ) -> Option<(String, Vec<u8>)> {
1100        if total == 0 || total > MAX_RESPONSE_FRAGMENTS || index >= total {
1101            return None;
1102        }
1103
1104        let key = format!("{peer_id}\0{topic}\0{id}");
1105        let mut fragments = self.app_fragments.lock().await;
1106        let entry = fragments
1107            .entry(key.clone())
1108            .or_insert_with(|| AppReassembly {
1109                total,
1110                fragments: HashMap::new(),
1111                received_bytes: 0,
1112                topic: topic.clone(),
1113            });
1114        if entry.total != total {
1115            *entry = AppReassembly {
1116                total,
1117                fragments: HashMap::new(),
1118                received_bytes: 0,
1119                topic: topic.clone(),
1120            };
1121        }
1122
1123        if let std::collections::hash_map::Entry::Vacant(slot) = entry.fragments.entry(index) {
1124            entry.received_bytes += data.len();
1125            slot.insert(data);
1126        }
1127
1128        if entry.fragments.len() != entry.total as usize {
1129            return None;
1130        }
1131
1132        let mut assembled = Vec::with_capacity(entry.received_bytes);
1133        for fragment_index in 0..entry.total {
1134            let fragment = entry.fragments.get(&fragment_index)?;
1135            assembled.extend_from_slice(fragment);
1136        }
1137        let topic = entry.topic.clone();
1138        fragments.remove(&key);
1139        Some((topic, assembled))
1140    }
1141
1142    async fn resolve_pending(&self, key: &str, data: Option<Vec<u8>>) {
1143        self.response_fragments.lock().await.remove(key);
1144        let pending = self.pending.lock().await.remove(key);
1145        if let Some(pending) = pending {
1146            for request in pending {
1147                let _ = request.resolve.send(data.clone());
1148            }
1149        }
1150    }
1151
1152    async fn remove_pending_sender(&self, key: &str) {
1153        let remove_fragments = {
1154            let mut pending = self.pending.lock().await;
1155            if let Some(requests) = pending.get_mut(key) {
1156                requests.retain(|request| !request.resolve.is_closed());
1157                if requests.is_empty() {
1158                    pending.remove(key);
1159                    true
1160                } else {
1161                    false
1162                }
1163            } else {
1164                false
1165            }
1166        };
1167        if remove_fragments {
1168            self.response_fragments.lock().await.remove(key);
1169        }
1170    }
1171}
1172
1173type FipsMeshStore<S> = MeshStoreCore<S, FipsMeshSignaling<S>, FipsMeshLinkFactory<S>>;
1174
1175/// Local pubsub payload delivered by the hashtree mesh core over FIPS links.
1176#[derive(Debug, Clone, PartialEq, Eq)]
1177pub struct FipsMeshPubsubEvent {
1178    pub stream_id: String,
1179    pub seq: u64,
1180    pub origin_peer_id: String,
1181    pub from_peer_id: String,
1182    pub payload: Vec<u8>,
1183}
1184
1185/// Hashtree mesh pubsub runtime backed by FIPS endpoint bytes.
1186pub struct FipsMeshPubsub<S: Store + Send + Sync + 'static> {
1187    store: Arc<FipsMeshStore<S>>,
1188    demux_task: JoinHandle<()>,
1189    pump_task: JoinHandle<()>,
1190}
1191
1192impl<S: Store + Send + Sync + 'static> FipsMeshPubsub<S> {
1193    /// Subscribe this node to a mesh pubsub stream.
1194    pub async fn subscribe_pubsub(&self, stream_id: impl Into<String>) -> PubsubPublishStats {
1195        self.store.subscribe_pubsub(stream_id.into()).await
1196    }
1197
1198    /// Publish bytes on a mesh pubsub stream with the given origin-local sequence.
1199    pub async fn publish_pubsub(
1200        &self,
1201        stream_id: impl Into<String>,
1202        seq: u64,
1203        payload: Vec<u8>,
1204    ) -> PubsubPublishStats {
1205        self.store
1206            .publish_pubsub(stream_id.into(), seq, payload)
1207            .await
1208    }
1209
1210    /// Drain pubsub events delivered to this node.
1211    pub async fn drain_pubsub_events(&self) -> Vec<FipsMeshPubsubEvent> {
1212        self.store
1213            .drain_pubsub_events()
1214            .await
1215            .into_iter()
1216            .map(|event| FipsMeshPubsubEvent {
1217                stream_id: event.stream_id,
1218                seq: event.seq,
1219                origin_peer_id: event.origin_peer_id,
1220                from_peer_id: event.from_peer_id,
1221                payload: event.payload,
1222            })
1223            .collect()
1224    }
1225
1226    /// Current mesh peer count known to the shared mesh core.
1227    pub async fn peer_count(&self) -> usize {
1228        self.store.peer_count().await
1229    }
1230
1231    /// Current mesh peer IDs known to the shared mesh core.
1232    pub async fn peer_ids(&self) -> Vec<String> {
1233        self.store.peer_ids().await
1234    }
1235}
1236
1237#[async_trait]
1238impl<S: Store + Send + Sync + 'static> Store for FipsMeshPubsub<S> {
1239    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1240        self.store.put(hash, data).await
1241    }
1242
1243    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1244        self.store.get(hash).await
1245    }
1246
1247    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1248        self.store.has(hash).await
1249    }
1250
1251    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1252        self.store.delete(hash).await
1253    }
1254}
1255
1256impl<S: Store + Send + Sync + 'static> Drop for FipsMeshPubsub<S> {
1257    fn drop(&mut self) {
1258        self.demux_task.abort();
1259        self.pump_task.abort();
1260    }
1261}
1262
1263impl<S: Store + Send + Sync + 'static> HashtreeFipsTransport<S> {
1264    /// Start a hashtree mesh pubsub runtime on top of this FIPS transport.
1265    ///
1266    /// FIPS remains responsible for discovery, authorization, and endpoint
1267    /// delivery. The shared mesh core handles pubsub interest floods,
1268    /// inventory/want routing, frame dedupe, and fanout scheduling.
1269    pub async fn start_mesh_pubsub(
1270        self: &Arc<Self>,
1271        local_store: Arc<S>,
1272        peer_id: String,
1273        request_timeout: Duration,
1274    ) -> Result<FipsMeshPubsub<S>, FipsTransportError> {
1275        let hub = FipsMeshLinkHub::default();
1276        let (signaling_tx, signaling_rx) = mpsc::unbounded_channel();
1277        let signaling_transport = Arc::new(FipsMeshSignaling {
1278            peer_id: peer_id.clone(),
1279            transport: self.clone(),
1280            rx: Mutex::new(signaling_rx),
1281            connected: AtomicBool::new(true),
1282        });
1283        let link_factory = Arc::new(FipsMeshLinkFactory {
1284            transport: self.clone(),
1285            hub: hub.clone(),
1286        });
1287        let router = Arc::new(MeshRouter::new(
1288            peer_id.clone(),
1289            signaling_transport.clone(),
1290            link_factory,
1291            PoolSettings::default(),
1292            false,
1293        ));
1294        let store = Arc::new(MeshStoreCore::new_with_routing(
1295            local_store,
1296            router,
1297            request_timeout,
1298            false,
1299            MeshRoutingConfig {
1300                pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
1301                ..Default::default()
1302            },
1303        ));
1304        store
1305            .start()
1306            .await
1307            .map_err(|error| FipsTransportError::Endpoint(error.to_string()))?;
1308
1309        let demux_task = spawn_fips_mesh_demux(self.clone(), peer_id, hub, signaling_tx);
1310        let pump_task = spawn_fips_mesh_pump(store.clone(), signaling_transport);
1311        Ok(FipsMeshPubsub {
1312            store,
1313            demux_task,
1314            pump_task,
1315        })
1316    }
1317}
1318
1319#[derive(Default, Clone)]
1320struct FipsMeshLinkHub {
1321    inboxes: Arc<Mutex<HashMap<String, Arc<FipsMeshInbox>>>>,
1322}
1323
1324impl FipsMeshLinkHub {
1325    async fn inbox(&self, peer_id: &str) -> Arc<FipsMeshInbox> {
1326        let mut inboxes = self.inboxes.lock().await;
1327        inboxes
1328            .entry(peer_id.to_string())
1329            .or_insert_with(|| Arc::new(FipsMeshInbox::default()))
1330            .clone()
1331    }
1332
1333    async fn push(&self, peer_id: &str, data: Vec<u8>) {
1334        self.inbox(peer_id).await.push(data).await;
1335    }
1336}
1337
1338struct FipsMeshInbox {
1339    queue: Mutex<VecDeque<Vec<u8>>>,
1340    notify: Notify,
1341    open: AtomicBool,
1342}
1343
1344impl Default for FipsMeshInbox {
1345    fn default() -> Self {
1346        Self {
1347            queue: Mutex::new(VecDeque::new()),
1348            notify: Notify::new(),
1349            open: AtomicBool::new(true),
1350        }
1351    }
1352}
1353
1354impl FipsMeshInbox {
1355    async fn push(&self, data: Vec<u8>) {
1356        self.queue.lock().await.push_back(data);
1357        self.notify.notify_waiters();
1358    }
1359
1360    async fn recv(&self) -> Option<Vec<u8>> {
1361        loop {
1362            if let Some(data) = self.queue.lock().await.pop_front() {
1363                return Some(data);
1364            }
1365            if !self.open.load(Ordering::Relaxed) {
1366                return None;
1367            }
1368            self.notify.notified().await;
1369        }
1370    }
1371
1372    fn try_recv(&self) -> Option<Vec<u8>> {
1373        let Ok(mut queue) = self.queue.try_lock() else {
1374            return None;
1375        };
1376        queue.pop_front()
1377    }
1378
1379    fn close(&self) {
1380        self.open.store(false, Ordering::Relaxed);
1381        self.notify.notify_waiters();
1382    }
1383}
1384
1385struct FipsMeshSignaling<S: Store + Send + Sync + 'static> {
1386    peer_id: String,
1387    transport: Arc<HashtreeFipsTransport<S>>,
1388    rx: Mutex<mpsc::UnboundedReceiver<SignalingMessage>>,
1389    connected: AtomicBool,
1390}
1391
1392#[async_trait]
1393impl<S: Store + Send + Sync + 'static> SignalingTransport for FipsMeshSignaling<S> {
1394    async fn connect(&self, _relays: &[String]) -> Result<(), TransportError> {
1395        self.connected.store(true, Ordering::Relaxed);
1396        Ok(())
1397    }
1398
1399    async fn disconnect(&self) {
1400        self.connected.store(false, Ordering::Relaxed);
1401    }
1402
1403    async fn publish(&self, msg: SignalingMessage) -> Result<(), TransportError> {
1404        if !self.connected.load(Ordering::Relaxed) {
1405            return Err(TransportError::NotConnected);
1406        }
1407        let payload = serde_json::to_vec(&msg)
1408            .map_err(|error| TransportError::SendFailed(error.to_string()))?;
1409        if let Some(peer_id) = msg.target_peer_id() {
1410            self.transport
1411                .send_app_message(peer_id, FIPS_MESH_SIGNALING_TOPIC, payload)
1412                .await
1413                .map_err(|error| TransportError::SendFailed(error.to_string()))
1414        } else {
1415            self.transport
1416                .broadcast_app_message(FIPS_MESH_SIGNALING_TOPIC, payload)
1417                .await
1418                .map(|_| ())
1419                .map_err(|error| TransportError::SendFailed(error.to_string()))
1420        }
1421    }
1422
1423    async fn recv(&self) -> Option<SignalingMessage> {
1424        self.rx.lock().await.recv().await
1425    }
1426
1427    fn try_recv(&self) -> Option<SignalingMessage> {
1428        let Ok(mut rx) = self.rx.try_lock() else {
1429            return None;
1430        };
1431        rx.try_recv().ok()
1432    }
1433
1434    fn peer_id(&self) -> &str {
1435        &self.peer_id
1436    }
1437}
1438
1439struct FipsMeshLinkFactory<S: Store + Send + Sync + 'static> {
1440    transport: Arc<HashtreeFipsTransport<S>>,
1441    hub: FipsMeshLinkHub,
1442}
1443
1444impl<S: Store + Send + Sync + 'static> FipsMeshLinkFactory<S> {
1445    async fn link_for(&self, peer_id: &str) -> Arc<dyn PeerLink> {
1446        Arc::new(FipsMeshPeerLink {
1447            peer_id: peer_id.to_string(),
1448            transport: self.transport.clone(),
1449            inbox: self.hub.inbox(peer_id).await,
1450        })
1451    }
1452}
1453
1454#[async_trait]
1455impl<S: Store + Send + Sync + 'static> PeerLinkFactory for FipsMeshLinkFactory<S> {
1456    async fn create_offer(
1457        &self,
1458        target_peer_id: &str,
1459    ) -> Result<(Arc<dyn PeerLink>, String), TransportError> {
1460        Ok((self.link_for(target_peer_id).await, "fips-mesh-v1".into()))
1461    }
1462
1463    async fn accept_offer(
1464        &self,
1465        from_peer_id: &str,
1466        _offer_sdp: &str,
1467    ) -> Result<(Arc<dyn PeerLink>, String), TransportError> {
1468        Ok((self.link_for(from_peer_id).await, "fips-mesh-v1".into()))
1469    }
1470
1471    async fn handle_answer(
1472        &self,
1473        target_peer_id: &str,
1474        _answer_sdp: &str,
1475    ) -> Result<Arc<dyn PeerLink>, TransportError> {
1476        Ok(self.link_for(target_peer_id).await)
1477    }
1478}
1479
1480struct FipsMeshPeerLink<S: Store + Send + Sync + 'static> {
1481    peer_id: String,
1482    transport: Arc<HashtreeFipsTransport<S>>,
1483    inbox: Arc<FipsMeshInbox>,
1484}
1485
1486#[async_trait]
1487impl<S: Store + Send + Sync + 'static> PeerLink for FipsMeshPeerLink<S> {
1488    async fn send(&self, data: Vec<u8>) -> Result<(), TransportError> {
1489        self.transport
1490            .send_app_message(&self.peer_id, FIPS_MESH_DATA_TOPIC, data)
1491            .await
1492            .map_err(|error| TransportError::SendFailed(error.to_string()))
1493    }
1494
1495    async fn recv(&self) -> Option<Vec<u8>> {
1496        self.inbox.recv().await
1497    }
1498
1499    fn try_recv(&self) -> Option<Vec<u8>> {
1500        self.inbox.try_recv()
1501    }
1502
1503    fn is_open(&self) -> bool {
1504        self.inbox.open.load(Ordering::Relaxed)
1505    }
1506
1507    async fn close(&self) {
1508        self.inbox.close();
1509    }
1510}
1511
1512fn spawn_fips_mesh_demux<S: Store + Send + Sync + 'static>(
1513    transport: Arc<HashtreeFipsTransport<S>>,
1514    peer_id: String,
1515    hub: FipsMeshLinkHub,
1516    signaling_tx: mpsc::UnboundedSender<SignalingMessage>,
1517) -> JoinHandle<()> {
1518    let mut app_messages = transport.subscribe_app_messages();
1519    tokio::spawn(async move {
1520        loop {
1521            let Ok(message) = app_messages.recv().await else {
1522                break;
1523            };
1524            match message.topic.as_str() {
1525                FIPS_MESH_SIGNALING_TOPIC => {
1526                    let Ok(signal) = serde_json::from_slice::<SignalingMessage>(&message.data)
1527                    else {
1528                        continue;
1529                    };
1530                    if signal.peer_id() != peer_id
1531                        && (signal.target_peer_id().is_none()
1532                            || signal.target_peer_id() == Some(peer_id.as_str()))
1533                    {
1534                        let _ = signaling_tx.send(signal);
1535                    }
1536                }
1537                FIPS_MESH_DATA_TOPIC => {
1538                    hub.push(&message.peer_id, message.data).await;
1539                }
1540                _ => {}
1541            }
1542        }
1543    })
1544}
1545
1546fn spawn_fips_mesh_pump<S: Store + Send + Sync + 'static>(
1547    store: Arc<FipsMeshStore<S>>,
1548    signaling: Arc<FipsMeshSignaling<S>>,
1549) -> JoinHandle<()> {
1550    tokio::spawn(async move {
1551        let mut pump = tokio::time::interval(FIPS_MESH_PUMP_INTERVAL);
1552        pump.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1553        let mut hello = tokio::time::interval(FIPS_MESH_HELLO_INTERVAL);
1554        hello.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1555        loop {
1556            tokio::select! {
1557                signal = signaling.recv() => {
1558                    let Some(signal) = signal else {
1559                        break;
1560                    };
1561                    let _ = store.process_signaling(signal).await;
1562                }
1563                _ = pump.tick() => {
1564                    store.drain_available_data_messages().await;
1565                }
1566                _ = hello.tick() => {
1567                    let _ = store.send_hello().await;
1568                }
1569            }
1570        }
1571    })
1572}
1573
1574#[async_trait]
1575impl<S: Store + Send + Sync + 'static> Store for HashtreeFipsTransport<S> {
1576    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1577        self.local_store.put(hash, data).await
1578    }
1579
1580    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1581        if let Some(data) = self.local_store.get(hash).await? {
1582            if verify_hash(&data, hash) {
1583                return Ok(Some(data));
1584            }
1585        }
1586        let peers = self.peer_ids().await;
1587        self.get_from_peers(hash, &peers)
1588            .await
1589            .map_err(|err| StoreError::Other(err.to_string()))
1590    }
1591
1592    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1593        self.local_store.has(hash).await
1594    }
1595
1596    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1597        self.local_store.delete(hash).await
1598    }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603    use super::*;
1604    use std::sync::atomic::{AtomicUsize, Ordering};
1605    use tokio::sync::mpsc;
1606
1607    struct FakeEndpoint {
1608        id: String,
1609        network: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<FipsEndpointPacket>>>>,
1610        rx: Mutex<mpsc::UnboundedReceiver<FipsEndpointPacket>>,
1611        configured_peers: Mutex<Vec<String>>,
1612        configured_peer_configs: Mutex<Vec<FipsPeerConfig>>,
1613        peer_statuses: Mutex<Vec<FipsPeerStatus>>,
1614        sent: AtomicUsize,
1615        drop_next: AtomicUsize,
1616    }
1617
1618    impl FakeEndpoint {
1619        async fn new(
1620            id: &str,
1621            network: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<FipsEndpointPacket>>>>,
1622        ) -> Arc<Self> {
1623            let (tx, rx) = mpsc::unbounded_channel();
1624            network.lock().await.insert(id.to_string(), tx);
1625            Arc::new(Self {
1626                id: id.to_string(),
1627                network,
1628                rx: Mutex::new(rx),
1629                configured_peers: Mutex::new(Vec::new()),
1630                configured_peer_configs: Mutex::new(Vec::new()),
1631                peer_statuses: Mutex::new(Vec::new()),
1632                sent: AtomicUsize::new(0),
1633                drop_next: AtomicUsize::new(0),
1634            })
1635        }
1636
1637        fn sent_count(&self) -> usize {
1638            self.sent.load(Ordering::Relaxed)
1639        }
1640
1641        fn drop_next_sends(&self, count: usize) {
1642            self.drop_next.store(count, Ordering::Relaxed);
1643        }
1644    }
1645
1646    #[async_trait]
1647    impl FipsEndpointIo for FakeEndpoint {
1648        async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError> {
1649            self.sent.fetch_add(1, Ordering::Relaxed);
1650            if self
1651                .drop_next
1652                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
1653                    if value > 0 {
1654                        Some(value - 1)
1655                    } else {
1656                        None
1657                    }
1658                })
1659                .is_ok()
1660            {
1661                return Ok(());
1662            }
1663            let tx = self
1664                .network
1665                .lock()
1666                .await
1667                .get(peer_id)
1668                .cloned()
1669                .ok_or_else(|| FipsTransportError::Send(format!("unknown peer {peer_id}")))?;
1670            tx.send(FipsEndpointPacket {
1671                peer_id: self.id.clone(),
1672                data,
1673            })
1674            .map_err(|_| FipsTransportError::Send("receiver closed".to_string()))
1675        }
1676
1677        async fn recv(&self) -> Option<FipsEndpointPacket> {
1678            self.rx.lock().await.recv().await
1679        }
1680
1681        async fn set_peer_ids(&self, peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
1682            *self.configured_peers.lock().await = peer_ids;
1683            Ok(())
1684        }
1685
1686        async fn set_peer_configs(
1687            &self,
1688            peer_configs: Vec<FipsPeerConfig>,
1689        ) -> Result<(), FipsTransportError> {
1690            *self.configured_peers.lock().await =
1691                peer_configs.iter().map(|peer| peer.npub.clone()).collect();
1692            *self.configured_peer_configs.lock().await = peer_configs;
1693            Ok(())
1694        }
1695
1696        async fn peer_ids(&self) -> Vec<String> {
1697            let configured = self.configured_peers.lock().await.clone();
1698            if !configured.is_empty() {
1699                return configured;
1700            }
1701            self.network
1702                .lock()
1703                .await
1704                .keys()
1705                .filter(|id| *id != &self.id)
1706                .cloned()
1707                .collect()
1708        }
1709
1710        async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
1711            self.peer_statuses.lock().await.clone()
1712        }
1713
1714        fn local_peer_id(&self) -> Option<String> {
1715            Some(self.id.clone())
1716        }
1717    }
1718
1719    fn hash(data: &[u8]) -> Hash {
1720        let digest = Sha256::digest(data);
1721        let mut hash = [0u8; 32];
1722        hash.copy_from_slice(&digest);
1723        hash
1724    }
1725
1726    #[test]
1727    fn endpoint_config_uses_reply_learned_routing_for_mesh_fallback() {
1728        let config = fips_endpoint_config(FipsEndpointOptions::new("nsec1example"), "test-scope");
1729
1730        assert_eq!(config.node.routing.mode, RoutingMode::ReplyLearned);
1731    }
1732
1733    #[test]
1734    fn endpoint_config_scopes_nostr_discovery_app() {
1735        let config = fips_endpoint_config(
1736            FipsEndpointOptions::new("nsec1example"),
1737            "iris-drive-v1:private-owner",
1738        );
1739
1740        assert_eq!(
1741            config.node.discovery.nostr.app,
1742            "iris-drive-v1:private-owner"
1743        );
1744    }
1745
1746    #[test]
1747    fn endpoint_config_caps_total_peer_fanout() {
1748        let mut options = FipsEndpointOptions::new("nsec1example");
1749        options.webrtc_max_connections = 9;
1750        let config = fips_endpoint_config(options, "test-scope");
1751
1752        assert_eq!(config.node.limits.max_peers, 9);
1753        assert_eq!(config.node.limits.max_links, 18);
1754        assert_eq!(config.node.limits.max_connections, 18);
1755        assert_eq!(config.node.limits.max_pending_inbound, 36);
1756    }
1757
1758    #[test]
1759    fn transport_tagged_peer_addresses_are_preserved_for_fips() {
1760        let udp = peer_address_from_configured_addr(" udp:10.44.1.2:22121 ").unwrap();
1761        let tcp = peer_address_from_configured_addr("tcp:203.0.113.9:443").unwrap();
1762        let bare = peer_address_from_configured_addr("10.44.1.3:22121").unwrap();
1763
1764        assert_eq!(udp.transport, "udp");
1765        assert_eq!(udp.addr, "10.44.1.2:22121");
1766        assert_eq!(tcp.transport, "tcp");
1767        assert_eq!(tcp.addr, "203.0.113.9:443");
1768        assert_eq!(bare.transport, "udp");
1769        assert_eq!(bare.addr, "10.44.1.3:22121");
1770    }
1771
1772    #[tokio::test]
1773    async fn set_peers_configures_underlying_fips_endpoint() {
1774        let network = Arc::new(Mutex::new(HashMap::new()));
1775        let endpoint = FakeEndpoint::new("local", network).await;
1776        let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1777
1778        transport
1779            .set_peers(vec![
1780                "remote".to_string(),
1781                "local".to_string(),
1782                "remote".to_string(),
1783                "  ".to_string(),
1784            ])
1785            .await;
1786
1787        assert_eq!(
1788            endpoint.configured_peers.lock().await.as_slice(),
1789            &["remote".to_string()]
1790        );
1791        assert_eq!(transport.configured_peer_ids().await, vec!["remote"]);
1792    }
1793
1794    #[tokio::test]
1795    async fn set_peer_configs_preserves_static_udp_addresses() {
1796        let network = Arc::new(Mutex::new(HashMap::new()));
1797        let endpoint = FakeEndpoint::new("local", network).await;
1798        let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1799
1800        transport
1801            .set_peer_configs(vec![
1802                FipsPeerConfig {
1803                    npub: " remote ".to_string(),
1804                    udp_addresses: vec![" 10.44.1.2:22121 ".to_string(), " ".to_string()],
1805                },
1806                FipsPeerConfig {
1807                    npub: "local".to_string(),
1808                    udp_addresses: vec!["10.44.1.1:22121".to_string()],
1809                },
1810            ])
1811            .await;
1812
1813        assert_eq!(
1814            endpoint.configured_peer_configs.lock().await.as_slice(),
1815            &[FipsPeerConfig {
1816                npub: "remote".to_string(),
1817                udp_addresses: vec!["10.44.1.2:22121".to_string()],
1818            }]
1819        );
1820        assert_eq!(transport.configured_peer_ids().await, vec!["remote"]);
1821    }
1822
1823    #[tokio::test]
1824    async fn routing_only_peers_are_configured_but_not_application_peers() {
1825        let network = Arc::new(Mutex::new(HashMap::new()));
1826        let endpoint = FakeEndpoint::new("local", network).await;
1827        let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1828
1829        transport
1830            .set_peer_configs_with_routing_peers(
1831                vec![FipsPeerConfig {
1832                    npub: "device".to_string(),
1833                    udp_addresses: vec!["10.44.1.2:22121".to_string()],
1834                }],
1835                vec![FipsPeerConfig {
1836                    npub: "bootstrap".to_string(),
1837                    udp_addresses: vec!["udp:203.0.113.7:2121".to_string()],
1838                }],
1839            )
1840            .await;
1841
1842        assert_eq!(
1843            endpoint.configured_peers.lock().await.as_slice(),
1844            &["device".to_string(), "bootstrap".to_string()]
1845        );
1846        assert_eq!(transport.configured_peer_ids().await, vec!["device"]);
1847        assert_eq!(transport.peer_ids().await, vec!["device"]);
1848    }
1849
1850    #[tokio::test]
1851    async fn empty_application_peer_set_does_not_fall_back_to_routing_peers() {
1852        let network = Arc::new(Mutex::new(HashMap::new()));
1853        let endpoint = FakeEndpoint::new("local", network).await;
1854        let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1855
1856        transport
1857            .set_peer_configs_with_routing_peers(
1858                Vec::new(),
1859                vec![FipsPeerConfig {
1860                    npub: "bootstrap".to_string(),
1861                    udp_addresses: vec!["udp:203.0.113.7:2121".to_string()],
1862                }],
1863            )
1864            .await;
1865
1866        assert_eq!(
1867            endpoint.configured_peers.lock().await.as_slice(),
1868            &["bootstrap".to_string()]
1869        );
1870        assert!(transport.configured_peer_ids().await.is_empty());
1871        assert!(transport.peer_ids().await.is_empty());
1872        assert_eq!(transport.connected_peer_ids().await, vec!["bootstrap"]);
1873    }
1874
1875    #[tokio::test]
1876    async fn peer_statuses_expose_fips_endpoint_latency_snapshot() {
1877        let network = Arc::new(Mutex::new(HashMap::new()));
1878        let endpoint = FakeEndpoint::new("local", network).await;
1879        *endpoint.peer_statuses.lock().await = vec![FipsPeerStatus {
1880            npub: "remote".to_string(),
1881            transport_addr: Some("udp:10.44.1.2:2121".to_string()),
1882            transport_type: Some("udp".to_string()),
1883            srtt_ms: Some(23),
1884            packets_sent: 5,
1885            packets_recv: 7,
1886            bytes_sent: 512,
1887            bytes_recv: 1024,
1888        }];
1889        let transport = HashtreeFipsTransport::new(endpoint, Arc::new(MemoryStore::new()));
1890
1891        assert_eq!(
1892            transport.peer_statuses().await,
1893            vec![FipsPeerStatus {
1894                npub: "remote".to_string(),
1895                transport_addr: Some("udp:10.44.1.2:2121".to_string()),
1896                transport_type: Some("udp".to_string()),
1897                srtt_ms: Some(23),
1898                packets_sent: 5,
1899                packets_recv: 7,
1900                bytes_sent: 512,
1901                bytes_recv: 1024,
1902            }]
1903        );
1904    }
1905
1906    #[tokio::test]
1907    async fn fetches_hash_verified_blob_over_fips_endpoint_bytes() {
1908        let network = Arc::new(Mutex::new(HashMap::new()));
1909        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1910        let endpoint_b = FakeEndpoint::new("b", network).await;
1911        let data = b"hashtree over fips".to_vec();
1912        let hash = hash(&data);
1913        let store_a = Arc::new(MemoryStore::new());
1914        let store_b = Arc::new(MemoryStore::new());
1915        store_a.put(hash, data.clone()).await.unwrap();
1916
1917        let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
1918        let transport_b = Arc::new(
1919            HashtreeFipsTransport::new(endpoint_b, store_b.clone())
1920                .with_request_timeout(Duration::from_millis(100)),
1921        );
1922        transport_a.start();
1923        transport_b.start();
1924        transport_b.set_peers(vec!["a".to_string()]).await;
1925
1926        assert_eq!(transport_b.get(&hash).await.unwrap(), Some(data.clone()));
1927        assert_eq!(store_b.get(&hash).await.unwrap(), Some(data));
1928    }
1929
1930    #[tokio::test]
1931    async fn fetches_fragmented_blob_over_fips_endpoint_bytes() {
1932        let network = Arc::new(Mutex::new(HashMap::new()));
1933        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1934        let endpoint_b = FakeEndpoint::new("b", network).await;
1935        let data = (0..(FIPS_RESPONSE_FRAGMENT_SIZE * 2 + 17))
1936            .map(|index| (index % 251) as u8)
1937            .collect::<Vec<_>>();
1938        let hash = hash(&data);
1939        let store_a = Arc::new(MemoryStore::new());
1940        let store_b = Arc::new(MemoryStore::new());
1941        store_a.put(hash, data.clone()).await.unwrap();
1942
1943        let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
1944        let transport_b = Arc::new(
1945            HashtreeFipsTransport::new(endpoint_b, store_b.clone())
1946                .with_request_timeout(Duration::from_millis(100)),
1947        );
1948        transport_a.start();
1949        transport_b.start();
1950        transport_b.set_peers(vec!["a".to_string()]).await;
1951
1952        assert_eq!(transport_b.get(&hash).await.unwrap(), Some(data.clone()));
1953        assert_eq!(store_b.get(&hash).await.unwrap(), Some(data));
1954    }
1955
1956    #[tokio::test]
1957    async fn delivers_fragmented_app_messages_over_fips_endpoint_bytes() {
1958        let network = Arc::new(Mutex::new(HashMap::new()));
1959        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1960        let endpoint_b = FakeEndpoint::new("b", network).await;
1961        let transport_a = Arc::new(HashtreeFipsTransport::new(
1962            endpoint_a.clone(),
1963            Arc::new(MemoryStore::new()),
1964        ));
1965        let transport_b = Arc::new(HashtreeFipsTransport::new(
1966            endpoint_b,
1967            Arc::new(MemoryStore::new()),
1968        ));
1969        let mut app_messages = transport_b.subscribe_app_messages();
1970        transport_a.start();
1971        transport_b.start();
1972
1973        let data = (0..(FIPS_APP_FRAGMENT_SIZE * 3 + 19))
1974            .map(|index| (index % 251) as u8)
1975            .collect::<Vec<_>>();
1976        transport_a
1977            .send_app_message("b", "iris-drive/root/frame/v1", data.clone())
1978            .await
1979            .unwrap();
1980
1981        let message = timeout(Duration::from_millis(100), app_messages.recv())
1982            .await
1983            .unwrap()
1984            .unwrap();
1985        assert_eq!(message.peer_id, "a");
1986        assert_eq!(message.topic, "iris-drive/root/frame/v1");
1987        assert_eq!(message.data, data);
1988        assert!(endpoint_a.sent_count() > 1);
1989    }
1990
1991    #[tokio::test]
1992    async fn can_deliver_unconfigured_app_messages_without_serving_blocks() {
1993        let network = Arc::new(Mutex::new(HashMap::new()));
1994        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1995        let endpoint_b = FakeEndpoint::new("b", network).await;
1996        let data = b"app-only peer".to_vec();
1997        let hash = hash(&data);
1998        let store_a = Arc::new(MemoryStore::new());
1999        let store_b = Arc::new(MemoryStore::new());
2000        store_b.put(hash, data.clone()).await.unwrap();
2001        let transport_a = Arc::new(
2002            HashtreeFipsTransport::new(endpoint_a, store_a)
2003                .with_request_timeout(Duration::from_millis(50)),
2004        );
2005        let transport_b = Arc::new(
2006            HashtreeFipsTransport::new(endpoint_b, store_b)
2007                .with_unconfigured_app_message_topics(["iris-drive/device-link/v1/request"]),
2008        );
2009        let mut app_messages = transport_b.subscribe_app_messages();
2010        transport_a.start();
2011        transport_b.start();
2012        transport_b
2013            .set_peers(vec!["configured-peer".to_string()])
2014            .await;
2015
2016        transport_a
2017            .send_app_message("b", "iris-drive/device-link/v1/request", b"join".to_vec())
2018            .await
2019            .unwrap();
2020
2021        let message = timeout(Duration::from_millis(100), app_messages.recv())
2022            .await
2023            .unwrap()
2024            .unwrap();
2025        assert_eq!(message.peer_id, "a");
2026        assert_eq!(message.topic, "iris-drive/device-link/v1/request");
2027        assert_eq!(message.data, b"join");
2028        assert_eq!(
2029            transport_a
2030                .get_from_peers(&hash, &["b".to_string()])
2031                .await
2032                .unwrap(),
2033            None
2034        );
2035    }
2036
2037    #[tokio::test]
2038    async fn mesh_pubsub_delivers_over_fips_endpoint_bytes() {
2039        let network = Arc::new(Mutex::new(HashMap::new()));
2040        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2041        let endpoint_b = FakeEndpoint::new("b", network).await;
2042        let store_a = Arc::new(MemoryStore::new());
2043        let store_b = Arc::new(MemoryStore::new());
2044        let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a.clone()));
2045        let transport_b = Arc::new(HashtreeFipsTransport::new(endpoint_b, store_b.clone()));
2046        transport_a.set_peers(vec!["b".to_string()]).await;
2047        transport_b.set_peers(vec!["a".to_string()]).await;
2048        transport_a.start();
2049        transport_b.start();
2050
2051        let mesh_a = transport_a
2052            .start_mesh_pubsub(store_a, "a".to_string(), Duration::from_millis(200))
2053            .await
2054            .unwrap();
2055        let mesh_b = transport_b
2056            .start_mesh_pubsub(store_b, "b".to_string(), Duration::from_millis(200))
2057            .await
2058            .unwrap();
2059        mesh_b.subscribe_pubsub("iris-drive/root-events/test").await;
2060
2061        let payload = vec![0x42; 4096];
2062        let delivered = timeout(Duration::from_secs(2), async {
2063            let mut seq = 1u64;
2064            loop {
2065                mesh_a
2066                    .publish_pubsub("iris-drive/root-events/test", seq, payload.clone())
2067                    .await;
2068                seq += 1;
2069                tokio::time::sleep(Duration::from_millis(25)).await;
2070                let events = mesh_b.drain_pubsub_events().await;
2071                if let Some(event) = events.into_iter().next() {
2072                    break event;
2073                }
2074            }
2075        })
2076        .await
2077        .unwrap();
2078
2079        assert_eq!(delivered.stream_id, "iris-drive/root-events/test");
2080        assert_eq!(delivered.origin_peer_id, "a");
2081        assert_eq!(delivered.payload, payload);
2082        assert_eq!(mesh_a.peer_ids().await, vec!["b"]);
2083        assert_eq!(mesh_b.peer_ids().await, vec!["a"]);
2084    }
2085
2086    #[tokio::test(start_paused = true)]
2087    async fn silence_resolves_unknown_without_retrying_same_peer() {
2088        let network = Arc::new(Mutex::new(HashMap::new()));
2089        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2090        let endpoint_b = FakeEndpoint::new("b", network).await;
2091        let missing = [7u8; 32];
2092        let transport_a = Arc::new(HashtreeFipsTransport::new(
2093            endpoint_a,
2094            Arc::new(MemoryStore::new()),
2095        ));
2096        let transport_b = Arc::new(
2097            HashtreeFipsTransport::new(endpoint_b.clone(), Arc::new(MemoryStore::new()))
2098                .with_request_timeout(Duration::from_millis(25)),
2099        );
2100        transport_a.start();
2101        transport_b.start();
2102        transport_b.set_peers(vec!["a".to_string()]).await;
2103
2104        let pending = transport_b.get(&missing);
2105        tokio::time::advance(Duration::from_millis(30)).await;
2106
2107        assert_eq!(pending.await.unwrap(), None);
2108        assert_eq!(endpoint_b.sent_count(), 1);
2109        assert!(
2110            transport_b.pending.lock().await.is_empty(),
2111            "timed-out requests should not leave stale pending senders"
2112        );
2113    }
2114
2115    #[tokio::test(start_paused = true)]
2116    async fn retries_dropped_request_to_same_peer() {
2117        let network = Arc::new(Mutex::new(HashMap::new()));
2118        let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2119        let endpoint_b = FakeEndpoint::new("b", network).await;
2120        let data = b"retried request".to_vec();
2121        let hash = hash(&data);
2122        let store_a = Arc::new(MemoryStore::new());
2123        store_a.put(hash, data.clone()).await.unwrap();
2124        endpoint_b.drop_next_sends(1);
2125        let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
2126        let transport_b = Arc::new(
2127            HashtreeFipsTransport::new(endpoint_b.clone(), Arc::new(MemoryStore::new()))
2128                .with_request_timeout(Duration::from_millis(300))
2129                .with_request_retry_interval(Duration::from_millis(50)),
2130        );
2131        transport_a.start();
2132        transport_b.start();
2133        transport_b.set_peers(vec!["a".to_string()]).await;
2134
2135        let pending = transport_b.get(&hash);
2136        tokio::time::advance(Duration::from_millis(60)).await;
2137
2138        assert_eq!(pending.await.unwrap(), Some(data));
2139        assert_eq!(endpoint_b.sent_count(), 2);
2140    }
2141}