Skip to main content

hashtree_network/manager/
runtime.rs

1use super::*;
2
3impl WebRTCManager {
4    /// Start the native peer router - connects transports and handles signaling.
5    pub async fn run(&mut self) -> Result<()> {
6        info!(
7            "Starting peer router with peer ID: {}",
8            self.my_peer_id.short()
9        );
10
11        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr_sdk::nostr::Event)>(100);
12        let (relay_msg_tx, mut relay_msg_rx) = mpsc::channel::<SignalingMessage>(100);
13
14        // Take the signaling receiver
15        let mut signaling_rx = self
16            .signaling_rx
17            .take()
18            .expect("signaling_rx already taken");
19
20        // Take the state event receiver
21        let mut state_event_rx = self
22            .state_event_rx
23            .take()
24            .expect("state_event_rx already taken");
25        let mut mesh_frame_rx = self
26            .mesh_frame_rx
27            .take()
28            .expect("mesh_frame_rx already taken");
29
30        if self.config.bluetooth.is_enabled() {
31            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
32            let context = BluetoothRuntimeContext {
33                my_peer_id: self.my_peer_id.clone(),
34                store: if bluetooth_nostr_only_mode() {
35                    None
36                } else {
37                    self.store.clone()
38                },
39                nostr_relay: self.nostr_relay.clone(),
40                mesh_frame_tx: self.mesh_frame_tx.clone(),
41                registrar: BluetoothPeerRegistrar::new(
42                    self.state.clone(),
43                    self.peer_classifier.clone(),
44                    self.config.pools.clone(),
45                    self.config.bluetooth.max_peers,
46                ),
47            };
48            let _ = bluetooth.start(context).await;
49        }
50
51        let relay_transport = if self.config.signaling_enabled {
52            let transport = Arc::new(NostrRelayTransport::new(
53                self.keys.clone(),
54                self.config.debug,
55            ));
56            transport
57                .connect(&self.config.relays)
58                .await
59                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
60
61            let relay_reader = transport.clone();
62            let relay_msg_tx = relay_msg_tx.clone();
63            tokio::spawn(async move {
64                while let Some(msg) = relay_reader.recv().await {
65                    if relay_msg_tx.send(msg).await.is_err() {
66                        break;
67                    }
68                }
69            });
70
71            Some(transport)
72        } else {
73            None
74        };
75
76        if self.config.multicast.is_enabled() {
77            if let Some(relay) = self.nostr_relay.clone() {
78                let relay = relay as crate::SharedMeshEventStore;
79                match MulticastNostrBus::bind(
80                    self.config.multicast.clone(),
81                    self.keys.clone(),
82                    relay,
83                )
84                .await
85                {
86                    Ok(bus) => {
87                        let local_bus: SharedLocalNostrBus = bus.clone();
88                        self.state.add_local_bus(local_bus.clone()).await;
89                        self.local_buses.push(local_bus);
90                        let shutdown_rx = self.shutdown_rx.clone();
91                        let signaling_tx = event_tx.clone();
92                        tokio::spawn(async move {
93                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
94                                error!("Multicast bus error: {}", err);
95                            }
96                        });
97                    }
98                    Err(err) => {
99                        warn!("Failed to start multicast bus: {}", err);
100                    }
101                }
102            } else {
103                warn!("Multicast enabled but Nostr relay is unavailable");
104            }
105        }
106
107        if self.config.wifi_aware.is_enabled() {
108            if let Some(relay) = self.nostr_relay.clone() {
109                if let Some(bridge) = mobile_wifi_aware_bridge() {
110                    let relay = relay as crate::SharedMeshEventStore;
111                    let bus = WifiAwareNostrBus::new(
112                        self.config.wifi_aware.clone(),
113                        self.keys.clone(),
114                        relay,
115                        bridge,
116                    );
117                    let local_bus: SharedLocalNostrBus = bus.clone();
118                    self.state.add_local_bus(local_bus.clone()).await;
119                    self.local_buses.push(local_bus);
120                    let shutdown_rx = self.shutdown_rx.clone();
121                    let signaling_tx = event_tx.clone();
122                    let local_peer_id = self.my_peer_id.to_string();
123                    tokio::spawn(async move {
124                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
125                            error!("Wi-Fi Aware bus error: {}", err);
126                        }
127                    });
128                } else {
129                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
130                }
131            } else {
132                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
133            }
134        }
135
136        if self.config.signaling_enabled {
137            let transport = Arc::new(RouterSignalingBridge::new(
138                self.my_peer_id.to_string(),
139                self.signaling_tx.clone(),
140            ));
141            let factory = Arc::new(SharedRouterPeerFactory::new(
142                self.my_peer_id.clone(),
143                self.signaling_tx.clone(),
144                self.config.stun_servers.clone(),
145                self.store.clone(),
146                self.state.clone(),
147                self.state_event_tx.clone(),
148                self.nostr_relay.clone(),
149                self.mesh_frame_tx.clone(),
150                self.peer_classifier.clone(),
151            ));
152            let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
153            let classifier = self.peer_classifier.clone();
154            tokio::spawn(async move {
155                while let Some(request) = classifier_rx.recv().await {
156                    let _ = request.response.send(classifier(&request.pubkey));
157                }
158            });
159
160            let mut router = MeshRouter::new(
161                self.my_peer_id.to_string(),
162                transport,
163                factory.clone(),
164                self.config.pools.clone(),
165                self.config.debug,
166            );
167            router.set_classifier(classifier_tx);
168            self.shared_router = Some(Arc::new(router));
169        }
170
171        // Process incoming events and outgoing signaling messages
172        let mut shutdown_rx = self.shutdown_rx.clone();
173        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
174        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
175        let mut hello_ticker =
176            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
177        if self.config.signaling_enabled {
178            if let Some(shared_router) = self.shared_router.as_ref() {
179                let _ = shared_router.send_hello(Vec::new()).await;
180            }
181        }
182        loop {
183            tokio::select! {
184                _ = shutdown_rx.changed() => {
185                    if *shutdown_rx.borrow() {
186                        info!("WebRTC manager shutting down");
187                        break;
188                    }
189                }
190                Some(msg) = relay_msg_rx.recv() => {
191                    if let Err(e) = self
192                        .handle_signaling_message("relay", msg, self.shared_router.as_ref())
193                        .await
194                    {
195                        debug!("Error handling relay signaling message: {}", e);
196                    }
197                }
198                Some((relay, event)) = event_rx.recv() => {
199                    if let Err(e) = self
200                        .handle_event(&relay, &event, self.shared_router.as_ref())
201                        .await
202                    {
203                        debug!("Error handling event from {}: {}", relay, e);
204                    }
205                }
206                Some(msg) = signaling_rx.recv() => {
207                    self.dispatch_signaling_message(msg, relay_transport.as_ref()).await;
208                }
209                Some(event) = state_event_rx.recv() => {
210                    // Handle peer state events (connected, failed, disconnected)
211                    self.handle_peer_state_event(event).await;
212                }
213                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
214                    self.handle_mesh_frame(from_peer_id, frame).await;
215                }
216                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
217                    if let Some(shared_router) = self.shared_router.as_ref() {
218                        let _ = shared_router.send_hello(Vec::new()).await;
219                    }
220                }
221                _ = cleanup_interval.tick() => {
222                    // Periodic cleanup of stale peers and state sync (fallback)
223                    self.cleanup_stale_peers().await;
224                }
225            }
226        }
227
228        Ok(())
229    }
230
231    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
232        let mut seen = self.seen_frame_ids.lock().await;
233        seen.insert_if_new(frame_id)
234    }
235
236    async fn mark_seen_event_id(&self, event_id: String) -> bool {
237        let mut seen = self.seen_event_ids.lock().await;
238        seen.insert_if_new(event_id)
239    }
240
241    async fn dispatch_signaling_message(
242        &self,
243        msg: SignalingMessage,
244        relay_transport: Option<&Arc<NostrRelayTransport>>,
245    ) {
246        if let Err(err) = crate::dispatch_signaling_message(
247            self.config.signaling_enabled,
248            &self.keys,
249            &self.my_peer_id,
250            &self.state.runtime,
251            relay_transport,
252            &self.local_buses,
253            &self.seen_frame_ids,
254            &self.seen_event_ids,
255            msg,
256            MESH_SIGNALING_EVENT_KIND as u64,
257        )
258        .await
259        {
260            debug!("Failed to dispatch signaling message: {}", err);
261        }
262    }
263
264    async fn forward_mesh_frame(
265        &self,
266        frame: &MeshNostrFrame,
267        exclude_peer_id: Option<&str>,
268    ) -> usize {
269        crate::forward_mesh_frame_from_runtime(&self.state.runtime, frame, exclude_peer_id).await
270    }
271
272    async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
273        if let Err(reason) = validate_mesh_frame(&frame) {
274            debug!(
275                "Ignoring mesh frame from {} (invalid: {})",
276                from_peer_id.short(),
277                reason
278            );
279            return;
280        }
281
282        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
283            self.state.record_mesh_duplicate_drop();
284            return;
285        }
286
287        let event = match &frame.payload {
288            MeshNostrPayload::Event { event } => event.clone(),
289        };
290
291        if !self.mark_seen_event_id(event.id.to_hex()).await {
292            self.state.record_mesh_duplicate_drop();
293            return;
294        }
295
296        if event.verify().is_err() {
297            debug!(
298                "Ignoring mesh event from {} due to invalid signature",
299                from_peer_id.short()
300            );
301            return;
302        }
303
304        self.state.record_mesh_received();
305
306        if let Err(e) = self
307            .handle_event("mesh", &event, self.shared_router.as_ref())
308            .await
309        {
310            debug!(
311                "Error handling mesh event from {}: {}",
312                from_peer_id.short(),
313                e
314            );
315        }
316
317        let forwarded = self
318            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
319            .await;
320        if forwarded > 0 {
321            self.state.record_mesh_forwarded(forwarded as u64);
322        }
323    }
324
325    /// Handle an incoming event
326    ///
327    /// Messages may be:
328    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
329    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
330    async fn handle_event(
331        &self,
332        relay: &str,
333        event: &nostr_sdk::nostr::Event,
334        shared_router: Option<&Arc<SharedProductionRouter>>,
335    ) -> Result<()> {
336        crate::handle_signaling_event(
337            self.config.signaling_enabled,
338            &self.my_peer_id,
339            &self.keys,
340            &self.state.runtime,
341            relay,
342            self.local_bus_max_peers(relay),
343            event,
344            shared_router,
345        )
346        .await
347    }
348
349    async fn handle_signaling_message(
350        &self,
351        source: &str,
352        msg: SignalingMessage,
353        shared_router: Option<&Arc<SharedProductionRouter>>,
354    ) -> Result<()> {
355        crate::handle_signaling_message(
356            &self.state.runtime,
357            source,
358            self.local_bus_max_peers(source),
359            msg,
360            shared_router,
361        )
362        .await
363    }
364
365    /// Handle peer state change events from peer connections
366    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
367        crate::handle_peer_state_event(&self.state.runtime, event, self.shared_router.as_ref())
368            .await;
369    }
370
371    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
372    async fn cleanup_stale_peers(&self) {
373        crate::cleanup_stale_peers(&self.state.runtime, Duration::from_secs(60)).await;
374    }
375}
376
377// Keep the old PeerState for backward compatibility with tests
378#[allow(dead_code)]
379#[derive(Debug, Clone)]
380pub struct PeerState {
381    pub peer_id: PeerId,
382    pub direction: PeerDirection,
383    pub state: String,
384    pub last_seen: Instant,
385}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390    use crate::root_events::{self, PeerRootEvent};
391    use crate::session::TestMeshPeer;
392    use crate::LocalNostrBus;
393    use crate::SelectionStrategy;
394    use crate::{build_hedged_wave_plan, normalize_dispatch_config};
395    use anyhow::Result as AnyResult;
396    use async_trait::async_trait;
397    use nostr_sdk::nostr::{EventBuilder, Keys, Tag};
398    use std::time::Duration;
399
400    struct TestLocalBus {
401        source: &'static str,
402        root: Option<PeerRootEvent>,
403    }
404
405    #[async_trait]
406    impl LocalNostrBus for TestLocalBus {
407        fn source_name(&self) -> &'static str {
408            self.source
409        }
410
411        async fn broadcast_event(&self, _event: &nostr_sdk::nostr::Event) -> AnyResult<()> {
412            Ok(())
413        }
414
415        async fn query_root(
416            &self,
417            _owner_pubkey: &str,
418            _tree_name: &str,
419            _timeout: Duration,
420        ) -> Option<PeerRootEvent> {
421            self.root.clone()
422        }
423    }
424
425    #[test]
426    fn root_event_from_peer_extracts_tags() {
427        let keys = Keys::generate();
428        let hash = "ab".repeat(32);
429        let event = EventBuilder::new(
430            Kind::Custom(root_events::HASHTREE_KIND),
431            "",
432            [
433                Tag::parse(&["d", "repo"]).unwrap(),
434                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
435                Tag::parse(&["hash", &hash]).unwrap(),
436                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
437            ],
438        )
439        .to_event(&keys)
440        .unwrap();
441
442        let parsed = root_events::root_event_from_peer(&event, "peer-a", "repo").unwrap();
443        let expected_encrypted = "11".repeat(32);
444        assert_eq!(parsed.hash, hash);
445        assert_eq!(parsed.peer_id, "peer-a");
446        assert_eq!(
447            parsed.encrypted_key.as_deref(),
448            Some(expected_encrypted.as_str())
449        );
450        assert!(parsed.key.is_none());
451    }
452
453    #[test]
454    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
455        let keys = Keys::generate();
456        let created_at = nostr_sdk::nostr::Timestamp::from_secs(1_700_000_000);
457        let event_a = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
458            .custom_created_at(created_at)
459            .to_event(&keys)
460            .unwrap();
461        let event_b = EventBuilder::new(Kind::Custom(root_events::HASHTREE_KIND), "", [])
462            .custom_created_at(created_at)
463            .to_event(&keys)
464            .unwrap();
465
466        let expected = if event_a.id > event_b.id {
467            event_a.id
468        } else {
469            event_b.id
470        };
471        let picked = root_events::pick_latest_event([&event_a, &event_b]).unwrap();
472        assert_eq!(picked.id, expected);
473    }
474
475    #[tokio::test]
476    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
477        let state = WebRTCState::new();
478        let root = PeerRootEvent {
479            hash: "ab".repeat(32),
480            key: None,
481            encrypted_key: None,
482            self_encrypted_key: None,
483            event_id: "event-1".to_string(),
484            created_at: 1,
485            peer_id: "bus-peer".to_string(),
486        };
487
488        state
489            .set_local_buses(vec![
490                Arc::new(TestLocalBus {
491                    source: "empty",
492                    root: None,
493                }),
494                Arc::new(TestLocalBus {
495                    source: "mock-bus",
496                    root: Some(root.clone()),
497                }),
498            ])
499            .await;
500
501        let resolved = state
502            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
503            .await
504            .expect("expected root from local bus");
505
506        assert_eq!(resolved.0, "mock-bus");
507        assert_eq!(resolved.1.hash, root.hash);
508        assert_eq!(resolved.1.peer_id, root.peer_id);
509    }
510
511    #[tokio::test]
512    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
513        let keys = Keys::generate();
514        let mut config = WebRTCConfig::default();
515        config.wifi_aware.enabled = true;
516        config.wifi_aware.max_peers = 1;
517        let manager = WebRTCManager::new(keys, config);
518        let existing_peer = PeerId::new("peer-a".to_string());
519        let existing_key = existing_peer.to_string();
520        let mut peers = HashMap::new();
521        peers.insert(
522            existing_key.clone(),
523            PeerEntry {
524                peer_id: existing_peer,
525                direction: PeerDirection::Outbound,
526                state: ConnectionState::Discovered,
527                last_seen: Instant::now(),
528                peer: None,
529                pool: PeerPool::Other,
530                transport: PeerTransport::WebRtc,
531                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
532                bytes_sent: 0,
533                bytes_received: 0,
534            },
535        );
536
537        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
538        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
539        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
540    }
541
542    #[tokio::test]
543    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
544        let state = WebRTCState::new();
545        let data = b"offline-over-ble".to_vec();
546        let hash_hex = hex::encode(hashtree_core::sha256(&data));
547
548        state.runtime.peers.write().await.insert(
549            "peer-a".to_string(),
550            PeerEntry {
551                peer_id: PeerId::new("peer-a-pub".to_string()),
552                direction: PeerDirection::Outbound,
553                state: ConnectionState::Connected,
554                last_seen: Instant::now(),
555                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
556                    data.clone(),
557                )))),
558                pool: PeerPool::Other,
559                transport: PeerTransport::Bluetooth,
560                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
561                bytes_sent: 0,
562                bytes_received: 0,
563            },
564        );
565
566        let resolved = state
567            .request_from_peers_with_source(&hash_hex)
568            .await
569            .expect("expected mock mesh peer response");
570
571        assert_eq!(resolved.0, data);
572        assert_eq!(resolved.1, "peer-a-pub");
573    }
574
575    #[tokio::test]
576    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
577        let state = WebRTCState::new_with_routing_and_cashu(
578            SelectionStrategy::TitForTat,
579            true,
580            RequestDispatchConfig {
581                initial_fanout: 1,
582                hedge_fanout: 1,
583                max_fanout: 1,
584                hedge_interval_ms: 50,
585            },
586            Duration::from_millis(400),
587            CashuRoutingConfig::default(),
588            None,
589            None,
590        );
591        let data = b"slow-offline-over-ble".to_vec();
592        let hash_hex = hex::encode(hashtree_core::sha256(&data));
593
594        state.runtime.peers.write().await.insert(
595            "peer-a".to_string(),
596            PeerEntry {
597                peer_id: PeerId::new("peer-a-pub".to_string()),
598                direction: PeerDirection::Outbound,
599                state: ConnectionState::Connected,
600                last_seen: Instant::now(),
601                peer: Some(MeshPeer::mock_for_tests(
602                    TestMeshPeer::with_delayed_response(
603                        Some(data.clone()),
604                        Duration::from_millis(200),
605                    ),
606                )),
607                pool: PeerPool::Other,
608                transport: PeerTransport::Bluetooth,
609                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
610                bytes_sent: 0,
611                bytes_received: 0,
612            },
613        );
614
615        let resolved = state
616            .request_from_peers_with_source(&hash_hex)
617            .await
618            .expect("expected delayed mock mesh peer response");
619
620        assert_eq!(resolved.0, data);
621        assert_eq!(resolved.1, "peer-a-pub");
622    }
623
624    #[tokio::test]
625    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
626        let keys = Keys::generate();
627        let mut config = WebRTCConfig::default();
628        config.signaling_enabled = false;
629        let manager = WebRTCManager::new(keys, config);
630        let peer_id = PeerId::new("peer-a-pub".to_string());
631        let peer_key = peer_id.to_string();
632        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
633        let peer_ref = peer.mock_ref().expect("mock peer").clone();
634
635        manager.state.runtime.peers.write().await.insert(
636            peer_key,
637            PeerEntry {
638                peer_id,
639                direction: PeerDirection::Outbound,
640                state: ConnectionState::Connected,
641                last_seen: Instant::now(),
642                peer: Some(peer),
643                pool: PeerPool::Other,
644                transport: PeerTransport::Bluetooth,
645                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
646                bytes_sent: 0,
647                bytes_received: 0,
648            },
649        );
650
651        manager
652            .dispatch_signaling_message(
653                SignalingMessage::Hello {
654                    peer_id: manager.my_peer_id.to_string(),
655                    roots: Vec::new(),
656                },
657                None,
658            )
659            .await;
660
661        assert_eq!(peer_ref.sent_frame_count().await, 0);
662    }
663
664    #[tokio::test]
665    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
666        let keys = Keys::generate();
667        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
668        let peer_id = PeerId::new("peer-a-pub".to_string());
669        let peer_key = peer_id.to_string();
670
671        manager.state.runtime.peers.write().await.insert(
672            peer_key.clone(),
673            PeerEntry {
674                peer_id: peer_id.clone(),
675                direction: PeerDirection::Outbound,
676                state: ConnectionState::Connected,
677                last_seen: Instant::now(),
678                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
679                    Duration::from_millis(200),
680                ))),
681                pool: PeerPool::Other,
682                transport: PeerTransport::Bluetooth,
683                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
684                bytes_sent: 0,
685                bytes_received: 0,
686            },
687        );
688
689        let manager_for_task = manager.clone();
690        let peer_id_for_task = peer_id.clone();
691        let cleanup_task = tokio::spawn(async move {
692            manager_for_task
693                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task))
694                .await;
695        });
696
697        tokio::time::sleep(Duration::from_millis(20)).await;
698
699        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
700            manager.state.runtime.peers.read().await.len()
701        })
702        .await
703        .expect("peer map read should not block on close");
704
705        assert_eq!(remaining, 0);
706        cleanup_task.await.expect("cleanup task");
707    }
708
709    #[tokio::test]
710    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
711        let keys = Keys::generate();
712        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
713        let owner_keys = Keys::generate();
714        let owner_pubkey = owner_keys.public_key().to_hex();
715        let tree_name = "video";
716        let hash = "ab".repeat(32);
717        let event = EventBuilder::new(
718            Kind::Custom(root_events::HASHTREE_KIND),
719            "",
720            [
721                Tag::parse(&["d", tree_name]).unwrap(),
722                Tag::parse(&["l", root_events::HASHTREE_LABEL]).unwrap(),
723                Tag::parse(&["hash", &hash]).unwrap(),
724            ],
725        )
726        .to_event(&owner_keys)
727        .unwrap();
728
729        let peer_id = PeerId::new("peer-a-pub".to_string());
730        let peer_key = peer_id.to_string();
731
732        manager.state.runtime.peers.write().await.insert(
733            peer_key.clone(),
734            PeerEntry {
735                peer_id,
736                direction: PeerDirection::Outbound,
737                state: ConnectionState::Connected,
738                last_seen: Instant::now(),
739                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
740                    vec![event],
741                    Duration::from_millis(200),
742                ))),
743                pool: PeerPool::Other,
744                transport: PeerTransport::Bluetooth,
745                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
746                bytes_sent: 0,
747                bytes_received: 0,
748            },
749        );
750
751        let manager_for_task = manager.clone();
752        let owner_pubkey_for_task = owner_pubkey.clone();
753        let resolve_task = tokio::spawn(async move {
754            manager_for_task
755                .state
756                .resolve_root_from_peers(
757                    &owner_pubkey_for_task,
758                    tree_name,
759                    Duration::from_millis(500),
760                )
761                .await
762        });
763
764        tokio::time::sleep(Duration::from_millis(20)).await;
765
766        let manager_for_writer = manager.clone();
767        let peer_key_for_writer = peer_key.clone();
768        let writer_task = tokio::spawn(async move {
769            let mut peers = manager_for_writer.state.runtime.peers.write().await;
770            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
771                entry.bytes_received += 1;
772            }
773        });
774
775        tokio::time::sleep(Duration::from_millis(20)).await;
776
777        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
778            manager.state.runtime.peers.read().await.len()
779        })
780        .await
781        .expect("peer map read should not block on root query");
782
783        assert_eq!(status_count, 1);
784        assert!(resolve_task.await.expect("resolve task").is_some());
785        writer_task.await.expect("writer task");
786    }
787
788    #[test]
789    fn test_formal_timed_seen_set_rejects_duplicates() {
790        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
791        assert!(seen.insert_if_new("frame-1".to_string()));
792        assert!(!seen.insert_if_new("frame-1".to_string()));
793        assert!(seen.insert_if_new("frame-2".to_string()));
794    }
795
796    #[test]
797    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
798        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
799        assert!(seen.insert_if_new("a".to_string()));
800        assert!(seen.insert_if_new("b".to_string()));
801        assert!(seen.insert_if_new("c".to_string()));
802
803        // "a" should be evicted due to cap=2, so re-insert becomes new again.
804        assert!(seen.insert_if_new("a".to_string()));
805        assert!(!seen.insert_if_new("a".to_string()));
806    }
807
808    #[test]
809    fn test_request_dispatch_normalization_caps_to_available_peers() {
810        let normalized = normalize_dispatch_config(
811            RequestDispatchConfig {
812                initial_fanout: 8,
813                hedge_fanout: 6,
814                max_fanout: 5,
815                hedge_interval_ms: 120,
816            },
817            3,
818        );
819        assert_eq!(normalized.max_fanout, 3);
820        assert_eq!(normalized.initial_fanout, 3);
821        assert_eq!(normalized.hedge_fanout, 3);
822    }
823
824    #[test]
825    fn test_hedged_wave_plan_matches_dispatch_policy() {
826        let plan = build_hedged_wave_plan(
827            7,
828            RequestDispatchConfig {
829                initial_fanout: 2,
830                hedge_fanout: 3,
831                max_fanout: 6,
832                hedge_interval_ms: 120,
833            },
834        );
835        assert_eq!(plan, vec![2, 3, 1]);
836    }
837}