Skip to main content

hashtree_cli/webrtc/
bluetooth_peer.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::debug;
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14    encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15    DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16    BLOB_REQUEST_POLICY,
17};
18use nostr::{
19    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
/// Capacity argument handed to `TimedSeenSet::new` for each peer's seen-event set.
const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
/// Time-to-live argument (600 seconds) handed to `TimedSeenSet::new` for each peer's
/// seen-event set.
const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
25
/// A single frame exchanged over a [`BluetoothLink`].
#[derive(Debug, Clone)]
pub enum BluetoothFrame {
    /// Text payload. `BluetoothPeer` tries to parse these as JSON mesh frames or Nostr
    /// messages.
    Text(String),
    /// Byte payload. `BluetoothPeer` decodes these with `parse_message` as data
    /// requests/responses.
    Binary(Vec<u8>),
}
31
/// Transport abstraction used by [`BluetoothPeer`] to exchange [`BluetoothFrame`]s with a
/// remote device.
#[async_trait]
pub trait BluetoothLink: Send + Sync {
    /// Sends one frame to the remote side.
    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
    /// Waits for the next incoming frame. The peer's reader loop stops as soon as this
    /// returns `None`.
    async fn recv(&self) -> Option<BluetoothFrame>;
    /// Reports whether the link is open; `BluetoothPeer::is_connected` returns this value.
    fn is_open(&self) -> bool;
    /// Closes the link.
    async fn close(&self) -> Result<()>;
}
39
/// A remote peer reached through a [`BluetoothLink`].
///
/// The peer answers incoming data requests from an optional content store, tracks its own
/// outstanding blob requests and Nostr queries, and optionally bridges Nostr traffic to a
/// local relay and mesh frames to a channel.
pub struct BluetoothPeer {
    /// Identity of the peer; used in log lines, traffic accounting and relay registration.
    pub peer_id: PeerId,
    /// Direction supplied by the caller at construction time.
    pub direction: PeerDirection,
    /// Instant captured when the peer object was constructed.
    pub created_at: std::time::Instant,
    /// Set to `Some(now)` by `new`.
    pub connected_at: Option<std::time::Instant>,
    /// Transport used for every send, receive and close.
    link: Arc<dyn BluetoothLink>,
    /// Optional store consulted to answer incoming data requests.
    store: Option<Arc<dyn ContentStore>>,
    /// Outstanding blob requests, keyed by hash hex; completed when a matching response arrives.
    pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
    /// Outstanding Nostr queries, keyed by subscription id string.
    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
    /// Optional local relay that receives events and client messages arriving over the link.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Optional channel that receives parsed mesh frames together with this peer's id.
    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
    /// Optional shared state that is told about every byte count sent or received.
    traffic_state: Option<Arc<WebRTCState>>,
    /// Event ids already handled by this peer, used to avoid forwarding or ingesting twice.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
    /// HTL configuration drawn from `PeerHTLConfig::random()` at construction.
    htl_config: PeerHTLConfig,
}
55
56impl BluetoothPeer {
57    #[allow(clippy::too_many_arguments)]
58    pub fn new(
59        peer_id: PeerId,
60        direction: PeerDirection,
61        link: Arc<dyn BluetoothLink>,
62        store: Option<Arc<dyn ContentStore>>,
63        nostr_relay: Option<Arc<NostrRelay>>,
64        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65        traffic_state: Option<Arc<WebRTCState>>,
66    ) -> Arc<Self> {
67        let peer = Arc::new(Self {
68            peer_id,
69            direction,
70            created_at: std::time::Instant::now(),
71            connected_at: Some(std::time::Instant::now()),
72            link,
73            store,
74            pending_requests: Arc::new(Mutex::new(HashMap::new())),
75            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76            nostr_relay,
77            mesh_frame_tx,
78            traffic_state,
79            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80                BLUETOOTH_SEEN_EVENT_CAP,
81                BLUETOOTH_SEEN_EVENT_TTL,
82            ))),
83            htl_config: PeerHTLConfig::random(),
84        });
85        Self::spawn_reader(peer.clone());
86        peer
87    }
88
89    async fn mark_seen_event_id(&self, event_id: String) -> bool {
90        self.seen_event_ids.lock().await.insert_if_new(event_id)
91    }
92
    /// Spawns the background task that drives this peer.
    ///
    /// When a relay is configured the task first registers the peer as a relay client,
    /// opens a "live" subscription for events from now on, and starts a second task that
    /// forwards relay output over the link. It then reads frames from the link until
    /// `recv` returns `None`, dispatching binary and text frames to their handlers, and
    /// finally unregisters from the relay and waits for the forwarding task.
    ///
    /// The task owns a clone of the `Arc`, so the peer stays alive while the task runs.
    fn spawn_reader(peer: Arc<Self>) {
        tokio::spawn(async move {
            let mut nostr_forward_task = None;
            let mut nostr_client_id = None;

            if let Some(relay) = peer.nostr_relay.as_ref() {
                // Register this peer as a relay client; everything the relay wants to tell
                // the client arrives as JSON text on `nostr_rx`.
                let client_id = relay.next_client_id();
                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
                relay
                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
                    .await;
                nostr_client_id = Some(client_id);

                // Internal subscription for events from "now" onwards. The random suffix
                // keeps the id distinct from subscriptions requested by the remote side.
                let live_subscription_id =
                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
                // Best effort: the registration result is deliberately ignored.
                let _ = relay
                    .register_subscription_query(
                        client_id,
                        live_subscription_id.clone(),
                        vec![NostrFilter::new().since(Timestamp::now())],
                    )
                    .await;

                let peer_for_forward = peer.clone();
                nostr_forward_task = Some(tokio::spawn(async move {
                    while let Some(text) = nostr_rx.recv().await {
                        if let Ok(NostrRelayMessage::Event {
                            subscription_id,
                            event,
                        }) = NostrRelayMessage::from_json(&text)
                        {
                            if subscription_id == live_subscription_id {
                                // Live events: drop ephemeral kinds and anything this peer
                                // has already seen (the seen check only runs for
                                // non-ephemeral events).
                                if event.kind.is_ephemeral()
                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
                                {
                                    continue;
                                }
                                // Live events go out as the bare event JSON, not wrapped in
                                // the relay message for the internal subscription.
                                if peer_for_forward
                                    .send_frame(BluetoothFrame::Text(event.as_json()))
                                    .await
                                    .is_err()
                                {
                                    break;
                                }
                                continue;
                            }
                        }
                        // Everything else (replies to the remote side's own subscriptions,
                        // other relay messages) is forwarded verbatim.
                        if peer_for_forward
                            .send_frame(BluetoothFrame::Text(text))
                            .await
                            .is_err()
                        {
                            break;
                        }
                    }
                }));
            }

            // Main read loop: runs until the link reports end-of-stream.
            while let Some(frame) = peer.link.recv().await {
                match frame {
                    BluetoothFrame::Binary(data) => {
                        if let Err(err) = peer.handle_binary_frame(data).await {
                            debug!(
                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
                                peer.peer_id.short(),
                                err
                            );
                        }
                    }
                    BluetoothFrame::Text(text) => {
                        peer.handle_text_frame(text, nostr_client_id).await;
                    }
                }
            }

            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
                relay.unregister_client(client_id).await;
            }

            // NOTE(review): presumably unregistering drops the relay's sender so the
            // forwarding task's `recv` returns `None` and this await completes - confirm.
            if let Some(task) = nostr_forward_task {
                let _ = task.await;
            }
        });
    }
177
178    async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179        self.record_received(data.len() as u64).await;
180        match parse_message(&data)? {
181            DataMessage::Request(req) => {
182                let hash_hex = hash_to_hex(&req.h);
183                if let Some(store) = self.store.as_ref() {
184                    if let Ok(Some(data)) = store.get(&hash_hex) {
185                        let response = DataResponse { h: req.h, d: data };
186                        let wire = encode_response(&response)?;
187                        self.send_frame(BluetoothFrame::Binary(wire)).await?;
188                    }
189                }
190            }
191            DataMessage::Response(res) => {
192                let hash_hex = hash_to_hex(&res.h);
193                if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
194                    let _ = sender.send(Some(res.d));
195                }
196            }
197            other => {
198                debug!(
199                    "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
200                    self.peer_id.short(),
201                    other
202                );
203            }
204        }
205        Ok(())
206    }
207
208    async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
209        self.record_received(text.len() as u64).await;
210        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
211            if let Some(tx) = self.mesh_frame_tx.as_ref() {
212                let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
213                return;
214            }
215        }
216
217        if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
218            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
219                let sender = {
220                    let pending = self.pending_nostr_queries.lock().await;
221                    pending.get(&sub_id).cloned()
222                };
223                if let Some(tx) = sender {
224                    let _ = tx.send(relay_msg);
225                    return;
226                }
227            }
228        }
229
230        if let Some(relay) = self.nostr_relay.as_ref() {
231            if let Ok(event) = nostr::Event::from_json(&text) {
232                if self.mark_seen_event_id(event.id.to_hex()).await {
233                    let _ = relay
234                        .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
235                        .await;
236                }
237                return;
238            }
239
240            if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
241                if let Some(client_id) = nostr_client_id {
242                    relay.handle_client_message(client_id, nostr_msg).await;
243                }
244            }
245        }
246    }
247
    /// Returns `true` while the underlying link reports itself as open.
    pub fn is_connected(&self) -> bool {
        self.link.is_open()
    }
251
    /// Returns the HTL configuration that was generated for this peer at construction.
    pub fn htl_config(&self) -> &PeerHTLConfig {
        &self.htl_config
    }
255
256    async fn record_sent(&self, bytes: u64) {
257        if let Some(state) = self.traffic_state.as_ref() {
258            state.record_sent(&self.peer_id.to_string(), bytes).await;
259        }
260    }
261
262    async fn record_received(&self, bytes: u64) {
263        if let Some(state) = self.traffic_state.as_ref() {
264            state
265                .record_received(&self.peer_id.to_string(), bytes)
266                .await;
267        }
268    }
269
270    async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
271        let bytes = match &frame {
272            BluetoothFrame::Text(text) => text.len() as u64,
273            BluetoothFrame::Binary(payload) => payload.len() as u64,
274        };
275        if let Err(err) = self.link.send(frame).await {
276            let _ = self.link.close().await;
277            return Err(err);
278        }
279        self.record_sent(bytes).await;
280        Ok(())
281    }
282
283    pub async fn request_with_timeout(
284        &self,
285        hash_hex: &str,
286        timeout: Duration,
287    ) -> Result<Option<Vec<u8>>> {
288        if !self.link.is_open() {
289            return Ok(None);
290        }
291
292        let hash = hex::decode(hash_hex)?;
293        let request = DataRequest {
294            h: hash,
295            htl: BLOB_REQUEST_POLICY.max_htl,
296            q: None,
297        };
298        let wire = encode_request(&request)?;
299        let (tx, rx) = oneshot::channel();
300        self.pending_requests
301            .lock()
302            .await
303            .insert(hash_hex.to_string(), tx);
304        self.send_frame(BluetoothFrame::Binary(wire)).await?;
305
306        match tokio::time::timeout(timeout, rx).await {
307            Ok(Ok(data)) => Ok(data),
308            Ok(Err(_)) => Ok(None),
309            Err(_) => {
310                self.pending_requests.lock().await.remove(hash_hex);
311                Ok(None)
312            }
313        }
314    }
315
316    pub async fn query_nostr_events(
317        &self,
318        filters: Vec<NostrFilter>,
319        timeout: Duration,
320    ) -> Result<Vec<nostr::Event>> {
321        let subscription_id = NostrSubscriptionId::generate();
322        let subscription_key = subscription_id.to_string();
323        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
324
325        self.pending_nostr_queries
326            .lock()
327            .await
328            .insert(subscription_key.clone(), tx);
329
330        let req = NostrClientMessage::req(subscription_id.clone(), filters);
331        self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
332
333        let mut events = Vec::new();
334        let deadline = tokio::time::Instant::now() + timeout;
335
336        loop {
337            let now = tokio::time::Instant::now();
338            if now >= deadline {
339                break;
340            }
341            match tokio::time::timeout(deadline - now, rx.recv()).await {
342                Ok(Some(NostrRelayMessage::Event {
343                    subscription_id: sid,
344                    event,
345                })) if sid == subscription_id => events.push(*event),
346                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
347                    break;
348                }
349                Ok(Some(NostrRelayMessage::Closed {
350                    subscription_id: sid,
351                    ..
352                })) if sid == subscription_id => break,
353                Ok(Some(_)) => {}
354                Ok(None) | Err(_) => break,
355            }
356        }
357
358        let close = NostrClientMessage::close(subscription_id.clone());
359        let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
360        self.pending_nostr_queries
361            .lock()
362            .await
363            .remove(&subscription_key);
364        Ok(events)
365    }
366
367    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
368        let text = serde_json::to_string(frame)?;
369        self.send_frame(BluetoothFrame::Text(text)).await
370    }
371
    /// Closes the underlying link and returns the link's result.
    pub async fn close(&self) -> Result<()> {
        self.link.close().await
    }
375}
376
377fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
378    match msg {
379        NostrRelayMessage::Event {
380            subscription_id, ..
381        } => Some(subscription_id.to_string()),
382        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
383        NostrRelayMessage::Closed {
384            subscription_id, ..
385        } => Some(subscription_id.to_string()),
386        NostrRelayMessage::Count {
387            subscription_id, ..
388        } => Some(subscription_id.to_string()),
389        _ => None,
390    }
391}
392
/// In-memory [`BluetoothLink`] for tests, backed by a pair of Tokio channels.
#[cfg(test)]
pub struct MockBluetoothLink {
    /// Cleared by `close`; while cleared, `send` drops frames and `is_open` is `false`.
    open: std::sync::atomic::AtomicBool,
    /// Sending half towards the other end of the pair.
    tx: mpsc::Sender<BluetoothFrame>,
    /// Receiving half fed by the other end of the pair.
    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
}
399
#[cfg(test)]
impl MockBluetoothLink {
    /// Creates two connected mock links: frames sent on one are received on the other.
    ///
    /// Each direction is a bounded channel holding up to 32 frames.
    pub fn pair() -> (Arc<Self>, Arc<Self>) {
        let (first_tx, second_rx) = mpsc::channel(32);
        let (second_tx, first_rx) = mpsc::channel(32);

        let endpoint = |tx: mpsc::Sender<BluetoothFrame>, rx: mpsc::Receiver<BluetoothFrame>| {
            Arc::new(Self {
                open: std::sync::atomic::AtomicBool::new(true),
                tx,
                rx: Mutex::new(rx),
            })
        };

        (endpoint(first_tx, first_rx), endpoint(second_tx, second_rx))
    }
}
419
#[cfg(test)]
#[async_trait]
impl BluetoothLink for MockBluetoothLink {
    /// Queues `frame` for the other end. Frames sent after `close` are dropped and the
    /// call still reports success.
    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
        if self.is_open() {
            self.tx.send(frame).await?;
        }
        Ok(())
    }

    /// Waits for the next frame from the other end; `None` once the channel is closed.
    async fn recv(&self) -> Option<BluetoothFrame> {
        let mut inbox = self.rx.lock().await;
        inbox.recv().await
    }

    /// Returns the current value of the `open` flag.
    fn is_open(&self) -> bool {
        self.open.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Clears the `open` flag; always succeeds.
    async fn close(&self) -> Result<()> {
        self.open
            .store(false, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }
}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use anyhow::anyhow;
451    use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
452    use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
453    use nostr::{EventBuilder, Filter, Keys, Kind};
454    use std::collections::HashSet;
455    use std::sync::atomic::{AtomicBool, Ordering};
456    use std::sync::Arc;
457    use std::time::Instant;
458    use tempfile::TempDir;
459
    /// Content store for tests, backed by an in-memory map.
    struct TestStore {
        /// Blobs keyed by the hash hex string they are requested under.
        blobs: HashMap<String, Vec<u8>>,
    }

    impl ContentStore for TestStore {
        /// Returns a copy of the blob stored under `hash_hex`, or `None` when absent.
        fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
            Ok(self.blobs.get(hash_hex).cloned())
        }
    }
469
    /// Link whose `send` always fails and whose `recv` never yields, used to exercise the
    /// close-on-send-failure path.
    struct FailingSendLink {
        /// Starts as supplied by the test and is cleared by `close`.
        open: AtomicBool,
    }

    #[async_trait]
    impl BluetoothLink for FailingSendLink {
        /// Always fails with "send failed".
        async fn send(&self, _frame: BluetoothFrame) -> Result<()> {
            Err(anyhow!("send failed"))
        }

        /// Never completes, so the peer's reader loop stays parked.
        async fn recv(&self) -> Option<BluetoothFrame> {
            std::future::pending::<Option<BluetoothFrame>>().await
        }

        /// Returns the current value of the `open` flag.
        fn is_open(&self) -> bool {
            self.open.load(Ordering::Relaxed)
        }

        /// Clears the `open` flag; always succeeds.
        async fn close(&self) -> Result<()> {
            self.open.store(false, Ordering::Relaxed);
            Ok(())
        }
    }
493
    /// A blob request sent by one peer is answered from the other peer's content store.
    #[tokio::test]
    async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let data = b"bluetooth mesh payload".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));

        // Requesting side: no store, no relay, no mesh channel, no traffic state.
        let requester = BluetoothPeer::new(
            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            None,
        );

        // Responding side: owns the blob under its hash.
        let mut blobs = HashMap::new();
        blobs.insert(hash_hex.clone(), data.clone());
        let responder = BluetoothPeer::new(
            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
            PeerDirection::Inbound,
            link_b,
            Some(Arc::new(TestStore { blobs })),
            None,
            None,
            None,
        );

        let received = requester
            .request_with_timeout(&hash_hex, Duration::from_secs(1))
            .await
            .expect("request should succeed");

        assert_eq!(received, Some(data));
        responder.close().await.unwrap();
    }
530
    /// Byte counters in the shared state match the encoded request and response sizes on
    /// both sides of a blob round trip.
    #[tokio::test]
    async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let state = Arc::new(WebRTCState::new());
        let data = b"bluetooth stats payload".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));
        let requester_id = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
        let responder_id = PeerId::new("peer-b".to_string(), Some("sess-b".to_string()));

        // Pre-populate a zeroed entry for each peer; the assertions below read the
        // counters back from these entries.
        for peer_id in [&requester_id, &responder_id] {
            state.peers.write().await.insert(
                peer_id.to_string(),
                PeerEntry {
                    peer_id: peer_id.clone(),
                    direction: PeerDirection::Outbound,
                    state: ConnectionState::Connected,
                    last_seen: Instant::now(),
                    peer: None,
                    pool: super::super::types::PeerPool::Other,
                    transport: PeerTransport::Bluetooth,
                    signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        let requester = BluetoothPeer::new(
            requester_id.clone(),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            Some(state.clone()),
        );

        let mut blobs = HashMap::new();
        blobs.insert(hash_hex.clone(), data.clone());
        let responder = BluetoothPeer::new(
            responder_id.clone(),
            PeerDirection::Inbound,
            link_b,
            Some(Arc::new(TestStore { blobs })),
            None,
            None,
            Some(state.clone()),
        );

        let received = requester
            .request_with_timeout(&hash_hex, Duration::from_secs(1))
            .await
            .expect("request should succeed");

        assert_eq!(received, Some(data.clone()));
        // Re-encode the same request and response to learn the expected wire sizes.
        let hash = hex::decode(&hash_hex).expect("valid hash hex");
        let expected_request_len = encode_request(&DataRequest {
            h: hash.clone(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: None,
        })
        .expect("request encoding")
        .len() as u64;
        let expected_response_len = encode_response(&DataResponse {
            h: hash,
            d: data.clone(),
        })
        .expect("response encoding")
        .len() as u64;

        let peers = state.peers.read().await;
        let requester_stats = peers
            .get(&requester_id.to_string())
            .expect("requester stats");
        let responder_stats = peers
            .get(&responder_id.to_string())
            .expect("responder stats");
        assert_eq!(requester_stats.bytes_sent, expected_request_len);
        assert_eq!(requester_stats.bytes_received, expected_response_len);
        assert_eq!(responder_stats.bytes_received, expected_request_len);
        assert_eq!(responder_stats.bytes_sent, expected_response_len);
        // Release the read guard before closing the peer.
        drop(peers);

        responder.close().await.unwrap();
    }
616
    /// A Nostr query issued by one peer is answered by the relay attached to the other.
    #[tokio::test]
    async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let tmp = TempDir::new()?;
        // The store is opened while holding the social-graph test lock; the guard is
        // released at the end of this block.
        let graph_store = {
            let _guard = crate::socialgraph::test_lock();
            crate::socialgraph::open_social_graph_store_with_mapsize(
                tmp.path(),
                Some(128 * 1024 * 1024),
            )?
        };
        let author_keys = Keys::generate();
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
            Arc::clone(&backend),
            0,
            HashSet::from([author_keys.public_key().to_hex()]),
        ));
        let relay = Arc::new(NostrRelay::new(
            Arc::clone(&backend),
            tmp.path().to_path_buf(),
            HashSet::from([author_keys.public_key().to_hex()]),
            Some(access),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?);

        // Only the responding side has a relay attached.
        let requester = BluetoothPeer::new(
            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            None,
        );
        let responder = BluetoothPeer::new(
            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
            PeerDirection::Inbound,
            link_b,
            None,
            Some(relay.clone()),
            None,
            None,
        );

        let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
            .to_event(&author_keys)?;
        relay.ingest_trusted_event(event.clone()).await?;
        // Short pause between ingesting the event and querying for it.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let events = requester
            .query_nostr_events(
                vec![Filter::new()
                    .authors(vec![event.pubkey])
                    .kinds(vec![event.kind])],
                Duration::from_secs(1),
            )
            .await?;

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event.id);
        responder.close().await?;
        Ok(())
    }
684
    /// An event published into one relay is forwarded over the link, stored by the other
    /// relay exactly once, and recorded there as received over Bluetooth.
    #[tokio::test]
    async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
    {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let tmp_a = TempDir::new()?;
        let tmp_b = TempDir::new()?;

        // Each store is opened while holding the social-graph test lock.
        let graph_store_a = {
            let _guard = crate::socialgraph::test_lock();
            crate::socialgraph::open_social_graph_store_with_mapsize(
                tmp_a.path(),
                Some(128 * 1024 * 1024),
            )?
        };
        let graph_store_b = {
            let _guard = crate::socialgraph::test_lock();
            crate::socialgraph::open_social_graph_store_with_mapsize(
                tmp_b.path(),
                Some(128 * 1024 * 1024),
            )?
        };
        let author_keys = Keys::generate();

        let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
        let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
        let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
            Arc::clone(&backend_a),
            0,
            HashSet::from([author_keys.public_key().to_hex()]),
        ));
        let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
            Arc::clone(&backend_b),
            0,
            HashSet::from([author_keys.public_key().to_hex()]),
        ));

        let relay_a = Arc::new(NostrRelay::new(
            Arc::clone(&backend_a),
            tmp_a.path().to_path_buf(),
            HashSet::from([author_keys.public_key().to_hex()]),
            Some(access_a),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?);
        let relay_b = Arc::new(NostrRelay::new(
            Arc::clone(&backend_b),
            tmp_b.path().to_path_buf(),
            HashSet::from([author_keys.public_key().to_hex()]),
            Some(access_b),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?);

        // Both sides have a relay attached, one per end of the link.
        let sender = BluetoothPeer::new(
            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
            PeerDirection::Outbound,
            link_a,
            None,
            Some(relay_a.clone()),
            None,
            None,
        );
        let receiver = BluetoothPeer::new(
            PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
            PeerDirection::Inbound,
            link_b,
            None,
            Some(relay_b.clone()),
            None,
            None,
        );

        // Give the reader tasks time to register with their relays and subscribe.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let cid = "ab".repeat(32);
        let event = EventBuilder::new(
            Kind::TextNote,
            "bluetooth publish sync",
            [nostr::Tag::parse(&["cid", &cid]).unwrap()],
        )
        .to_event(&author_keys)?;
        relay_a.ingest_trusted_event(event.clone()).await?;

        // Allow the event to travel relay A -> link -> relay B.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let received = relay_b
            .query_events(
                &Filter::new()
                    .authors(vec![event.pubkey])
                    .kinds(vec![event.kind]),
                10,
            )
            .await;
        // The event must be present on the receiving relay exactly once.
        assert_eq!(
            received
                .iter()
                .filter(|candidate| candidate.id == event.id)
                .count(),
            1
        );

        // Provenance: relay B lists the event as received over Bluetooth, with the id of
        // the peer object that ingested it and the event's `cid` tag value.
        let bluetooth_events = relay_b.bluetooth_received_events(10).await;
        assert_eq!(bluetooth_events.len(), 1);
        assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
        assert_eq!(
            bluetooth_events[0].peer_id.as_deref(),
            Some("peer-b:sess-b")
        );
        assert_eq!(bluetooth_events[0].cid_values, vec![cid]);

        receiver.close().await?;
        sender.close().await?;
        Ok(())
    }
803
    /// When forwarding a locally published event fails, the peer closes its link and
    /// stops reporting itself as connected.
    #[tokio::test]
    async fn bluetooth_peer_closes_after_local_publish_send_failure() -> Result<()> {
        let tmp = TempDir::new()?;
        // The store is opened while holding the social-graph test lock.
        let graph_store = {
            let _guard = crate::socialgraph::test_lock();
            crate::socialgraph::open_social_graph_store_with_mapsize(
                tmp.path(),
                Some(128 * 1024 * 1024),
            )?
        };
        let author_keys = Keys::generate();
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
            Arc::clone(&backend),
            0,
            HashSet::from([author_keys.public_key().to_hex()]),
        ));
        let relay = Arc::new(NostrRelay::new(
            Arc::clone(&backend),
            tmp.path().to_path_buf(),
            HashSet::from([author_keys.public_key().to_hex()]),
            Some(access),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?);

        // The link accepts no frames: every send fails.
        let peer = BluetoothPeer::new(
            PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
            PeerDirection::Outbound,
            Arc::new(FailingSendLink {
                open: AtomicBool::new(true),
            }),
            None,
            Some(relay.clone()),
            None,
            None,
        );

        // Give the reader task time to register and subscribe; nothing has been sent yet,
        // so the link is still open.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(peer.is_connected());

        let event = EventBuilder::new(Kind::TextNote, "close stale bluetooth peer", [])
            .to_event(&author_keys)?;
        relay.ingest_trusted_event(event).await?;

        // Forwarding the event fails, which makes `send_frame` close the link.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(!peer.is_connected());
        Ok(())
    }
854    }
855}