Skip to main content

hashtree_network/
wifi_aware.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use nostr_sdk::nostr::{nips::nip19::FromBech32, Event, Filter, JsonUtil, Keys, PublicKey};
4use std::collections::{HashMap, HashSet};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7use tokio::sync::{mpsc, watch, Mutex};
8use tracing::{debug, warn};
9
10use crate::local_bus::LocalNostrBus;
11use crate::relay_bridge::SharedMeshEventStore;
12use crate::root_events::{
13    build_root_filter, is_hashtree_labeled_event, root_event_from_peer, PeerRootEvent,
14    HASHTREE_KIND, HASHTREE_LABEL,
15};
16
17pub const WIFI_AWARE_SOURCE: &str = "wifi-aware";
18const FRAME_VERSION: u8 = 1;
19const FRAME_KIND_QUERY_ROOT: u8 = 1;
20const FRAME_KIND_ROOT_RESPONSE: u8 = 2;
21const FRAME_KIND_QUERY_DONE: u8 = 3;
22const ROOT_FLAG_KEY: u8 = 1 << 0;
23const ROOT_FLAG_ENCRYPTED_KEY: u8 = 1 << 1;
24const ROOT_FLAG_SELF_ENCRYPTED_KEY: u8 = 1 << 2;
25
26#[derive(Debug, Clone)]
27pub struct WifiAwareConfig {
28    pub enabled: bool,
29    pub max_peers: usize,
30    pub announce_interval_ms: u64,
31}
32
33impl WifiAwareConfig {
34    pub fn is_enabled(&self) -> bool {
35        self.enabled && self.max_peers > 0
36    }
37}
38
39impl Default for WifiAwareConfig {
40    fn default() -> Self {
41        Self {
42            enabled: false,
43            max_peers: 0,
44            announce_interval_ms: 2_000,
45        }
46    }
47}
48
49#[derive(Debug, Clone)]
50pub enum WifiAwareEvent {
51    PeerDiscovered { peer_id: String },
52    PeerLost { peer_id: String },
53    Message { peer_id: String, payload: Vec<u8> },
54}
55
56#[async_trait]
57pub trait MobileWifiAwareBridge: Send + Sync {
58    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>>;
59
60    async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()>;
61}
62
63static MOBILE_WIFI_AWARE_BRIDGE: OnceLock<Arc<dyn MobileWifiAwareBridge>> = OnceLock::new();
64
65pub fn install_mobile_wifi_aware_bridge(bridge: Arc<dyn MobileWifiAwareBridge>) -> Result<()> {
66    MOBILE_WIFI_AWARE_BRIDGE
67        .set(bridge)
68        .map_err(|_| anyhow!("mobile wifi aware bridge already installed"))
69}
70
71pub fn mobile_wifi_aware_bridge() -> Option<Arc<dyn MobileWifiAwareBridge>> {
72    MOBILE_WIFI_AWARE_BRIDGE.get().cloned()
73}
74
75pub struct WifiAwareNostrBus {
76    config: WifiAwareConfig,
77    keys: Keys,
78    relay: SharedMeshEventStore,
79    bridge: Arc<dyn MobileWifiAwareBridge>,
80    pending_queries: Arc<Mutex<HashMap<u64, mpsc::UnboundedSender<PendingQueryMessage>>>>,
81    announced_event_ids: Arc<Mutex<HashSet<String>>>,
82}
83
84enum PendingQueryMessage {
85    Root(PeerRootEvent),
86    Done,
87}
88
89enum WifiAwareFrame {
90    QueryRoot {
91        request_id: u64,
92        owner_pubkey_hex: String,
93        tree_name: String,
94    },
95    RootResponse {
96        request_id: u64,
97        root: PeerRootEvent,
98    },
99    QueryDone {
100        request_id: u64,
101    },
102}
103
104#[async_trait]
105impl LocalNostrBus for WifiAwareNostrBus {
106    fn source_name(&self) -> &'static str {
107        WIFI_AWARE_SOURCE
108    }
109
110    async fn broadcast_event(&self, event: &Event) -> Result<()> {
111        WifiAwareNostrBus::broadcast_event(self, event).await
112    }
113
114    async fn query_root(
115        &self,
116        owner_pubkey: &str,
117        tree_name: &str,
118        timeout: Duration,
119    ) -> Option<PeerRootEvent> {
120        WifiAwareNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
121    }
122}
123
124impl WifiAwareNostrBus {
125    pub fn new(
126        config: WifiAwareConfig,
127        keys: Keys,
128        relay: SharedMeshEventStore,
129        bridge: Arc<dyn MobileWifiAwareBridge>,
130    ) -> Arc<Self> {
131        Arc::new(Self {
132            config,
133            keys,
134            relay,
135            bridge,
136            pending_queries: Arc::new(Mutex::new(HashMap::new())),
137            announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
138        })
139    }
140
141    pub async fn run(
142        self: Arc<Self>,
143        local_peer_id: String,
144        mut shutdown_rx: watch::Receiver<bool>,
145        signaling_tx: mpsc::Sender<(String, Event)>,
146    ) -> Result<()> {
147        let mut announce_ticker = tokio::time::interval(Duration::from_millis(
148            self.config.announce_interval_ms.max(1),
149        ));
150        let mut events = self.bridge.start(local_peer_id).await?;
151
152        loop {
153            tokio::select! {
154                _ = shutdown_rx.changed() => {
155                    if *shutdown_rx.borrow() {
156                        break;
157                    }
158                }
159                _ = announce_ticker.tick() => {
160                    if let Err(err) = self.broadcast_known_root_updates().await {
161                        debug!("wifi aware root announcement failed: {}", err);
162                    }
163                }
164                maybe_event = events.recv() => {
165                    match maybe_event {
166                        Some(WifiAwareEvent::Message { peer_id, payload }) => {
167                            self.handle_message(&peer_id, &payload, &signaling_tx).await;
168                        }
169                        Some(WifiAwareEvent::PeerDiscovered { peer_id }) => {
170                            debug!("wifi aware peer discovered: {}", peer_id);
171                        }
172                        Some(WifiAwareEvent::PeerLost { peer_id }) => {
173                            debug!("wifi aware peer lost: {}", peer_id);
174                        }
175                        None => break,
176                    }
177                }
178            }
179        }
180
181        Ok(())
182    }
183
184    pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
185        self.bridge
186            .broadcast_message(event.as_json().into_bytes())
187            .await
188    }
189
190    pub async fn query_root(
191        &self,
192        owner_pubkey: &str,
193        tree_name: &str,
194        timeout: Duration,
195    ) -> Option<PeerRootEvent> {
196        let request_id = rand::random::<u64>();
197        let owner_bytes = owner_pubkey_bytes(owner_pubkey)?;
198        let request = encode_query_root(request_id, owner_bytes, tree_name)?;
199        let (tx, mut rx) = mpsc::unbounded_channel();
200        self.pending_queries.lock().await.insert(request_id, tx);
201
202        if self.bridge.broadcast_message(request).await.is_err() {
203            self.pending_queries.lock().await.remove(&request_id);
204            return None;
205        }
206
207        let mut roots = Vec::new();
208        let deadline = tokio::time::sleep(timeout);
209        tokio::pin!(deadline);
210
211        loop {
212            tokio::select! {
213                _ = &mut deadline => break,
214                maybe_msg = rx.recv() => {
215                    let Some(msg) = maybe_msg else {
216                        break;
217                    };
218                    match msg {
219                        PendingQueryMessage::Root(root) => roots.push(root),
220                        PendingQueryMessage::Done => break,
221                    }
222                }
223            }
224        }
225
226        self.pending_queries.lock().await.remove(&request_id);
227        pick_latest_root_event(&roots)
228    }
229
230    async fn handle_message(
231        &self,
232        peer_id: &str,
233        payload: &[u8],
234        signaling_tx: &mpsc::Sender<(String, Event)>,
235    ) {
236        if let Some(frame) = decode_frame(payload) {
237            match frame {
238                WifiAwareFrame::QueryRoot {
239                    request_id,
240                    owner_pubkey_hex,
241                    tree_name,
242                } => {
243                    self.respond_to_root_query(request_id, &owner_pubkey_hex, &tree_name)
244                        .await;
245                }
246                WifiAwareFrame::RootResponse { request_id, root } => {
247                    let tx = self.pending_queries.lock().await.get(&request_id).cloned();
248                    if let Some(tx) = tx {
249                        let _ = tx.send(PendingQueryMessage::Root(root));
250                    }
251                }
252                WifiAwareFrame::QueryDone { request_id } => {
253                    let tx = self.pending_queries.lock().await.get(&request_id).cloned();
254                    if let Some(tx) = tx {
255                        let _ = tx.send(PendingQueryMessage::Done);
256                    }
257                }
258            }
259            return;
260        }
261
262        let Ok(text) = std::str::from_utf8(payload) else {
263            debug!(
264                "ignoring non-utf8 wifi aware payload from {} ({} bytes)",
265                peer_id,
266                payload.len()
267            );
268            return;
269        };
270
271        if let Ok(event) = Event::from_json(text) {
272            if event.pubkey == self.keys.public_key() {
273                return;
274            }
275
276            if event.kind.is_ephemeral() {
277                let _ = signaling_tx
278                    .send((self.source_name().to_string(), event))
279                    .await;
280                return;
281            }
282
283            if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
284                && is_hashtree_labeled_event(&event)
285                && event.verify().is_ok()
286            {
287                let _ = self.relay.ingest_trusted_event(event).await;
288            }
289            return;
290        }
291
292        debug!("ignoring wifi aware payload from {}: {}", peer_id, text);
293    }
294
295    async fn respond_to_root_query(&self, request_id: u64, owner_pubkey: &str, tree_name: &str) {
296        let Some(filter) = build_root_filter(owner_pubkey, tree_name) else {
297            let _ = self
298                .bridge
299                .broadcast_message(encode_query_done(request_id))
300                .await;
301            return;
302        };
303
304        for event in self.relay.query_events(&filter, 50).await {
305            let Some(root) = root_event_from_peer(&event, self.source_name(), tree_name) else {
306                continue;
307            };
308            let Some(encoded) = encode_root_response(request_id, &root) else {
309                warn!(
310                    "Skipping wifi aware root response for {} due to unsupported root fields",
311                    tree_name
312                );
313                continue;
314            };
315            if let Err(err) = self.bridge.broadcast_message(encoded).await {
316                warn!("wifi aware root response broadcast failed: {}", err);
317            }
318        }
319
320        if let Err(err) = self
321            .bridge
322            .broadcast_message(encode_query_done(request_id))
323            .await
324        {
325            warn!("wifi aware query-done broadcast failed: {}", err);
326        }
327    }
328
329    async fn broadcast_known_root_updates(&self) -> Result<()> {
330        let filter = Filter::new()
331            .kind(nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND))
332            .author(self.keys.public_key())
333            .custom_tag(
334                nostr_sdk::nostr::SingleLetterTag::lowercase(nostr_sdk::nostr::Alphabet::L),
335                vec![HASHTREE_LABEL.to_string()],
336            )
337            .limit(256);
338        let events = self.relay.query_events(&filter, 256).await;
339        let mut announced = self.announced_event_ids.lock().await;
340        for event in events {
341            let event_id = event.id.to_hex();
342            if announced.insert(event_id) {
343                self.broadcast_event(&event).await?;
344            }
345        }
346        Ok(())
347    }
348}
349
350fn owner_pubkey_bytes(owner_pubkey: &str) -> Option<[u8; 32]> {
351    let pubkey = PublicKey::from_hex(owner_pubkey)
352        .or_else(|_| PublicKey::from_bech32(owner_pubkey))
353        .ok()?;
354    Some(pubkey.to_bytes())
355}
356
357fn hex_bytes_32(value: &str) -> Option<[u8; 32]> {
358    let decoded = hex::decode(value).ok()?;
359    decoded.try_into().ok()
360}
361
362fn push_u16(buf: &mut Vec<u8>, value: usize) -> Option<()> {
363    let value: u16 = value.try_into().ok()?;
364    buf.extend_from_slice(&value.to_be_bytes());
365    Some(())
366}
367
368fn read_u16(payload: &[u8], cursor: &mut usize) -> Option<usize> {
369    if payload.len() < *cursor + 2 {
370        return None;
371    }
372    let value = u16::from_be_bytes([payload[*cursor], payload[*cursor + 1]]) as usize;
373    *cursor += 2;
374    Some(value)
375}
376
377fn read_u64(payload: &[u8], cursor: &mut usize) -> Option<u64> {
378    if payload.len() < *cursor + 8 {
379        return None;
380    }
381    let bytes: [u8; 8] = payload[*cursor..*cursor + 8].try_into().ok()?;
382    *cursor += 8;
383    Some(u64::from_be_bytes(bytes))
384}
385
386fn read_exact<const N: usize>(payload: &[u8], cursor: &mut usize) -> Option<[u8; N]> {
387    if payload.len() < *cursor + N {
388        return None;
389    }
390    let bytes: [u8; N] = payload[*cursor..*cursor + N].try_into().ok()?;
391    *cursor += N;
392    Some(bytes)
393}
394
395fn read_tree_name(payload: &[u8], cursor: &mut usize) -> Option<String> {
396    let len = read_u16(payload, cursor)?;
397    if payload.len() < *cursor + len {
398        return None;
399    }
400    let value = std::str::from_utf8(&payload[*cursor..*cursor + len])
401        .ok()?
402        .to_string();
403    *cursor += len;
404    Some(value)
405}
406
407fn encode_query_root(request_id: u64, owner_pubkey: [u8; 32], tree_name: &str) -> Option<Vec<u8>> {
408    let tree_bytes = tree_name.as_bytes();
409    let mut payload = Vec::with_capacity(2 + 8 + 32 + 2 + tree_bytes.len());
410    payload.push(FRAME_VERSION);
411    payload.push(FRAME_KIND_QUERY_ROOT);
412    payload.extend_from_slice(&request_id.to_be_bytes());
413    payload.extend_from_slice(&owner_pubkey);
414    push_u16(&mut payload, tree_bytes.len())?;
415    payload.extend_from_slice(tree_bytes);
416    Some(payload)
417}
418
419fn encode_query_done(request_id: u64) -> Vec<u8> {
420    let mut payload = Vec::with_capacity(10);
421    payload.push(FRAME_VERSION);
422    payload.push(FRAME_KIND_QUERY_DONE);
423    payload.extend_from_slice(&request_id.to_be_bytes());
424    payload
425}
426
427fn encode_root_response(request_id: u64, root: &PeerRootEvent) -> Option<Vec<u8>> {
428    let event_id = hex_bytes_32(&root.event_id)?;
429    let hash = hex_bytes_32(&root.hash)?;
430    let key = match root.key.as_deref() {
431        Some(value) => Some(hex_bytes_32(value)?),
432        None => None,
433    };
434    let encrypted_key = match root.encrypted_key.as_deref() {
435        Some(value) => Some(hex_bytes_32(value)?),
436        None => None,
437    };
438    let self_encrypted_key = match root.self_encrypted_key.as_deref() {
439        Some(value) => Some(hex_bytes_32(value)?),
440        None => None,
441    };
442
443    let mut flags = 0u8;
444    if key.is_some() {
445        flags |= ROOT_FLAG_KEY;
446    }
447    if encrypted_key.is_some() {
448        flags |= ROOT_FLAG_ENCRYPTED_KEY;
449    }
450    if self_encrypted_key.is_some() {
451        flags |= ROOT_FLAG_SELF_ENCRYPTED_KEY;
452    }
453
454    let mut payload = Vec::with_capacity(2 + 8 + 8 + 32 + 32 + 1 + 96);
455    payload.push(FRAME_VERSION);
456    payload.push(FRAME_KIND_ROOT_RESPONSE);
457    payload.extend_from_slice(&request_id.to_be_bytes());
458    payload.extend_from_slice(&root.created_at.to_be_bytes());
459    payload.extend_from_slice(&event_id);
460    payload.extend_from_slice(&hash);
461    payload.push(flags);
462    if let Some(value) = key {
463        payload.extend_from_slice(&value);
464    }
465    if let Some(value) = encrypted_key {
466        payload.extend_from_slice(&value);
467    }
468    if let Some(value) = self_encrypted_key {
469        payload.extend_from_slice(&value);
470    }
471    Some(payload)
472}
473
474fn decode_frame(payload: &[u8]) -> Option<WifiAwareFrame> {
475    if payload.len() < 2 || payload[0] != FRAME_VERSION {
476        return None;
477    }
478
479    let mut cursor = 2;
480    match payload[1] {
481        FRAME_KIND_QUERY_ROOT => {
482            let request_id = read_u64(payload, &mut cursor)?;
483            let owner_pubkey = read_exact::<32>(payload, &mut cursor)?;
484            let tree_name = read_tree_name(payload, &mut cursor)?;
485            if cursor != payload.len() {
486                return None;
487            }
488            Some(WifiAwareFrame::QueryRoot {
489                request_id,
490                owner_pubkey_hex: hex::encode(owner_pubkey),
491                tree_name,
492            })
493        }
494        FRAME_KIND_ROOT_RESPONSE => {
495            let request_id = read_u64(payload, &mut cursor)?;
496            let created_at = read_u64(payload, &mut cursor)?;
497            let event_id = hex::encode(read_exact::<32>(payload, &mut cursor)?);
498            let hash = hex::encode(read_exact::<32>(payload, &mut cursor)?);
499            let flags = *payload.get(cursor)?;
500            cursor += 1;
501            let key = if flags & ROOT_FLAG_KEY != 0 {
502                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
503            } else {
504                None
505            };
506            let encrypted_key = if flags & ROOT_FLAG_ENCRYPTED_KEY != 0 {
507                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
508            } else {
509                None
510            };
511            let self_encrypted_key = if flags & ROOT_FLAG_SELF_ENCRYPTED_KEY != 0 {
512                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
513            } else {
514                None
515            };
516            if cursor != payload.len() {
517                return None;
518            }
519            Some(WifiAwareFrame::RootResponse {
520                request_id,
521                root: PeerRootEvent {
522                    hash,
523                    key,
524                    encrypted_key,
525                    self_encrypted_key,
526                    event_id,
527                    created_at,
528                    peer_id: WIFI_AWARE_SOURCE.to_string(),
529                },
530            })
531        }
532        FRAME_KIND_QUERY_DONE => {
533            let request_id = read_u64(payload, &mut cursor)?;
534            if cursor != payload.len() {
535                return None;
536            }
537            Some(WifiAwareFrame::QueryDone { request_id })
538        }
539        _ => None,
540    }
541}
542
543fn pick_latest_root_event(events: &[PeerRootEvent]) -> Option<PeerRootEvent> {
544    events
545        .iter()
546        .max_by(|a, b| {
547            let ordering = a.created_at.cmp(&b.created_at);
548            if ordering == std::cmp::Ordering::Equal {
549                a.event_id.cmp(&b.event_id)
550            } else {
551                ordering
552            }
553        })
554        .cloned()
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560    use crate::relay_bridge::MeshEventStore;
561    use nostr_sdk::nostr::{Alphabet, EventBuilder, Kind, SingleLetterTag, Tag, TagKind};
562    use tokio::sync::Mutex as AsyncMutex;
563
564    #[derive(Default)]
565    struct TestEventStore {
566        events: Mutex<Vec<Event>>,
567    }
568
569    #[async_trait]
570    impl MeshEventStore for TestEventStore {
571        async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
572            self.events.lock().await.push(event);
573            Ok(())
574        }
575
576        async fn query_events(&self, filter: &Filter, limit: usize) -> Vec<Event> {
577            self.events
578                .lock()
579                .await
580                .iter()
581                .filter(|event| filter.match_event(event))
582                .take(limit)
583                .cloned()
584                .collect()
585        }
586    }
587
588    struct MockWifiAwareBridge {
589        sent_payloads: AsyncMutex<Vec<Vec<u8>>>,
590        response_events: AsyncMutex<Vec<Event>>,
591        event_tx: AsyncMutex<Option<mpsc::Sender<WifiAwareEvent>>>,
592    }
593
594    impl MockWifiAwareBridge {
595        fn new() -> Arc<Self> {
596            Arc::new(Self {
597                sent_payloads: AsyncMutex::new(Vec::new()),
598                response_events: AsyncMutex::new(Vec::new()),
599                event_tx: AsyncMutex::new(None),
600            })
601        }
602
603        async fn queue_response_event(&self, event: Event) {
604            self.response_events.lock().await.push(event);
605        }
606
607        async fn sent_payloads(&self) -> Vec<Vec<u8>> {
608            self.sent_payloads.lock().await.clone()
609        }
610
611        async fn wait_until_started(&self) {
612            for _ in 0..100 {
613                if self.event_tx.lock().await.is_some() {
614                    return;
615                }
616                tokio::time::sleep(Duration::from_millis(10)).await;
617            }
618            panic!("mock wifi aware bridge did not start in time");
619        }
620    }
621
622    #[async_trait]
623    impl MobileWifiAwareBridge for MockWifiAwareBridge {
624        async fn start(&self, _local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>> {
625            let (tx, rx) = mpsc::channel(32);
626            *self.event_tx.lock().await = Some(tx);
627            Ok(rx)
628        }
629
630        async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()> {
631            self.sent_payloads.lock().await.push(payload.clone());
632            let Some(tx) = self.event_tx.lock().await.clone() else {
633                return Ok(());
634            };
635
636            if let Some(WifiAwareFrame::QueryRoot {
637                request_id,
638                owner_pubkey_hex,
639                tree_name,
640            }) = decode_frame(&payload)
641            {
642                let response_events = self.response_events.lock().await.clone();
643                for event in response_events
644                    .iter()
645                    .filter(|event| event.pubkey.to_hex() == owner_pubkey_hex)
646                {
647                    let Some(root) = root_event_from_peer(event, WIFI_AWARE_SOURCE, &tree_name)
648                    else {
649                        continue;
650                    };
651                    let encoded = encode_root_response(request_id, &root)
652                        .expect("expected compact root response encoding");
653                    tx.send(WifiAwareEvent::Message {
654                        peer_id: "peer-b".to_string(),
655                        payload: encoded,
656                    })
657                    .await
658                    .map_err(|err| anyhow!("mock wifi aware event send failed: {}", err))?;
659                }
660                tx.send(WifiAwareEvent::Message {
661                    peer_id: "peer-b".to_string(),
662                    payload: encode_query_done(request_id),
663                })
664                .await
665                .map_err(|err| anyhow!("mock wifi aware query-done send failed: {}", err))?;
666            }
667            Ok(())
668        }
669    }
670
671    #[test]
672    fn compact_query_root_frame_round_trips() {
673        let owner =
674            owner_pubkey_bytes("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798")
675                .expect("owner pubkey");
676        let frame = encode_query_root(42, owner, "video").expect("query frame");
677
678        match decode_frame(&frame).expect("decoded frame") {
679            WifiAwareFrame::QueryRoot {
680                request_id,
681                owner_pubkey_hex,
682                tree_name,
683            } => {
684                assert_eq!(request_id, 42);
685                assert_eq!(owner_pubkey_hex, hex::encode(owner));
686                assert_eq!(tree_name, "video");
687            }
688            _ => panic!("expected query-root frame"),
689        }
690    }
691
692    #[tokio::test]
693    async fn wifi_aware_bus_broadcast_event_forwards_json_bytes() -> Result<()> {
694        let bridge = MockWifiAwareBridge::new();
695        let bus_keys = Keys::generate();
696        let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
697        let bus = WifiAwareNostrBus::new(
698            WifiAwareConfig::default(),
699            bus_keys.clone(),
700            relay,
701            bridge.clone(),
702        );
703        let event =
704            EventBuilder::new(Kind::TextNote, "hello wifi aware", []).to_event(&bus_keys)?;
705
706        bus.broadcast_event(&event).await?;
707
708        let sent = bridge.sent_payloads().await;
709        assert_eq!(sent, vec![event.as_json().into_bytes()]);
710        Ok(())
711    }
712
713    #[tokio::test]
714    async fn wifi_aware_bus_query_root_returns_matching_event_and_sends_compact_query() -> Result<()>
715    {
716        let bridge = MockWifiAwareBridge::new();
717        let bus_keys = Keys::generate();
718        let author_keys = Keys::generate();
719        let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
720        let bus = WifiAwareNostrBus::new(
721            WifiAwareConfig {
722                enabled: true,
723                max_peers: 2,
724                announce_interval_ms: 60_000,
725            },
726            bus_keys,
727            relay,
728            bridge.clone(),
729        );
730        let root_event = EventBuilder::new(
731            Kind::Custom(HASHTREE_KIND),
732            "",
733            [
734                Tag::identifier("video".to_string()),
735                Tag::custom(
736                    TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
737                    vec!["hashtree".to_string()],
738                ),
739                Tag::custom(TagKind::Custom("hash".into()), vec!["ab".repeat(32)]),
740            ],
741        )
742        .to_event(&author_keys)?;
743        bridge.queue_response_event(root_event).await;
744
745        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
746        let bus_task = {
747            let bus = bus.clone();
748            tokio::spawn(async move {
749                let (signaling_tx, _signaling_rx) = mpsc::channel(8);
750                bus.run("local-peer".to_string(), shutdown_rx, signaling_tx)
751                    .await
752            })
753        };
754        bridge.wait_until_started().await;
755
756        let resolved = bus
757            .query_root(
758                &author_keys.public_key().to_hex(),
759                "video",
760                Duration::from_secs(1),
761            )
762            .await
763            .expect("root event");
764        assert_eq!(resolved.hash, "ab".repeat(32));
765        assert_eq!(resolved.peer_id, WIFI_AWARE_SOURCE);
766
767        let payloads = bridge.sent_payloads().await;
768        assert!(decode_frame(&payloads[0]).is_some());
769
770        bus_task.abort();
771        Ok(())
772    }
773}