Skip to main content

hashtree_network/
bluetooth_peer.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::manager::WebRTCState;
10use crate::peer::ContentStore;
11use crate::relay_bridge::SharedMeshRelayClient;
12use crate::{
13    encode_request, encode_response, hash_to_key, parse_message, DataMessage, DataRequest,
14    DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
15    BLOB_REQUEST_POLICY,
16};
17use nostr_sdk::nostr::{
18    ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
19    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
20};
21
/// First argument handed to `TimedSeenSet::new` for each peer's set of seen Nostr event ids
/// (presumably the maximum number of ids remembered — confirm against `TimedSeenSet`).
const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
/// Second argument handed to `TimedSeenSet::new` (presumably how long an id stays
/// remembered — confirm against `TimedSeenSet`); 600 seconds.
const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);

/// In-flight blob requests: hash key -> one-shot channel that hands the response payload
/// back to the caller waiting in `request_with_timeout`.
type PendingBlobRequests = Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>;
/// In-flight Nostr queries: subscription id (as a string) -> channel that receives the relay
/// messages addressed to that subscription.
type PendingNostrQueries = Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>;
27
/// One frame exchanged over a [`BluetoothLink`].
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so frames can be compared
/// directly (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BluetoothFrame {
    /// Text payload. In this file it carries JSON: mesh frames, Nostr relay/client messages
    /// and bare Nostr events.
    Text(String),
    /// Binary payload. In this file it carries encoded data requests and responses.
    Binary(Vec<u8>),
}
33
/// Transport abstraction for a single Bluetooth connection to a remote peer.
///
/// `BluetoothPeer` performs all of its I/O through this trait, so an implementation can be
/// a real radio link or an in-memory stand-in.
#[async_trait]
pub trait BluetoothLink: Send + Sync {
    /// Sends one frame to the remote side.
    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
    /// Receives the next frame from the remote side. The peer's reader task stops for good
    /// the first time this returns `None`.
    async fn recv(&self) -> Option<BluetoothFrame>;
    /// Reports whether the link is currently considered open.
    fn is_open(&self) -> bool;
    /// Closes the link.
    async fn close(&self) -> Result<()>;
}
41
/// A remote peer reached over a [`BluetoothLink`].
///
/// Created through [`BluetoothPeer::new`], which also starts the background task that reads
/// frames from the link.
pub struct BluetoothPeer {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Direction of the connection, as supplied by the caller of `new`.
    pub direction: PeerDirection,
    /// Moment this value was constructed.
    pub created_at: std::time::Instant,
    /// Set to the construction time by `new` (the link is handed over already established).
    pub connected_at: Option<std::time::Instant>,
    /// Transport used for every send/receive.
    link: Arc<dyn BluetoothLink>,
    /// Optional local store used to answer incoming data requests.
    store: Option<Arc<dyn ContentStore>>,
    /// Blob requests awaiting a response, keyed by hash.
    pending_requests: PendingBlobRequests,
    /// Nostr queries awaiting relay messages, keyed by subscription id.
    pending_nostr_queries: PendingNostrQueries,
    /// Optional relay client; when present the peer is registered with it as a client.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Optional sink that receives every mesh frame parsed from incoming text.
    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
    /// Optional sink for per-peer sent/received byte counts.
    traffic_state: Option<Arc<WebRTCState>>,
    /// Event ids already handled for this peer, used to avoid processing an event twice.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
    /// HTL configuration chosen with `PeerHTLConfig::random()` at construction.
    htl_config: PeerHTLConfig,
}
57
58impl BluetoothPeer {
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        peer_id: PeerId,
62        direction: PeerDirection,
63        link: Arc<dyn BluetoothLink>,
64        store: Option<Arc<dyn ContentStore>>,
65        nostr_relay: Option<SharedMeshRelayClient>,
66        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
67        traffic_state: Option<Arc<WebRTCState>>,
68    ) -> Arc<Self> {
69        let peer = Arc::new(Self {
70            peer_id,
71            direction,
72            created_at: std::time::Instant::now(),
73            connected_at: Some(std::time::Instant::now()),
74            link,
75            store,
76            pending_requests: Arc::new(Mutex::new(HashMap::new())),
77            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
78            nostr_relay,
79            mesh_frame_tx,
80            traffic_state,
81            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
82                BLUETOOTH_SEEN_EVENT_CAP,
83                BLUETOOTH_SEEN_EVENT_TTL,
84            ))),
85            htl_config: PeerHTLConfig::random(),
86        });
87        Self::spawn_reader(peer.clone());
88        peer
89    }
90
91    async fn mark_seen_event_id(&self, event_id: String) -> bool {
92        self.seen_event_ids.lock().await.insert_if_new(event_id)
93    }
94
95    fn spawn_reader(peer: Arc<Self>) {
96        tokio::spawn(async move {
97            let mut nostr_forward_task = None;
98            let mut nostr_client_id = None;
99
100            if let Some(relay) = peer.nostr_relay.as_ref() {
101                let client_id = relay.next_client_id();
102                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
103                relay
104                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
105                    .await;
106                nostr_client_id = Some(client_id);
107
108                let live_subscription_id =
109                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
110                let _ = relay
111                    .register_subscription_query(
112                        client_id,
113                        live_subscription_id.clone(),
114                        vec![NostrFilter::new().since(Timestamp::now())],
115                    )
116                    .await;
117
118                let peer_for_forward = peer.clone();
119                nostr_forward_task = Some(tokio::spawn(async move {
120                    while let Some(text) = nostr_rx.recv().await {
121                        if let Ok(NostrRelayMessage::Event {
122                            subscription_id,
123                            event,
124                        }) = NostrRelayMessage::from_json(&text)
125                        {
126                            if subscription_id == live_subscription_id {
127                                if event.kind.is_ephemeral()
128                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
129                                {
130                                    continue;
131                                }
132                                if peer_for_forward
133                                    .send_frame(BluetoothFrame::Text(event.as_json()))
134                                    .await
135                                    .is_err()
136                                {
137                                    break;
138                                }
139                                continue;
140                            }
141                        }
142                        if peer_for_forward
143                            .send_frame(BluetoothFrame::Text(text))
144                            .await
145                            .is_err()
146                        {
147                            break;
148                        }
149                    }
150                }));
151            }
152
153            while let Some(frame) = peer.link.recv().await {
154                match frame {
155                    BluetoothFrame::Binary(data) => {
156                        if let Err(err) = peer.handle_binary_frame(data).await {
157                            debug!(
158                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
159                                peer.peer_id.short(),
160                                err
161                            );
162                        }
163                    }
164                    BluetoothFrame::Text(text) => {
165                        peer.handle_text_frame(text, nostr_client_id).await;
166                    }
167                }
168            }
169
170            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
171                relay.unregister_client(client_id).await;
172            }
173
174            if let Some(task) = nostr_forward_task {
175                let _ = task.await;
176            }
177        });
178    }
179
180    async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
181        self.record_received(data.len() as u64).await;
182        match parse_message(&data).ok_or_else(|| anyhow!("invalid Bluetooth data frame"))? {
183            DataMessage::Request(req) => {
184                let hash_hex = hash_to_key(&req.h);
185                if let Some(store) = self.store.as_ref() {
186                    if let Ok(Some(data)) = store.get(&hash_hex) {
187                        let response = DataResponse {
188                            h: req.h,
189                            d: data,
190                            i: None,
191                            n: None,
192                        };
193                        let wire = encode_response(&response);
194                        self.send_frame(BluetoothFrame::Binary(wire)).await?;
195                    }
196                }
197            }
198            DataMessage::Response(res) => {
199                let hash_hex = hash_to_key(&res.h);
200                if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
201                    let _ = sender.send(Some(res.d));
202                }
203            }
204            other => {
205                debug!(
206                    "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
207                    self.peer_id.short(),
208                    other
209                );
210            }
211        }
212        Ok(())
213    }
214
215    async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
216        self.record_received(text.len() as u64).await;
217        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
218            if let Some(tx) = self.mesh_frame_tx.as_ref() {
219                let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
220                return;
221            }
222        }
223
224        if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
225            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
226                let sender = {
227                    let pending = self.pending_nostr_queries.lock().await;
228                    pending.get(&sub_id).cloned()
229                };
230                if let Some(tx) = sender {
231                    let _ = tx.send(relay_msg);
232                    return;
233                }
234            }
235        }
236
237        if let Some(relay) = self.nostr_relay.as_ref() {
238            if let Ok(event) = Event::from_json(&text) {
239                if self.mark_seen_event_id(event.id.to_hex()).await {
240                    let _ = relay
241                        .ingest_trusted_event_from_peer(event, Some(self.peer_id.to_string()))
242                        .await;
243                }
244                return;
245            }
246
247            if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
248                if let Some(client_id) = nostr_client_id {
249                    relay.handle_client_message(client_id, nostr_msg).await;
250                }
251            }
252        }
253    }
254
    /// Returns whether the underlying link currently reports itself as open.
    pub fn is_connected(&self) -> bool {
        self.link.is_open()
    }
258
    /// Returns the HTL configuration that was chosen for this peer at construction.
    pub fn htl_config(&self) -> &PeerHTLConfig {
        &self.htl_config
    }
262
263    async fn record_sent(&self, bytes: u64) {
264        if let Some(state) = self.traffic_state.as_ref() {
265            state.record_sent(&self.peer_id.to_string(), bytes).await;
266        }
267    }
268
269    async fn record_received(&self, bytes: u64) {
270        if let Some(state) = self.traffic_state.as_ref() {
271            state
272                .record_received(&self.peer_id.to_string(), bytes)
273                .await;
274        }
275    }
276
277    async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
278        let bytes = match &frame {
279            BluetoothFrame::Text(text) => text.len() as u64,
280            BluetoothFrame::Binary(payload) => payload.len() as u64,
281        };
282        if let Err(err) = self.link.send(frame).await {
283            warn!(
284                "[BluetoothPeer {}] Failed to send frame over BLE: {}",
285                self.peer_id.short(),
286                err
287            );
288            let _ = self.link.close().await;
289            return Err(err);
290        }
291        self.record_sent(bytes).await;
292        Ok(())
293    }
294
295    pub async fn request_with_timeout(
296        &self,
297        hash_hex: &str,
298        timeout: Duration,
299    ) -> Result<Option<Vec<u8>>> {
300        if !self.link.is_open() {
301            return Ok(None);
302        }
303
304        let hash = hex::decode(hash_hex)?;
305        let request = DataRequest {
306            h: hash,
307            htl: BLOB_REQUEST_POLICY.max_htl,
308            q: None,
309        };
310        let wire = encode_request(&request);
311        let (tx, rx) = oneshot::channel();
312        self.pending_requests
313            .lock()
314            .await
315            .insert(hash_hex.to_string(), tx);
316        self.send_frame(BluetoothFrame::Binary(wire)).await?;
317
318        match tokio::time::timeout(timeout, rx).await {
319            Ok(Ok(data)) => Ok(data),
320            Ok(Err(_)) => Ok(None),
321            Err(_) => {
322                self.pending_requests.lock().await.remove(hash_hex);
323                Ok(None)
324            }
325        }
326    }
327
328    pub async fn query_nostr_events(
329        &self,
330        filters: Vec<NostrFilter>,
331        timeout: Duration,
332    ) -> Result<Vec<Event>> {
333        let subscription_id = NostrSubscriptionId::generate();
334        let subscription_key = subscription_id.to_string();
335        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
336
337        self.pending_nostr_queries
338            .lock()
339            .await
340            .insert(subscription_key.clone(), tx);
341
342        let req = NostrClientMessage::req(subscription_id.clone(), filters);
343        self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
344
345        let mut events = Vec::new();
346        let deadline = tokio::time::Instant::now() + timeout;
347
348        loop {
349            let now = tokio::time::Instant::now();
350            if now >= deadline {
351                break;
352            }
353            match tokio::time::timeout(deadline - now, rx.recv()).await {
354                Ok(Some(NostrRelayMessage::Event {
355                    subscription_id: sid,
356                    event,
357                })) if sid == subscription_id => events.push(*event),
358                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
359                    break;
360                }
361                Ok(Some(NostrRelayMessage::Closed {
362                    subscription_id: sid,
363                    ..
364                })) if sid == subscription_id => break,
365                Ok(Some(_)) => {}
366                Ok(None) | Err(_) => break,
367            }
368        }
369
370        let close = NostrClientMessage::close(subscription_id.clone());
371        let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
372        self.pending_nostr_queries
373            .lock()
374            .await
375            .remove(&subscription_key);
376        Ok(events)
377    }
378
379    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
380        let text = serde_json::to_string(frame)?;
381        self.send_frame(BluetoothFrame::Text(text)).await
382    }
383
    /// Closes the underlying link and returns the link's own result.
    pub async fn close(&self) -> Result<()> {
        self.link.close().await
    }
387}
388
389fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
390    match msg {
391        NostrRelayMessage::Event {
392            subscription_id, ..
393        } => Some(subscription_id.to_string()),
394        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
395        NostrRelayMessage::Closed {
396            subscription_id, ..
397        } => Some(subscription_id.to_string()),
398        NostrRelayMessage::Count {
399            subscription_id, ..
400        } => Some(subscription_id.to_string()),
401        _ => None,
402    }
403}
404
/// In-memory [`BluetoothLink`] for tests; two endpoints are connected by a pair of channels
/// (see `MockBluetoothLink::pair`).
#[cfg(test)]
pub struct MockBluetoothLink {
    /// Local open/closed flag; cleared by `close`.
    open: std::sync::atomic::AtomicBool,
    /// Outgoing frames, read by the other endpoint.
    tx: mpsc::Sender<BluetoothFrame>,
    /// Incoming frames, written by the other endpoint.
    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
}
411
#[cfg(test)]
impl MockBluetoothLink {
    /// Creates two connected endpoints: whatever one sends, the other receives.
    /// Each direction is backed by a bounded channel holding up to 32 frames.
    pub fn pair() -> (Arc<Self>, Arc<Self>) {
        let (first_out, second_in) = mpsc::channel(32);
        let (second_out, first_in) = mpsc::channel(32);
        (
            Self::endpoint(first_out, first_in),
            Self::endpoint(second_out, second_in),
        )
    }

    /// Builds one open endpoint from its outgoing and incoming channel halves.
    fn endpoint(
        tx: mpsc::Sender<BluetoothFrame>,
        rx: mpsc::Receiver<BluetoothFrame>,
    ) -> Arc<Self> {
        Arc::new(Self {
            open: std::sync::atomic::AtomicBool::new(true),
            tx,
            rx: Mutex::new(rx),
        })
    }
}
431
#[cfg(test)]
#[async_trait]
impl BluetoothLink for MockBluetoothLink {
    /// Queues `frame` for the other endpoint. Once this endpoint has been closed, frames
    /// are silently discarded and the call still reports success.
    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
        if self.is_open() {
            self.tx.send(frame).await?;
        }
        Ok(())
    }

    /// Waits for the next frame queued by the other endpoint; `None` once its channel is
    /// closed and drained.
    async fn recv(&self) -> Option<BluetoothFrame> {
        let mut incoming = self.rx.lock().await;
        incoming.recv().await
    }

    /// Reads the local open flag.
    fn is_open(&self) -> bool {
        self.open.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Clears the local open flag; always succeeds.
    async fn close(&self) -> Result<()> {
        self.open
            .store(false, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }
}
457}