Skip to main content

hashtree_cli/webrtc/
bluetooth_peer.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14    encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15    DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16    BLOB_REQUEST_POLICY,
17};
18use nostr::{
19    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
23const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
24const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
25
26#[derive(Debug, Clone)]
27pub enum BluetoothFrame {
28    Text(String),
29    Binary(Vec<u8>),
30}
31
32#[async_trait]
33pub trait BluetoothLink: Send + Sync {
34    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
35    async fn recv(&self) -> Option<BluetoothFrame>;
36    fn is_open(&self) -> bool;
37    async fn close(&self) -> Result<()>;
38}
39
40pub struct BluetoothPeer {
41    pub peer_id: PeerId,
42    pub direction: PeerDirection,
43    pub created_at: std::time::Instant,
44    pub connected_at: Option<std::time::Instant>,
45    link: Arc<dyn BluetoothLink>,
46    store: Option<Arc<dyn ContentStore>>,
47    pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
48    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
49    nostr_relay: Option<Arc<NostrRelay>>,
50    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
51    traffic_state: Option<Arc<WebRTCState>>,
52    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
53    htl_config: PeerHTLConfig,
54}
55
56impl BluetoothPeer {
57    #[allow(clippy::too_many_arguments)]
58    pub fn new(
59        peer_id: PeerId,
60        direction: PeerDirection,
61        link: Arc<dyn BluetoothLink>,
62        store: Option<Arc<dyn ContentStore>>,
63        nostr_relay: Option<Arc<NostrRelay>>,
64        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65        traffic_state: Option<Arc<WebRTCState>>,
66    ) -> Arc<Self> {
67        let peer = Arc::new(Self {
68            peer_id,
69            direction,
70            created_at: std::time::Instant::now(),
71            connected_at: Some(std::time::Instant::now()),
72            link,
73            store,
74            pending_requests: Arc::new(Mutex::new(HashMap::new())),
75            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76            nostr_relay,
77            mesh_frame_tx,
78            traffic_state,
79            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80                BLUETOOTH_SEEN_EVENT_CAP,
81                BLUETOOTH_SEEN_EVENT_TTL,
82            ))),
83            htl_config: PeerHTLConfig::random(),
84        });
85        Self::spawn_reader(peer.clone());
86        peer
87    }
88
89    async fn mark_seen_event_id(&self, event_id: String) -> bool {
90        self.seen_event_ids.lock().await.insert_if_new(event_id)
91    }
92
93    fn spawn_reader(peer: Arc<Self>) {
94        tokio::spawn(async move {
95            let mut nostr_forward_task = None;
96            let mut nostr_client_id = None;
97
98            if let Some(relay) = peer.nostr_relay.as_ref() {
99                let client_id = relay.next_client_id();
100                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
101                relay
102                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
103                    .await;
104                nostr_client_id = Some(client_id);
105
106                let live_subscription_id =
107                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
108                let _ = relay
109                    .register_subscription_query(
110                        client_id,
111                        live_subscription_id.clone(),
112                        vec![NostrFilter::new().since(Timestamp::now())],
113                    )
114                    .await;
115
116                let peer_for_forward = peer.clone();
117                nostr_forward_task = Some(tokio::spawn(async move {
118                    while let Some(text) = nostr_rx.recv().await {
119                        if let Ok(NostrRelayMessage::Event {
120                            subscription_id,
121                            event,
122                        }) = NostrRelayMessage::from_json(&text)
123                        {
124                            if subscription_id == live_subscription_id {
125                                if event.kind.is_ephemeral()
126                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
127                                {
128                                    continue;
129                                }
130                                if peer_for_forward
131                                    .send_frame(BluetoothFrame::Text(event.as_json()))
132                                    .await
133                                    .is_err()
134                                {
135                                    break;
136                                }
137                                continue;
138                            }
139                        }
140                        if peer_for_forward
141                            .send_frame(BluetoothFrame::Text(text))
142                            .await
143                            .is_err()
144                        {
145                            break;
146                        }
147                    }
148                }));
149            }
150
151            while let Some(frame) = peer.link.recv().await {
152                match frame {
153                    BluetoothFrame::Binary(data) => {
154                        if let Err(err) = peer.handle_binary_frame(data).await {
155                            debug!(
156                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
157                                peer.peer_id.short(),
158                                err
159                            );
160                        }
161                    }
162                    BluetoothFrame::Text(text) => {
163                        peer.handle_text_frame(text, nostr_client_id).await;
164                    }
165                }
166            }
167
168            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
169                relay.unregister_client(client_id).await;
170            }
171
172            if let Some(task) = nostr_forward_task {
173                let _ = task.await;
174            }
175        });
176    }
177
178    async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179        self.record_received(data.len() as u64).await;
180        match parse_message(&data)? {
181            DataMessage::Request(req) => {
182                let hash_hex = hash_to_hex(&req.h);
183                if let Some(store) = self.store.as_ref() {
184                    if let Ok(Some(data)) = store.get(&hash_hex) {
185                        let response = DataResponse {
186                            h: req.h,
187                            d: data,
188                            i: None,
189                            n: None,
190                        };
191                        let wire = encode_response(&response)?;
192                        self.send_frame(BluetoothFrame::Binary(wire)).await?;
193                    }
194                }
195            }
196            DataMessage::Response(res) => {
197                let hash_hex = hash_to_hex(&res.h);
198                if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
199                    let _ = sender.send(Some(res.d));
200                }
201            }
202            other => {
203                debug!(
204                    "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
205                    self.peer_id.short(),
206                    other
207                );
208            }
209        }
210        Ok(())
211    }
212
213    async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
214        self.record_received(text.len() as u64).await;
215        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
216            if let Some(tx) = self.mesh_frame_tx.as_ref() {
217                let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
218                return;
219            }
220        }
221
222        if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
223            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
224                let sender = {
225                    let pending = self.pending_nostr_queries.lock().await;
226                    pending.get(&sub_id).cloned()
227                };
228                if let Some(tx) = sender {
229                    let _ = tx.send(relay_msg);
230                    return;
231                }
232            }
233        }
234
235        if let Some(relay) = self.nostr_relay.as_ref() {
236            if let Ok(event) = nostr::Event::from_json(&text) {
237                if self.mark_seen_event_id(event.id.to_hex()).await {
238                    let _ = relay
239                        .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
240                        .await;
241                }
242                return;
243            }
244
245            if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
246                if let Some(client_id) = nostr_client_id {
247                    relay.handle_client_message(client_id, nostr_msg).await;
248                }
249            }
250        }
251    }
252
253    pub fn is_connected(&self) -> bool {
254        self.link.is_open()
255    }
256
257    pub fn htl_config(&self) -> &PeerHTLConfig {
258        &self.htl_config
259    }
260
261    async fn record_sent(&self, bytes: u64) {
262        if let Some(state) = self.traffic_state.as_ref() {
263            state.record_sent(&self.peer_id.to_string(), bytes).await;
264        }
265    }
266
267    async fn record_received(&self, bytes: u64) {
268        if let Some(state) = self.traffic_state.as_ref() {
269            state
270                .record_received(&self.peer_id.to_string(), bytes)
271                .await;
272        }
273    }
274
275    async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
276        let bytes = match &frame {
277            BluetoothFrame::Text(text) => text.len() as u64,
278            BluetoothFrame::Binary(payload) => payload.len() as u64,
279        };
280        if let Err(err) = self.link.send(frame).await {
281            warn!(
282                "[BluetoothPeer {}] Failed to send frame over BLE: {}",
283                self.peer_id.short(),
284                err
285            );
286            let _ = self.link.close().await;
287            return Err(err);
288        }
289        self.record_sent(bytes).await;
290        Ok(())
291    }
292
293    pub async fn request_with_timeout(
294        &self,
295        hash_hex: &str,
296        timeout: Duration,
297    ) -> Result<Option<Vec<u8>>> {
298        if !self.link.is_open() {
299            return Ok(None);
300        }
301
302        let hash = hex::decode(hash_hex)?;
303        let request = DataRequest {
304            h: hash,
305            htl: BLOB_REQUEST_POLICY.max_htl,
306            q: None,
307        };
308        let wire = encode_request(&request)?;
309        let (tx, rx) = oneshot::channel();
310        self.pending_requests
311            .lock()
312            .await
313            .insert(hash_hex.to_string(), tx);
314        self.send_frame(BluetoothFrame::Binary(wire)).await?;
315
316        match tokio::time::timeout(timeout, rx).await {
317            Ok(Ok(data)) => Ok(data),
318            Ok(Err(_)) => Ok(None),
319            Err(_) => {
320                self.pending_requests.lock().await.remove(hash_hex);
321                Ok(None)
322            }
323        }
324    }
325
326    pub async fn query_nostr_events(
327        &self,
328        filters: Vec<NostrFilter>,
329        timeout: Duration,
330    ) -> Result<Vec<nostr::Event>> {
331        let subscription_id = NostrSubscriptionId::generate();
332        let subscription_key = subscription_id.to_string();
333        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
334
335        self.pending_nostr_queries
336            .lock()
337            .await
338            .insert(subscription_key.clone(), tx);
339
340        let req = NostrClientMessage::req(subscription_id.clone(), filters);
341        self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
342
343        let mut events = Vec::new();
344        let deadline = tokio::time::Instant::now() + timeout;
345
346        loop {
347            let now = tokio::time::Instant::now();
348            if now >= deadline {
349                break;
350            }
351            match tokio::time::timeout(deadline - now, rx.recv()).await {
352                Ok(Some(NostrRelayMessage::Event {
353                    subscription_id: sid,
354                    event,
355                })) if sid == subscription_id => events.push(*event),
356                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
357                    break;
358                }
359                Ok(Some(NostrRelayMessage::Closed {
360                    subscription_id: sid,
361                    ..
362                })) if sid == subscription_id => break,
363                Ok(Some(_)) => {}
364                Ok(None) | Err(_) => break,
365            }
366        }
367
368        let close = NostrClientMessage::close(subscription_id.clone());
369        let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
370        self.pending_nostr_queries
371            .lock()
372            .await
373            .remove(&subscription_key);
374        Ok(events)
375    }
376
377    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
378        let text = serde_json::to_string(frame)?;
379        self.send_frame(BluetoothFrame::Text(text)).await
380    }
381
382    pub async fn close(&self) -> Result<()> {
383        self.link.close().await
384    }
385}
386
387fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
388    match msg {
389        NostrRelayMessage::Event {
390            subscription_id, ..
391        } => Some(subscription_id.to_string()),
392        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
393        NostrRelayMessage::Closed {
394            subscription_id, ..
395        } => Some(subscription_id.to_string()),
396        NostrRelayMessage::Count {
397            subscription_id, ..
398        } => Some(subscription_id.to_string()),
399        _ => None,
400    }
401}
402
403#[cfg(test)]
404pub struct MockBluetoothLink {
405    open: std::sync::atomic::AtomicBool,
406    tx: mpsc::Sender<BluetoothFrame>,
407    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
408}
409
410#[cfg(test)]
411impl MockBluetoothLink {
412    pub fn pair() -> (Arc<Self>, Arc<Self>) {
413        let (tx_a, rx_a) = mpsc::channel(32);
414        let (tx_b, rx_b) = mpsc::channel(32);
415        (
416            Arc::new(Self {
417                open: std::sync::atomic::AtomicBool::new(true),
418                tx: tx_a,
419                rx: Mutex::new(rx_b),
420            }),
421            Arc::new(Self {
422                open: std::sync::atomic::AtomicBool::new(true),
423                tx: tx_b,
424                rx: Mutex::new(rx_a),
425            }),
426        )
427    }
428}
429
430#[cfg(test)]
431#[async_trait]
432impl BluetoothLink for MockBluetoothLink {
433    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
434        use std::sync::atomic::Ordering;
435        if !self.open.load(Ordering::Relaxed) {
436            return Ok(());
437        }
438        self.tx.send(frame).await.map_err(Into::into)
439    }
440
441    async fn recv(&self) -> Option<BluetoothFrame> {
442        self.rx.lock().await.recv().await
443    }
444
445    fn is_open(&self) -> bool {
446        use std::sync::atomic::Ordering;
447        self.open.load(Ordering::Relaxed)
448    }
449
450    async fn close(&self) -> Result<()> {
451        use std::sync::atomic::Ordering;
452        self.open.store(false, Ordering::Relaxed);
453        Ok(())
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
461    use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
462    use anyhow::anyhow;
463    use nostr::{EventBuilder, Filter, Keys, Kind};
464    use std::collections::HashSet;
465    use std::sync::atomic::{AtomicBool, Ordering};
466    use std::sync::Arc;
467    use std::time::Instant;
468    use tempfile::TempDir;
469
470    struct TestStore {
471        blobs: HashMap<String, Vec<u8>>,
472    }
473
474    impl ContentStore for TestStore {
475        fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
476            Ok(self.blobs.get(hash_hex).cloned())
477        }
478    }
479
480    struct FailingSendLink {
481        open: AtomicBool,
482    }
483
484    #[async_trait]
485    impl BluetoothLink for FailingSendLink {
486        async fn send(&self, _frame: BluetoothFrame) -> Result<()> {
487            Err(anyhow!("send failed"))
488        }
489
490        async fn recv(&self) -> Option<BluetoothFrame> {
491            std::future::pending::<Option<BluetoothFrame>>().await
492        }
493
494        fn is_open(&self) -> bool {
495            self.open.load(Ordering::Relaxed)
496        }
497
498        async fn close(&self) -> Result<()> {
499            self.open.store(false, Ordering::Relaxed);
500            Ok(())
501        }
502    }
503
504    #[tokio::test]
505    async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
506        let (link_a, link_b) = MockBluetoothLink::pair();
507        let data = b"bluetooth mesh payload".to_vec();
508        let hash_hex = hex::encode(hashtree_core::sha256(&data));
509
510        let requester = BluetoothPeer::new(
511            PeerId::new("peer-a".to_string()),
512            PeerDirection::Outbound,
513            link_a,
514            None,
515            None,
516            None,
517            None,
518        );
519
520        let mut blobs = HashMap::new();
521        blobs.insert(hash_hex.clone(), data.clone());
522        let responder = BluetoothPeer::new(
523            PeerId::new("peer-b".to_string()),
524            PeerDirection::Inbound,
525            link_b,
526            Some(Arc::new(TestStore { blobs })),
527            None,
528            None,
529            None,
530        );
531
532        let received = requester
533            .request_with_timeout(&hash_hex, Duration::from_secs(1))
534            .await
535            .expect("request should succeed");
536
537        assert_eq!(received, Some(data));
538        responder.close().await.unwrap();
539    }
540
541    #[tokio::test]
542    async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
543        let (link_a, link_b) = MockBluetoothLink::pair();
544        let state = Arc::new(WebRTCState::new());
545        let data = b"bluetooth stats payload".to_vec();
546        let hash_hex = hex::encode(hashtree_core::sha256(&data));
547        let requester_id = PeerId::new("peer-a".to_string());
548        let responder_id = PeerId::new("peer-b".to_string());
549
550        for peer_id in [&requester_id, &responder_id] {
551            state.peers.write().await.insert(
552                peer_id.to_string(),
553                PeerEntry {
554                    peer_id: peer_id.clone(),
555                    direction: PeerDirection::Outbound,
556                    state: ConnectionState::Connected,
557                    last_seen: Instant::now(),
558                    peer: None,
559                    pool: super::super::types::PeerPool::Other,
560                    transport: PeerTransport::Bluetooth,
561                    signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
562                    bytes_sent: 0,
563                    bytes_received: 0,
564                },
565            );
566        }
567
568        let requester = BluetoothPeer::new(
569            requester_id.clone(),
570            PeerDirection::Outbound,
571            link_a,
572            None,
573            None,
574            None,
575            Some(state.clone()),
576        );
577
578        let mut blobs = HashMap::new();
579        blobs.insert(hash_hex.clone(), data.clone());
580        let responder = BluetoothPeer::new(
581            responder_id.clone(),
582            PeerDirection::Inbound,
583            link_b,
584            Some(Arc::new(TestStore { blobs })),
585            None,
586            None,
587            Some(state.clone()),
588        );
589
590        let received = requester
591            .request_with_timeout(&hash_hex, Duration::from_secs(1))
592            .await
593            .expect("request should succeed");
594
595        assert_eq!(received, Some(data.clone()));
596        let hash = hex::decode(&hash_hex).expect("valid hash hex");
597        let expected_request_len = encode_request(&DataRequest {
598            h: hash.clone(),
599            htl: BLOB_REQUEST_POLICY.max_htl,
600            q: None,
601        })
602        .expect("request encoding")
603        .len() as u64;
604        let expected_response_len = encode_response(&DataResponse {
605            h: hash,
606            d: data.clone(),
607            i: None,
608            n: None,
609        })
610        .expect("response encoding")
611        .len() as u64;
612
613        let peers = state.peers.read().await;
614        let requester_stats = peers
615            .get(&requester_id.to_string())
616            .expect("requester stats");
617        let responder_stats = peers
618            .get(&responder_id.to_string())
619            .expect("responder stats");
620        assert_eq!(requester_stats.bytes_sent, expected_request_len);
621        assert_eq!(requester_stats.bytes_received, expected_response_len);
622        assert_eq!(responder_stats.bytes_received, expected_request_len);
623        assert_eq!(responder_stats.bytes_sent, expected_response_len);
624        drop(peers);
625
626        responder.close().await.unwrap();
627    }
628
629    #[tokio::test]
630    async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
631        let (link_a, link_b) = MockBluetoothLink::pair();
632        let tmp = TempDir::new()?;
633        let graph_store = {
634            let _guard = crate::socialgraph::test_lock();
635            crate::socialgraph::open_social_graph_store_with_mapsize(
636                tmp.path(),
637                Some(128 * 1024 * 1024),
638            )?
639        };
640        let author_keys = Keys::generate();
641        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
642        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
643            Arc::clone(&backend),
644            0,
645            HashSet::from([author_keys.public_key().to_hex()]),
646        ));
647        let relay = Arc::new(NostrRelay::new(
648            Arc::clone(&backend),
649            tmp.path().to_path_buf(),
650            HashSet::from([author_keys.public_key().to_hex()]),
651            Some(access),
652            NostrRelayConfig {
653                spambox_db_max_bytes: 0,
654                ..Default::default()
655            },
656        )?);
657
658        let requester = BluetoothPeer::new(
659            PeerId::new("peer-a".to_string()),
660            PeerDirection::Outbound,
661            link_a,
662            None,
663            None,
664            None,
665            None,
666        );
667        let responder = BluetoothPeer::new(
668            PeerId::new("peer-b".to_string()),
669            PeerDirection::Inbound,
670            link_b,
671            None,
672            Some(relay.clone()),
673            None,
674            None,
675        );
676
677        let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
678            .to_event(&author_keys)?;
679        relay.ingest_trusted_event(event.clone()).await?;
680        tokio::time::sleep(Duration::from_millis(50)).await;
681
682        let events = requester
683            .query_nostr_events(
684                vec![Filter::new()
685                    .authors(vec![event.pubkey])
686                    .kinds(vec![event.kind])],
687                Duration::from_secs(1),
688            )
689            .await?;
690
691        assert_eq!(events.len(), 1);
692        assert_eq!(events[0].id, event.id);
693        responder.close().await?;
694        Ok(())
695    }
696
697    #[tokio::test]
698    async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
699    {
700        let (link_a, link_b) = MockBluetoothLink::pair();
701        let tmp_a = TempDir::new()?;
702        let tmp_b = TempDir::new()?;
703
704        let graph_store_a = {
705            let _guard = crate::socialgraph::test_lock();
706            crate::socialgraph::open_social_graph_store_with_mapsize(
707                tmp_a.path(),
708                Some(128 * 1024 * 1024),
709            )?
710        };
711        let graph_store_b = {
712            let _guard = crate::socialgraph::test_lock();
713            crate::socialgraph::open_social_graph_store_with_mapsize(
714                tmp_b.path(),
715                Some(128 * 1024 * 1024),
716            )?
717        };
718        let author_keys = Keys::generate();
719
720        let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
721        let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
722        let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
723            Arc::clone(&backend_a),
724            0,
725            HashSet::from([author_keys.public_key().to_hex()]),
726        ));
727        let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
728            Arc::clone(&backend_b),
729            0,
730            HashSet::from([author_keys.public_key().to_hex()]),
731        ));
732
733        let relay_a = Arc::new(NostrRelay::new(
734            Arc::clone(&backend_a),
735            tmp_a.path().to_path_buf(),
736            HashSet::from([author_keys.public_key().to_hex()]),
737            Some(access_a),
738            NostrRelayConfig {
739                spambox_db_max_bytes: 0,
740                ..Default::default()
741            },
742        )?);
743        let relay_b = Arc::new(NostrRelay::new(
744            Arc::clone(&backend_b),
745            tmp_b.path().to_path_buf(),
746            HashSet::from([author_keys.public_key().to_hex()]),
747            Some(access_b),
748            NostrRelayConfig {
749                spambox_db_max_bytes: 0,
750                ..Default::default()
751            },
752        )?);
753
754        let sender = BluetoothPeer::new(
755            PeerId::new("peer-a".to_string()),
756            PeerDirection::Outbound,
757            link_a,
758            None,
759            Some(relay_a.clone()),
760            None,
761            None,
762        );
763        let receiver = BluetoothPeer::new(
764            PeerId::new("peer-b".to_string()),
765            PeerDirection::Inbound,
766            link_b,
767            None,
768            Some(relay_b.clone()),
769            None,
770            None,
771        );
772
773        tokio::time::sleep(Duration::from_millis(50)).await;
774
775        let cid = "ab".repeat(32);
776        let event = EventBuilder::new(
777            Kind::TextNote,
778            "bluetooth publish sync",
779            [nostr::Tag::parse(&["cid", &cid]).unwrap()],
780        )
781        .to_event(&author_keys)?;
782        relay_a.ingest_trusted_event(event.clone()).await?;
783
784        tokio::time::sleep(Duration::from_millis(150)).await;
785
786        let received = relay_b
787            .query_events(
788                &Filter::new()
789                    .authors(vec![event.pubkey])
790                    .kinds(vec![event.kind]),
791                10,
792            )
793            .await;
794        assert_eq!(
795            received
796                .iter()
797                .filter(|candidate| candidate.id == event.id)
798                .count(),
799            1
800        );
801
802        let bluetooth_events = relay_b.bluetooth_received_events(10).await;
803        assert_eq!(bluetooth_events.len(), 1);
804        assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
805        assert_eq!(bluetooth_events[0].peer_id.as_deref(), Some("peer-b"));
806        assert_eq!(bluetooth_events[0].cid_values, vec![cid]);
807
808        receiver.close().await?;
809        sender.close().await?;
810        Ok(())
811    }
812
813    #[tokio::test]
814    async fn bluetooth_peer_forwards_local_publishes_both_directions() -> Result<()> {
815        let (link_a, link_b) = MockBluetoothLink::pair();
816        let tmp_a = TempDir::new()?;
817        let tmp_b = TempDir::new()?;
818
819        let graph_store_a = {
820            let _guard = crate::socialgraph::test_lock();
821            crate::socialgraph::open_social_graph_store_with_mapsize(
822                tmp_a.path(),
823                Some(128 * 1024 * 1024),
824            )?
825        };
826        let graph_store_b = {
827            let _guard = crate::socialgraph::test_lock();
828            crate::socialgraph::open_social_graph_store_with_mapsize(
829                tmp_b.path(),
830                Some(128 * 1024 * 1024),
831            )?
832        };
833        let author_keys_a = Keys::generate();
834        let author_keys_b = Keys::generate();
835
836        let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
837        let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
838        let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
839            Arc::clone(&backend_a),
840            0,
841            HashSet::from([
842                author_keys_a.public_key().to_hex(),
843                author_keys_b.public_key().to_hex(),
844            ]),
845        ));
846        let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
847            Arc::clone(&backend_b),
848            0,
849            HashSet::from([
850                author_keys_a.public_key().to_hex(),
851                author_keys_b.public_key().to_hex(),
852            ]),
853        ));
854
855        let relay_a = Arc::new(NostrRelay::new(
856            Arc::clone(&backend_a),
857            tmp_a.path().to_path_buf(),
858            HashSet::from([
859                author_keys_a.public_key().to_hex(),
860                author_keys_b.public_key().to_hex(),
861            ]),
862            Some(access_a),
863            NostrRelayConfig {
864                spambox_db_max_bytes: 0,
865                ..Default::default()
866            },
867        )?);
868        let relay_b = Arc::new(NostrRelay::new(
869            Arc::clone(&backend_b),
870            tmp_b.path().to_path_buf(),
871            HashSet::from([
872                author_keys_a.public_key().to_hex(),
873                author_keys_b.public_key().to_hex(),
874            ]),
875            Some(access_b),
876            NostrRelayConfig {
877                spambox_db_max_bytes: 0,
878                ..Default::default()
879            },
880        )?);
881
882        let peer_a = BluetoothPeer::new(
883            PeerId::new("peer-a".to_string()),
884            PeerDirection::Outbound,
885            link_a,
886            None,
887            Some(relay_a.clone()),
888            None,
889            None,
890        );
891        let peer_b = BluetoothPeer::new(
892            PeerId::new("peer-b".to_string()),
893            PeerDirection::Inbound,
894            link_b,
895            None,
896            Some(relay_b.clone()),
897            None,
898            None,
899        );
900
901        tokio::time::sleep(Duration::from_millis(50)).await;
902
903        let cid_a = "ab".repeat(32);
904        let event_a = EventBuilder::new(
905            Kind::TextNote,
906            "bluetooth publish from a",
907            [nostr::Tag::parse(&["cid", &cid_a]).unwrap()],
908        )
909        .to_event(&author_keys_a)?;
910        relay_a.ingest_trusted_event(event_a.clone()).await?;
911
912        let cid_b = "cd".repeat(32);
913        let event_b = EventBuilder::new(
914            Kind::TextNote,
915            "bluetooth publish from b",
916            [nostr::Tag::parse(&["cid", &cid_b]).unwrap()],
917        )
918        .to_event(&author_keys_b)?;
919        relay_b.ingest_trusted_event(event_b.clone()).await?;
920
921        tokio::time::sleep(Duration::from_millis(200)).await;
922
923        let received_on_b = relay_b
924            .query_events(
925                &Filter::new()
926                    .authors(vec![event_a.pubkey])
927                    .kinds(vec![event_a.kind]),
928                10,
929            )
930            .await;
931        assert_eq!(
932            received_on_b
933                .iter()
934                .filter(|candidate| candidate.id == event_a.id)
935                .count(),
936            1
937        );
938
939        let received_on_a = relay_a
940            .query_events(
941                &Filter::new()
942                    .authors(vec![event_b.pubkey])
943                    .kinds(vec![event_b.kind]),
944                10,
945            )
946            .await;
947        assert_eq!(
948            received_on_a
949                .iter()
950                .filter(|candidate| candidate.id == event_b.id)
951                .count(),
952            1
953        );
954
955        let bluetooth_events_a = relay_a.bluetooth_received_events(10).await;
956        assert_eq!(bluetooth_events_a.len(), 1);
957        assert_eq!(bluetooth_events_a[0].event_id, event_b.id.to_hex());
958        assert_eq!(bluetooth_events_a[0].peer_id.as_deref(), Some("peer-a"));
959        assert_eq!(bluetooth_events_a[0].cid_values, vec![cid_b]);
960
961        let bluetooth_events_b = relay_b.bluetooth_received_events(10).await;
962        assert_eq!(bluetooth_events_b.len(), 1);
963        assert_eq!(bluetooth_events_b[0].event_id, event_a.id.to_hex());
964        assert_eq!(bluetooth_events_b[0].peer_id.as_deref(), Some("peer-b"));
965        assert_eq!(bluetooth_events_b[0].cid_values, vec![cid_a]);
966
967        peer_b.close().await?;
968        peer_a.close().await?;
969        Ok(())
970    }
971
972    #[tokio::test]
973    async fn bluetooth_peer_closes_after_local_publish_send_failure() -> Result<()> {
974        let tmp = TempDir::new()?;
975        let graph_store = {
976            let _guard = crate::socialgraph::test_lock();
977            crate::socialgraph::open_social_graph_store_with_mapsize(
978                tmp.path(),
979                Some(128 * 1024 * 1024),
980            )?
981        };
982        let author_keys = Keys::generate();
983        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
984        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
985            Arc::clone(&backend),
986            0,
987            HashSet::from([author_keys.public_key().to_hex()]),
988        ));
989        let relay = Arc::new(NostrRelay::new(
990            Arc::clone(&backend),
991            tmp.path().to_path_buf(),
992            HashSet::from([author_keys.public_key().to_hex()]),
993            Some(access),
994            NostrRelayConfig {
995                spambox_db_max_bytes: 0,
996                ..Default::default()
997            },
998        )?);
999
1000        let peer = BluetoothPeer::new(
1001            PeerId::new("peer-a".to_string()),
1002            PeerDirection::Outbound,
1003            Arc::new(FailingSendLink {
1004                open: AtomicBool::new(true),
1005            }),
1006            None,
1007            Some(relay.clone()),
1008            None,
1009            None,
1010        );
1011
1012        tokio::time::sleep(Duration::from_millis(50)).await;
1013        assert!(peer.is_connected());
1014
1015        let event = EventBuilder::new(Kind::TextNote, "close stale bluetooth peer", [])
1016            .to_event(&author_keys)?;
1017        relay.ingest_trusted_event(event).await?;
1018
1019        tokio::time::sleep(Duration::from_millis(100)).await;
1020        assert!(!peer.is_connected());
1021        Ok(())
1022    }
1023}