1use super::*;
2use nostr_sdk::nostr::JsonUtil;
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::TcpStream;
5
6impl WebRTCManager {
7 pub async fn run(&mut self) -> Result<()> {
9 info!(
10 "Starting peer router with peer ID: {}",
11 self.my_peer_id.short()
12 );
13
14 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
15 let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
16 self.state
17 .set_direct_signaling_sender(Some(event_tx.clone()))
18 .await;
19
20 let mut signaling_rx = self
22 .signaling_rx
23 .take()
24 .expect("signaling_rx already taken");
25
26 let mut state_event_rx = self
28 .state_event_rx
29 .take()
30 .expect("state_event_rx already taken");
31 let mut mesh_frame_rx = self
32 .mesh_frame_rx
33 .take()
34 .expect("mesh_frame_rx already taken");
35
36 if self.config.bluetooth.is_enabled() {
37 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
38 let context = BluetoothRuntimeContext {
39 my_peer_id: self.my_peer_id.clone(),
40 store: if bluetooth_nostr_only_mode() {
41 None
42 } else {
43 self.store.clone()
44 },
45 nostr_relay: self.nostr_relay.clone(),
46 mesh_frame_tx: self.mesh_frame_tx.clone(),
47 registrar: BluetoothPeerRegistrar::new(
48 self.state.clone(),
49 self.peer_classifier.clone(),
50 self.config.pools.clone(),
51 self.config.bluetooth.max_peers,
52 ),
53 };
54 let _ = bluetooth.start(context).await;
55 }
56
57 let relay_transport = if self.config.signaling_enabled {
58 let transport = Arc::new(NostrRelayTransport::new(
59 self.keys.clone(),
60 self.config.debug,
61 ));
62 transport
63 .connect(&self.config.relays)
64 .await
65 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
66
67 let relay_reader = transport.clone();
68 let relay_msg_tx = relay_msg_tx.clone();
69 tokio::spawn(async move {
70 while let Some(msg) = relay_reader.recv().await {
71 if relay_msg_tx.send(msg).await.is_err() {
72 break;
73 }
74 }
75 });
76
77 Some(transport)
78 } else {
79 None
80 };
81
82 if self.config.multicast.is_enabled() {
83 if let Some(relay) = self.nostr_relay.clone() {
84 let relay = relay as crate::SharedMeshEventStore;
85 match MulticastNostrBus::bind(
86 self.config.multicast.clone(),
87 self.keys.clone(),
88 relay,
89 )
90 .await
91 {
92 Ok(bus) => {
93 let local_bus: SharedLocalNostrBus = bus.clone();
94 self.state.add_local_bus(local_bus.clone()).await;
95 self.local_buses.push(local_bus);
96 let shutdown_rx = self.shutdown_rx.clone();
97 let signaling_tx = event_tx.clone();
98 tokio::spawn(async move {
99 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
100 error!("Multicast bus error: {}", err);
101 }
102 });
103 }
104 Err(err) => {
105 warn!("Failed to start multicast bus: {}", err);
106 }
107 }
108 } else {
109 warn!("Multicast enabled but Nostr relay is unavailable");
110 }
111 }
112
113 if self.config.wifi_aware.is_enabled() {
114 if let Some(relay) = self.nostr_relay.clone() {
115 if let Some(bridge) = mobile_wifi_aware_bridge() {
116 let relay = relay as crate::SharedMeshEventStore;
117 let bus = WifiAwareNostrBus::new(
118 self.config.wifi_aware.clone(),
119 self.keys.clone(),
120 relay,
121 bridge,
122 );
123 let local_bus: SharedLocalNostrBus = bus.clone();
124 self.state.add_local_bus(local_bus.clone()).await;
125 self.local_buses.push(local_bus);
126 let shutdown_rx = self.shutdown_rx.clone();
127 let signaling_tx = event_tx.clone();
128 let local_peer_id = self.my_peer_id.to_string();
129 tokio::spawn(async move {
130 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
131 error!("Wi-Fi Aware bus error: {}", err);
132 }
133 });
134 } else {
135 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
136 }
137 } else {
138 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
139 }
140 }
141
142 if self.config.signaling_enabled {
143 let transport = Arc::new(RouterSignalingBridge::new(
144 self.my_peer_id.to_string(),
145 self.signaling_tx.clone(),
146 ));
147 let factory = Arc::new(SharedRouterPeerFactory::new(
148 self.my_peer_id.clone(),
149 self.signaling_tx.clone(),
150 self.config.stun_servers.clone(),
151 self.store.clone(),
152 self.state.clone(),
153 self.state_event_tx.clone(),
154 self.nostr_relay.clone(),
155 self.mesh_frame_tx.clone(),
156 self.config.signal_urls.clone(),
157 self.peer_classifier.clone(),
158 ));
159 let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
160 let classifier = self.peer_classifier.clone();
161 tokio::spawn(async move {
162 while let Some(request) = classifier_rx.recv().await {
163 let _ = request.response.send(classifier(&request.pubkey));
164 }
165 });
166
167 let mut router = MeshRouter::new(
168 self.my_peer_id.to_string(),
169 transport,
170 factory.clone(),
171 self.config.pools.clone(),
172 self.config.debug,
173 );
174 router.set_classifier(classifier_tx);
175 router.set_hash_get_enabled(self.config.hash_get_enabled);
176 self.shared_router = Some(Arc::new(router));
177 }
178
179 let mut shutdown_rx = self.shutdown_rx.clone();
181 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
183 let mut hello_ticker =
184 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
185 if self.config.signaling_enabled {
186 if let Some(shared_router) = self.shared_router.as_ref() {
187 let _ = shared_router.send_hello(Vec::new()).await;
188 }
189 }
190 loop {
191 tokio::select! {
192 _ = shutdown_rx.changed() => {
193 if *shutdown_rx.borrow() {
194 info!("WebRTC manager shutting down");
195 break;
196 }
197 }
198 Some(msg) = relay_msg_rx.recv() => {
199 if let Err(e) = self
200 .handle_signaling_message("relay", msg, self.shared_router.as_ref())
201 .await
202 {
203 debug!("Error handling relay signaling message: {}", e);
204 }
205 }
206 Some((relay, event)) = event_rx.recv() => {
207 if let Err(e) = self
208 .handle_event(&relay, &event, self.shared_router.as_ref())
209 .await
210 {
211 debug!("Error handling event from {}: {}", relay, e);
212 }
213 }
214 Some(msg) = signaling_rx.recv() => {
215 self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
216 }
217 Some(event) = state_event_rx.recv() => {
218 self.handle_peer_state_event(event).await;
220 }
221 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
222 self.handle_mesh_frame(from_peer_id, frame).await;
223 }
224 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
225 if let Some(shared_router) = self.shared_router.as_ref() {
226 let _ = shared_router.send_hello(Vec::new()).await;
227 }
228 }
229 _ = cleanup_interval.tick() => {
230 self.cleanup_stale_peers().await;
232 }
233 }
234 }
235
236 self.state.set_direct_signaling_sender(None).await;
237 Ok(())
238 }
239
240 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
241 let mut seen = self.seen_frame_ids.lock().await;
242 seen.insert_if_new(frame_id)
243 }
244
245 async fn mark_seen_event_id(&self, event_id: String) -> bool {
246 let mut seen = self.seen_event_ids.lock().await;
247 seen.insert_if_new(event_id)
248 }
249
250 async fn dispatch_signaling_message(
251 &self,
252 msg: SignalingMessage,
253 relay_transport: Option<&Arc<NostrRelayTransport>>,
254 ) {
255 self.dispatch_direct_signaling_message(&msg).await;
256 if let Err(err) = crate::dispatch_signaling_message(
257 self.config.signaling_enabled,
258 &self.keys,
259 &self.my_peer_id,
260 &self.state.runtime,
261 relay_transport,
262 &self.local_buses,
263 &self.seen_frame_ids,
264 &self.seen_event_ids,
265 msg.clone(),
266 MESH_SIGNALING_EVENT_KIND as u64,
267 )
268 .await
269 {
270 debug!("Failed to dispatch signaling message: {}", err);
271 }
272 }
273
274 async fn dispatch_direct_signaling_message(&self, msg: &SignalingMessage) {
275 let endpoints = self.direct_signaling_endpoints(msg).await;
276 if endpoints.is_empty() {
277 return;
278 }
279
280 let Ok(event) =
281 crate::create_signaling_event(&self.keys, msg, MESH_SIGNALING_EVENT_KIND as u64).await
282 else {
283 return;
284 };
285 let body = event.as_json();
286 for endpoint in endpoints {
287 let body = body.clone();
288 tokio::spawn(async move {
289 if let Err(err) = post_direct_signaling_event(&endpoint, &body).await {
290 debug!("Failed to post WebRTC signaling event to {endpoint}: {err}");
291 }
292 });
293 }
294 }
295
296 async fn direct_signaling_endpoints(&self, msg: &SignalingMessage) -> Vec<String> {
297 let target = msg.target_peer_id().map(str::to_string);
298 let mut endpoints = Vec::new();
299 for peer in self.state.ordered_known_peers().await {
300 if target
301 .as_deref()
302 .is_some_and(|target| target != peer.peer_id)
303 {
304 continue;
305 }
306 for signal_url in peer.signal_urls {
307 let endpoint = direct_signal_endpoint(&signal_url);
308 if !endpoints.contains(&endpoint) {
309 endpoints.push(endpoint);
310 }
311 }
312 }
313 endpoints
314 }
315
316 async fn forward_mesh_frame(
317 &self,
318 frame: &MeshNostrFrame,
319 exclude_peer_id: Option<&str>,
320 ) -> usize {
321 crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
322 }
323
324 async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
325 if let Err(reason) = validate_mesh_frame(&frame) {
326 debug!(
327 "Ignoring mesh frame from {} (invalid: {})",
328 from_peer_id.short(),
329 reason
330 );
331 return;
332 }
333
334 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
335 self.state.record_mesh_duplicate_drop();
336 return;
337 }
338
339 let event = match &frame.payload {
340 MeshNostrPayload::Event { event } => event.clone(),
341 };
342
343 if !self.mark_seen_event_id(event.id.to_hex()).await {
344 self.state.record_mesh_duplicate_drop();
345 return;
346 }
347
348 if event.verify().is_err() {
349 debug!(
350 "Ignoring mesh event from {} due to invalid signature",
351 from_peer_id.short()
352 );
353 return;
354 }
355
356 self.state.record_mesh_received();
357
358 if let Err(e) = self
359 .handle_event("mesh", &event, self.shared_router.as_ref())
360 .await
361 {
362 debug!(
363 "Error handling mesh event from {}: {}",
364 from_peer_id.short(),
365 e
366 );
367 }
368
369 let forwarded = self
370 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
371 .await;
372 if forwarded > 0 {
373 self.state.record_mesh_forwarded(forwarded as u64);
374 }
375 }
376
377 async fn handle_event(
383 &self,
384 relay: &str,
385 event: &nostr_sdk::nostr::Event,
386 shared_router: Option<&Arc<SharedProductionRouter>>,
387 ) -> Result<()> {
388 crate::handle_signaling_event(
389 self.config.signaling_enabled,
390 &self.my_peer_id,
391 &self.keys,
392 &self.state.runtime,
393 relay,
394 self.local_bus_max_peers(relay),
395 event,
396 shared_router,
397 )
398 .await
399 }
400
401 async fn handle_signaling_message(
402 &self,
403 source: &str,
404 msg: SignalingMessage,
405 shared_router: Option<&Arc<SharedProductionRouter>>,
406 ) -> Result<()> {
407 crate::handle_signaling_message(
408 &self.state.runtime,
409 source,
410 self.local_bus_max_peers(source),
411 msg,
412 shared_router,
413 )
414 .await
415 }
416
417 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
419 crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
420 .await;
421 }
422
423 async fn cleanup_stale_peers(&self) {
425 crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
426 }
427}
428
429fn direct_signal_endpoint(base_url: &str) -> String {
430 format!("{}/api/p2p/signal", base_url.trim().trim_end_matches('/'))
431}
432
433async fn post_direct_signaling_event(endpoint: &str, body: &str) -> Result<()> {
434 let Some((host, port, path)) = parse_http_endpoint(endpoint) else {
435 anyhow::bail!("unsupported WebRTC signaling endpoint");
436 };
437 let mut stream = TcpStream::connect((host.as_str(), port)).await?;
438 let request = format!(
439 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
440 body.len()
441 );
442 stream.write_all(request.as_bytes()).await?;
443 stream.flush().await?;
444
445 let mut response = Vec::new();
446 let _ = tokio::time::timeout(Duration::from_secs(3), stream.read_to_end(&mut response)).await;
447 if response.starts_with(b"HTTP/1.1 2") || response.starts_with(b"HTTP/1.0 2") {
448 return Ok(());
449 }
450 anyhow::bail!("WebRTC signaling endpoint returned non-2xx response");
451}
452
453fn parse_http_endpoint(endpoint: &str) -> Option<(String, u16, String)> {
454 let rest = endpoint.strip_prefix("http://")?;
455 let (authority, path) = match rest.split_once('/') {
456 Some((authority, path)) => (authority, format!("/{path}")),
457 None => (rest, "/".to_string()),
458 };
459 let (host, port) = if let Some(stripped) = authority.strip_prefix('[') {
460 let (host, tail) = stripped.split_once(']')?;
461 let port = tail.strip_prefix(':')?.parse::<u16>().ok()?;
462 (host.to_string(), port)
463 } else if let Some((host, port)) = authority.rsplit_once(':') {
464 (host.to_string(), port.parse::<u16>().ok()?)
465 } else {
466 (authority.to_string(), 80)
467 };
468 if host.is_empty() {
469 return None;
470 }
471 Some((host, port, path))
472}
473
474#[allow(dead_code)]
476#[derive(Debug, Clone)]
477pub struct PeerState {
478 pub peer_id: PeerId,
479 pub direction: PeerDirection,
480 pub state: String,
481 pub last_seen: Instant,
482}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487 use crate::root_events::{self, PeerRootEvent};
488 use crate::session::TestMeshPeer;
489 use crate::LocalNostrBus;
490 use crate::SelectionStrategy;
491 use crate::{build_hedged_wave_plan, normalize_dispatch_config};
492 use anyhow::Result as AnyResult;
493 use async_trait::async_trait;
494 use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
495 use std::time::Duration;
496
497 struct TestLocalBus {
498 source: &'static str,
499 root: Option<PeerRootEvent>,
500 }
501
502 #[async_trait]
503 impl LocalNostrBus for TestLocalBus {
504 fn source_name(&self) -> &'static str {
505 self.source
506 }
507
508 async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
509 Ok(())
510 }
511
512 async fn query_root(
513 &self,
514 _owner_pubkey: &str,
515 _tree_name: &str,
516 _timeout: Duration,
517 ) -> Option<PeerRootEvent> {
518 self.root.clone()
519 }
520 }
521
522 #[test]
523 fn root_event_from_peer_extracts_tags() {
524 let keys = Keys::generate();
525 let hash = "ab".repeat(32);
526 let event = EventBuilder::new(
527 Kind::Custom(root_events::HASHTREE_KIND),
528 "",
529 [
530 Tag::parse(&["d", "repo"]).unwrap(),
531 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
532 Tag::parse(&["hash", &hash]).unwrap(),
533 Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
534 ],
535 )
536 .to_event(&keys)
537 .unwrap();
538
539 let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
540 let expected_encrypted = "11".repeat(32);
541 assert_eq!(parsed.hash, hash);
542 assert_eq!(parsed.peer_id, "peer-a");
543 assert_eq!(
544 parsed.encrypted_key.as_deref(),
545 Some(expected_encrypted.as_str())
546 );
547 assert!(parsed.key.is_none());
548 }
549
550 #[test]
551 fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
552 let keys = Keys::generate();
553 let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
554 let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
555 .custom_created_at(created_at)
556 .to_event(&keys)
557 .unwrap();
558 let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
559 .custom_created_at(created_at)
560 .to_event(&keys)
561 .unwrap();
562
563 let expected = if event_a.id > event_b.id {
564 event_a.id
565 } else {
566 event_b.id
567 };
568 let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
569 assert_eq!(picked.id, expected);
570 }
571
572 #[tokio::test]
573 async fn resolve_root_from_local_buses_returns_source_and_first_match() {
574 let state = WebRTCState::new();
575 let root = PeerRootEvent {
576 hash: "ab".repeat(32),
577 key: None,
578 encrypted_key: None,
579 self_encrypted_key: None,
580 event_id: "event-1".to_string(),
581 created_at: 1,
582 peer_id: "bus-peer".to_string(),
583 };
584
585 state
586 .set_local_buses(vec![
587 Arc::new(TestLocalBus {
588 source: "empty",
589 root: None,
590 }),
591 Arc::new(TestLocalBus {
592 source: "mock-bus",
593 root: Some(root.clone()),
594 }),
595 ])
596 .await;
597
598 let resolved = state
599 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
600 .await
601 .expect("expected root from local bus");
602
603 assert_eq!(resolved.0, "mock-bus");
604 assert_eq!(resolved.1.hash, root.hash);
605 assert_eq!(resolved.1.peer_id, root.peer_id);
606 }
607
608 #[tokio::test]
609 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
610 let keys = Keys::generate();
611 let mut config = WebRTCConfig::default();
612 config.wifi_aware.enabled = true;
613 config.wifi_aware.max_peers = 1;
614 let manager = WebRTCManager::new(keys, config);
615 let existing_peer = PeerId::new("peer-a".to_string());
616 let existing_key = existing_peer.to_string();
617 let mut peers = HashMap::new();
618 peers.insert(
619 existing_key.clone(),
620 PeerEntry {
621 peer_id: existing_peer,
622 direction: PeerDirection::Outbound,
623 state: ConnectionState::Discovered,
624 last_seen: Instant::now(),
625 peer: None,
626 pool: PeerPool::Other,
627 transport: PeerTransport::WebRtc,
628 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
629 bytes_sent: 0,
630 bytes_received: 0,
631 },
632 );
633
634 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
635 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
636 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
637 }
638
639 #[tokio::test]
640 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
641 let state = WebRTCState::new();
642 let data = b"offline-over-ble".to_vec();
643 let hash_hex = hex::encode(hashtree_core::sha256(&data));
644
645 state.runtime.peers.write().await.insert(
646 "peer-a".to_string(),
647 PeerEntry {
648 peer_id: PeerId::new("peer-a-pub".to_string()),
649 direction: PeerDirection::Outbound,
650 state: ConnectionState::Connected,
651 last_seen: Instant::now(),
652 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
653 data.clone(),
654 )))),
655 pool: PeerPool::Other,
656 transport: PeerTransport::Bluetooth,
657 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
658 bytes_sent: 0,
659 bytes_received: 0,
660 },
661 );
662
663 let resolved = state
664 .request_from_peers_with_source(&hash_hex)
665 .await
666 .expect("expected mock mesh peer response");
667
668 assert_eq!(resolved.0, data);
669 assert_eq!(resolved.1, "peer-a-pub");
670 }
671
672 #[tokio::test]
673 async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
674 let state = WebRTCState::new_with_routing_and_cashu(
675 SelectionStrategy::Weighted,
676 true,
677 RequestDispatchConfig {
678 initial_fanout: 1,
679 hedge_fanout: 1,
680 max_fanout: 1,
681 hedge_interval_ms: 50,
682 },
683 Duration::from_millis(400),
684 CashuRoutingConfig::default(),
685 None,
686 None,
687 );
688 let data = b"slow-offline-over-ble".to_vec();
689 let hash_hex = hex::encode(hashtree_core::sha256(&data));
690
691 state.runtime.peers.write().await.insert(
692 "peer-a".to_string(),
693 PeerEntry {
694 peer_id: PeerId::new("peer-a-pub".to_string()),
695 direction: PeerDirection::Outbound,
696 state: ConnectionState::Connected,
697 last_seen: Instant::now(),
698 peer: Some(MeshPeer::mock_for_tests(
699 TestMeshPeer::with_delayed_response(
700 Some(data.clone()),
701 Duration::from_millis(200),
702 ),
703 )),
704 pool: PeerPool::Other,
705 transport: PeerTransport::Bluetooth,
706 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
707 bytes_sent: 0,
708 bytes_received: 0,
709 },
710 );
711
712 let resolved = state
713 .request_from_peers_with_source(&hash_hex)
714 .await
715 .expect("expected delayed mock mesh peer response");
716
717 assert_eq!(resolved.0, data);
718 assert_eq!(resolved.1, "peer-a-pub");
719 }
720
721 #[tokio::test]
722 async fn request_from_peers_with_source_skips_peers_with_hash_get_disabled() {
723 let state = WebRTCState::new();
724 let capable_data = b"hash-get-capable".to_vec();
725 let capable_hash_hex = hex::encode(hashtree_core::sha256(&capable_data));
726
727 state.runtime.peers.write().await.insert(
728 "peer-assist".to_string(),
729 PeerEntry {
730 peer_id: PeerId::new("peer-assist-pub".to_string()),
731 direction: PeerDirection::Outbound,
732 state: ConnectionState::Connected,
733 last_seen: Instant::now(),
734 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
735 b"assist-should-not-be-queried".to_vec(),
736 )))),
737 pool: PeerPool::Other,
738 transport: PeerTransport::Bluetooth,
739 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
740 bytes_sent: 0,
741 bytes_received: 0,
742 },
743 );
744 state
745 .runtime
746 .set_peer_hash_get("peer-assist-pub", false)
747 .await;
748
749 state.runtime.peers.write().await.insert(
750 "peer-capable".to_string(),
751 PeerEntry {
752 peer_id: PeerId::new("peer-capable-pub".to_string()),
753 direction: PeerDirection::Outbound,
754 state: ConnectionState::Connected,
755 last_seen: Instant::now(),
756 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
757 capable_data.clone(),
758 )))),
759 pool: PeerPool::Other,
760 transport: PeerTransport::Bluetooth,
761 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
762 bytes_sent: 0,
763 bytes_received: 0,
764 },
765 );
766 state
767 .runtime
768 .set_peer_hash_get("peer-capable-pub", true)
769 .await;
770
771 let resolved = state
772 .request_from_peers_with_source(&capable_hash_hex)
773 .await
774 .expect("expected capable peer response");
775
776 assert_eq!(resolved.0, capable_data);
777 assert_eq!(resolved.1, "peer-capable-pub");
778 }
779
780 #[tokio::test]
781 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
782 let keys = Keys::generate();
783 let mut config = WebRTCConfig::default();
784 config.signaling_enabled = false;
785 let manager = WebRTCManager::new(keys, config);
786 let peer_id = PeerId::new("peer-a-pub".to_string());
787 let peer_key = peer_id.to_string();
788 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
789 let peer_ref = peer.mock_ref().expect("mock peer").clone();
790
791 manager.state.runtime.peers.write().await.insert(
792 peer_key,
793 PeerEntry {
794 peer_id,
795 direction: PeerDirection::Outbound,
796 state: ConnectionState::Connected,
797 last_seen: Instant::now(),
798 peer: Some(peer),
799 pool: PeerPool::Other,
800 transport: PeerTransport::Bluetooth,
801 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
802 bytes_sent: 0,
803 bytes_received: 0,
804 },
805 );
806
807 manager
808 .dispatch_signaling_message(
809 SignalingMessage::Hello {
810 peer_id: manager.my_peer_id.to_string(),
811 roots: Vec::new(),
812 hash_get: true,
813 },
814 None,
815 )
816 .await;
817
818 assert_eq!(peer_ref.sent_frame_count().await, 0);
819 }
820
821 #[tokio::test]
822 async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
823 let keys = Keys::generate();
824 let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
825 let peer_id = PeerId::new("peer-a-pub".to_string());
826 let peer_key = peer_id.to_string();
827
828 manager.state.runtime.peers.write().await.insert(
829 peer_key.clone(),
830 PeerEntry {
831 peer_id: peer_id.clone(),
832 direction: PeerDirection::Outbound,
833 state: ConnectionState::Connected,
834 last_seen: Instant::now(),
835 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
836 Duration::from_millis(200),
837 ))),
838 pool: PeerPool::Other,
839 transport: PeerTransport::Bluetooth,
840 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
841 bytes_sent: 0,
842 bytes_received: 0,
843 },
844 );
845
846 let manager_for_task = manager.clone();
847 let peer_id_for_task = peer_id.clone();
848 let cleanup_task = tokio::spawn(async move {
849 manager_for_task
850 .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
851 .await;
852 });
853
854 tokio::time::sleep(Duration::from_millis(20)).await;
855
856 let remaining = tokio::time::timeout(Duration::from_millis(50), async {
857 manager.state.runtime.peers.read().await.len()
858 })
859 .await
860 .expect("peer map read should not block on close");
861
862 assert_eq!(remaining, 0);
863 cleanup_task.await.expect("cleanup task");
864 }
865
866 #[tokio::test]
867 async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
868 let keys = Keys::generate();
869 let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
870 let owner_keys = Keys::generate();
871 let owner_pubkey = owner_keys.public_key().to_hex();
872 let tree_name = "video";
873 let hash = "ab".repeat(32);
874 let event = EventBuilder::new(
875 Kind::Custom(root_events::HASHTREE_KIND),
876 "",
877 [
878 Tag::parse(&["d", tree_name]).unwrap(),
879 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
880 Tag::parse(&["hash", &hash]).unwrap(),
881 ],
882 )
883 .to_event(&owner_keys)
884 .unwrap();
885
886 let peer_id = PeerId::new("peer-a-pub".to_string());
887 let peer_key = peer_id.to_string();
888
889 manager.state.runtime.peers.write().await.insert(
890 peer_key.clone(),
891 PeerEntry {
892 peer_id,
893 direction: PeerDirection::Outbound,
894 state: ConnectionState::Connected,
895 last_seen: Instant::now(),
896 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
897 vec![event],
898 Duration::from_millis(200),
899 ))),
900 pool: PeerPool::Other,
901 transport: PeerTransport::Bluetooth,
902 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
903 bytes_sent: 0,
904 bytes_received: 0,
905 },
906 );
907
908 let manager_for_task = manager.clone();
909 let owner_pubkey_for_task = owner_pubkey.clone();
910 let resolve_task = tokio::spawn(async move {
911 manager_for_task
912 .state
913 .resolve_root_from_peers(
914 &owner_pubkey_for_task,
915 tree_name,
916 Duration::from_millis(500),
917 )
918 .await
919 });
920
921 tokio::time::sleep(Duration::from_millis(20)).await;
922
923 let manager_for_writer = manager.clone();
924 let peer_key_for_writer = peer_key.clone();
925 let writer_task = tokio::spawn(async move {
926 let mut peers = manager_for_writer.state.runtime.peers.write().await;
927 if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
928 entry.bytes_received += 1;
929 }
930 });
931
932 tokio::time::sleep(Duration::from_millis(20)).await;
933
934 let status_count = tokio::time::timeout(Duration::from_millis(50), async {
935 manager.state.runtime.peers.read().await.len()
936 })
937 .await
938 .expect("peer map read should not block on root query");
939
940 assert_eq!(status_count, 1);
941 assert!(resolve_task.await.expect("resolve task").is_some());
942 writer_task.await.expect("writer task");
943 }
944
945 #[test]
946 fn test_formal_timed_seen_set_rejects_duplicates() {
947 let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
948 assert!(seen.insert_if_new("frame-1".to_string()));
949 assert!(!seen.insert_if_new("frame-1".to_string()));
950 assert!(seen.insert_if_new("frame-2".to_string()));
951 }
952
953 #[test]
954 fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
955 let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
956 assert!(seen.insert_if_new("a".to_string()));
957 assert!(seen.insert_if_new("b".to_string()));
958 assert!(seen.insert_if_new("c".to_string()));
959
960 assert!(seen.insert_if_new("a".to_string()));
962 assert!(!seen.insert_if_new("a".to_string()));
963 }
964
965 #[test]
966 fn test_request_dispatch_normalization_caps_to_available_peers() {
967 let normalized = normalize_dispatch_config(
968 RequestDispatchConfig {
969 initial_fanout: 8,
970 hedge_fanout: 6,
971 max_fanout: 5,
972 hedge_interval_ms: 120,
973 },
974 3,
975 );
976 assert_eq!(normalized.max_fanout, 3);
977 assert_eq!(normalized.initial_fanout, 3);
978 assert_eq!(normalized.hedge_fanout, 3);
979 }
980
981 #[test]
982 fn test_hedged_wave_plan_matches_dispatch_policy() {
983 let plan = build_hedged_wave_plan(
984 7,
985 RequestDispatchConfig {
986 initial_fanout: 2,
987 hedge_fanout: 3,
988 max_fanout: 6,
989 hedge_interval_ms: 120,
990 },
991 );
992 assert_eq!(plan, vec![2, 3, 1]);
993 }
994}