1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::manager::WebRTCState;
10use crate::peer::ContentStore;
11use crate::relay_bridge::SharedMeshRelayClient;
12use crate::{
13 encode_request, encode_response, hash_to_key, parse_message, DataMessage, DataRequest,
14 DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
15 BLOB_REQUEST_POLICY,
16};
17use nostr_sdk::nostr::{
18 ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
19 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
20};
21
22const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
23const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
24
25#[derive(Debug, Clone)]
26pub enum BluetoothFrame {
27 Text(String),
28 Binary(Vec<u8>),
29}
30
31#[async_trait]
32pub trait BluetoothLink: Send + Sync {
33 async fn send(&self, frame: BluetoothFrame) -> Result<()>;
34 async fn recv(&self) -> Option<BluetoothFrame>;
35 fn is_open(&self) -> bool;
36 async fn close(&self) -> Result<()>;
37}
38
39pub struct BluetoothPeer {
40 pub peer_id: PeerId,
41 pub direction: PeerDirection,
42 pub created_at: std::time::Instant,
43 pub connected_at: Option<std::time::Instant>,
44 link: Arc<dyn BluetoothLink>,
45 store: Option<Arc<dyn ContentStore>>,
46 pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
47 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
48 nostr_relay: Option<SharedMeshRelayClient>,
49 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
50 traffic_state: Option<Arc<WebRTCState>>,
51 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
52 htl_config: PeerHTLConfig,
53}
54
55impl BluetoothPeer {
56 #[allow(clippy::too_many_arguments)]
57 pub fn new(
58 peer_id: PeerId,
59 direction: PeerDirection,
60 link: Arc<dyn BluetoothLink>,
61 store: Option<Arc<dyn ContentStore>>,
62 nostr_relay: Option<SharedMeshRelayClient>,
63 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
64 traffic_state: Option<Arc<WebRTCState>>,
65 ) -> Arc<Self> {
66 let peer = Arc::new(Self {
67 peer_id,
68 direction,
69 created_at: std::time::Instant::now(),
70 connected_at: Some(std::time::Instant::now()),
71 link,
72 store,
73 pending_requests: Arc::new(Mutex::new(HashMap::new())),
74 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
75 nostr_relay,
76 mesh_frame_tx,
77 traffic_state,
78 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
79 BLUETOOTH_SEEN_EVENT_CAP,
80 BLUETOOTH_SEEN_EVENT_TTL,
81 ))),
82 htl_config: PeerHTLConfig::random(),
83 });
84 Self::spawn_reader(peer.clone());
85 peer
86 }
87
88 async fn mark_seen_event_id(&self, event_id: String) -> bool {
89 self.seen_event_ids.lock().await.insert_if_new(event_id)
90 }
91
92 fn spawn_reader(peer: Arc<Self>) {
93 tokio::spawn(async move {
94 let mut nostr_forward_task = None;
95 let mut nostr_client_id = None;
96
97 if let Some(relay) = peer.nostr_relay.as_ref() {
98 let client_id = relay.next_client_id();
99 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
100 relay
101 .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
102 .await;
103 nostr_client_id = Some(client_id);
104
105 let live_subscription_id =
106 NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
107 let _ = relay
108 .register_subscription_query(
109 client_id,
110 live_subscription_id.clone(),
111 vec![NostrFilter::new().since(Timestamp::now())],
112 )
113 .await;
114
115 let peer_for_forward = peer.clone();
116 nostr_forward_task = Some(tokio::spawn(async move {
117 while let Some(text) = nostr_rx.recv().await {
118 if let Ok(NostrRelayMessage::Event {
119 subscription_id,
120 event,
121 }) = NostrRelayMessage::from_json(&text)
122 {
123 if subscription_id == live_subscription_id {
124 if event.kind.is_ephemeral()
125 || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
126 {
127 continue;
128 }
129 if peer_for_forward
130 .send_frame(BluetoothFrame::Text(event.as_json()))
131 .await
132 .is_err()
133 {
134 break;
135 }
136 continue;
137 }
138 }
139 if peer_for_forward
140 .send_frame(BluetoothFrame::Text(text))
141 .await
142 .is_err()
143 {
144 break;
145 }
146 }
147 }));
148 }
149
150 while let Some(frame) = peer.link.recv().await {
151 match frame {
152 BluetoothFrame::Binary(data) => {
153 if let Err(err) = peer.handle_binary_frame(data).await {
154 debug!(
155 "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
156 peer.peer_id.short(),
157 err
158 );
159 }
160 }
161 BluetoothFrame::Text(text) => {
162 peer.handle_text_frame(text, nostr_client_id).await;
163 }
164 }
165 }
166
167 if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
168 relay.unregister_client(client_id).await;
169 }
170
171 if let Some(task) = nostr_forward_task {
172 let _ = task.await;
173 }
174 });
175 }
176
177 async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
178 self.record_received(data.len() as u64).await;
179 match parse_message(&data).ok_or_else(|| anyhow!("invalid Bluetooth data frame"))? {
180 DataMessage::Request(req) => {
181 let hash_hex = hash_to_key(&req.h);
182 if let Some(store) = self.store.as_ref() {
183 if let Ok(Some(data)) = store.get(&hash_hex) {
184 let response = DataResponse {
185 h: req.h,
186 d: data,
187 i: None,
188 n: None,
189 };
190 let wire = encode_response(&response);
191 self.send_frame(BluetoothFrame::Binary(wire)).await?;
192 }
193 }
194 }
195 DataMessage::Response(res) => {
196 let hash_hex = hash_to_key(&res.h);
197 if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
198 let _ = sender.send(Some(res.d));
199 }
200 }
201 other => {
202 debug!(
203 "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
204 self.peer_id.short(),
205 other
206 );
207 }
208 }
209 Ok(())
210 }
211
212 async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
213 self.record_received(text.len() as u64).await;
214 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
215 if let Some(tx) = self.mesh_frame_tx.as_ref() {
216 let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
217 return;
218 }
219 }
220
221 if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
222 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
223 let sender = {
224 let pending = self.pending_nostr_queries.lock().await;
225 pending.get(&sub_id).cloned()
226 };
227 if let Some(tx) = sender {
228 let _ = tx.send(relay_msg);
229 return;
230 }
231 }
232 }
233
234 if let Some(relay) = self.nostr_relay.as_ref() {
235 if let Ok(event) = Event::from_json(&text) {
236 if self.mark_seen_event_id(event.id.to_hex()).await {
237 let _ = relay
238 .ingest_trusted_event_from_peer(event, Some(self.peer_id.to_string()))
239 .await;
240 }
241 return;
242 }
243
244 if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
245 if let Some(client_id) = nostr_client_id {
246 relay.handle_client_message(client_id, nostr_msg).await;
247 }
248 }
249 }
250 }
251
252 pub fn is_connected(&self) -> bool {
253 self.link.is_open()
254 }
255
256 pub fn htl_config(&self) -> &PeerHTLConfig {
257 &self.htl_config
258 }
259
260 async fn record_sent(&self, bytes: u64) {
261 if let Some(state) = self.traffic_state.as_ref() {
262 state.record_sent(&self.peer_id.to_string(), bytes).await;
263 }
264 }
265
266 async fn record_received(&self, bytes: u64) {
267 if let Some(state) = self.traffic_state.as_ref() {
268 state
269 .record_received(&self.peer_id.to_string(), bytes)
270 .await;
271 }
272 }
273
274 async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
275 let bytes = match &frame {
276 BluetoothFrame::Text(text) => text.len() as u64,
277 BluetoothFrame::Binary(payload) => payload.len() as u64,
278 };
279 if let Err(err) = self.link.send(frame).await {
280 warn!(
281 "[BluetoothPeer {}] Failed to send frame over BLE: {}",
282 self.peer_id.short(),
283 err
284 );
285 let _ = self.link.close().await;
286 return Err(err);
287 }
288 self.record_sent(bytes).await;
289 Ok(())
290 }
291
292 pub async fn request_with_timeout(
293 &self,
294 hash_hex: &str,
295 timeout: Duration,
296 ) -> Result<Option<Vec<u8>>> {
297 if !self.link.is_open() {
298 return Ok(None);
299 }
300
301 let hash = hex::decode(hash_hex)?;
302 let request = DataRequest {
303 h: hash,
304 htl: BLOB_REQUEST_POLICY.max_htl,
305 q: None,
306 };
307 let wire = encode_request(&request);
308 let (tx, rx) = oneshot::channel();
309 self.pending_requests
310 .lock()
311 .await
312 .insert(hash_hex.to_string(), tx);
313 self.send_frame(BluetoothFrame::Binary(wire)).await?;
314
315 match tokio::time::timeout(timeout, rx).await {
316 Ok(Ok(data)) => Ok(data),
317 Ok(Err(_)) => Ok(None),
318 Err(_) => {
319 self.pending_requests.lock().await.remove(hash_hex);
320 Ok(None)
321 }
322 }
323 }
324
325 pub async fn query_nostr_events(
326 &self,
327 filters: Vec<NostrFilter>,
328 timeout: Duration,
329 ) -> Result<Vec<Event>> {
330 let subscription_id = NostrSubscriptionId::generate();
331 let subscription_key = subscription_id.to_string();
332 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
333
334 self.pending_nostr_queries
335 .lock()
336 .await
337 .insert(subscription_key.clone(), tx);
338
339 let req = NostrClientMessage::req(subscription_id.clone(), filters);
340 self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
341
342 let mut events = Vec::new();
343 let deadline = tokio::time::Instant::now() + timeout;
344
345 loop {
346 let now = tokio::time::Instant::now();
347 if now >= deadline {
348 break;
349 }
350 match tokio::time::timeout(deadline - now, rx.recv()).await {
351 Ok(Some(NostrRelayMessage::Event {
352 subscription_id: sid,
353 event,
354 })) if sid == subscription_id => events.push(*event),
355 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
356 break;
357 }
358 Ok(Some(NostrRelayMessage::Closed {
359 subscription_id: sid,
360 ..
361 })) if sid == subscription_id => break,
362 Ok(Some(_)) => {}
363 Ok(None) | Err(_) => break,
364 }
365 }
366
367 let close = NostrClientMessage::close(subscription_id.clone());
368 let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
369 self.pending_nostr_queries
370 .lock()
371 .await
372 .remove(&subscription_key);
373 Ok(events)
374 }
375
376 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
377 let text = serde_json::to_string(frame)?;
378 self.send_frame(BluetoothFrame::Text(text)).await
379 }
380
381 pub async fn close(&self) -> Result<()> {
382 self.link.close().await
383 }
384}
385
386fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
387 match msg {
388 NostrRelayMessage::Event {
389 subscription_id, ..
390 } => Some(subscription_id.to_string()),
391 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
392 NostrRelayMessage::Closed {
393 subscription_id, ..
394 } => Some(subscription_id.to_string()),
395 NostrRelayMessage::Count {
396 subscription_id, ..
397 } => Some(subscription_id.to_string()),
398 _ => None,
399 }
400}
401
402#[cfg(test)]
403pub struct MockBluetoothLink {
404 open: std::sync::atomic::AtomicBool,
405 tx: mpsc::Sender<BluetoothFrame>,
406 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
407}
408
409#[cfg(test)]
410impl MockBluetoothLink {
411 pub fn pair() -> (Arc<Self>, Arc<Self>) {
412 let (tx_a, rx_a) = mpsc::channel(32);
413 let (tx_b, rx_b) = mpsc::channel(32);
414 (
415 Arc::new(Self {
416 open: std::sync::atomic::AtomicBool::new(true),
417 tx: tx_a,
418 rx: Mutex::new(rx_b),
419 }),
420 Arc::new(Self {
421 open: std::sync::atomic::AtomicBool::new(true),
422 tx: tx_b,
423 rx: Mutex::new(rx_a),
424 }),
425 )
426 }
427}
428
429#[cfg(test)]
430#[async_trait]
431impl BluetoothLink for MockBluetoothLink {
432 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
433 use std::sync::atomic::Ordering;
434 if !self.open.load(Ordering::Relaxed) {
435 return Ok(());
436 }
437 self.tx.send(frame).await.map_err(Into::into)
438 }
439
440 async fn recv(&self) -> Option<BluetoothFrame> {
441 self.rx.lock().await.recv().await
442 }
443
444 fn is_open(&self) -> bool {
445 use std::sync::atomic::Ordering;
446 self.open.load(Ordering::Relaxed)
447 }
448
449 async fn close(&self) -> Result<()> {
450 use std::sync::atomic::Ordering;
451 self.open.store(false, Ordering::Relaxed);
452 Ok(())
453 }
454}