1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::manager::WebRTCState;
10use crate::peer::ContentStore;
11use crate::relay_bridge::SharedMeshRelayClient;
12use crate::{
13 encode_request, encode_response, hash_to_key, parse_message, DataMessage, DataRequest,
14 DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
15 BLOB_REQUEST_POLICY,
16};
17use nostr_sdk::nostr::{
18 ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
19 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
20};
21
22const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
23const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
24
25type PendingBlobRequests = Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>;
26type PendingNostrQueries = Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>;
27
28#[derive(Debug, Clone)]
29pub enum BluetoothFrame {
30 Text(String),
31 Binary(Vec<u8>),
32}
33
34#[async_trait]
35pub trait BluetoothLink: Send + Sync {
36 async fn send(&self, frame: BluetoothFrame) -> Result<()>;
37 async fn recv(&self) -> Option<BluetoothFrame>;
38 fn is_open(&self) -> bool;
39 async fn close(&self) -> Result<()>;
40}
41
42pub struct BluetoothPeer {
43 pub peer_id: PeerId,
44 pub direction: PeerDirection,
45 pub created_at: std::time::Instant,
46 pub connected_at: Option<std::time::Instant>,
47 link: Arc<dyn BluetoothLink>,
48 store: Option<Arc<dyn ContentStore>>,
49 pending_requests: PendingBlobRequests,
50 pending_nostr_queries: PendingNostrQueries,
51 nostr_relay: Option<SharedMeshRelayClient>,
52 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
53 traffic_state: Option<Arc<WebRTCState>>,
54 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
55 htl_config: PeerHTLConfig,
56}
57
58impl BluetoothPeer {
59 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 peer_id: PeerId,
62 direction: PeerDirection,
63 link: Arc<dyn BluetoothLink>,
64 store: Option<Arc<dyn ContentStore>>,
65 nostr_relay: Option<SharedMeshRelayClient>,
66 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
67 traffic_state: Option<Arc<WebRTCState>>,
68 ) -> Arc<Self> {
69 let peer = Arc::new(Self {
70 peer_id,
71 direction,
72 created_at: std::time::Instant::now(),
73 connected_at: Some(std::time::Instant::now()),
74 link,
75 store,
76 pending_requests: Arc::new(Mutex::new(HashMap::new())),
77 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
78 nostr_relay,
79 mesh_frame_tx,
80 traffic_state,
81 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
82 BLUETOOTH_SEEN_EVENT_CAP,
83 BLUETOOTH_SEEN_EVENT_TTL,
84 ))),
85 htl_config: PeerHTLConfig::random(),
86 });
87 Self::spawn_reader(peer.clone());
88 peer
89 }
90
91 async fn mark_seen_event_id(&self, event_id: String) -> bool {
92 self.seen_event_ids.lock().await.insert_if_new(event_id)
93 }
94
95 fn spawn_reader(peer: Arc<Self>) {
96 tokio::spawn(async move {
97 let mut nostr_forward_task = None;
98 let mut nostr_client_id = None;
99
100 if let Some(relay) = peer.nostr_relay.as_ref() {
101 let client_id = relay.next_client_id();
102 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
103 relay
104 .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
105 .await;
106 nostr_client_id = Some(client_id);
107
108 let live_subscription_id =
109 NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
110 let _ = relay
111 .register_subscription_query(
112 client_id,
113 live_subscription_id.clone(),
114 vec![NostrFilter::new().since(Timestamp::now())],
115 )
116 .await;
117
118 let peer_for_forward = peer.clone();
119 nostr_forward_task = Some(tokio::spawn(async move {
120 while let Some(text) = nostr_rx.recv().await {
121 if let Ok(NostrRelayMessage::Event {
122 subscription_id,
123 event,
124 }) = NostrRelayMessage::from_json(&text)
125 {
126 if subscription_id == live_subscription_id {
127 if event.kind.is_ephemeral()
128 || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
129 {
130 continue;
131 }
132 if peer_for_forward
133 .send_frame(BluetoothFrame::Text(event.as_json()))
134 .await
135 .is_err()
136 {
137 break;
138 }
139 continue;
140 }
141 }
142 if peer_for_forward
143 .send_frame(BluetoothFrame::Text(text))
144 .await
145 .is_err()
146 {
147 break;
148 }
149 }
150 }));
151 }
152
153 while let Some(frame) = peer.link.recv().await {
154 match frame {
155 BluetoothFrame::Binary(data) => {
156 if let Err(err) = peer.handle_binary_frame(data).await {
157 debug!(
158 "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
159 peer.peer_id.short(),
160 err
161 );
162 }
163 }
164 BluetoothFrame::Text(text) => {
165 peer.handle_text_frame(text, nostr_client_id).await;
166 }
167 }
168 }
169
170 if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
171 relay.unregister_client(client_id).await;
172 }
173
174 if let Some(task) = nostr_forward_task {
175 let _ = task.await;
176 }
177 });
178 }
179
180 async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
181 self.record_received(data.len() as u64).await;
182 match parse_message(&data).ok_or_else(|| anyhow!("invalid Bluetooth data frame"))? {
183 DataMessage::Request(req) => {
184 let hash_hex = hash_to_key(&req.h);
185 if let Some(store) = self.store.as_ref() {
186 if let Ok(Some(data)) = store.get(&hash_hex) {
187 let response = DataResponse {
188 h: req.h,
189 d: data,
190 i: None,
191 n: None,
192 };
193 let wire = encode_response(&response);
194 self.send_frame(BluetoothFrame::Binary(wire)).await?;
195 }
196 }
197 }
198 DataMessage::Response(res) => {
199 let hash_hex = hash_to_key(&res.h);
200 if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
201 let _ = sender.send(Some(res.d));
202 }
203 }
204 other => {
205 debug!(
206 "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
207 self.peer_id.short(),
208 other
209 );
210 }
211 }
212 Ok(())
213 }
214
215 async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
216 self.record_received(text.len() as u64).await;
217 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
218 if let Some(tx) = self.mesh_frame_tx.as_ref() {
219 let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
220 return;
221 }
222 }
223
224 if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
225 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
226 let sender = {
227 let pending = self.pending_nostr_queries.lock().await;
228 pending.get(&sub_id).cloned()
229 };
230 if let Some(tx) = sender {
231 let _ = tx.send(relay_msg);
232 return;
233 }
234 }
235 }
236
237 if let Some(relay) = self.nostr_relay.as_ref() {
238 if let Ok(event) = Event::from_json(&text) {
239 if self.mark_seen_event_id(event.id.to_hex()).await {
240 let _ = relay
241 .ingest_trusted_event_from_peer(event, Some(self.peer_id.to_string()))
242 .await;
243 }
244 return;
245 }
246
247 if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
248 if let Some(client_id) = nostr_client_id {
249 relay.handle_client_message(client_id, nostr_msg).await;
250 }
251 }
252 }
253 }
254
255 pub fn is_connected(&self) -> bool {
256 self.link.is_open()
257 }
258
259 pub fn htl_config(&self) -> &PeerHTLConfig {
260 &self.htl_config
261 }
262
263 async fn record_sent(&self, bytes: u64) {
264 if let Some(state) = self.traffic_state.as_ref() {
265 state.record_sent(&self.peer_id.to_string(), bytes).await;
266 }
267 }
268
269 async fn record_received(&self, bytes: u64) {
270 if let Some(state) = self.traffic_state.as_ref() {
271 state
272 .record_received(&self.peer_id.to_string(), bytes)
273 .await;
274 }
275 }
276
277 async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
278 let bytes = match &frame {
279 BluetoothFrame::Text(text) => text.len() as u64,
280 BluetoothFrame::Binary(payload) => payload.len() as u64,
281 };
282 if let Err(err) = self.link.send(frame).await {
283 warn!(
284 "[BluetoothPeer {}] Failed to send frame over BLE: {}",
285 self.peer_id.short(),
286 err
287 );
288 let _ = self.link.close().await;
289 return Err(err);
290 }
291 self.record_sent(bytes).await;
292 Ok(())
293 }
294
295 pub async fn request_with_timeout(
296 &self,
297 hash_hex: &str,
298 timeout: Duration,
299 ) -> Result<Option<Vec<u8>>> {
300 if !self.link.is_open() {
301 return Ok(None);
302 }
303
304 let hash = hex::decode(hash_hex)?;
305 let request = DataRequest {
306 h: hash,
307 htl: BLOB_REQUEST_POLICY.max_htl,
308 q: None,
309 };
310 let wire = encode_request(&request);
311 let (tx, rx) = oneshot::channel();
312 self.pending_requests
313 .lock()
314 .await
315 .insert(hash_hex.to_string(), tx);
316 self.send_frame(BluetoothFrame::Binary(wire)).await?;
317
318 match tokio::time::timeout(timeout, rx).await {
319 Ok(Ok(data)) => Ok(data),
320 Ok(Err(_)) => Ok(None),
321 Err(_) => {
322 self.pending_requests.lock().await.remove(hash_hex);
323 Ok(None)
324 }
325 }
326 }
327
328 pub async fn query_nostr_events(
329 &self,
330 filters: Vec<NostrFilter>,
331 timeout: Duration,
332 ) -> Result<Vec<Event>> {
333 let subscription_id = NostrSubscriptionId::generate();
334 let subscription_key = subscription_id.to_string();
335 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
336
337 self.pending_nostr_queries
338 .lock()
339 .await
340 .insert(subscription_key.clone(), tx);
341
342 let req = NostrClientMessage::req(subscription_id.clone(), filters);
343 self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
344
345 let mut events = Vec::new();
346 let deadline = tokio::time::Instant::now() + timeout;
347
348 loop {
349 let now = tokio::time::Instant::now();
350 if now >= deadline {
351 break;
352 }
353 match tokio::time::timeout(deadline - now, rx.recv()).await {
354 Ok(Some(NostrRelayMessage::Event {
355 subscription_id: sid,
356 event,
357 })) if sid == subscription_id => events.push(*event),
358 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
359 break;
360 }
361 Ok(Some(NostrRelayMessage::Closed {
362 subscription_id: sid,
363 ..
364 })) if sid == subscription_id => break,
365 Ok(Some(_)) => {}
366 Ok(None) | Err(_) => break,
367 }
368 }
369
370 let close = NostrClientMessage::close(subscription_id.clone());
371 let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
372 self.pending_nostr_queries
373 .lock()
374 .await
375 .remove(&subscription_key);
376 Ok(events)
377 }
378
379 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
380 let text = serde_json::to_string(frame)?;
381 self.send_frame(BluetoothFrame::Text(text)).await
382 }
383
384 pub async fn close(&self) -> Result<()> {
385 self.link.close().await
386 }
387}
388
389fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
390 match msg {
391 NostrRelayMessage::Event {
392 subscription_id, ..
393 } => Some(subscription_id.to_string()),
394 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
395 NostrRelayMessage::Closed {
396 subscription_id, ..
397 } => Some(subscription_id.to_string()),
398 NostrRelayMessage::Count {
399 subscription_id, ..
400 } => Some(subscription_id.to_string()),
401 _ => None,
402 }
403}
404
405#[cfg(test)]
406pub struct MockBluetoothLink {
407 open: std::sync::atomic::AtomicBool,
408 tx: mpsc::Sender<BluetoothFrame>,
409 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
410}
411
412#[cfg(test)]
413impl MockBluetoothLink {
414 pub fn pair() -> (Arc<Self>, Arc<Self>) {
415 let (tx_a, rx_a) = mpsc::channel(32);
416 let (tx_b, rx_b) = mpsc::channel(32);
417 (
418 Arc::new(Self {
419 open: std::sync::atomic::AtomicBool::new(true),
420 tx: tx_a,
421 rx: Mutex::new(rx_b),
422 }),
423 Arc::new(Self {
424 open: std::sync::atomic::AtomicBool::new(true),
425 tx: tx_b,
426 rx: Mutex::new(rx_a),
427 }),
428 )
429 }
430}
431
432#[cfg(test)]
433#[async_trait]
434impl BluetoothLink for MockBluetoothLink {
435 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
436 use std::sync::atomic::Ordering;
437 if !self.open.load(Ordering::Relaxed) {
438 return Ok(());
439 }
440 self.tx.send(frame).await.map_err(Into::into)
441 }
442
443 async fn recv(&self) -> Option<BluetoothFrame> {
444 self.rx.lock().await.recv().await
445 }
446
447 fn is_open(&self) -> bool {
448 use std::sync::atomic::Ordering;
449 self.open.load(Ordering::Relaxed)
450 }
451
452 async fn close(&self) -> Result<()> {
453 use std::sync::atomic::Ordering;
454 self.open.store(false, Ordering::Relaxed);
455 Ok(())
456 }
457}