1pub mod addr;
22pub mod discovery;
23pub mod io;
24pub mod pool;
25pub mod stats;
26
27use super::{
28 ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
29 TransportError, TransportId, TransportState, TransportType,
30};
31use crate::config::BleConfig;
32use crate::identity::NodeAddr;
33use addr::BleAddr;
34use discovery::DiscoveryBuffer;
35use io::{BleIo, BleScanner, BleStream};
36use pool::{BleConnection, ConnectionPool};
37use stats::BleStats;
38
39use secp256k1::XOnlyPublicKey;
40use std::collections::HashMap;
41use std::sync::Arc;
42use tokio::sync::Mutex;
43use tokio::task::JoinHandle;
44use tracing::{debug, info, trace, warn};
45
/// Default PSM value (`0x0085`) for the BLE transport.
///
/// Not referenced inside this module.
/// NOTE(review): presumably consumed by `BleConfig` as the fallback for
/// `psm()` — confirm against the config module.
pub const DEFAULT_PSM: u16 = 0x0085;
50
/// Default transport type when the `bluer_available` cfg is set and the crate
/// is not being built for tests: backed by `io::BluerIo`.
#[cfg(all(bluer_available, not(test)))]
pub type DefaultBleTransport = BleTransport<io::BluerIo>;

/// Default transport type when `bluer_available` is not set, or when building
/// tests: backed by `io::MockBleIo`.
#[cfg(any(not(bluer_available), test))]
pub type DefaultBleTransport = BleTransport<io::MockBleIo>;
60
/// BLE transport, generic over the I/O backend `I`.
///
/// Owns the connection pool, the map of in-flight outbound connect tasks and
/// the handles of the background accept / scan+probe tasks spawned by
/// `start_async`.
pub struct BleTransport<I: BleIo> {
    /// Identifier stamped on every `ReceivedPacket` and passed to the
    /// discovery buffer.
    transport_id: TransportId,
    /// Optional name, exposed through `name()`.
    name: Option<String>,
    /// Transport configuration (PSM, MTU, timeouts, feature toggles).
    config: BleConfig,
    /// Lifecycle state, updated by `start_async` / `stop_async`.
    state: TransportState,
    /// I/O backend, shared with the spawned background tasks.
    io: Arc<I>,
    /// Established connections keyed by transport address.
    pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
    /// Outbound connection attempts currently in progress, keyed by address.
    connecting: Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>,
    /// Channel on which received packets are forwarded.
    packet_tx: PacketTx,
    /// Handle of the accept loop task, if one was started.
    accept_task: Option<JoinHandle<()>>,
    /// Handle of the scan+probe loop task, if one was started.
    scan_probe_task: Option<JoinHandle<()>>,
    /// Buffer of discovered peers, drained by `discover()`.
    discovery_buffer: Arc<DiscoveryBuffer>,
    /// Transport counters.
    stats: Arc<BleStats>,
    /// Local 32-byte public key sent during the pubkey exchange and parsed as
    /// an x-only key in `start_async`; `None` skips the exchange.
    local_pubkey: Option<[u8; 32]>,
}
103
/// Bookkeeping for one in-flight outbound connection attempt.
struct ConnectingEntry {
    /// Handle of the spawned connect task; `stop_async` aborts it.
    task: JoinHandle<()>,
}
108
109impl<I: BleIo> BleTransport<I> {
110 pub fn new(
112 transport_id: TransportId,
113 name: Option<String>,
114 config: BleConfig,
115 io: I,
116 packet_tx: PacketTx,
117 ) -> Self {
118 let max_conns = config.max_connections();
119 Self {
120 transport_id,
121 name,
122 config,
123 state: TransportState::Configured,
124 io: Arc::new(io),
125 pool: Arc::new(Mutex::new(ConnectionPool::new(max_conns))),
126 connecting: Arc::new(Mutex::new(HashMap::new())),
127 packet_tx,
128 accept_task: None,
129 scan_probe_task: None,
130 discovery_buffer: Arc::new(DiscoveryBuffer::new(transport_id)),
131 stats: Arc::new(BleStats::new()),
132 local_pubkey: None,
133 }
134 }
135
/// Returns the transport's optional name, if one was given to `new`.
pub fn name(&self) -> Option<&str> {
    self.name.as_deref()
}
140
/// Returns the shared statistics counters for this transport.
pub fn stats(&self) -> &Arc<BleStats> {
    &self.stats
}
145
/// Returns the shared I/O backend.
pub fn io(&self) -> &Arc<I> {
    &self.io
}
150
/// Sets the local 32-byte public key used for the pubkey exchange.
///
/// `start_async` copies the key into the background tasks it spawns, so this
/// only affects tasks started afterwards.
pub fn set_local_pubkey(&mut self, pubkey: [u8; 32]) {
    self.local_pubkey = Some(pubkey);
}
159
160 pub async fn start_async(&mut self) -> Result<(), TransportError> {
162 if !self.state.can_start() {
163 return Err(TransportError::AlreadyStarted);
164 }
165 self.state = TransportState::Starting;
166
167 let psm = self.config.psm();
168 let adapter = self.io.adapter_name().to_string();
169
170 let local_node_addr = self.local_pubkey.and_then(|pk| {
172 XOnlyPublicKey::from_slice(&pk)
173 .ok()
174 .map(|xonly| NodeAddr::from_pubkey(&xonly))
175 });
176
177 if self.config.accept_connections() {
179 match self.io.listen(psm).await {
180 Ok(acceptor) => {
181 let pool = Arc::clone(&self.pool);
182 let packet_tx = self.packet_tx.clone();
183 let transport_id = self.transport_id;
184 let stats = Arc::clone(&self.stats);
185 let max_conns = self.config.max_connections();
186
187 self.accept_task = Some(tokio::spawn(accept_loop(
188 acceptor,
189 pool,
190 packet_tx,
191 transport_id,
192 stats,
193 max_conns,
194 self.local_pubkey,
195 Arc::clone(&self.discovery_buffer),
196 local_node_addr,
197 )));
198 debug!(adapter = %adapter, psm = psm, "BLE accept loop started");
199 }
200 Err(e) => {
201 warn!(adapter = %adapter, error = %e, "failed to start BLE listener");
202 self.state = TransportState::Failed;
203 return Err(e);
204 }
205 }
206 }
207
208 if self.config.advertise() {
210 if let Err(e) = self.io.start_advertising().await {
211 warn!(adapter = %adapter, error = %e, "failed to start BLE advertising");
212 } else {
213 self.stats.record_advertisement();
214 debug!(adapter = %adapter, "BLE advertising started (continuous)");
215 }
216 }
217
218 if self.config.scan() {
220 match self.io.start_scanning().await {
221 Ok(scanner) => {
222 self.scan_probe_task = Some(tokio::spawn(scan_probe_loop::<I>(
223 scanner,
224 Arc::clone(&self.io),
225 Arc::clone(&self.pool),
226 Arc::clone(&self.discovery_buffer),
227 Arc::clone(&self.stats),
228 self.local_pubkey,
229 self.config.psm(),
230 self.config.connect_timeout_ms(),
231 self.config.probe_cooldown_secs(),
232 local_node_addr,
233 self.packet_tx.clone(),
234 self.transport_id,
235 )));
236 debug!(adapter = %adapter, "BLE scan+probe loop started");
237 }
238 Err(e) => {
239 warn!(adapter = %adapter, error = %e, "failed to start BLE scanning");
240 }
241 }
242 }
243
244 self.state = TransportState::Up;
245 info!(adapter = %adapter, psm = psm, "BLE transport started");
246 Ok(())
247 }
248
249 pub async fn stop_async(&mut self) -> Result<(), TransportError> {
251 let _ = self.io.stop_advertising().await;
253
254 if let Some(task) = self.accept_task.take() {
256 task.abort();
257 }
258
259 if let Some(task) = self.scan_probe_task.take() {
261 task.abort();
262 }
263
264 {
266 let mut connecting = self.connecting.lock().await;
267 for (_, entry) in connecting.drain() {
268 entry.task.abort();
269 }
270 }
271
272 {
274 let mut pool = self.pool.lock().await;
275 for addr in pool.addrs() {
276 pool.remove(&addr);
277 }
278 }
279
280 self.state = TransportState::Down;
281 info!("BLE transport stopped");
282 Ok(())
283 }
284
285 pub async fn send_async(
292 &self,
293 addr: &TransportAddr,
294 data: &[u8],
295 ) -> Result<usize, TransportError> {
296 let pool = self.pool.lock().await;
297 let conn = match pool.get(addr) {
298 Some(c) => c,
299 None => {
300 drop(pool);
302 let _ = self.connect_async(addr).await;
304 return Err(TransportError::SendFailed("not connected".into()));
305 }
306 };
307
308 let mtu = conn.effective_mtu() as usize;
310 if data.len() > mtu {
311 self.stats.record_mtu_exceeded();
312 return Err(TransportError::MtuExceeded {
313 packet_size: data.len(),
314 mtu: mtu as u16,
315 });
316 }
317
318 match conn.stream.send(data).await {
319 Ok(()) => {
320 self.stats.record_send(data.len());
321 Ok(data.len())
322 }
323 Err(e) => {
324 self.stats.record_send_error();
325 drop(pool);
327 let mut pool = self.pool.lock().await;
328 pool.remove(addr);
329 warn!(addr = %addr, error = %e, "BLE send failed, connection removed");
330 Err(e)
331 }
332 }
333 }
334
335 #[allow(dead_code)]
340 async fn connect_inline(&self, addr: &TransportAddr) -> Result<(), TransportError> {
341 let ble_addr = BleAddr::parse(
342 addr.as_str()
343 .ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
344 )?;
345
346 let psm = self.config.psm();
347 let timeout_ms = self.config.connect_timeout_ms();
348
349 let stream = match tokio::time::timeout(
350 std::time::Duration::from_millis(timeout_ms),
351 self.io.connect(&ble_addr, psm),
352 )
353 .await
354 {
355 Ok(Ok(stream)) => stream,
356 Ok(Err(e)) => {
357 debug!(addr = %addr, error = %e, "BLE connect-on-send failed");
358 return Err(TransportError::ConnectionRefused);
359 }
360 Err(_) => {
361 self.stats.record_connect_timeout();
362 debug!(addr = %addr, "BLE connect-on-send timeout");
363 return Err(TransportError::Timeout);
364 }
365 };
366
367 if let Some(ref our_pubkey) = self.local_pubkey {
369 match pubkey_exchange(&stream, our_pubkey).await {
370 Ok(peer_pubkey) => {
371 debug!(addr = %addr, "BLE outbound pubkey exchange complete");
372 self.discovery_buffer
373 .add_peer_with_pubkey(&ble_addr, peer_pubkey);
374 }
375 Err(e) => {
376 warn!(addr = %addr, error = %e, "BLE outbound pubkey exchange failed");
377 return Err(e);
378 }
379 }
380 }
381
382 self.promote_connection(addr, &ble_addr, stream).await
383 }
384
385 async fn promote_connection(
389 &self,
390 addr: &TransportAddr,
391 ble_addr: &BleAddr,
392 stream: I::Stream,
393 ) -> Result<(), TransportError> {
394 let send_mtu = stream.send_mtu();
395 let recv_mtu = stream.recv_mtu();
396 let stream = Arc::new(stream);
397
398 let recv_task = tokio::spawn(receive_loop(
399 Arc::clone(&stream),
400 addr.clone(),
401 Arc::clone(&self.pool),
402 self.packet_tx.clone(),
403 self.transport_id,
404 Arc::clone(&self.stats),
405 recv_mtu,
406 ));
407
408 let conn = BleConnection {
409 stream,
410 recv_task: Some(recv_task),
411 send_mtu,
412 recv_mtu,
413 established_at: tokio::time::Instant::now(),
414 is_static: false,
415 addr: ble_addr.clone(),
416 };
417
418 let mut pool = self.pool.lock().await;
419 match pool.insert(addr.clone(), conn) {
420 Ok(Some(evicted)) => {
421 self.stats.record_pool_eviction();
422 debug!(addr = %addr, evicted = %evicted, "BLE connection established (evicted peer)");
423 }
424 Ok(None) => {
425 debug!(addr = %addr, "BLE connection established");
426 }
427 Err(e) => {
428 warn!(addr = %addr, error = %e, "BLE pool full, connection dropped");
429 self.stats.record_connection_rejected();
430 return Err(TransportError::SendFailed("pool full".into()));
431 }
432 }
433 self.stats.record_connection_established();
434 Ok(())
435 }
436
437 pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
442 {
444 let pool = self.pool.lock().await;
445 if pool.contains(addr) {
446 return Ok(());
447 }
448 }
449
450 {
452 let connecting = self.connecting.lock().await;
453 if connecting.contains_key(addr) {
454 return Ok(());
455 }
456 }
457
458 let ble_addr = BleAddr::parse(
459 addr.as_str()
460 .ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
461 )?;
462
463 let io = Arc::clone(&self.io);
464 let pool = Arc::clone(&self.pool);
465 let connecting = Arc::clone(&self.connecting);
466 let packet_tx = self.packet_tx.clone();
467 let transport_id = self.transport_id;
468 let stats = Arc::clone(&self.stats);
469 let psm = self.config.psm();
470 let timeout_ms = self.config.connect_timeout_ms();
471 let addr_clone = addr.clone();
472 let local_pubkey = self.local_pubkey;
473 let discovery_buffer = Arc::clone(&self.discovery_buffer);
474
475 let task = tokio::spawn(async move {
476 let result = tokio::time::timeout(
477 std::time::Duration::from_millis(timeout_ms),
478 io.connect(&ble_addr, psm),
479 )
480 .await;
481
482 connecting.lock().await.remove(&addr_clone);
484
485 match result {
486 Ok(Ok(stream)) => {
487 if let Some(ref our_pubkey) = local_pubkey {
489 match pubkey_exchange(&stream, our_pubkey).await {
490 Ok(peer_pubkey) => {
491 debug!(addr = %addr_clone, "BLE outbound pubkey exchange complete");
492 discovery_buffer.add_peer_with_pubkey(&ble_addr, peer_pubkey);
493 }
494 Err(e) => {
495 warn!(
496 addr = %addr_clone, error = %e,
497 "BLE outbound pubkey exchange failed"
498 );
499 return;
500 }
501 }
502 }
503
504 let send_mtu = stream.send_mtu();
505 let recv_mtu = stream.recv_mtu();
506 let stream = Arc::new(stream);
507
508 let recv_task = tokio::spawn(receive_loop(
509 Arc::clone(&stream),
510 addr_clone.clone(),
511 Arc::clone(&pool),
512 packet_tx,
513 transport_id,
514 Arc::clone(&stats),
515 recv_mtu,
516 ));
517
518 let conn = BleConnection {
519 stream,
520 recv_task: Some(recv_task),
521 send_mtu,
522 recv_mtu,
523 established_at: tokio::time::Instant::now(),
524 is_static: false,
525 addr: ble_addr,
526 };
527
528 let mut pool = pool.lock().await;
529 match pool.insert(addr_clone.clone(), conn) {
530 Ok(Some(evicted)) => {
531 stats.record_pool_eviction();
532 debug!(addr = %addr_clone, evicted = %evicted, "BLE connection established (evicted peer)");
533 }
534 Ok(None) => {
535 debug!(addr = %addr_clone, "BLE connection established");
536 }
537 Err(e) => {
538 warn!(addr = %addr_clone, error = %e, "BLE pool full, connection dropped");
539 stats.record_connection_rejected();
540 return;
541 }
542 }
543 stats.record_connection_established();
544 }
545 Ok(Err(e)) => {
546 debug!(addr = %addr_clone, error = %e, "BLE connect failed");
547 }
548 Err(_) => {
549 stats.record_connect_timeout();
550 debug!(addr = %addr_clone, "BLE connect timeout");
551 }
552 }
553 });
554
555 self.connecting
556 .lock()
557 .await
558 .insert(addr.clone(), ConnectingEntry { task });
559
560 Ok(())
561 }
562
563 pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
565 if let Ok(pool) = self.pool.try_lock()
567 && pool.contains(addr)
568 {
569 return ConnectionState::Connected;
570 }
571
572 if let Ok(connecting) = self.connecting.try_lock()
574 && connecting.contains_key(addr)
575 {
576 return ConnectionState::Connecting;
577 }
578
579 ConnectionState::None
580 }
581
582 pub async fn close_connection_async(&self, addr: &TransportAddr) {
584 let mut pool = self.pool.lock().await;
585 if let Some(conn) = pool.remove(addr) {
586 debug!(addr = %addr, "BLE connection closed");
587 drop(conn); }
589 }
590
591 pub fn link_mtu(&self, addr: &TransportAddr) -> u16 {
593 if let Ok(pool) = self.pool.try_lock()
594 && let Some(conn) = pool.get(addr)
595 {
596 return conn.effective_mtu();
597 }
598 self.config.mtu()
599 }
600}
601
/// Synchronous `Transport` facade for the BLE transport.
///
/// Lifecycle and send operations are only available through the inherent
/// async methods; the corresponding sync trait methods return `NotSupported`.
impl<I: BleIo> Transport for BleTransport<I> {
    /// Returns the identifier given at construction.
    fn transport_id(&self) -> TransportId {
        self.transport_id
    }

    /// Always `TransportType::BLE`.
    fn transport_type(&self) -> &TransportType {
        &TransportType::BLE
    }

    /// Returns the current lifecycle state.
    fn state(&self) -> TransportState {
        self.state
    }

    /// Returns the configured MTU.
    fn mtu(&self) -> u16 {
        self.config.mtu()
    }

    /// Per-link MTU. Resolves to the inherent `BleTransport::link_mtu`
    /// (inherent methods take precedence over trait methods), so this call is
    /// not recursive.
    fn link_mtu(&self, addr: &TransportAddr) -> u16 {
        self.link_mtu(addr)
    }

    /// Not supported synchronously; use `start_async()`.
    fn start(&mut self) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use start_async() for BLE transport".into(),
        ))
    }

    /// Not supported synchronously; use `stop_async()`.
    fn stop(&mut self) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use stop_async() for BLE transport".into(),
        ))
    }

    /// Not supported synchronously; use `send_async()`.
    fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use send_async() for BLE transport".into(),
        ))
    }

    /// Drains and returns the peers accumulated in the discovery buffer.
    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
        Ok(self.discovery_buffer.take())
    }

    /// Returns the `auto_connect` configuration flag.
    fn auto_connect(&self) -> bool {
        self.config.auto_connect()
    }

    /// Returns the `accept_connections` configuration flag.
    fn accept_connections(&self) -> bool {
        self.config.accept_connections()
    }

    /// Intentionally a no-op in the sync trait.
    /// NOTE(review): callers presumably need `close_connection_async()` to
    /// actually remove a pooled connection — confirm at the call sites.
    fn close_connection(&self, _addr: &TransportAddr) {
    }
}
657
/// First byte of every pubkey-exchange message.
const PUBKEY_EXCHANGE_PREFIX: u8 = 0x00;

/// Size in bytes of a pubkey-exchange message: one prefix byte followed by
/// the 32-byte public key.
const PUBKEY_EXCHANGE_SIZE: usize = 33;

/// How long `pubkey_exchange` waits for the peer's message, in seconds.
const PUBKEY_EXCHANGE_TIMEOUT_SECS: u64 = 5;
677
678async fn pubkey_exchange<S: BleStream>(
683 stream: &S,
684 local_pubkey: &[u8; 32],
685) -> Result<XOnlyPublicKey, TransportError> {
686 let mut msg = [0u8; PUBKEY_EXCHANGE_SIZE];
688 msg[0] = PUBKEY_EXCHANGE_PREFIX;
689 msg[1..].copy_from_slice(local_pubkey);
690 stream.send(&msg).await?;
691
692 let mut buf = [0u8; PUBKEY_EXCHANGE_SIZE];
694 let timeout = std::time::Duration::from_secs(PUBKEY_EXCHANGE_TIMEOUT_SECS);
695 let n = match tokio::time::timeout(timeout, stream.recv(&mut buf)).await {
696 Ok(result) => result?,
697 Err(_) => return Err(TransportError::Timeout),
698 };
699 if n != PUBKEY_EXCHANGE_SIZE {
700 return Err(TransportError::RecvFailed(format!(
701 "pubkey exchange: expected {} bytes, got {}",
702 PUBKEY_EXCHANGE_SIZE, n
703 )));
704 }
705 if buf[0] != PUBKEY_EXCHANGE_PREFIX {
706 return Err(TransportError::RecvFailed(format!(
707 "pubkey exchange: bad prefix 0x{:02X}",
708 buf[0]
709 )));
710 }
711
712 XOnlyPublicKey::from_slice(&buf[1..])
713 .map_err(|e| TransportError::RecvFailed(format!("pubkey exchange: invalid key: {}", e)))
714}
715
716#[allow(clippy::too_many_arguments)]
723async fn accept_loop<A>(
724 mut acceptor: A,
725 pool: Arc<Mutex<ConnectionPool<Arc<A::Stream>>>>,
726 packet_tx: PacketTx,
727 transport_id: TransportId,
728 stats: Arc<BleStats>,
729 _max_conns: usize,
730 local_pubkey: Option<[u8; 32]>,
731 discovery_buffer: Arc<DiscoveryBuffer>,
732 local_node_addr: Option<NodeAddr>,
733) where
734 A: io::BleAcceptor,
735 A::Stream: 'static,
736{
737 loop {
738 match acceptor.accept().await {
739 Ok(stream) => {
740 let addr = stream.remote_addr().clone();
741 let ta = addr.to_transport_addr();
742
743 {
745 let pool_guard = pool.lock().await;
746 if pool_guard.contains(&ta) {
747 debug!(addr = %ta, "BLE inbound: already connected, skipping");
748 continue;
749 }
750 }
751
752 let send_mtu = stream.send_mtu();
753 let recv_mtu = stream.recv_mtu();
754
755 if let Some(ref our_pubkey) = local_pubkey {
757 match pubkey_exchange(&stream, our_pubkey).await {
758 Ok(peer_pubkey) => {
759 debug!(addr = %ta, "BLE inbound pubkey exchange complete");
760 discovery_buffer.add_peer_with_pubkey(&addr, peer_pubkey);
761
762 if let Some(ref our_addr) = local_node_addr {
766 let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
767 if our_addr < &peer_addr {
768 debug!(
769 addr = %ta,
770 "BLE inbound tie-breaker: dropping (our addr < peer, outbound wins)"
771 );
772 continue;
773 }
774 }
775 }
776 Err(e) => {
777 debug!(addr = %ta, error = %e, "BLE inbound pubkey exchange failed");
778 continue;
779 }
780 }
781 }
782
783 let stream = Arc::new(stream);
784
785 let recv_task = tokio::spawn(receive_loop(
787 Arc::clone(&stream),
788 ta.clone(),
789 Arc::clone(&pool),
790 packet_tx.clone(),
791 transport_id,
792 Arc::clone(&stats),
793 recv_mtu,
794 ));
795
796 let conn = BleConnection {
797 stream,
798 recv_task: Some(recv_task),
799 send_mtu,
800 recv_mtu,
801 established_at: tokio::time::Instant::now(),
802 is_static: false,
803 addr,
804 };
805
806 let mut pool_guard = pool.lock().await;
807 match pool_guard.insert(ta.clone(), conn) {
808 Ok(Some(evicted)) => {
809 stats.record_pool_eviction();
810 info!(addr = %ta, evicted = %evicted, "BLE inbound accepted (evicted peer)");
811 }
812 Ok(None) => {
813 info!(addr = %ta, send_mtu, recv_mtu, "BLE inbound connection accepted");
814 }
815 Err(e) => {
816 warn!(addr = %ta, error = %e, "BLE pool full, inbound connection rejected");
817 stats.record_connection_rejected();
818 continue;
819 }
820 }
821 stats.record_connection_accepted();
822 }
823 Err(e) => {
824 warn!(error = %e, "BLE accept error");
825 break;
826 }
827 }
828 }
829}
830
831async fn receive_loop<S: BleStream>(
833 stream: Arc<S>,
834 addr: TransportAddr,
835 pool: Arc<Mutex<ConnectionPool<Arc<S>>>>,
836 packet_tx: PacketTx,
837 transport_id: TransportId,
838 stats: Arc<BleStats>,
839 recv_mtu: u16,
840) {
841 let mut buf = vec![0u8; recv_mtu as usize];
842 loop {
843 match stream.recv(&mut buf).await {
844 Ok(0) => {
845 debug!(addr = %addr, "BLE connection closed by peer");
846 break;
847 }
848 Ok(n) => {
849 stats.record_recv(n);
850 let packet = ReceivedPacket::new(transport_id, addr.clone(), buf[..n].to_vec());
851 if packet_tx.send(packet).is_err() {
852 trace!("BLE packet_tx closed, stopping receive loop");
853 break;
854 }
855 }
856 Err(e) => {
857 debug!(addr = %addr, error = %e, "BLE receive error");
858 stats.record_recv_error();
859 break;
860 }
861 }
862 }
863
864 let mut pool = pool.lock().await;
866 pool.remove(&addr);
867}
868
869#[allow(clippy::too_many_arguments)]
882async fn scan_probe_loop<I: io::BleIo>(
883 mut scanner: I::Scanner,
884 io: Arc<I>,
885 pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
886 buffer: Arc<DiscoveryBuffer>,
887 stats: Arc<BleStats>,
888 local_pubkey: Option<[u8; 32]>,
889 psm: u16,
890 connect_timeout_ms: u64,
891 cooldown_secs: u64,
892 local_node_addr: Option<NodeAddr>,
893 packet_tx: PacketTx,
894 transport_id: TransportId,
895) {
896 let mut last_probed: HashMap<BleAddr, tokio::time::Instant> = HashMap::new();
898 let mut pending_addrs: Vec<BleAddr> = Vec::new();
901 let cooldown = std::time::Duration::from_secs(cooldown_secs);
902 let retry_interval = tokio::time::interval(std::time::Duration::from_secs(cooldown_secs));
903 tokio::pin!(retry_interval);
904 retry_interval.tick().await; loop {
907 let addr = tokio::select! {
909 result = scanner.next() => {
910 match result {
911 Some(a) => a,
912 None => {
913 debug!("BLE scanner ended");
914 break;
915 }
916 }
917 }
918 _ = retry_interval.tick() => {
919 let pool_guard = pool.lock().await;
921 pending_addrs.retain(|a| !pool_guard.contains(&a.to_transport_addr()));
922 drop(pool_guard);
923 if let Some(a) = pending_addrs.first().cloned() {
924 a
925 } else {
926 continue;
927 }
928 }
929 };
930
931 trace!(addr = %addr, "BLE scan result");
932 stats.record_scan_result();
933
934 {
936 let pool_guard = pool.lock().await;
937 if pool_guard.contains(&addr.to_transport_addr()) {
938 pending_addrs.retain(|a| a != &addr);
939 continue;
940 }
941 }
942
943 if !pending_addrs.contains(&addr) {
945 pending_addrs.push(addr.clone());
946 }
947
948 if last_probed
950 .get(&addr)
951 .is_some_and(|last| last.elapsed() < cooldown)
952 {
953 continue;
954 }
955
956 last_probed.insert(addr.clone(), tokio::time::Instant::now());
958
959 let our_pubkey = match local_pubkey {
961 Some(pk) => pk,
962 None => {
963 buffer.add_peer(&addr);
964 continue;
965 }
966 };
967
968 let stream = match tokio::time::timeout(
970 std::time::Duration::from_millis(connect_timeout_ms),
971 io.connect(&addr, psm),
972 )
973 .await
974 {
975 Ok(Ok(s)) => s,
976 Ok(Err(e)) => {
977 debug!(addr = %addr, error = %e, "BLE probe connect failed");
978 continue;
979 }
980 Err(_) => {
981 debug!(addr = %addr, "BLE probe connect timeout");
982 stats.record_connect_timeout();
983 continue;
984 }
985 };
986
987 let ta = addr.to_transport_addr();
989 match pubkey_exchange(&stream, &our_pubkey).await {
990 Ok(peer_pubkey) => {
991 debug!(addr = %addr, "BLE probe complete");
992
993 if let Some(ref our_addr) = local_node_addr {
996 let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
997 if our_addr >= &peer_addr {
998 debug!(
999 addr = %addr,
1000 "BLE probe tie-breaker: yielding to peer's outbound"
1001 );
1002 buffer.add_peer_with_pubkey(&addr, peer_pubkey);
1003 continue;
1004 }
1005 }
1006
1007 let send_mtu = stream.send_mtu();
1009 let recv_mtu = stream.recv_mtu();
1010 let stream = Arc::new(stream);
1011
1012 let recv_task = tokio::spawn(receive_loop(
1013 Arc::clone(&stream),
1014 ta.clone(),
1015 Arc::clone(&pool),
1016 packet_tx.clone(),
1017 transport_id,
1018 Arc::clone(&stats),
1019 recv_mtu,
1020 ));
1021
1022 let conn = BleConnection {
1023 stream,
1024 recv_task: Some(recv_task),
1025 send_mtu,
1026 recv_mtu,
1027 established_at: tokio::time::Instant::now(),
1028 is_static: false,
1029 addr: addr.clone(),
1030 };
1031
1032 let mut pool_guard = pool.lock().await;
1033 match pool_guard.insert(ta.clone(), conn) {
1034 Ok(Some(evicted)) => {
1035 stats.record_pool_eviction();
1036 debug!(addr = %ta, evicted = %evicted, "BLE probe promoted (evicted peer)");
1037 }
1038 Ok(None) => {
1039 debug!(addr = %ta, "BLE probe promoted to pool");
1040 }
1041 Err(e) => {
1042 warn!(addr = %ta, error = %e, "BLE pool full, probe connection dropped");
1043 stats.record_connection_rejected();
1044 }
1045 }
1046 drop(pool_guard);
1047 stats.record_connection_established();
1048 pending_addrs.retain(|a| a != &addr);
1049
1050 buffer.add_peer_with_pubkey(&addr, peer_pubkey);
1052 }
1053 Err(e) => {
1054 debug!(addr = %addr, error = %e, "BLE probe pubkey exchange failed");
1055 }
1056 }
1057 }
1058}
1059
// Unit tests for the BLE transport, run against the mock I/O backend.
#[cfg(test)]
mod tests {
    use super::*;
    use io::MockBleIo;

    // Builds a BLE address on adapter "hci0" whose last device byte is `n`.
    fn test_addr(n: u8) -> BleAddr {
        BleAddr {
            adapter: "hci0".to_string(),
            device: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, n],
        }
    }

    // Builds a transport with default config and returns it together with the
    // receiving end of its packet channel.
    fn make_transport(io: MockBleIo) -> (BleTransport<MockBleIo>, crate::transport::PacketRx) {
        let (tx, rx) = crate::transport::packet_channel(64);
        let config = BleConfig::default();
        let transport = BleTransport::new(TransportId::new(1), None, config, io, tx);
        (transport, rx)
    }

    // The transport reports the BLE transport type and its properties.
    #[test]
    fn test_transport_type() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (transport, _rx) = make_transport(io);
        assert_eq!(transport.transport_type().name, "ble");
        assert!(transport.transport_type().connection_oriented);
        assert!(transport.transport_type().reliable);
    }

    // A freshly constructed transport is in the `Configured` state.
    #[test]
    fn test_transport_initial_state() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (transport, _rx) = make_transport(io);
        assert_eq!(transport.state(), TransportState::Configured);
    }

    // The default configuration yields an MTU of 2048.
    #[test]
    fn test_transport_default_mtu() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (transport, _rx) = make_transport(io);
        assert_eq!(transport.mtu(), 2048);
    }

    // Starting moves the state to `Up`, stopping moves it to `Down`.
    #[tokio::test]
    async fn test_transport_start_stop() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (mut transport, _rx) = make_transport(io);
        transport.start_async().await.unwrap();
        assert_eq!(transport.state(), TransportState::Up);

        transport.stop_async().await.unwrap();
        assert_eq!(transport.state(), TransportState::Down);
    }

    // Two distinct injected scan results end up as two discovered peers.
    #[tokio::test(start_paused = true)]
    async fn test_scan_discovers_peers() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (mut transport, _rx) = make_transport(io);
        transport.start_async().await.unwrap();

        transport.io.inject_scan_result(test_addr(2)).await;
        transport.io.inject_scan_result(test_addr(3)).await;

        // Let the scan+probe task run, advance the paused clock, then yield
        // again so it can process what became ready.
        tokio::task::yield_now().await;
        tokio::time::advance(std::time::Duration::from_secs(6)).await;
        tokio::task::yield_now().await;

        let peers = transport.discovery_buffer.take();
        assert_eq!(peers.len(), 2);
    }

    // The same address injected twice is reported as a single peer.
    #[tokio::test(start_paused = true)]
    async fn test_scan_deduplicates() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (mut transport, _rx) = make_transport(io);
        transport.start_async().await.unwrap();

        transport.io.inject_scan_result(test_addr(2)).await;
        transport.io.inject_scan_result(test_addr(2)).await;

        tokio::task::yield_now().await;
        tokio::time::advance(std::time::Duration::from_secs(6)).await;
        tokio::task::yield_now().await;

        let peers = transport.discovery_buffer.take();
        assert_eq!(peers.len(), 1);
    }

    // Auto-connect is off in the default configuration.
    #[test]
    fn test_transport_auto_connect_default() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (transport, _rx) = make_transport(io);
        assert!(!transport.auto_connect());
    }

    // An unknown address has connection state `None`.
    #[test]
    fn test_connection_state_none() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let (transport, _rx) = make_transport(io);
        let addr = test_addr(2).to_transport_addr();
        assert_eq!(
            transport.connection_state_sync(&addr),
            ConnectionState::None
        );
    }

    // Derives two node addresses from fixed secret keys and checks that they
    // are strictly ordered.
    // NOTE(review): this only validates the test setup; it does not exercise
    // the tie-breaker branches in `accept_loop` / `scan_probe_loop`.
    #[test]
    fn test_tiebreaker_convention() {
        use secp256k1::{Secp256k1, SecretKey};

        let secp = Secp256k1::new();
        let sk_a = SecretKey::from_slice(&[1u8; 32]).unwrap();
        let sk_b = SecretKey::from_slice(&[2u8; 32]).unwrap();
        let (pk_a, _) = sk_a.public_key(&secp).x_only_public_key();
        let (pk_b, _) = sk_b.public_key(&secp).x_only_public_key();

        let addr_a = NodeAddr::from_pubkey(&pk_a);
        let addr_b = NodeAddr::from_pubkey(&pk_b);

        let (smaller, larger) = if addr_a < addr_b {
            (addr_a, addr_b)
        } else {
            (addr_b, addr_a)
        };

        assert!(smaller < larger, "test setup: smaller < larger");

    }
}