1use super::*;
2
3impl WebRTCManager {
4 pub async fn run(&mut self) -> Result<()> {
6 info!(
7 "Starting peer router with peer ID: {}",
8 self.my_peer_id.short()
9 );
10
11 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
12 let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
13
14 let mut signaling_rx = self
16 .signaling_rx
17 .take()
18 .expect("signaling_rx already taken");
19
20 let mut state_event_rx = self
22 .state_event_rx
23 .take()
24 .expect("state_event_rx already taken");
25 let mut mesh_frame_rx = self
26 .mesh_frame_rx
27 .take()
28 .expect("mesh_frame_rx already taken");
29
30 if self.config.bluetooth.is_enabled() {
31 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
32 let context = BluetoothRuntimeContext {
33 my_peer_id: self.my_peer_id.clone(),
34 store: if bluetooth_nostr_only_mode() {
35 None
36 } else {
37 self.store.clone()
38 },
39 nostr_relay: self.nostr_relay.clone(),
40 mesh_frame_tx: self.mesh_frame_tx.clone(),
41 registrar: BluetoothPeerRegistrar::new(
42 self.state.clone(),
43 self.peer_classifier.clone(),
44 self.config.pools.clone(),
45 self.config.bluetooth.max_peers,
46 ),
47 };
48 let _ = bluetooth.start(context).await;
49 }
50
51 let relay_transport = if self.config.signaling_enabled {
52 let transport = Arc::new(NostrRelayTransport::new(
53 self.keys.clone(),
54 self.config.debug,
55 ));
56 transport
57 .connect(&self.config.relays)
58 .await
59 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
60
61 let relay_reader = transport.clone();
62 let relay_msg_tx = relay_msg_tx.clone();
63 tokio::spawn(async move {
64 while let Some(msg) = relay_reader.recv().await {
65 if relay_msg_tx.send(msg).await.is_err() {
66 break;
67 }
68 }
69 });
70
71 Some(transport)
72 } else {
73 None
74 };
75
76 if self.config.multicast.is_enabled() {
77 if let Some(relay) = self.nostr_relay.clone() {
78 let relay = relay as crate::SharedMeshEventStore;
79 match MulticastNostrBus::bind(
80 self.config.multicast.clone(),
81 self.keys.clone(),
82 relay,
83 )
84 .await
85 {
86 Ok(bus) => {
87 let local_bus: SharedLocalNostrBus = bus.clone();
88 self.state.add_local_bus(local_bus.clone()).await;
89 self.local_buses.push(local_bus);
90 let shutdown_rx = self.shutdown_rx.clone();
91 let signaling_tx = event_tx.clone();
92 tokio::spawn(async move {
93 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
94 error!("Multicast bus error: {}", err);
95 }
96 });
97 }
98 Err(err) => {
99 warn!("Failed to start multicast bus: {}", err);
100 }
101 }
102 } else {
103 warn!("Multicast enabled but Nostr relay is unavailable");
104 }
105 }
106
107 if self.config.wifi_aware.is_enabled() {
108 if let Some(relay) = self.nostr_relay.clone() {
109 if let Some(bridge) = mobile_wifi_aware_bridge() {
110 let relay = relay as crate::SharedMeshEventStore;
111 let bus = WifiAwareNostrBus::new(
112 self.config.wifi_aware.clone(),
113 self.keys.clone(),
114 relay,
115 bridge,
116 );
117 let local_bus: SharedLocalNostrBus = bus.clone();
118 self.state.add_local_bus(local_bus.clone()).await;
119 self.local_buses.push(local_bus);
120 let shutdown_rx = self.shutdown_rx.clone();
121 let signaling_tx = event_tx.clone();
122 let local_peer_id = self.my_peer_id.to_string();
123 tokio::spawn(async move {
124 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
125 error!("Wi-Fi Aware bus error: {}", err);
126 }
127 });
128 } else {
129 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
130 }
131 } else {
132 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
133 }
134 }
135
136 if self.config.signaling_enabled {
137 let transport = Arc::new(RouterSignalingBridge::new(
138 self.my_peer_id.to_string(),
139 self.signaling_tx.clone(),
140 ));
141 let factory = Arc::new(SharedRouterPeerFactory::new(
142 self.my_peer_id.clone(),
143 self.signaling_tx.clone(),
144 self.config.stun_servers.clone(),
145 self.store.clone(),
146 self.state.clone(),
147 self.state_event_tx.clone(),
148 self.nostr_relay.clone(),
149 self.mesh_frame_tx.clone(),
150 self.peer_classifier.clone(),
151 ));
152 let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
153 let classifier = self.peer_classifier.clone();
154 tokio::spawn(async move {
155 while let Some(request) = classifier_rx.recv().await {
156 let _ = request.response.send(classifier(&request.pubkey));
157 }
158 });
159
160 let mut router = MeshRouter::new(
161 self.my_peer_id.to_string(),
162 transport,
163 factory.clone(),
164 self.config.pools.clone(),
165 self.config.debug,
166 );
167 router.set_classifier(classifier_tx);
168 router.set_hash_get_enabled(self.config.hash_get_enabled);
169 self.shared_router = Some(Arc::new(router));
170 }
171
172 let mut shutdown_rx = self.shutdown_rx.clone();
174 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
176 let mut hello_ticker =
177 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
178 if self.config.signaling_enabled {
179 if let Some(shared_router) = self.shared_router.as_ref() {
180 let _ = shared_router.send_hello(Vec::new()).await;
181 }
182 }
183 loop {
184 tokio::select! {
185 _ = shutdown_rx.changed() => {
186 if *shutdown_rx.borrow() {
187 info!("WebRTC manager shutting down");
188 break;
189 }
190 }
191 Some(msg) = relay_msg_rx.recv() => {
192 if let Err(e) = self
193 .handle_signaling_message("relay", msg, self.shared_router.as_ref())
194 .await
195 {
196 debug!("Error handling relay signaling message: {}", e);
197 }
198 }
199 Some((relay, event)) = event_rx.recv() => {
200 if let Err(e) = self
201 .handle_event(&relay, &event, self.shared_router.as_ref())
202 .await
203 {
204 debug!("Error handling event from {}: {}", relay, e);
205 }
206 }
207 Some(msg) = signaling_rx.recv() => {
208 self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
209 }
210 Some(event) = state_event_rx.recv() => {
211 self.handle_peer_state_event(event).await;
213 }
214 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
215 self.handle_mesh_frame(from_peer_id, frame).await;
216 }
217 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
218 if let Some(shared_router) = self.shared_router.as_ref() {
219 let _ = shared_router.send_hello(Vec::new()).await;
220 }
221 }
222 _ = cleanup_interval.tick() => {
223 self.cleanup_stale_peers().await;
225 }
226 }
227 }
228
229 Ok(())
230 }
231
232 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
233 let mut seen = self.seen_frame_ids.lock().await;
234 seen.insert_if_new(frame_id)
235 }
236
237 async fn mark_seen_event_id(&self, event_id: String) -> bool {
238 let mut seen = self.seen_event_ids.lock().await;
239 seen.insert_if_new(event_id)
240 }
241
242 async fn dispatch_signaling_message(
243 &self,
244 msg: SignalingMessage,
245 relay_transport: Option<&Arc<NostrRelayTransport>>,
246 ) {
247 if let Err(err) = crate::dispatch_signaling_message(
248 self.config.signaling_enabled,
249 &self.keys,
250 &self.my_peer_id,
251 &self.state.runtime,
252 relay_transport,
253 &self.local_buses,
254 &self.seen_frame_ids,
255 &self.seen_event_ids,
256 msg,
257 MESH_SIGNALING_EVENT_KIND as u64,
258 )
259 .await
260 {
261 debug!("Failed to dispatch signaling message: {}", err);
262 }
263 }
264
265 async fn forward_mesh_frame(
266 &self,
267 frame: &MeshNostrFrame,
268 exclude_peer_id: Option<&str>,
269 ) -> usize {
270 crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
271 }
272
273 async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
274 if let Err(reason) = validate_mesh_frame(&frame) {
275 debug!(
276 "Ignoring mesh frame from {} (invalid: {})",
277 from_peer_id.short(),
278 reason
279 );
280 return;
281 }
282
283 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
284 self.state.record_mesh_duplicate_drop();
285 return;
286 }
287
288 let event = match &frame.payload {
289 MeshNostrPayload::Event { event } => event.clone(),
290 };
291
292 if !self.mark_seen_event_id(event.id.to_hex()).await {
293 self.state.record_mesh_duplicate_drop();
294 return;
295 }
296
297 if event.verify().is_err() {
298 debug!(
299 "Ignoring mesh event from {} due to invalid signature",
300 from_peer_id.short()
301 );
302 return;
303 }
304
305 self.state.record_mesh_received();
306
307 if let Err(e) = self
308 .handle_event("mesh", &event, self.shared_router.as_ref())
309 .await
310 {
311 debug!(
312 "Error handling mesh event from {}: {}",
313 from_peer_id.short(),
314 e
315 );
316 }
317
318 let forwarded = self
319 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
320 .await;
321 if forwarded > 0 {
322 self.state.record_mesh_forwarded(forwarded as u64);
323 }
324 }
325
326 async fn handle_event(
332 &self,
333 relay: &str,
334 event: &nostr_sdk::nostr::Event,
335 shared_router: Option<&Arc<SharedProductionRouter>>,
336 ) -> Result<()> {
337 crate::handle_signaling_event(
338 self.config.signaling_enabled,
339 &self.my_peer_id,
340 &self.keys,
341 &self.state.runtime,
342 relay,
343 self.local_bus_max_peers(relay),
344 event,
345 shared_router,
346 )
347 .await
348 }
349
350 async fn handle_signaling_message(
351 &self,
352 source: &str,
353 msg: SignalingMessage,
354 shared_router: Option<&Arc<SharedProductionRouter>>,
355 ) -> Result<()> {
356 crate::handle_signaling_message(
357 &self.state.runtime,
358 source,
359 self.local_bus_max_peers(source),
360 msg,
361 shared_router,
362 )
363 .await
364 }
365
366 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
368 crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
369 .await;
370 }
371
372 async fn cleanup_stale_peers(&self) {
374 crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
375 }
376}
377
378#[allow(dead_code)]
380#[derive(Debug, Clone)]
381pub struct PeerState {
382 pub peer_id: PeerId,
383 pub direction: PeerDirection,
384 pub state: String,
385 pub last_seen: Instant,
386}
387
388#[cfg(test)]
389mod tests {
390 use super::*;
391 use crate::root_events::{self, PeerRootEvent};
392 use crate::session::TestMeshPeer;
393 use crate::LocalNostrBus;
394 use crate::SelectionStrategy;
395 use crate::{build_hedged_wave_plan, normalize_dispatch_config};
396 use anyhow::Result as AnyResult;
397 use async_trait::async_trait;
398 use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
399 use std::time::Duration;
400
401 struct TestLocalBus {
402 source: &'static str,
403 root: Option<PeerRootEvent>,
404 }
405
406 #[async_trait]
407 impl LocalNostrBus for TestLocalBus {
408 fn source_name(&self) -> &'static str {
409 self.source
410 }
411
412 async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
413 Ok(())
414 }
415
416 async fn query_root(
417 &self,
418 _owner_pubkey: &str,
419 _tree_name: &str,
420 _timeout: Duration,
421 ) -> Option<PeerRootEvent> {
422 self.root.clone()
423 }
424 }
425
426 #[test]
427 fn root_event_from_peer_extracts_tags() {
428 let keys = Keys::generate();
429 let hash = "ab".repeat(32);
430 let event = EventBuilder::new(
431 Kind::Custom(root_events::HASHTREE_KIND),
432 "",
433 [
434 Tag::parse(&["d", "repo"]).unwrap(),
435 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
436 Tag::parse(&["hash", &hash]).unwrap(),
437 Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
438 ],
439 )
440 .to_event(&keys)
441 .unwrap();
442
443 let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
444 let expected_encrypted = "11".repeat(32);
445 assert_eq!(parsed.hash, hash);
446 assert_eq!(parsed.peer_id, "peer-a");
447 assert_eq!(
448 parsed.encrypted_key.as_deref(),
449 Some(expected_encrypted.as_str())
450 );
451 assert!(parsed.key.is_none());
452 }
453
454 #[test]
455 fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
456 let keys = Keys::generate();
457 let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
458 let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
459 .custom_created_at(created_at)
460 .to_event(&keys)
461 .unwrap();
462 let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
463 .custom_created_at(created_at)
464 .to_event(&keys)
465 .unwrap();
466
467 let expected = if event_a.id > event_b.id {
468 event_a.id
469 } else {
470 event_b.id
471 };
472 let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
473 assert_eq!(picked.id, expected);
474 }
475
476 #[tokio::test]
477 async fn resolve_root_from_local_buses_returns_source_and_first_match() {
478 let state = WebRTCState::new();
479 let root = PeerRootEvent {
480 hash: "ab".repeat(32),
481 key: None,
482 encrypted_key: None,
483 self_encrypted_key: None,
484 event_id: "event-1".to_string(),
485 created_at: 1,
486 peer_id: "bus-peer".to_string(),
487 };
488
489 state
490 .set_local_buses(vec![
491 Arc::new(TestLocalBus {
492 source: "empty",
493 root: None,
494 }),
495 Arc::new(TestLocalBus {
496 source: "mock-bus",
497 root: Some(root.clone()),
498 }),
499 ])
500 .await;
501
502 let resolved = state
503 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
504 .await
505 .expect("expected root from local bus");
506
507 assert_eq!(resolved.0, "mock-bus");
508 assert_eq!(resolved.1.hash, root.hash);
509 assert_eq!(resolved.1.peer_id, root.peer_id);
510 }
511
512 #[tokio::test]
513 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
514 let keys = Keys::generate();
515 let mut config = WebRTCConfig::default();
516 config.wifi_aware.enabled = true;
517 config.wifi_aware.max_peers = 1;
518 let manager = WebRTCManager::new(keys, config);
519 let existing_peer = PeerId::new("peer-a".to_string());
520 let existing_key = existing_peer.to_string();
521 let mut peers = HashMap::new();
522 peers.insert(
523 existing_key.clone(),
524 PeerEntry {
525 peer_id: existing_peer,
526 direction: PeerDirection::Outbound,
527 state: ConnectionState::Discovered,
528 last_seen: Instant::now(),
529 peer: None,
530 pool: PeerPool::Other,
531 transport: PeerTransport::WebRtc,
532 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
533 bytes_sent: 0,
534 bytes_received: 0,
535 },
536 );
537
538 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
539 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
540 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
541 }
542
543 #[tokio::test]
544 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
545 let state = WebRTCState::new();
546 let data = b"offline-over-ble".to_vec();
547 let hash_hex = hex::encode(hashtree_core::sha256(&data));
548
549 state.runtime.peers.write().await.insert(
550 "peer-a".to_string(),
551 PeerEntry {
552 peer_id: PeerId::new("peer-a-pub".to_string()),
553 direction: PeerDirection::Outbound,
554 state: ConnectionState::Connected,
555 last_seen: Instant::now(),
556 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
557 data.clone(),
558 )))),
559 pool: PeerPool::Other,
560 transport: PeerTransport::Bluetooth,
561 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
562 bytes_sent: 0,
563 bytes_received: 0,
564 },
565 );
566
567 let resolved = state
568 .request_from_peers_with_source(&hash_hex)
569 .await
570 .expect("expected mock mesh peer response");
571
572 assert_eq!(resolved.0, data);
573 assert_eq!(resolved.1, "peer-a-pub");
574 }
575
576 #[tokio::test]
577 async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
578 let state = WebRTCState::new_with_routing_and_cashu(
579 SelectionStrategy::Weighted,
580 true,
581 RequestDispatchConfig {
582 initial_fanout: 1,
583 hedge_fanout: 1,
584 max_fanout: 1,
585 hedge_interval_ms: 50,
586 },
587 Duration::from_millis(400),
588 CashuRoutingConfig::default(),
589 None,
590 None,
591 );
592 let data = b"slow-offline-over-ble".to_vec();
593 let hash_hex = hex::encode(hashtree_core::sha256(&data));
594
595 state.runtime.peers.write().await.insert(
596 "peer-a".to_string(),
597 PeerEntry {
598 peer_id: PeerId::new("peer-a-pub".to_string()),
599 direction: PeerDirection::Outbound,
600 state: ConnectionState::Connected,
601 last_seen: Instant::now(),
602 peer: Some(MeshPeer::mock_for_tests(
603 TestMeshPeer::with_delayed_response(
604 Some(data.clone()),
605 Duration::from_millis(200),
606 ),
607 )),
608 pool: PeerPool::Other,
609 transport: PeerTransport::Bluetooth,
610 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
611 bytes_sent: 0,
612 bytes_received: 0,
613 },
614 );
615
616 let resolved = state
617 .request_from_peers_with_source(&hash_hex)
618 .await
619 .expect("expected delayed mock mesh peer response");
620
621 assert_eq!(resolved.0, data);
622 assert_eq!(resolved.1, "peer-a-pub");
623 }
624
625 #[tokio::test]
626 async fn request_from_peers_with_source_skips_peers_with_hash_get_disabled() {
627 let state = WebRTCState::new();
628 let capable_data = b"hash-get-capable".to_vec();
629 let capable_hash_hex = hex::encode(hashtree_core::sha256(&capable_data));
630
631 state.runtime.peers.write().await.insert(
632 "peer-assist".to_string(),
633 PeerEntry {
634 peer_id: PeerId::new("peer-assist-pub".to_string()),
635 direction: PeerDirection::Outbound,
636 state: ConnectionState::Connected,
637 last_seen: Instant::now(),
638 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
639 b"assist-should-not-be-queried".to_vec(),
640 )))),
641 pool: PeerPool::Other,
642 transport: PeerTransport::Bluetooth,
643 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
644 bytes_sent: 0,
645 bytes_received: 0,
646 },
647 );
648 state
649 .runtime
650 .set_peer_hash_get("peer-assist-pub", false)
651 .await;
652
653 state.runtime.peers.write().await.insert(
654 "peer-capable".to_string(),
655 PeerEntry {
656 peer_id: PeerId::new("peer-capable-pub".to_string()),
657 direction: PeerDirection::Outbound,
658 state: ConnectionState::Connected,
659 last_seen: Instant::now(),
660 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
661 capable_data.clone(),
662 )))),
663 pool: PeerPool::Other,
664 transport: PeerTransport::Bluetooth,
665 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
666 bytes_sent: 0,
667 bytes_received: 0,
668 },
669 );
670 state
671 .runtime
672 .set_peer_hash_get("peer-capable-pub", true)
673 .await;
674
675 let resolved = state
676 .request_from_peers_with_source(&capable_hash_hex)
677 .await
678 .expect("expected capable peer response");
679
680 assert_eq!(resolved.0, capable_data);
681 assert_eq!(resolved.1, "peer-capable-pub");
682 }
683
684 #[tokio::test]
685 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
686 let keys = Keys::generate();
687 let mut config = WebRTCConfig::default();
688 config.signaling_enabled = false;
689 let manager = WebRTCManager::new(keys, config);
690 let peer_id = PeerId::new("peer-a-pub".to_string());
691 let peer_key = peer_id.to_string();
692 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
693 let peer_ref = peer.mock_ref().expect("mock peer").clone();
694
695 manager.state.runtime.peers.write().await.insert(
696 peer_key,
697 PeerEntry {
698 peer_id,
699 direction: PeerDirection::Outbound,
700 state: ConnectionState::Connected,
701 last_seen: Instant::now(),
702 peer: Some(peer),
703 pool: PeerPool::Other,
704 transport: PeerTransport::Bluetooth,
705 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
706 bytes_sent: 0,
707 bytes_received: 0,
708 },
709 );
710
711 manager
712 .dispatch_signaling_message(
713 SignalingMessage::Hello {
714 peer_id: manager.my_peer_id.to_string(),
715 roots: Vec::new(),
716 hash_get: true,
717 },
718 None,
719 )
720 .await;
721
722 assert_eq!(peer_ref.sent_frame_count().await, 0);
723 }
724
725 #[tokio::test]
726 async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
727 let keys = Keys::generate();
728 let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
729 let peer_id = PeerId::new("peer-a-pub".to_string());
730 let peer_key = peer_id.to_string();
731
732 manager.state.runtime.peers.write().await.insert(
733 peer_key.clone(),
734 PeerEntry {
735 peer_id: peer_id.clone(),
736 direction: PeerDirection::Outbound,
737 state: ConnectionState::Connected,
738 last_seen: Instant::now(),
739 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
740 Duration::from_millis(200),
741 ))),
742 pool: PeerPool::Other,
743 transport: PeerTransport::Bluetooth,
744 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
745 bytes_sent: 0,
746 bytes_received: 0,
747 },
748 );
749
750 let manager_for_task = manager.clone();
751 let peer_id_for_task = peer_id.clone();
752 let cleanup_task = tokio::spawn(async move {
753 manager_for_task
754 .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
755 .await;
756 });
757
758 tokio::time::sleep(Duration::from_millis(20)).await;
759
760 let remaining = tokio::time::timeout(Duration::from_millis(50), async {
761 manager.state.runtime.peers.read().await.len()
762 })
763 .await
764 .expect("peer map read should not block on close");
765
766 assert_eq!(remaining, 0);
767 cleanup_task.await.expect("cleanup task");
768 }
769
770 #[tokio::test]
771 async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
772 let keys = Keys::generate();
773 let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
774 let owner_keys = Keys::generate();
775 let owner_pubkey = owner_keys.public_key().to_hex();
776 let tree_name = "video";
777 let hash = "ab".repeat(32);
778 let event = EventBuilder::new(
779 Kind::Custom(root_events::HASHTREE_KIND),
780 "",
781 [
782 Tag::parse(&["d", tree_name]).unwrap(),
783 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
784 Tag::parse(&["hash", &hash]).unwrap(),
785 ],
786 )
787 .to_event(&owner_keys)
788 .unwrap();
789
790 let peer_id = PeerId::new("peer-a-pub".to_string());
791 let peer_key = peer_id.to_string();
792
793 manager.state.runtime.peers.write().await.insert(
794 peer_key.clone(),
795 PeerEntry {
796 peer_id,
797 direction: PeerDirection::Outbound,
798 state: ConnectionState::Connected,
799 last_seen: Instant::now(),
800 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
801 vec![event],
802 Duration::from_millis(200),
803 ))),
804 pool: PeerPool::Other,
805 transport: PeerTransport::Bluetooth,
806 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
807 bytes_sent: 0,
808 bytes_received: 0,
809 },
810 );
811
812 let manager_for_task = manager.clone();
813 let owner_pubkey_for_task = owner_pubkey.clone();
814 let resolve_task = tokio::spawn(async move {
815 manager_for_task
816 .state
817 .resolve_root_from_peers(
818 &owner_pubkey_for_task,
819 tree_name,
820 Duration::from_millis(500),
821 )
822 .await
823 });
824
825 tokio::time::sleep(Duration::from_millis(20)).await;
826
827 let manager_for_writer = manager.clone();
828 let peer_key_for_writer = peer_key.clone();
829 let writer_task = tokio::spawn(async move {
830 let mut peers = manager_for_writer.state.runtime.peers.write().await;
831 if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
832 entry.bytes_received += 1;
833 }
834 });
835
836 tokio::time::sleep(Duration::from_millis(20)).await;
837
838 let status_count = tokio::time::timeout(Duration::from_millis(50), async {
839 manager.state.runtime.peers.read().await.len()
840 })
841 .await
842 .expect("peer map read should not block on root query");
843
844 assert_eq!(status_count, 1);
845 assert!(resolve_task.await.expect("resolve task").is_some());
846 writer_task.await.expect("writer task");
847 }
848
849 #[test]
850 fn test_formal_timed_seen_set_rejects_duplicates() {
851 let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
852 assert!(seen.insert_if_new("frame-1".to_string()));
853 assert!(!seen.insert_if_new("frame-1".to_string()));
854 assert!(seen.insert_if_new("frame-2".to_string()));
855 }
856
857 #[test]
858 fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
859 let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
860 assert!(seen.insert_if_new("a".to_string()));
861 assert!(seen.insert_if_new("b".to_string()));
862 assert!(seen.insert_if_new("c".to_string()));
863
864 assert!(seen.insert_if_new("a".to_string()));
866 assert!(!seen.insert_if_new("a".to_string()));
867 }
868
869 #[test]
870 fn test_request_dispatch_normalization_caps_to_available_peers() {
871 let normalized = normalize_dispatch_config(
872 RequestDispatchConfig {
873 initial_fanout: 8,
874 hedge_fanout: 6,
875 max_fanout: 5,
876 hedge_interval_ms: 120,
877 },
878 3,
879 );
880 assert_eq!(normalized.max_fanout, 3);
881 assert_eq!(normalized.initial_fanout, 3);
882 assert_eq!(normalized.hedge_fanout, 3);
883 }
884
885 #[test]
886 fn test_hedged_wave_plan_matches_dispatch_policy() {
887 let plan = build_hedged_wave_plan(
888 7,
889 RequestDispatchConfig {
890 initial_fanout: 2,
891 hedge_fanout: 3,
892 max_fanout: 6,
893 hedge_interval_ms: 120,
894 },
895 );
896 assert_eq!(plan, vec![2, 3, 1]);
897 }
898}