1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use nostr_sdk::nostr::{nips::nip19::FromBech32, Event, Filter, JsonUtil, Keys, PublicKey};
4use std::collections::{HashMap, HashSet};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7use tokio::sync::{mpsc, watch, Mutex};
8use tracing::{debug, warn};
9
10use crate::local_bus::LocalNostrBus;
11use crate::relay_bridge::SharedMeshEventStore;
12use crate::root_events::{
13 build_root_filter, is_hashtree_labeled_event, root_event_from_peer, PeerRootEvent,
14 HASHTREE_KIND, HASHTREE_LABEL,
15};
16
17pub const WIFI_AWARE_SOURCE: &str = "wifi-aware";
18const FRAME_VERSION: u8 = 1;
19const FRAME_KIND_QUERY_ROOT: u8 = 1;
20const FRAME_KIND_ROOT_RESPONSE: u8 = 2;
21const FRAME_KIND_QUERY_DONE: u8 = 3;
22const ROOT_FLAG_KEY: u8 = 1 << 0;
23const ROOT_FLAG_ENCRYPTED_KEY: u8 = 1 << 1;
24const ROOT_FLAG_SELF_ENCRYPTED_KEY: u8 = 1 << 2;
25
26#[derive(Debug, Clone)]
27pub struct WifiAwareConfig {
28 pub enabled: bool,
29 pub max_peers: usize,
30 pub announce_interval_ms: u64,
31}
32
33impl WifiAwareConfig {
34 pub fn is_enabled(&self) -> bool {
35 self.enabled && self.max_peers > 0
36 }
37}
38
39impl Default for WifiAwareConfig {
40 fn default() -> Self {
41 Self {
42 enabled: false,
43 max_peers: 0,
44 announce_interval_ms: 2_000,
45 }
46 }
47}
48
49#[derive(Debug, Clone)]
50pub enum WifiAwareEvent {
51 PeerDiscovered { peer_id: String },
52 PeerLost { peer_id: String },
53 Message { peer_id: String, payload: Vec<u8> },
54}
55
56#[async_trait]
57pub trait MobileWifiAwareBridge: Send + Sync {
58 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>>;
59
60 async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()>;
61}
62
63static MOBILE_WIFI_AWARE_BRIDGE: OnceLock<Arc<dyn MobileWifiAwareBridge>> = OnceLock::new();
64
65pub fn install_mobile_wifi_aware_bridge(bridge: Arc<dyn MobileWifiAwareBridge>) -> Result<()> {
66 MOBILE_WIFI_AWARE_BRIDGE
67 .set(bridge)
68 .map_err(|_| anyhow!("mobile wifi aware bridge already installed"))
69}
70
71pub fn mobile_wifi_aware_bridge() -> Option<Arc<dyn MobileWifiAwareBridge>> {
72 MOBILE_WIFI_AWARE_BRIDGE.get().cloned()
73}
74
75pub struct WifiAwareNostrBus {
76 config: WifiAwareConfig,
77 keys: Keys,
78 relay: SharedMeshEventStore,
79 bridge: Arc<dyn MobileWifiAwareBridge>,
80 pending_queries: Arc<Mutex<HashMap<u64, mpsc::UnboundedSender<PendingQueryMessage>>>>,
81 announced_event_ids: Arc<Mutex<HashSet<String>>>,
82}
83
84enum PendingQueryMessage {
85 Root(PeerRootEvent),
86 Done,
87}
88
89enum WifiAwareFrame {
90 QueryRoot {
91 request_id: u64,
92 owner_pubkey_hex: String,
93 tree_name: String,
94 },
95 RootResponse {
96 request_id: u64,
97 root: PeerRootEvent,
98 },
99 QueryDone {
100 request_id: u64,
101 },
102}
103
104#[async_trait]
105impl LocalNostrBus for WifiAwareNostrBus {
106 fn source_name(&self) -> &'static str {
107 WIFI_AWARE_SOURCE
108 }
109
110 async fn broadcast_event(&self, event: &Event) -> Result<()> {
111 WifiAwareNostrBus::broadcast_event(self, event).await
112 }
113
114 async fn query_root(
115 &self,
116 owner_pubkey: &str,
117 tree_name: &str,
118 timeout: Duration,
119 ) -> Option<PeerRootEvent> {
120 WifiAwareNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
121 }
122}
123
124impl WifiAwareNostrBus {
125 pub fn new(
126 config: WifiAwareConfig,
127 keys: Keys,
128 relay: SharedMeshEventStore,
129 bridge: Arc<dyn MobileWifiAwareBridge>,
130 ) -> Arc<Self> {
131 Arc::new(Self {
132 config,
133 keys,
134 relay,
135 bridge,
136 pending_queries: Arc::new(Mutex::new(HashMap::new())),
137 announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
138 })
139 }
140
141 pub async fn run(
142 self: Arc<Self>,
143 local_peer_id: String,
144 mut shutdown_rx: watch::Receiver<bool>,
145 signaling_tx: mpsc::Sender<(String, Event)>,
146 ) -> Result<()> {
147 let mut announce_ticker = tokio::time::interval(Duration::from_millis(
148 self.config.announce_interval_ms.max(1),
149 ));
150 let mut events = self.bridge.start(local_peer_id).await?;
151
152 loop {
153 tokio::select! {
154 _ = shutdown_rx.changed() => {
155 if *shutdown_rx.borrow() {
156 break;
157 }
158 }
159 _ = announce_ticker.tick() => {
160 if let Err(err) = self.broadcast_known_root_updates().await {
161 debug!("wifi aware root announcement failed: {}", err);
162 }
163 }
164 maybe_event = events.recv() => {
165 match maybe_event {
166 Some(WifiAwareEvent::Message { peer_id, payload }) => {
167 self.handle_message(&peer_id, &payload, &signaling_tx).await;
168 }
169 Some(WifiAwareEvent::PeerDiscovered { peer_id }) => {
170 debug!("wifi aware peer discovered: {}", peer_id);
171 }
172 Some(WifiAwareEvent::PeerLost { peer_id }) => {
173 debug!("wifi aware peer lost: {}", peer_id);
174 }
175 None => break,
176 }
177 }
178 }
179 }
180
181 Ok(())
182 }
183
184 pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
185 self.bridge
186 .broadcast_message(event.as_json().into_bytes())
187 .await
188 }
189
190 pub async fn query_root(
191 &self,
192 owner_pubkey: &str,
193 tree_name: &str,
194 timeout: Duration,
195 ) -> Option<PeerRootEvent> {
196 let request_id = rand::random::<u64>();
197 let owner_bytes = owner_pubkey_bytes(owner_pubkey)?;
198 let request = encode_query_root(request_id, owner_bytes, tree_name)?;
199 let (tx, mut rx) = mpsc::unbounded_channel();
200 self.pending_queries.lock().await.insert(request_id, tx);
201
202 if self.bridge.broadcast_message(request).await.is_err() {
203 self.pending_queries.lock().await.remove(&request_id);
204 return None;
205 }
206
207 let mut roots = Vec::new();
208 let deadline = tokio::time::sleep(timeout);
209 tokio::pin!(deadline);
210
211 loop {
212 tokio::select! {
213 _ = &mut deadline => break,
214 maybe_msg = rx.recv() => {
215 let Some(msg) = maybe_msg else {
216 break;
217 };
218 match msg {
219 PendingQueryMessage::Root(root) => roots.push(root),
220 PendingQueryMessage::Done => break,
221 }
222 }
223 }
224 }
225
226 self.pending_queries.lock().await.remove(&request_id);
227 pick_latest_root_event(&roots)
228 }
229
230 async fn handle_message(
231 &self,
232 peer_id: &str,
233 payload: &[u8],
234 signaling_tx: &mpsc::Sender<(String, Event)>,
235 ) {
236 if let Some(frame) = decode_frame(payload) {
237 match frame {
238 WifiAwareFrame::QueryRoot {
239 request_id,
240 owner_pubkey_hex,
241 tree_name,
242 } => {
243 self.respond_to_root_query(request_id, &owner_pubkey_hex, &tree_name)
244 .await;
245 }
246 WifiAwareFrame::RootResponse { request_id, root } => {
247 let tx = self.pending_queries.lock().await.get(&request_id).cloned();
248 if let Some(tx) = tx {
249 let _ = tx.send(PendingQueryMessage::Root(root));
250 }
251 }
252 WifiAwareFrame::QueryDone { request_id } => {
253 let tx = self.pending_queries.lock().await.get(&request_id).cloned();
254 if let Some(tx) = tx {
255 let _ = tx.send(PendingQueryMessage::Done);
256 }
257 }
258 }
259 return;
260 }
261
262 let Ok(text) = std::str::from_utf8(payload) else {
263 debug!(
264 "ignoring non-utf8 wifi aware payload from {} ({} bytes)",
265 peer_id,
266 payload.len()
267 );
268 return;
269 };
270
271 if let Ok(event) = Event::from_json(text) {
272 if event.pubkey == self.keys.public_key() {
273 return;
274 }
275
276 if event.kind.is_ephemeral() {
277 let _ = signaling_tx
278 .send((self.source_name().to_string(), event))
279 .await;
280 return;
281 }
282
283 if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
284 && is_hashtree_labeled_event(&event)
285 && event.verify().is_ok()
286 {
287 let _ = self.relay.ingest_trusted_event(event).await;
288 }
289 return;
290 }
291
292 debug!("ignoring wifi aware payload from {}: {}", peer_id, text);
293 }
294
295 async fn respond_to_root_query(&self, request_id: u64, owner_pubkey: &str, tree_name: &str) {
296 let Some(filter) = build_root_filter(owner_pubkey, tree_name) else {
297 let _ = self
298 .bridge
299 .broadcast_message(encode_query_done(request_id))
300 .await;
301 return;
302 };
303
304 for event in self.relay.query_events(&filter, 50).await {
305 let Some(root) = root_event_from_peer(&event, self.source_name(), tree_name) else {
306 continue;
307 };
308 let Some(encoded) = encode_root_response(request_id, &root) else {
309 warn!(
310 "Skipping wifi aware root response for {} due to unsupported root fields",
311 tree_name
312 );
313 continue;
314 };
315 if let Err(err) = self.bridge.broadcast_message(encoded).await {
316 warn!("wifi aware root response broadcast failed: {}", err);
317 }
318 }
319
320 if let Err(err) = self
321 .bridge
322 .broadcast_message(encode_query_done(request_id))
323 .await
324 {
325 warn!("wifi aware query-done broadcast failed: {}", err);
326 }
327 }
328
329 async fn broadcast_known_root_updates(&self) -> Result<()> {
330 let filter = Filter::new()
331 .kind(nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND))
332 .author(self.keys.public_key())
333 .custom_tag(
334 nostr_sdk::nostr::SingleLetterTag::lowercase(nostr_sdk::nostr::Alphabet::L),
335 vec![HASHTREE_LABEL.to_string()],
336 )
337 .limit(256);
338 let events = self.relay.query_events(&filter, 256).await;
339 let mut announced = self.announced_event_ids.lock().await;
340 for event in events {
341 let event_id = event.id.to_hex();
342 if announced.insert(event_id) {
343 self.broadcast_event(&event).await?;
344 }
345 }
346 Ok(())
347 }
348}
349
350fn owner_pubkey_bytes(owner_pubkey: &str) -> Option<[u8; 32]> {
351 let pubkey = PublicKey::from_hex(owner_pubkey)
352 .or_else(|_| PublicKey::from_bech32(owner_pubkey))
353 .ok()?;
354 Some(pubkey.to_bytes())
355}
356
357fn hex_bytes_32(value: &str) -> Option<[u8; 32]> {
358 let decoded = hex::decode(value).ok()?;
359 decoded.try_into().ok()
360}
361
362fn push_u16(buf: &mut Vec<u8>, value: usize) -> Option<()> {
363 let value: u16 = value.try_into().ok()?;
364 buf.extend_from_slice(&value.to_be_bytes());
365 Some(())
366}
367
368fn read_u16(payload: &[u8], cursor: &mut usize) -> Option<usize> {
369 if payload.len() < *cursor + 2 {
370 return None;
371 }
372 let value = u16::from_be_bytes([payload[*cursor], payload[*cursor + 1]]) as usize;
373 *cursor += 2;
374 Some(value)
375}
376
377fn read_u64(payload: &[u8], cursor: &mut usize) -> Option<u64> {
378 if payload.len() < *cursor + 8 {
379 return None;
380 }
381 let bytes: [u8; 8] = payload[*cursor..*cursor + 8].try_into().ok()?;
382 *cursor += 8;
383 Some(u64::from_be_bytes(bytes))
384}
385
386fn read_exact<const N: usize>(payload: &[u8], cursor: &mut usize) -> Option<[u8; N]> {
387 if payload.len() < *cursor + N {
388 return None;
389 }
390 let bytes: [u8; N] = payload[*cursor..*cursor + N].try_into().ok()?;
391 *cursor += N;
392 Some(bytes)
393}
394
395fn read_tree_name(payload: &[u8], cursor: &mut usize) -> Option<String> {
396 let len = read_u16(payload, cursor)?;
397 if payload.len() < *cursor + len {
398 return None;
399 }
400 let value = std::str::from_utf8(&payload[*cursor..*cursor + len])
401 .ok()?
402 .to_string();
403 *cursor += len;
404 Some(value)
405}
406
407fn encode_query_root(request_id: u64, owner_pubkey: [u8; 32], tree_name: &str) -> Option<Vec<u8>> {
408 let tree_bytes = tree_name.as_bytes();
409 let mut payload = Vec::with_capacity(2 + 8 + 32 + 2 + tree_bytes.len());
410 payload.push(FRAME_VERSION);
411 payload.push(FRAME_KIND_QUERY_ROOT);
412 payload.extend_from_slice(&request_id.to_be_bytes());
413 payload.extend_from_slice(&owner_pubkey);
414 push_u16(&mut payload, tree_bytes.len())?;
415 payload.extend_from_slice(tree_bytes);
416 Some(payload)
417}
418
419fn encode_query_done(request_id: u64) -> Vec<u8> {
420 let mut payload = Vec::with_capacity(10);
421 payload.push(FRAME_VERSION);
422 payload.push(FRAME_KIND_QUERY_DONE);
423 payload.extend_from_slice(&request_id.to_be_bytes());
424 payload
425}
426
427fn encode_root_response(request_id: u64, root: &PeerRootEvent) -> Option<Vec<u8>> {
428 let event_id = hex_bytes_32(&root.event_id)?;
429 let hash = hex_bytes_32(&root.hash)?;
430 let key = match root.key.as_deref() {
431 Some(value) => Some(hex_bytes_32(value)?),
432 None => None,
433 };
434 let encrypted_key = match root.encrypted_key.as_deref() {
435 Some(value) => Some(hex_bytes_32(value)?),
436 None => None,
437 };
438 let self_encrypted_key = match root.self_encrypted_key.as_deref() {
439 Some(value) => Some(hex_bytes_32(value)?),
440 None => None,
441 };
442
443 let mut flags = 0u8;
444 if key.is_some() {
445 flags |= ROOT_FLAG_KEY;
446 }
447 if encrypted_key.is_some() {
448 flags |= ROOT_FLAG_ENCRYPTED_KEY;
449 }
450 if self_encrypted_key.is_some() {
451 flags |= ROOT_FLAG_SELF_ENCRYPTED_KEY;
452 }
453
454 let mut payload = Vec::with_capacity(2 + 8 + 8 + 32 + 32 + 1 + 96);
455 payload.push(FRAME_VERSION);
456 payload.push(FRAME_KIND_ROOT_RESPONSE);
457 payload.extend_from_slice(&request_id.to_be_bytes());
458 payload.extend_from_slice(&root.created_at.to_be_bytes());
459 payload.extend_from_slice(&event_id);
460 payload.extend_from_slice(&hash);
461 payload.push(flags);
462 if let Some(value) = key {
463 payload.extend_from_slice(&value);
464 }
465 if let Some(value) = encrypted_key {
466 payload.extend_from_slice(&value);
467 }
468 if let Some(value) = self_encrypted_key {
469 payload.extend_from_slice(&value);
470 }
471 Some(payload)
472}
473
474fn decode_frame(payload: &[u8]) -> Option<WifiAwareFrame> {
475 if payload.len() < 2 || payload[0] != FRAME_VERSION {
476 return None;
477 }
478
479 let mut cursor = 2;
480 match payload[1] {
481 FRAME_KIND_QUERY_ROOT => {
482 let request_id = read_u64(payload, &mut cursor)?;
483 let owner_pubkey = read_exact::<32>(payload, &mut cursor)?;
484 let tree_name = read_tree_name(payload, &mut cursor)?;
485 if cursor != payload.len() {
486 return None;
487 }
488 Some(WifiAwareFrame::QueryRoot {
489 request_id,
490 owner_pubkey_hex: hex::encode(owner_pubkey),
491 tree_name,
492 })
493 }
494 FRAME_KIND_ROOT_RESPONSE => {
495 let request_id = read_u64(payload, &mut cursor)?;
496 let created_at = read_u64(payload, &mut cursor)?;
497 let event_id = hex::encode(read_exact::<32>(payload, &mut cursor)?);
498 let hash = hex::encode(read_exact::<32>(payload, &mut cursor)?);
499 let flags = *payload.get(cursor)?;
500 cursor += 1;
501 let key = if flags & ROOT_FLAG_KEY != 0 {
502 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
503 } else {
504 None
505 };
506 let encrypted_key = if flags & ROOT_FLAG_ENCRYPTED_KEY != 0 {
507 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
508 } else {
509 None
510 };
511 let self_encrypted_key = if flags & ROOT_FLAG_SELF_ENCRYPTED_KEY != 0 {
512 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
513 } else {
514 None
515 };
516 if cursor != payload.len() {
517 return None;
518 }
519 Some(WifiAwareFrame::RootResponse {
520 request_id,
521 root: PeerRootEvent {
522 hash,
523 key,
524 encrypted_key,
525 self_encrypted_key,
526 event_id,
527 created_at,
528 peer_id: WIFI_AWARE_SOURCE.to_string(),
529 },
530 })
531 }
532 FRAME_KIND_QUERY_DONE => {
533 let request_id = read_u64(payload, &mut cursor)?;
534 if cursor != payload.len() {
535 return None;
536 }
537 Some(WifiAwareFrame::QueryDone { request_id })
538 }
539 _ => None,
540 }
541}
542
543fn pick_latest_root_event(events: &[PeerRootEvent]) -> Option<PeerRootEvent> {
544 events
545 .iter()
546 .max_by(|a, b| {
547 let ordering = a.created_at.cmp(&b.created_at);
548 if ordering == std::cmp::Ordering::Equal {
549 a.event_id.cmp(&b.event_id)
550 } else {
551 ordering
552 }
553 })
554 .cloned()
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560 use crate::relay_bridge::MeshEventStore;
561 use nostr_sdk::nostr::{Alphabet, EventBuilder, Kind, SingleLetterTag, Tag, TagKind};
562 use tokio::sync::Mutex as AsyncMutex;
563
564 #[derive(Default)]
565 struct TestEventStore {
566 events: Mutex<Vec<Event>>,
567 }
568
569 #[async_trait]
570 impl MeshEventStore for TestEventStore {
571 async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
572 self.events.lock().await.push(event);
573 Ok(())
574 }
575
576 async fn query_events(&self, filter: &Filter, limit: usize) -> Vec<Event> {
577 self.events
578 .lock()
579 .await
580 .iter()
581 .filter(|event| filter.match_event(event))
582 .take(limit)
583 .cloned()
584 .collect()
585 }
586 }
587
588 struct MockWifiAwareBridge {
589 sent_payloads: AsyncMutex<Vec<Vec<u8>>>,
590 response_events: AsyncMutex<Vec<Event>>,
591 event_tx: AsyncMutex<Option<mpsc::Sender<WifiAwareEvent>>>,
592 }
593
594 impl MockWifiAwareBridge {
595 fn new() -> Arc<Self> {
596 Arc::new(Self {
597 sent_payloads: AsyncMutex::new(Vec::new()),
598 response_events: AsyncMutex::new(Vec::new()),
599 event_tx: AsyncMutex::new(None),
600 })
601 }
602
603 async fn queue_response_event(&self, event: Event) {
604 self.response_events.lock().await.push(event);
605 }
606
607 async fn sent_payloads(&self) -> Vec<Vec<u8>> {
608 self.sent_payloads.lock().await.clone()
609 }
610
611 async fn wait_until_started(&self) {
612 for _ in 0..100 {
613 if self.event_tx.lock().await.is_some() {
614 return;
615 }
616 tokio::time::sleep(Duration::from_millis(10)).await;
617 }
618 panic!("mock wifi aware bridge did not start in time");
619 }
620 }
621
622 #[async_trait]
623 impl MobileWifiAwareBridge for MockWifiAwareBridge {
624 async fn start(&self, _local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>> {
625 let (tx, rx) = mpsc::channel(32);
626 *self.event_tx.lock().await = Some(tx);
627 Ok(rx)
628 }
629
630 async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()> {
631 self.sent_payloads.lock().await.push(payload.clone());
632 let Some(tx) = self.event_tx.lock().await.clone() else {
633 return Ok(());
634 };
635
636 if let Some(WifiAwareFrame::QueryRoot {
637 request_id,
638 owner_pubkey_hex,
639 tree_name,
640 }) = decode_frame(&payload)
641 {
642 let response_events = self.response_events.lock().await.clone();
643 for event in response_events
644 .iter()
645 .filter(|event| event.pubkey.to_hex() == owner_pubkey_hex)
646 {
647 let Some(root) = root_event_from_peer(event, WIFI_AWARE_SOURCE, &tree_name)
648 else {
649 continue;
650 };
651 let encoded = encode_root_response(request_id, &root)
652 .expect("expected compact root response encoding");
653 tx.send(WifiAwareEvent::Message {
654 peer_id: "peer-b".to_string(),
655 payload: encoded,
656 })
657 .await
658 .map_err(|err| anyhow!("mock wifi aware event send failed: {}", err))?;
659 }
660 tx.send(WifiAwareEvent::Message {
661 peer_id: "peer-b".to_string(),
662 payload: encode_query_done(request_id),
663 })
664 .await
665 .map_err(|err| anyhow!("mock wifi aware query-done send failed: {}", err))?;
666 }
667 Ok(())
668 }
669 }
670
671 #[test]
672 fn compact_query_root_frame_round_trips() {
673 let owner =
674 owner_pubkey_bytes("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798")
675 .expect("owner pubkey");
676 let frame = encode_query_root(42, owner, "video").expect("query frame");
677
678 match decode_frame(&frame).expect("decoded frame") {
679 WifiAwareFrame::QueryRoot {
680 request_id,
681 owner_pubkey_hex,
682 tree_name,
683 } => {
684 assert_eq!(request_id, 42);
685 assert_eq!(owner_pubkey_hex, hex::encode(owner));
686 assert_eq!(tree_name, "video");
687 }
688 _ => panic!("expected query-root frame"),
689 }
690 }
691
692 #[tokio::test]
693 async fn wifi_aware_bus_broadcast_event_forwards_json_bytes() -> Result<()> {
694 let bridge = MockWifiAwareBridge::new();
695 let bus_keys = Keys::generate();
696 let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
697 let bus = WifiAwareNostrBus::new(
698 WifiAwareConfig::default(),
699 bus_keys.clone(),
700 relay,
701 bridge.clone(),
702 );
703 let event =
704 EventBuilder::new(Kind::TextNote, "hello wifi aware", []).to_event(&bus_keys)?;
705
706 bus.broadcast_event(&event).await?;
707
708 let sent = bridge.sent_payloads().await;
709 assert_eq!(sent, vec![event.as_json().into_bytes()]);
710 Ok(())
711 }
712
713 #[tokio::test]
714 async fn wifi_aware_bus_query_root_returns_matching_event_and_sends_compact_query() -> Result<()>
715 {
716 let bridge = MockWifiAwareBridge::new();
717 let bus_keys = Keys::generate();
718 let author_keys = Keys::generate();
719 let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
720 let bus = WifiAwareNostrBus::new(
721 WifiAwareConfig {
722 enabled: true,
723 max_peers: 2,
724 announce_interval_ms: 60_000,
725 },
726 bus_keys,
727 relay,
728 bridge.clone(),
729 );
730 let root_event = EventBuilder::new(
731 Kind::Custom(HASHTREE_KIND),
732 "",
733 [
734 Tag::identifier("video".to_string()),
735 Tag::custom(
736 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
737 vec!["hashtree".to_string()],
738 ),
739 Tag::custom(TagKind::Custom("hash".into()), vec!["ab".repeat(32)]),
740 ],
741 )
742 .to_event(&author_keys)?;
743 bridge.queue_response_event(root_event).await;
744
745 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
746 let bus_task = {
747 let bus = bus.clone();
748 tokio::spawn(async move {
749 let (signaling_tx, _signaling_rx) = mpsc::channel(8);
750 bus.run("local-peer".to_string(), shutdown_rx, signaling_tx)
751 .await
752 })
753 };
754 bridge.wait_until_started().await;
755
756 let resolved = bus
757 .query_root(
758 &author_keys.public_key().to_hex(),
759 "video",
760 Duration::from_secs(1),
761 )
762 .await
763 .expect("root event");
764 assert_eq!(resolved.hash, "ab".repeat(32));
765 assert_eq!(resolved.peer_id, WIFI_AWARE_SOURCE);
766
767 let payloads = bridge.sent_payloads().await;
768 assert!(decode_frame(&payloads[0]).is_some());
769
770 bus_task.abort();
771 Ok(())
772 }
773}