1use super::*;
2
3impl WebRTCManager {
4 pub async fn run(&mut self) -> Result<()> {
6 info!(
7 "Starting peer router with peer ID: {}",
8 self.my_peer_id.short()
9 );
10
11 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
12 let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
13
14 let mut signaling_rx = self
16 .signaling_rx
17 .take()
18 .expect("signaling_rx already taken");
19
20 let mut state_event_rx = self
22 .state_event_rx
23 .take()
24 .expect("state_event_rx already taken");
25 let mut mesh_frame_rx = self
26 .mesh_frame_rx
27 .take()
28 .expect("mesh_frame_rx already taken");
29
30 if self.config.bluetooth.is_enabled() {
31 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
32 let context = BluetoothRuntimeContext {
33 my_peer_id: self.my_peer_id.clone(),
34 store: if bluetooth_nostr_only_mode() {
35 None
36 } else {
37 self.store.clone()
38 },
39 nostr_relay: self.nostr_relay.clone(),
40 mesh_frame_tx: self.mesh_frame_tx.clone(),
41 registrar: BluetoothPeerRegistrar::new(
42 self.state.clone(),
43 self.peer_classifier.clone(),
44 self.config.pools.clone(),
45 self.config.bluetooth.max_peers,
46 ),
47 };
48 let _ = bluetooth.start(context).await;
49 }
50
51 let relay_transport = if self.config.signaling_enabled {
52 let transport = Arc::new(NostrRelayTransport::new(
53 self.keys.clone(),
54 self.config.debug,
55 ));
56 transport
57 .connect(&self.config.relays)
58 .await
59 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
60
61 let relay_reader = transport.clone();
62 let relay_msg_tx = relay_msg_tx.clone();
63 tokio::spawn(async move {
64 while let Some(msg) = relay_reader.recv().await {
65 if relay_msg_tx.send(msg).await.is_err() {
66 break;
67 }
68 }
69 });
70
71 Some(transport)
72 } else {
73 None
74 };
75
76 if self.config.multicast.is_enabled() {
77 if let Some(relay) = self.nostr_relay.clone() {
78 let relay = relay as crate::SharedMeshEventStore;
79 match MulticastNostrBus::bind(
80 self.config.multicast.clone(),
81 self.keys.clone(),
82 relay,
83 )
84 .await
85 {
86 Ok(bus) => {
87 let local_bus: SharedLocalNostrBus = bus.clone();
88 self.state.add_local_bus(local_bus.clone()).await;
89 self.local_buses.push(local_bus);
90 let shutdown_rx = self.shutdown_rx.clone();
91 let signaling_tx = event_tx.clone();
92 tokio::spawn(async move {
93 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
94 error!("Multicast bus error: {}", err);
95 }
96 });
97 }
98 Err(err) => {
99 warn!("Failed to start multicast bus: {}", err);
100 }
101 }
102 } else {
103 warn!("Multicast enabled but Nostr relay is unavailable");
104 }
105 }
106
107 if self.config.wifi_aware.is_enabled() {
108 if let Some(relay) = self.nostr_relay.clone() {
109 if let Some(bridge) = mobile_wifi_aware_bridge() {
110 let relay = relay as crate::SharedMeshEventStore;
111 let bus = WifiAwareNostrBus::new(
112 self.config.wifi_aware.clone(),
113 self.keys.clone(),
114 relay,
115 bridge,
116 );
117 let local_bus: SharedLocalNostrBus = bus.clone();
118 self.state.add_local_bus(local_bus.clone()).await;
119 self.local_buses.push(local_bus);
120 let shutdown_rx = self.shutdown_rx.clone();
121 let signaling_tx = event_tx.clone();
122 let local_peer_id = self.my_peer_id.to_string();
123 tokio::spawn(async move {
124 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
125 error!("Wi-Fi Aware bus error: {}", err);
126 }
127 });
128 } else {
129 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
130 }
131 } else {
132 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
133 }
134 }
135
136 if self.config.signaling_enabled {
137 let transport = Arc::new(RouterSignalingBridge::new(
138 self.my_peer_id.to_string(),
139 self.signaling_tx.clone(),
140 ));
141 let factory = Arc::new(SharedRouterPeerFactory::new(
142 self.my_peer_id.clone(),
143 self.signaling_tx.clone(),
144 self.config.stun_servers.clone(),
145 self.store.clone(),
146 self.state.clone(),
147 self.state_event_tx.clone(),
148 self.nostr_relay.clone(),
149 self.mesh_frame_tx.clone(),
150 self.peer_classifier.clone(),
151 ));
152 let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
153 let classifier = self.peer_classifier.clone();
154 tokio::spawn(async move {
155 while let Some(request) = classifier_rx.recv().await {
156 let _ = request.response.send(classifier(&request.pubkey));
157 }
158 });
159
160 let mut router = MeshRouter::new(
161 self.my_peer_id.to_string(),
162 transport,
163 factory.clone(),
164 self.config.pools.clone(),
165 self.config.debug,
166 );
167 router.set_classifier(classifier_tx);
168 self.shared_router = Some(Arc::new(router));
169 }
170
171 let mut shutdown_rx = self.shutdown_rx.clone();
173 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
175 let mut hello_ticker =
176 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
177 if self.config.signaling_enabled {
178 if let Some(shared_router) = self.shared_router.as_ref() {
179 let _ = shared_router.send_hello(Vec::new()).await;
180 }
181 }
182 loop {
183 tokio::select! {
184 _ = shutdown_rx.changed() => {
185 if *shutdown_rx.borrow() {
186 info!("WebRTC manager shutting down");
187 break;
188 }
189 }
190 Some(msg) = relay_msg_rx.recv() => {
191 if let Err(e) = self
192 .handle_signaling_message("relay", msg, self.shared_router.as_ref())
193 .await
194 {
195 debug!("Error handling relay signaling message: {}", e);
196 }
197 }
198 Some((relay, event)) = event_rx.recv() => {
199 if let Err(e) = self
200 .handle_event(&relay, &event, self.shared_router.as_ref())
201 .await
202 {
203 debug!("Error handling event from {}: {}", relay, e);
204 }
205 }
206 Some(msg) = signaling_rx.recv() => {
207 self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
208 }
209 Some(event) = state_event_rx.recv() => {
210 self.handle_peer_state_event(event).await;
212 }
213 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
214 self.handle_mesh_frame(from_peer_id, frame).await;
215 }
216 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
217 if let Some(shared_router) = self.shared_router.as_ref() {
218 let _ = shared_router.send_hello(Vec::new()).await;
219 }
220 }
221 _ = cleanup_interval.tick() => {
222 self.cleanup_stale_peers().await;
224 }
225 }
226 }
227
228 Ok(())
229 }
230
231 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
232 let mut seen = self.seen_frame_ids.lock().await;
233 seen.insert_if_new(frame_id)
234 }
235
236 async fn mark_seen_event_id(&self, event_id: String) -> bool {
237 let mut seen = self.seen_event_ids.lock().await;
238 seen.insert_if_new(event_id)
239 }
240
241 async fn dispatch_signaling_message(
242 &self,
243 msg: SignalingMessage,
244 relay_transport: Option<&Arc<NostrRelayTransport>>,
245 ) {
246 if let Err(err) = crate::dispatch_signaling_message(
247 self.config.signaling_enabled,
248 &self.keys,
249 &self.my_peer_id,
250 &self.state.runtime,
251 relay_transport,
252 &self.local_buses,
253 &self.seen_frame_ids,
254 &self.seen_event_ids,
255 msg,
256 MESH_SIGNALING_EVENT_KIND as u64,
257 )
258 .await
259 {
260 debug!("Failed to dispatch signaling message: {}", err);
261 }
262 }
263
264 async fn forward_mesh_frame(
265 &self,
266 frame: &MeshNostrFrame,
267 exclude_peer_id: Option<&str>,
268 ) -> usize {
269 crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
270 }
271
272 async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
273 if let Err(reason) = validate_mesh_frame(&frame) {
274 debug!(
275 "Ignoring mesh frame from {} (invalid: {})",
276 from_peer_id.short(),
277 reason
278 );
279 return;
280 }
281
282 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
283 self.state.record_mesh_duplicate_drop();
284 return;
285 }
286
287 let event = match &frame.payload {
288 MeshNostrPayload::Event { event } => event.clone(),
289 };
290
291 if !self.mark_seen_event_id(event.id.to_hex()).await {
292 self.state.record_mesh_duplicate_drop();
293 return;
294 }
295
296 if event.verify().is_err() {
297 debug!(
298 "Ignoring mesh event from {} due to invalid signature",
299 from_peer_id.short()
300 );
301 return;
302 }
303
304 self.state.record_mesh_received();
305
306 if let Err(e) = self
307 .handle_event("mesh", &event, self.shared_router.as_ref())
308 .await
309 {
310 debug!(
311 "Error handling mesh event from {}: {}",
312 from_peer_id.short(),
313 e
314 );
315 }
316
317 let forwarded = self
318 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
319 .await;
320 if forwarded > 0 {
321 self.state.record_mesh_forwarded(forwarded as u64);
322 }
323 }
324
325 async fn handle_event(
331 &self,
332 relay: &str,
333 event: &nostr_sdk::nostr::Event,
334 shared_router: Option<&Arc<SharedProductionRouter>>,
335 ) -> Result<()> {
336 crate::handle_signaling_event(
337 self.config.signaling_enabled,
338 &self.my_peer_id,
339 &self.keys,
340 &self.state.runtime,
341 relay,
342 self.local_bus_max_peers(relay),
343 event,
344 shared_router,
345 )
346 .await
347 }
348
349 async fn handle_signaling_message(
350 &self,
351 source: &str,
352 msg: SignalingMessage,
353 shared_router: Option<&Arc<SharedProductionRouter>>,
354 ) -> Result<()> {
355 crate::handle_signaling_message(
356 &self.state.runtime,
357 source,
358 self.local_bus_max_peers(source),
359 msg,
360 shared_router,
361 )
362 .await
363 }
364
365 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
367 crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
368 .await;
369 }
370
371 async fn cleanup_stale_peers(&self) {
373 crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
374 }
375}
376
/// Plain record describing a peer: its id, connection direction, a textual state label
/// and the instant it was last seen.
///
/// NOTE(review): nothing in this part of the file constructs this type, which is
/// presumably why `dead_code` is allowed — confirm it is still needed by other modules
/// or external callers.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identifier of the peer this record describes.
    pub peer_id: PeerId,
    /// Direction of the connection to the peer.
    pub direction: PeerDirection,
    /// Free-form state label (the set of values is not defined here).
    pub state: String,
    /// When the peer was last seen.
    pub last_seen: Instant,
}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::root_events::{self, PeerRootEvent};
391 use crate::session::TestMeshPeer;
392 use crate::LocalNostrBus;
393 use crate::SelectionStrategy;
394 use crate::{build_hedged_wave_plan, normalize_dispatch_config};
395 use anyhow::Result as AnyResult;
396 use async_trait::async_trait;
397 use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
398 use std::time::Duration;
399
    /// Minimal `LocalNostrBus` stub for tests: reports a fixed source name and answers
    /// every root query with a preconfigured result.
    struct TestLocalBus {
        // Name returned by `source_name`.
        source: &'static str,
        // Canned answer for `query_root`; `None` simulates a bus with no match.
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        // Broadcasting is a no-op that always succeeds.
        async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
            Ok(())
        }

        // Ignores all query parameters and returns a copy of the canned root.
        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }
424
425 #[test]
426 fn root_event_from_peer_extracts_tags() {
427 let keys = Keys::generate();
428 let hash = "ab".repeat(32);
429 let event = EventBuilder::new(
430 Kind::Custom(root_events::HASHTREE_KIND),
431 "",
432 [
433 Tag::parse(&["d", "repo"]).unwrap(),
434 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
435 Tag::parse(&["hash", &hash]).unwrap(),
436 Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
437 ],
438 )
439 .to_event(&keys)
440 .unwrap();
441
442 let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
443 let expected_encrypted = "11".repeat(32);
444 assert_eq!(parsed.hash, hash);
445 assert_eq!(parsed.peer_id, "peer-a");
446 assert_eq!(
447 parsed.encrypted_key.as_deref(),
448 Some(expected_encrypted.as_str())
449 );
450 assert!(parsed.key.is_none());
451 }
452
453 #[test]
454 fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
455 let keys = Keys::generate();
456 let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
457 let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
458 .custom_created_at(created_at)
459 .to_event(&keys)
460 .unwrap();
461 let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
462 .custom_created_at(created_at)
463 .to_event(&keys)
464 .unwrap();
465
466 let expected = if event_a.id > event_b.id {
467 event_a.id
468 } else {
469 event_b.id
470 };
471 let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
472 assert_eq!(picked.id, expected);
473 }
474
475 #[tokio::test]
476 async fn resolve_root_from_local_buses_returns_source_and_first_match() {
477 let state = WebRTCState::new();
478 let root = PeerRootEvent {
479 hash: "ab".repeat(32),
480 key: None,
481 encrypted_key: None,
482 self_encrypted_key: None,
483 event_id: "event-1".to_string(),
484 created_at: 1,
485 peer_id: "bus-peer".to_string(),
486 };
487
488 state
489 .set_local_buses(vec![
490 Arc::new(TestLocalBus {
491 source: "empty",
492 root: None,
493 }),
494 Arc::new(TestLocalBus {
495 source: "mock-bus",
496 root: Some(root.clone()),
497 }),
498 ])
499 .await;
500
501 let resolved = state
502 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
503 .await
504 .expect("expected root from local bus");
505
506 assert_eq!(resolved.0, "mock-bus");
507 assert_eq!(resolved.1.hash, root.hash);
508 assert_eq!(resolved.1.peer_id, root.peer_id);
509 }
510
511 #[tokio::test]
512 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
513 let keys = Keys::generate();
514 let mut config = WebRTCConfig::default();
515 config.wifi_aware.enabled = true;
516 config.wifi_aware.max_peers = 1;
517 let manager = WebRTCManager::new(keys, config);
518 let existing_peer = PeerId::new("peer-a".to_string());
519 let existing_key = existing_peer.to_string();
520 let mut peers = HashMap::new();
521 peers.insert(
522 existing_key.clone(),
523 PeerEntry {
524 peer_id: existing_peer,
525 direction: PeerDirection::Outbound,
526 state: ConnectionState::Discovered,
527 last_seen: Instant::now(),
528 peer: None,
529 pool: PeerPool::Other,
530 transport: PeerTransport::WebRtc,
531 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
532 bytes_sent: 0,
533 bytes_received: 0,
534 },
535 );
536
537 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
538 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
539 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
540 }
541
542 #[tokio::test]
543 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
544 let state = WebRTCState::new();
545 let data = b"offline-over-ble".to_vec();
546 let hash_hex = hex::encode(hashtree_core::sha256(&data));
547
548 state.runtime.peers.write().await.insert(
549 "peer-a".to_string(),
550 PeerEntry {
551 peer_id: PeerId::new("peer-a-pub".to_string()),
552 direction: PeerDirection::Outbound,
553 state: ConnectionState::Connected,
554 last_seen: Instant::now(),
555 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
556 data.clone(),
557 )))),
558 pool: PeerPool::Other,
559 transport: PeerTransport::Bluetooth,
560 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
561 bytes_sent: 0,
562 bytes_received: 0,
563 },
564 );
565
566 let resolved = state
567 .request_from_peers_with_source(&hash_hex)
568 .await
569 .expect("expected mock mesh peer response");
570
571 assert_eq!(resolved.0, data);
572 assert_eq!(resolved.1, "peer-a-pub");
573 }
574
575 #[tokio::test]
576 async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
577 let state = WebRTCState::new_with_routing_and_cashu(
578 SelectionStrategy::TitForTat,
579 true,
580 RequestDispatchConfig {
581 initial_fanout: 1,
582 hedge_fanout: 1,
583 max_fanout: 1,
584 hedge_interval_ms: 50,
585 },
586 Duration::from_millis(400),
587 CashuRoutingConfig::default(),
588 None,
589 None,
590 );
591 let data = b"slow-offline-over-ble".to_vec();
592 let hash_hex = hex::encode(hashtree_core::sha256(&data));
593
594 state.runtime.peers.write().await.insert(
595 "peer-a".to_string(),
596 PeerEntry {
597 peer_id: PeerId::new("peer-a-pub".to_string()),
598 direction: PeerDirection::Outbound,
599 state: ConnectionState::Connected,
600 last_seen: Instant::now(),
601 peer: Some(MeshPeer::mock_for_tests(
602 TestMeshPeer::with_delayed_response(
603 Some(data.clone()),
604 Duration::from_millis(200),
605 ),
606 )),
607 pool: PeerPool::Other,
608 transport: PeerTransport::Bluetooth,
609 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
610 bytes_sent: 0,
611 bytes_received: 0,
612 },
613 );
614
615 let resolved = state
616 .request_from_peers_with_source(&hash_hex)
617 .await
618 .expect("expected delayed mock mesh peer response");
619
620 assert_eq!(resolved.0, data);
621 assert_eq!(resolved.1, "peer-a-pub");
622 }
623
624 #[tokio::test]
625 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
626 let keys = Keys::generate();
627 let mut config = WebRTCConfig::default();
628 config.signaling_enabled = false;
629 let manager = WebRTCManager::new(keys, config);
630 let peer_id = PeerId::new("peer-a-pub".to_string());
631 let peer_key = peer_id.to_string();
632 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
633 let peer_ref = peer.mock_ref().expect("mock peer").clone();
634
635 manager.state.runtime.peers.write().await.insert(
636 peer_key,
637 PeerEntry {
638 peer_id,
639 direction: PeerDirection::Outbound,
640 state: ConnectionState::Connected,
641 last_seen: Instant::now(),
642 peer: Some(peer),
643 pool: PeerPool::Other,
644 transport: PeerTransport::Bluetooth,
645 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
646 bytes_sent: 0,
647 bytes_received: 0,
648 },
649 );
650
651 manager
652 .dispatch_signaling_message(
653 SignalingMessage::Hello {
654 peer_id: manager.my_peer_id.to_string(),
655 roots: Vec::new(),
656 },
657 None,
658 )
659 .await;
660
661 assert_eq!(peer_ref.sent_frame_count().await, 0);
662 }
663
664 #[tokio::test]
665 async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
666 let keys = Keys::generate();
667 let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
668 let peer_id = PeerId::new("peer-a-pub".to_string());
669 let peer_key = peer_id.to_string();
670
671 manager.state.runtime.peers.write().await.insert(
672 peer_key.clone(),
673 PeerEntry {
674 peer_id: peer_id.clone(),
675 direction: PeerDirection::Outbound,
676 state: ConnectionState::Connected,
677 last_seen: Instant::now(),
678 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
679 Duration::from_millis(200),
680 ))),
681 pool: PeerPool::Other,
682 transport: PeerTransport::Bluetooth,
683 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
684 bytes_sent: 0,
685 bytes_received: 0,
686 },
687 );
688
689 let manager_for_task = manager.clone();
690 let peer_id_for_task = peer_id.clone();
691 let cleanup_task = tokio::spawn(async move {
692 manager_for_task
693 .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
694 .await;
695 });
696
697 tokio::time::sleep(Duration::from_millis(20)).await;
698
699 let remaining = tokio::time::timeout(Duration::from_millis(50), async {
700 manager.state.runtime.peers.read().await.len()
701 })
702 .await
703 .expect("peer map read should not block on close");
704
705 assert_eq!(remaining, 0);
706 cleanup_task.await.expect("cleanup task");
707 }
708
709 #[tokio::test]
710 async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
711 let keys = Keys::generate();
712 let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
713 let owner_keys = Keys::generate();
714 let owner_pubkey = owner_keys.public_key().to_hex();
715 let tree_name = "video";
716 let hash = "ab".repeat(32);
717 let event = EventBuilder::new(
718 Kind::Custom(root_events::HASHTREE_KIND),
719 "",
720 [
721 Tag::parse(&["d", tree_name]).unwrap(),
722 Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
723 Tag::parse(&["hash", &hash]).unwrap(),
724 ],
725 )
726 .to_event(&owner_keys)
727 .unwrap();
728
729 let peer_id = PeerId::new("peer-a-pub".to_string());
730 let peer_key = peer_id.to_string();
731
732 manager.state.runtime.peers.write().await.insert(
733 peer_key.clone(),
734 PeerEntry {
735 peer_id,
736 direction: PeerDirection::Outbound,
737 state: ConnectionState::Connected,
738 last_seen: Instant::now(),
739 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
740 vec![event],
741 Duration::from_millis(200),
742 ))),
743 pool: PeerPool::Other,
744 transport: PeerTransport::Bluetooth,
745 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
746 bytes_sent: 0,
747 bytes_received: 0,
748 },
749 );
750
751 let manager_for_task = manager.clone();
752 let owner_pubkey_for_task = owner_pubkey.clone();
753 let resolve_task = tokio::spawn(async move {
754 manager_for_task
755 .state
756 .resolve_root_from_peers(
757 &owner_pubkey_for_task,
758 tree_name,
759 Duration::from_millis(500),
760 )
761 .await
762 });
763
764 tokio::time::sleep(Duration::from_millis(20)).await;
765
766 let manager_for_writer = manager.clone();
767 let peer_key_for_writer = peer_key.clone();
768 let writer_task = tokio::spawn(async move {
769 let mut peers = manager_for_writer.state.runtime.peers.write().await;
770 if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
771 entry.bytes_received += 1;
772 }
773 });
774
775 tokio::time::sleep(Duration::from_millis(20)).await;
776
777 let status_count = tokio::time::timeout(Duration::from_millis(50), async {
778 manager.state.runtime.peers.read().await.len()
779 })
780 .await
781 .expect("peer map read should not block on root query");
782
783 assert_eq!(status_count, 1);
784 assert!(resolve_task.await.expect("resolve task").is_some());
785 writer_task.await.expect("writer task");
786 }
787
788 #[test]
789 fn test_formal_timed_seen_set_rejects_duplicates() {
790 let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
791 assert!(seen.insert_if_new("frame-1".to_string()));
792 assert!(!seen.insert_if_new("frame-1".to_string()));
793 assert!(seen.insert_if_new("frame-2".to_string()));
794 }
795
796 #[test]
797 fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
798 let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
799 assert!(seen.insert_if_new("a".to_string()));
800 assert!(seen.insert_if_new("b".to_string()));
801 assert!(seen.insert_if_new("c".to_string()));
802
803 assert!(seen.insert_if_new("a".to_string()));
805 assert!(!seen.insert_if_new("a".to_string()));
806 }
807
808 #[test]
809 fn test_request_dispatch_normalization_caps_to_available_peers() {
810 let normalized = normalize_dispatch_config(
811 RequestDispatchConfig {
812 initial_fanout: 8,
813 hedge_fanout: 6,
814 max_fanout: 5,
815 hedge_interval_ms: 120,
816 },
817 3,
818 );
819 assert_eq!(normalized.max_fanout, 3);
820 assert_eq!(normalized.initial_fanout, 3);
821 assert_eq!(normalized.hedge_fanout, 3);
822 }
823
824 #[test]
825 fn test_hedged_wave_plan_matches_dispatch_policy() {
826 let plan = build_hedged_wave_plan(
827 7,
828 RequestDispatchConfig {
829 initial_fanout: 2,
830 hedge_fanout: 3,
831 max_fanout: 6,
832 hedge_interval_ms: 120,
833 },
834 );
835 assert_eq!(plan, vec![2, 3, 1]);
836 }
837}