Skip to main content

hashtree_cli/webrtc/
wifi_aware.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use nostr::{nips::nip19::FromBech32, Event, Filter, JsonUtil, Keys, PublicKey};
4use std::collections::{HashMap, HashSet};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7use tokio::sync::{mpsc, watch, Mutex};
8use tracing::{debug, warn};
9
10use super::root_events::{
11    build_root_filter, is_hashtree_labeled_event, root_event_from_peer, PeerRootEvent,
12    HASHTREE_KIND,
13};
14use super::LocalNostrBus;
15use crate::nostr_relay::NostrRelay;
16
/// Source label of this transport: returned by `source_name` and written into
/// the `peer_id` of every root decoded from a compact frame.
pub const WIFI_AWARE_SOURCE: &str = "wifi-aware";
// Byte 0 of every compact frame; `decode_frame` rejects any other value.
const FRAME_VERSION: u8 = 1;
// Byte 1 of a compact frame: selects which frame layout follows.
const FRAME_KIND_QUERY_ROOT: u8 = 1;
const FRAME_KIND_ROOT_RESPONSE: u8 = 2;
const FRAME_KIND_QUERY_DONE: u8 = 3;
// Bits of the flags byte in a root-response frame. Each set bit means the
// matching 32-byte field is present; fields are laid out in this bit order.
const ROOT_FLAG_KEY: u8 = 1 << 0;
const ROOT_FLAG_ENCRYPTED_KEY: u8 = 1 << 1;
const ROOT_FLAG_SELF_ENCRYPTED_KEY: u8 = 1 << 2;
25
/// Settings for the Wi-Fi Aware local bus.
#[derive(Debug, Clone)]
pub struct WifiAwareConfig {
    /// Master switch for the transport.
    pub enabled: bool,
    /// Presumably the maximum number of peers (not read elsewhere in this
    /// file); a value of zero makes [`WifiAwareConfig::is_enabled`] return `false`.
    pub max_peers: usize,
    /// Milliseconds between root announcement ticks.
    pub announce_interval_ms: u64,
}

impl WifiAwareConfig {
    /// Returns `true` only when the switch is on and at least one peer is allowed.
    pub fn is_enabled(&self) -> bool {
        self.max_peers != 0 && self.enabled
    }
}

impl Default for WifiAwareConfig {
    /// Switched off, zero peers, two-second announce interval.
    fn default() -> Self {
        let announce_interval_ms = 2_000;
        Self {
            announce_interval_ms,
            max_peers: 0,
            enabled: false,
        }
    }
}
48
/// Events a [`MobileWifiAwareBridge`] delivers on the channel returned by `start`.
#[derive(Debug, Clone)]
pub enum WifiAwareEvent {
    /// A peer became visible; the bus only logs this.
    PeerDiscovered { peer_id: String },
    /// A peer went away; the bus only logs this.
    PeerLost { peer_id: String },
    /// Raw bytes received from `peer_id`; either a compact frame or a JSON event.
    Message { peer_id: String, payload: Vec<u8> },
}
55
/// Bridge to the platform's Wi-Fi Aware transport (implemented outside this file).
#[async_trait]
pub trait MobileWifiAwareBridge: Send + Sync {
    /// Starts the bridge for `local_peer_id` and returns the receiver on which
    /// [`WifiAwareEvent`]s are delivered.
    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>>;

    /// Sends `payload` out over the transport (presumably to every reachable
    /// peer — confirm against the platform implementation).
    async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()>;
}
62
// Process-wide bridge slot; written at most once via `install_mobile_wifi_aware_bridge`.
static MOBILE_WIFI_AWARE_BRIDGE: OnceLock<Arc<dyn MobileWifiAwareBridge>> = OnceLock::new();
64
65pub fn install_mobile_wifi_aware_bridge(bridge: Arc<dyn MobileWifiAwareBridge>) -> Result<()> {
66    MOBILE_WIFI_AWARE_BRIDGE
67        .set(bridge)
68        .map_err(|_| anyhow!("mobile wifi aware bridge already installed"))
69}
70
71pub(crate) fn mobile_wifi_aware_bridge() -> Option<Arc<dyn MobileWifiAwareBridge>> {
72    MOBILE_WIFI_AWARE_BRIDGE.get().cloned()
73}
74
/// Local Nostr bus that exchanges events and root queries over a
/// [`MobileWifiAwareBridge`].
pub struct WifiAwareNostrBus {
    /// Transport settings; `announce_interval_ms` paces the announce ticker in `run`.
    config: WifiAwareConfig,
    /// Local identity: used to drop our own events and to pick the roots we announce.
    keys: Keys,
    /// Relay queried for root events and fed with verified hashtree events from peers.
    relay: Arc<NostrRelay>,
    /// Transport used for every outbound payload.
    bridge: Arc<dyn MobileWifiAwareBridge>,
    /// In-flight `query_root` calls keyed by request id; incoming responses are
    /// routed to the stored sender.
    pending_queries: Arc<Mutex<HashMap<u64, mpsc::UnboundedSender<PendingQueryMessage>>>>,
    /// Hex ids of events the periodic root announcement skips.
    announced_event_ids: Arc<Mutex<HashSet<String>>>,
}
83
/// Message routed from `handle_message` to a waiting `query_root` call.
enum PendingQueryMessage {
    /// One candidate root received in a root-response frame.
    Root(PeerRootEvent),
    /// A query-done frame arrived; `query_root` stops waiting when it sees this.
    Done,
}
88
/// Decoded form of a compact binary frame (see `decode_frame`).
enum WifiAwareFrame {
    /// Request for the root of `tree_name` owned by `owner_pubkey_hex`.
    QueryRoot {
        request_id: u64,
        // Hex encoding of the 32 raw public-key bytes carried on the wire.
        owner_pubkey_hex: String,
        tree_name: String,
    },
    /// One root answering the query identified by `request_id`.
    RootResponse {
        request_id: u64,
        root: PeerRootEvent,
    },
    /// Marks the end of a responder's answers for `request_id`.
    QueryDone {
        request_id: u64,
    },
}
103
/// `LocalNostrBus` adapter: every method forwards to the inherent method of the
/// same name on [`WifiAwareNostrBus`].
#[async_trait]
impl LocalNostrBus for WifiAwareNostrBus {
    /// Always [`WIFI_AWARE_SOURCE`].
    fn source_name(&self) -> &'static str {
        WIFI_AWARE_SOURCE
    }

    /// Forwards to the inherent `broadcast_event`.
    async fn broadcast_event(&self, event: &Event) -> Result<()> {
        // The type-qualified path is what this delegation relies on to reach
        // the inherent method.
        WifiAwareNostrBus::broadcast_event(self, event).await
    }

    /// Forwards to the inherent `query_root`.
    async fn query_root(
        &self,
        owner_pubkey: &str,
        tree_name: &str,
        timeout: Duration,
    ) -> Option<PeerRootEvent> {
        WifiAwareNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
    }
}
123
124impl WifiAwareNostrBus {
125    pub fn new(
126        config: WifiAwareConfig,
127        keys: Keys,
128        relay: Arc<NostrRelay>,
129        bridge: Arc<dyn MobileWifiAwareBridge>,
130    ) -> Arc<Self> {
131        Arc::new(Self {
132            config,
133            keys,
134            relay,
135            bridge,
136            pending_queries: Arc::new(Mutex::new(HashMap::new())),
137            announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
138        })
139    }
140
141    pub async fn run(
142        self: Arc<Self>,
143        local_peer_id: String,
144        mut shutdown_rx: watch::Receiver<bool>,
145        signaling_tx: mpsc::Sender<(String, Event)>,
146    ) -> Result<()> {
147        let mut announce_ticker = tokio::time::interval(Duration::from_millis(
148            self.config.announce_interval_ms.max(1),
149        ));
150        let mut events = self.bridge.start(local_peer_id).await?;
151
152        loop {
153            tokio::select! {
154                _ = shutdown_rx.changed() => {
155                    if *shutdown_rx.borrow() {
156                        break;
157                    }
158                }
159                _ = announce_ticker.tick() => {
160                    if let Err(err) = self.broadcast_known_root_updates().await {
161                        debug!("wifi aware root announcement failed: {}", err);
162                    }
163                }
164                maybe_event = events.recv() => {
165                    match maybe_event {
166                        Some(WifiAwareEvent::Message { peer_id, payload }) => {
167                            self.handle_message(&peer_id, &payload, &signaling_tx).await;
168                        }
169                        Some(WifiAwareEvent::PeerDiscovered { peer_id }) => {
170                            debug!("wifi aware peer discovered: {}", peer_id);
171                        }
172                        Some(WifiAwareEvent::PeerLost { peer_id }) => {
173                            debug!("wifi aware peer lost: {}", peer_id);
174                        }
175                        None => break,
176                    }
177                }
178            }
179        }
180
181        Ok(())
182    }
183
184    pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
185        // Keep raw event fallback for generic signaling until local bus semantics
186        // are narrowed further. Compact frames are used for root query/response.
187        self.bridge
188            .broadcast_message(event.as_json().into_bytes())
189            .await
190    }
191
192    pub async fn query_root(
193        &self,
194        owner_pubkey: &str,
195        tree_name: &str,
196        timeout: Duration,
197    ) -> Option<PeerRootEvent> {
198        let request_id = rand::random::<u64>();
199        let owner_bytes = owner_pubkey_bytes(owner_pubkey)?;
200        let request = encode_query_root(request_id, owner_bytes, tree_name)?;
201        let (tx, mut rx) = mpsc::unbounded_channel();
202        self.pending_queries.lock().await.insert(request_id, tx);
203
204        if self.bridge.broadcast_message(request).await.is_err() {
205            self.pending_queries.lock().await.remove(&request_id);
206            return None;
207        }
208
209        let mut roots = Vec::new();
210        let deadline = tokio::time::sleep(timeout);
211        tokio::pin!(deadline);
212
213        loop {
214            tokio::select! {
215                _ = &mut deadline => break,
216                maybe_msg = rx.recv() => {
217                    let Some(msg) = maybe_msg else {
218                        break;
219                    };
220                    match msg {
221                        PendingQueryMessage::Root(root) => roots.push(root),
222                        PendingQueryMessage::Done => break,
223                    }
224                }
225            }
226        }
227
228        self.pending_queries.lock().await.remove(&request_id);
229        pick_latest_root_event(&roots)
230    }
231
232    async fn handle_message(
233        &self,
234        peer_id: &str,
235        payload: &[u8],
236        signaling_tx: &mpsc::Sender<(String, Event)>,
237    ) {
238        if let Some(frame) = decode_frame(payload) {
239            match frame {
240                WifiAwareFrame::QueryRoot {
241                    request_id,
242                    owner_pubkey_hex,
243                    tree_name,
244                } => {
245                    self.respond_to_root_query(request_id, &owner_pubkey_hex, &tree_name)
246                        .await;
247                }
248                WifiAwareFrame::RootResponse { request_id, root } => {
249                    let tx = self.pending_queries.lock().await.get(&request_id).cloned();
250                    if let Some(tx) = tx {
251                        let _ = tx.send(PendingQueryMessage::Root(root));
252                    }
253                }
254                WifiAwareFrame::QueryDone { request_id } => {
255                    let tx = self.pending_queries.lock().await.get(&request_id).cloned();
256                    if let Some(tx) = tx {
257                        let _ = tx.send(PendingQueryMessage::Done);
258                    }
259                }
260            }
261            return;
262        }
263
264        let Ok(text) = std::str::from_utf8(payload) else {
265            debug!(
266                "ignoring non-utf8 wifi aware payload from {} ({} bytes)",
267                peer_id,
268                payload.len()
269            );
270            return;
271        };
272
273        if let Ok(event) = Event::from_json(text) {
274            if event.pubkey == self.keys.public_key() {
275                return;
276            }
277
278            if event.kind.is_ephemeral() {
279                let _ = signaling_tx
280                    .send((self.source_name().to_string(), event))
281                    .await;
282                return;
283            }
284
285            if event.kind == nostr::Kind::Custom(HASHTREE_KIND)
286                && is_hashtree_labeled_event(&event)
287                && event.verify().is_ok()
288            {
289                let _ = self.relay.ingest_trusted_event(event).await;
290            }
291            return;
292        }
293
294        debug!("ignoring wifi aware payload from {}: {}", peer_id, text);
295    }
296
297    async fn respond_to_root_query(&self, request_id: u64, owner_pubkey: &str, tree_name: &str) {
298        let Some(filter) = build_root_filter(owner_pubkey, tree_name) else {
299            let _ = self
300                .bridge
301                .broadcast_message(encode_query_done(request_id))
302                .await;
303            return;
304        };
305
306        for event in self.relay.query_events(&filter, 50).await {
307            let Some(root) = root_event_from_peer(&event, self.source_name(), tree_name) else {
308                continue;
309            };
310            let Some(encoded) = encode_root_response(request_id, &root) else {
311                warn!(
312                    "Skipping wifi aware root response for {} due to unsupported root fields",
313                    tree_name
314                );
315                continue;
316            };
317            if let Err(err) = self.bridge.broadcast_message(encoded).await {
318                warn!("wifi aware root response broadcast failed: {}", err);
319            }
320        }
321
322        if let Err(err) = self
323            .bridge
324            .broadcast_message(encode_query_done(request_id))
325            .await
326        {
327            warn!("wifi aware query-done broadcast failed: {}", err);
328        }
329    }
330
331    async fn broadcast_known_root_updates(&self) -> Result<()> {
332        let filter = Filter::new()
333            .kind(nostr::Kind::Custom(HASHTREE_KIND))
334            .author(self.keys.public_key())
335            .custom_tag(
336                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
337                vec![super::root_events::HASHTREE_LABEL.to_string()],
338            )
339            .limit(256);
340        let events = self.relay.query_events(&filter, 256).await;
341        let mut announced = self.announced_event_ids.lock().await;
342        for event in events {
343            let event_id = event.id.to_hex();
344            if announced.insert(event_id) {
345                self.broadcast_event(&event).await?;
346            }
347        }
348        Ok(())
349    }
350}
351
352fn owner_pubkey_bytes(owner_pubkey: &str) -> Option<[u8; 32]> {
353    let pubkey = PublicKey::from_hex(owner_pubkey)
354        .or_else(|_| PublicKey::from_bech32(owner_pubkey))
355        .ok()?;
356    Some(pubkey.to_bytes())
357}
358
359fn hex_bytes_32(value: &str) -> Option<[u8; 32]> {
360    let decoded = hex::decode(value).ok()?;
361    decoded.try_into().ok()
362}
363
/// Appends `value` to `buf` as a big-endian `u16`.
///
/// Returns `None` and leaves `buf` untouched when `value` does not fit in 16 bits.
fn push_u16(buf: &mut Vec<u8>, value: usize) -> Option<()> {
    match u16::try_from(value) {
        Ok(narrow) => {
            buf.extend_from_slice(&narrow.to_be_bytes());
            Some(())
        }
        Err(_) => None,
    }
}
369
/// Reads a big-endian `u16` at `*cursor` and advances the cursor by two.
///
/// Returns `None` without moving the cursor when fewer than two bytes remain.
fn read_u16(payload: &[u8], cursor: &mut usize) -> Option<usize> {
    let start = *cursor;
    let end = start + 2;
    let raw: [u8; 2] = payload.get(start..end)?.try_into().ok()?;
    *cursor = end;
    Some(usize::from(u16::from_be_bytes(raw)))
}
378
/// Reads a big-endian `u64` at `*cursor` and advances the cursor by eight.
///
/// Returns `None` without moving the cursor when fewer than eight bytes remain.
fn read_u64(payload: &[u8], cursor: &mut usize) -> Option<u64> {
    let start = *cursor;
    let end = start + 8;
    let raw: [u8; 8] = payload.get(start..end)?.try_into().ok()?;
    *cursor = end;
    Some(u64::from_be_bytes(raw))
}
387
/// Copies exactly `N` bytes starting at `*cursor` and advances the cursor by `N`.
///
/// Returns `None` without moving the cursor when fewer than `N` bytes remain.
fn read_exact<const N: usize>(payload: &[u8], cursor: &mut usize) -> Option<[u8; N]> {
    let start = *cursor;
    let end = start + N;
    let chunk = payload.get(start..end)?;
    let mut out = [0u8; N];
    out.copy_from_slice(chunk);
    *cursor = end;
    Some(out)
}
396
397fn read_tree_name(payload: &[u8], cursor: &mut usize) -> Option<String> {
398    let len = read_u16(payload, cursor)?;
399    if payload.len() < *cursor + len {
400        return None;
401    }
402    let value = std::str::from_utf8(&payload[*cursor..*cursor + len])
403        .ok()?
404        .to_string();
405    *cursor += len;
406    Some(value)
407}
408
409fn encode_query_root(request_id: u64, owner_pubkey: [u8; 32], tree_name: &str) -> Option<Vec<u8>> {
410    let tree_bytes = tree_name.as_bytes();
411    let mut payload = Vec::with_capacity(2 + 8 + 32 + 2 + tree_bytes.len());
412    payload.push(FRAME_VERSION);
413    payload.push(FRAME_KIND_QUERY_ROOT);
414    payload.extend_from_slice(&request_id.to_be_bytes());
415    payload.extend_from_slice(&owner_pubkey);
416    push_u16(&mut payload, tree_bytes.len())?;
417    payload.extend_from_slice(tree_bytes);
418    Some(payload)
419}
420
421fn encode_query_done(request_id: u64) -> Vec<u8> {
422    let mut payload = Vec::with_capacity(10);
423    payload.push(FRAME_VERSION);
424    payload.push(FRAME_KIND_QUERY_DONE);
425    payload.extend_from_slice(&request_id.to_be_bytes());
426    payload
427}
428
429fn encode_root_response(request_id: u64, root: &PeerRootEvent) -> Option<Vec<u8>> {
430    let event_id = hex_bytes_32(&root.event_id)?;
431    let hash = hex_bytes_32(&root.hash)?;
432    let key = match root.key.as_deref() {
433        Some(value) => Some(hex_bytes_32(value)?),
434        None => None,
435    };
436    let encrypted_key = match root.encrypted_key.as_deref() {
437        Some(value) => Some(hex_bytes_32(value)?),
438        None => None,
439    };
440    let self_encrypted_key = match root.self_encrypted_key.as_deref() {
441        Some(value) => Some(hex_bytes_32(value)?),
442        None => None,
443    };
444
445    let mut flags = 0u8;
446    if key.is_some() {
447        flags |= ROOT_FLAG_KEY;
448    }
449    if encrypted_key.is_some() {
450        flags |= ROOT_FLAG_ENCRYPTED_KEY;
451    }
452    if self_encrypted_key.is_some() {
453        flags |= ROOT_FLAG_SELF_ENCRYPTED_KEY;
454    }
455
456    let mut payload = Vec::with_capacity(2 + 8 + 8 + 32 + 32 + 1 + 96);
457    payload.push(FRAME_VERSION);
458    payload.push(FRAME_KIND_ROOT_RESPONSE);
459    payload.extend_from_slice(&request_id.to_be_bytes());
460    payload.extend_from_slice(&root.created_at.to_be_bytes());
461    payload.extend_from_slice(&event_id);
462    payload.extend_from_slice(&hash);
463    payload.push(flags);
464    if let Some(value) = key {
465        payload.extend_from_slice(&value);
466    }
467    if let Some(value) = encrypted_key {
468        payload.extend_from_slice(&value);
469    }
470    if let Some(value) = self_encrypted_key {
471        payload.extend_from_slice(&value);
472    }
473    Some(payload)
474}
475
476fn decode_frame(payload: &[u8]) -> Option<WifiAwareFrame> {
477    if payload.len() < 2 || payload[0] != FRAME_VERSION {
478        return None;
479    }
480
481    let mut cursor = 2;
482    match payload[1] {
483        FRAME_KIND_QUERY_ROOT => {
484            let request_id = read_u64(payload, &mut cursor)?;
485            let owner_pubkey = read_exact::<32>(payload, &mut cursor)?;
486            let tree_name = read_tree_name(payload, &mut cursor)?;
487            if cursor != payload.len() {
488                return None;
489            }
490            Some(WifiAwareFrame::QueryRoot {
491                request_id,
492                owner_pubkey_hex: hex::encode(owner_pubkey),
493                tree_name,
494            })
495        }
496        FRAME_KIND_ROOT_RESPONSE => {
497            let request_id = read_u64(payload, &mut cursor)?;
498            let created_at = read_u64(payload, &mut cursor)?;
499            let event_id = hex::encode(read_exact::<32>(payload, &mut cursor)?);
500            let hash = hex::encode(read_exact::<32>(payload, &mut cursor)?);
501            let flags = *payload.get(cursor)?;
502            cursor += 1;
503            let key = if flags & ROOT_FLAG_KEY != 0 {
504                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
505            } else {
506                None
507            };
508            let encrypted_key = if flags & ROOT_FLAG_ENCRYPTED_KEY != 0 {
509                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
510            } else {
511                None
512            };
513            let self_encrypted_key = if flags & ROOT_FLAG_SELF_ENCRYPTED_KEY != 0 {
514                Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
515            } else {
516                None
517            };
518            if cursor != payload.len() {
519                return None;
520            }
521            Some(WifiAwareFrame::RootResponse {
522                request_id,
523                root: PeerRootEvent {
524                    hash,
525                    key,
526                    encrypted_key,
527                    self_encrypted_key,
528                    event_id,
529                    created_at,
530                    peer_id: WIFI_AWARE_SOURCE.to_string(),
531                },
532            })
533        }
534        FRAME_KIND_QUERY_DONE => {
535            let request_id = read_u64(payload, &mut cursor)?;
536            if cursor != payload.len() {
537                return None;
538            }
539            Some(WifiAwareFrame::QueryDone { request_id })
540        }
541        _ => None,
542    }
543}
544
545fn pick_latest_root_event(events: &[PeerRootEvent]) -> Option<PeerRootEvent> {
546    events
547        .iter()
548        .max_by(|a, b| {
549            let ordering = a.created_at.cmp(&b.created_at);
550            if ordering == std::cmp::Ordering::Equal {
551                a.event_id.cmp(&b.event_id)
552            } else {
553                ordering
554            }
555        })
556        .cloned()
557}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562    use crate::nostr_relay::NostrRelayConfig;
563    use crate::socialgraph::{self, SocialGraphAccessControl, SocialGraphBackend};
564    use nostr::{Alphabet, EventBuilder, Kind, SingleLetterTag, Tag, TagKind};
565    use std::sync::Arc;
566    use tempfile::TempDir;
567    use tokio::sync::Mutex as AsyncMutex;
568
    /// In-memory bridge for tests: records outbound payloads and answers
    /// query-root frames from a queue of canned events.
    struct MockWifiAwareBridge {
        // Every payload passed to `broadcast_message`, in call order.
        sent_payloads: AsyncMutex<Vec<Vec<u8>>>,
        // Events the mock may answer a query-root frame with.
        response_events: AsyncMutex<Vec<Event>>,
        // Sender half of the event channel; `None` until `start` is called.
        event_tx: AsyncMutex<Option<mpsc::Sender<WifiAwareEvent>>>,
    }
574
575    impl MockWifiAwareBridge {
576        fn new() -> Arc<Self> {
577            Arc::new(Self {
578                sent_payloads: AsyncMutex::new(Vec::new()),
579                response_events: AsyncMutex::new(Vec::new()),
580                event_tx: AsyncMutex::new(None),
581            })
582        }
583
584        async fn queue_response_event(&self, event: Event) {
585            self.response_events.lock().await.push(event);
586        }
587
588        async fn sent_payloads(&self) -> Vec<Vec<u8>> {
589            self.sent_payloads.lock().await.clone()
590        }
591
592        async fn wait_until_started(&self) {
593            for _ in 0..100 {
594                if self.event_tx.lock().await.is_some() {
595                    return;
596                }
597                tokio::time::sleep(Duration::from_millis(10)).await;
598            }
599            panic!("mock wifi aware bridge did not start in time");
600        }
601    }
602
    /// Mock transport: `start` opens the event channel, `broadcast_message`
    /// records the payload and plays the role of a remote peer for query-root frames.
    #[async_trait]
    impl MobileWifiAwareBridge for MockWifiAwareBridge {
        /// Creates the event channel (capacity 32), keeps the sender, and
        /// returns the receiver.
        async fn start(&self, _local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>> {
            let (tx, rx) = mpsc::channel(32);
            *self.event_tx.lock().await = Some(tx);
            Ok(rx)
        }

        /// Records `payload`. If the bridge is started and the payload is a
        /// query-root frame, replies as peer "peer-b" with one root response
        /// per queued event authored by the requested owner, then a query-done frame.
        async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()> {
            self.sent_payloads.lock().await.push(payload.clone());
            // Before `start` there is no channel to reply on; just record.
            let Some(tx) = self.event_tx.lock().await.clone() else {
                return Ok(());
            };

            if let Some(WifiAwareFrame::QueryRoot {
                request_id,
                owner_pubkey_hex,
                tree_name,
            }) = decode_frame(&payload)
            {
                let response_events = self.response_events.lock().await.clone();
                for event in response_events
                    .iter()
                    .filter(|event| event.pubkey.to_hex() == owner_pubkey_hex)
                {
                    // Events that do not yield a root for `tree_name` are skipped.
                    let Some(root) = root_event_from_peer(event, WIFI_AWARE_SOURCE, &tree_name)
                    else {
                        continue;
                    };
                    let encoded = encode_root_response(request_id, &root)
                        .expect("expected compact root response encoding");
                    tx.send(WifiAwareEvent::Message {
                        peer_id: "peer-b".to_string(),
                        payload: encoded,
                    })
                    .await
                    .map_err(|err| anyhow!("mock wifi aware event send failed: {}", err))?;
                }
                // Always close the exchange, even when no root matched.
                tx.send(WifiAwareEvent::Message {
                    peer_id: "peer-b".to_string(),
                    payload: encode_query_done(request_id),
                })
                .await
                .map_err(|err| anyhow!("mock wifi aware query-done send failed: {}", err))?;
            }
            Ok(())
        }
    }
651
    /// Builds a `NostrRelay` rooted in `tmp`, backed by a social-graph store
    /// opened with a 128 MiB map size, with `keys`' public key as the only
    /// entry in both key sets passed to the access control and the relay.
    fn test_relay(keys: &Keys, tmp: &TempDir) -> Result<Arc<NostrRelay>> {
        // Held for the duration of this function (presumably serializes
        // social-graph store setup across tests — confirm in `socialgraph`).
        let _guard = socialgraph::test_lock();
        let graph_store =
            socialgraph::open_social_graph_store_with_mapsize(tmp.path(), Some(128 * 1024 * 1024))?;
        let backend: Arc<dyn SocialGraphBackend> = graph_store.clone();
        let access = Arc::new(SocialGraphAccessControl::new(
            Arc::clone(&backend),
            0,
            HashSet::from([keys.public_key().to_hex()]),
        ));
        Ok(Arc::new(NostrRelay::new(
            Arc::clone(&backend),
            tmp.path().to_path_buf(),
            HashSet::from([keys.public_key().to_hex()]),
            Some(access),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?))
    }
673
674    #[test]
675    fn compact_query_root_frame_round_trips() {
676        let owner =
677            owner_pubkey_bytes("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798")
678                .expect("owner pubkey");
679        let frame = encode_query_root(42, owner, "video").expect("query frame");
680
681        match decode_frame(&frame).expect("decoded frame") {
682            WifiAwareFrame::QueryRoot {
683                request_id,
684                owner_pubkey_hex,
685                tree_name,
686            } => {
687                assert_eq!(request_id, 42);
688                assert_eq!(owner_pubkey_hex, hex::encode(owner));
689                assert_eq!(tree_name, "video");
690            }
691            _ => panic!("expected query-root frame"),
692        }
693    }
694
    /// `broadcast_event` hands the bridge exactly the event's JSON bytes.
    #[tokio::test]
    async fn wifi_aware_bus_broadcast_event_forwards_json_bytes() -> Result<()> {
        let bridge = MockWifiAwareBridge::new();
        let bus_keys = Keys::generate();
        let tmp = TempDir::new()?;
        let relay = test_relay(&bus_keys, &tmp)?;
        let bus = WifiAwareNostrBus::new(
            WifiAwareConfig::default(),
            bus_keys.clone(),
            relay,
            bridge.clone(),
        );
        let event =
            EventBuilder::new(Kind::TextNote, "hello wifi aware", []).to_event(&bus_keys)?;

        bus.broadcast_event(&event).await?;

        // The mock was never started, so it only records the payload.
        let sent = bridge.sent_payloads().await;
        assert_eq!(sent, vec![event.as_json().into_bytes()]);
        Ok(())
    }
716
    /// End-to-end query: the bus sends a compact query-root frame, the mock
    /// answers with the queued root, and `query_root` returns it.
    #[tokio::test]
    async fn wifi_aware_bus_query_root_returns_matching_event_and_sends_compact_query() -> Result<()>
    {
        let bridge = MockWifiAwareBridge::new();
        let bus_keys = Keys::generate();
        let author_keys = Keys::generate();
        let tmp = TempDir::new()?;
        let relay = test_relay(&bus_keys, &tmp)?;
        let bus = WifiAwareNostrBus::new(
            WifiAwareConfig {
                enabled: true,
                max_peers: 2,
                announce_interval_ms: 60_000,
            },
            bus_keys,
            relay,
            bridge.clone(),
        );
        // Root event for tree "video" signed by a different author than the bus.
        let root_event = EventBuilder::new(
            Kind::Custom(HASHTREE_KIND),
            "",
            [
                Tag::identifier("video".to_string()),
                Tag::custom(
                    TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
                    vec!["hashtree".to_string()],
                ),
                Tag::custom(TagKind::Custom("hash".into()), vec!["ab".repeat(32)]),
            ],
        )
        .to_event(&author_keys)?;
        bridge.queue_response_event(root_event).await;

        // Run the bus loop in the background so mock replies get dispatched.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let bus_task = {
            let bus = bus.clone();
            tokio::spawn(async move {
                let (signaling_tx, _signaling_rx) = mpsc::channel(8);
                bus.run("local-peer".to_string(), shutdown_rx, signaling_tx)
                    .await
            })
        };
        // The mock can only reply once `start` has created its event channel.
        bridge.wait_until_started().await;

        let resolved = bus
            .query_root(
                &author_keys.public_key().to_hex(),
                "video",
                Duration::from_secs(1),
            )
            .await
            .expect("expected wifi aware root");

        assert_eq!(resolved.hash, "ab".repeat(32));
        assert_eq!(resolved.peer_id, WIFI_AWARE_SOURCE);

        // The first recorded payload is expected to be the compact query frame.
        let sent = bridge.sent_payloads().await;
        let query_frame = sent.first().expect("expected compact query");
        match decode_frame(query_frame).expect("expected decoded query") {
            WifiAwareFrame::QueryRoot {
                owner_pubkey_hex,
                tree_name,
                ..
            } => {
                assert_eq!(owner_pubkey_hex, author_keys.public_key().to_hex());
                assert_eq!(tree_name, "video");
            }
            _ => panic!("expected first payload to be a query-root frame"),
        }

        // Stop the bus loop and surface both a join error and a run error.
        shutdown_tx.send(true)?;
        bus_task.await??;
        Ok(())
    }
791}