Skip to main content

hashtree_cli/webrtc/
bluetooth_peer.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::debug;
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14    encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15    DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16    BLOB_REQUEST_POLICY,
17};
18use nostr::{
19    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
/// Capacity argument handed to the [`TimedSeenSet`] that de-duplicates Nostr event ids per peer.
const BLUETOOTH_SEEN_EVENT_CAP: usize = 2_048;
/// Time-to-live argument handed to that [`TimedSeenSet`] (ten minutes).
const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(10 * 60);
25
/// One unit of data exchanged over a [`BluetoothLink`].
#[derive(Clone, Debug)]
pub enum BluetoothFrame {
    /// Textual payload; incoming text frames are interpreted as JSON (mesh frames or Nostr
    /// messages) by the peer.
    Text(String),
    /// Binary payload; incoming binary frames are decoded with `parse_message`.
    Binary(Vec<u8>),
}
31
32#[async_trait]
33pub trait BluetoothLink: Send + Sync {
34    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
35    async fn recv(&self) -> Option<BluetoothFrame>;
36    fn is_open(&self) -> bool;
37    async fn close(&self) -> Result<()>;
38}
39
40pub struct BluetoothPeer {
41    pub peer_id: PeerId,
42    pub direction: PeerDirection,
43    pub created_at: std::time::Instant,
44    pub connected_at: Option<std::time::Instant>,
45    link: Arc<dyn BluetoothLink>,
46    store: Option<Arc<dyn ContentStore>>,
47    pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
48    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
49    nostr_relay: Option<Arc<NostrRelay>>,
50    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
51    traffic_state: Option<Arc<WebRTCState>>,
52    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
53    htl_config: PeerHTLConfig,
54}
55
56impl BluetoothPeer {
57    #[allow(clippy::too_many_arguments)]
58    pub fn new(
59        peer_id: PeerId,
60        direction: PeerDirection,
61        link: Arc<dyn BluetoothLink>,
62        store: Option<Arc<dyn ContentStore>>,
63        nostr_relay: Option<Arc<NostrRelay>>,
64        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65        traffic_state: Option<Arc<WebRTCState>>,
66    ) -> Arc<Self> {
67        let peer = Arc::new(Self {
68            peer_id,
69            direction,
70            created_at: std::time::Instant::now(),
71            connected_at: Some(std::time::Instant::now()),
72            link,
73            store,
74            pending_requests: Arc::new(Mutex::new(HashMap::new())),
75            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76            nostr_relay,
77            mesh_frame_tx,
78            traffic_state,
79            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80                BLUETOOTH_SEEN_EVENT_CAP,
81                BLUETOOTH_SEEN_EVENT_TTL,
82            ))),
83            htl_config: PeerHTLConfig::random(),
84        });
85        Self::spawn_reader(peer.clone());
86        peer
87    }
88
89    async fn mark_seen_event_id(&self, event_id: String) -> bool {
90        self.seen_event_ids.lock().await.insert_if_new(event_id)
91    }
92
93    fn spawn_reader(peer: Arc<Self>) {
94        tokio::spawn(async move {
95            let mut nostr_forward_task = None;
96            let mut nostr_client_id = None;
97
98            if let Some(relay) = peer.nostr_relay.as_ref() {
99                let client_id = relay.next_client_id();
100                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
101                relay
102                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
103                    .await;
104                nostr_client_id = Some(client_id);
105
106                let live_subscription_id =
107                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
108                let _ = relay
109                    .register_subscription_query(
110                        client_id,
111                        live_subscription_id.clone(),
112                        vec![NostrFilter::new().since(Timestamp::now())],
113                    )
114                    .await;
115
116                let peer_for_forward = peer.clone();
117                nostr_forward_task = Some(tokio::spawn(async move {
118                    while let Some(text) = nostr_rx.recv().await {
119                        if let Ok(NostrRelayMessage::Event {
120                            subscription_id,
121                            event,
122                        }) = NostrRelayMessage::from_json(&text)
123                        {
124                            if subscription_id == live_subscription_id {
125                                if event.kind.is_ephemeral()
126                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
127                                {
128                                    continue;
129                                }
130                                if peer_for_forward
131                                    .send_frame(BluetoothFrame::Text(event.as_json()))
132                                    .await
133                                    .is_err()
134                                {
135                                    break;
136                                }
137                                continue;
138                            }
139                        }
140                        if peer_for_forward
141                            .send_frame(BluetoothFrame::Text(text))
142                            .await
143                            .is_err()
144                        {
145                            break;
146                        }
147                    }
148                }));
149            }
150
151            while let Some(frame) = peer.link.recv().await {
152                match frame {
153                    BluetoothFrame::Binary(data) => {
154                        if let Err(err) = peer.handle_binary_frame(data).await {
155                            debug!(
156                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
157                                peer.peer_id.short(),
158                                err
159                            );
160                        }
161                    }
162                    BluetoothFrame::Text(text) => {
163                        peer.handle_text_frame(text, nostr_client_id).await;
164                    }
165                }
166            }
167
168            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
169                relay.unregister_client(client_id).await;
170            }
171
172            if let Some(task) = nostr_forward_task {
173                let _ = task.await;
174            }
175        });
176    }
177
178    async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179        self.record_received(data.len() as u64).await;
180        match parse_message(&data)? {
181            DataMessage::Request(req) => {
182                let hash_hex = hash_to_hex(&req.h);
183                if let Some(store) = self.store.as_ref() {
184                    if let Ok(Some(data)) = store.get(&hash_hex) {
185                        let response = DataResponse { h: req.h, d: data };
186                        let wire = encode_response(&response)?;
187                        self.send_frame(BluetoothFrame::Binary(wire)).await?;
188                    }
189                }
190            }
191            DataMessage::Response(res) => {
192                let hash_hex = hash_to_hex(&res.h);
193                if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
194                    let _ = sender.send(Some(res.d));
195                }
196            }
197            other => {
198                debug!(
199                    "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
200                    self.peer_id.short(),
201                    other
202                );
203            }
204        }
205        Ok(())
206    }
207
208    async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
209        self.record_received(text.len() as u64).await;
210        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
211            if let Some(tx) = self.mesh_frame_tx.as_ref() {
212                let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
213                return;
214            }
215        }
216
217        if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
218            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
219                let sender = {
220                    let pending = self.pending_nostr_queries.lock().await;
221                    pending.get(&sub_id).cloned()
222                };
223                if let Some(tx) = sender {
224                    let _ = tx.send(relay_msg);
225                    return;
226                }
227            }
228        }
229
230        if let Some(relay) = self.nostr_relay.as_ref() {
231            if let Ok(event) = nostr::Event::from_json(&text) {
232                if self.mark_seen_event_id(event.id.to_hex()).await {
233                    let _ = relay
234                        .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
235                        .await;
236                }
237                return;
238            }
239
240            if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
241                if let Some(client_id) = nostr_client_id {
242                    relay.handle_client_message(client_id, nostr_msg).await;
243                }
244            }
245        }
246    }
247
248    pub fn is_connected(&self) -> bool {
249        self.link.is_open()
250    }
251
252    pub fn htl_config(&self) -> &PeerHTLConfig {
253        &self.htl_config
254    }
255
256    async fn record_sent(&self, bytes: u64) {
257        if let Some(state) = self.traffic_state.as_ref() {
258            state.record_sent(&self.peer_id.to_string(), bytes).await;
259        }
260    }
261
262    async fn record_received(&self, bytes: u64) {
263        if let Some(state) = self.traffic_state.as_ref() {
264            state
265                .record_received(&self.peer_id.to_string(), bytes)
266                .await;
267        }
268    }
269
270    async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
271        let bytes = match &frame {
272            BluetoothFrame::Text(text) => text.len() as u64,
273            BluetoothFrame::Binary(payload) => payload.len() as u64,
274        };
275        self.link.send(frame).await?;
276        self.record_sent(bytes).await;
277        Ok(())
278    }
279
280    pub async fn request_with_timeout(
281        &self,
282        hash_hex: &str,
283        timeout: Duration,
284    ) -> Result<Option<Vec<u8>>> {
285        if !self.link.is_open() {
286            return Ok(None);
287        }
288
289        let hash = hex::decode(hash_hex)?;
290        let request = DataRequest {
291            h: hash,
292            htl: BLOB_REQUEST_POLICY.max_htl,
293            q: None,
294        };
295        let wire = encode_request(&request)?;
296        let (tx, rx) = oneshot::channel();
297        self.pending_requests
298            .lock()
299            .await
300            .insert(hash_hex.to_string(), tx);
301        self.send_frame(BluetoothFrame::Binary(wire)).await?;
302
303        match tokio::time::timeout(timeout, rx).await {
304            Ok(Ok(data)) => Ok(data),
305            Ok(Err(_)) => Ok(None),
306            Err(_) => {
307                self.pending_requests.lock().await.remove(hash_hex);
308                Ok(None)
309            }
310        }
311    }
312
313    pub async fn query_nostr_events(
314        &self,
315        filters: Vec<NostrFilter>,
316        timeout: Duration,
317    ) -> Result<Vec<nostr::Event>> {
318        let subscription_id = NostrSubscriptionId::generate();
319        let subscription_key = subscription_id.to_string();
320        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
321
322        self.pending_nostr_queries
323            .lock()
324            .await
325            .insert(subscription_key.clone(), tx);
326
327        let req = NostrClientMessage::req(subscription_id.clone(), filters);
328        self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
329
330        let mut events = Vec::new();
331        let deadline = tokio::time::Instant::now() + timeout;
332
333        loop {
334            let now = tokio::time::Instant::now();
335            if now >= deadline {
336                break;
337            }
338            match tokio::time::timeout(deadline - now, rx.recv()).await {
339                Ok(Some(NostrRelayMessage::Event {
340                    subscription_id: sid,
341                    event,
342                })) if sid == subscription_id => events.push(*event),
343                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
344                    break;
345                }
346                Ok(Some(NostrRelayMessage::Closed {
347                    subscription_id: sid,
348                    ..
349                })) if sid == subscription_id => break,
350                Ok(Some(_)) => {}
351                Ok(None) | Err(_) => break,
352            }
353        }
354
355        let close = NostrClientMessage::close(subscription_id.clone());
356        let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
357        self.pending_nostr_queries
358            .lock()
359            .await
360            .remove(&subscription_key);
361        Ok(events)
362    }
363
364    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
365        let text = serde_json::to_string(frame)?;
366        self.send_frame(BluetoothFrame::Text(text)).await
367    }
368
369    pub async fn close(&self) -> Result<()> {
370        self.link.close().await
371    }
372}
373
374fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
375    match msg {
376        NostrRelayMessage::Event {
377            subscription_id, ..
378        } => Some(subscription_id.to_string()),
379        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
380        NostrRelayMessage::Closed {
381            subscription_id, ..
382        } => Some(subscription_id.to_string()),
383        NostrRelayMessage::Count {
384            subscription_id, ..
385        } => Some(subscription_id.to_string()),
386        _ => None,
387    }
388}
389
/// In-memory [`BluetoothLink`] for tests, backed by a pair of Tokio channels.
#[cfg(test)]
pub struct MockBluetoothLink {
    /// Local "open" flag; flipped to `false` by `close`.
    open: std::sync::atomic::AtomicBool,
    /// Outgoing half: frames sent here are received by the paired link.
    tx: mpsc::Sender<BluetoothFrame>,
    /// Incoming half: frames sent by the paired link.
    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
}
396
#[cfg(test)]
impl MockBluetoothLink {
    /// Creates two cross-wired links: whatever one sends, the other receives.
    ///
    /// Each direction is a bounded channel with room for 32 frames.
    pub fn pair() -> (Arc<Self>, Arc<Self>) {
        let (first_tx, second_rx) = mpsc::channel::<BluetoothFrame>(32);
        let (second_tx, first_rx) = mpsc::channel::<BluetoothFrame>(32);

        let endpoint = |tx: mpsc::Sender<BluetoothFrame>, rx: mpsc::Receiver<BluetoothFrame>| {
            Arc::new(Self {
                open: std::sync::atomic::AtomicBool::new(true),
                tx,
                rx: Mutex::new(rx),
            })
        };

        (endpoint(first_tx, first_rx), endpoint(second_tx, second_rx))
    }
}
416
#[cfg(test)]
#[async_trait]
impl BluetoothLink for MockBluetoothLink {
    /// Queues `frame` for the paired link; silently succeeds without sending once closed.
    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
        if self.open.load(std::sync::atomic::Ordering::Relaxed) {
            self.tx.send(frame).await?;
        }
        Ok(())
    }

    /// Waits for the next frame from the paired link; `None` once its sender is gone.
    async fn recv(&self) -> Option<BluetoothFrame> {
        let mut incoming = self.rx.lock().await;
        incoming.recv().await
    }

    /// Reads the local "open" flag.
    fn is_open(&self) -> bool {
        self.open.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Clears the local "open" flag; the channels themselves are left untouched.
    async fn close(&self) -> Result<()> {
        self.open
            .store(false, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }
}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447    use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
448    use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
449    use nostr::{EventBuilder, Filter, Keys, Kind};
450    use std::collections::HashSet;
451    use std::sync::Arc;
452    use std::time::Instant;
453    use tempfile::TempDir;
454
455    struct TestStore {
456        blobs: HashMap<String, Vec<u8>>,
457    }
458
459    impl ContentStore for TestStore {
460        fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
461            Ok(self.blobs.get(hash_hex).cloned())
462        }
463    }
464
465    #[tokio::test]
466    async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
467        let (link_a, link_b) = MockBluetoothLink::pair();
468        let data = b"bluetooth mesh payload".to_vec();
469        let hash_hex = hex::encode(hashtree_core::sha256(&data));
470
471        let requester = BluetoothPeer::new(
472            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
473            PeerDirection::Outbound,
474            link_a,
475            None,
476            None,
477            None,
478            None,
479        );
480
481        let mut blobs = HashMap::new();
482        blobs.insert(hash_hex.clone(), data.clone());
483        let responder = BluetoothPeer::new(
484            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
485            PeerDirection::Inbound,
486            link_b,
487            Some(Arc::new(TestStore { blobs })),
488            None,
489            None,
490            None,
491        );
492
493        let received = requester
494            .request_with_timeout(&hash_hex, Duration::from_secs(1))
495            .await
496            .expect("request should succeed");
497
498        assert_eq!(received, Some(data));
499        responder.close().await.unwrap();
500    }
501
502    #[tokio::test]
503    async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
504        let (link_a, link_b) = MockBluetoothLink::pair();
505        let state = Arc::new(WebRTCState::new());
506        let data = b"bluetooth stats payload".to_vec();
507        let hash_hex = hex::encode(hashtree_core::sha256(&data));
508        let requester_id = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
509        let responder_id = PeerId::new("peer-b".to_string(), Some("sess-b".to_string()));
510
511        for peer_id in [&requester_id, &responder_id] {
512            state.peers.write().await.insert(
513                peer_id.to_string(),
514                PeerEntry {
515                    peer_id: peer_id.clone(),
516                    direction: PeerDirection::Outbound,
517                    state: ConnectionState::Connected,
518                    last_seen: Instant::now(),
519                    peer: None,
520                    pool: super::super::types::PeerPool::Other,
521                    transport: PeerTransport::Bluetooth,
522                    signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
523                    bytes_sent: 0,
524                    bytes_received: 0,
525                },
526            );
527        }
528
529        let requester = BluetoothPeer::new(
530            requester_id.clone(),
531            PeerDirection::Outbound,
532            link_a,
533            None,
534            None,
535            None,
536            Some(state.clone()),
537        );
538
539        let mut blobs = HashMap::new();
540        blobs.insert(hash_hex.clone(), data.clone());
541        let responder = BluetoothPeer::new(
542            responder_id.clone(),
543            PeerDirection::Inbound,
544            link_b,
545            Some(Arc::new(TestStore { blobs })),
546            None,
547            None,
548            Some(state.clone()),
549        );
550
551        let received = requester
552            .request_with_timeout(&hash_hex, Duration::from_secs(1))
553            .await
554            .expect("request should succeed");
555
556        assert_eq!(received, Some(data.clone()));
557        let hash = hex::decode(&hash_hex).expect("valid hash hex");
558        let expected_request_len = encode_request(&DataRequest {
559            h: hash.clone(),
560            htl: BLOB_REQUEST_POLICY.max_htl,
561            q: None,
562        })
563        .expect("request encoding")
564        .len() as u64;
565        let expected_response_len = encode_response(&DataResponse {
566            h: hash,
567            d: data.clone(),
568        })
569        .expect("response encoding")
570        .len() as u64;
571
572        let peers = state.peers.read().await;
573        let requester_stats = peers
574            .get(&requester_id.to_string())
575            .expect("requester stats");
576        let responder_stats = peers
577            .get(&responder_id.to_string())
578            .expect("responder stats");
579        assert_eq!(requester_stats.bytes_sent, expected_request_len);
580        assert_eq!(requester_stats.bytes_received, expected_response_len);
581        assert_eq!(responder_stats.bytes_received, expected_request_len);
582        assert_eq!(responder_stats.bytes_sent, expected_response_len);
583        drop(peers);
584
585        responder.close().await.unwrap();
586    }
587
588    #[tokio::test]
589    async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
590        let (link_a, link_b) = MockBluetoothLink::pair();
591        let tmp = TempDir::new()?;
592        let graph_store = {
593            let _guard = crate::socialgraph::test_lock();
594            crate::socialgraph::open_social_graph_store_with_mapsize(
595                tmp.path(),
596                Some(128 * 1024 * 1024),
597            )?
598        };
599        let author_keys = Keys::generate();
600        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
601        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
602            Arc::clone(&backend),
603            0,
604            HashSet::from([author_keys.public_key().to_hex()]),
605        ));
606        let relay = Arc::new(NostrRelay::new(
607            Arc::clone(&backend),
608            tmp.path().to_path_buf(),
609            HashSet::from([author_keys.public_key().to_hex()]),
610            Some(access),
611            NostrRelayConfig {
612                spambox_db_max_bytes: 0,
613                ..Default::default()
614            },
615        )?);
616
617        let requester = BluetoothPeer::new(
618            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
619            PeerDirection::Outbound,
620            link_a,
621            None,
622            None,
623            None,
624            None,
625        );
626        let responder = BluetoothPeer::new(
627            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
628            PeerDirection::Inbound,
629            link_b,
630            None,
631            Some(relay.clone()),
632            None,
633            None,
634        );
635
636        let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
637            .to_event(&author_keys)?;
638        relay.ingest_trusted_event(event.clone()).await?;
639        tokio::time::sleep(Duration::from_millis(50)).await;
640
641        let events = requester
642            .query_nostr_events(
643                vec![Filter::new()
644                    .authors(vec![event.pubkey])
645                    .kinds(vec![event.kind])],
646                Duration::from_secs(1),
647            )
648            .await?;
649
650        assert_eq!(events.len(), 1);
651        assert_eq!(events[0].id, event.id);
652        responder.close().await?;
653        Ok(())
654    }
655
656    #[tokio::test]
657    async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
658    {
659        let (link_a, link_b) = MockBluetoothLink::pair();
660        let tmp_a = TempDir::new()?;
661        let tmp_b = TempDir::new()?;
662
663        let graph_store_a = {
664            let _guard = crate::socialgraph::test_lock();
665            crate::socialgraph::open_social_graph_store_with_mapsize(
666                tmp_a.path(),
667                Some(128 * 1024 * 1024),
668            )?
669        };
670        let graph_store_b = {
671            let _guard = crate::socialgraph::test_lock();
672            crate::socialgraph::open_social_graph_store_with_mapsize(
673                tmp_b.path(),
674                Some(128 * 1024 * 1024),
675            )?
676        };
677        let author_keys = Keys::generate();
678
679        let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
680        let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
681        let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
682            Arc::clone(&backend_a),
683            0,
684            HashSet::from([author_keys.public_key().to_hex()]),
685        ));
686        let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
687            Arc::clone(&backend_b),
688            0,
689            HashSet::from([author_keys.public_key().to_hex()]),
690        ));
691
692        let relay_a = Arc::new(NostrRelay::new(
693            Arc::clone(&backend_a),
694            tmp_a.path().to_path_buf(),
695            HashSet::from([author_keys.public_key().to_hex()]),
696            Some(access_a),
697            NostrRelayConfig {
698                spambox_db_max_bytes: 0,
699                ..Default::default()
700            },
701        )?);
702        let relay_b = Arc::new(NostrRelay::new(
703            Arc::clone(&backend_b),
704            tmp_b.path().to_path_buf(),
705            HashSet::from([author_keys.public_key().to_hex()]),
706            Some(access_b),
707            NostrRelayConfig {
708                spambox_db_max_bytes: 0,
709                ..Default::default()
710            },
711        )?);
712
713        let sender = BluetoothPeer::new(
714            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
715            PeerDirection::Outbound,
716            link_a,
717            None,
718            Some(relay_a.clone()),
719            None,
720            None,
721        );
722        let receiver = BluetoothPeer::new(
723            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
724            PeerDirection::Inbound,
725            link_b,
726            None,
727            Some(relay_b.clone()),
728            None,
729            None,
730        );
731
732        tokio::time::sleep(Duration::from_millis(50)).await;
733
734        let cid = "ab".repeat(32);
735        let event = EventBuilder::new(
736            Kind::TextNote,
737            "bluetooth publish sync",
738            [nostr::Tag::parse(&["cid", &cid]).unwrap()],
739        )
740        .to_event(&author_keys)?;
741        relay_a.ingest_trusted_event(event.clone()).await?;
742
743        tokio::time::sleep(Duration::from_millis(150)).await;
744
745        let received = relay_b
746            .query_events(
747                &Filter::new()
748                    .authors(vec![event.pubkey])
749                    .kinds(vec![event.kind]),
750                10,
751            )
752            .await;
753        assert_eq!(
754            received
755                .iter()
756                .filter(|candidate| candidate.id == event.id)
757                .count(),
758            1
759        );
760
761        let bluetooth_events = relay_b.bluetooth_received_events(10).await;
762        assert_eq!(bluetooth_events.len(), 1);
763        assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
764        assert_eq!(
765            bluetooth_events[0].peer_id.as_deref(),
766            Some("peer-b:sess-b")
767        );
768        assert_eq!(bluetooth_events[0].cid_values, vec![cid]);
769
770        receiver.close().await?;
771        sender.close().await?;
772        Ok(())
773    }
774}