1use async_trait::async_trait;
8use fips_core::config::{NostrDiscoveryPolicy, PeerAddress, RoutingMode, TransportInstances};
9use hashtree_core::{Hash, MemoryStore, Store, StoreError};
10pub use hashtree_network::PubsubPublishStats;
11use hashtree_network::{
12 transport::{PeerLink, PeerLinkFactory, SignalingTransport, TransportError},
13 MeshRouter, MeshRoutingConfig, MeshStoreCore, PoolSettings, PubsubDeliveryMode,
14 SignalingMessage,
15};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use std::collections::{HashMap, VecDeque};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::{broadcast, mpsc, oneshot, Mutex, Notify, RwLock};
23use tokio::task::JoinHandle;
24use tokio::time::{timeout, Duration};
25
/// Discovery scope used when the caller supplies an empty or blank scope.
pub const DEFAULT_FIPS_DISCOVERY_SCOPE: &str = "hashtree-v1";
/// Default overall deadline for one `get_from_peers` call.
pub const DEFAULT_FIPS_REQUEST_TIMEOUT: Duration = Duration::from_millis(5_500);
/// Default wait between re-sends of a request that has not been answered yet.
pub const DEFAULT_FIPS_REQUEST_RETRY_INTERVAL: Duration = Duration::from_millis(750);
/// Default maximum number of times a request is sent to the peer set.
pub const DEFAULT_FIPS_REQUEST_MAX_ATTEMPTS: usize = 4;
/// Maximum payload bytes carried by a single data-response frame/fragment.
pub const FIPS_RESPONSE_FRAGMENT_SIZE: usize = 1024;
/// Maximum payload bytes carried by a single application-message frame/fragment.
pub const FIPS_APP_FRAGMENT_SIZE: usize = 768;
/// Default `htl` value of a request; the field is omitted from the serialized
/// request when it equals this value and restored to it when absent.
pub const MAX_HTL: u8 = 10;
/// Default WebRTC connection cap; the node-level limits are derived from it.
pub const DEFAULT_FIPS_WEBRTC_MAX_CONNECTIONS: usize = 512;
34
/// Leading type byte of a data-request frame.
const MSG_TYPE_REQUEST: u8 = 0x00;
/// Leading type byte of a data-response frame (whole blob or one fragment).
const MSG_TYPE_RESPONSE: u8 = 0x01;
/// Leading type byte of an application-message frame.
const MSG_TYPE_APP: u8 = 0x7f;
/// Largest fragment count accepted when reassembling responses or application
/// messages; also the cap applied when fragmenting outgoing application messages.
const MAX_RESPONSE_FRAGMENTS: u32 = 16_384;
// NOTE(review): the four mesh constants below are not referenced in the part of
// the file reviewed here; presumably they are used by the mesh signaling/pump
// code further down — confirm before changing them.
const FIPS_MESH_SIGNALING_TOPIC: &str = "hashtree/fips/mesh/signaling/v1";
const FIPS_MESH_DATA_TOPIC: &str = "hashtree/fips/mesh/data/v1";
const FIPS_MESH_PUMP_INTERVAL: Duration = Duration::from_millis(20);
const FIPS_MESH_HELLO_INTERVAL: Duration = Duration::from_secs(2);
43
/// One inbound datagram handed over by a [`FipsEndpointIo`] implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointPacket {
    /// Identifier (npub) of the peer the packet came from.
    pub peer_id: String,
    /// Raw frame bytes; the first byte is the message type tag.
    pub data: Vec<u8>,
}
49
/// Serializable snapshot of one relay as reported by the endpoint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FipsRelayStatus {
    /// Relay URL as reported by the endpoint.
    pub url: String,
    /// Status string copied verbatim from the endpoint's relay report.
    pub status: String,
}
55
/// Serializable snapshot of one peer, copied field by field from the
/// endpoint's peer listing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FipsPeerStatus {
    /// Peer identifier (npub).
    pub npub: String,
    /// Transport address currently associated with the peer, if known.
    pub transport_addr: Option<String>,
    /// Transport kind currently associated with the peer, if known.
    pub transport_type: Option<String>,
    /// Round-trip estimate in milliseconds, if available
    /// (presumably a smoothed RTT, judging by the name — confirm with fips_core).
    pub srtt_ms: Option<u64>,
    /// Packet counter for traffic sent to the peer.
    pub packets_sent: u64,
    /// Packet counter for traffic received from the peer.
    pub packets_recv: u64,
    /// Byte counter for traffic sent to the peer.
    pub bytes_sent: u64,
    /// Byte counter for traffic received from the peer.
    pub bytes_recv: u64,
}
67
/// Errors produced by the FIPS transport layer.
#[derive(Debug, Error)]
pub enum FipsTransportError {
    /// Binding or reconfiguring the endpoint failed (also used for invalid options).
    #[error("endpoint failed: {0}")]
    Endpoint(String),
    /// The endpoint refused or failed to send a packet.
    #[error("endpoint send failed: {0}")]
    Send(String),
    /// A frame could not be encoded or decoded, or violates a wire limit.
    #[error("wire decode failed: {0}")]
    Wire(String),
    /// The local store returned an error.
    #[error("store error: {0}")]
    Store(#[from] StoreError),
}
79
/// A fully reassembled application message delivered to subscribers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsAppMessage {
    /// Identifier of the peer that sent the message.
    pub peer_id: String,
    /// Topic the message was published under.
    pub topic: String,
    /// Message payload (fragments already concatenated).
    pub data: Vec<u8>,
}
86
/// A peer the endpoint should be configured with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsPeerConfig {
    /// Peer identifier (npub).
    pub npub: String,
    /// Address hints for the peer. Each entry may carry a `transport:` prefix
    /// (e.g. `tcp:host:port`); entries without a recognised prefix are treated
    /// as UDP addresses when handed to the endpoint.
    pub udp_addresses: Vec<String>,
}
92
93impl FipsPeerConfig {
94 pub fn new(npub: impl Into<String>) -> Self {
95 Self {
96 npub: npub.into(),
97 udp_addresses: Vec::new(),
98 }
99 }
100}
101
/// Abstraction over the packet endpoint used by the transport.
///
/// Only `send` and `recv` are required; every other method has a default that
/// reports "nothing" so that simple endpoints stay small.
#[async_trait]
pub trait FipsEndpointIo: Send + Sync {
    /// Sends one frame to `peer_id`.
    async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError>;
    /// Waits for the next inbound packet; `None` ends the transport's receive loop.
    async fn recv(&self) -> Option<FipsEndpointPacket>;
    /// Replaces the configured peers.
    ///
    /// The default forwards only the npubs to [`Self::set_peer_ids`]; address
    /// hints are dropped unless the implementation overrides this method.
    async fn set_peer_configs(
        &self,
        peer_configs: Vec<FipsPeerConfig>,
    ) -> Result<(), FipsTransportError> {
        self.set_peer_ids(peer_configs.into_iter().map(|peer| peer.npub).collect())
            .await
    }
    /// Replaces the configured peers by identifier. The default accepts and ignores them.
    async fn set_peer_ids(&self, _peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
        Ok(())
    }
    /// Peer identifiers known to the endpoint. Defaults to an empty list.
    async fn peer_ids(&self) -> Vec<String> {
        Vec::new()
    }
    /// Per-peer status snapshots. Defaults to an empty list.
    async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
        Vec::new()
    }
    /// Per-relay status snapshots. Defaults to an empty list.
    async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
        Vec::new()
    }
    /// Identifier of the local node, if the endpoint knows it. Defaults to `None`.
    fn local_peer_id(&self) -> Option<String> {
        None
    }
}
129
/// Options consumed by [`bind_fips_endpoint`].
#[derive(Debug, Clone)]
pub struct FipsEndpointOptions {
    /// Secret key (nsec) used as the node identity.
    pub identity_nsec: String,
    /// Discovery scope; a blank value falls back to `DEFAULT_FIPS_DISCOVERY_SCOPE`.
    pub discovery_scope: String,
    /// Relays used for both adverts and DMs; when empty the endpoint's own
    /// relay configuration is left untouched.
    pub relays: Vec<String>,
    /// Enables the UDP transport.
    pub enable_udp: bool,
    /// Enables the WebRTC transport.
    pub enable_webrtc: bool,
    /// UDP bind address; `None` or blank means `0.0.0.0:0`.
    pub udp_bind_addr: Option<String>,
    /// Passed through as the UDP transport's `public` flag.
    pub udp_public: bool,
    /// Externally reachable UDP address; blank is treated as unset.
    pub udp_external_addr: Option<String>,
    /// Passed through as the WebRTC transport's `auto_connect` flag.
    pub webrtc_auto_connect: bool,
    /// WebRTC connection cap (at least 1 is used); node peer/link/connection
    /// limits are derived from this value.
    pub webrtc_max_connections: usize,
    /// `0` selects the configured-peers-only discovery policy; any other
    /// value selects open discovery with this pending limit.
    pub open_discovery_max_pending: usize,
    /// Capacity passed to the endpoint builder's packet channel.
    pub packet_channel_capacity: usize,
}
145
146impl FipsEndpointOptions {
147 pub fn new(identity_nsec: impl Into<String>) -> Self {
148 Self {
149 identity_nsec: identity_nsec.into(),
150 discovery_scope: DEFAULT_FIPS_DISCOVERY_SCOPE.to_string(),
151 relays: Vec::new(),
152 enable_udp: true,
153 enable_webrtc: true,
154 udp_bind_addr: None,
155 udp_public: false,
156 udp_external_addr: None,
157 webrtc_auto_connect: false,
158 webrtc_max_connections: DEFAULT_FIPS_WEBRTC_MAX_CONNECTIONS,
159 open_discovery_max_pending: 0,
160 packet_channel_capacity: 1024,
161 }
162 }
163}
164
/// Result of [`bind_fips_endpoint`]: the live endpoint plus the values resolved while binding.
pub struct BoundFipsEndpoint {
    /// The bound endpoint, type-erased behind [`FipsEndpointIo`].
    pub endpoint: Arc<dyn FipsEndpointIo>,
    /// npub of the local node as reported by the endpoint.
    pub local_peer_id: String,
    /// Discovery scope actually used (trimmed, or the default when blank).
    pub discovery_scope: String,
}
170
171pub async fn bind_fips_endpoint(
172 options: FipsEndpointOptions,
173) -> Result<BoundFipsEndpoint, FipsTransportError> {
174 if !options.enable_udp && !options.enable_webrtc {
175 return Err(FipsTransportError::Endpoint(
176 "at least one FIPS transport must be enabled".to_string(),
177 ));
178 }
179
180 let discovery_scope = if options.discovery_scope.trim().is_empty() {
181 DEFAULT_FIPS_DISCOVERY_SCOPE.to_string()
182 } else {
183 options.discovery_scope.trim().to_string()
184 };
185 let packet_channel_capacity = options.packet_channel_capacity;
186 let config = fips_endpoint_config(options, &discovery_scope);
187
188 let endpoint = fips_core::FipsEndpoint::builder()
189 .config(config)
190 .discovery_scope(discovery_scope.clone())
191 .without_system_tun()
192 .packet_channel_capacity(packet_channel_capacity)
193 .bind()
194 .await
195 .map_err(|err| FipsTransportError::Endpoint(err.to_string()))?;
196 let local_peer_id = endpoint.npub().to_string();
197
198 Ok(BoundFipsEndpoint {
199 endpoint: Arc::new(endpoint),
200 local_peer_id,
201 discovery_scope,
202 })
203}
204
205fn fips_endpoint_config(options: FipsEndpointOptions, discovery_scope: &str) -> fips_core::Config {
206 let mut config = fips_core::Config::new();
207 config.node.identity = fips_core::IdentityConfig {
208 nsec: Some(options.identity_nsec),
209 persistent: false,
210 };
211 config.node.routing.mode = RoutingMode::ReplyLearned;
212 config.node.limits.max_peers = options.webrtc_max_connections.max(1);
213 config.node.limits.max_links = options.webrtc_max_connections.saturating_mul(2).max(1);
214 config.node.limits.max_connections = options.webrtc_max_connections.saturating_mul(2).max(1);
215 config.node.limits.max_pending_inbound =
216 options.webrtc_max_connections.saturating_mul(4).max(1);
217 config.tun.enabled = false;
218 config.dns.enabled = false;
219 config.node.system_files_enabled = false;
220 config.node.discovery.lan.scope = Some(discovery_scope.to_string());
221 config.node.discovery.nostr.enabled = true;
222 config.node.discovery.nostr.advertise = true;
223 config.node.discovery.nostr.policy = if options.open_discovery_max_pending == 0 {
224 NostrDiscoveryPolicy::ConfiguredOnly
225 } else {
226 NostrDiscoveryPolicy::Open
227 };
228 config.node.discovery.nostr.open_discovery_max_pending = options.open_discovery_max_pending;
229 config.node.discovery.nostr.share_local_candidates = true;
230 config.node.discovery.nostr.app = discovery_scope.to_string();
231 if !options.relays.is_empty() {
232 config.node.discovery.nostr.advert_relays = options.relays.clone();
233 config.node.discovery.nostr.dm_relays = options.relays;
234 }
235
236 if options.enable_udp {
237 config.transports.udp = TransportInstances::Single(fips_core::UdpConfig {
238 bind_addr: Some(
239 options
240 .udp_bind_addr
241 .filter(|addr| !addr.trim().is_empty())
242 .unwrap_or_else(|| "0.0.0.0:0".to_string()),
243 ),
244 advertise_on_nostr: Some(true),
245 public: Some(options.udp_public),
246 external_addr: options
247 .udp_external_addr
248 .filter(|addr| !addr.trim().is_empty()),
249 outbound_only: Some(false),
250 accept_connections: Some(true),
251 ..Default::default()
252 });
253 }
254
255 if options.enable_webrtc {
256 config.transports.webrtc = TransportInstances::Single(fips_core::WebRtcConfig {
257 advertise_on_nostr: Some(true),
258 auto_connect: Some(options.webrtc_auto_connect),
259 accept_connections: Some(true),
260 max_connections: Some(options.webrtc_max_connections.max(1)),
261 ..Default::default()
262 });
263 }
264
265 config.transports.tcp = TransportInstances::Single(Default::default());
268
269 config
270}
271
272fn peer_address_from_configured_addr(raw: &str) -> Option<PeerAddress> {
273 let trimmed = raw.trim();
274 if trimmed.is_empty() {
275 return None;
276 }
277 let (transport, addr) = split_configured_transport_addr(trimmed);
278 Some(PeerAddress::new(transport, addr))
279}
280
/// Splits a configured address into `(transport, address)`.
///
/// A leading `name:` is treated as a transport prefix only when `name` is one
/// of the known transports, compared ASCII case-insensitively. The returned
/// transport is always the canonical lowercase name, so `"UDP:1.2.3.4:5"` and
/// `"udp:1.2.3.4:5"` produce the same result. Anything else — no colon, or an
/// unknown prefix such as the host part of `host:port` — is returned whole as
/// a UDP address.
fn split_configured_transport_addr(value: &str) -> (&str, &str) {
    const KNOWN_TRANSPORTS: [&str; 6] = ["udp", "tcp", "webrtc", "tor", "ethernet", "ble"];

    let Some((prefix, addr)) = value.split_once(':') else {
        return ("udp", value);
    };
    let canonical = KNOWN_TRANSPORTS
        .iter()
        .copied()
        .find(|known| prefix.eq_ignore_ascii_case(known));
    match canonical {
        // Hand back the canonical spelling rather than the caller's casing.
        Some(transport) => (transport, addr),
        None => ("udp", value),
    }
}
290
291fn sanitize_peer_configs(
292 local: Option<&str>,
293 peers: Vec<FipsPeerConfig>,
294 seen: &mut std::collections::HashSet<String>,
295) -> Vec<FipsPeerConfig> {
296 let mut out = Vec::new();
297 for peer in peers {
298 let npub = peer.npub.trim().to_string();
299 if npub.is_empty() || Some(npub.as_str()) == local || !seen.insert(npub.clone()) {
300 continue;
301 }
302 let udp_addresses = peer
303 .udp_addresses
304 .into_iter()
305 .map(|addr| addr.trim().to_string())
306 .filter(|addr| !addr.is_empty())
307 .collect();
308 out.push(FipsPeerConfig {
309 npub,
310 udp_addresses,
311 });
312 }
313 out
314}
315
316#[async_trait]
317impl FipsEndpointIo for fips_core::FipsEndpoint {
318 async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError> {
319 self.send(peer_id.to_string(), data)
320 .await
321 .map_err(|err| FipsTransportError::Send(err.to_string()))
322 }
323
324 async fn recv(&self) -> Option<FipsEndpointPacket> {
325 loop {
326 let message = fips_core::FipsEndpoint::recv(self).await?;
327 if let Some(peer_id) = message.source_npub {
328 return Some(FipsEndpointPacket {
329 peer_id,
330 data: message.data,
331 });
332 }
333 }
334 }
335
336 async fn peer_ids(&self) -> Vec<String> {
337 match self.peers().await {
338 Ok(peers) => peers.into_iter().map(|peer| peer.npub).collect(),
339 Err(_) => Vec::new(),
340 }
341 }
342
343 async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
344 match self.peers().await {
345 Ok(peers) => peers
346 .into_iter()
347 .map(|peer| FipsPeerStatus {
348 npub: peer.npub,
349 transport_addr: peer.transport_addr,
350 transport_type: peer.transport_type,
351 srtt_ms: peer.srtt_ms,
352 packets_sent: peer.packets_sent,
353 packets_recv: peer.packets_recv,
354 bytes_sent: peer.bytes_sent,
355 bytes_recv: peer.bytes_recv,
356 })
357 .collect(),
358 Err(_) => Vec::new(),
359 }
360 }
361
362 async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
363 match fips_core::FipsEndpoint::relay_statuses(self).await {
364 Ok(statuses) => statuses
365 .into_iter()
366 .map(|status| FipsRelayStatus {
367 url: status.url,
368 status: status.status,
369 })
370 .collect(),
371 Err(_) => Vec::new(),
372 }
373 }
374
375 async fn set_peer_ids(&self, peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
376 self.set_peer_configs(
377 peer_ids
378 .into_iter()
379 .map(FipsPeerConfig::new)
380 .collect::<Vec<_>>(),
381 )
382 .await
383 }
384
385 async fn set_peer_configs(
386 &self,
387 peer_configs: Vec<FipsPeerConfig>,
388 ) -> Result<(), FipsTransportError> {
389 let peers: Vec<fips_core::config::PeerConfig> = peer_configs
390 .into_iter()
391 .map(|peer| fips_core::config::PeerConfig {
392 npub: peer.npub,
393 addresses: peer
394 .udp_addresses
395 .into_iter()
396 .filter_map(|addr| peer_address_from_configured_addr(&addr))
397 .collect(),
398 ..Default::default()
399 })
400 .collect();
401 let peer_count = peers.len();
402 match self.update_peers(peers).await {
403 Ok(outcome) => {
404 tracing::info!(
405 peer_count,
406 added = outcome.added,
407 removed = outcome.removed,
408 updated = outcome.updated,
409 unchanged = outcome.unchanged,
410 "updated FIPS endpoint peer configs"
411 );
412 Ok(())
413 }
414 Err(err) => {
415 tracing::warn!(
416 peer_count,
417 error = %err,
418 "failed to update FIPS endpoint peer configs"
419 );
420 Err(FipsTransportError::Endpoint(err.to_string()))
421 }
422 }
423 }
424
425 fn local_peer_id(&self) -> Option<String> {
426 Some(self.npub().to_string())
427 }
428}
429
/// Wire body of a data request (serialized as a named MessagePack map).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataRequest {
    /// Requested content hash; receivers ignore requests whose hash is not 32 bytes.
    #[serde(with = "serde_bytes")]
    h: Vec<u8>,
    /// `htl` counter; omitted on the wire when equal to `MAX_HTL` and restored
    /// to `MAX_HTL` when absent.
    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
    htl: u8,
}
437
/// Wire body of a data response: either the whole blob (`i` and `n` absent)
/// or one fragment of it (`i` and `n` both present).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataResponse {
    /// Hash of the complete blob this response belongs to.
    #[serde(with = "serde_bytes")]
    h: Vec<u8>,
    /// Blob bytes, or the bytes of this fragment.
    #[serde(with = "serde_bytes")]
    d: Vec<u8>,
    /// Zero-based fragment index, present only on fragments.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    i: Option<u32>,
    /// Total fragment count, present only on fragments.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    n: Option<u32>,
}
449
/// Wire body of an application message: unfragmented when `id`, `i` and `n`
/// are all absent, one fragment when all three are present.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AppPacket {
    /// Topic of the message.
    t: String,
    /// Payload bytes, or the bytes of this fragment.
    #[serde(with = "serde_bytes")]
    d: Vec<u8>,
    /// Identifier shared by all fragments of one message.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    id: Option<String>,
    /// Zero-based fragment index.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    i: Option<u32>,
    /// Total fragment count.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    n: Option<u32>,
}
462
/// A decoded frame, one variant per message type byte.
enum Message {
    /// Frame tagged `MSG_TYPE_REQUEST`.
    Request(DataRequest),
    /// Frame tagged `MSG_TYPE_RESPONSE`.
    Response(DataResponse),
    /// Frame tagged `MSG_TYPE_APP`.
    App(AppPacket),
}
468
/// Serde default for `DataRequest::htl`: a request without the field gets `MAX_HTL`.
fn default_htl() -> u8 {
    MAX_HTL
}
472
/// Serde `skip_serializing_if` predicate: true when `htl` equals `MAX_HTL`,
/// in which case the field is left off the wire. Takes `&u8` because serde
/// passes the field by reference.
fn is_max_htl(htl: &u8) -> bool {
    *htl == MAX_HTL
}
476
/// Hex encoding of `hash`, used as the key of the pending-request and
/// response-fragment maps.
fn hash_key(hash: &Hash) -> String {
    hex::encode(hash)
}
480
481fn bytes_to_hash(bytes: &[u8]) -> Option<Hash> {
482 if bytes.len() != 32 {
483 return None;
484 }
485 let mut hash = [0u8; 32];
486 hash.copy_from_slice(bytes);
487 Some(hash)
488}
489
490fn verify_hash(data: &[u8], hash: &Hash) -> bool {
491 let digest = Sha256::digest(data);
492 digest.as_slice() == hash
493}
494
495fn remaining_until(deadline: tokio::time::Instant) -> Duration {
496 deadline
497 .checked_duration_since(tokio::time::Instant::now())
498 .unwrap_or_default()
499}
500
501fn encode_request(hash: &Hash, htl: u8) -> Result<Vec<u8>, FipsTransportError> {
502 let body = rmp_serde::to_vec_named(&DataRequest {
503 h: hash.to_vec(),
504 htl,
505 })
506 .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
507 let mut out = Vec::with_capacity(1 + body.len());
508 out.push(MSG_TYPE_REQUEST);
509 out.extend(body);
510 Ok(out)
511}
512
513fn encode_response(hash: &Hash, data: &[u8]) -> Result<Vec<u8>, FipsTransportError> {
514 let body = rmp_serde::to_vec_named(&DataResponse {
515 h: hash.to_vec(),
516 d: data.to_vec(),
517 i: None,
518 n: None,
519 })
520 .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
521 let mut out = Vec::with_capacity(1 + body.len());
522 out.push(MSG_TYPE_RESPONSE);
523 out.extend(body);
524 Ok(out)
525}
526
527fn encode_fragment_response(
528 hash: &Hash,
529 data: &[u8],
530 index: u32,
531 total: u32,
532) -> Result<Vec<u8>, FipsTransportError> {
533 let body = rmp_serde::to_vec_named(&DataResponse {
534 h: hash.to_vec(),
535 d: data.to_vec(),
536 i: Some(index),
537 n: Some(total),
538 })
539 .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
540 let mut out = Vec::with_capacity(1 + body.len());
541 out.push(MSG_TYPE_RESPONSE);
542 out.extend(body);
543 Ok(out)
544}
545
/// Encodes an unfragmented application message (no id/index/total fields).
///
/// # Errors
/// Same as `encode_app_packet`: a blank topic or a serialization failure.
fn encode_app_message(topic: &str, data: &[u8]) -> Result<Vec<u8>, FipsTransportError> {
    encode_app_packet(topic, data, None, None, None)
}
549
550fn encode_app_packet(
551 topic: &str,
552 data: &[u8],
553 id: Option<String>,
554 index: Option<u32>,
555 total: Option<u32>,
556) -> Result<Vec<u8>, FipsTransportError> {
557 let topic = topic.trim();
558 if topic.is_empty() {
559 return Err(FipsTransportError::Wire(
560 "application message topic is empty".to_string(),
561 ));
562 }
563 let body = rmp_serde::to_vec_named(&AppPacket {
564 t: topic.to_string(),
565 d: data.to_vec(),
566 id,
567 i: index,
568 n: total,
569 })
570 .map_err(|err| FipsTransportError::Wire(err.to_string()))?;
571 let mut out = Vec::with_capacity(1 + body.len());
572 out.push(MSG_TYPE_APP);
573 out.extend(body);
574 Ok(out)
575}
576
577fn encode_app_messages(topic: &str, data: &[u8]) -> Result<Vec<Vec<u8>>, FipsTransportError> {
578 if data.len() <= FIPS_APP_FRAGMENT_SIZE {
579 return encode_app_message(topic, data).map(|packet| vec![packet]);
580 }
581
582 let total = data.len().div_ceil(FIPS_APP_FRAGMENT_SIZE);
583 if total > MAX_RESPONSE_FRAGMENTS as usize {
584 return Err(FipsTransportError::Wire(format!(
585 "application message has too many fragments: {total}"
586 )));
587 }
588
589 let mut hasher = Sha256::new();
590 hasher.update(topic.trim().as_bytes());
591 hasher.update([0]);
592 hasher.update(data);
593 let id = hex::encode(hasher.finalize());
594 let mut out = Vec::with_capacity(total);
595 for (index, chunk) in data.chunks(FIPS_APP_FRAGMENT_SIZE).enumerate() {
596 out.push(encode_app_packet(
597 topic,
598 chunk,
599 Some(id.clone()),
600 Some(index as u32),
601 Some(total as u32),
602 )?);
603 }
604 Ok(out)
605}
606
607fn parse_message(data: &[u8]) -> Result<Option<Message>, FipsTransportError> {
608 let Some((&kind, body)) = data.split_first() else {
609 return Ok(None);
610 };
611 match kind {
612 MSG_TYPE_REQUEST => rmp_serde::from_slice::<DataRequest>(body)
613 .map(|req| Some(Message::Request(req)))
614 .map_err(|err| FipsTransportError::Wire(err.to_string())),
615 MSG_TYPE_RESPONSE => rmp_serde::from_slice::<DataResponse>(body)
616 .map(|resp| Some(Message::Response(resp)))
617 .map_err(|err| FipsTransportError::Wire(err.to_string())),
618 MSG_TYPE_APP => rmp_serde::from_slice::<AppPacket>(body)
619 .map(|packet| Some(Message::App(packet)))
620 .map_err(|err| FipsTransportError::Wire(err.to_string())),
621 _ => Ok(None),
622 }
623}
624
/// One caller waiting for a blob inside `get_from_peers`.
struct PendingRequest {
    /// Completes the caller's wait: `Some(bytes)` on a verified response,
    /// `None` when the request is resolved without data.
    resolve: oneshot::Sender<Option<Vec<u8>>>,
}
628
/// In-progress reassembly of a fragmented data response.
struct ResponseReassembly {
    /// Fragment count announced by the fragments collected so far.
    total: u32,
    /// Fragments received so far, keyed by index (first arrival wins).
    fragments: HashMap<u32, Vec<u8>>,
    /// Sum of the stored fragment lengths; used to size the assembled buffer.
    received_bytes: usize,
}
634
/// In-progress reassembly of a fragmented application message.
struct AppReassembly {
    /// Fragment count announced by the fragments collected so far.
    total: u32,
    /// Fragments received so far, keyed by index (first arrival wins).
    fragments: HashMap<u32, Vec<u8>>,
    /// Sum of the stored fragment lengths; used to size the assembled buffer.
    received_bytes: usize,
    /// Topic recorded when the reassembly was started.
    topic: String,
}
641
/// Content-addressed request/response and application messaging on top of a
/// [`FipsEndpointIo`].
pub struct HashtreeFipsTransport<S: Store + Send + Sync + 'static = MemoryStore> {
    /// Packet endpoint used for all sends and receives.
    endpoint: Arc<dyn FipsEndpointIo>,
    /// Store consulted first on lookups, used to answer peers' requests, and
    /// (optionally) filled with verified responses.
    local_store: Arc<S>,
    /// Configured application peers (npubs).
    peers: Arc<RwLock<Vec<String>>>,
    /// Set once a peer list has been applied successfully; distinguishes
    /// "never configured" from "configured with an empty list".
    peer_filter_configured: Arc<RwLock<bool>>,
    /// Application topics accepted even from peers that are not application peers.
    unconfigured_app_message_topics: Vec<String>,
    /// Waiters per hex hash key.
    pending: Arc<Mutex<HashMap<String, Vec<PendingRequest>>>>,
    /// Response reassembly state per hex hash key.
    response_fragments: Arc<Mutex<HashMap<String, ResponseReassembly>>>,
    /// Application reassembly state per `peer\0topic\0id` key.
    app_fragments: Arc<Mutex<HashMap<String, AppReassembly>>>,
    /// Broadcast channel on which complete application messages are published.
    app_messages: broadcast::Sender<FipsAppMessage>,
    /// Overall deadline of one `get_from_peers` call.
    request_timeout: Duration,
    /// Wait between request re-sends.
    request_retry_interval: Duration,
    /// Maximum number of request sends per call (at least 1).
    request_max_attempts: usize,
    /// `htl` value written into outgoing requests.
    request_htl: u8,
    /// Whether verified responses are written to `local_store`.
    cache_responses: bool,
}
658
659impl HashtreeFipsTransport<MemoryStore> {
660 pub fn in_memory(endpoint: Arc<dyn FipsEndpointIo>) -> Self {
661 Self::new(endpoint, Arc::new(MemoryStore::new()))
662 }
663}
664
665impl<S: Store + Send + Sync + 'static> HashtreeFipsTransport<S> {
666 pub fn new(endpoint: Arc<dyn FipsEndpointIo>, local_store: Arc<S>) -> Self {
667 let (app_messages, _) = broadcast::channel(256);
668 Self {
669 endpoint,
670 local_store,
671 peers: Arc::new(RwLock::new(Vec::new())),
672 peer_filter_configured: Arc::new(RwLock::new(false)),
673 unconfigured_app_message_topics: Vec::new(),
674 pending: Arc::new(Mutex::new(HashMap::new())),
675 response_fragments: Arc::new(Mutex::new(HashMap::new())),
676 app_fragments: Arc::new(Mutex::new(HashMap::new())),
677 app_messages,
678 request_timeout: DEFAULT_FIPS_REQUEST_TIMEOUT,
679 request_retry_interval: DEFAULT_FIPS_REQUEST_RETRY_INTERVAL,
680 request_max_attempts: DEFAULT_FIPS_REQUEST_MAX_ATTEMPTS,
681 request_htl: MAX_HTL,
682 cache_responses: true,
683 }
684 }
685
    /// Sets the overall deadline of a `get_from_peers` call.
    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }
690
691 pub fn with_request_retry_interval(mut self, interval: Duration) -> Self {
692 self.request_retry_interval = if interval.is_zero() {
693 Duration::from_millis(1)
694 } else {
695 interval
696 };
697 self
698 }
699
    /// Sets how many times a request may be sent; values below 1 are raised to 1.
    pub fn with_request_max_attempts(mut self, attempts: usize) -> Self {
        self.request_max_attempts = attempts.max(1);
        self
    }
704
    /// Sets the `htl` value written into outgoing requests. The value is
    /// stored as given; it is not clamped to `MAX_HTL`.
    pub fn with_request_htl(mut self, htl: u8) -> Self {
        self.request_htl = htl;
        self
    }
709
    /// Chooses whether verified responses are written to the local store.
    pub fn with_cache_responses(mut self, cache_responses: bool) -> Self {
        self.cache_responses = cache_responses;
        self
    }
714
715 pub fn with_unconfigured_app_message_topics<T, I>(mut self, topics: I) -> Self
716 where
717 T: Into<String>,
718 I: IntoIterator<Item = T>,
719 {
720 self.unconfigured_app_message_topics = topics.into_iter().map(Into::into).collect();
721 self
722 }
723
724 pub async fn set_peers(&self, peers: Vec<String>) {
725 self.set_peer_configs(peers.into_iter().map(FipsPeerConfig::new).collect())
726 .await;
727 }
728
    /// Configures application peers with no additional routing-only peers.
    pub async fn set_peer_configs(&self, peers: Vec<FipsPeerConfig>) {
        self.set_peer_configs_with_routing_peers(peers, Vec::new())
            .await;
    }
733
734 pub async fn set_peer_configs_with_routing_peers(
735 &self,
736 application_peers: Vec<FipsPeerConfig>,
737 routing_peers: Vec<FipsPeerConfig>,
738 ) {
739 let local = self.endpoint.local_peer_id();
740 let mut seen = std::collections::HashSet::new();
741 let app_out = sanitize_peer_configs(local.as_deref(), application_peers, &mut seen);
742 let routing_out = sanitize_peer_configs(local.as_deref(), routing_peers, &mut seen);
743 let mut endpoint_out = app_out.clone();
744 endpoint_out.extend(routing_out.clone());
745 let configured_count = endpoint_out.len();
746 let application_count = app_out.len();
747 let routing_only_count = routing_out.len();
748 let udp_hint_count: usize = endpoint_out
749 .iter()
750 .map(|peer| peer.udp_addresses.len())
751 .sum();
752 match self.endpoint.set_peer_configs(endpoint_out.clone()).await {
753 Ok(()) => {
754 tracing::info!(
755 configured_count,
756 application_count,
757 routing_only_count,
758 udp_hint_count,
759 "configured Hashtree FIPS peers"
760 );
761 *self.peers.write().await = app_out.into_iter().map(|peer| peer.npub).collect();
762 *self.peer_filter_configured.write().await = true;
763 }
764 Err(error) => {
765 tracing::warn!(
766 configured_count,
767 application_count,
768 routing_only_count,
769 udp_hint_count,
770 error = %error,
771 "failed to configure Hashtree FIPS peers"
772 );
773 }
774 }
775 }
776
777 pub async fn peer_ids(&self) -> Vec<String> {
778 let configured = self.peers.read().await.clone();
779 if configured.is_empty() && !*self.peer_filter_configured.read().await {
780 self.endpoint.peer_ids().await
781 } else {
782 configured
783 }
784 }
785
    /// Snapshot of the configured application peers (no endpoint fallback).
    pub async fn configured_peer_ids(&self) -> Vec<String> {
        self.peers.read().await.clone()
    }
789
    /// Peer identifiers as reported by the endpoint itself, regardless of the
    /// configured application peers.
    pub async fn connected_peer_ids(&self) -> Vec<String> {
        self.endpoint.peer_ids().await
    }
793
    /// Per-peer status snapshots from the endpoint.
    pub async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
        self.endpoint.peer_statuses().await
    }
797
    /// Per-relay status snapshots from the endpoint.
    pub async fn relay_statuses(&self) -> Vec<FipsRelayStatus> {
        self.endpoint.relay_statuses().await
    }
801
    /// Returns a new receiver for application messages published from now on.
    pub fn subscribe_app_messages(&self) -> broadcast::Receiver<FipsAppMessage> {
        self.app_messages.subscribe()
    }
805
806 pub async fn send_app_message(
807 &self,
808 peer_id: &str,
809 topic: &str,
810 data: Vec<u8>,
811 ) -> Result<(), FipsTransportError> {
812 for packet in encode_app_messages(topic, &data)? {
813 self.endpoint.send(peer_id, packet).await?;
814 }
815 Ok(())
816 }
817
818 pub async fn broadcast_app_message(
819 &self,
820 topic: &str,
821 data: Vec<u8>,
822 ) -> Result<usize, FipsTransportError> {
823 let packets = encode_app_messages(topic, &data)?;
824 let mut sent = 0usize;
825 for peer in self.peer_ids().await {
826 let mut peer_sent = true;
827 for packet in &packets {
828 if self.endpoint.send(&peer, packet.clone()).await.is_err() {
829 peer_sent = false;
830 break;
831 }
832 }
833 if peer_sent {
834 sent += 1;
835 }
836 }
837 Ok(sent)
838 }
839
840 pub fn start(self: &Arc<Self>) -> JoinHandle<()> {
841 let this = self.clone();
842 tokio::spawn(async move {
843 while let Some(packet) = this.endpoint.recv().await {
844 let _ = this.handle_packet(packet).await;
845 }
846 })
847 }
848
849 pub async fn get_from_peers(
850 &self,
851 hash: &Hash,
852 peers: &[String],
853 ) -> Result<Option<Vec<u8>>, FipsTransportError> {
854 if let Some(data) = self.local_store.get(hash).await? {
855 if verify_hash(&data, hash) {
856 return Ok(Some(data));
857 }
858 }
859 if peers.is_empty() {
860 return Ok(None);
861 }
862
863 let key = hash_key(hash);
864 let (tx, rx) = oneshot::channel();
865 self.pending
866 .lock()
867 .await
868 .entry(key.clone())
869 .or_default()
870 .push(PendingRequest { resolve: tx });
871
872 let payload = encode_request(hash, self.request_htl)?;
873 let deadline = tokio::time::Instant::now() + self.request_timeout;
874 let mut rx = rx;
875 for attempt in 0..self.request_max_attempts {
876 if attempt > 0 && remaining_until(deadline).is_zero() {
877 break;
878 }
879 let sent = self.send_request_to_peers(peers, &payload).await;
880 if sent == 0 && attempt == 0 {
881 self.resolve_pending(&key, None).await;
882 return Ok(None);
883 }
884 if attempt + 1 >= self.request_max_attempts {
885 break;
886 }
887 let remaining = remaining_until(deadline);
888 if remaining.is_zero() {
889 break;
890 }
891 match timeout(self.request_retry_interval.min(remaining), &mut rx).await {
892 Ok(Ok(result)) => return Ok(result),
893 Ok(Err(_)) => return Ok(None),
894 Err(_) => {
895 if remaining_until(deadline).is_zero() {
896 break;
897 }
898 }
899 }
900 }
901
902 match timeout(remaining_until(deadline), rx).await {
903 Ok(Ok(result)) => Ok(result),
904 _ => {
905 self.remove_pending_sender(&key).await;
906 Ok(None)
907 }
908 }
909 }
910
911 async fn send_request_to_peers(&self, peers: &[String], payload: &[u8]) -> usize {
912 let mut sent = 0usize;
913 for peer in peers {
914 if self.endpoint.send(peer, payload.to_vec()).await.is_ok() {
915 sent += 1;
916 }
917 }
918 sent
919 }
920
921 async fn handle_packet(&self, packet: FipsEndpointPacket) -> Result<(), FipsTransportError> {
922 let Some(message) = parse_message(&packet.data)? else {
923 return Ok(());
924 };
925 let unconfigured_app_message_allowed = match &message {
926 Message::App(app) => self
927 .unconfigured_app_message_topics
928 .iter()
929 .any(|topic| topic == &app.t),
930 _ => false,
931 };
932 let is_application_peer = self.is_application_peer(&packet.peer_id).await;
933 if !is_application_peer && !unconfigured_app_message_allowed {
934 return Ok(());
935 }
936 match message {
937 Message::Request(req) => {
938 let Some(hash) = bytes_to_hash(&req.h) else {
939 return Ok(());
940 };
941 let Some(data) = self.local_store.get(&hash).await? else {
942 return Ok(());
943 };
944 if !verify_hash(&data, &hash) {
945 return Ok(());
946 }
947 self.send_response(&packet.peer_id, &hash, &data).await?;
948 }
949 Message::Response(resp) => {
950 let Some(hash) = bytes_to_hash(&resp.h) else {
951 return Ok(());
952 };
953 let key = hash_key(&hash);
954 if !self.pending.lock().await.contains_key(&key) {
955 return Ok(());
956 }
957 let Some(data) = self.response_data_from_message(&key, resp).await else {
958 return Ok(());
959 };
960 if !verify_hash(&data, &hash) {
961 return Ok(());
962 }
963 if self.cache_responses {
964 let _ = self.local_store.put(hash, data.clone()).await;
965 }
966 self.resolve_pending(&key, Some(data)).await;
967 }
968 Message::App(app) => {
969 if app.t.trim().is_empty() {
970 return Ok(());
971 }
972 if let Some((topic, data)) = self.app_data_from_message(&packet.peer_id, app).await
973 {
974 let _ = self.app_messages.send(FipsAppMessage {
975 peer_id: packet.peer_id,
976 topic,
977 data,
978 });
979 }
980 }
981 }
982 Ok(())
983 }
984
985 async fn is_application_peer(&self, peer_id: &str) -> bool {
986 let peers = self.peers.read().await;
987 if peers.is_empty() && !*self.peer_filter_configured.read().await {
988 return true;
989 }
990 peers.iter().any(|peer| peer == peer_id)
991 }
992
993 async fn send_response(
994 &self,
995 peer_id: &str,
996 hash: &Hash,
997 data: &[u8],
998 ) -> Result<(), FipsTransportError> {
999 if data.len() <= FIPS_RESPONSE_FRAGMENT_SIZE {
1000 self.endpoint
1001 .send(peer_id, encode_response(hash, data)?)
1002 .await?;
1003 return Ok(());
1004 }
1005
1006 let total = data.len().div_ceil(FIPS_RESPONSE_FRAGMENT_SIZE) as u32;
1007 for index in 0..total {
1008 let start = index as usize * FIPS_RESPONSE_FRAGMENT_SIZE;
1009 let end = (start + FIPS_RESPONSE_FRAGMENT_SIZE).min(data.len());
1010 self.endpoint
1011 .send(
1012 peer_id,
1013 encode_fragment_response(hash, &data[start..end], index, total)?,
1014 )
1015 .await?;
1016 }
1017 Ok(())
1018 }
1019
1020 async fn response_data_from_message(&self, key: &str, resp: DataResponse) -> Option<Vec<u8>> {
1021 match (resp.i, resp.n) {
1022 (Some(index), Some(total)) => {
1023 self.reassemble_response_fragment(key, resp.d, index, total)
1024 .await
1025 }
1026 (None, None) => Some(resp.d),
1027 _ => None,
1028 }
1029 }
1030
1031 async fn reassemble_response_fragment(
1032 &self,
1033 key: &str,
1034 data: Vec<u8>,
1035 index: u32,
1036 total: u32,
1037 ) -> Option<Vec<u8>> {
1038 if total == 0 || total > MAX_RESPONSE_FRAGMENTS || index >= total {
1039 return None;
1040 }
1041
1042 let mut fragments = self.response_fragments.lock().await;
1043 let entry = fragments
1044 .entry(key.to_string())
1045 .or_insert_with(|| ResponseReassembly {
1046 total,
1047 fragments: HashMap::new(),
1048 received_bytes: 0,
1049 });
1050 if entry.total != total {
1051 *entry = ResponseReassembly {
1052 total,
1053 fragments: HashMap::new(),
1054 received_bytes: 0,
1055 };
1056 }
1057
1058 if let std::collections::hash_map::Entry::Vacant(slot) = entry.fragments.entry(index) {
1059 entry.received_bytes += data.len();
1060 slot.insert(data);
1061 }
1062
1063 if entry.fragments.len() != entry.total as usize {
1064 return None;
1065 }
1066
1067 let mut assembled = Vec::with_capacity(entry.received_bytes);
1068 for fragment_index in 0..entry.total {
1069 let fragment = entry.fragments.get(&fragment_index)?;
1070 assembled.extend_from_slice(fragment);
1071 }
1072 fragments.remove(key);
1073 Some(assembled)
1074 }
1075
1076 async fn app_data_from_message(
1077 &self,
1078 peer_id: &str,
1079 app: AppPacket,
1080 ) -> Option<(String, Vec<u8>)> {
1081 match (app.id, app.i, app.n) {
1082 (None, None, None) => Some((app.t, app.d)),
1083 (Some(id), Some(index), Some(total)) => {
1084 self.reassemble_app_fragment(peer_id, app.t, id, app.d, index, total)
1085 .await
1086 }
1087 _ => None,
1088 }
1089 }
1090
1091 async fn reassemble_app_fragment(
1092 &self,
1093 peer_id: &str,
1094 topic: String,
1095 id: String,
1096 data: Vec<u8>,
1097 index: u32,
1098 total: u32,
1099 ) -> Option<(String, Vec<u8>)> {
1100 if total == 0 || total > MAX_RESPONSE_FRAGMENTS || index >= total {
1101 return None;
1102 }
1103
1104 let key = format!("{peer_id}\0{topic}\0{id}");
1105 let mut fragments = self.app_fragments.lock().await;
1106 let entry = fragments
1107 .entry(key.clone())
1108 .or_insert_with(|| AppReassembly {
1109 total,
1110 fragments: HashMap::new(),
1111 received_bytes: 0,
1112 topic: topic.clone(),
1113 });
1114 if entry.total != total {
1115 *entry = AppReassembly {
1116 total,
1117 fragments: HashMap::new(),
1118 received_bytes: 0,
1119 topic: topic.clone(),
1120 };
1121 }
1122
1123 if let std::collections::hash_map::Entry::Vacant(slot) = entry.fragments.entry(index) {
1124 entry.received_bytes += data.len();
1125 slot.insert(data);
1126 }
1127
1128 if entry.fragments.len() != entry.total as usize {
1129 return None;
1130 }
1131
1132 let mut assembled = Vec::with_capacity(entry.received_bytes);
1133 for fragment_index in 0..entry.total {
1134 let fragment = entry.fragments.get(&fragment_index)?;
1135 assembled.extend_from_slice(fragment);
1136 }
1137 let topic = entry.topic.clone();
1138 fragments.remove(&key);
1139 Some((topic, assembled))
1140 }
1141
1142 async fn resolve_pending(&self, key: &str, data: Option<Vec<u8>>) {
1143 self.response_fragments.lock().await.remove(key);
1144 let pending = self.pending.lock().await.remove(key);
1145 if let Some(pending) = pending {
1146 for request in pending {
1147 let _ = request.resolve.send(data.clone());
1148 }
1149 }
1150 }
1151
1152 async fn remove_pending_sender(&self, key: &str) {
1153 let remove_fragments = {
1154 let mut pending = self.pending.lock().await;
1155 if let Some(requests) = pending.get_mut(key) {
1156 requests.retain(|request| !request.resolve.is_closed());
1157 if requests.is_empty() {
1158 pending.remove(key);
1159 true
1160 } else {
1161 false
1162 }
1163 } else {
1164 false
1165 }
1166 };
1167 if remove_fragments {
1168 self.response_fragments.lock().await.remove(key);
1169 }
1170 }
1171}
1172
1173type FipsMeshStore<S> = MeshStoreCore<S, FipsMeshSignaling<S>, FipsMeshLinkFactory<S>>;
1174
/// One pubsub event drained from the mesh store, as returned by
/// `FipsMeshPubsub::drain_pubsub_events`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FipsMeshPubsubEvent {
    /// Identifier of the pubsub stream the event belongs to.
    pub stream_id: String,
    /// Sequence number attached to the event by its publisher.
    pub seq: u64,
    /// Presumably the peer that originally published the event — confirm
    /// against the mesh store implementation.
    pub origin_peer_id: String,
    /// Presumably the neighbouring peer the event was received from — confirm
    /// against the mesh store implementation.
    pub from_peer_id: String,
    /// Opaque event payload bytes.
    pub payload: Vec<u8>,
}
1184
1185pub struct FipsMeshPubsub<S: Store + Send + Sync + 'static> {
1187 store: Arc<FipsMeshStore<S>>,
1188 demux_task: JoinHandle<()>,
1189 pump_task: JoinHandle<()>,
1190}
1191
1192impl<S: Store + Send + Sync + 'static> FipsMeshPubsub<S> {
1193 pub async fn subscribe_pubsub(&self, stream_id: impl Into<String>) -> PubsubPublishStats {
1195 self.store.subscribe_pubsub(stream_id.into()).await
1196 }
1197
1198 pub async fn publish_pubsub(
1200 &self,
1201 stream_id: impl Into<String>,
1202 seq: u64,
1203 payload: Vec<u8>,
1204 ) -> PubsubPublishStats {
1205 self.store
1206 .publish_pubsub(stream_id.into(), seq, payload)
1207 .await
1208 }
1209
1210 pub async fn drain_pubsub_events(&self) -> Vec<FipsMeshPubsubEvent> {
1212 self.store
1213 .drain_pubsub_events()
1214 .await
1215 .into_iter()
1216 .map(|event| FipsMeshPubsubEvent {
1217 stream_id: event.stream_id,
1218 seq: event.seq,
1219 origin_peer_id: event.origin_peer_id,
1220 from_peer_id: event.from_peer_id,
1221 payload: event.payload,
1222 })
1223 .collect()
1224 }
1225
1226 pub async fn peer_count(&self) -> usize {
1228 self.store.peer_count().await
1229 }
1230
1231 pub async fn peer_ids(&self) -> Vec<String> {
1233 self.store.peer_ids().await
1234 }
1235}
1236
1237#[async_trait]
1238impl<S: Store + Send + Sync + 'static> Store for FipsMeshPubsub<S> {
1239 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1240 self.store.put(hash, data).await
1241 }
1242
1243 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1244 self.store.get(hash).await
1245 }
1246
1247 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1248 self.store.has(hash).await
1249 }
1250
1251 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1252 self.store.delete(hash).await
1253 }
1254}
1255
1256impl<S: Store + Send + Sync + 'static> Drop for FipsMeshPubsub<S> {
1257 fn drop(&mut self) {
1258 self.demux_task.abort();
1259 self.pump_task.abort();
1260 }
1261}
1262
1263impl<S: Store + Send + Sync + 'static> HashtreeFipsTransport<S> {
1264 pub async fn start_mesh_pubsub(
1270 self: &Arc<Self>,
1271 local_store: Arc<S>,
1272 peer_id: String,
1273 request_timeout: Duration,
1274 ) -> Result<FipsMeshPubsub<S>, FipsTransportError> {
1275 let hub = FipsMeshLinkHub::default();
1276 let (signaling_tx, signaling_rx) = mpsc::unbounded_channel();
1277 let signaling_transport = Arc::new(FipsMeshSignaling {
1278 peer_id: peer_id.clone(),
1279 transport: self.clone(),
1280 rx: Mutex::new(signaling_rx),
1281 connected: AtomicBool::new(true),
1282 });
1283 let link_factory = Arc::new(FipsMeshLinkFactory {
1284 transport: self.clone(),
1285 hub: hub.clone(),
1286 });
1287 let router = Arc::new(MeshRouter::new(
1288 peer_id.clone(),
1289 signaling_transport.clone(),
1290 link_factory,
1291 PoolSettings::default(),
1292 false,
1293 ));
1294 let store = Arc::new(MeshStoreCore::new_with_routing(
1295 local_store,
1296 router,
1297 request_timeout,
1298 false,
1299 MeshRoutingConfig {
1300 pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
1301 ..Default::default()
1302 },
1303 ));
1304 store
1305 .start()
1306 .await
1307 .map_err(|error| FipsTransportError::Endpoint(error.to_string()))?;
1308
1309 let demux_task = spawn_fips_mesh_demux(self.clone(), peer_id, hub, signaling_tx);
1310 let pump_task = spawn_fips_mesh_pump(store.clone(), signaling_transport);
1311 Ok(FipsMeshPubsub {
1312 store,
1313 demux_task,
1314 pump_task,
1315 })
1316 }
1317}
1318
1319#[derive(Default, Clone)]
1320struct FipsMeshLinkHub {
1321 inboxes: Arc<Mutex<HashMap<String, Arc<FipsMeshInbox>>>>,
1322}
1323
1324impl FipsMeshLinkHub {
1325 async fn inbox(&self, peer_id: &str) -> Arc<FipsMeshInbox> {
1326 let mut inboxes = self.inboxes.lock().await;
1327 inboxes
1328 .entry(peer_id.to_string())
1329 .or_insert_with(|| Arc::new(FipsMeshInbox::default()))
1330 .clone()
1331 }
1332
1333 async fn push(&self, peer_id: &str, data: Vec<u8>) {
1334 self.inbox(peer_id).await.push(data).await;
1335 }
1336}
1337
1338struct FipsMeshInbox {
1339 queue: Mutex<VecDeque<Vec<u8>>>,
1340 notify: Notify,
1341 open: AtomicBool,
1342}
1343
1344impl Default for FipsMeshInbox {
1345 fn default() -> Self {
1346 Self {
1347 queue: Mutex::new(VecDeque::new()),
1348 notify: Notify::new(),
1349 open: AtomicBool::new(true),
1350 }
1351 }
1352}
1353
1354impl FipsMeshInbox {
1355 async fn push(&self, data: Vec<u8>) {
1356 self.queue.lock().await.push_back(data);
1357 self.notify.notify_waiters();
1358 }
1359
1360 async fn recv(&self) -> Option<Vec<u8>> {
1361 loop {
1362 if let Some(data) = self.queue.lock().await.pop_front() {
1363 return Some(data);
1364 }
1365 if !self.open.load(Ordering::Relaxed) {
1366 return None;
1367 }
1368 self.notify.notified().await;
1369 }
1370 }
1371
1372 fn try_recv(&self) -> Option<Vec<u8>> {
1373 let Ok(mut queue) = self.queue.try_lock() else {
1374 return None;
1375 };
1376 queue.pop_front()
1377 }
1378
1379 fn close(&self) {
1380 self.open.store(false, Ordering::Relaxed);
1381 self.notify.notify_waiters();
1382 }
1383}
1384
1385struct FipsMeshSignaling<S: Store + Send + Sync + 'static> {
1386 peer_id: String,
1387 transport: Arc<HashtreeFipsTransport<S>>,
1388 rx: Mutex<mpsc::UnboundedReceiver<SignalingMessage>>,
1389 connected: AtomicBool,
1390}
1391
1392#[async_trait]
1393impl<S: Store + Send + Sync + 'static> SignalingTransport for FipsMeshSignaling<S> {
1394 async fn connect(&self, _relays: &[String]) -> Result<(), TransportError> {
1395 self.connected.store(true, Ordering::Relaxed);
1396 Ok(())
1397 }
1398
1399 async fn disconnect(&self) {
1400 self.connected.store(false, Ordering::Relaxed);
1401 }
1402
1403 async fn publish(&self, msg: SignalingMessage) -> Result<(), TransportError> {
1404 if !self.connected.load(Ordering::Relaxed) {
1405 return Err(TransportError::NotConnected);
1406 }
1407 let payload = serde_json::to_vec(&msg)
1408 .map_err(|error| TransportError::SendFailed(error.to_string()))?;
1409 if let Some(peer_id) = msg.target_peer_id() {
1410 self.transport
1411 .send_app_message(peer_id, FIPS_MESH_SIGNALING_TOPIC, payload)
1412 .await
1413 .map_err(|error| TransportError::SendFailed(error.to_string()))
1414 } else {
1415 self.transport
1416 .broadcast_app_message(FIPS_MESH_SIGNALING_TOPIC, payload)
1417 .await
1418 .map(|_| ())
1419 .map_err(|error| TransportError::SendFailed(error.to_string()))
1420 }
1421 }
1422
1423 async fn recv(&self) -> Option<SignalingMessage> {
1424 self.rx.lock().await.recv().await
1425 }
1426
1427 fn try_recv(&self) -> Option<SignalingMessage> {
1428 let Ok(mut rx) = self.rx.try_lock() else {
1429 return None;
1430 };
1431 rx.try_recv().ok()
1432 }
1433
1434 fn peer_id(&self) -> &str {
1435 &self.peer_id
1436 }
1437}
1438
1439struct FipsMeshLinkFactory<S: Store + Send + Sync + 'static> {
1440 transport: Arc<HashtreeFipsTransport<S>>,
1441 hub: FipsMeshLinkHub,
1442}
1443
1444impl<S: Store + Send + Sync + 'static> FipsMeshLinkFactory<S> {
1445 async fn link_for(&self, peer_id: &str) -> Arc<dyn PeerLink> {
1446 Arc::new(FipsMeshPeerLink {
1447 peer_id: peer_id.to_string(),
1448 transport: self.transport.clone(),
1449 inbox: self.hub.inbox(peer_id).await,
1450 })
1451 }
1452}
1453
1454#[async_trait]
1455impl<S: Store + Send + Sync + 'static> PeerLinkFactory for FipsMeshLinkFactory<S> {
1456 async fn create_offer(
1457 &self,
1458 target_peer_id: &str,
1459 ) -> Result<(Arc<dyn PeerLink>, String), TransportError> {
1460 Ok((self.link_for(target_peer_id).await, "fips-mesh-v1".into()))
1461 }
1462
1463 async fn accept_offer(
1464 &self,
1465 from_peer_id: &str,
1466 _offer_sdp: &str,
1467 ) -> Result<(Arc<dyn PeerLink>, String), TransportError> {
1468 Ok((self.link_for(from_peer_id).await, "fips-mesh-v1".into()))
1469 }
1470
1471 async fn handle_answer(
1472 &self,
1473 target_peer_id: &str,
1474 _answer_sdp: &str,
1475 ) -> Result<Arc<dyn PeerLink>, TransportError> {
1476 Ok(self.link_for(target_peer_id).await)
1477 }
1478}
1479
1480struct FipsMeshPeerLink<S: Store + Send + Sync + 'static> {
1481 peer_id: String,
1482 transport: Arc<HashtreeFipsTransport<S>>,
1483 inbox: Arc<FipsMeshInbox>,
1484}
1485
1486#[async_trait]
1487impl<S: Store + Send + Sync + 'static> PeerLink for FipsMeshPeerLink<S> {
1488 async fn send(&self, data: Vec<u8>) -> Result<(), TransportError> {
1489 self.transport
1490 .send_app_message(&self.peer_id, FIPS_MESH_DATA_TOPIC, data)
1491 .await
1492 .map_err(|error| TransportError::SendFailed(error.to_string()))
1493 }
1494
1495 async fn recv(&self) -> Option<Vec<u8>> {
1496 self.inbox.recv().await
1497 }
1498
1499 fn try_recv(&self) -> Option<Vec<u8>> {
1500 self.inbox.try_recv()
1501 }
1502
1503 fn is_open(&self) -> bool {
1504 self.inbox.open.load(Ordering::Relaxed)
1505 }
1506
1507 async fn close(&self) {
1508 self.inbox.close();
1509 }
1510}
1511
1512fn spawn_fips_mesh_demux<S: Store + Send + Sync + 'static>(
1513 transport: Arc<HashtreeFipsTransport<S>>,
1514 peer_id: String,
1515 hub: FipsMeshLinkHub,
1516 signaling_tx: mpsc::UnboundedSender<SignalingMessage>,
1517) -> JoinHandle<()> {
1518 let mut app_messages = transport.subscribe_app_messages();
1519 tokio::spawn(async move {
1520 loop {
1521 let Ok(message) = app_messages.recv().await else {
1522 break;
1523 };
1524 match message.topic.as_str() {
1525 FIPS_MESH_SIGNALING_TOPIC => {
1526 let Ok(signal) = serde_json::from_slice::<SignalingMessage>(&message.data)
1527 else {
1528 continue;
1529 };
1530 if signal.peer_id() != peer_id
1531 && (signal.target_peer_id().is_none()
1532 || signal.target_peer_id() == Some(peer_id.as_str()))
1533 {
1534 let _ = signaling_tx.send(signal);
1535 }
1536 }
1537 FIPS_MESH_DATA_TOPIC => {
1538 hub.push(&message.peer_id, message.data).await;
1539 }
1540 _ => {}
1541 }
1542 }
1543 })
1544}
1545
1546fn spawn_fips_mesh_pump<S: Store + Send + Sync + 'static>(
1547 store: Arc<FipsMeshStore<S>>,
1548 signaling: Arc<FipsMeshSignaling<S>>,
1549) -> JoinHandle<()> {
1550 tokio::spawn(async move {
1551 let mut pump = tokio::time::interval(FIPS_MESH_PUMP_INTERVAL);
1552 pump.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1553 let mut hello = tokio::time::interval(FIPS_MESH_HELLO_INTERVAL);
1554 hello.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1555 loop {
1556 tokio::select! {
1557 signal = signaling.recv() => {
1558 let Some(signal) = signal else {
1559 break;
1560 };
1561 let _ = store.process_signaling(signal).await;
1562 }
1563 _ = pump.tick() => {
1564 store.drain_available_data_messages().await;
1565 }
1566 _ = hello.tick() => {
1567 let _ = store.send_hello().await;
1568 }
1569 }
1570 }
1571 })
1572}
1573
1574#[async_trait]
1575impl<S: Store + Send + Sync + 'static> Store for HashtreeFipsTransport<S> {
1576 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1577 self.local_store.put(hash, data).await
1578 }
1579
1580 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1581 if let Some(data) = self.local_store.get(hash).await? {
1582 if verify_hash(&data, hash) {
1583 return Ok(Some(data));
1584 }
1585 }
1586 let peers = self.peer_ids().await;
1587 self.get_from_peers(hash, &peers)
1588 .await
1589 .map_err(|err| StoreError::Other(err.to_string()))
1590 }
1591
1592 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1593 self.local_store.has(hash).await
1594 }
1595
1596 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1597 self.local_store.delete(hash).await
1598 }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603 use super::*;
1604 use std::sync::atomic::{AtomicUsize, Ordering};
1605 use tokio::sync::mpsc;
1606
1607 struct FakeEndpoint {
1608 id: String,
1609 network: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<FipsEndpointPacket>>>>,
1610 rx: Mutex<mpsc::UnboundedReceiver<FipsEndpointPacket>>,
1611 configured_peers: Mutex<Vec<String>>,
1612 configured_peer_configs: Mutex<Vec<FipsPeerConfig>>,
1613 peer_statuses: Mutex<Vec<FipsPeerStatus>>,
1614 sent: AtomicUsize,
1615 drop_next: AtomicUsize,
1616 }
1617
1618 impl FakeEndpoint {
1619 async fn new(
1620 id: &str,
1621 network: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<FipsEndpointPacket>>>>,
1622 ) -> Arc<Self> {
1623 let (tx, rx) = mpsc::unbounded_channel();
1624 network.lock().await.insert(id.to_string(), tx);
1625 Arc::new(Self {
1626 id: id.to_string(),
1627 network,
1628 rx: Mutex::new(rx),
1629 configured_peers: Mutex::new(Vec::new()),
1630 configured_peer_configs: Mutex::new(Vec::new()),
1631 peer_statuses: Mutex::new(Vec::new()),
1632 sent: AtomicUsize::new(0),
1633 drop_next: AtomicUsize::new(0),
1634 })
1635 }
1636
1637 fn sent_count(&self) -> usize {
1638 self.sent.load(Ordering::Relaxed)
1639 }
1640
1641 fn drop_next_sends(&self, count: usize) {
1642 self.drop_next.store(count, Ordering::Relaxed);
1643 }
1644 }
1645
1646 #[async_trait]
1647 impl FipsEndpointIo for FakeEndpoint {
1648 async fn send(&self, peer_id: &str, data: Vec<u8>) -> Result<(), FipsTransportError> {
1649 self.sent.fetch_add(1, Ordering::Relaxed);
1650 if self
1651 .drop_next
1652 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
1653 if value > 0 {
1654 Some(value - 1)
1655 } else {
1656 None
1657 }
1658 })
1659 .is_ok()
1660 {
1661 return Ok(());
1662 }
1663 let tx = self
1664 .network
1665 .lock()
1666 .await
1667 .get(peer_id)
1668 .cloned()
1669 .ok_or_else(|| FipsTransportError::Send(format!("unknown peer {peer_id}")))?;
1670 tx.send(FipsEndpointPacket {
1671 peer_id: self.id.clone(),
1672 data,
1673 })
1674 .map_err(|_| FipsTransportError::Send("receiver closed".to_string()))
1675 }
1676
1677 async fn recv(&self) -> Option<FipsEndpointPacket> {
1678 self.rx.lock().await.recv().await
1679 }
1680
1681 async fn set_peer_ids(&self, peer_ids: Vec<String>) -> Result<(), FipsTransportError> {
1682 *self.configured_peers.lock().await = peer_ids;
1683 Ok(())
1684 }
1685
1686 async fn set_peer_configs(
1687 &self,
1688 peer_configs: Vec<FipsPeerConfig>,
1689 ) -> Result<(), FipsTransportError> {
1690 *self.configured_peers.lock().await =
1691 peer_configs.iter().map(|peer| peer.npub.clone()).collect();
1692 *self.configured_peer_configs.lock().await = peer_configs;
1693 Ok(())
1694 }
1695
1696 async fn peer_ids(&self) -> Vec<String> {
1697 let configured = self.configured_peers.lock().await.clone();
1698 if !configured.is_empty() {
1699 return configured;
1700 }
1701 self.network
1702 .lock()
1703 .await
1704 .keys()
1705 .filter(|id| *id != &self.id)
1706 .cloned()
1707 .collect()
1708 }
1709
1710 async fn peer_statuses(&self) -> Vec<FipsPeerStatus> {
1711 self.peer_statuses.lock().await.clone()
1712 }
1713
1714 fn local_peer_id(&self) -> Option<String> {
1715 Some(self.id.clone())
1716 }
1717 }
1718
1719 fn hash(data: &[u8]) -> Hash {
1720 let digest = Sha256::digest(data);
1721 let mut hash = [0u8; 32];
1722 hash.copy_from_slice(&digest);
1723 hash
1724 }
1725
/// The generated endpoint config must select reply-learned routing.
#[test]
fn endpoint_config_uses_reply_learned_routing_for_mesh_fallback() {
    let options = FipsEndpointOptions::new("nsec1example");
    let config = fips_endpoint_config(options, "test-scope");

    assert_eq!(config.node.routing.mode, RoutingMode::ReplyLearned);
}
1732
/// The discovery scope passed to the config builder becomes the nostr
/// discovery app name.
#[test]
fn endpoint_config_scopes_nostr_discovery_app() {
    let options = FipsEndpointOptions::new("nsec1example");
    let config = fips_endpoint_config(options, "iris-drive-v1:private-owner");

    assert_eq!(
        config.node.discovery.nostr.app,
        "iris-drive-v1:private-owner"
    );
}
1745
/// `webrtc_max_connections` drives all node limits: peers 1x, links and
/// connections 2x, pending inbound 4x.
#[test]
fn endpoint_config_caps_total_peer_fanout() {
    let mut options = FipsEndpointOptions::new("nsec1example");
    options.webrtc_max_connections = 9;

    let config = fips_endpoint_config(options, "test-scope");
    let limits = &config.node.limits;

    assert_eq!(limits.max_peers, 9);
    assert_eq!(limits.max_links, 18);
    assert_eq!(limits.max_connections, 18);
    assert_eq!(limits.max_pending_inbound, 36);
}
1757
/// `udp:` / `tcp:` prefixes select the transport, surrounding whitespace is
/// trimmed, and an untagged address defaults to UDP.
#[test]
fn transport_tagged_peer_addresses_are_preserved_for_fips() {
    let tagged_udp = peer_address_from_configured_addr(" udp:10.44.1.2:22121 ").unwrap();
    let tagged_tcp = peer_address_from_configured_addr("tcp:203.0.113.9:443").unwrap();
    let untagged = peer_address_from_configured_addr("10.44.1.3:22121").unwrap();

    assert_eq!(tagged_udp.transport, "udp");
    assert_eq!(tagged_udp.addr, "10.44.1.2:22121");

    assert_eq!(tagged_tcp.transport, "tcp");
    assert_eq!(tagged_tcp.addr, "203.0.113.9:443");

    assert_eq!(untagged.transport, "udp");
    assert_eq!(untagged.addr, "10.44.1.3:22121");
}
1771
1772 #[tokio::test]
1773 async fn set_peers_configures_underlying_fips_endpoint() {
1774 let network = Arc::new(Mutex::new(HashMap::new()));
1775 let endpoint = FakeEndpoint::new("local", network).await;
1776 let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1777
1778 transport
1779 .set_peers(vec![
1780 "remote".to_string(),
1781 "local".to_string(),
1782 "remote".to_string(),
1783 " ".to_string(),
1784 ])
1785 .await;
1786
1787 assert_eq!(
1788 endpoint.configured_peers.lock().await.as_slice(),
1789 &["remote".to_string()]
1790 );
1791 assert_eq!(transport.configured_peer_ids().await, vec!["remote"]);
1792 }
1793
1794 #[tokio::test]
1795 async fn set_peer_configs_preserves_static_udp_addresses() {
1796 let network = Arc::new(Mutex::new(HashMap::new()));
1797 let endpoint = FakeEndpoint::new("local", network).await;
1798 let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1799
1800 transport
1801 .set_peer_configs(vec![
1802 FipsPeerConfig {
1803 npub: " remote ".to_string(),
1804 udp_addresses: vec![" 10.44.1.2:22121 ".to_string(), " ".to_string()],
1805 },
1806 FipsPeerConfig {
1807 npub: "local".to_string(),
1808 udp_addresses: vec!["10.44.1.1:22121".to_string()],
1809 },
1810 ])
1811 .await;
1812
1813 assert_eq!(
1814 endpoint.configured_peer_configs.lock().await.as_slice(),
1815 &[FipsPeerConfig {
1816 npub: "remote".to_string(),
1817 udp_addresses: vec!["10.44.1.2:22121".to_string()],
1818 }]
1819 );
1820 assert_eq!(transport.configured_peer_ids().await, vec!["remote"]);
1821 }
1822
1823 #[tokio::test]
1824 async fn routing_only_peers_are_configured_but_not_application_peers() {
1825 let network = Arc::new(Mutex::new(HashMap::new()));
1826 let endpoint = FakeEndpoint::new("local", network).await;
1827 let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1828
1829 transport
1830 .set_peer_configs_with_routing_peers(
1831 vec![FipsPeerConfig {
1832 npub: "device".to_string(),
1833 udp_addresses: vec!["10.44.1.2:22121".to_string()],
1834 }],
1835 vec![FipsPeerConfig {
1836 npub: "bootstrap".to_string(),
1837 udp_addresses: vec!["udp:203.0.113.7:2121".to_string()],
1838 }],
1839 )
1840 .await;
1841
1842 assert_eq!(
1843 endpoint.configured_peers.lock().await.as_slice(),
1844 &["device".to_string(), "bootstrap".to_string()]
1845 );
1846 assert_eq!(transport.configured_peer_ids().await, vec!["device"]);
1847 assert_eq!(transport.peer_ids().await, vec!["device"]);
1848 }
1849
1850 #[tokio::test]
1851 async fn empty_application_peer_set_does_not_fall_back_to_routing_peers() {
1852 let network = Arc::new(Mutex::new(HashMap::new()));
1853 let endpoint = FakeEndpoint::new("local", network).await;
1854 let transport = HashtreeFipsTransport::new(endpoint.clone(), Arc::new(MemoryStore::new()));
1855
1856 transport
1857 .set_peer_configs_with_routing_peers(
1858 Vec::new(),
1859 vec![FipsPeerConfig {
1860 npub: "bootstrap".to_string(),
1861 udp_addresses: vec!["udp:203.0.113.7:2121".to_string()],
1862 }],
1863 )
1864 .await;
1865
1866 assert_eq!(
1867 endpoint.configured_peers.lock().await.as_slice(),
1868 &["bootstrap".to_string()]
1869 );
1870 assert!(transport.configured_peer_ids().await.is_empty());
1871 assert!(transport.peer_ids().await.is_empty());
1872 assert_eq!(transport.connected_peer_ids().await, vec!["bootstrap"]);
1873 }
1874
1875 #[tokio::test]
1876 async fn peer_statuses_expose_fips_endpoint_latency_snapshot() {
1877 let network = Arc::new(Mutex::new(HashMap::new()));
1878 let endpoint = FakeEndpoint::new("local", network).await;
1879 *endpoint.peer_statuses.lock().await = vec![FipsPeerStatus {
1880 npub: "remote".to_string(),
1881 transport_addr: Some("udp:10.44.1.2:2121".to_string()),
1882 transport_type: Some("udp".to_string()),
1883 srtt_ms: Some(23),
1884 packets_sent: 5,
1885 packets_recv: 7,
1886 bytes_sent: 512,
1887 bytes_recv: 1024,
1888 }];
1889 let transport = HashtreeFipsTransport::new(endpoint, Arc::new(MemoryStore::new()));
1890
1891 assert_eq!(
1892 transport.peer_statuses().await,
1893 vec![FipsPeerStatus {
1894 npub: "remote".to_string(),
1895 transport_addr: Some("udp:10.44.1.2:2121".to_string()),
1896 transport_type: Some("udp".to_string()),
1897 srtt_ms: Some(23),
1898 packets_sent: 5,
1899 packets_recv: 7,
1900 bytes_sent: 512,
1901 bytes_recv: 1024,
1902 }]
1903 );
1904 }
1905
1906 #[tokio::test]
1907 async fn fetches_hash_verified_blob_over_fips_endpoint_bytes() {
1908 let network = Arc::new(Mutex::new(HashMap::new()));
1909 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1910 let endpoint_b = FakeEndpoint::new("b", network).await;
1911 let data = b"hashtree over fips".to_vec();
1912 let hash = hash(&data);
1913 let store_a = Arc::new(MemoryStore::new());
1914 let store_b = Arc::new(MemoryStore::new());
1915 store_a.put(hash, data.clone()).await.unwrap();
1916
1917 let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
1918 let transport_b = Arc::new(
1919 HashtreeFipsTransport::new(endpoint_b, store_b.clone())
1920 .with_request_timeout(Duration::from_millis(100)),
1921 );
1922 transport_a.start();
1923 transport_b.start();
1924 transport_b.set_peers(vec!["a".to_string()]).await;
1925
1926 assert_eq!(transport_b.get(&hash).await.unwrap(), Some(data.clone()));
1927 assert_eq!(store_b.get(&hash).await.unwrap(), Some(data));
1928 }
1929
1930 #[tokio::test]
1931 async fn fetches_fragmented_blob_over_fips_endpoint_bytes() {
1932 let network = Arc::new(Mutex::new(HashMap::new()));
1933 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1934 let endpoint_b = FakeEndpoint::new("b", network).await;
1935 let data = (0..(FIPS_RESPONSE_FRAGMENT_SIZE * 2 + 17))
1936 .map(|index| (index % 251) as u8)
1937 .collect::<Vec<_>>();
1938 let hash = hash(&data);
1939 let store_a = Arc::new(MemoryStore::new());
1940 let store_b = Arc::new(MemoryStore::new());
1941 store_a.put(hash, data.clone()).await.unwrap();
1942
1943 let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
1944 let transport_b = Arc::new(
1945 HashtreeFipsTransport::new(endpoint_b, store_b.clone())
1946 .with_request_timeout(Duration::from_millis(100)),
1947 );
1948 transport_a.start();
1949 transport_b.start();
1950 transport_b.set_peers(vec!["a".to_string()]).await;
1951
1952 assert_eq!(transport_b.get(&hash).await.unwrap(), Some(data.clone()));
1953 assert_eq!(store_b.get(&hash).await.unwrap(), Some(data));
1954 }
1955
1956 #[tokio::test]
1957 async fn delivers_fragmented_app_messages_over_fips_endpoint_bytes() {
1958 let network = Arc::new(Mutex::new(HashMap::new()));
1959 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1960 let endpoint_b = FakeEndpoint::new("b", network).await;
1961 let transport_a = Arc::new(HashtreeFipsTransport::new(
1962 endpoint_a.clone(),
1963 Arc::new(MemoryStore::new()),
1964 ));
1965 let transport_b = Arc::new(HashtreeFipsTransport::new(
1966 endpoint_b,
1967 Arc::new(MemoryStore::new()),
1968 ));
1969 let mut app_messages = transport_b.subscribe_app_messages();
1970 transport_a.start();
1971 transport_b.start();
1972
1973 let data = (0..(FIPS_APP_FRAGMENT_SIZE * 3 + 19))
1974 .map(|index| (index % 251) as u8)
1975 .collect::<Vec<_>>();
1976 transport_a
1977 .send_app_message("b", "iris-drive/root/frame/v1", data.clone())
1978 .await
1979 .unwrap();
1980
1981 let message = timeout(Duration::from_millis(100), app_messages.recv())
1982 .await
1983 .unwrap()
1984 .unwrap();
1985 assert_eq!(message.peer_id, "a");
1986 assert_eq!(message.topic, "iris-drive/root/frame/v1");
1987 assert_eq!(message.data, data);
1988 assert!(endpoint_a.sent_count() > 1);
1989 }
1990
1991 #[tokio::test]
1992 async fn can_deliver_unconfigured_app_messages_without_serving_blocks() {
1993 let network = Arc::new(Mutex::new(HashMap::new()));
1994 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
1995 let endpoint_b = FakeEndpoint::new("b", network).await;
1996 let data = b"app-only peer".to_vec();
1997 let hash = hash(&data);
1998 let store_a = Arc::new(MemoryStore::new());
1999 let store_b = Arc::new(MemoryStore::new());
2000 store_b.put(hash, data.clone()).await.unwrap();
2001 let transport_a = Arc::new(
2002 HashtreeFipsTransport::new(endpoint_a, store_a)
2003 .with_request_timeout(Duration::from_millis(50)),
2004 );
2005 let transport_b = Arc::new(
2006 HashtreeFipsTransport::new(endpoint_b, store_b)
2007 .with_unconfigured_app_message_topics(["iris-drive/device-link/v1/request"]),
2008 );
2009 let mut app_messages = transport_b.subscribe_app_messages();
2010 transport_a.start();
2011 transport_b.start();
2012 transport_b
2013 .set_peers(vec!["configured-peer".to_string()])
2014 .await;
2015
2016 transport_a
2017 .send_app_message("b", "iris-drive/device-link/v1/request", b"join".to_vec())
2018 .await
2019 .unwrap();
2020
2021 let message = timeout(Duration::from_millis(100), app_messages.recv())
2022 .await
2023 .unwrap()
2024 .unwrap();
2025 assert_eq!(message.peer_id, "a");
2026 assert_eq!(message.topic, "iris-drive/device-link/v1/request");
2027 assert_eq!(message.data, b"join");
2028 assert_eq!(
2029 transport_a
2030 .get_from_peers(&hash, &["b".to_string()])
2031 .await
2032 .unwrap(),
2033 None
2034 );
2035 }
2036
2037 #[tokio::test]
2038 async fn mesh_pubsub_delivers_over_fips_endpoint_bytes() {
2039 let network = Arc::new(Mutex::new(HashMap::new()));
2040 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2041 let endpoint_b = FakeEndpoint::new("b", network).await;
2042 let store_a = Arc::new(MemoryStore::new());
2043 let store_b = Arc::new(MemoryStore::new());
2044 let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a.clone()));
2045 let transport_b = Arc::new(HashtreeFipsTransport::new(endpoint_b, store_b.clone()));
2046 transport_a.set_peers(vec!["b".to_string()]).await;
2047 transport_b.set_peers(vec!["a".to_string()]).await;
2048 transport_a.start();
2049 transport_b.start();
2050
2051 let mesh_a = transport_a
2052 .start_mesh_pubsub(store_a, "a".to_string(), Duration::from_millis(200))
2053 .await
2054 .unwrap();
2055 let mesh_b = transport_b
2056 .start_mesh_pubsub(store_b, "b".to_string(), Duration::from_millis(200))
2057 .await
2058 .unwrap();
2059 mesh_b.subscribe_pubsub("iris-drive/root-events/test").await;
2060
2061 let payload = vec![0x42; 4096];
2062 let delivered = timeout(Duration::from_secs(2), async {
2063 let mut seq = 1u64;
2064 loop {
2065 mesh_a
2066 .publish_pubsub("iris-drive/root-events/test", seq, payload.clone())
2067 .await;
2068 seq += 1;
2069 tokio::time::sleep(Duration::from_millis(25)).await;
2070 let events = mesh_b.drain_pubsub_events().await;
2071 if let Some(event) = events.into_iter().next() {
2072 break event;
2073 }
2074 }
2075 })
2076 .await
2077 .unwrap();
2078
2079 assert_eq!(delivered.stream_id, "iris-drive/root-events/test");
2080 assert_eq!(delivered.origin_peer_id, "a");
2081 assert_eq!(delivered.payload, payload);
2082 assert_eq!(mesh_a.peer_ids().await, vec!["b"]);
2083 assert_eq!(mesh_b.peer_ids().await, vec!["a"]);
2084 }
2085
2086 #[tokio::test(start_paused = true)]
2087 async fn silence_resolves_unknown_without_retrying_same_peer() {
2088 let network = Arc::new(Mutex::new(HashMap::new()));
2089 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2090 let endpoint_b = FakeEndpoint::new("b", network).await;
2091 let missing = [7u8; 32];
2092 let transport_a = Arc::new(HashtreeFipsTransport::new(
2093 endpoint_a,
2094 Arc::new(MemoryStore::new()),
2095 ));
2096 let transport_b = Arc::new(
2097 HashtreeFipsTransport::new(endpoint_b.clone(), Arc::new(MemoryStore::new()))
2098 .with_request_timeout(Duration::from_millis(25)),
2099 );
2100 transport_a.start();
2101 transport_b.start();
2102 transport_b.set_peers(vec!["a".to_string()]).await;
2103
2104 let pending = transport_b.get(&missing);
2105 tokio::time::advance(Duration::from_millis(30)).await;
2106
2107 assert_eq!(pending.await.unwrap(), None);
2108 assert_eq!(endpoint_b.sent_count(), 1);
2109 assert!(
2110 transport_b.pending.lock().await.is_empty(),
2111 "timed-out requests should not leave stale pending senders"
2112 );
2113 }
2114
2115 #[tokio::test(start_paused = true)]
2116 async fn retries_dropped_request_to_same_peer() {
2117 let network = Arc::new(Mutex::new(HashMap::new()));
2118 let endpoint_a = FakeEndpoint::new("a", network.clone()).await;
2119 let endpoint_b = FakeEndpoint::new("b", network).await;
2120 let data = b"retried request".to_vec();
2121 let hash = hash(&data);
2122 let store_a = Arc::new(MemoryStore::new());
2123 store_a.put(hash, data.clone()).await.unwrap();
2124 endpoint_b.drop_next_sends(1);
2125 let transport_a = Arc::new(HashtreeFipsTransport::new(endpoint_a, store_a));
2126 let transport_b = Arc::new(
2127 HashtreeFipsTransport::new(endpoint_b.clone(), Arc::new(MemoryStore::new()))
2128 .with_request_timeout(Duration::from_millis(300))
2129 .with_request_retry_interval(Duration::from_millis(50)),
2130 );
2131 transport_a.start();
2132 transport_b.start();
2133 transport_b.set_peers(vec!["a".to_string()]).await;
2134
2135 let pending = transport_b.get(&hash);
2136 tokio::time::advance(Duration::from_millis(60)).await;
2137
2138 assert_eq!(pending.await.unwrap(), Some(data));
2139 assert_eq!(endpoint_b.sent_count(), 2);
2140 }
2141}