Skip to main content

hashtree_network/manager/
runtime.rs

1use super::*;
2use nostr_sdk::nostr::JsonUtil;
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::TcpStream;
5
6impl WebRTCManager {
7    /// Start the native peer router - connects transports and handles signaling.
8    pub async fn run(&mut self) -> Result<()> {
9        info!(
10            "Starting peer router with peer ID: {}",
11            self.my_peer_id.short()
12        );
13
14        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
15        let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
16        self.state
17            .set_direct_signaling_sender(Some(event_tx.clone()))
18            .await;
19
20        // Take the signaling receiver
21        let mut signaling_rx = self
22            .signaling_rx
23            .take()
24            .expect("signaling_rx already taken");
25
26        // Take the state event receiver
27        let mut state_event_rx = self
28            .state_event_rx
29            .take()
30            .expect("state_event_rx already taken");
31        let mut mesh_frame_rx = self
32            .mesh_frame_rx
33            .take()
34            .expect("mesh_frame_rx already taken");
35
36        if self.config.bluetooth.is_enabled() {
37            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
38            let context = BluetoothRuntimeContext {
39                my_peer_id: self.my_peer_id.clone(),
40                store: if bluetooth_nostr_only_mode() {
41                    None
42                } else {
43                    self.store.clone()
44                },
45                nostr_relay: self.nostr_relay.clone(),
46                mesh_frame_tx: self.mesh_frame_tx.clone(),
47                registrar: BluetoothPeerRegistrar::new(
48                    self.state.clone(),
49                    self.peer_classifier.clone(),
50                    self.config.pools.clone(),
51                    self.config.bluetooth.max_peers,
52                ),
53            };
54            let _ = bluetooth.start(context).await;
55        }
56
57        let relay_transport = if self.config.signaling_enabled {
58            let transport = Arc::new(NostrRelayTransport::new(
59                self.keys.clone(),
60                self.config.debug,
61            ));
62            transport
63                .connect(&self.config.relays)
64                .await
65                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
66
67            let relay_reader = transport.clone();
68            let relay_msg_tx = relay_msg_tx.clone();
69            tokio::spawn(async move {
70                while let Some(msg) = relay_reader.recv().await {
71                    if relay_msg_tx.send(msg).await.is_err() {
72                        break;
73                    }
74                }
75            });
76
77            Some(transport)
78        } else {
79            None
80        };
81
82        if self.config.multicast.is_enabled() {
83            if let Some(relay) = self.nostr_relay.clone() {
84                let relay = relay as crate::SharedMeshEventStore;
85                match MulticastNostrBus::bind(
86                    self.config.multicast.clone(),
87                    self.keys.clone(),
88                    relay,
89                )
90                .await
91                {
92                    Ok(bus) => {
93                        let local_bus: SharedLocalNostrBus = bus.clone();
94                        self.state.add_local_bus(local_bus.clone()).await;
95                        self.local_buses.push(local_bus);
96                        let shutdown_rx = self.shutdown_rx.clone();
97                        let signaling_tx = event_tx.clone();
98                        tokio::spawn(async move {
99                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
100                                error!("Multicast bus error: {}", err);
101                            }
102                        });
103                    }
104                    Err(err) => {
105                        warn!("Failed to start multicast bus: {}", err);
106                    }
107                }
108            } else {
109                warn!("Multicast enabled but Nostr relay is unavailable");
110            }
111        }
112
113        if self.config.wifi_aware.is_enabled() {
114            if let Some(relay) = self.nostr_relay.clone() {
115                if let Some(bridge) = mobile_wifi_aware_bridge() {
116                    let relay = relay as crate::SharedMeshEventStore;
117                    let bus = WifiAwareNostrBus::new(
118                        self.config.wifi_aware.clone(),
119                        self.keys.clone(),
120                        relay,
121                        bridge,
122                    );
123                    let local_bus: SharedLocalNostrBus = bus.clone();
124                    self.state.add_local_bus(local_bus.clone()).await;
125                    self.local_buses.push(local_bus);
126                    let shutdown_rx = self.shutdown_rx.clone();
127                    let signaling_tx = event_tx.clone();
128                    let local_peer_id = self.my_peer_id.to_string();
129                    tokio::spawn(async move {
130                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
131                            error!("Wi-Fi Aware bus error: {}", err);
132                        }
133                    });
134                } else {
135                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
136                }
137            } else {
138                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
139            }
140        }
141
142        if self.config.signaling_enabled {
143            let transport = Arc::new(RouterSignalingBridge::new(
144                self.my_peer_id.to_string(),
145                self.signaling_tx.clone(),
146            ));
147            let factory = Arc::new(SharedRouterPeerFactory::new(
148                self.my_peer_id.clone(),
149                self.signaling_tx.clone(),
150                self.config.stun_servers.clone(),
151                self.store.clone(),
152                self.state.clone(),
153                self.state_event_tx.clone(),
154                self.nostr_relay.clone(),
155                self.mesh_frame_tx.clone(),
156                self.config.signal_urls.clone(),
157                self.peer_classifier.clone(),
158            ));
159            let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
160            let classifier = self.peer_classifier.clone();
161            tokio::spawn(async move {
162                while let Some(request) = classifier_rx.recv().await {
163                    let _ = request.response.send(classifier(&request.pubkey));
164                }
165            });
166
167            let mut router = MeshRouter::new(
168                self.my_peer_id.to_string(),
169                transport,
170                factory.clone(),
171                self.config.pools.clone(),
172                self.config.debug,
173            );
174            router.set_classifier(classifier_tx);
175            router.set_hash_get_enabled(self.config.hash_get_enabled);
176            self.shared_router = Some(Arc::new(router));
177        }
178
179        // Process incoming events and outgoing signaling messages
180        let mut shutdown_rx = self.shutdown_rx.clone();
181        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
182        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
183        let mut hello_ticker =
184            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
185        if self.config.signaling_enabled {
186            if let Some(shared_router) = self.shared_router.as_ref() {
187                let _ = shared_router.send_hello(Vec::new()).await;
188            }
189        }
190        loop {
191            tokio::select! {
192                _ = shutdown_rx.changed() => {
193                    if *shutdown_rx.borrow() {
194                        info!("WebRTC manager shutting down");
195                        break;
196                    }
197                }
198                Some(msg) = relay_msg_rx.recv() => {
199                    if let Err(e) = self
200                        .handle_signaling_message("relay", msg, self.shared_router.as_ref())
201                        .await
202                    {
203                        debug!("Error handling relay signaling message: {}", e);
204                    }
205                }
206                Some((relay, event)) = event_rx.recv() => {
207                    if let Err(e) = self
208                        .handle_event(&relay, &event, self.shared_router.as_ref())
209                        .await
210                    {
211                        debug!("Error handling event from {}: {}", relay, e);
212                    }
213                }
214                Some(msg) = signaling_rx.recv() => {
215                    self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
216                }
217                Some(event) = state_event_rx.recv() => {
218                    // Handle peer state events (connected, failed, disconnected)
219                    self.handle_peer_state_event(event).await;
220                }
221                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
222                    self.handle_mesh_frame(from_peer_id, frame).await;
223                }
224                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
225                    if let Some(shared_router) = self.shared_router.as_ref() {
226                        let _ = shared_router.send_hello(Vec::new()).await;
227                    }
228                }
229                _ = cleanup_interval.tick() => {
230                    // Periodic cleanup of stale peers and state sync (fallback)
231                    self.cleanup_stale_peers().await;
232                }
233            }
234        }
235
236        self.state.set_direct_signaling_sender(None).await;
237        Ok(())
238    }
239
240    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
241        let mut seen = self.seen_frame_ids.lock().await;
242        seen.insert_if_new(frame_id)
243    }
244
245    async fn mark_seen_event_id(&self, event_id: String) -> bool {
246        let mut seen = self.seen_event_ids.lock().await;
247        seen.insert_if_new(event_id)
248    }
249
250    async fn dispatch_signaling_message(
251        &self,
252        msg: SignalingMessage,
253        relay_transport: Option<&Arc<NostrRelayTransport>>,
254    ) {
255        self.dispatch_direct_signaling_message(&msg).await;
256        if let Err(err) = crate::dispatch_signaling_message(
257            self.config.signaling_enabled,
258            &self.keys,
259            &self.my_peer_id,
260            &self.state.runtime,
261            relay_transport,
262            &self.local_buses,
263            &self.seen_frame_ids,
264            &self.seen_event_ids,
265            msg.clone(),
266            MESH_SIGNALING_EVENT_KIND as u64,
267        )
268        .await
269        {
270            debug!("Failed to dispatch signaling message: {}", err);
271        }
272    }
273
274    async fn dispatch_direct_signaling_message(&self, msg: &SignalingMessage) {
275        let endpoints = self.direct_signaling_endpoints(msg).await;
276        if endpoints.is_empty() {
277            return;
278        }
279
280        let Ok(event) =
281            crate::create_signaling_event(&self.keys, msg, MESH_SIGNALING_EVENT_KIND as u64).await
282        else {
283            return;
284        };
285        let body = event.as_json();
286        for endpoint in endpoints {
287            let body = body.clone();
288            tokio::spawn(async move {
289                if let Err(err) = post_direct_signaling_event(&endpoint, &body).await {
290                    debug!("Failed to post WebRTC signaling event to {endpoint}: {err}");
291                }
292            });
293        }
294    }
295
296    async fn direct_signaling_endpoints(&self, msg: &SignalingMessage) -> Vec<String> {
297        let target = msg.target_peer_id().map(str::to_string);
298        let mut endpoints = Vec::new();
299        for peer in self.state.ordered_known_peers().await {
300            if target
301                .as_deref()
302                .is_some_and(|target| target != peer.peer_id)
303            {
304                continue;
305            }
306            for signal_url in peer.signal_urls {
307                let endpoint = direct_signal_endpoint(&signal_url);
308                if !endpoints.contains(&endpoint) {
309                    endpoints.push(endpoint);
310                }
311            }
312        }
313        endpoints
314    }
315
316    async fn forward_mesh_frame(
317        &self,
318        frame: &MeshNostrFrame,
319        exclude_peer_id: Option<&str>,
320    ) -> usize {
321        crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
322    }
323
324    async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
325        if let Err(reason) = validate_mesh_frame(&frame) {
326            debug!(
327                "Ignoring mesh frame from {} (invalid: {})",
328                from_peer_id.short(),
329                reason
330            );
331            return;
332        }
333
334        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
335            self.state.record_mesh_duplicate_drop();
336            return;
337        }
338
339        let event = match &frame.payload {
340            MeshNostrPayload::Event { event } => event.clone(),
341        };
342
343        if !self.mark_seen_event_id(event.id.to_hex()).await {
344            self.state.record_mesh_duplicate_drop();
345            return;
346        }
347
348        if event.verify().is_err() {
349            debug!(
350                "Ignoring mesh event from {} due to invalid signature",
351                from_peer_id.short()
352            );
353            return;
354        }
355
356        self.state.record_mesh_received();
357
358        if let Err(e) = self
359            .handle_event("mesh", &event, self.shared_router.as_ref())
360            .await
361        {
362            debug!(
363                "Error handling mesh event from {}: {}",
364                from_peer_id.short(),
365                e
366            );
367        }
368
369        let forwarded = self
370            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
371            .await;
372        if forwarded > 0 {
373            self.state.record_mesh_forwarded(forwarded as u64);
374        }
375    }
376
377    /// Handle an incoming event
378    ///
379    /// Messages may be:
380    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
381    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
382    async fn handle_event(
383        &self,
384        relay: &str,
385        event: &nostr_sdk::nostr::Event,
386        shared_router: Option<&Arc<SharedProductionRouter>>,
387    ) -> Result<()> {
388        crate::handle_signaling_event(
389            self.config.signaling_enabled,
390            &self.my_peer_id,
391            &self.keys,
392            &self.state.runtime,
393            relay,
394            self.local_bus_max_peers(relay),
395            event,
396            shared_router,
397        )
398        .await
399    }
400
401    async fn handle_signaling_message(
402        &self,
403        source: &str,
404        msg: SignalingMessage,
405        shared_router: Option<&Arc<SharedProductionRouter>>,
406    ) -> Result<()> {
407        crate::handle_signaling_message(
408            &self.state.runtime,
409            source,
410            self.local_bus_max_peers(source),
411            msg,
412            shared_router,
413        )
414        .await
415    }
416
417    /// Handle peer state change events from peer connections
418    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
419        crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
420            .await;
421    }
422
423    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
424    async fn cleanup_stale_peers(&self) {
425        crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
426    }
427}
428
429fn direct_signal_endpoint(base_url: &str) -> String {
430    format!("{}/api/p2p/signal", base_url.trim().trim_end_matches('/'))
431}
432
433async fn post_direct_signaling_event(endpoint: &str, body: &str) -> Result<()> {
434    let Some((host, port, path)) = parse_http_endpoint(endpoint) else {
435        anyhow::bail!("unsupported WebRTC signaling endpoint");
436    };
437    let mut stream = TcpStream::connect((host.as_str(), port)).await?;
438    let request = format!(
439        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
440        body.len()
441    );
442    stream.write_all(request.as_bytes()).await?;
443    stream.flush().await?;
444
445    let mut response = Vec::new();
446    let _ = tokio::time::timeout(Duration::from_secs(3), stream.read_to_end(&mut response)).await;
447    if response.starts_with(b"HTTP/1.1 2") || response.starts_with(b"HTTP/1.0 2") {
448        return Ok(());
449    }
450    anyhow::bail!("WebRTC signaling endpoint returned non-2xx response");
451}
452
453fn parse_http_endpoint(endpoint: &str) -> Option<(String, u16, String)> {
454    let rest = endpoint.strip_prefix("http://")?;
455    let (authority, path) = match rest.split_once('/') {
456        Some((authority, path)) => (authority, format!("/{path}")),
457        None => (rest, "/".to_string()),
458    };
459    let (host, port) = if let Some(stripped) = authority.strip_prefix('[') {
460        let (host, tail) = stripped.split_once(']')?;
461        let port = tail.strip_prefix(':')?.parse::<u16>().ok()?;
462        (host.to_string(), port)
463    } else if let Some((host, port)) = authority.rsplit_once(':') {
464        (host.to_string(), port.parse::<u16>().ok()?)
465    } else {
466        (authority.to_string(), 80)
467    };
468    if host.is_empty() {
469        return None;
470    }
471    Some((host, port, path))
472}
473
474// Keep the old PeerState for backward compatibility with tests
475#[allow(dead_code)]
476#[derive(Debug, Clone)]
477pub struct PeerState {
478    pub peer_id: PeerId,
479    pub direction: PeerDirection,
480    pub state: String,
481    pub last_seen: Instant,
482}
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487    use crate::root_events::{self, PeerRootEvent};
488    use crate::session::TestMeshPeer;
489    use crate::LocalNostrBus;
490    use crate::SelectionStrategy;
491    use crate::{build_hedged_wave_plan, normalize_dispatch_config};
492    use anyhow::Result as AnyResult;
493    use async_trait::async_trait;
494    use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
495    use std::time::Duration;
496
497    struct TestLocalBus {
498        source: &'static str,
499        root: Option<PeerRootEvent>,
500    }
501
502    #[async_trait]
503    impl LocalNostrBus for TestLocalBus {
504        fn source_name(&self) -> &'static str {
505            self.source
506        }
507
508        async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
509            Ok(())
510        }
511
512        async fn query_root(
513            &self,
514            _owner_pubkey: &str,
515            _tree_name: &str,
516            _timeout: Duration,
517        ) -> Option<PeerRootEvent> {
518            self.root.clone()
519        }
520    }
521
522    #[test]
523    fn root_event_from_peer_extracts_tags() {
524        let keys = Keys::generate();
525        let hash = "ab".repeat(32);
526        let event = EventBuilder::new(
527            Kind::Custom(root_events::HASHTREE_KIND),
528            "",
529            [
530                Tag::parse(&["d", "repo"]).unwrap(),
531                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
532                Tag::parse(&["hash", &hash]).unwrap(),
533                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
534            ],
535        )
536        .to_event(&keys)
537        .unwrap();
538
539        let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
540        let expected_encrypted = "11".repeat(32);
541        assert_eq!(parsed.hash, hash);
542        assert_eq!(parsed.peer_id, "peer-a");
543        assert_eq!(
544            parsed.encrypted_key.as_deref(),
545            Some(expected_encrypted.as_str())
546        );
547        assert!(parsed.key.is_none());
548    }
549
550    #[test]
551    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
552        let keys = Keys::generate();
553        let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
554        let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
555            .custom_created_at(created_at)
556            .to_event(&keys)
557            .unwrap();
558        let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
559            .custom_created_at(created_at)
560            .to_event(&keys)
561            .unwrap();
562
563        let expected = if event_a.id > event_b.id {
564            event_a.id
565        } else {
566            event_b.id
567        };
568        let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
569        assert_eq!(picked.id, expected);
570    }
571
572    #[tokio::test]
573    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
574        let state = WebRTCState::new();
575        let root = PeerRootEvent {
576            hash: "ab".repeat(32),
577            key: None,
578            encrypted_key: None,
579            self_encrypted_key: None,
580            event_id: "event-1".to_string(),
581            created_at: 1,
582            peer_id: "bus-peer".to_string(),
583        };
584
585        state
586            .set_local_buses(vec![
587                Arc::new(TestLocalBus {
588                    source: "empty",
589                    root: None,
590                }),
591                Arc::new(TestLocalBus {
592                    source: "mock-bus",
593                    root: Some(root.clone()),
594                }),
595            ])
596            .await;
597
598        let resolved = state
599            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
600            .await
601            .expect("expected root from local bus");
602
603        assert_eq!(resolved.0, "mock-bus");
604        assert_eq!(resolved.1.hash, root.hash);
605        assert_eq!(resolved.1.peer_id, root.peer_id);
606    }
607
608    #[tokio::test]
609    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
610        let keys = Keys::generate();
611        let mut config = WebRTCConfig::default();
612        config.wifi_aware.enabled = true;
613        config.wifi_aware.max_peers = 1;
614        let manager = WebRTCManager::new(keys, config);
615        let existing_peer = PeerId::new("peer-a".to_string());
616        let existing_key = existing_peer.to_string();
617        let mut peers = HashMap::new();
618        peers.insert(
619            existing_key.clone(),
620            PeerEntry {
621                peer_id: existing_peer,
622                direction: PeerDirection::Outbound,
623                state: ConnectionState::Discovered,
624                last_seen: Instant::now(),
625                peer: None,
626                pool: PeerPool::Other,
627                transport: PeerTransport::WebRtc,
628                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
629                bytes_sent: 0,
630                bytes_received: 0,
631            },
632        );
633
634        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
635        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
636        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
637    }
638
639    #[tokio::test]
640    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
641        let state = WebRTCState::new();
642        let data = b"offline-over-ble".to_vec();
643        let hash_hex = hex::encode(hashtree_core::sha256(&data));
644
645        state.runtime.peers.write().await.insert(
646            "peer-a".to_string(),
647            PeerEntry {
648                peer_id: PeerId::new("peer-a-pub".to_string()),
649                direction: PeerDirection::Outbound,
650                state: ConnectionState::Connected,
651                last_seen: Instant::now(),
652                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
653                    data.clone(),
654                )))),
655                pool: PeerPool::Other,
656                transport: PeerTransport::Bluetooth,
657                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
658                bytes_sent: 0,
659                bytes_received: 0,
660            },
661        );
662
663        let resolved = state
664            .request_from_peers_with_source(&hash_hex)
665            .await
666            .expect("expected mock mesh peer response");
667
668        assert_eq!(resolved.0, data);
669        assert_eq!(resolved.1, "peer-a-pub");
670    }
671
672    #[tokio::test]
673    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
674        let state = WebRTCState::new_with_routing_and_cashu(
675            SelectionStrategy::Weighted,
676            true,
677            RequestDispatchConfig {
678                initial_fanout: 1,
679                hedge_fanout: 1,
680                max_fanout: 1,
681                hedge_interval_ms: 50,
682            },
683            Duration::from_millis(400),
684            CashuRoutingConfig::default(),
685            None,
686            None,
687        );
688        let data = b"slow-offline-over-ble".to_vec();
689        let hash_hex = hex::encode(hashtree_core::sha256(&data));
690
691        state.runtime.peers.write().await.insert(
692            "peer-a".to_string(),
693            PeerEntry {
694                peer_id: PeerId::new("peer-a-pub".to_string()),
695                direction: PeerDirection::Outbound,
696                state: ConnectionState::Connected,
697                last_seen: Instant::now(),
698                peer: Some(MeshPeer::mock_for_tests(
699                    TestMeshPeer::with_delayed_response(
700                        Some(data.clone()),
701                        Duration::from_millis(200),
702                    ),
703                )),
704                pool: PeerPool::Other,
705                transport: PeerTransport::Bluetooth,
706                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
707                bytes_sent: 0,
708                bytes_received: 0,
709            },
710        );
711
712        let resolved = state
713            .request_from_peers_with_source(&hash_hex)
714            .await
715            .expect("expected delayed mock mesh peer response");
716
717        assert_eq!(resolved.0, data);
718        assert_eq!(resolved.1, "peer-a-pub");
719    }
720
721    #[tokio::test]
722    async fn request_from_peers_with_source_skips_peers_with_hash_get_disabled() {
723        let state = WebRTCState::new();
724        let capable_data = b"hash-get-capable".to_vec();
725        let capable_hash_hex = hex::encode(hashtree_core::sha256(&capable_data));
726
727        state.runtime.peers.write().await.insert(
728            "peer-assist".to_string(),
729            PeerEntry {
730                peer_id: PeerId::new("peer-assist-pub".to_string()),
731                direction: PeerDirection::Outbound,
732                state: ConnectionState::Connected,
733                last_seen: Instant::now(),
734                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
735                    b"assist-should-not-be-queried".to_vec(),
736                )))),
737                pool: PeerPool::Other,
738                transport: PeerTransport::Bluetooth,
739                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
740                bytes_sent: 0,
741                bytes_received: 0,
742            },
743        );
744        state
745            .runtime
746            .set_peer_hash_get("peer-assist-pub", false)
747            .await;
748
749        state.runtime.peers.write().await.insert(
750            "peer-capable".to_string(),
751            PeerEntry {
752                peer_id: PeerId::new("peer-capable-pub".to_string()),
753                direction: PeerDirection::Outbound,
754                state: ConnectionState::Connected,
755                last_seen: Instant::now(),
756                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
757                    capable_data.clone(),
758                )))),
759                pool: PeerPool::Other,
760                transport: PeerTransport::Bluetooth,
761                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
762                bytes_sent: 0,
763                bytes_received: 0,
764            },
765        );
766        state
767            .runtime
768            .set_peer_hash_get("peer-capable-pub", true)
769            .await;
770
771        let resolved = state
772            .request_from_peers_with_source(&capable_hash_hex)
773            .await
774            .expect("expected capable peer response");
775
776        assert_eq!(resolved.0, capable_data);
777        assert_eq!(resolved.1, "peer-capable-pub");
778    }
779
780    #[tokio::test]
781    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
782        let keys = Keys::generate();
783        let mut config = WebRTCConfig::default();
784        config.signaling_enabled = false;
785        let manager = WebRTCManager::new(keys, config);
786        let peer_id = PeerId::new("peer-a-pub".to_string());
787        let peer_key = peer_id.to_string();
788        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
789        let peer_ref = peer.mock_ref().expect("mock peer").clone();
790
791        manager.state.runtime.peers.write().await.insert(
792            peer_key,
793            PeerEntry {
794                peer_id,
795                direction: PeerDirection::Outbound,
796                state: ConnectionState::Connected,
797                last_seen: Instant::now(),
798                peer: Some(peer),
799                pool: PeerPool::Other,
800                transport: PeerTransport::Bluetooth,
801                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
802                bytes_sent: 0,
803                bytes_received: 0,
804            },
805        );
806
807        manager
808            .dispatch_signaling_message(
809                SignalingMessage::Hello {
810                    peer_id: manager.my_peer_id.to_string(),
811                    roots: Vec::new(),
812                    hash_get: true,
813                },
814                None,
815            )
816            .await;
817
818        assert_eq!(peer_ref.sent_frame_count().await, 0);
819    }
820
821    #[tokio::test]
822    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
823        let keys = Keys::generate();
824        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
825        let peer_id = PeerId::new("peer-a-pub".to_string());
826        let peer_key = peer_id.to_string();
827
828        manager.state.runtime.peers.write().await.insert(
829            peer_key.clone(),
830            PeerEntry {
831                peer_id: peer_id.clone(),
832                direction: PeerDirection::Outbound,
833                state: ConnectionState::Connected,
834                last_seen: Instant::now(),
835                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
836                    Duration::from_millis(200),
837                ))),
838                pool: PeerPool::Other,
839                transport: PeerTransport::Bluetooth,
840                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
841                bytes_sent: 0,
842                bytes_received: 0,
843            },
844        );
845
846        let manager_for_task = manager.clone();
847        let peer_id_for_task = peer_id.clone();
848        let cleanup_task = tokio::spawn(async move {
849            manager_for_task
850                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
851                .await;
852        });
853
854        tokio::time::sleep(Duration::from_millis(20)).await;
855
856        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
857            manager.state.runtime.peers.read().await.len()
858        })
859        .await
860        .expect("peer map read should not block on close");
861
862        assert_eq!(remaining, 0);
863        cleanup_task.await.expect("cleanup task");
864    }
865
866    #[tokio::test]
867    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
868        let keys = Keys::generate();
869        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
870        let owner_keys = Keys::generate();
871        let owner_pubkey = owner_keys.public_key().to_hex();
872        let tree_name = "video";
873        let hash = "ab".repeat(32);
874        let event = EventBuilder::new(
875            Kind::Custom(root_events::HASHTREE_KIND),
876            "",
877            [
878                Tag::parse(&["d", tree_name]).unwrap(),
879                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
880                Tag::parse(&["hash", &hash]).unwrap(),
881            ],
882        )
883        .to_event(&owner_keys)
884        .unwrap();
885
886        let peer_id = PeerId::new("peer-a-pub".to_string());
887        let peer_key = peer_id.to_string();
888
889        manager.state.runtime.peers.write().await.insert(
890            peer_key.clone(),
891            PeerEntry {
892                peer_id,
893                direction: PeerDirection::Outbound,
894                state: ConnectionState::Connected,
895                last_seen: Instant::now(),
896                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
897                    vec![event],
898                    Duration::from_millis(200),
899                ))),
900                pool: PeerPool::Other,
901                transport: PeerTransport::Bluetooth,
902                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
903                bytes_sent: 0,
904                bytes_received: 0,
905            },
906        );
907
908        let manager_for_task = manager.clone();
909        let owner_pubkey_for_task = owner_pubkey.clone();
910        let resolve_task = tokio::spawn(async move {
911            manager_for_task
912                .state
913                .resolve_root_from_peers(
914                    &owner_pubkey_for_task,
915                    tree_name,
916                    Duration::from_millis(500),
917                )
918                .await
919        });
920
921        tokio::time::sleep(Duration::from_millis(20)).await;
922
923        let manager_for_writer = manager.clone();
924        let peer_key_for_writer = peer_key.clone();
925        let writer_task = tokio::spawn(async move {
926            let mut peers = manager_for_writer.state.runtime.peers.write().await;
927            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
928                entry.bytes_received += 1;
929            }
930        });
931
932        tokio::time::sleep(Duration::from_millis(20)).await;
933
934        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
935            manager.state.runtime.peers.read().await.len()
936        })
937        .await
938        .expect("peer map read should not block on root query");
939
940        assert_eq!(status_count, 1);
941        assert!(resolve_task.await.expect("resolve task").is_some());
942        writer_task.await.expect("writer task");
943    }
944
945    #[test]
946    fn test_formal_timed_seen_set_rejects_duplicates() {
947        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
948        assert!(seen.insert_if_new("frame-1".to_string()));
949        assert!(!seen.insert_if_new("frame-1".to_string()));
950        assert!(seen.insert_if_new("frame-2".to_string()));
951    }
952
953    #[test]
954    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
955        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
956        assert!(seen.insert_if_new("a".to_string()));
957        assert!(seen.insert_if_new("b".to_string()));
958        assert!(seen.insert_if_new("c".to_string()));
959
960        // "a" should be evicted due to cap=2, so re-insert becomes new again.
961        assert!(seen.insert_if_new("a".to_string()));
962        assert!(!seen.insert_if_new("a".to_string()));
963    }
964
965    #[test]
966    fn test_request_dispatch_normalization_caps_to_available_peers() {
967        let normalized = normalize_dispatch_config(
968            RequestDispatchConfig {
969                initial_fanout: 8,
970                hedge_fanout: 6,
971                max_fanout: 5,
972                hedge_interval_ms: 120,
973            },
974            3,
975        );
976        assert_eq!(normalized.max_fanout, 3);
977        assert_eq!(normalized.initial_fanout, 3);
978        assert_eq!(normalized.hedge_fanout, 3);
979    }
980
981    #[test]
982    fn test_hedged_wave_plan_matches_dispatch_policy() {
983        let plan = build_hedged_wave_plan(
984            7,
985            RequestDispatchConfig {
986                initial_fanout: 2,
987                hedge_fanout: 3,
988                max_fanout: 6,
989                hedge_interval_ms: 120,
990            },
991        );
992        assert_eq!(plan, vec![2, 3, 1]);
993    }
994}