Skip to main content

fips_core/transport/ble/
mod.rs

1//! BLE L2CAP Transport Implementation
2//!
3//! Provides BLE-based transport for FIPS peer communication using L2CAP
4//! Connection-Oriented Channels (CoC) in SeqPacket mode. L2CAP CoC
5//! preserves message boundaries (unlike TCP byte streams), so no FMP
6//! framing is needed — each send/recv is one FIPS packet.
7//!
8//! ## Architecture
9//!
10//! Transport logic (pool, discovery, lifecycle) is separated from the
11//! BlueZ/bluer stack via the `BleIo` trait. `BluerIo` provides the real
12//! implementation (behind `cfg(bluer_available)`); `MockBleIo` provides
13//! an in-memory test double for CI without hardware.
14//!
15//! ## Connection Pool
16//!
17//! BLE hardware limits concurrent connections (typically 4-10). The pool
18//! enforces a configurable maximum (default 7) with priority eviction:
19//! static (configured) peers get priority over discovered peers.
20
21pub mod addr;
22pub mod discovery;
23pub mod io;
24pub mod pool;
25pub mod stats;
26
27use super::{
28    ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
29    TransportError, TransportId, TransportState, TransportType,
30};
31use crate::config::BleConfig;
32use crate::identity::NodeAddr;
33use addr::BleAddr;
34use discovery::DiscoveryBuffer;
35use io::{BleIo, BleScanner, BleStream};
36use pool::{BleConnection, ConnectionPool};
37use stats::BleStats;
38
39use secp256k1::XOnlyPublicKey;
40use std::collections::HashMap;
41use std::sync::Arc;
42use tokio::sync::Mutex;
43use tokio::task::JoinHandle;
44use tracing::{debug, info, trace, warn};
45
46/// Default FIPS L2CAP PSM (Protocol Service Multiplexer).
47///
48/// 0x0085 (133) is in the dynamic range (0x0080-0x00FF).
49pub const DEFAULT_PSM: u16 = 0x0085;
50
51/// Concrete BLE transport type for use in TransportHandle.
52///
53/// Production builds on glibc-linux use `BluerIo` (real BlueZ stack).
54/// Test builds, musl-linux, and non-Linux platforms use `MockBleIo`.
55#[cfg(all(bluer_available, not(test)))]
56pub type DefaultBleTransport = BleTransport<io::BluerIo>;
57
58#[cfg(any(not(bluer_available), test))]
59pub type DefaultBleTransport = BleTransport<io::MockBleIo>;
60
61// ============================================================================
62// BLE Transport
63// ============================================================================
64
65/// BLE transport for FIPS.
66///
67/// Provides connection-oriented, reliable delivery over BLE L2CAP CoC.
68/// Each peer has its own L2CAP connection; the pool enforces hardware
69/// connection limits with priority eviction.
70pub struct BleTransport<I: BleIo> {
71    /// Unique transport identifier.
72    transport_id: TransportId,
73    /// Optional instance name.
74    name: Option<String>,
75    /// Configuration.
76    config: BleConfig,
77    /// Current state.
78    state: TransportState,
79    /// BLE I/O implementation (BluerIo or MockBleIo).
80    io: Arc<I>,
81    /// Established connection pool.
82    pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
83    /// Pending connection attempts.
84    connecting: Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>,
85    /// Channel for delivering received packets to Node.
86    packet_tx: PacketTx,
87    /// Accept loop task handle.
88    accept_task: Option<JoinHandle<()>>,
89    /// Combined scan + probe loop task handle.
90    scan_probe_task: Option<JoinHandle<()>>,
91    /// Discovery buffer for discovered peers.
92    discovery_buffer: Arc<DiscoveryBuffer>,
93    /// Transport statistics.
94    stats: Arc<BleStats>,
95    /// Our public key for pre-handshake identity exchange.
96    ///
97    /// BLE advertisements carry only the FIPS UUID, not the pubkey.
98    /// After L2CAP connection, both sides exchange `[0x00][pubkey:32]`
99    /// so the node layer can initiate the IK handshake.
100    /// Temporary — removed when FMP switches to XX.
101    local_pubkey: Option<[u8; 32]>,
102}
103
104/// A pending background connection attempt.
105struct ConnectingEntry {
106    task: JoinHandle<()>,
107}
108
109impl<I: BleIo> BleTransport<I> {
110    /// Create a new BLE transport.
111    pub fn new(
112        transport_id: TransportId,
113        name: Option<String>,
114        config: BleConfig,
115        io: I,
116        packet_tx: PacketTx,
117    ) -> Self {
118        let max_conns = config.max_connections();
119        Self {
120            transport_id,
121            name,
122            config,
123            state: TransportState::Configured,
124            io: Arc::new(io),
125            pool: Arc::new(Mutex::new(ConnectionPool::new(max_conns))),
126            connecting: Arc::new(Mutex::new(HashMap::new())),
127            packet_tx,
128            accept_task: None,
129            scan_probe_task: None,
130            discovery_buffer: Arc::new(DiscoveryBuffer::new(transport_id)),
131            stats: Arc::new(BleStats::new()),
132            local_pubkey: None,
133        }
134    }
135
136    /// Get the instance name.
137    pub fn name(&self) -> Option<&str> {
138        self.name.as_deref()
139    }
140
141    /// Get the transport statistics.
142    pub fn stats(&self) -> &Arc<BleStats> {
143        &self.stats
144    }
145
146    /// Get the I/O implementation (for test injection).
147    pub fn io(&self) -> &Arc<I> {
148        &self.io
149    }
150
151    /// Set the local public key for pre-handshake identity exchange.
152    ///
153    /// Must be called before `start_async()`. Without this, BLE
154    /// connections skip the pubkey exchange and discovered peers
155    /// won't have identity information for auto-connect.
156    pub fn set_local_pubkey(&mut self, pubkey: [u8; 32]) {
157        self.local_pubkey = Some(pubkey);
158    }
159
160    /// Start the transport asynchronously.
161    pub async fn start_async(&mut self) -> Result<(), TransportError> {
162        if !self.state.can_start() {
163            return Err(TransportError::AlreadyStarted);
164        }
165        self.state = TransportState::Starting;
166
167        let psm = self.config.psm();
168        let adapter = self.io.adapter_name().to_string();
169
170        // Pre-compute local NodeAddr for cross-probe tie-breaking
171        let local_node_addr = self.local_pubkey.and_then(|pk| {
172            XOnlyPublicKey::from_slice(&pk)
173                .ok()
174                .map(|xonly| NodeAddr::from_pubkey(&xonly))
175        });
176
177        // Start L2CAP listener for inbound connections
178        if self.config.accept_connections() {
179            match self.io.listen(psm).await {
180                Ok(acceptor) => {
181                    let pool = Arc::clone(&self.pool);
182                    let packet_tx = self.packet_tx.clone();
183                    let transport_id = self.transport_id;
184                    let stats = Arc::clone(&self.stats);
185                    let max_conns = self.config.max_connections();
186
187                    self.accept_task = Some(tokio::spawn(accept_loop(
188                        acceptor,
189                        pool,
190                        packet_tx,
191                        transport_id,
192                        stats,
193                        max_conns,
194                        self.local_pubkey,
195                        Arc::clone(&self.discovery_buffer),
196                        local_node_addr,
197                    )));
198                    debug!(adapter = %adapter, psm = psm, "BLE accept loop started");
199                }
200                Err(e) => {
201                    warn!(adapter = %adapter, error = %e, "failed to start BLE listener");
202                    self.state = TransportState::Failed;
203                    return Err(e);
204                }
205            }
206        }
207
208        // Start continuous advertising
209        if self.config.advertise() {
210            if let Err(e) = self.io.start_advertising().await {
211                warn!(adapter = %adapter, error = %e, "failed to start BLE advertising");
212            } else {
213                self.stats.record_advertisement();
214                debug!(adapter = %adapter, "BLE advertising started (continuous)");
215            }
216        }
217
218        // Start combined scan + probe loop
219        if self.config.scan() {
220            match self.io.start_scanning().await {
221                Ok(scanner) => {
222                    self.scan_probe_task = Some(tokio::spawn(scan_probe_loop::<I>(
223                        scanner,
224                        Arc::clone(&self.io),
225                        Arc::clone(&self.pool),
226                        Arc::clone(&self.discovery_buffer),
227                        Arc::clone(&self.stats),
228                        self.local_pubkey,
229                        self.config.psm(),
230                        self.config.connect_timeout_ms(),
231                        self.config.probe_cooldown_secs(),
232                        local_node_addr,
233                        self.packet_tx.clone(),
234                        self.transport_id,
235                    )));
236                    debug!(adapter = %adapter, "BLE scan+probe loop started");
237                }
238                Err(e) => {
239                    warn!(adapter = %adapter, error = %e, "failed to start BLE scanning");
240                }
241            }
242        }
243
244        self.state = TransportState::Up;
245        info!(adapter = %adapter, psm = psm, "BLE transport started");
246        Ok(())
247    }
248
249    /// Stop the transport asynchronously.
250    pub async fn stop_async(&mut self) -> Result<(), TransportError> {
251        // Stop advertising
252        let _ = self.io.stop_advertising().await;
253
254        // Abort accept loop
255        if let Some(task) = self.accept_task.take() {
256            task.abort();
257        }
258
259        // Abort scan+probe loop
260        if let Some(task) = self.scan_probe_task.take() {
261            task.abort();
262        }
263
264        // Drain connecting pool
265        {
266            let mut connecting = self.connecting.lock().await;
267            for (_, entry) in connecting.drain() {
268                entry.task.abort();
269            }
270        }
271
272        // Drain established connections (recv tasks aborted via Drop)
273        {
274            let mut pool = self.pool.lock().await;
275            for addr in pool.addrs() {
276                pool.remove(&addr);
277            }
278        }
279
280        self.state = TransportState::Down;
281        info!("BLE transport stopped");
282        Ok(())
283    }
284
285    /// Send data to a remote BLE address.
286    ///
287    /// If no connection exists, triggers a background connect and fails
288    /// fast. The next send retry (typically 1s later for handshake msg1)
289    /// will find the connection established. This avoids blocking the
290    /// event loop on L2CAP connect (up to 10s).
291    pub async fn send_async(
292        &self,
293        addr: &TransportAddr,
294        data: &[u8],
295    ) -> Result<usize, TransportError> {
296        let pool = self.pool.lock().await;
297        let conn = match pool.get(addr) {
298            Some(c) => c,
299            None => {
300                // Drop pool lock before triggering background connect
301                drop(pool);
302                // Fire-and-forget: connect_async spawns a background task
303                let _ = self.connect_async(addr).await;
304                return Err(TransportError::SendFailed("not connected".into()));
305            }
306        };
307
308        // MTU check
309        let mtu = conn.effective_mtu() as usize;
310        if data.len() > mtu {
311            self.stats.record_mtu_exceeded();
312            return Err(TransportError::MtuExceeded {
313                packet_size: data.len(),
314                mtu: mtu as u16,
315            });
316        }
317
318        match conn.stream.send(data).await {
319            Ok(()) => {
320                self.stats.record_send(data.len());
321                Ok(data.len())
322            }
323            Err(e) => {
324                self.stats.record_send_error();
325                // Drop pool lock before removing to avoid deadlock
326                drop(pool);
327                let mut pool = self.pool.lock().await;
328                pool.remove(addr);
329                warn!(addr = %addr, error = %e, "BLE send failed, connection removed");
330                Err(e)
331            }
332        }
333    }
334
335    /// Connect to a remote BLE device inline (blocking the caller).
336    ///
337    /// Not used in normal operation (send_async fails fast instead).
338    /// Retained for manual debugging / testing scenarios.
339    #[allow(dead_code)]
340    async fn connect_inline(&self, addr: &TransportAddr) -> Result<(), TransportError> {
341        let ble_addr = BleAddr::parse(
342            addr.as_str()
343                .ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
344        )?;
345
346        let psm = self.config.psm();
347        let timeout_ms = self.config.connect_timeout_ms();
348
349        let stream = match tokio::time::timeout(
350            std::time::Duration::from_millis(timeout_ms),
351            self.io.connect(&ble_addr, psm),
352        )
353        .await
354        {
355            Ok(Ok(stream)) => stream,
356            Ok(Err(e)) => {
357                debug!(addr = %addr, error = %e, "BLE connect-on-send failed");
358                return Err(TransportError::ConnectionRefused);
359            }
360            Err(_) => {
361                self.stats.record_connect_timeout();
362                debug!(addr = %addr, "BLE connect-on-send timeout");
363                return Err(TransportError::Timeout);
364            }
365        };
366
367        // Pre-handshake pubkey exchange (temporary, pre-XX)
368        if let Some(ref our_pubkey) = self.local_pubkey {
369            match pubkey_exchange(&stream, our_pubkey).await {
370                Ok(peer_pubkey) => {
371                    debug!(addr = %addr, "BLE outbound pubkey exchange complete");
372                    self.discovery_buffer
373                        .add_peer_with_pubkey(&ble_addr, peer_pubkey);
374                }
375                Err(e) => {
376                    warn!(addr = %addr, error = %e, "BLE outbound pubkey exchange failed");
377                    return Err(e);
378                }
379            }
380        }
381
382        self.promote_connection(addr, &ble_addr, stream).await
383    }
384
385    /// Promote a newly established stream into the connection pool.
386    ///
387    /// Spawns the receive loop and inserts into the pool with eviction.
388    async fn promote_connection(
389        &self,
390        addr: &TransportAddr,
391        ble_addr: &BleAddr,
392        stream: I::Stream,
393    ) -> Result<(), TransportError> {
394        let send_mtu = stream.send_mtu();
395        let recv_mtu = stream.recv_mtu();
396        let stream = Arc::new(stream);
397
398        let recv_task = tokio::spawn(receive_loop(
399            Arc::clone(&stream),
400            addr.clone(),
401            Arc::clone(&self.pool),
402            self.packet_tx.clone(),
403            self.transport_id,
404            Arc::clone(&self.stats),
405            recv_mtu,
406        ));
407
408        let conn = BleConnection {
409            stream,
410            recv_task: Some(recv_task),
411            send_mtu,
412            recv_mtu,
413            established_at: tokio::time::Instant::now(),
414            is_static: false,
415            addr: ble_addr.clone(),
416        };
417
418        let mut pool = self.pool.lock().await;
419        match pool.insert(addr.clone(), conn) {
420            Ok(Some(evicted)) => {
421                self.stats.record_pool_eviction();
422                debug!(addr = %addr, evicted = %evicted, "BLE connection established (evicted peer)");
423            }
424            Ok(None) => {
425                debug!(addr = %addr, "BLE connection established");
426            }
427            Err(e) => {
428                warn!(addr = %addr, error = %e, "BLE pool full, connection dropped");
429                self.stats.record_connection_rejected();
430                return Err(TransportError::SendFailed("pool full".into()));
431            }
432        }
433        self.stats.record_connection_established();
434        Ok(())
435    }
436
437    /// Initiate a non-blocking connection to a remote BLE device.
438    ///
439    /// Spawns a background task that connects with timeout and promotes
440    /// to the pool on success. Poll `connection_state_sync()` to check.
441    pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
442        // Already connected?
443        {
444            let pool = self.pool.lock().await;
445            if pool.contains(addr) {
446                return Ok(());
447            }
448        }
449
450        // Already connecting?
451        {
452            let connecting = self.connecting.lock().await;
453            if connecting.contains_key(addr) {
454                return Ok(());
455            }
456        }
457
458        let ble_addr = BleAddr::parse(
459            addr.as_str()
460                .ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
461        )?;
462
463        let io = Arc::clone(&self.io);
464        let pool = Arc::clone(&self.pool);
465        let connecting = Arc::clone(&self.connecting);
466        let packet_tx = self.packet_tx.clone();
467        let transport_id = self.transport_id;
468        let stats = Arc::clone(&self.stats);
469        let psm = self.config.psm();
470        let timeout_ms = self.config.connect_timeout_ms();
471        let addr_clone = addr.clone();
472        let local_pubkey = self.local_pubkey;
473        let discovery_buffer = Arc::clone(&self.discovery_buffer);
474
475        let task = tokio::spawn(async move {
476            let result = tokio::time::timeout(
477                std::time::Duration::from_millis(timeout_ms),
478                io.connect(&ble_addr, psm),
479            )
480            .await;
481
482            // Remove from connecting pool
483            connecting.lock().await.remove(&addr_clone);
484
485            match result {
486                Ok(Ok(stream)) => {
487                    // Pre-handshake pubkey exchange (temporary, pre-XX)
488                    if let Some(ref our_pubkey) = local_pubkey {
489                        match pubkey_exchange(&stream, our_pubkey).await {
490                            Ok(peer_pubkey) => {
491                                debug!(addr = %addr_clone, "BLE outbound pubkey exchange complete");
492                                discovery_buffer.add_peer_with_pubkey(&ble_addr, peer_pubkey);
493                            }
494                            Err(e) => {
495                                warn!(
496                                    addr = %addr_clone, error = %e,
497                                    "BLE outbound pubkey exchange failed"
498                                );
499                                return;
500                            }
501                        }
502                    }
503
504                    let send_mtu = stream.send_mtu();
505                    let recv_mtu = stream.recv_mtu();
506                    let stream = Arc::new(stream);
507
508                    let recv_task = tokio::spawn(receive_loop(
509                        Arc::clone(&stream),
510                        addr_clone.clone(),
511                        Arc::clone(&pool),
512                        packet_tx,
513                        transport_id,
514                        Arc::clone(&stats),
515                        recv_mtu,
516                    ));
517
518                    let conn = BleConnection {
519                        stream,
520                        recv_task: Some(recv_task),
521                        send_mtu,
522                        recv_mtu,
523                        established_at: tokio::time::Instant::now(),
524                        is_static: false,
525                        addr: ble_addr,
526                    };
527
528                    let mut pool = pool.lock().await;
529                    match pool.insert(addr_clone.clone(), conn) {
530                        Ok(Some(evicted)) => {
531                            stats.record_pool_eviction();
532                            debug!(addr = %addr_clone, evicted = %evicted, "BLE connection established (evicted peer)");
533                        }
534                        Ok(None) => {
535                            debug!(addr = %addr_clone, "BLE connection established");
536                        }
537                        Err(e) => {
538                            warn!(addr = %addr_clone, error = %e, "BLE pool full, connection dropped");
539                            stats.record_connection_rejected();
540                            return;
541                        }
542                    }
543                    stats.record_connection_established();
544                }
545                Ok(Err(e)) => {
546                    debug!(addr = %addr_clone, error = %e, "BLE connect failed");
547                }
548                Err(_) => {
549                    stats.record_connect_timeout();
550                    debug!(addr = %addr_clone, "BLE connect timeout");
551                }
552            }
553        });
554
555        self.connecting
556            .lock()
557            .await
558            .insert(addr.clone(), ConnectingEntry { task });
559
560        Ok(())
561    }
562
563    /// Query the state of a connection attempt.
564    pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
565        // Check established pool (try_lock to avoid blocking)
566        if let Ok(pool) = self.pool.try_lock()
567            && pool.contains(addr)
568        {
569            return ConnectionState::Connected;
570        }
571
572        // Check connecting pool
573        if let Ok(connecting) = self.connecting.try_lock()
574            && connecting.contains_key(addr)
575        {
576            return ConnectionState::Connecting;
577        }
578
579        ConnectionState::None
580    }
581
582    /// Close a specific connection.
583    pub async fn close_connection_async(&self, addr: &TransportAddr) {
584        let mut pool = self.pool.lock().await;
585        if let Some(conn) = pool.remove(addr) {
586            debug!(addr = %addr, "BLE connection closed");
587            drop(conn); // recv_task aborted via Drop
588        }
589    }
590
591    /// Get the link MTU for a specific address.
592    pub fn link_mtu(&self, addr: &TransportAddr) -> u16 {
593        if let Ok(pool) = self.pool.try_lock()
594            && let Some(conn) = pool.get(addr)
595        {
596            return conn.effective_mtu();
597        }
598        self.config.mtu()
599    }
600}
601
602impl<I: BleIo> Transport for BleTransport<I> {
603    fn transport_id(&self) -> TransportId {
604        self.transport_id
605    }
606
607    fn transport_type(&self) -> &TransportType {
608        &TransportType::BLE
609    }
610
611    fn state(&self) -> TransportState {
612        self.state
613    }
614
615    fn mtu(&self) -> u16 {
616        self.config.mtu()
617    }
618
619    fn link_mtu(&self, addr: &TransportAddr) -> u16 {
620        self.link_mtu(addr)
621    }
622
623    fn start(&mut self) -> Result<(), TransportError> {
624        Err(TransportError::NotSupported(
625            "use start_async() for BLE transport".into(),
626        ))
627    }
628
629    fn stop(&mut self) -> Result<(), TransportError> {
630        Err(TransportError::NotSupported(
631            "use stop_async() for BLE transport".into(),
632        ))
633    }
634
635    fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
636        Err(TransportError::NotSupported(
637            "use send_async() for BLE transport".into(),
638        ))
639    }
640
641    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
642        Ok(self.discovery_buffer.take())
643    }
644
645    fn auto_connect(&self) -> bool {
646        self.config.auto_connect()
647    }
648
649    fn accept_connections(&self) -> bool {
650        self.config.accept_connections()
651    }
652
653    fn close_connection(&self, _addr: &TransportAddr) {
654        // use close_connection_async()
655    }
656}
657
658// ============================================================================
659// Background Tasks
660// ============================================================================
661
662/// Pre-handshake pubkey exchange prefix byte.
663///
664/// Distinguishes the identity exchange from FMP packets (version ≥ 0x01).
665/// Temporary — removed when FMP switches from IK to XX handshake.
666const PUBKEY_EXCHANGE_PREFIX: u8 = 0x00;
667
668/// Pre-handshake pubkey exchange message size: `[0x00][pubkey:32]`.
669const PUBKEY_EXCHANGE_SIZE: usize = 33;
670
671/// Timeout for pubkey exchange recv (seconds).
672///
673/// The peer should respond in milliseconds; 5s is generous. Without this,
674/// a peer that connects but never sends its pubkey blocks the calling task
675/// forever — killing scan_probe_loop, accept_loop, or the event loop.
676const PUBKEY_EXCHANGE_TIMEOUT_SECS: u64 = 5;
677
678/// Exchange public keys over a newly established L2CAP connection.
679///
680/// Both sides send `[0x00][our_pubkey:32]` and receive the peer's.
681/// Returns the peer's XOnlyPublicKey on success.
682async fn pubkey_exchange<S: BleStream>(
683    stream: &S,
684    local_pubkey: &[u8; 32],
685) -> Result<XOnlyPublicKey, TransportError> {
686    // Send our pubkey
687    let mut msg = [0u8; PUBKEY_EXCHANGE_SIZE];
688    msg[0] = PUBKEY_EXCHANGE_PREFIX;
689    msg[1..].copy_from_slice(local_pubkey);
690    stream.send(&msg).await?;
691
692    // Receive peer's pubkey (with timeout to prevent indefinite blocking)
693    let mut buf = [0u8; PUBKEY_EXCHANGE_SIZE];
694    let timeout = std::time::Duration::from_secs(PUBKEY_EXCHANGE_TIMEOUT_SECS);
695    let n = match tokio::time::timeout(timeout, stream.recv(&mut buf)).await {
696        Ok(result) => result?,
697        Err(_) => return Err(TransportError::Timeout),
698    };
699    if n != PUBKEY_EXCHANGE_SIZE {
700        return Err(TransportError::RecvFailed(format!(
701            "pubkey exchange: expected {} bytes, got {}",
702            PUBKEY_EXCHANGE_SIZE, n
703        )));
704    }
705    if buf[0] != PUBKEY_EXCHANGE_PREFIX {
706        return Err(TransportError::RecvFailed(format!(
707            "pubkey exchange: bad prefix 0x{:02X}",
708            buf[0]
709        )));
710    }
711
712    XOnlyPublicKey::from_slice(&buf[1..])
713        .map_err(|e| TransportError::RecvFailed(format!("pubkey exchange: invalid key: {}", e)))
714}
715
716// Beacon loop removed — advertising is now continuous (started once
717// in start_async, stopped in stop_async). BLE advertising overhead
718// is negligible (~0.15% duty cycle on advertising channels).
719
720/// Accept loop: accepts inbound L2CAP connections, exchanges pubkeys,
721/// and adds to pool.
722#[allow(clippy::too_many_arguments)]
723async fn accept_loop<A>(
724    mut acceptor: A,
725    pool: Arc<Mutex<ConnectionPool<Arc<A::Stream>>>>,
726    packet_tx: PacketTx,
727    transport_id: TransportId,
728    stats: Arc<BleStats>,
729    _max_conns: usize,
730    local_pubkey: Option<[u8; 32]>,
731    discovery_buffer: Arc<DiscoveryBuffer>,
732    local_node_addr: Option<NodeAddr>,
733) where
734    A: io::BleAcceptor,
735    A::Stream: 'static,
736{
737    loop {
738        match acceptor.accept().await {
739            Ok(stream) => {
740                let addr = stream.remote_addr().clone();
741                let ta = addr.to_transport_addr();
742
743                // Skip if already connected (outbound won the race)
744                {
745                    let pool_guard = pool.lock().await;
746                    if pool_guard.contains(&ta) {
747                        debug!(addr = %ta, "BLE inbound: already connected, skipping");
748                        continue;
749                    }
750                }
751
752                let send_mtu = stream.send_mtu();
753                let recv_mtu = stream.recv_mtu();
754
755                // Pre-handshake pubkey exchange (temporary, pre-XX)
756                if let Some(ref our_pubkey) = local_pubkey {
757                    match pubkey_exchange(&stream, our_pubkey).await {
758                        Ok(peer_pubkey) => {
759                            debug!(addr = %ta, "BLE inbound pubkey exchange complete");
760                            discovery_buffer.add_peer_with_pubkey(&addr, peer_pubkey);
761
762                            // Cross-probe tie-breaker: smaller NodeAddr's
763                            // outbound wins. If we're smaller, our outbound
764                            // should win — drop this inbound.
765                            if let Some(ref our_addr) = local_node_addr {
766                                let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
767                                if our_addr < &peer_addr {
768                                    debug!(
769                                        addr = %ta,
770                                        "BLE inbound tie-breaker: dropping (our addr < peer, outbound wins)"
771                                    );
772                                    continue;
773                                }
774                            }
775                        }
776                        Err(e) => {
777                            debug!(addr = %ta, error = %e, "BLE inbound pubkey exchange failed");
778                            continue;
779                        }
780                    }
781                }
782
783                let stream = Arc::new(stream);
784
785                // Spawn receive loop
786                let recv_task = tokio::spawn(receive_loop(
787                    Arc::clone(&stream),
788                    ta.clone(),
789                    Arc::clone(&pool),
790                    packet_tx.clone(),
791                    transport_id,
792                    Arc::clone(&stats),
793                    recv_mtu,
794                ));
795
796                let conn = BleConnection {
797                    stream,
798                    recv_task: Some(recv_task),
799                    send_mtu,
800                    recv_mtu,
801                    established_at: tokio::time::Instant::now(),
802                    is_static: false,
803                    addr,
804                };
805
806                let mut pool_guard = pool.lock().await;
807                match pool_guard.insert(ta.clone(), conn) {
808                    Ok(Some(evicted)) => {
809                        stats.record_pool_eviction();
810                        info!(addr = %ta, evicted = %evicted, "BLE inbound accepted (evicted peer)");
811                    }
812                    Ok(None) => {
813                        info!(addr = %ta, send_mtu, recv_mtu, "BLE inbound connection accepted");
814                    }
815                    Err(e) => {
816                        warn!(addr = %ta, error = %e, "BLE pool full, inbound connection rejected");
817                        stats.record_connection_rejected();
818                        continue;
819                    }
820                }
821                stats.record_connection_accepted();
822            }
823            Err(e) => {
824                warn!(error = %e, "BLE accept error");
825                break;
826            }
827        }
828    }
829}
830
831/// Receive loop: reads packets from a BLE stream and delivers to node.
832async fn receive_loop<S: BleStream>(
833    stream: Arc<S>,
834    addr: TransportAddr,
835    pool: Arc<Mutex<ConnectionPool<Arc<S>>>>,
836    packet_tx: PacketTx,
837    transport_id: TransportId,
838    stats: Arc<BleStats>,
839    recv_mtu: u16,
840) {
841    let mut buf = vec![0u8; recv_mtu as usize];
842    loop {
843        match stream.recv(&mut buf).await {
844            Ok(0) => {
845                debug!(addr = %addr, "BLE connection closed by peer");
846                break;
847            }
848            Ok(n) => {
849                stats.record_recv(n);
850                let packet = ReceivedPacket::new(transport_id, addr.clone(), buf[..n].to_vec());
851                if packet_tx.send(packet).is_err() {
852                    trace!("BLE packet_tx closed, stopping receive loop");
853                    break;
854                }
855            }
856            Err(e) => {
857                debug!(addr = %addr, error = %e, "BLE receive error");
858                stats.record_recv_error();
859                break;
860            }
861        }
862    }
863
864    // Remove from pool
865    let mut pool = pool.lock().await;
866    pool.remove(&addr);
867}
868
869/// Combined scan + probe loop.
870///
871/// Scanner events arrive continuously (both sides advertise continuously).
872/// Each scan result is probed immediately unless the address is in cooldown
873/// (recently probed) or already connected. On successful probe, the
874/// connection is promoted directly into the pool (no second L2CAP connect
875/// needed) and the peer is reported to the discovery buffer for the node
876/// layer to auto-connect.
877///
878/// Cooldown prevents rapid re-probing of the same address: after any probe
879/// attempt (success or failure), the address is suppressed for
880/// `cooldown_secs`. Connected peers are filtered by pool membership.
881#[allow(clippy::too_many_arguments)]
882async fn scan_probe_loop<I: io::BleIo>(
883    mut scanner: I::Scanner,
884    io: Arc<I>,
885    pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
886    buffer: Arc<DiscoveryBuffer>,
887    stats: Arc<BleStats>,
888    local_pubkey: Option<[u8; 32]>,
889    psm: u16,
890    connect_timeout_ms: u64,
891    cooldown_secs: u64,
892    local_node_addr: Option<NodeAddr>,
893    packet_tx: PacketTx,
894    transport_id: TransportId,
895) {
896    // Track last probe time per address for cooldown
897    let mut last_probed: HashMap<BleAddr, tokio::time::Instant> = HashMap::new();
898    // Addresses discovered but not yet connected — retried after cooldown
899    // even if the scanner doesn't fire again (BlueZ deduplicates).
900    let mut pending_addrs: Vec<BleAddr> = Vec::new();
901    let cooldown = std::time::Duration::from_secs(cooldown_secs);
902    let retry_interval = tokio::time::interval(std::time::Duration::from_secs(cooldown_secs));
903    tokio::pin!(retry_interval);
904    retry_interval.tick().await; // consume initial tick
905
906    loop {
907        // Either a scanner event or the retry timer fires
908        let addr = tokio::select! {
909            result = scanner.next() => {
910                match result {
911                    Some(a) => a,
912                    None => {
913                        debug!("BLE scanner ended");
914                        break;
915                    }
916                }
917            }
918            _ = retry_interval.tick() => {
919                // Re-probe pending addresses that aren't connected
920                let pool_guard = pool.lock().await;
921                pending_addrs.retain(|a| !pool_guard.contains(&a.to_transport_addr()));
922                drop(pool_guard);
923                if let Some(a) = pending_addrs.first().cloned() {
924                    a
925                } else {
926                    continue;
927                }
928            }
929        };
930
931        trace!(addr = %addr, "BLE scan result");
932        stats.record_scan_result();
933
934        // Skip if already connected
935        {
936            let pool_guard = pool.lock().await;
937            if pool_guard.contains(&addr.to_transport_addr()) {
938                pending_addrs.retain(|a| a != &addr);
939                continue;
940            }
941        }
942
943        // Track for retry in case probe fails and scanner doesn't re-fire
944        if !pending_addrs.contains(&addr) {
945            pending_addrs.push(addr.clone());
946        }
947
948        // Skip if in cooldown
949        if last_probed
950            .get(&addr)
951            .is_some_and(|last| last.elapsed() < cooldown)
952        {
953            continue;
954        }
955
956        // Record probe time (before attempt, so cooldown applies on failure too)
957        last_probed.insert(addr.clone(), tokio::time::Instant::now());
958
959        // Need pubkey for probe
960        let our_pubkey = match local_pubkey {
961            Some(pk) => pk,
962            None => {
963                buffer.add_peer(&addr);
964                continue;
965            }
966        };
967
968        // L2CAP connect
969        let stream = match tokio::time::timeout(
970            std::time::Duration::from_millis(connect_timeout_ms),
971            io.connect(&addr, psm),
972        )
973        .await
974        {
975            Ok(Ok(s)) => s,
976            Ok(Err(e)) => {
977                debug!(addr = %addr, error = %e, "BLE probe connect failed");
978                continue;
979            }
980            Err(_) => {
981                debug!(addr = %addr, "BLE probe connect timeout");
982                stats.record_connect_timeout();
983                continue;
984            }
985        };
986
987        // Pubkey exchange, then promote connection to pool
988        let ta = addr.to_transport_addr();
989        match pubkey_exchange(&stream, &our_pubkey).await {
990            Ok(peer_pubkey) => {
991                debug!(addr = %addr, "BLE probe complete");
992
993                // Cross-probe tie-breaker: smaller NodeAddr's outbound wins.
994                // If we lose, drop connection — accept_loop handles inbound.
995                if let Some(ref our_addr) = local_node_addr {
996                    let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
997                    if our_addr >= &peer_addr {
998                        debug!(
999                            addr = %addr,
1000                            "BLE probe tie-breaker: yielding to peer's outbound"
1001                        );
1002                        buffer.add_peer_with_pubkey(&addr, peer_pubkey);
1003                        continue;
1004                    }
1005                }
1006
1007                // Promote connection to pool — no second L2CAP connect needed
1008                let send_mtu = stream.send_mtu();
1009                let recv_mtu = stream.recv_mtu();
1010                let stream = Arc::new(stream);
1011
1012                let recv_task = tokio::spawn(receive_loop(
1013                    Arc::clone(&stream),
1014                    ta.clone(),
1015                    Arc::clone(&pool),
1016                    packet_tx.clone(),
1017                    transport_id,
1018                    Arc::clone(&stats),
1019                    recv_mtu,
1020                ));
1021
1022                let conn = BleConnection {
1023                    stream,
1024                    recv_task: Some(recv_task),
1025                    send_mtu,
1026                    recv_mtu,
1027                    established_at: tokio::time::Instant::now(),
1028                    is_static: false,
1029                    addr: addr.clone(),
1030                };
1031
1032                let mut pool_guard = pool.lock().await;
1033                match pool_guard.insert(ta.clone(), conn) {
1034                    Ok(Some(evicted)) => {
1035                        stats.record_pool_eviction();
1036                        debug!(addr = %ta, evicted = %evicted, "BLE probe promoted (evicted peer)");
1037                    }
1038                    Ok(None) => {
1039                        debug!(addr = %ta, "BLE probe promoted to pool");
1040                    }
1041                    Err(e) => {
1042                        warn!(addr = %ta, error = %e, "BLE pool full, probe connection dropped");
1043                        stats.record_connection_rejected();
1044                    }
1045                }
1046                drop(pool_guard);
1047                stats.record_connection_established();
1048                pending_addrs.retain(|a| a != &addr);
1049
1050                // Report to node layer for auto-connect / handshake
1051                buffer.add_peer_with_pubkey(&addr, peer_pubkey);
1052            }
1053            Err(e) => {
1054                debug!(addr = %addr, error = %e, "BLE probe pubkey exchange failed");
1055            }
1056        }
1057    }
1058}
1059
1060// ============================================================================
1061// Tests
1062// ============================================================================
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067    use io::MockBleIo;
1068
1069    fn test_addr(n: u8) -> BleAddr {
1070        BleAddr {
1071            adapter: "hci0".to_string(),
1072            device: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, n],
1073        }
1074    }
1075
1076    fn make_transport(io: MockBleIo) -> (BleTransport<MockBleIo>, crate::transport::PacketRx) {
1077        let (tx, rx) = crate::transport::packet_channel(64);
1078        let config = BleConfig::default();
1079        let transport = BleTransport::new(TransportId::new(1), None, config, io, tx);
1080        (transport, rx)
1081    }
1082
1083    #[test]
1084    fn test_transport_type() {
1085        let io = MockBleIo::new("hci0", test_addr(1));
1086        let (transport, _rx) = make_transport(io);
1087        assert_eq!(transport.transport_type().name, "ble");
1088        assert!(transport.transport_type().connection_oriented);
1089        assert!(transport.transport_type().reliable);
1090    }
1091
1092    #[test]
1093    fn test_transport_initial_state() {
1094        let io = MockBleIo::new("hci0", test_addr(1));
1095        let (transport, _rx) = make_transport(io);
1096        assert_eq!(transport.state(), TransportState::Configured);
1097    }
1098
1099    #[test]
1100    fn test_transport_default_mtu() {
1101        let io = MockBleIo::new("hci0", test_addr(1));
1102        let (transport, _rx) = make_transport(io);
1103        assert_eq!(transport.mtu(), 2048);
1104    }
1105
1106    #[tokio::test]
1107    async fn test_transport_start_stop() {
1108        let io = MockBleIo::new("hci0", test_addr(1));
1109        let (mut transport, _rx) = make_transport(io);
1110        transport.start_async().await.unwrap();
1111        assert_eq!(transport.state(), TransportState::Up);
1112
1113        transport.stop_async().await.unwrap();
1114        assert_eq!(transport.state(), TransportState::Down);
1115    }
1116
1117    #[tokio::test(start_paused = true)]
1118    async fn test_scan_discovers_peers() {
1119        let io = MockBleIo::new("hci0", test_addr(1));
1120        let (mut transport, _rx) = make_transport(io);
1121        transport.start_async().await.unwrap();
1122
1123        // Inject scan results via the I/O mock
1124        transport.io.inject_scan_result(test_addr(2)).await;
1125        transport.io.inject_scan_result(test_addr(3)).await;
1126
1127        // Let scan_probe_loop pick up results and schedule jitter
1128        tokio::task::yield_now().await;
1129        // Advance past max jitter (5s) so probes fire
1130        tokio::time::advance(std::time::Duration::from_secs(6)).await;
1131        // Let the expired entries get processed
1132        tokio::task::yield_now().await;
1133
1134        // Without pubkey set, scan results go to discovery buffer as bare MACs
1135        let peers = transport.discovery_buffer.take();
1136        assert_eq!(peers.len(), 2);
1137    }
1138
1139    #[tokio::test(start_paused = true)]
1140    async fn test_scan_deduplicates() {
1141        let io = MockBleIo::new("hci0", test_addr(1));
1142        let (mut transport, _rx) = make_transport(io);
1143        transport.start_async().await.unwrap();
1144
1145        // Same address twice
1146        transport.io.inject_scan_result(test_addr(2)).await;
1147        transport.io.inject_scan_result(test_addr(2)).await;
1148
1149        // Let scan_probe_loop pick up results
1150        tokio::task::yield_now().await;
1151        tokio::time::advance(std::time::Duration::from_secs(6)).await;
1152        tokio::task::yield_now().await;
1153
1154        let peers = transport.discovery_buffer.take();
1155        assert_eq!(peers.len(), 1);
1156    }
1157
1158    #[test]
1159    fn test_transport_auto_connect_default() {
1160        let io = MockBleIo::new("hci0", test_addr(1));
1161        let (transport, _rx) = make_transport(io);
1162        assert!(!transport.auto_connect());
1163    }
1164
1165    #[test]
1166    fn test_connection_state_none() {
1167        let io = MockBleIo::new("hci0", test_addr(1));
1168        let (transport, _rx) = make_transport(io);
1169        let addr = test_addr(2).to_transport_addr();
1170        assert_eq!(
1171            transport.connection_state_sync(&addr),
1172            ConnectionState::None
1173        );
1174    }
1175
1176    /// Verify that the cross-probe tie-breaker follows the same convention
1177    /// as `cross_connection_winner`: smaller NodeAddr's outbound wins.
1178    #[test]
1179    fn test_tiebreaker_convention() {
1180        use secp256k1::{Secp256k1, SecretKey};
1181
1182        let secp = Secp256k1::new();
1183        let sk_a = SecretKey::from_slice(&[1u8; 32]).unwrap();
1184        let sk_b = SecretKey::from_slice(&[2u8; 32]).unwrap();
1185        let (pk_a, _) = sk_a.public_key(&secp).x_only_public_key();
1186        let (pk_b, _) = sk_b.public_key(&secp).x_only_public_key();
1187
1188        let addr_a = NodeAddr::from_pubkey(&pk_a);
1189        let addr_b = NodeAddr::from_pubkey(&pk_b);
1190
1191        // Determine which is smaller
1192        let (smaller, larger) = if addr_a < addr_b {
1193            (addr_a, addr_b)
1194        } else {
1195            (addr_b, addr_a)
1196        };
1197
1198        // scan_loop (outbound): promotes when our_addr < peer_addr
1199        // Smaller node scanning larger → our_addr < peer_addr → promote (win)
1200        assert!(smaller < larger, "test setup: smaller < larger");
1201
1202        // accept_loop (inbound): drops when our_addr < peer_addr
1203        // Smaller node accepting from larger → drops inbound (outbound wins)
1204        // This means: smaller always uses outbound, larger always uses inbound
1205    }
1206}