1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::debug;
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14 encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15 DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16 BLOB_REQUEST_POLICY,
17};
18use nostr::{
19 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
23const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
24const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
25
26#[derive(Debug, Clone)]
27pub enum BluetoothFrame {
28 Text(String),
29 Binary(Vec<u8>),
30}
31
32#[async_trait]
33pub trait BluetoothLink: Send + Sync {
34 async fn send(&self, frame: BluetoothFrame) -> Result<()>;
35 async fn recv(&self) -> Option<BluetoothFrame>;
36 fn is_open(&self) -> bool;
37 async fn close(&self) -> Result<()>;
38}
39
40pub struct BluetoothPeer {
41 pub peer_id: PeerId,
42 pub direction: PeerDirection,
43 pub created_at: std::time::Instant,
44 pub connected_at: Option<std::time::Instant>,
45 link: Arc<dyn BluetoothLink>,
46 store: Option<Arc<dyn ContentStore>>,
47 pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
48 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
49 nostr_relay: Option<Arc<NostrRelay>>,
50 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
51 traffic_state: Option<Arc<WebRTCState>>,
52 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
53 htl_config: PeerHTLConfig,
54}
55
56impl BluetoothPeer {
57 #[allow(clippy::too_many_arguments)]
58 pub fn new(
59 peer_id: PeerId,
60 direction: PeerDirection,
61 link: Arc<dyn BluetoothLink>,
62 store: Option<Arc<dyn ContentStore>>,
63 nostr_relay: Option<Arc<NostrRelay>>,
64 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65 traffic_state: Option<Arc<WebRTCState>>,
66 ) -> Arc<Self> {
67 let peer = Arc::new(Self {
68 peer_id,
69 direction,
70 created_at: std::time::Instant::now(),
71 connected_at: Some(std::time::Instant::now()),
72 link,
73 store,
74 pending_requests: Arc::new(Mutex::new(HashMap::new())),
75 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76 nostr_relay,
77 mesh_frame_tx,
78 traffic_state,
79 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80 BLUETOOTH_SEEN_EVENT_CAP,
81 BLUETOOTH_SEEN_EVENT_TTL,
82 ))),
83 htl_config: PeerHTLConfig::random(),
84 });
85 Self::spawn_reader(peer.clone());
86 peer
87 }
88
89 async fn mark_seen_event_id(&self, event_id: String) -> bool {
90 self.seen_event_ids.lock().await.insert_if_new(event_id)
91 }
92
93 fn spawn_reader(peer: Arc<Self>) {
94 tokio::spawn(async move {
95 let mut nostr_forward_task = None;
96 let mut nostr_client_id = None;
97
98 if let Some(relay) = peer.nostr_relay.as_ref() {
99 let client_id = relay.next_client_id();
100 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
101 relay
102 .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
103 .await;
104 nostr_client_id = Some(client_id);
105
106 let live_subscription_id =
107 NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
108 let _ = relay
109 .register_subscription_query(
110 client_id,
111 live_subscription_id.clone(),
112 vec![NostrFilter::new().since(Timestamp::now())],
113 )
114 .await;
115
116 let peer_for_forward = peer.clone();
117 nostr_forward_task = Some(tokio::spawn(async move {
118 while let Some(text) = nostr_rx.recv().await {
119 if let Ok(NostrRelayMessage::Event {
120 subscription_id,
121 event,
122 }) = NostrRelayMessage::from_json(&text)
123 {
124 if subscription_id == live_subscription_id {
125 if event.kind.is_ephemeral()
126 || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
127 {
128 continue;
129 }
130 if peer_for_forward
131 .send_frame(BluetoothFrame::Text(event.as_json()))
132 .await
133 .is_err()
134 {
135 break;
136 }
137 continue;
138 }
139 }
140 if peer_for_forward
141 .send_frame(BluetoothFrame::Text(text))
142 .await
143 .is_err()
144 {
145 break;
146 }
147 }
148 }));
149 }
150
151 while let Some(frame) = peer.link.recv().await {
152 match frame {
153 BluetoothFrame::Binary(data) => {
154 if let Err(err) = peer.handle_binary_frame(data).await {
155 debug!(
156 "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
157 peer.peer_id.short(),
158 err
159 );
160 }
161 }
162 BluetoothFrame::Text(text) => {
163 peer.handle_text_frame(text, nostr_client_id).await;
164 }
165 }
166 }
167
168 if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
169 relay.unregister_client(client_id).await;
170 }
171
172 if let Some(task) = nostr_forward_task {
173 let _ = task.await;
174 }
175 });
176 }
177
178 async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179 self.record_received(data.len() as u64).await;
180 match parse_message(&data)? {
181 DataMessage::Request(req) => {
182 let hash_hex = hash_to_hex(&req.h);
183 if let Some(store) = self.store.as_ref() {
184 if let Ok(Some(data)) = store.get(&hash_hex) {
185 let response = DataResponse { h: req.h, d: data };
186 let wire = encode_response(&response)?;
187 self.send_frame(BluetoothFrame::Binary(wire)).await?;
188 }
189 }
190 }
191 DataMessage::Response(res) => {
192 let hash_hex = hash_to_hex(&res.h);
193 if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
194 let _ = sender.send(Some(res.d));
195 }
196 }
197 other => {
198 debug!(
199 "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
200 self.peer_id.short(),
201 other
202 );
203 }
204 }
205 Ok(())
206 }
207
208 async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
209 self.record_received(text.len() as u64).await;
210 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
211 if let Some(tx) = self.mesh_frame_tx.as_ref() {
212 let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
213 return;
214 }
215 }
216
217 if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
218 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
219 let sender = {
220 let pending = self.pending_nostr_queries.lock().await;
221 pending.get(&sub_id).cloned()
222 };
223 if let Some(tx) = sender {
224 let _ = tx.send(relay_msg);
225 return;
226 }
227 }
228 }
229
230 if let Some(relay) = self.nostr_relay.as_ref() {
231 if let Ok(event) = nostr::Event::from_json(&text) {
232 if self.mark_seen_event_id(event.id.to_hex()).await {
233 let _ = relay
234 .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
235 .await;
236 }
237 return;
238 }
239
240 if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
241 if let Some(client_id) = nostr_client_id {
242 relay.handle_client_message(client_id, nostr_msg).await;
243 }
244 }
245 }
246 }
247
248 pub fn is_connected(&self) -> bool {
249 self.link.is_open()
250 }
251
252 pub fn htl_config(&self) -> &PeerHTLConfig {
253 &self.htl_config
254 }
255
256 async fn record_sent(&self, bytes: u64) {
257 if let Some(state) = self.traffic_state.as_ref() {
258 state.record_sent(&self.peer_id.to_string(), bytes).await;
259 }
260 }
261
262 async fn record_received(&self, bytes: u64) {
263 if let Some(state) = self.traffic_state.as_ref() {
264 state
265 .record_received(&self.peer_id.to_string(), bytes)
266 .await;
267 }
268 }
269
270 async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
271 let bytes = match &frame {
272 BluetoothFrame::Text(text) => text.len() as u64,
273 BluetoothFrame::Binary(payload) => payload.len() as u64,
274 };
275 self.link.send(frame).await?;
276 self.record_sent(bytes).await;
277 Ok(())
278 }
279
280 pub async fn request_with_timeout(
281 &self,
282 hash_hex: &str,
283 timeout: Duration,
284 ) -> Result<Option<Vec<u8>>> {
285 if !self.link.is_open() {
286 return Ok(None);
287 }
288
289 let hash = hex::decode(hash_hex)?;
290 let request = DataRequest {
291 h: hash,
292 htl: BLOB_REQUEST_POLICY.max_htl,
293 q: None,
294 };
295 let wire = encode_request(&request)?;
296 let (tx, rx) = oneshot::channel();
297 self.pending_requests
298 .lock()
299 .await
300 .insert(hash_hex.to_string(), tx);
301 self.send_frame(BluetoothFrame::Binary(wire)).await?;
302
303 match tokio::time::timeout(timeout, rx).await {
304 Ok(Ok(data)) => Ok(data),
305 Ok(Err(_)) => Ok(None),
306 Err(_) => {
307 self.pending_requests.lock().await.remove(hash_hex);
308 Ok(None)
309 }
310 }
311 }
312
313 pub async fn query_nostr_events(
314 &self,
315 filters: Vec<NostrFilter>,
316 timeout: Duration,
317 ) -> Result<Vec<nostr::Event>> {
318 let subscription_id = NostrSubscriptionId::generate();
319 let subscription_key = subscription_id.to_string();
320 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
321
322 self.pending_nostr_queries
323 .lock()
324 .await
325 .insert(subscription_key.clone(), tx);
326
327 let req = NostrClientMessage::req(subscription_id.clone(), filters);
328 self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
329
330 let mut events = Vec::new();
331 let deadline = tokio::time::Instant::now() + timeout;
332
333 loop {
334 let now = tokio::time::Instant::now();
335 if now >= deadline {
336 break;
337 }
338 match tokio::time::timeout(deadline - now, rx.recv()).await {
339 Ok(Some(NostrRelayMessage::Event {
340 subscription_id: sid,
341 event,
342 })) if sid == subscription_id => events.push(*event),
343 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
344 break;
345 }
346 Ok(Some(NostrRelayMessage::Closed {
347 subscription_id: sid,
348 ..
349 })) if sid == subscription_id => break,
350 Ok(Some(_)) => {}
351 Ok(None) | Err(_) => break,
352 }
353 }
354
355 let close = NostrClientMessage::close(subscription_id.clone());
356 let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
357 self.pending_nostr_queries
358 .lock()
359 .await
360 .remove(&subscription_key);
361 Ok(events)
362 }
363
364 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
365 let text = serde_json::to_string(frame)?;
366 self.send_frame(BluetoothFrame::Text(text)).await
367 }
368
369 pub async fn close(&self) -> Result<()> {
370 self.link.close().await
371 }
372}
373
374fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
375 match msg {
376 NostrRelayMessage::Event {
377 subscription_id, ..
378 } => Some(subscription_id.to_string()),
379 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
380 NostrRelayMessage::Closed {
381 subscription_id, ..
382 } => Some(subscription_id.to_string()),
383 NostrRelayMessage::Count {
384 subscription_id, ..
385 } => Some(subscription_id.to_string()),
386 _ => None,
387 }
388}
389
390#[cfg(test)]
391pub struct MockBluetoothLink {
392 open: std::sync::atomic::AtomicBool,
393 tx: mpsc::Sender<BluetoothFrame>,
394 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
395}
396
397#[cfg(test)]
398impl MockBluetoothLink {
399 pub fn pair() -> (Arc<Self>, Arc<Self>) {
400 let (tx_a, rx_a) = mpsc::channel(32);
401 let (tx_b, rx_b) = mpsc::channel(32);
402 (
403 Arc::new(Self {
404 open: std::sync::atomic::AtomicBool::new(true),
405 tx: tx_a,
406 rx: Mutex::new(rx_b),
407 }),
408 Arc::new(Self {
409 open: std::sync::atomic::AtomicBool::new(true),
410 tx: tx_b,
411 rx: Mutex::new(rx_a),
412 }),
413 )
414 }
415}
416
417#[cfg(test)]
418#[async_trait]
419impl BluetoothLink for MockBluetoothLink {
420 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
421 use std::sync::atomic::Ordering;
422 if !self.open.load(Ordering::Relaxed) {
423 return Ok(());
424 }
425 self.tx.send(frame).await.map_err(Into::into)
426 }
427
428 async fn recv(&self) -> Option<BluetoothFrame> {
429 self.rx.lock().await.recv().await
430 }
431
432 fn is_open(&self) -> bool {
433 use std::sync::atomic::Ordering;
434 self.open.load(Ordering::Relaxed)
435 }
436
437 async fn close(&self) -> Result<()> {
438 use std::sync::atomic::Ordering;
439 self.open.store(false, Ordering::Relaxed);
440 Ok(())
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447 use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
448 use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
449 use nostr::{EventBuilder, Filter, Keys, Kind};
450 use std::collections::HashSet;
451 use std::sync::Arc;
452 use std::time::Instant;
453 use tempfile::TempDir;
454
455 struct TestStore {
456 blobs: HashMap<String, Vec<u8>>,
457 }
458
459 impl ContentStore for TestStore {
460 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
461 Ok(self.blobs.get(hash_hex).cloned())
462 }
463 }
464
465 #[tokio::test]
466 async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
467 let (link_a, link_b) = MockBluetoothLink::pair();
468 let data = b"bluetooth mesh payload".to_vec();
469 let hash_hex = hex::encode(hashtree_core::sha256(&data));
470
471 let requester = BluetoothPeer::new(
472 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
473 PeerDirection::Outbound,
474 link_a,
475 None,
476 None,
477 None,
478 None,
479 );
480
481 let mut blobs = HashMap::new();
482 blobs.insert(hash_hex.clone(), data.clone());
483 let responder = BluetoothPeer::new(
484 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
485 PeerDirection::Inbound,
486 link_b,
487 Some(Arc::new(TestStore { blobs })),
488 None,
489 None,
490 None,
491 );
492
493 let received = requester
494 .request_with_timeout(&hash_hex, Duration::from_secs(1))
495 .await
496 .expect("request should succeed");
497
498 assert_eq!(received, Some(data));
499 responder.close().await.unwrap();
500 }
501
502 #[tokio::test]
503 async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
504 let (link_a, link_b) = MockBluetoothLink::pair();
505 let state = Arc::new(WebRTCState::new());
506 let data = b"bluetooth stats payload".to_vec();
507 let hash_hex = hex::encode(hashtree_core::sha256(&data));
508 let requester_id = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
509 let responder_id = PeerId::new("peer-b".to_string(), Some("sess-b".to_string()));
510
511 for peer_id in [&requester_id, &responder_id] {
512 state.peers.write().await.insert(
513 peer_id.to_string(),
514 PeerEntry {
515 peer_id: peer_id.clone(),
516 direction: PeerDirection::Outbound,
517 state: ConnectionState::Connected,
518 last_seen: Instant::now(),
519 peer: None,
520 pool: super::super::types::PeerPool::Other,
521 transport: PeerTransport::Bluetooth,
522 signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
523 bytes_sent: 0,
524 bytes_received: 0,
525 },
526 );
527 }
528
529 let requester = BluetoothPeer::new(
530 requester_id.clone(),
531 PeerDirection::Outbound,
532 link_a,
533 None,
534 None,
535 None,
536 Some(state.clone()),
537 );
538
539 let mut blobs = HashMap::new();
540 blobs.insert(hash_hex.clone(), data.clone());
541 let responder = BluetoothPeer::new(
542 responder_id.clone(),
543 PeerDirection::Inbound,
544 link_b,
545 Some(Arc::new(TestStore { blobs })),
546 None,
547 None,
548 Some(state.clone()),
549 );
550
551 let received = requester
552 .request_with_timeout(&hash_hex, Duration::from_secs(1))
553 .await
554 .expect("request should succeed");
555
556 assert_eq!(received, Some(data.clone()));
557 let hash = hex::decode(&hash_hex).expect("valid hash hex");
558 let expected_request_len = encode_request(&DataRequest {
559 h: hash.clone(),
560 htl: BLOB_REQUEST_POLICY.max_htl,
561 q: None,
562 })
563 .expect("request encoding")
564 .len() as u64;
565 let expected_response_len = encode_response(&DataResponse {
566 h: hash,
567 d: data.clone(),
568 })
569 .expect("response encoding")
570 .len() as u64;
571
572 let peers = state.peers.read().await;
573 let requester_stats = peers
574 .get(&requester_id.to_string())
575 .expect("requester stats");
576 let responder_stats = peers
577 .get(&responder_id.to_string())
578 .expect("responder stats");
579 assert_eq!(requester_stats.bytes_sent, expected_request_len);
580 assert_eq!(requester_stats.bytes_received, expected_response_len);
581 assert_eq!(responder_stats.bytes_received, expected_request_len);
582 assert_eq!(responder_stats.bytes_sent, expected_response_len);
583 drop(peers);
584
585 responder.close().await.unwrap();
586 }
587
588 #[tokio::test]
589 async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
590 let (link_a, link_b) = MockBluetoothLink::pair();
591 let tmp = TempDir::new()?;
592 let graph_store = {
593 let _guard = crate::socialgraph::test_lock();
594 crate::socialgraph::open_social_graph_store_with_mapsize(
595 tmp.path(),
596 Some(128 * 1024 * 1024),
597 )?
598 };
599 let author_keys = Keys::generate();
600 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
601 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
602 Arc::clone(&backend),
603 0,
604 HashSet::from([author_keys.public_key().to_hex()]),
605 ));
606 let relay = Arc::new(NostrRelay::new(
607 Arc::clone(&backend),
608 tmp.path().to_path_buf(),
609 HashSet::from([author_keys.public_key().to_hex()]),
610 Some(access),
611 NostrRelayConfig {
612 spambox_db_max_bytes: 0,
613 ..Default::default()
614 },
615 )?);
616
617 let requester = BluetoothPeer::new(
618 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
619 PeerDirection::Outbound,
620 link_a,
621 None,
622 None,
623 None,
624 None,
625 );
626 let responder = BluetoothPeer::new(
627 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
628 PeerDirection::Inbound,
629 link_b,
630 None,
631 Some(relay.clone()),
632 None,
633 None,
634 );
635
636 let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
637 .to_event(&author_keys)?;
638 relay.ingest_trusted_event(event.clone()).await?;
639 tokio::time::sleep(Duration::from_millis(50)).await;
640
641 let events = requester
642 .query_nostr_events(
643 vec![Filter::new()
644 .authors(vec![event.pubkey])
645 .kinds(vec![event.kind])],
646 Duration::from_secs(1),
647 )
648 .await?;
649
650 assert_eq!(events.len(), 1);
651 assert_eq!(events[0].id, event.id);
652 responder.close().await?;
653 Ok(())
654 }
655
656 #[tokio::test]
657 async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
658 {
659 let (link_a, link_b) = MockBluetoothLink::pair();
660 let tmp_a = TempDir::new()?;
661 let tmp_b = TempDir::new()?;
662
663 let graph_store_a = {
664 let _guard = crate::socialgraph::test_lock();
665 crate::socialgraph::open_social_graph_store_with_mapsize(
666 tmp_a.path(),
667 Some(128 * 1024 * 1024),
668 )?
669 };
670 let graph_store_b = {
671 let _guard = crate::socialgraph::test_lock();
672 crate::socialgraph::open_social_graph_store_with_mapsize(
673 tmp_b.path(),
674 Some(128 * 1024 * 1024),
675 )?
676 };
677 let author_keys = Keys::generate();
678
679 let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
680 let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
681 let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
682 Arc::clone(&backend_a),
683 0,
684 HashSet::from([author_keys.public_key().to_hex()]),
685 ));
686 let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
687 Arc::clone(&backend_b),
688 0,
689 HashSet::from([author_keys.public_key().to_hex()]),
690 ));
691
692 let relay_a = Arc::new(NostrRelay::new(
693 Arc::clone(&backend_a),
694 tmp_a.path().to_path_buf(),
695 HashSet::from([author_keys.public_key().to_hex()]),
696 Some(access_a),
697 NostrRelayConfig {
698 spambox_db_max_bytes: 0,
699 ..Default::default()
700 },
701 )?);
702 let relay_b = Arc::new(NostrRelay::new(
703 Arc::clone(&backend_b),
704 tmp_b.path().to_path_buf(),
705 HashSet::from([author_keys.public_key().to_hex()]),
706 Some(access_b),
707 NostrRelayConfig {
708 spambox_db_max_bytes: 0,
709 ..Default::default()
710 },
711 )?);
712
713 let sender = BluetoothPeer::new(
714 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
715 PeerDirection::Outbound,
716 link_a,
717 None,
718 Some(relay_a.clone()),
719 None,
720 None,
721 );
722 let receiver = BluetoothPeer::new(
723 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
724 PeerDirection::Inbound,
725 link_b,
726 None,
727 Some(relay_b.clone()),
728 None,
729 None,
730 );
731
732 tokio::time::sleep(Duration::from_millis(50)).await;
733
734 let cid = "ab".repeat(32);
735 let event = EventBuilder::new(
736 Kind::TextNote,
737 "bluetooth publish sync",
738 [nostr::Tag::parse(&["cid", &cid]).unwrap()],
739 )
740 .to_event(&author_keys)?;
741 relay_a.ingest_trusted_event(event.clone()).await?;
742
743 tokio::time::sleep(Duration::from_millis(150)).await;
744
745 let received = relay_b
746 .query_events(
747 &Filter::new()
748 .authors(vec![event.pubkey])
749 .kinds(vec![event.kind]),
750 10,
751 )
752 .await;
753 assert_eq!(
754 received
755 .iter()
756 .filter(|candidate| candidate.id == event.id)
757 .count(),
758 1
759 );
760
761 let bluetooth_events = relay_b.bluetooth_received_events(10).await;
762 assert_eq!(bluetooth_events.len(), 1);
763 assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
764 assert_eq!(
765 bluetooth_events[0].peer_id.as_deref(),
766 Some("peer-b:sess-b")
767 );
768 assert_eq!(bluetooth_events[0].cid_values, vec![cid]);
769
770 receiver.close().await?;
771 sender.close().await?;
772 Ok(())
773 }
774}