1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use nostr::{nips::nip19::FromBech32, Event, Filter, JsonUtil, Keys, PublicKey};
4use std::collections::{HashMap, HashSet};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7use tokio::sync::{mpsc, watch, Mutex};
8use tracing::{debug, warn};
9
10use super::root_events::{
11 build_root_filter, is_hashtree_labeled_event, root_event_from_peer, PeerRootEvent,
12 HASHTREE_KIND,
13};
14use super::LocalNostrBus;
15use crate::nostr_relay::NostrRelay;
16
17pub const WIFI_AWARE_SOURCE: &str = "wifi-aware";
18const FRAME_VERSION: u8 = 1;
19const FRAME_KIND_QUERY_ROOT: u8 = 1;
20const FRAME_KIND_ROOT_RESPONSE: u8 = 2;
21const FRAME_KIND_QUERY_DONE: u8 = 3;
22const ROOT_FLAG_KEY: u8 = 1 << 0;
23const ROOT_FLAG_ENCRYPTED_KEY: u8 = 1 << 1;
24const ROOT_FLAG_SELF_ENCRYPTED_KEY: u8 = 1 << 2;
25
26#[derive(Debug, Clone)]
27pub struct WifiAwareConfig {
28 pub enabled: bool,
29 pub max_peers: usize,
30 pub announce_interval_ms: u64,
31}
32
33impl WifiAwareConfig {
34 pub fn is_enabled(&self) -> bool {
35 self.enabled && self.max_peers > 0
36 }
37}
38
39impl Default for WifiAwareConfig {
40 fn default() -> Self {
41 Self {
42 enabled: false,
43 max_peers: 0,
44 announce_interval_ms: 2_000,
45 }
46 }
47}
48
49#[derive(Debug, Clone)]
50pub enum WifiAwareEvent {
51 PeerDiscovered { peer_id: String },
52 PeerLost { peer_id: String },
53 Message { peer_id: String, payload: Vec<u8> },
54}
55
56#[async_trait]
57pub trait MobileWifiAwareBridge: Send + Sync {
58 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>>;
59
60 async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()>;
61}
62
63static MOBILE_WIFI_AWARE_BRIDGE: OnceLock<Arc<dyn MobileWifiAwareBridge>> = OnceLock::new();
64
65pub fn install_mobile_wifi_aware_bridge(bridge: Arc<dyn MobileWifiAwareBridge>) -> Result<()> {
66 MOBILE_WIFI_AWARE_BRIDGE
67 .set(bridge)
68 .map_err(|_| anyhow!("mobile wifi aware bridge already installed"))
69}
70
71pub(crate) fn mobile_wifi_aware_bridge() -> Option<Arc<dyn MobileWifiAwareBridge>> {
72 MOBILE_WIFI_AWARE_BRIDGE.get().cloned()
73}
74
75pub struct WifiAwareNostrBus {
76 config: WifiAwareConfig,
77 keys: Keys,
78 relay: Arc<NostrRelay>,
79 bridge: Arc<dyn MobileWifiAwareBridge>,
80 pending_queries: Arc<Mutex<HashMap<u64, mpsc::UnboundedSender<PendingQueryMessage>>>>,
81 announced_event_ids: Arc<Mutex<HashSet<String>>>,
82}
83
84enum PendingQueryMessage {
85 Root(PeerRootEvent),
86 Done,
87}
88
89enum WifiAwareFrame {
90 QueryRoot {
91 request_id: u64,
92 owner_pubkey_hex: String,
93 tree_name: String,
94 },
95 RootResponse {
96 request_id: u64,
97 root: PeerRootEvent,
98 },
99 QueryDone {
100 request_id: u64,
101 },
102}
103
104#[async_trait]
105impl LocalNostrBus for WifiAwareNostrBus {
106 fn source_name(&self) -> &'static str {
107 WIFI_AWARE_SOURCE
108 }
109
110 async fn broadcast_event(&self, event: &Event) -> Result<()> {
111 WifiAwareNostrBus::broadcast_event(self, event).await
112 }
113
114 async fn query_root(
115 &self,
116 owner_pubkey: &str,
117 tree_name: &str,
118 timeout: Duration,
119 ) -> Option<PeerRootEvent> {
120 WifiAwareNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
121 }
122}
123
124impl WifiAwareNostrBus {
125 pub fn new(
126 config: WifiAwareConfig,
127 keys: Keys,
128 relay: Arc<NostrRelay>,
129 bridge: Arc<dyn MobileWifiAwareBridge>,
130 ) -> Arc<Self> {
131 Arc::new(Self {
132 config,
133 keys,
134 relay,
135 bridge,
136 pending_queries: Arc::new(Mutex::new(HashMap::new())),
137 announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
138 })
139 }
140
141 pub async fn run(
142 self: Arc<Self>,
143 local_peer_id: String,
144 mut shutdown_rx: watch::Receiver<bool>,
145 signaling_tx: mpsc::Sender<(String, Event)>,
146 ) -> Result<()> {
147 let mut announce_ticker = tokio::time::interval(Duration::from_millis(
148 self.config.announce_interval_ms.max(1),
149 ));
150 let mut events = self.bridge.start(local_peer_id).await?;
151
152 loop {
153 tokio::select! {
154 _ = shutdown_rx.changed() => {
155 if *shutdown_rx.borrow() {
156 break;
157 }
158 }
159 _ = announce_ticker.tick() => {
160 if let Err(err) = self.broadcast_known_root_updates().await {
161 debug!("wifi aware root announcement failed: {}", err);
162 }
163 }
164 maybe_event = events.recv() => {
165 match maybe_event {
166 Some(WifiAwareEvent::Message { peer_id, payload }) => {
167 self.handle_message(&peer_id, &payload, &signaling_tx).await;
168 }
169 Some(WifiAwareEvent::PeerDiscovered { peer_id }) => {
170 debug!("wifi aware peer discovered: {}", peer_id);
171 }
172 Some(WifiAwareEvent::PeerLost { peer_id }) => {
173 debug!("wifi aware peer lost: {}", peer_id);
174 }
175 None => break,
176 }
177 }
178 }
179 }
180
181 Ok(())
182 }
183
184 pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
185 self.bridge
188 .broadcast_message(event.as_json().into_bytes())
189 .await
190 }
191
192 pub async fn query_root(
193 &self,
194 owner_pubkey: &str,
195 tree_name: &str,
196 timeout: Duration,
197 ) -> Option<PeerRootEvent> {
198 let request_id = rand::random::<u64>();
199 let owner_bytes = owner_pubkey_bytes(owner_pubkey)?;
200 let request = encode_query_root(request_id, owner_bytes, tree_name)?;
201 let (tx, mut rx) = mpsc::unbounded_channel();
202 self.pending_queries.lock().await.insert(request_id, tx);
203
204 if self.bridge.broadcast_message(request).await.is_err() {
205 self.pending_queries.lock().await.remove(&request_id);
206 return None;
207 }
208
209 let mut roots = Vec::new();
210 let deadline = tokio::time::sleep(timeout);
211 tokio::pin!(deadline);
212
213 loop {
214 tokio::select! {
215 _ = &mut deadline => break,
216 maybe_msg = rx.recv() => {
217 let Some(msg) = maybe_msg else {
218 break;
219 };
220 match msg {
221 PendingQueryMessage::Root(root) => roots.push(root),
222 PendingQueryMessage::Done => break,
223 }
224 }
225 }
226 }
227
228 self.pending_queries.lock().await.remove(&request_id);
229 pick_latest_root_event(&roots)
230 }
231
232 async fn handle_message(
233 &self,
234 peer_id: &str,
235 payload: &[u8],
236 signaling_tx: &mpsc::Sender<(String, Event)>,
237 ) {
238 if let Some(frame) = decode_frame(payload) {
239 match frame {
240 WifiAwareFrame::QueryRoot {
241 request_id,
242 owner_pubkey_hex,
243 tree_name,
244 } => {
245 self.respond_to_root_query(request_id, &owner_pubkey_hex, &tree_name)
246 .await;
247 }
248 WifiAwareFrame::RootResponse { request_id, root } => {
249 let tx = self.pending_queries.lock().await.get(&request_id).cloned();
250 if let Some(tx) = tx {
251 let _ = tx.send(PendingQueryMessage::Root(root));
252 }
253 }
254 WifiAwareFrame::QueryDone { request_id } => {
255 let tx = self.pending_queries.lock().await.get(&request_id).cloned();
256 if let Some(tx) = tx {
257 let _ = tx.send(PendingQueryMessage::Done);
258 }
259 }
260 }
261 return;
262 }
263
264 let Ok(text) = std::str::from_utf8(payload) else {
265 debug!(
266 "ignoring non-utf8 wifi aware payload from {} ({} bytes)",
267 peer_id,
268 payload.len()
269 );
270 return;
271 };
272
273 if let Ok(event) = Event::from_json(text) {
274 if event.pubkey == self.keys.public_key() {
275 return;
276 }
277
278 if event.kind.is_ephemeral() {
279 let _ = signaling_tx
280 .send((self.source_name().to_string(), event))
281 .await;
282 return;
283 }
284
285 if event.kind == nostr::Kind::Custom(HASHTREE_KIND)
286 && is_hashtree_labeled_event(&event)
287 && event.verify().is_ok()
288 {
289 let _ = self.relay.ingest_trusted_event(event).await;
290 }
291 return;
292 }
293
294 debug!("ignoring wifi aware payload from {}: {}", peer_id, text);
295 }
296
297 async fn respond_to_root_query(&self, request_id: u64, owner_pubkey: &str, tree_name: &str) {
298 let Some(filter) = build_root_filter(owner_pubkey, tree_name) else {
299 let _ = self
300 .bridge
301 .broadcast_message(encode_query_done(request_id))
302 .await;
303 return;
304 };
305
306 for event in self.relay.query_events(&filter, 50).await {
307 let Some(root) = root_event_from_peer(&event, self.source_name(), tree_name) else {
308 continue;
309 };
310 let Some(encoded) = encode_root_response(request_id, &root) else {
311 warn!(
312 "Skipping wifi aware root response for {} due to unsupported root fields",
313 tree_name
314 );
315 continue;
316 };
317 if let Err(err) = self.bridge.broadcast_message(encoded).await {
318 warn!("wifi aware root response broadcast failed: {}", err);
319 }
320 }
321
322 if let Err(err) = self
323 .bridge
324 .broadcast_message(encode_query_done(request_id))
325 .await
326 {
327 warn!("wifi aware query-done broadcast failed: {}", err);
328 }
329 }
330
331 async fn broadcast_known_root_updates(&self) -> Result<()> {
332 let filter = Filter::new()
333 .kind(nostr::Kind::Custom(HASHTREE_KIND))
334 .author(self.keys.public_key())
335 .custom_tag(
336 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
337 vec![super::root_events::HASHTREE_LABEL.to_string()],
338 )
339 .limit(256);
340 let events = self.relay.query_events(&filter, 256).await;
341 let mut announced = self.announced_event_ids.lock().await;
342 for event in events {
343 let event_id = event.id.to_hex();
344 if announced.insert(event_id) {
345 self.broadcast_event(&event).await?;
346 }
347 }
348 Ok(())
349 }
350}
351
352fn owner_pubkey_bytes(owner_pubkey: &str) -> Option<[u8; 32]> {
353 let pubkey = PublicKey::from_hex(owner_pubkey)
354 .or_else(|_| PublicKey::from_bech32(owner_pubkey))
355 .ok()?;
356 Some(pubkey.to_bytes())
357}
358
359fn hex_bytes_32(value: &str) -> Option<[u8; 32]> {
360 let decoded = hex::decode(value).ok()?;
361 decoded.try_into().ok()
362}
363
364fn push_u16(buf: &mut Vec<u8>, value: usize) -> Option<()> {
365 let value: u16 = value.try_into().ok()?;
366 buf.extend_from_slice(&value.to_be_bytes());
367 Some(())
368}
369
370fn read_u16(payload: &[u8], cursor: &mut usize) -> Option<usize> {
371 if payload.len() < *cursor + 2 {
372 return None;
373 }
374 let value = u16::from_be_bytes([payload[*cursor], payload[*cursor + 1]]) as usize;
375 *cursor += 2;
376 Some(value)
377}
378
379fn read_u64(payload: &[u8], cursor: &mut usize) -> Option<u64> {
380 if payload.len() < *cursor + 8 {
381 return None;
382 }
383 let bytes: [u8; 8] = payload[*cursor..*cursor + 8].try_into().ok()?;
384 *cursor += 8;
385 Some(u64::from_be_bytes(bytes))
386}
387
388fn read_exact<const N: usize>(payload: &[u8], cursor: &mut usize) -> Option<[u8; N]> {
389 if payload.len() < *cursor + N {
390 return None;
391 }
392 let bytes: [u8; N] = payload[*cursor..*cursor + N].try_into().ok()?;
393 *cursor += N;
394 Some(bytes)
395}
396
397fn read_tree_name(payload: &[u8], cursor: &mut usize) -> Option<String> {
398 let len = read_u16(payload, cursor)?;
399 if payload.len() < *cursor + len {
400 return None;
401 }
402 let value = std::str::from_utf8(&payload[*cursor..*cursor + len])
403 .ok()?
404 .to_string();
405 *cursor += len;
406 Some(value)
407}
408
409fn encode_query_root(request_id: u64, owner_pubkey: [u8; 32], tree_name: &str) -> Option<Vec<u8>> {
410 let tree_bytes = tree_name.as_bytes();
411 let mut payload = Vec::with_capacity(2 + 8 + 32 + 2 + tree_bytes.len());
412 payload.push(FRAME_VERSION);
413 payload.push(FRAME_KIND_QUERY_ROOT);
414 payload.extend_from_slice(&request_id.to_be_bytes());
415 payload.extend_from_slice(&owner_pubkey);
416 push_u16(&mut payload, tree_bytes.len())?;
417 payload.extend_from_slice(tree_bytes);
418 Some(payload)
419}
420
421fn encode_query_done(request_id: u64) -> Vec<u8> {
422 let mut payload = Vec::with_capacity(10);
423 payload.push(FRAME_VERSION);
424 payload.push(FRAME_KIND_QUERY_DONE);
425 payload.extend_from_slice(&request_id.to_be_bytes());
426 payload
427}
428
429fn encode_root_response(request_id: u64, root: &PeerRootEvent) -> Option<Vec<u8>> {
430 let event_id = hex_bytes_32(&root.event_id)?;
431 let hash = hex_bytes_32(&root.hash)?;
432 let key = match root.key.as_deref() {
433 Some(value) => Some(hex_bytes_32(value)?),
434 None => None,
435 };
436 let encrypted_key = match root.encrypted_key.as_deref() {
437 Some(value) => Some(hex_bytes_32(value)?),
438 None => None,
439 };
440 let self_encrypted_key = match root.self_encrypted_key.as_deref() {
441 Some(value) => Some(hex_bytes_32(value)?),
442 None => None,
443 };
444
445 let mut flags = 0u8;
446 if key.is_some() {
447 flags |= ROOT_FLAG_KEY;
448 }
449 if encrypted_key.is_some() {
450 flags |= ROOT_FLAG_ENCRYPTED_KEY;
451 }
452 if self_encrypted_key.is_some() {
453 flags |= ROOT_FLAG_SELF_ENCRYPTED_KEY;
454 }
455
456 let mut payload = Vec::with_capacity(2 + 8 + 8 + 32 + 32 + 1 + 96);
457 payload.push(FRAME_VERSION);
458 payload.push(FRAME_KIND_ROOT_RESPONSE);
459 payload.extend_from_slice(&request_id.to_be_bytes());
460 payload.extend_from_slice(&root.created_at.to_be_bytes());
461 payload.extend_from_slice(&event_id);
462 payload.extend_from_slice(&hash);
463 payload.push(flags);
464 if let Some(value) = key {
465 payload.extend_from_slice(&value);
466 }
467 if let Some(value) = encrypted_key {
468 payload.extend_from_slice(&value);
469 }
470 if let Some(value) = self_encrypted_key {
471 payload.extend_from_slice(&value);
472 }
473 Some(payload)
474}
475
476fn decode_frame(payload: &[u8]) -> Option<WifiAwareFrame> {
477 if payload.len() < 2 || payload[0] != FRAME_VERSION {
478 return None;
479 }
480
481 let mut cursor = 2;
482 match payload[1] {
483 FRAME_KIND_QUERY_ROOT => {
484 let request_id = read_u64(payload, &mut cursor)?;
485 let owner_pubkey = read_exact::<32>(payload, &mut cursor)?;
486 let tree_name = read_tree_name(payload, &mut cursor)?;
487 if cursor != payload.len() {
488 return None;
489 }
490 Some(WifiAwareFrame::QueryRoot {
491 request_id,
492 owner_pubkey_hex: hex::encode(owner_pubkey),
493 tree_name,
494 })
495 }
496 FRAME_KIND_ROOT_RESPONSE => {
497 let request_id = read_u64(payload, &mut cursor)?;
498 let created_at = read_u64(payload, &mut cursor)?;
499 let event_id = hex::encode(read_exact::<32>(payload, &mut cursor)?);
500 let hash = hex::encode(read_exact::<32>(payload, &mut cursor)?);
501 let flags = *payload.get(cursor)?;
502 cursor += 1;
503 let key = if flags & ROOT_FLAG_KEY != 0 {
504 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
505 } else {
506 None
507 };
508 let encrypted_key = if flags & ROOT_FLAG_ENCRYPTED_KEY != 0 {
509 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
510 } else {
511 None
512 };
513 let self_encrypted_key = if flags & ROOT_FLAG_SELF_ENCRYPTED_KEY != 0 {
514 Some(hex::encode(read_exact::<32>(payload, &mut cursor)?))
515 } else {
516 None
517 };
518 if cursor != payload.len() {
519 return None;
520 }
521 Some(WifiAwareFrame::RootResponse {
522 request_id,
523 root: PeerRootEvent {
524 hash,
525 key,
526 encrypted_key,
527 self_encrypted_key,
528 event_id,
529 created_at,
530 peer_id: WIFI_AWARE_SOURCE.to_string(),
531 },
532 })
533 }
534 FRAME_KIND_QUERY_DONE => {
535 let request_id = read_u64(payload, &mut cursor)?;
536 if cursor != payload.len() {
537 return None;
538 }
539 Some(WifiAwareFrame::QueryDone { request_id })
540 }
541 _ => None,
542 }
543}
544
545fn pick_latest_root_event(events: &[PeerRootEvent]) -> Option<PeerRootEvent> {
546 events
547 .iter()
548 .max_by(|a, b| {
549 let ordering = a.created_at.cmp(&b.created_at);
550 if ordering == std::cmp::Ordering::Equal {
551 a.event_id.cmp(&b.event_id)
552 } else {
553 ordering
554 }
555 })
556 .cloned()
557}
558
559#[cfg(test)]
560mod tests {
561 use super::*;
562 use crate::nostr_relay::NostrRelayConfig;
563 use crate::socialgraph::{self, SocialGraphAccessControl, SocialGraphBackend};
564 use nostr::{Alphabet, EventBuilder, Kind, SingleLetterTag, Tag, TagKind};
565 use std::sync::Arc;
566 use tempfile::TempDir;
567 use tokio::sync::Mutex as AsyncMutex;
568
569 struct MockWifiAwareBridge {
570 sent_payloads: AsyncMutex<Vec<Vec<u8>>>,
571 response_events: AsyncMutex<Vec<Event>>,
572 event_tx: AsyncMutex<Option<mpsc::Sender<WifiAwareEvent>>>,
573 }
574
575 impl MockWifiAwareBridge {
576 fn new() -> Arc<Self> {
577 Arc::new(Self {
578 sent_payloads: AsyncMutex::new(Vec::new()),
579 response_events: AsyncMutex::new(Vec::new()),
580 event_tx: AsyncMutex::new(None),
581 })
582 }
583
584 async fn queue_response_event(&self, event: Event) {
585 self.response_events.lock().await.push(event);
586 }
587
588 async fn sent_payloads(&self) -> Vec<Vec<u8>> {
589 self.sent_payloads.lock().await.clone()
590 }
591
592 async fn wait_until_started(&self) {
593 for _ in 0..100 {
594 if self.event_tx.lock().await.is_some() {
595 return;
596 }
597 tokio::time::sleep(Duration::from_millis(10)).await;
598 }
599 panic!("mock wifi aware bridge did not start in time");
600 }
601 }
602
603 #[async_trait]
604 impl MobileWifiAwareBridge for MockWifiAwareBridge {
605 async fn start(&self, _local_peer_id: String) -> Result<mpsc::Receiver<WifiAwareEvent>> {
606 let (tx, rx) = mpsc::channel(32);
607 *self.event_tx.lock().await = Some(tx);
608 Ok(rx)
609 }
610
611 async fn broadcast_message(&self, payload: Vec<u8>) -> Result<()> {
612 self.sent_payloads.lock().await.push(payload.clone());
613 let Some(tx) = self.event_tx.lock().await.clone() else {
614 return Ok(());
615 };
616
617 if let Some(WifiAwareFrame::QueryRoot {
618 request_id,
619 owner_pubkey_hex,
620 tree_name,
621 }) = decode_frame(&payload)
622 {
623 let response_events = self.response_events.lock().await.clone();
624 for event in response_events
625 .iter()
626 .filter(|event| event.pubkey.to_hex() == owner_pubkey_hex)
627 {
628 let Some(root) = root_event_from_peer(event, WIFI_AWARE_SOURCE, &tree_name)
629 else {
630 continue;
631 };
632 let encoded = encode_root_response(request_id, &root)
633 .expect("expected compact root response encoding");
634 tx.send(WifiAwareEvent::Message {
635 peer_id: "peer-b".to_string(),
636 payload: encoded,
637 })
638 .await
639 .map_err(|err| anyhow!("mock wifi aware event send failed: {}", err))?;
640 }
641 tx.send(WifiAwareEvent::Message {
642 peer_id: "peer-b".to_string(),
643 payload: encode_query_done(request_id),
644 })
645 .await
646 .map_err(|err| anyhow!("mock wifi aware query-done send failed: {}", err))?;
647 }
648 Ok(())
649 }
650 }
651
652 fn test_relay(keys: &Keys, tmp: &TempDir) -> Result<Arc<NostrRelay>> {
653 let _guard = socialgraph::test_lock();
654 let graph_store =
655 socialgraph::open_social_graph_store_with_mapsize(tmp.path(), Some(128 * 1024 * 1024))?;
656 let backend: Arc<dyn SocialGraphBackend> = graph_store.clone();
657 let access = Arc::new(SocialGraphAccessControl::new(
658 Arc::clone(&backend),
659 0,
660 HashSet::from([keys.public_key().to_hex()]),
661 ));
662 Ok(Arc::new(NostrRelay::new(
663 Arc::clone(&backend),
664 tmp.path().to_path_buf(),
665 HashSet::from([keys.public_key().to_hex()]),
666 Some(access),
667 NostrRelayConfig {
668 spambox_db_max_bytes: 0,
669 ..Default::default()
670 },
671 )?))
672 }
673
674 #[test]
675 fn compact_query_root_frame_round_trips() {
676 let owner =
677 owner_pubkey_bytes("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798")
678 .expect("owner pubkey");
679 let frame = encode_query_root(42, owner, "video").expect("query frame");
680
681 match decode_frame(&frame).expect("decoded frame") {
682 WifiAwareFrame::QueryRoot {
683 request_id,
684 owner_pubkey_hex,
685 tree_name,
686 } => {
687 assert_eq!(request_id, 42);
688 assert_eq!(owner_pubkey_hex, hex::encode(owner));
689 assert_eq!(tree_name, "video");
690 }
691 _ => panic!("expected query-root frame"),
692 }
693 }
694
695 #[tokio::test]
696 async fn wifi_aware_bus_broadcast_event_forwards_json_bytes() -> Result<()> {
697 let bridge = MockWifiAwareBridge::new();
698 let bus_keys = Keys::generate();
699 let tmp = TempDir::new()?;
700 let relay = test_relay(&bus_keys, &tmp)?;
701 let bus = WifiAwareNostrBus::new(
702 WifiAwareConfig::default(),
703 bus_keys.clone(),
704 relay,
705 bridge.clone(),
706 );
707 let event =
708 EventBuilder::new(Kind::TextNote, "hello wifi aware", []).to_event(&bus_keys)?;
709
710 bus.broadcast_event(&event).await?;
711
712 let sent = bridge.sent_payloads().await;
713 assert_eq!(sent, vec![event.as_json().into_bytes()]);
714 Ok(())
715 }
716
717 #[tokio::test]
718 async fn wifi_aware_bus_query_root_returns_matching_event_and_sends_compact_query() -> Result<()>
719 {
720 let bridge = MockWifiAwareBridge::new();
721 let bus_keys = Keys::generate();
722 let author_keys = Keys::generate();
723 let tmp = TempDir::new()?;
724 let relay = test_relay(&bus_keys, &tmp)?;
725 let bus = WifiAwareNostrBus::new(
726 WifiAwareConfig {
727 enabled: true,
728 max_peers: 2,
729 announce_interval_ms: 60_000,
730 },
731 bus_keys,
732 relay,
733 bridge.clone(),
734 );
735 let root_event = EventBuilder::new(
736 Kind::Custom(HASHTREE_KIND),
737 "",
738 [
739 Tag::identifier("video".to_string()),
740 Tag::custom(
741 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
742 vec!["hashtree".to_string()],
743 ),
744 Tag::custom(TagKind::Custom("hash".into()), vec!["ab".repeat(32)]),
745 ],
746 )
747 .to_event(&author_keys)?;
748 bridge.queue_response_event(root_event).await;
749
750 let (shutdown_tx, shutdown_rx) = watch::channel(false);
751 let bus_task = {
752 let bus = bus.clone();
753 tokio::spawn(async move {
754 let (signaling_tx, _signaling_rx) = mpsc::channel(8);
755 bus.run("local-peer".to_string(), shutdown_rx, signaling_tx)
756 .await
757 })
758 };
759 bridge.wait_until_started().await;
760
761 let resolved = bus
762 .query_root(
763 &author_keys.public_key().to_hex(),
764 "video",
765 Duration::from_secs(1),
766 )
767 .await
768 .expect("expected wifi aware root");
769
770 assert_eq!(resolved.hash, "ab".repeat(32));
771 assert_eq!(resolved.peer_id, WIFI_AWARE_SOURCE);
772
773 let sent = bridge.sent_payloads().await;
774 let query_frame = sent.first().expect("expected compact query");
775 match decode_frame(query_frame).expect("expected decoded query") {
776 WifiAwareFrame::QueryRoot {
777 owner_pubkey_hex,
778 tree_name,
779 ..
780 } => {
781 assert_eq!(owner_pubkey_hex, author_keys.public_key().to_hex());
782 assert_eq!(tree_name, "video");
783 }
784 _ => panic!("expected first payload to be a query-root frame"),
785 }
786
787 shutdown_tx.send(true)?;
788 bus_task.await??;
789 Ok(())
790 }
791}