Skip to main content

hashtree_network/
bluetooth_peer.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::manager::WebRTCState;
10use crate::peer::ContentStore;
11use crate::relay_bridge::SharedMeshRelayClient;
12use crate::{
13    encode_request, encode_response, hash_to_key, parse_message, DataMessage, DataRequest,
14    DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
15    BLOB_REQUEST_POLICY,
16};
17use nostr_sdk::nostr::{
18    ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
19    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
20};
21
22const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
23const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
24
25#[derive(Debug, Clone)]
26pub enum BluetoothFrame {
27    Text(String),
28    Binary(Vec<u8>),
29}
30
31#[async_trait]
32pub trait BluetoothLink: Send + Sync {
33    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
34    async fn recv(&self) -> Option<BluetoothFrame>;
35    fn is_open(&self) -> bool;
36    async fn close(&self) -> Result<()>;
37}
38
39pub struct BluetoothPeer {
40    pub peer_id: PeerId,
41    pub direction: PeerDirection,
42    pub created_at: std::time::Instant,
43    pub connected_at: Option<std::time::Instant>,
44    link: Arc<dyn BluetoothLink>,
45    store: Option<Arc<dyn ContentStore>>,
46    pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
47    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
48    nostr_relay: Option<SharedMeshRelayClient>,
49    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
50    traffic_state: Option<Arc<WebRTCState>>,
51    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
52    htl_config: PeerHTLConfig,
53}
54
55impl BluetoothPeer {
56    #[allow(clippy::too_many_arguments)]
57    pub fn new(
58        peer_id: PeerId,
59        direction: PeerDirection,
60        link: Arc<dyn BluetoothLink>,
61        store: Option<Arc<dyn ContentStore>>,
62        nostr_relay: Option<SharedMeshRelayClient>,
63        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
64        traffic_state: Option<Arc<WebRTCState>>,
65    ) -> Arc<Self> {
66        let peer = Arc::new(Self {
67            peer_id,
68            direction,
69            created_at: std::time::Instant::now(),
70            connected_at: Some(std::time::Instant::now()),
71            link,
72            store,
73            pending_requests: Arc::new(Mutex::new(HashMap::new())),
74            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
75            nostr_relay,
76            mesh_frame_tx,
77            traffic_state,
78            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
79                BLUETOOTH_SEEN_EVENT_CAP,
80                BLUETOOTH_SEEN_EVENT_TTL,
81            ))),
82            htl_config: PeerHTLConfig::random(),
83        });
84        Self::spawn_reader(peer.clone());
85        peer
86    }
87
88    async fn mark_seen_event_id(&self, event_id: String) -> bool {
89        self.seen_event_ids.lock().await.insert_if_new(event_id)
90    }
91
92    fn spawn_reader(peer: Arc<Self>) {
93        tokio::spawn(async move {
94            let mut nostr_forward_task = None;
95            let mut nostr_client_id = None;
96
97            if let Some(relay) = peer.nostr_relay.as_ref() {
98                let client_id = relay.next_client_id();
99                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
100                relay
101                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
102                    .await;
103                nostr_client_id = Some(client_id);
104
105                let live_subscription_id =
106                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
107                let _ = relay
108                    .register_subscription_query(
109                        client_id,
110                        live_subscription_id.clone(),
111                        vec![NostrFilter::new().since(Timestamp::now())],
112                    )
113                    .await;
114
115                let peer_for_forward = peer.clone();
116                nostr_forward_task = Some(tokio::spawn(async move {
117                    while let Some(text) = nostr_rx.recv().await {
118                        if let Ok(NostrRelayMessage::Event {
119                            subscription_id,
120                            event,
121                        }) = NostrRelayMessage::from_json(&text)
122                        {
123                            if subscription_id == live_subscription_id {
124                                if event.kind.is_ephemeral()
125                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
126                                {
127                                    continue;
128                                }
129                                if peer_for_forward
130                                    .send_frame(BluetoothFrame::Text(event.as_json()))
131                                    .await
132                                    .is_err()
133                                {
134                                    break;
135                                }
136                                continue;
137                            }
138                        }
139                        if peer_for_forward
140                            .send_frame(BluetoothFrame::Text(text))
141                            .await
142                            .is_err()
143                        {
144                            break;
145                        }
146                    }
147                }));
148            }
149
150            while let Some(frame) = peer.link.recv().await {
151                match frame {
152                    BluetoothFrame::Binary(data) => {
153                        if let Err(err) = peer.handle_binary_frame(data).await {
154                            debug!(
155                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
156                                peer.peer_id.short(),
157                                err
158                            );
159                        }
160                    }
161                    BluetoothFrame::Text(text) => {
162                        peer.handle_text_frame(text, nostr_client_id).await;
163                    }
164                }
165            }
166
167            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
168                relay.unregister_client(client_id).await;
169            }
170
171            if let Some(task) = nostr_forward_task {
172                let _ = task.await;
173            }
174        });
175    }
176
177    async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
178        self.record_received(data.len() as u64).await;
179        match parse_message(&data).ok_or_else(|| anyhow!("invalid Bluetooth data frame"))? {
180            DataMessage::Request(req) => {
181                let hash_hex = hash_to_key(&req.h);
182                if let Some(store) = self.store.as_ref() {
183                    if let Ok(Some(data)) = store.get(&hash_hex) {
184                        let response = DataResponse {
185                            h: req.h,
186                            d: data,
187                            i: None,
188                            n: None,
189                        };
190                        let wire = encode_response(&response);
191                        self.send_frame(BluetoothFrame::Binary(wire)).await?;
192                    }
193                }
194            }
195            DataMessage::Response(res) => {
196                let hash_hex = hash_to_key(&res.h);
197                if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
198                    let _ = sender.send(Some(res.d));
199                }
200            }
201            other => {
202                debug!(
203                    "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
204                    self.peer_id.short(),
205                    other
206                );
207            }
208        }
209        Ok(())
210    }
211
212    async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
213        self.record_received(text.len() as u64).await;
214        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
215            if let Some(tx) = self.mesh_frame_tx.as_ref() {
216                let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
217                return;
218            }
219        }
220
221        if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
222            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
223                let sender = {
224                    let pending = self.pending_nostr_queries.lock().await;
225                    pending.get(&sub_id).cloned()
226                };
227                if let Some(tx) = sender {
228                    let _ = tx.send(relay_msg);
229                    return;
230                }
231            }
232        }
233
234        if let Some(relay) = self.nostr_relay.as_ref() {
235            if let Ok(event) = Event::from_json(&text) {
236                if self.mark_seen_event_id(event.id.to_hex()).await {
237                    let _ = relay
238                        .ingest_trusted_event_from_peer(event, Some(self.peer_id.to_string()))
239                        .await;
240                }
241                return;
242            }
243
244            if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
245                if let Some(client_id) = nostr_client_id {
246                    relay.handle_client_message(client_id, nostr_msg).await;
247                }
248            }
249        }
250    }
251
252    pub fn is_connected(&self) -> bool {
253        self.link.is_open()
254    }
255
256    pub fn htl_config(&self) -> &PeerHTLConfig {
257        &self.htl_config
258    }
259
260    async fn record_sent(&self, bytes: u64) {
261        if let Some(state) = self.traffic_state.as_ref() {
262            state.record_sent(&self.peer_id.to_string(), bytes).await;
263        }
264    }
265
266    async fn record_received(&self, bytes: u64) {
267        if let Some(state) = self.traffic_state.as_ref() {
268            state
269                .record_received(&self.peer_id.to_string(), bytes)
270                .await;
271        }
272    }
273
274    async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
275        let bytes = match &frame {
276            BluetoothFrame::Text(text) => text.len() as u64,
277            BluetoothFrame::Binary(payload) => payload.len() as u64,
278        };
279        if let Err(err) = self.link.send(frame).await {
280            warn!(
281                "[BluetoothPeer {}] Failed to send frame over BLE: {}",
282                self.peer_id.short(),
283                err
284            );
285            let _ = self.link.close().await;
286            return Err(err);
287        }
288        self.record_sent(bytes).await;
289        Ok(())
290    }
291
292    pub async fn request_with_timeout(
293        &self,
294        hash_hex: &str,
295        timeout: Duration,
296    ) -> Result<Option<Vec<u8>>> {
297        if !self.link.is_open() {
298            return Ok(None);
299        }
300
301        let hash = hex::decode(hash_hex)?;
302        let request = DataRequest {
303            h: hash,
304            htl: BLOB_REQUEST_POLICY.max_htl,
305            q: None,
306        };
307        let wire = encode_request(&request);
308        let (tx, rx) = oneshot::channel();
309        self.pending_requests
310            .lock()
311            .await
312            .insert(hash_hex.to_string(), tx);
313        self.send_frame(BluetoothFrame::Binary(wire)).await?;
314
315        match tokio::time::timeout(timeout, rx).await {
316            Ok(Ok(data)) => Ok(data),
317            Ok(Err(_)) => Ok(None),
318            Err(_) => {
319                self.pending_requests.lock().await.remove(hash_hex);
320                Ok(None)
321            }
322        }
323    }
324
325    pub async fn query_nostr_events(
326        &self,
327        filters: Vec<NostrFilter>,
328        timeout: Duration,
329    ) -> Result<Vec<Event>> {
330        let subscription_id = NostrSubscriptionId::generate();
331        let subscription_key = subscription_id.to_string();
332        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
333
334        self.pending_nostr_queries
335            .lock()
336            .await
337            .insert(subscription_key.clone(), tx);
338
339        let req = NostrClientMessage::req(subscription_id.clone(), filters);
340        self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
341
342        let mut events = Vec::new();
343        let deadline = tokio::time::Instant::now() + timeout;
344
345        loop {
346            let now = tokio::time::Instant::now();
347            if now >= deadline {
348                break;
349            }
350            match tokio::time::timeout(deadline - now, rx.recv()).await {
351                Ok(Some(NostrRelayMessage::Event {
352                    subscription_id: sid,
353                    event,
354                })) if sid == subscription_id => events.push(*event),
355                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
356                    break;
357                }
358                Ok(Some(NostrRelayMessage::Closed {
359                    subscription_id: sid,
360                    ..
361                })) if sid == subscription_id => break,
362                Ok(Some(_)) => {}
363                Ok(None) | Err(_) => break,
364            }
365        }
366
367        let close = NostrClientMessage::close(subscription_id.clone());
368        let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
369        self.pending_nostr_queries
370            .lock()
371            .await
372            .remove(&subscription_key);
373        Ok(events)
374    }
375
376    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
377        let text = serde_json::to_string(frame)?;
378        self.send_frame(BluetoothFrame::Text(text)).await
379    }
380
381    pub async fn close(&self) -> Result<()> {
382        self.link.close().await
383    }
384}
385
386fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
387    match msg {
388        NostrRelayMessage::Event {
389            subscription_id, ..
390        } => Some(subscription_id.to_string()),
391        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
392        NostrRelayMessage::Closed {
393            subscription_id, ..
394        } => Some(subscription_id.to_string()),
395        NostrRelayMessage::Count {
396            subscription_id, ..
397        } => Some(subscription_id.to_string()),
398        _ => None,
399    }
400}
401
402#[cfg(test)]
403pub struct MockBluetoothLink {
404    open: std::sync::atomic::AtomicBool,
405    tx: mpsc::Sender<BluetoothFrame>,
406    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
407}
408
409#[cfg(test)]
410impl MockBluetoothLink {
411    pub fn pair() -> (Arc<Self>, Arc<Self>) {
412        let (tx_a, rx_a) = mpsc::channel(32);
413        let (tx_b, rx_b) = mpsc::channel(32);
414        (
415            Arc::new(Self {
416                open: std::sync::atomic::AtomicBool::new(true),
417                tx: tx_a,
418                rx: Mutex::new(rx_b),
419            }),
420            Arc::new(Self {
421                open: std::sync::atomic::AtomicBool::new(true),
422                tx: tx_b,
423                rx: Mutex::new(rx_a),
424            }),
425        )
426    }
427}
428
429#[cfg(test)]
430#[async_trait]
431impl BluetoothLink for MockBluetoothLink {
432    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
433        use std::sync::atomic::Ordering;
434        if !self.open.load(Ordering::Relaxed) {
435            return Ok(());
436        }
437        self.tx.send(frame).await.map_err(Into::into)
438    }
439
440    async fn recv(&self) -> Option<BluetoothFrame> {
441        self.rx.lock().await.recv().await
442    }
443
444    fn is_open(&self) -> bool {
445        use std::sync::atomic::Ordering;
446        self.open.load(Ordering::Relaxed)
447    }
448
449    async fn close(&self) -> Result<()> {
450        use std::sync::atomic::Ordering;
451        self.open.store(false, Ordering::Relaxed);
452        Ok(())
453    }
454}