// hashtree_network/manager/runtime.rs

1use super::*;
2
3impl WebRTCManager {
4    /// Start the native peer router - connects transports and handles signaling.
5    pub async fn run(&mut self) -> Result<()> {
6        info!(
7            "Starting peer router with peer ID: {}",
8            self.my_peer_id.short()
9        );
10
11        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
12        let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
13
14        // Take the signaling receiver
15        let mut signaling_rx = self
16            .signaling_rx
17            .take()
18            .expect("signaling_rx already taken");
19
20        // Take the state event receiver
21        let mut state_event_rx = self
22            .state_event_rx
23            .take()
24            .expect("state_event_rx already taken");
25        let mut mesh_frame_rx = self
26            .mesh_frame_rx
27            .take()
28            .expect("mesh_frame_rx already taken");
29
30        if self.config.bluetooth.is_enabled() {
31            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
32            let context = BluetoothRuntimeContext {
33                my_peer_id: self.my_peer_id.clone(),
34                store: if bluetooth_nostr_only_mode() {
35                    None
36                } else {
37                    self.store.clone()
38                },
39                nostr_relay: self.nostr_relay.clone(),
40                mesh_frame_tx: self.mesh_frame_tx.clone(),
41                registrar: BluetoothPeerRegistrar::new(
42                    self.state.clone(),
43                    self.peer_classifier.clone(),
44                    self.config.pools.clone(),
45                    self.config.bluetooth.max_peers,
46                ),
47            };
48            let _ = bluetooth.start(context).await;
49        }
50
51        let relay_transport = if self.config.signaling_enabled {
52            let transport = Arc::new(NostrRelayTransport::new(
53                self.keys.clone(),
54                self.config.debug,
55            ));
56            transport
57                .connect(&self.config.relays)
58                .await
59                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
60
61            let relay_reader = transport.clone();
62            let relay_msg_tx = relay_msg_tx.clone();
63            tokio::spawn(async move {
64                while let Some(msg) = relay_reader.recv().await {
65                    if relay_msg_tx.send(msg).await.is_err() {
66                        break;
67                    }
68                }
69            });
70
71            Some(transport)
72        } else {
73            None
74        };
75
76        if self.config.multicast.is_enabled() {
77            if let Some(relay) = self.nostr_relay.clone() {
78                let relay = relay as crate::SharedMeshEventStore;
79                match MulticastNostrBus::bind(
80                    self.config.multicast.clone(),
81                    self.keys.clone(),
82                    relay,
83                )
84                .await
85                {
86                    Ok(bus) => {
87                        let local_bus: SharedLocalNostrBus = bus.clone();
88                        self.state.add_local_bus(local_bus.clone()).await;
89                        self.local_buses.push(local_bus);
90                        let shutdown_rx = self.shutdown_rx.clone();
91                        let signaling_tx = event_tx.clone();
92                        tokio::spawn(async move {
93                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
94                                error!("Multicast bus error: {}", err);
95                            }
96                        });
97                    }
98                    Err(err) => {
99                        warn!("Failed to start multicast bus: {}", err);
100                    }
101                }
102            } else {
103                warn!("Multicast enabled but Nostr relay is unavailable");
104            }
105        }
106
107        if self.config.wifi_aware.is_enabled() {
108            if let Some(relay) = self.nostr_relay.clone() {
109                if let Some(bridge) = mobile_wifi_aware_bridge() {
110                    let relay = relay as crate::SharedMeshEventStore;
111                    let bus = WifiAwareNostrBus::new(
112                        self.config.wifi_aware.clone(),
113                        self.keys.clone(),
114                        relay,
115                        bridge,
116                    );
117                    let local_bus: SharedLocalNostrBus = bus.clone();
118                    self.state.add_local_bus(local_bus.clone()).await;
119                    self.local_buses.push(local_bus);
120                    let shutdown_rx = self.shutdown_rx.clone();
121                    let signaling_tx = event_tx.clone();
122                    let local_peer_id = self.my_peer_id.to_string();
123                    tokio::spawn(async move {
124                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
125                            error!("Wi-Fi Aware bus error: {}", err);
126                        }
127                    });
128                } else {
129                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
130                }
131            } else {
132                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
133            }
134        }
135
136        if self.config.signaling_enabled {
137            let transport = Arc::new(RouterSignalingBridge::new(
138                self.my_peer_id.to_string(),
139                self.signaling_tx.clone(),
140            ));
141            let factory = Arc::new(SharedRouterPeerFactory::new(
142                self.my_peer_id.clone(),
143                self.signaling_tx.clone(),
144                self.config.stun_servers.clone(),
145                self.store.clone(),
146                self.state.clone(),
147                self.state_event_tx.clone(),
148                self.nostr_relay.clone(),
149                self.mesh_frame_tx.clone(),
150                self.peer_classifier.clone(),
151            ));
152            let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
153            let classifier = self.peer_classifier.clone();
154            tokio::spawn(async move {
155                while let Some(request) = classifier_rx.recv().await {
156                    let _ = request.response.send(classifier(&request.pubkey));
157                }
158            });
159
160            let mut router = MeshRouter::new(
161                self.my_peer_id.to_string(),
162                transport,
163                factory.clone(),
164                self.config.pools.clone(),
165                self.config.debug,
166            );
167            router.set_classifier(classifier_tx);
168            router.set_hash_get_enabled(self.config.hash_get_enabled);
169            self.shared_router = Some(Arc::new(router));
170        }
171
172        // Process incoming events and outgoing signaling messages
173        let mut shutdown_rx = self.shutdown_rx.clone();
174        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
175        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
176        let mut hello_ticker =
177            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
178        if self.config.signaling_enabled {
179            if let Some(shared_router) = self.shared_router.as_ref() {
180                let _ = shared_router.send_hello(Vec::new()).await;
181            }
182        }
183        loop {
184            tokio::select! {
185                _ = shutdown_rx.changed() => {
186                    if *shutdown_rx.borrow() {
187                        info!("WebRTC manager shutting down");
188                        break;
189                    }
190                }
191                Some(msg) = relay_msg_rx.recv() => {
192                    if let Err(e) = self
193                        .handle_signaling_message("relay", msg, self.shared_router.as_ref())
194                        .await
195                    {
196                        debug!("Error handling relay signaling message: {}", e);
197                    }
198                }
199                Some((relay, event)) = event_rx.recv() => {
200                    if let Err(e) = self
201                        .handle_event(&relay, &event, self.shared_router.as_ref())
202                        .await
203                    {
204                        debug!("Error handling event from {}: {}", relay, e);
205                    }
206                }
207                Some(msg) = signaling_rx.recv() => {
208                    self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
209                }
210                Some(event) = state_event_rx.recv() => {
211                    // Handle peer state events (connected, failed, disconnected)
212                    self.handle_peer_state_event(event).await;
213                }
214                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
215                    self.handle_mesh_frame(from_peer_id, frame).await;
216                }
217                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
218                    if let Some(shared_router) = self.shared_router.as_ref() {
219                        let _ = shared_router.send_hello(Vec::new()).await;
220                    }
221                }
222                _ = cleanup_interval.tick() => {
223                    // Periodic cleanup of stale peers and state sync (fallback)
224                    self.cleanup_stale_peers().await;
225                }
226            }
227        }
228
229        Ok(())
230    }
231
232    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
233        let mut seen = self.seen_frame_ids.lock().await;
234        seen.insert_if_new(frame_id)
235    }
236
237    async fn mark_seen_event_id(&self, event_id: String) -> bool {
238        let mut seen = self.seen_event_ids.lock().await;
239        seen.insert_if_new(event_id)
240    }
241
242    async fn dispatch_signaling_message(
243        &self,
244        msg: SignalingMessage,
245        relay_transport: Option<&Arc<NostrRelayTransport>>,
246    ) {
247        if let Err(err) = crate::dispatch_signaling_message(
248            self.config.signaling_enabled,
249            &self.keys,
250            &self.my_peer_id,
251            &self.state.runtime,
252            relay_transport,
253            &self.local_buses,
254            &self.seen_frame_ids,
255            &self.seen_event_ids,
256            msg,
257            MESH_SIGNALING_EVENT_KIND as u64,
258        )
259        .await
260        {
261            debug!("Failed to dispatch signaling message: {}", err);
262        }
263    }
264
265    async fn forward_mesh_frame(
266        &self,
267        frame: &MeshNostrFrame,
268        exclude_peer_id: Option<&str>,
269    ) -> usize {
270        crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
271    }
272
273    async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
274        if let Err(reason) = validate_mesh_frame(&frame) {
275            debug!(
276                "Ignoring mesh frame from {} (invalid: {})",
277                from_peer_id.short(),
278                reason
279            );
280            return;
281        }
282
283        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
284            self.state.record_mesh_duplicate_drop();
285            return;
286        }
287
288        let event = match &frame.payload {
289            MeshNostrPayload::Event { event } => event.clone(),
290        };
291
292        if !self.mark_seen_event_id(event.id.to_hex()).await {
293            self.state.record_mesh_duplicate_drop();
294            return;
295        }
296
297        if event.verify().is_err() {
298            debug!(
299                "Ignoring mesh event from {} due to invalid signature",
300                from_peer_id.short()
301            );
302            return;
303        }
304
305        self.state.record_mesh_received();
306
307        if let Err(e) = self
308            .handle_event("mesh", &event, self.shared_router.as_ref())
309            .await
310        {
311            debug!(
312                "Error handling mesh event from {}: {}",
313                from_peer_id.short(),
314                e
315            );
316        }
317
318        let forwarded = self
319            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
320            .await;
321        if forwarded > 0 {
322            self.state.record_mesh_forwarded(forwarded as u64);
323        }
324    }
325
326    /// Handle an incoming event
327    ///
328    /// Messages may be:
329    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
330    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
331    async fn handle_event(
332        &self,
333        relay: &str,
334        event: &nostr_sdk::nostr::Event,
335        shared_router: Option<&Arc<SharedProductionRouter>>,
336    ) -> Result<()> {
337        crate::handle_signaling_event(
338            self.config.signaling_enabled,
339            &self.my_peer_id,
340            &self.keys,
341            &self.state.runtime,
342            relay,
343            self.local_bus_max_peers(relay),
344            event,
345            shared_router,
346        )
347        .await
348    }
349
350    async fn handle_signaling_message(
351        &self,
352        source: &str,
353        msg: SignalingMessage,
354        shared_router: Option<&Arc<SharedProductionRouter>>,
355    ) -> Result<()> {
356        crate::handle_signaling_message(
357            &self.state.runtime,
358            source,
359            self.local_bus_max_peers(source),
360            msg,
361            shared_router,
362        )
363        .await
364    }
365
366    /// Handle peer state change events from peer connections
367    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
368        crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
369            .await;
370    }
371
372    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
373    async fn cleanup_stale_peers(&self) {
374        crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
375    }
376}
377
378// Keep the old PeerState for backward compatibility with tests
379#[allow(dead_code)]
380#[derive(Debug, Clone)]
381pub struct PeerState {
382    pub peer_id: PeerId,
383    pub direction: PeerDirection,
384    pub state: String,
385    pub last_seen: Instant,
386}
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391    use crate::root_events::{self, PeerRootEvent};
392    use crate::session::TestMeshPeer;
393    use crate::LocalNostrBus;
394    use crate::SelectionStrategy;
395    use crate::{build_hedged_wave_plan, normalize_dispatch_config};
396    use anyhow::Result as AnyResult;
397    use async_trait::async_trait;
398    use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
399    use std::time::Duration;
400
401    struct TestLocalBus {
402        source: &'static str,
403        root: Option<PeerRootEvent>,
404    }
405
406    #[async_trait]
407    impl LocalNostrBus for TestLocalBus {
408        fn source_name(&self) -> &'static str {
409            self.source
410        }
411
412        async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
413            Ok(())
414        }
415
416        async fn query_root(
417            &self,
418            _owner_pubkey: &str,
419            _tree_name: &str,
420            _timeout: Duration,
421        ) -> Option<PeerRootEvent> {
422            self.root.clone()
423        }
424    }
425
426    #[test]
427    fn root_event_from_peer_extracts_tags() {
428        let keys = Keys::generate();
429        let hash = "ab".repeat(32);
430        let event = EventBuilder::new(
431            Kind::Custom(root_events::HASHTREE_KIND),
432            "",
433            [
434                Tag::parse(&["d", "repo"]).unwrap(),
435                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
436                Tag::parse(&["hash", &hash]).unwrap(),
437                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
438            ],
439        )
440        .to_event(&keys)
441        .unwrap();
442
443        let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
444        let expected_encrypted = "11".repeat(32);
445        assert_eq!(parsed.hash, hash);
446        assert_eq!(parsed.peer_id, "peer-a");
447        assert_eq!(
448            parsed.encrypted_key.as_deref(),
449            Some(expected_encrypted.as_str())
450        );
451        assert!(parsed.key.is_none());
452    }
453
454    #[test]
455    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
456        let keys = Keys::generate();
457        let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
458        let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
459            .custom_created_at(created_at)
460            .to_event(&keys)
461            .unwrap();
462        let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
463            .custom_created_at(created_at)
464            .to_event(&keys)
465            .unwrap();
466
467        let expected = if event_a.id > event_b.id {
468            event_a.id
469        } else {
470            event_b.id
471        };
472        let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
473        assert_eq!(picked.id, expected);
474    }
475
476    #[tokio::test]
477    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
478        let state = WebRTCState::new();
479        let root = PeerRootEvent {
480            hash: "ab".repeat(32),
481            key: None,
482            encrypted_key: None,
483            self_encrypted_key: None,
484            event_id: "event-1".to_string(),
485            created_at: 1,
486            peer_id: "bus-peer".to_string(),
487        };
488
489        state
490            .set_local_buses(vec![
491                Arc::new(TestLocalBus {
492                    source: "empty",
493                    root: None,
494                }),
495                Arc::new(TestLocalBus {
496                    source: "mock-bus",
497                    root: Some(root.clone()),
498                }),
499            ])
500            .await;
501
502        let resolved = state
503            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
504            .await
505            .expect("expected root from local bus");
506
507        assert_eq!(resolved.0, "mock-bus");
508        assert_eq!(resolved.1.hash, root.hash);
509        assert_eq!(resolved.1.peer_id, root.peer_id);
510    }
511
512    #[tokio::test]
513    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
514        let keys = Keys::generate();
515        let mut config = WebRTCConfig::default();
516        config.wifi_aware.enabled = true;
517        config.wifi_aware.max_peers = 1;
518        let manager = WebRTCManager::new(keys, config);
519        let existing_peer = PeerId::new("peer-a".to_string());
520        let existing_key = existing_peer.to_string();
521        let mut peers = HashMap::new();
522        peers.insert(
523            existing_key.clone(),
524            PeerEntry {
525                peer_id: existing_peer,
526                direction: PeerDirection::Outbound,
527                state: ConnectionState::Discovered,
528                last_seen: Instant::now(),
529                peer: None,
530                pool: PeerPool::Other,
531                transport: PeerTransport::WebRtc,
532                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
533                bytes_sent: 0,
534                bytes_received: 0,
535            },
536        );
537
538        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
539        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
540        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
541    }
542
543    #[tokio::test]
544    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
545        let state = WebRTCState::new();
546        let data = b"offline-over-ble".to_vec();
547        let hash_hex = hex::encode(hashtree_core::sha256(&data));
548
549        state.runtime.peers.write().await.insert(
550            "peer-a".to_string(),
551            PeerEntry {
552                peer_id: PeerId::new("peer-a-pub".to_string()),
553                direction: PeerDirection::Outbound,
554                state: ConnectionState::Connected,
555                last_seen: Instant::now(),
556                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
557                    data.clone(),
558                )))),
559                pool: PeerPool::Other,
560                transport: PeerTransport::Bluetooth,
561                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
562                bytes_sent: 0,
563                bytes_received: 0,
564            },
565        );
566
567        let resolved = state
568            .request_from_peers_with_source(&hash_hex)
569            .await
570            .expect("expected mock mesh peer response");
571
572        assert_eq!(resolved.0, data);
573        assert_eq!(resolved.1, "peer-a-pub");
574    }
575
576    #[tokio::test]
577    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
578        let state = WebRTCState::new_with_routing_and_cashu(
579            SelectionStrategy::TitForTat,
580            true,
581            RequestDispatchConfig {
582                initial_fanout: 1,
583                hedge_fanout: 1,
584                max_fanout: 1,
585                hedge_interval_ms: 50,
586            },
587            Duration::from_millis(400),
588            CashuRoutingConfig::default(),
589            None,
590            None,
591        );
592        let data = b"slow-offline-over-ble".to_vec();
593        let hash_hex = hex::encode(hashtree_core::sha256(&data));
594
595        state.runtime.peers.write().await.insert(
596            "peer-a".to_string(),
597            PeerEntry {
598                peer_id: PeerId::new("peer-a-pub".to_string()),
599                direction: PeerDirection::Outbound,
600                state: ConnectionState::Connected,
601                last_seen: Instant::now(),
602                peer: Some(MeshPeer::mock_for_tests(
603                    TestMeshPeer::with_delayed_response(
604                        Some(data.clone()),
605                        Duration::from_millis(200),
606                    ),
607                )),
608                pool: PeerPool::Other,
609                transport: PeerTransport::Bluetooth,
610                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
611                bytes_sent: 0,
612                bytes_received: 0,
613            },
614        );
615
616        let resolved = state
617            .request_from_peers_with_source(&hash_hex)
618            .await
619            .expect("expected delayed mock mesh peer response");
620
621        assert_eq!(resolved.0, data);
622        assert_eq!(resolved.1, "peer-a-pub");
623    }
624
625    #[tokio::test]
626    async fn request_from_peers_with_source_skips_peers_with_hash_get_disabled() {
627        let state = WebRTCState::new();
628        let capable_data = b"hash-get-capable".to_vec();
629        let capable_hash_hex = hex::encode(hashtree_core::sha256(&capable_data));
630
631        state.runtime.peers.write().await.insert(
632            "peer-assist".to_string(),
633            PeerEntry {
634                peer_id: PeerId::new("peer-assist-pub".to_string()),
635                direction: PeerDirection::Outbound,
636                state: ConnectionState::Connected,
637                last_seen: Instant::now(),
638                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
639                    b"assist-should-not-be-queried".to_vec(),
640                )))),
641                pool: PeerPool::Other,
642                transport: PeerTransport::Bluetooth,
643                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
644                bytes_sent: 0,
645                bytes_received: 0,
646            },
647        );
648        state
649            .runtime
650            .set_peer_hash_get("peer-assist-pub", false)
651            .await;
652
653        state.runtime.peers.write().await.insert(
654            "peer-capable".to_string(),
655            PeerEntry {
656                peer_id: PeerId::new("peer-capable-pub".to_string()),
657                direction: PeerDirection::Outbound,
658                state: ConnectionState::Connected,
659                last_seen: Instant::now(),
660                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
661                    capable_data.clone(),
662                )))),
663                pool: PeerPool::Other,
664                transport: PeerTransport::Bluetooth,
665                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
666                bytes_sent: 0,
667                bytes_received: 0,
668            },
669        );
670        state
671            .runtime
672            .set_peer_hash_get("peer-capable-pub", true)
673            .await;
674
675        let resolved = state
676            .request_from_peers_with_source(&capable_hash_hex)
677            .await
678            .expect("expected capable peer response");
679
680        assert_eq!(resolved.0, capable_data);
681        assert_eq!(resolved.1, "peer-capable-pub");
682    }
683
684    #[tokio::test]
685    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
686        let keys = Keys::generate();
687        let mut config = WebRTCConfig::default();
688        config.signaling_enabled = false;
689        let manager = WebRTCManager::new(keys, config);
690        let peer_id = PeerId::new("peer-a-pub".to_string());
691        let peer_key = peer_id.to_string();
692        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
693        let peer_ref = peer.mock_ref().expect("mock peer").clone();
694
695        manager.state.runtime.peers.write().await.insert(
696            peer_key,
697            PeerEntry {
698                peer_id,
699                direction: PeerDirection::Outbound,
700                state: ConnectionState::Connected,
701                last_seen: Instant::now(),
702                peer: Some(peer),
703                pool: PeerPool::Other,
704                transport: PeerTransport::Bluetooth,
705                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
706                bytes_sent: 0,
707                bytes_received: 0,
708            },
709        );
710
711        manager
712            .dispatch_signaling_message(
713                SignalingMessage::Hello {
714                    peer_id: manager.my_peer_id.to_string(),
715                    roots: Vec::new(),
716                    hash_get: true,
717                },
718                None,
719            )
720            .await;
721
722        assert_eq!(peer_ref.sent_frame_count().await, 0);
723    }
724
725    #[tokio::test]
726    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
727        let keys = Keys::generate();
728        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
729        let peer_id = PeerId::new("peer-a-pub".to_string());
730        let peer_key = peer_id.to_string();
731
732        manager.state.runtime.peers.write().await.insert(
733            peer_key.clone(),
734            PeerEntry {
735                peer_id: peer_id.clone(),
736                direction: PeerDirection::Outbound,
737                state: ConnectionState::Connected,
738                last_seen: Instant::now(),
739                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
740                    Duration::from_millis(200),
741                ))),
742                pool: PeerPool::Other,
743                transport: PeerTransport::Bluetooth,
744                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
745                bytes_sent: 0,
746                bytes_received: 0,
747            },
748        );
749
750        let manager_for_task = manager.clone();
751        let peer_id_for_task = peer_id.clone();
752        let cleanup_task = tokio::spawn(async move {
753            manager_for_task
754                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
755                .await;
756        });
757
758        tokio::time::sleep(Duration::from_millis(20)).await;
759
760        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
761            manager.state.runtime.peers.read().await.len()
762        })
763        .await
764        .expect("peer map read should not block on close");
765
766        assert_eq!(remaining, 0);
767        cleanup_task.await.expect("cleanup task");
768    }
769
770    #[tokio::test]
771    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
772        let keys = Keys::generate();
773        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
774        let owner_keys = Keys::generate();
775        let owner_pubkey = owner_keys.public_key().to_hex();
776        let tree_name = "video";
777        let hash = "ab".repeat(32);
778        let event = EventBuilder::new(
779            Kind::Custom(root_events::HASHTREE_KIND),
780            "",
781            [
782                Tag::parse(&["d", tree_name]).unwrap(),
783                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
784                Tag::parse(&["hash", &hash]).unwrap(),
785            ],
786        )
787        .to_event(&owner_keys)
788        .unwrap();
789
790        let peer_id = PeerId::new("peer-a-pub".to_string());
791        let peer_key = peer_id.to_string();
792
793        manager.state.runtime.peers.write().await.insert(
794            peer_key.clone(),
795            PeerEntry {
796                peer_id,
797                direction: PeerDirection::Outbound,
798                state: ConnectionState::Connected,
799                last_seen: Instant::now(),
800                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
801                    vec![event],
802                    Duration::from_millis(200),
803                ))),
804                pool: PeerPool::Other,
805                transport: PeerTransport::Bluetooth,
806                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
807                bytes_sent: 0,
808                bytes_received: 0,
809            },
810        );
811
812        let manager_for_task = manager.clone();
813        let owner_pubkey_for_task = owner_pubkey.clone();
814        let resolve_task = tokio::spawn(async move {
815            manager_for_task
816                .state
817                .resolve_root_from_peers(
818                    &owner_pubkey_for_task,
819                    tree_name,
820                    Duration::from_millis(500),
821                )
822                .await
823        });
824
825        tokio::time::sleep(Duration::from_millis(20)).await;
826
827        let manager_for_writer = manager.clone();
828        let peer_key_for_writer = peer_key.clone();
829        let writer_task = tokio::spawn(async move {
830            let mut peers = manager_for_writer.state.runtime.peers.write().await;
831            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
832                entry.bytes_received += 1;
833            }
834        });
835
836        tokio::time::sleep(Duration::from_millis(20)).await;
837
838        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
839            manager.state.runtime.peers.read().await.len()
840        })
841        .await
842        .expect("peer map read should not block on root query");
843
844        assert_eq!(status_count, 1);
845        assert!(resolve_task.await.expect("resolve task").is_some());
846        writer_task.await.expect("writer task");
847    }
848
849    #[test]
850    fn test_formal_timed_seen_set_rejects_duplicates() {
851        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
852        assert!(seen.insert_if_new("frame-1".to_string()));
853        assert!(!seen.insert_if_new("frame-1".to_string()));
854        assert!(seen.insert_if_new("frame-2".to_string()));
855    }
856
857    #[test]
858    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
859        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
860        assert!(seen.insert_if_new("a".to_string()));
861        assert!(seen.insert_if_new("b".to_string()));
862        assert!(seen.insert_if_new("c".to_string()));
863
864        // "a" should be evicted due to cap=2, so re-insert becomes new again.
865        assert!(seen.insert_if_new("a".to_string()));
866        assert!(!seen.insert_if_new("a".to_string()));
867    }
868
869    #[test]
870    fn test_request_dispatch_normalization_caps_to_available_peers() {
871        let normalized = normalize_dispatch_config(
872            RequestDispatchConfig {
873                initial_fanout: 8,
874                hedge_fanout: 6,
875                max_fanout: 5,
876                hedge_interval_ms: 120,
877            },
878            3,
879        );
880        assert_eq!(normalized.max_fanout, 3);
881        assert_eq!(normalized.initial_fanout, 3);
882        assert_eq!(normalized.hedge_fanout, 3);
883    }
884
885    #[test]
886    fn test_hedged_wave_plan_matches_dispatch_policy() {
887        let plan = build_hedged_wave_plan(
888            7,
889            RequestDispatchConfig {
890                initial_fanout: 2,
891                hedge_fanout: 3,
892                max_fanout: 6,
893                hedge_interval_ms: 120,
894            },
895        );
896        assert_eq!(plan, vec![2, 3, 1]);
897    }
898}