1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::{debug, warn};
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14 encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15 DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16 BLOB_REQUEST_POLICY,
17};
18use nostr::{
19 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
// Parameters handed to `TimedSeenSet::new` for each peer's set of already-seen
// Nostr event ids (see `BluetoothPeer::new`).
// NOTE(review): presumably "max tracked ids" and "how long an id is remembered" —
// confirm against `TimedSeenSet`.
const BLUETOOTH_SEEN_EVENT_CAP: usize = 2048;
const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
25
/// A single frame exchanged over a [`BluetoothLink`].
#[derive(Debug, Clone)]
pub enum BluetoothFrame {
    /// Text payload; `BluetoothPeer` tries to parse it as JSON (mesh frames and
    /// Nostr messages/events).
    Text(String),
    /// Binary payload; `BluetoothPeer` parses it with `parse_message` as a data
    /// request/response.
    Binary(Vec<u8>),
}
31
/// Frame-oriented transport used by [`BluetoothPeer`].
///
/// Implementations must be `Send + Sync` because the peer shares the link
/// between its reader task, its relay-forwarding task and its public methods.
#[async_trait]
pub trait BluetoothLink: Send + Sync {
    /// Sends one frame to the remote side.
    async fn send(&self, frame: BluetoothFrame) -> Result<()>;
    /// Waits for the next incoming frame. The peer's reader loop stops as soon
    /// as this returns `None`.
    async fn recv(&self) -> Option<BluetoothFrame>;
    /// Reports whether the link is still usable; backs `BluetoothPeer::is_connected`.
    fn is_open(&self) -> bool;
    /// Closes the link. `BluetoothPeer` also calls this after a failed `send`.
    async fn close(&self) -> Result<()>;
}
39
/// A mesh peer reached over a [`BluetoothLink`].
///
/// Carries blob requests/responses (binary frames) and Nostr/mesh traffic
/// (text frames) over a single link.
pub struct BluetoothPeer {
    pub peer_id: PeerId,
    pub direction: PeerDirection,
    /// When this peer object was constructed.
    pub created_at: std::time::Instant,
    /// Set to `Some(now)` at construction time by `new`.
    pub connected_at: Option<std::time::Instant>,
    /// Underlying transport.
    link: Arc<dyn BluetoothLink>,
    /// Local content store used to answer incoming blob requests, if any.
    store: Option<Arc<dyn ContentStore>>,
    /// Outstanding blob requests keyed by hash hex; completed by matching responses.
    pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
    /// Outstanding Nostr queries keyed by subscription id string.
    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
    /// Local relay that incoming events/client messages are handed to, if any.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Destination for parsed mesh frames, tagged with this peer's id.
    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
    /// Shared state that receives per-peer byte counters, if any.
    traffic_state: Option<Arc<WebRTCState>>,
    /// Ids of Nostr events already sent to or received from this peer.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
    /// Per-peer HTL configuration, generated with `PeerHTLConfig::random()`.
    htl_config: PeerHTLConfig,
}
55
56impl BluetoothPeer {
57 #[allow(clippy::too_many_arguments)]
58 pub fn new(
59 peer_id: PeerId,
60 direction: PeerDirection,
61 link: Arc<dyn BluetoothLink>,
62 store: Option<Arc<dyn ContentStore>>,
63 nostr_relay: Option<Arc<NostrRelay>>,
64 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65 traffic_state: Option<Arc<WebRTCState>>,
66 ) -> Arc<Self> {
67 let peer = Arc::new(Self {
68 peer_id,
69 direction,
70 created_at: std::time::Instant::now(),
71 connected_at: Some(std::time::Instant::now()),
72 link,
73 store,
74 pending_requests: Arc::new(Mutex::new(HashMap::new())),
75 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76 nostr_relay,
77 mesh_frame_tx,
78 traffic_state,
79 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80 BLUETOOTH_SEEN_EVENT_CAP,
81 BLUETOOTH_SEEN_EVENT_TTL,
82 ))),
83 htl_config: PeerHTLConfig::random(),
84 });
85 Self::spawn_reader(peer.clone());
86 peer
87 }
88
89 async fn mark_seen_event_id(&self, event_id: String) -> bool {
90 self.seen_event_ids.lock().await.insert_if_new(event_id)
91 }
92
    /// Spawns the background task that drives this peer.
    ///
    /// The task optionally registers the peer as a client of the local relay
    /// (plus a "live" subscription whose events are pushed to the remote side),
    /// then reads frames from the link until `recv` returns `None`, and finally
    /// unregisters from the relay and waits for the forwarding task to finish.
    fn spawn_reader(peer: Arc<Self>) {
        tokio::spawn(async move {
            let mut nostr_forward_task = None;
            let mut nostr_client_id = None;

            if let Some(relay) = peer.nostr_relay.as_ref() {
                // Register this peer as a relay client; everything the relay
                // sends to that client arrives on `nostr_rx` as JSON text.
                let client_id = relay.next_client_id();
                let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
                relay
                    .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
                    .await;
                nostr_client_id = Some(client_id);

                // Subscribe to everything created from now on. The random suffix
                // makes the id distinguishable from subscriptions the remote
                // side opens itself. The registration result is deliberately ignored.
                let live_subscription_id =
                    NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
                let _ = relay
                    .register_subscription_query(
                        client_id,
                        live_subscription_id.clone(),
                        vec![NostrFilter::new().since(Timestamp::now())],
                    )
                    .await;

                let peer_for_forward = peer.clone();
                nostr_forward_task = Some(tokio::spawn(async move {
                    while let Some(text) = nostr_rx.recv().await {
                        if let Ok(NostrRelayMessage::Event {
                            subscription_id,
                            event,
                        }) = NostrRelayMessage::from_json(&text)
                        {
                            if subscription_id == live_subscription_id {
                                // Live events: skip ephemeral kinds and ids already
                                // exchanged with this peer (avoids echoing back
                                // what the peer just sent us).
                                if event.kind.is_ephemeral()
                                    || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
                                {
                                    continue;
                                }
                                // Forwarded as the bare event JSON, without the
                                // relay's subscription envelope.
                                if peer_for_forward
                                    .send_frame(BluetoothFrame::Text(event.as_json()))
                                    .await
                                    .is_err()
                                {
                                    // `send_frame` already closed the link.
                                    break;
                                }
                                continue;
                            }
                        }
                        // Anything else (e.g. replies to the remote side's own
                        // subscriptions) is passed through verbatim.
                        if peer_for_forward
                            .send_frame(BluetoothFrame::Text(text))
                            .await
                            .is_err()
                        {
                            break;
                        }
                    }
                }));
            }

            // Main receive loop: runs until the link reports end-of-stream.
            while let Some(frame) = peer.link.recv().await {
                match frame {
                    BluetoothFrame::Binary(data) => {
                        if let Err(err) = peer.handle_binary_frame(data).await {
                            debug!(
                                "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
                                peer.peer_id.short(),
                                err
                            );
                        }
                    }
                    BluetoothFrame::Text(text) => {
                        peer.handle_text_frame(text, nostr_client_id).await;
                    }
                }
            }

            if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
                relay.unregister_client(client_id).await;
            }

            // The forwarding task ends once `nostr_rx` yields `None`.
            // NOTE(review): presumably `unregister_client` drops the relay's
            // sender so this await completes — confirm in `NostrRelay`.
            if let Some(task) = nostr_forward_task {
                let _ = task.await;
            }
        });
    }
177
178 async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179 self.record_received(data.len() as u64).await;
180 match parse_message(&data)? {
181 DataMessage::Request(req) => {
182 let hash_hex = hash_to_hex(&req.h);
183 if let Some(store) = self.store.as_ref() {
184 if let Ok(Some(data)) = store.get(&hash_hex) {
185 let response = DataResponse {
186 h: req.h,
187 d: data,
188 i: None,
189 n: None,
190 };
191 let wire = encode_response(&response)?;
192 self.send_frame(BluetoothFrame::Binary(wire)).await?;
193 }
194 }
195 }
196 DataMessage::Response(res) => {
197 let hash_hex = hash_to_hex(&res.h);
198 if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
199 let _ = sender.send(Some(res.d));
200 }
201 }
202 other => {
203 debug!(
204 "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
205 self.peer_id.short(),
206 other
207 );
208 }
209 }
210 Ok(())
211 }
212
213 async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
214 self.record_received(text.len() as u64).await;
215 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
216 if let Some(tx) = self.mesh_frame_tx.as_ref() {
217 let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
218 return;
219 }
220 }
221
222 if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
223 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
224 let sender = {
225 let pending = self.pending_nostr_queries.lock().await;
226 pending.get(&sub_id).cloned()
227 };
228 if let Some(tx) = sender {
229 let _ = tx.send(relay_msg);
230 return;
231 }
232 }
233 }
234
235 if let Some(relay) = self.nostr_relay.as_ref() {
236 if let Ok(event) = nostr::Event::from_json(&text) {
237 if self.mark_seen_event_id(event.id.to_hex()).await {
238 let _ = relay
239 .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
240 .await;
241 }
242 return;
243 }
244
245 if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
246 if let Some(client_id) = nostr_client_id {
247 relay.handle_client_message(client_id, nostr_msg).await;
248 }
249 }
250 }
251 }
252
    /// Returns whether the underlying link reports itself as open.
    pub fn is_connected(&self) -> bool {
        self.link.is_open()
    }
256
    /// Returns the HTL configuration generated for this peer at construction.
    pub fn htl_config(&self) -> &PeerHTLConfig {
        &self.htl_config
    }
260
261 async fn record_sent(&self, bytes: u64) {
262 if let Some(state) = self.traffic_state.as_ref() {
263 state.record_sent(&self.peer_id.to_string(), bytes).await;
264 }
265 }
266
267 async fn record_received(&self, bytes: u64) {
268 if let Some(state) = self.traffic_state.as_ref() {
269 state
270 .record_received(&self.peer_id.to_string(), bytes)
271 .await;
272 }
273 }
274
275 async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
276 let bytes = match &frame {
277 BluetoothFrame::Text(text) => text.len() as u64,
278 BluetoothFrame::Binary(payload) => payload.len() as u64,
279 };
280 if let Err(err) = self.link.send(frame).await {
281 warn!(
282 "[BluetoothPeer {}] Failed to send frame over BLE: {}",
283 self.peer_id.short(),
284 err
285 );
286 let _ = self.link.close().await;
287 return Err(err);
288 }
289 self.record_sent(bytes).await;
290 Ok(())
291 }
292
293 pub async fn request_with_timeout(
294 &self,
295 hash_hex: &str,
296 timeout: Duration,
297 ) -> Result<Option<Vec<u8>>> {
298 if !self.link.is_open() {
299 return Ok(None);
300 }
301
302 let hash = hex::decode(hash_hex)?;
303 let request = DataRequest {
304 h: hash,
305 htl: BLOB_REQUEST_POLICY.max_htl,
306 q: None,
307 };
308 let wire = encode_request(&request)?;
309 let (tx, rx) = oneshot::channel();
310 self.pending_requests
311 .lock()
312 .await
313 .insert(hash_hex.to_string(), tx);
314 self.send_frame(BluetoothFrame::Binary(wire)).await?;
315
316 match tokio::time::timeout(timeout, rx).await {
317 Ok(Ok(data)) => Ok(data),
318 Ok(Err(_)) => Ok(None),
319 Err(_) => {
320 self.pending_requests.lock().await.remove(hash_hex);
321 Ok(None)
322 }
323 }
324 }
325
326 pub async fn query_nostr_events(
327 &self,
328 filters: Vec<NostrFilter>,
329 timeout: Duration,
330 ) -> Result<Vec<nostr::Event>> {
331 let subscription_id = NostrSubscriptionId::generate();
332 let subscription_key = subscription_id.to_string();
333 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
334
335 self.pending_nostr_queries
336 .lock()
337 .await
338 .insert(subscription_key.clone(), tx);
339
340 let req = NostrClientMessage::req(subscription_id.clone(), filters);
341 self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
342
343 let mut events = Vec::new();
344 let deadline = tokio::time::Instant::now() + timeout;
345
346 loop {
347 let now = tokio::time::Instant::now();
348 if now >= deadline {
349 break;
350 }
351 match tokio::time::timeout(deadline - now, rx.recv()).await {
352 Ok(Some(NostrRelayMessage::Event {
353 subscription_id: sid,
354 event,
355 })) if sid == subscription_id => events.push(*event),
356 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
357 break;
358 }
359 Ok(Some(NostrRelayMessage::Closed {
360 subscription_id: sid,
361 ..
362 })) if sid == subscription_id => break,
363 Ok(Some(_)) => {}
364 Ok(None) | Err(_) => break,
365 }
366 }
367
368 let close = NostrClientMessage::close(subscription_id.clone());
369 let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
370 self.pending_nostr_queries
371 .lock()
372 .await
373 .remove(&subscription_key);
374 Ok(events)
375 }
376
377 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
378 let text = serde_json::to_string(frame)?;
379 self.send_frame(BluetoothFrame::Text(text)).await
380 }
381
    /// Closes the underlying link and returns the link's result.
    pub async fn close(&self) -> Result<()> {
        self.link.close().await
    }
385}
386
387fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
388 match msg {
389 NostrRelayMessage::Event {
390 subscription_id, ..
391 } => Some(subscription_id.to_string()),
392 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
393 NostrRelayMessage::Closed {
394 subscription_id, ..
395 } => Some(subscription_id.to_string()),
396 NostrRelayMessage::Count {
397 subscription_id, ..
398 } => Some(subscription_id.to_string()),
399 _ => None,
400 }
401}
402
/// Test-only in-memory [`BluetoothLink`] endpoint backed by Tokio channels.
/// Create connected endpoints with `MockBluetoothLink::pair`.
#[cfg(test)]
pub struct MockBluetoothLink {
    /// Open/closed flag toggled by `close`.
    open: std::sync::atomic::AtomicBool,
    /// Outgoing frames towards the other endpoint.
    tx: mpsc::Sender<BluetoothFrame>,
    /// Incoming frames from the other endpoint.
    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
}
409
#[cfg(test)]
impl MockBluetoothLink {
    /// Creates two cross-connected endpoints: frames sent on one are received
    /// on the other. Each direction is a bounded channel of 32 frames.
    pub fn pair() -> (Arc<Self>, Arc<Self>) {
        let (first_out, second_in) = mpsc::channel(32);
        let (second_out, first_in) = mpsc::channel(32);

        let endpoint = |tx: mpsc::Sender<BluetoothFrame>, rx: mpsc::Receiver<BluetoothFrame>| {
            Arc::new(Self {
                open: std::sync::atomic::AtomicBool::new(true),
                tx,
                rx: Mutex::new(rx),
            })
        };

        let first = endpoint(first_out, first_in);
        let second = endpoint(second_out, second_in);
        (first, second)
    }
}
429
#[cfg(test)]
#[async_trait]
impl BluetoothLink for MockBluetoothLink {
    /// Queues `frame` for the other endpoint. Once this endpoint has been
    /// closed the frame is dropped and `Ok(())` is still returned.
    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
        if self.is_open() {
            self.tx.send(frame).await?;
        }
        Ok(())
    }

    /// Waits for the next frame from the other endpoint; `None` once the
    /// channel is closed and drained.
    async fn recv(&self) -> Option<BluetoothFrame> {
        let mut inbox = self.rx.lock().await;
        inbox.recv().await
    }

    /// Reads the open flag.
    fn is_open(&self) -> bool {
        self.open.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Clears the open flag; always succeeds.
    async fn close(&self) -> Result<()> {
        self.open
            .store(false, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
    use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
    use anyhow::anyhow;
    use nostr::{EventBuilder, Filter, Keys, Kind};
    use std::collections::HashSet;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Instant;
    use tempfile::TempDir;

    /// In-memory content store keyed by hash hex.
    struct TestStore {
        blobs: HashMap<String, Vec<u8>>,
    }

    impl ContentStore for TestStore {
        fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
            Ok(self.blobs.get(hash_hex).cloned())
        }
    }

    /// Link whose `send` always fails and whose `recv` never yields a frame.
    struct FailingSendLink {
        open: AtomicBool,
    }

    #[async_trait]
    impl BluetoothLink for FailingSendLink {
        async fn send(&self, _frame: BluetoothFrame) -> Result<()> {
            Err(anyhow!("send failed"))
        }

        async fn recv(&self) -> Option<BluetoothFrame> {
            std::future::pending::<Option<BluetoothFrame>>().await
        }

        fn is_open(&self) -> bool {
            self.open.load(Ordering::Relaxed)
        }

        async fn close(&self) -> Result<()> {
            self.open.store(false, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Opens a social-graph store in `dir` and builds a relay on top of it that
    /// trusts exactly the public keys of `trusted_authors`.
    ///
    /// The shared test lock is held only while the store is being opened.
    fn open_test_relay(dir: &TempDir, trusted_authors: &[&Keys]) -> Result<Arc<NostrRelay>> {
        let graph_store = {
            let _guard = crate::socialgraph::test_lock();
            crate::socialgraph::open_social_graph_store_with_mapsize(
                dir.path(),
                Some(128 * 1024 * 1024),
            )?
        };
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let trusted: HashSet<String> = trusted_authors
            .iter()
            .map(|keys| keys.public_key().to_hex())
            .collect();
        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
            Arc::clone(&backend),
            0,
            trusted.clone(),
        ));
        let relay = NostrRelay::new(
            Arc::clone(&backend),
            dir.path().to_path_buf(),
            trusted,
            Some(access),
            NostrRelayConfig {
                spambox_db_max_bytes: 0,
                ..Default::default()
            },
        )?;
        Ok(Arc::new(relay))
    }

    /// A blob stored on one side can be fetched by hash from the other side.
    #[tokio::test]
    async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let data = b"bluetooth mesh payload".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));

        let requester = BluetoothPeer::new(
            PeerId::new("peer-a".to_string()),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            None,
        );

        let mut blobs = HashMap::new();
        blobs.insert(hash_hex.clone(), data.clone());
        let responder = BluetoothPeer::new(
            PeerId::new("peer-b".to_string()),
            PeerDirection::Inbound,
            link_b,
            Some(Arc::new(TestStore { blobs })),
            None,
            None,
            None,
        );

        let received = requester
            .request_with_timeout(&hash_hex, Duration::from_secs(1))
            .await
            .expect("request should succeed");

        assert_eq!(received, Some(data));
        responder.close().await.unwrap();
    }

    /// Request and response sizes show up in both peers' byte counters.
    #[tokio::test]
    async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let state = Arc::new(WebRTCState::new());
        let data = b"bluetooth stats payload".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));
        let requester_id = PeerId::new("peer-a".to_string());
        let responder_id = PeerId::new("peer-b".to_string());

        for peer_id in [&requester_id, &responder_id] {
            state.peers.write().await.insert(
                peer_id.to_string(),
                PeerEntry {
                    peer_id: peer_id.clone(),
                    direction: PeerDirection::Outbound,
                    state: ConnectionState::Connected,
                    last_seen: Instant::now(),
                    peer: None,
                    pool: super::super::types::PeerPool::Other,
                    transport: PeerTransport::Bluetooth,
                    signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        let requester = BluetoothPeer::new(
            requester_id.clone(),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            Some(state.clone()),
        );

        let mut blobs = HashMap::new();
        blobs.insert(hash_hex.clone(), data.clone());
        let responder = BluetoothPeer::new(
            responder_id.clone(),
            PeerDirection::Inbound,
            link_b,
            Some(Arc::new(TestStore { blobs })),
            None,
            None,
            Some(state.clone()),
        );

        let received = requester
            .request_with_timeout(&hash_hex, Duration::from_secs(1))
            .await
            .expect("request should succeed");

        assert_eq!(received, Some(data.clone()));
        let hash = hex::decode(&hash_hex).expect("valid hash hex");
        let expected_request_len = encode_request(&DataRequest {
            h: hash.clone(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: None,
        })
        .expect("request encoding")
        .len() as u64;
        let expected_response_len = encode_response(&DataResponse {
            h: hash,
            d: data.clone(),
            i: None,
            n: None,
        })
        .expect("response encoding")
        .len() as u64;

        let peers = state.peers.read().await;
        let requester_stats = peers
            .get(&requester_id.to_string())
            .expect("requester stats");
        let responder_stats = peers
            .get(&responder_id.to_string())
            .expect("responder stats");
        assert_eq!(requester_stats.bytes_sent, expected_request_len);
        assert_eq!(requester_stats.bytes_received, expected_response_len);
        assert_eq!(responder_stats.bytes_received, expected_request_len);
        assert_eq!(responder_stats.bytes_sent, expected_response_len);
        drop(peers);

        responder.close().await.unwrap();
    }

    /// A Nostr query sent over the link is answered by the remote relay.
    #[tokio::test]
    async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let tmp = TempDir::new()?;
        let author_keys = Keys::generate();
        let relay = open_test_relay(&tmp, &[&author_keys])?;

        let requester = BluetoothPeer::new(
            PeerId::new("peer-a".to_string()),
            PeerDirection::Outbound,
            link_a,
            None,
            None,
            None,
            None,
        );
        let responder = BluetoothPeer::new(
            PeerId::new("peer-b".to_string()),
            PeerDirection::Inbound,
            link_b,
            None,
            Some(relay.clone()),
            None,
            None,
        );

        let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
            .to_event(&author_keys)?;
        relay.ingest_trusted_event(event.clone()).await?;
        tokio::time::sleep(Duration::from_millis(50)).await;

        let events = requester
            .query_nostr_events(
                vec![Filter::new()
                    .authors(vec![event.pubkey])
                    .kinds(vec![event.kind])],
                Duration::from_secs(1),
            )
            .await?;

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event.id);
        responder.close().await?;
        Ok(())
    }

    /// An event published on relay A reaches relay B and is recorded there as
    /// received over Bluetooth.
    #[tokio::test]
    async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
    {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let tmp_a = TempDir::new()?;
        let tmp_b = TempDir::new()?;
        let author_keys = Keys::generate();

        let relay_a = open_test_relay(&tmp_a, &[&author_keys])?;
        let relay_b = open_test_relay(&tmp_b, &[&author_keys])?;

        let sender = BluetoothPeer::new(
            PeerId::new("peer-a".to_string()),
            PeerDirection::Outbound,
            link_a,
            None,
            Some(relay_a.clone()),
            None,
            None,
        );
        let receiver = BluetoothPeer::new(
            PeerId::new("peer-b".to_string()),
            PeerDirection::Inbound,
            link_b,
            None,
            Some(relay_b.clone()),
            None,
            None,
        );

        // Give both reader tasks time to register their live subscriptions.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let cid = "ab".repeat(32);
        let event = EventBuilder::new(
            Kind::TextNote,
            "bluetooth publish sync",
            [nostr::Tag::parse(&["cid", &cid]).unwrap()],
        )
        .to_event(&author_keys)?;
        relay_a.ingest_trusted_event(event.clone()).await?;

        tokio::time::sleep(Duration::from_millis(150)).await;

        let received = relay_b
            .query_events(
                &Filter::new()
                    .authors(vec![event.pubkey])
                    .kinds(vec![event.kind]),
                10,
            )
            .await;
        assert_eq!(
            received
                .iter()
                .filter(|candidate| candidate.id == event.id)
                .count(),
            1
        );

        let bluetooth_events = relay_b.bluetooth_received_events(10).await;
        assert_eq!(bluetooth_events.len(), 1);
        assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
        assert_eq!(bluetooth_events[0].peer_id.as_deref(), Some("peer-b"));
        assert_eq!(bluetooth_events[0].cid_values, vec![cid]);

        receiver.close().await?;
        sender.close().await?;
        Ok(())
    }

    /// Events published on either relay reach the other one exactly once.
    #[tokio::test]
    async fn bluetooth_peer_forwards_local_publishes_both_directions() -> Result<()> {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let tmp_a = TempDir::new()?;
        let tmp_b = TempDir::new()?;
        let author_keys_a = Keys::generate();
        let author_keys_b = Keys::generate();

        let relay_a = open_test_relay(&tmp_a, &[&author_keys_a, &author_keys_b])?;
        let relay_b = open_test_relay(&tmp_b, &[&author_keys_a, &author_keys_b])?;

        let peer_a = BluetoothPeer::new(
            PeerId::new("peer-a".to_string()),
            PeerDirection::Outbound,
            link_a,
            None,
            Some(relay_a.clone()),
            None,
            None,
        );
        let peer_b = BluetoothPeer::new(
            PeerId::new("peer-b".to_string()),
            PeerDirection::Inbound,
            link_b,
            None,
            Some(relay_b.clone()),
            None,
            None,
        );

        // Give both reader tasks time to register their live subscriptions.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let cid_a = "ab".repeat(32);
        let event_a = EventBuilder::new(
            Kind::TextNote,
            "bluetooth publish from a",
            [nostr::Tag::parse(&["cid", &cid_a]).unwrap()],
        )
        .to_event(&author_keys_a)?;
        relay_a.ingest_trusted_event(event_a.clone()).await?;

        let cid_b = "cd".repeat(32);
        let event_b = EventBuilder::new(
            Kind::TextNote,
            "bluetooth publish from b",
            [nostr::Tag::parse(&["cid", &cid_b]).unwrap()],
        )
        .to_event(&author_keys_b)?;
        relay_b.ingest_trusted_event(event_b.clone()).await?;

        tokio::time::sleep(Duration::from_millis(200)).await;

        let received_on_b = relay_b
            .query_events(
                &Filter::new()
                    .authors(vec![event_a.pubkey])
                    .kinds(vec![event_a.kind]),
                10,
            )
            .await;
        assert_eq!(
            received_on_b
                .iter()
                .filter(|candidate| candidate.id == event_a.id)
                .count(),
            1
        );

        let received_on_a = relay_a
            .query_events(
                &Filter::new()
                    .authors(vec![event_b.pubkey])
                    .kinds(vec![event_b.kind]),
                10,
            )
            .await;
        assert_eq!(
            received_on_a
                .iter()
                .filter(|candidate| candidate.id == event_b.id)
                .count(),
            1
        );

        let bluetooth_events_a = relay_a.bluetooth_received_events(10).await;
        assert_eq!(bluetooth_events_a.len(), 1);
        assert_eq!(bluetooth_events_a[0].event_id, event_b.id.to_hex());
        assert_eq!(bluetooth_events_a[0].peer_id.as_deref(), Some("peer-a"));
        assert_eq!(bluetooth_events_a[0].cid_values, vec![cid_b]);

        let bluetooth_events_b = relay_b.bluetooth_received_events(10).await;
        assert_eq!(bluetooth_events_b.len(), 1);
        assert_eq!(bluetooth_events_b[0].event_id, event_a.id.to_hex());
        assert_eq!(bluetooth_events_b[0].peer_id.as_deref(), Some("peer-b"));
        assert_eq!(bluetooth_events_b[0].cid_values, vec![cid_a]);

        peer_b.close().await?;
        peer_a.close().await?;
        Ok(())
    }

    /// When forwarding a locally published event fails, the peer closes its link.
    #[tokio::test]
    async fn bluetooth_peer_closes_after_local_publish_send_failure() -> Result<()> {
        let tmp = TempDir::new()?;
        let author_keys = Keys::generate();
        let relay = open_test_relay(&tmp, &[&author_keys])?;

        let peer = BluetoothPeer::new(
            PeerId::new("peer-a".to_string()),
            PeerDirection::Outbound,
            Arc::new(FailingSendLink {
                open: AtomicBool::new(true),
            }),
            None,
            Some(relay.clone()),
            None,
            None,
        );

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(peer.is_connected());

        let event = EventBuilder::new(Kind::TextNote, "close stale bluetooth peer", [])
            .to_event(&author_keys)?;
        relay.ingest_trusted_event(event).await?;

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(!peer.is_connected());
        Ok(())
    }
}