1use anyhow::Result;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{mpsc, oneshot, Mutex};
7use tracing::debug;
8
9use crate::nostr_relay::NostrRelay;
10
11use super::peer::ContentStore;
12use super::signaling::WebRTCState;
13use super::types::{
14 encode_request, encode_response, hash_to_hex, parse_message, DataMessage, DataRequest,
15 DataResponse, MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, TimedSeenSet,
16 BLOB_REQUEST_POLICY,
17};
18use nostr::{
19 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
20 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId, Timestamp,
21};
22
/// Capacity handed to the per-peer `TimedSeenSet` of Nostr event ids.
const BLUETOOTH_SEEN_EVENT_CAP: usize = 2 * 1024;
/// Time-to-live handed to the per-peer `TimedSeenSet` (ten minutes).
const BLUETOOTH_SEEN_EVENT_TTL: Duration = Duration::from_secs(10 * 60);
25
/// A single unit of data exchanged over a [`BluetoothLink`].
///
/// Frames compare equal when they are the same variant and carry the same
/// payload, which makes them usable in assertions and de-duplication checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BluetoothFrame {
    /// UTF-8 text payload (JSON messages travel in this variant).
    Text(String),
    /// Raw byte payload (encoded data requests and responses travel in this variant).
    Binary(Vec<u8>),
}
31
32#[async_trait]
33pub trait BluetoothLink: Send + Sync {
34 async fn send(&self, frame: BluetoothFrame) -> Result<()>;
35 async fn recv(&self) -> Option<BluetoothFrame>;
36 fn is_open(&self) -> bool;
37 async fn close(&self) -> Result<()>;
38}
39
40pub struct BluetoothPeer {
41 pub peer_id: PeerId,
42 pub direction: PeerDirection,
43 pub created_at: std::time::Instant,
44 pub connected_at: Option<std::time::Instant>,
45 link: Arc<dyn BluetoothLink>,
46 store: Option<Arc<dyn ContentStore>>,
47 pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Option<Vec<u8>>>>>>,
48 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
49 nostr_relay: Option<Arc<NostrRelay>>,
50 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
51 traffic_state: Option<Arc<WebRTCState>>,
52 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
53 htl_config: PeerHTLConfig,
54}
55
56impl BluetoothPeer {
57 #[allow(clippy::too_many_arguments)]
58 pub fn new(
59 peer_id: PeerId,
60 direction: PeerDirection,
61 link: Arc<dyn BluetoothLink>,
62 store: Option<Arc<dyn ContentStore>>,
63 nostr_relay: Option<Arc<NostrRelay>>,
64 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
65 traffic_state: Option<Arc<WebRTCState>>,
66 ) -> Arc<Self> {
67 let peer = Arc::new(Self {
68 peer_id,
69 direction,
70 created_at: std::time::Instant::now(),
71 connected_at: Some(std::time::Instant::now()),
72 link,
73 store,
74 pending_requests: Arc::new(Mutex::new(HashMap::new())),
75 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
76 nostr_relay,
77 mesh_frame_tx,
78 traffic_state,
79 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
80 BLUETOOTH_SEEN_EVENT_CAP,
81 BLUETOOTH_SEEN_EVENT_TTL,
82 ))),
83 htl_config: PeerHTLConfig::random(),
84 });
85 Self::spawn_reader(peer.clone());
86 peer
87 }
88
89 async fn mark_seen_event_id(&self, event_id: String) -> bool {
90 self.seen_event_ids.lock().await.insert_if_new(event_id)
91 }
92
93 fn spawn_reader(peer: Arc<Self>) {
94 tokio::spawn(async move {
95 let mut nostr_forward_task = None;
96 let mut nostr_client_id = None;
97
98 if let Some(relay) = peer.nostr_relay.as_ref() {
99 let client_id = relay.next_client_id();
100 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
101 relay
102 .register_client(client_id, nostr_tx, Some(peer.peer_id.pubkey.clone()))
103 .await;
104 nostr_client_id = Some(client_id);
105
106 let live_subscription_id =
107 NostrSubscriptionId::new(format!("bluetooth-live-{}", rand::random::<u64>()));
108 let _ = relay
109 .register_subscription_query(
110 client_id,
111 live_subscription_id.clone(),
112 vec![NostrFilter::new().since(Timestamp::now())],
113 )
114 .await;
115
116 let peer_for_forward = peer.clone();
117 nostr_forward_task = Some(tokio::spawn(async move {
118 while let Some(text) = nostr_rx.recv().await {
119 if let Ok(NostrRelayMessage::Event {
120 subscription_id,
121 event,
122 }) = NostrRelayMessage::from_json(&text)
123 {
124 if subscription_id == live_subscription_id {
125 if event.kind.is_ephemeral()
126 || !peer_for_forward.mark_seen_event_id(event.id.to_hex()).await
127 {
128 continue;
129 }
130 if peer_for_forward
131 .send_frame(BluetoothFrame::Text(event.as_json()))
132 .await
133 .is_err()
134 {
135 break;
136 }
137 continue;
138 }
139 }
140 if peer_for_forward
141 .send_frame(BluetoothFrame::Text(text))
142 .await
143 .is_err()
144 {
145 break;
146 }
147 }
148 }));
149 }
150
151 while let Some(frame) = peer.link.recv().await {
152 match frame {
153 BluetoothFrame::Binary(data) => {
154 if let Err(err) = peer.handle_binary_frame(data).await {
155 debug!(
156 "[BluetoothPeer {}] Ignoring invalid binary frame: {}",
157 peer.peer_id.short(),
158 err
159 );
160 }
161 }
162 BluetoothFrame::Text(text) => {
163 peer.handle_text_frame(text, nostr_client_id).await;
164 }
165 }
166 }
167
168 if let (Some(relay), Some(client_id)) = (peer.nostr_relay.as_ref(), nostr_client_id) {
169 relay.unregister_client(client_id).await;
170 }
171
172 if let Some(task) = nostr_forward_task {
173 let _ = task.await;
174 }
175 });
176 }
177
178 async fn handle_binary_frame(&self, data: Vec<u8>) -> Result<()> {
179 self.record_received(data.len() as u64).await;
180 match parse_message(&data)? {
181 DataMessage::Request(req) => {
182 let hash_hex = hash_to_hex(&req.h);
183 if let Some(store) = self.store.as_ref() {
184 if let Ok(Some(data)) = store.get(&hash_hex) {
185 let response = DataResponse { h: req.h, d: data };
186 let wire = encode_response(&response)?;
187 self.send_frame(BluetoothFrame::Binary(wire)).await?;
188 }
189 }
190 }
191 DataMessage::Response(res) => {
192 let hash_hex = hash_to_hex(&res.h);
193 if let Some(sender) = self.pending_requests.lock().await.remove(&hash_hex) {
194 let _ = sender.send(Some(res.d));
195 }
196 }
197 other => {
198 debug!(
199 "[BluetoothPeer {}] Ignoring unsupported Bluetooth data frame {:?}",
200 self.peer_id.short(),
201 other
202 );
203 }
204 }
205 Ok(())
206 }
207
208 async fn handle_text_frame(&self, text: String, nostr_client_id: Option<u64>) {
209 self.record_received(text.len() as u64).await;
210 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(&text) {
211 if let Some(tx) = self.mesh_frame_tx.as_ref() {
212 let _ = tx.send((self.peer_id.clone(), mesh_frame)).await;
213 return;
214 }
215 }
216
217 if let Ok(relay_msg) = NostrRelayMessage::from_json(&text) {
218 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
219 let sender = {
220 let pending = self.pending_nostr_queries.lock().await;
221 pending.get(&sub_id).cloned()
222 };
223 if let Some(tx) = sender {
224 let _ = tx.send(relay_msg);
225 return;
226 }
227 }
228 }
229
230 if let Some(relay) = self.nostr_relay.as_ref() {
231 if let Ok(event) = nostr::Event::from_json(&text) {
232 if self.mark_seen_event_id(event.id.to_hex()).await {
233 let _ = relay
234 .ingest_trusted_event_from_bluetooth(event, Some(self.peer_id.to_string()))
235 .await;
236 }
237 return;
238 }
239
240 if let Ok(nostr_msg) = NostrClientMessage::from_json(&text) {
241 if let Some(client_id) = nostr_client_id {
242 relay.handle_client_message(client_id, nostr_msg).await;
243 }
244 }
245 }
246 }
247
248 pub fn is_connected(&self) -> bool {
249 self.link.is_open()
250 }
251
252 pub fn htl_config(&self) -> &PeerHTLConfig {
253 &self.htl_config
254 }
255
256 async fn record_sent(&self, bytes: u64) {
257 if let Some(state) = self.traffic_state.as_ref() {
258 state.record_sent(&self.peer_id.to_string(), bytes).await;
259 }
260 }
261
262 async fn record_received(&self, bytes: u64) {
263 if let Some(state) = self.traffic_state.as_ref() {
264 state
265 .record_received(&self.peer_id.to_string(), bytes)
266 .await;
267 }
268 }
269
270 async fn send_frame(&self, frame: BluetoothFrame) -> Result<()> {
271 let bytes = match &frame {
272 BluetoothFrame::Text(text) => text.len() as u64,
273 BluetoothFrame::Binary(payload) => payload.len() as u64,
274 };
275 if let Err(err) = self.link.send(frame).await {
276 let _ = self.link.close().await;
277 return Err(err);
278 }
279 self.record_sent(bytes).await;
280 Ok(())
281 }
282
283 pub async fn request_with_timeout(
284 &self,
285 hash_hex: &str,
286 timeout: Duration,
287 ) -> Result<Option<Vec<u8>>> {
288 if !self.link.is_open() {
289 return Ok(None);
290 }
291
292 let hash = hex::decode(hash_hex)?;
293 let request = DataRequest {
294 h: hash,
295 htl: BLOB_REQUEST_POLICY.max_htl,
296 q: None,
297 };
298 let wire = encode_request(&request)?;
299 let (tx, rx) = oneshot::channel();
300 self.pending_requests
301 .lock()
302 .await
303 .insert(hash_hex.to_string(), tx);
304 self.send_frame(BluetoothFrame::Binary(wire)).await?;
305
306 match tokio::time::timeout(timeout, rx).await {
307 Ok(Ok(data)) => Ok(data),
308 Ok(Err(_)) => Ok(None),
309 Err(_) => {
310 self.pending_requests.lock().await.remove(hash_hex);
311 Ok(None)
312 }
313 }
314 }
315
316 pub async fn query_nostr_events(
317 &self,
318 filters: Vec<NostrFilter>,
319 timeout: Duration,
320 ) -> Result<Vec<nostr::Event>> {
321 let subscription_id = NostrSubscriptionId::generate();
322 let subscription_key = subscription_id.to_string();
323 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
324
325 self.pending_nostr_queries
326 .lock()
327 .await
328 .insert(subscription_key.clone(), tx);
329
330 let req = NostrClientMessage::req(subscription_id.clone(), filters);
331 self.send_frame(BluetoothFrame::Text(req.as_json())).await?;
332
333 let mut events = Vec::new();
334 let deadline = tokio::time::Instant::now() + timeout;
335
336 loop {
337 let now = tokio::time::Instant::now();
338 if now >= deadline {
339 break;
340 }
341 match tokio::time::timeout(deadline - now, rx.recv()).await {
342 Ok(Some(NostrRelayMessage::Event {
343 subscription_id: sid,
344 event,
345 })) if sid == subscription_id => events.push(*event),
346 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
347 break;
348 }
349 Ok(Some(NostrRelayMessage::Closed {
350 subscription_id: sid,
351 ..
352 })) if sid == subscription_id => break,
353 Ok(Some(_)) => {}
354 Ok(None) | Err(_) => break,
355 }
356 }
357
358 let close = NostrClientMessage::close(subscription_id.clone());
359 let _ = self.send_frame(BluetoothFrame::Text(close.as_json())).await;
360 self.pending_nostr_queries
361 .lock()
362 .await
363 .remove(&subscription_key);
364 Ok(events)
365 }
366
367 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
368 let text = serde_json::to_string(frame)?;
369 self.send_frame(BluetoothFrame::Text(text)).await
370 }
371
372 pub async fn close(&self) -> Result<()> {
373 self.link.close().await
374 }
375}
376
377fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
378 match msg {
379 NostrRelayMessage::Event {
380 subscription_id, ..
381 } => Some(subscription_id.to_string()),
382 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
383 NostrRelayMessage::Closed {
384 subscription_id, ..
385 } => Some(subscription_id.to_string()),
386 NostrRelayMessage::Count {
387 subscription_id, ..
388 } => Some(subscription_id.to_string()),
389 _ => None,
390 }
391}
392
/// In-memory [`BluetoothLink`] for tests, backed by a pair of Tokio channels.
#[cfg(test)]
pub struct MockBluetoothLink {
    /// Open flag; cleared by `close`.
    open: std::sync::atomic::AtomicBool,
    /// Outgoing half: frames sent on this endpoint.
    tx: mpsc::Sender<BluetoothFrame>,
    /// Incoming half: frames sent by the other endpoint.
    rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
}
399
#[cfg(test)]
impl MockBluetoothLink {
    /// Creates two connected endpoints: whatever one sends, the other receives.
    ///
    /// Each direction is a bounded channel with room for 32 frames, and both
    /// endpoints start open.
    pub fn pair() -> (Arc<Self>, Arc<Self>) {
        let endpoint = |tx: mpsc::Sender<BluetoothFrame>, rx: mpsc::Receiver<BluetoothFrame>| {
            Arc::new(Self {
                open: std::sync::atomic::AtomicBool::new(true),
                tx,
                rx: Mutex::new(rx),
            })
        };

        let (first_out, second_in) = mpsc::channel(32);
        let (second_out, first_in) = mpsc::channel(32);

        (
            endpoint(first_out, first_in),
            endpoint(second_out, second_in),
        )
    }
}
419
#[cfg(test)]
#[async_trait]
impl BluetoothLink for MockBluetoothLink {
    /// Queues `frame` for the other endpoint; a closed endpoint drops it and reports success.
    async fn send(&self, frame: BluetoothFrame) -> Result<()> {
        if self.is_open() {
            self.tx.send(frame).await?;
        }
        Ok(())
    }

    /// Waits for the next frame sent by the other endpoint.
    async fn recv(&self) -> Option<BluetoothFrame> {
        let mut incoming = self.rx.lock().await;
        incoming.recv().await
    }

    /// Reads the open flag.
    fn is_open(&self) -> bool {
        self.open.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Clears the open flag; the channels themselves are left untouched.
    async fn close(&self) -> Result<()> {
        self.open
            .store(false, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }
}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use anyhow::anyhow;
451 use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
452 use crate::webrtc::signaling::{ConnectionState, PeerEntry, PeerSignalPath, PeerTransport};
453 use nostr::{EventBuilder, Filter, Keys, Kind};
454 use std::collections::HashSet;
455 use std::sync::atomic::{AtomicBool, Ordering};
456 use std::sync::Arc;
457 use std::time::Instant;
458 use tempfile::TempDir;
459
460 struct TestStore {
461 blobs: HashMap<String, Vec<u8>>,
462 }
463
464 impl ContentStore for TestStore {
465 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
466 Ok(self.blobs.get(hash_hex).cloned())
467 }
468 }
469
470 struct FailingSendLink {
471 open: AtomicBool,
472 }
473
474 #[async_trait]
475 impl BluetoothLink for FailingSendLink {
476 async fn send(&self, _frame: BluetoothFrame) -> Result<()> {
477 Err(anyhow!("send failed"))
478 }
479
480 async fn recv(&self) -> Option<BluetoothFrame> {
481 std::future::pending::<Option<BluetoothFrame>>().await
482 }
483
484 fn is_open(&self) -> bool {
485 self.open.load(Ordering::Relaxed)
486 }
487
488 async fn close(&self) -> Result<()> {
489 self.open.store(false, Ordering::Relaxed);
490 Ok(())
491 }
492 }
493
494 #[tokio::test]
495 async fn bluetooth_peer_round_trips_hash_request_over_mock_link() {
496 let (link_a, link_b) = MockBluetoothLink::pair();
497 let data = b"bluetooth mesh payload".to_vec();
498 let hash_hex = hex::encode(hashtree_core::sha256(&data));
499
500 let requester = BluetoothPeer::new(
501 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
502 PeerDirection::Outbound,
503 link_a,
504 None,
505 None,
506 None,
507 None,
508 );
509
510 let mut blobs = HashMap::new();
511 blobs.insert(hash_hex.clone(), data.clone());
512 let responder = BluetoothPeer::new(
513 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
514 PeerDirection::Inbound,
515 link_b,
516 Some(Arc::new(TestStore { blobs })),
517 None,
518 None,
519 None,
520 );
521
522 let received = requester
523 .request_with_timeout(&hash_hex, Duration::from_secs(1))
524 .await
525 .expect("request should succeed");
526
527 assert_eq!(received, Some(data));
528 responder.close().await.unwrap();
529 }
530
531 #[tokio::test]
532 async fn bluetooth_peer_records_bidirectional_bytes_in_router_state() {
533 let (link_a, link_b) = MockBluetoothLink::pair();
534 let state = Arc::new(WebRTCState::new());
535 let data = b"bluetooth stats payload".to_vec();
536 let hash_hex = hex::encode(hashtree_core::sha256(&data));
537 let requester_id = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
538 let responder_id = PeerId::new("peer-b".to_string(), Some("sess-b".to_string()));
539
540 for peer_id in [&requester_id, &responder_id] {
541 state.peers.write().await.insert(
542 peer_id.to_string(),
543 PeerEntry {
544 peer_id: peer_id.clone(),
545 direction: PeerDirection::Outbound,
546 state: ConnectionState::Connected,
547 last_seen: Instant::now(),
548 peer: None,
549 pool: super::super::types::PeerPool::Other,
550 transport: PeerTransport::Bluetooth,
551 signal_paths: std::collections::BTreeSet::from([PeerSignalPath::Bluetooth]),
552 bytes_sent: 0,
553 bytes_received: 0,
554 },
555 );
556 }
557
558 let requester = BluetoothPeer::new(
559 requester_id.clone(),
560 PeerDirection::Outbound,
561 link_a,
562 None,
563 None,
564 None,
565 Some(state.clone()),
566 );
567
568 let mut blobs = HashMap::new();
569 blobs.insert(hash_hex.clone(), data.clone());
570 let responder = BluetoothPeer::new(
571 responder_id.clone(),
572 PeerDirection::Inbound,
573 link_b,
574 Some(Arc::new(TestStore { blobs })),
575 None,
576 None,
577 Some(state.clone()),
578 );
579
580 let received = requester
581 .request_with_timeout(&hash_hex, Duration::from_secs(1))
582 .await
583 .expect("request should succeed");
584
585 assert_eq!(received, Some(data.clone()));
586 let hash = hex::decode(&hash_hex).expect("valid hash hex");
587 let expected_request_len = encode_request(&DataRequest {
588 h: hash.clone(),
589 htl: BLOB_REQUEST_POLICY.max_htl,
590 q: None,
591 })
592 .expect("request encoding")
593 .len() as u64;
594 let expected_response_len = encode_response(&DataResponse {
595 h: hash,
596 d: data.clone(),
597 })
598 .expect("response encoding")
599 .len() as u64;
600
601 let peers = state.peers.read().await;
602 let requester_stats = peers
603 .get(&requester_id.to_string())
604 .expect("requester stats");
605 let responder_stats = peers
606 .get(&responder_id.to_string())
607 .expect("responder stats");
608 assert_eq!(requester_stats.bytes_sent, expected_request_len);
609 assert_eq!(requester_stats.bytes_received, expected_response_len);
610 assert_eq!(responder_stats.bytes_received, expected_request_len);
611 assert_eq!(responder_stats.bytes_sent, expected_response_len);
612 drop(peers);
613
614 responder.close().await.unwrap();
615 }
616
617 #[tokio::test]
618 async fn bluetooth_peer_round_trips_nostr_queries_over_mock_link() -> Result<()> {
619 let (link_a, link_b) = MockBluetoothLink::pair();
620 let tmp = TempDir::new()?;
621 let graph_store = {
622 let _guard = crate::socialgraph::test_lock();
623 crate::socialgraph::open_social_graph_store_with_mapsize(
624 tmp.path(),
625 Some(128 * 1024 * 1024),
626 )?
627 };
628 let author_keys = Keys::generate();
629 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
630 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
631 Arc::clone(&backend),
632 0,
633 HashSet::from([author_keys.public_key().to_hex()]),
634 ));
635 let relay = Arc::new(NostrRelay::new(
636 Arc::clone(&backend),
637 tmp.path().to_path_buf(),
638 HashSet::from([author_keys.public_key().to_hex()]),
639 Some(access),
640 NostrRelayConfig {
641 spambox_db_max_bytes: 0,
642 ..Default::default()
643 },
644 )?);
645
646 let requester = BluetoothPeer::new(
647 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
648 PeerDirection::Outbound,
649 link_a,
650 None,
651 None,
652 None,
653 None,
654 );
655 let responder = BluetoothPeer::new(
656 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
657 PeerDirection::Inbound,
658 link_b,
659 None,
660 Some(relay.clone()),
661 None,
662 None,
663 );
664
665 let event = EventBuilder::new(Kind::TextNote, "bluetooth nostr relay", [])
666 .to_event(&author_keys)?;
667 relay.ingest_trusted_event(event.clone()).await?;
668 tokio::time::sleep(Duration::from_millis(50)).await;
669
670 let events = requester
671 .query_nostr_events(
672 vec![Filter::new()
673 .authors(vec![event.pubkey])
674 .kinds(vec![event.kind])],
675 Duration::from_secs(1),
676 )
677 .await?;
678
679 assert_eq!(events.len(), 1);
680 assert_eq!(events[0].id, event.id);
681 responder.close().await?;
682 Ok(())
683 }
684
685 #[tokio::test]
686 async fn bluetooth_peer_forwards_local_publishes_and_records_bluetooth_provenance() -> Result<()>
687 {
688 let (link_a, link_b) = MockBluetoothLink::pair();
689 let tmp_a = TempDir::new()?;
690 let tmp_b = TempDir::new()?;
691
692 let graph_store_a = {
693 let _guard = crate::socialgraph::test_lock();
694 crate::socialgraph::open_social_graph_store_with_mapsize(
695 tmp_a.path(),
696 Some(128 * 1024 * 1024),
697 )?
698 };
699 let graph_store_b = {
700 let _guard = crate::socialgraph::test_lock();
701 crate::socialgraph::open_social_graph_store_with_mapsize(
702 tmp_b.path(),
703 Some(128 * 1024 * 1024),
704 )?
705 };
706 let author_keys = Keys::generate();
707
708 let backend_a: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_a.clone();
709 let backend_b: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store_b.clone();
710 let access_a = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
711 Arc::clone(&backend_a),
712 0,
713 HashSet::from([author_keys.public_key().to_hex()]),
714 ));
715 let access_b = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
716 Arc::clone(&backend_b),
717 0,
718 HashSet::from([author_keys.public_key().to_hex()]),
719 ));
720
721 let relay_a = Arc::new(NostrRelay::new(
722 Arc::clone(&backend_a),
723 tmp_a.path().to_path_buf(),
724 HashSet::from([author_keys.public_key().to_hex()]),
725 Some(access_a),
726 NostrRelayConfig {
727 spambox_db_max_bytes: 0,
728 ..Default::default()
729 },
730 )?);
731 let relay_b = Arc::new(NostrRelay::new(
732 Arc::clone(&backend_b),
733 tmp_b.path().to_path_buf(),
734 HashSet::from([author_keys.public_key().to_hex()]),
735 Some(access_b),
736 NostrRelayConfig {
737 spambox_db_max_bytes: 0,
738 ..Default::default()
739 },
740 )?);
741
742 let sender = BluetoothPeer::new(
743 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
744 PeerDirection::Outbound,
745 link_a,
746 None,
747 Some(relay_a.clone()),
748 None,
749 None,
750 );
751 let receiver = BluetoothPeer::new(
752 PeerId::new("peer-b".to_string(), Some("sess-b".to_string())),
753 PeerDirection::Inbound,
754 link_b,
755 None,
756 Some(relay_b.clone()),
757 None,
758 None,
759 );
760
761 tokio::time::sleep(Duration::from_millis(50)).await;
762
763 let cid = "ab".repeat(32);
764 let event = EventBuilder::new(
765 Kind::TextNote,
766 "bluetooth publish sync",
767 [nostr::Tag::parse(&["cid", &cid]).unwrap()],
768 )
769 .to_event(&author_keys)?;
770 relay_a.ingest_trusted_event(event.clone()).await?;
771
772 tokio::time::sleep(Duration::from_millis(150)).await;
773
774 let received = relay_b
775 .query_events(
776 &Filter::new()
777 .authors(vec![event.pubkey])
778 .kinds(vec![event.kind]),
779 10,
780 )
781 .await;
782 assert_eq!(
783 received
784 .iter()
785 .filter(|candidate| candidate.id == event.id)
786 .count(),
787 1
788 );
789
790 let bluetooth_events = relay_b.bluetooth_received_events(10).await;
791 assert_eq!(bluetooth_events.len(), 1);
792 assert_eq!(bluetooth_events[0].event_id, event.id.to_hex());
793 assert_eq!(
794 bluetooth_events[0].peer_id.as_deref(),
795 Some("peer-b:sess-b")
796 );
797 assert_eq!(bluetooth_events[0].cid_values, vec![cid]);
798
799 receiver.close().await?;
800 sender.close().await?;
801 Ok(())
802 }
803
804 #[tokio::test]
805 async fn bluetooth_peer_closes_after_local_publish_send_failure() -> Result<()> {
806 let tmp = TempDir::new()?;
807 let graph_store = {
808 let _guard = crate::socialgraph::test_lock();
809 crate::socialgraph::open_social_graph_store_with_mapsize(
810 tmp.path(),
811 Some(128 * 1024 * 1024),
812 )?
813 };
814 let author_keys = Keys::generate();
815 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
816 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
817 Arc::clone(&backend),
818 0,
819 HashSet::from([author_keys.public_key().to_hex()]),
820 ));
821 let relay = Arc::new(NostrRelay::new(
822 Arc::clone(&backend),
823 tmp.path().to_path_buf(),
824 HashSet::from([author_keys.public_key().to_hex()]),
825 Some(access),
826 NostrRelayConfig {
827 spambox_db_max_bytes: 0,
828 ..Default::default()
829 },
830 )?);
831
832 let peer = BluetoothPeer::new(
833 PeerId::new("peer-a".to_string(), Some("sess-a".to_string())),
834 PeerDirection::Outbound,
835 Arc::new(FailingSendLink {
836 open: AtomicBool::new(true),
837 }),
838 None,
839 Some(relay.clone()),
840 None,
841 None,
842 );
843
844 tokio::time::sleep(Duration::from_millis(50)).await;
845 assert!(peer.is_connected());
846
847 let event = EventBuilder::new(Kind::TextNote, "close stale bluetooth peer", [])
848 .to_event(&author_keys)?;
849 relay.ingest_trusted_event(event).await?;
850
851 tokio::time::sleep(Duration::from_millis(100)).await;
852 assert!(!peer.is_connected());
853 Ok(())
854 }
855}