pub mod addr;
pub mod discovery;
pub mod io;
pub mod pool;
pub mod stats;
use super::{
ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
TransportError, TransportId, TransportState, TransportType,
};
use crate::config::BleConfig;
use crate::identity::NodeAddr;
use addr::BleAddr;
use discovery::DiscoveryBuffer;
use io::{BleIo, BleScanner, BleStream};
use pool::{BleConnection, ConnectionPool};
use stats::BleStats;
use secp256k1::XOnlyPublicKey;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, info, trace, warn};
pub const DEFAULT_PSM: u16 = 0x0085;
#[cfg(all(bluer_available, not(test)))]
pub type DefaultBleTransport = BleTransport<io::BluerIo>;
#[cfg(any(not(bluer_available), test))]
pub type DefaultBleTransport = BleTransport<io::MockBleIo>;
pub struct BleTransport<I: BleIo> {
transport_id: TransportId,
name: Option<String>,
config: BleConfig,
state: TransportState,
io: Arc<I>,
pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
connecting: Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>,
packet_tx: PacketTx,
accept_task: Option<JoinHandle<()>>,
scan_probe_task: Option<JoinHandle<()>>,
discovery_buffer: Arc<DiscoveryBuffer>,
stats: Arc<BleStats>,
local_pubkey: Option<[u8; 32]>,
}
struct ConnectingEntry {
task: JoinHandle<()>,
}
impl<I: BleIo> BleTransport<I> {
pub fn new(
transport_id: TransportId,
name: Option<String>,
config: BleConfig,
io: I,
packet_tx: PacketTx,
) -> Self {
let max_conns = config.max_connections();
Self {
transport_id,
name,
config,
state: TransportState::Configured,
io: Arc::new(io),
pool: Arc::new(Mutex::new(ConnectionPool::new(max_conns))),
connecting: Arc::new(Mutex::new(HashMap::new())),
packet_tx,
accept_task: None,
scan_probe_task: None,
discovery_buffer: Arc::new(DiscoveryBuffer::new(transport_id)),
stats: Arc::new(BleStats::new()),
local_pubkey: None,
}
}
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
pub fn stats(&self) -> &Arc<BleStats> {
&self.stats
}
pub fn io(&self) -> &Arc<I> {
&self.io
}
pub fn set_local_pubkey(&mut self, pubkey: [u8; 32]) {
self.local_pubkey = Some(pubkey);
}
pub async fn start_async(&mut self) -> Result<(), TransportError> {
if !self.state.can_start() {
return Err(TransportError::AlreadyStarted);
}
self.state = TransportState::Starting;
let psm = self.config.psm();
let adapter = self.io.adapter_name().to_string();
let local_node_addr = self.local_pubkey.and_then(|pk| {
XOnlyPublicKey::from_slice(&pk)
.ok()
.map(|xonly| NodeAddr::from_pubkey(&xonly))
});
if self.config.accept_connections() {
match self.io.listen(psm).await {
Ok(acceptor) => {
let pool = Arc::clone(&self.pool);
let packet_tx = self.packet_tx.clone();
let transport_id = self.transport_id;
let stats = Arc::clone(&self.stats);
let max_conns = self.config.max_connections();
self.accept_task = Some(tokio::spawn(accept_loop(
acceptor,
pool,
packet_tx,
transport_id,
stats,
max_conns,
self.local_pubkey,
Arc::clone(&self.discovery_buffer),
local_node_addr,
)));
debug!(adapter = %adapter, psm = psm, "BLE accept loop started");
}
Err(e) => {
warn!(adapter = %adapter, error = %e, "failed to start BLE listener");
self.state = TransportState::Failed;
return Err(e);
}
}
}
if self.config.advertise() {
if let Err(e) = self.io.start_advertising().await {
warn!(adapter = %adapter, error = %e, "failed to start BLE advertising");
} else {
self.stats.record_advertisement();
debug!(adapter = %adapter, "BLE advertising started (continuous)");
}
}
if self.config.scan() {
match self.io.start_scanning().await {
Ok(scanner) => {
self.scan_probe_task = Some(tokio::spawn(scan_probe_loop::<I>(
scanner,
Arc::clone(&self.io),
Arc::clone(&self.pool),
Arc::clone(&self.discovery_buffer),
Arc::clone(&self.stats),
self.local_pubkey,
self.config.psm(),
self.config.connect_timeout_ms(),
self.config.probe_cooldown_secs(),
local_node_addr,
self.packet_tx.clone(),
self.transport_id,
)));
debug!(adapter = %adapter, "BLE scan+probe loop started");
}
Err(e) => {
warn!(adapter = %adapter, error = %e, "failed to start BLE scanning");
}
}
}
self.state = TransportState::Up;
info!(adapter = %adapter, psm = psm, "BLE transport started");
Ok(())
}
pub async fn stop_async(&mut self) -> Result<(), TransportError> {
let _ = self.io.stop_advertising().await;
if let Some(task) = self.accept_task.take() {
task.abort();
}
if let Some(task) = self.scan_probe_task.take() {
task.abort();
}
{
let mut connecting = self.connecting.lock().await;
for (_, entry) in connecting.drain() {
entry.task.abort();
}
}
{
let mut pool = self.pool.lock().await;
for addr in pool.addrs() {
pool.remove(&addr);
}
}
self.state = TransportState::Down;
info!("BLE transport stopped");
Ok(())
}
pub async fn send_async(
&self,
addr: &TransportAddr,
data: &[u8],
) -> Result<usize, TransportError> {
let pool = self.pool.lock().await;
let conn = match pool.get(addr) {
Some(c) => c,
None => {
drop(pool);
let _ = self.connect_async(addr).await;
return Err(TransportError::SendFailed("not connected".into()));
}
};
let mtu = conn.effective_mtu() as usize;
if data.len() > mtu {
self.stats.record_mtu_exceeded();
return Err(TransportError::MtuExceeded {
packet_size: data.len(),
mtu: mtu as u16,
});
}
match conn.stream.send(data).await {
Ok(()) => {
self.stats.record_send(data.len());
Ok(data.len())
}
Err(e) => {
self.stats.record_send_error();
drop(pool);
let mut pool = self.pool.lock().await;
pool.remove(addr);
warn!(addr = %addr, error = %e, "BLE send failed, connection removed");
Err(e)
}
}
}
#[allow(dead_code)]
async fn connect_inline(&self, addr: &TransportAddr) -> Result<(), TransportError> {
let ble_addr = BleAddr::parse(
addr.as_str()
.ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
)?;
let psm = self.config.psm();
let timeout_ms = self.config.connect_timeout_ms();
let stream = match tokio::time::timeout(
std::time::Duration::from_millis(timeout_ms),
self.io.connect(&ble_addr, psm),
)
.await
{
Ok(Ok(stream)) => stream,
Ok(Err(e)) => {
debug!(addr = %addr, error = %e, "BLE connect-on-send failed");
return Err(TransportError::ConnectionRefused);
}
Err(_) => {
self.stats.record_connect_timeout();
debug!(addr = %addr, "BLE connect-on-send timeout");
return Err(TransportError::Timeout);
}
};
if let Some(ref our_pubkey) = self.local_pubkey {
match pubkey_exchange(&stream, our_pubkey).await {
Ok(peer_pubkey) => {
debug!(addr = %addr, "BLE outbound pubkey exchange complete");
self.discovery_buffer
.add_peer_with_pubkey(&ble_addr, peer_pubkey);
}
Err(e) => {
warn!(addr = %addr, error = %e, "BLE outbound pubkey exchange failed");
return Err(e);
}
}
}
self.promote_connection(addr, &ble_addr, stream).await
}
async fn promote_connection(
&self,
addr: &TransportAddr,
ble_addr: &BleAddr,
stream: I::Stream,
) -> Result<(), TransportError> {
let send_mtu = stream.send_mtu();
let recv_mtu = stream.recv_mtu();
let stream = Arc::new(stream);
let recv_task = tokio::spawn(receive_loop(
Arc::clone(&stream),
addr.clone(),
Arc::clone(&self.pool),
self.packet_tx.clone(),
self.transport_id,
Arc::clone(&self.stats),
recv_mtu,
));
let conn = BleConnection {
stream,
recv_task: Some(recv_task),
send_mtu,
recv_mtu,
established_at: tokio::time::Instant::now(),
is_static: false,
addr: ble_addr.clone(),
};
let mut pool = self.pool.lock().await;
match pool.insert(addr.clone(), conn) {
Ok(Some(evicted)) => {
self.stats.record_pool_eviction();
debug!(addr = %addr, evicted = %evicted, "BLE connection established (evicted peer)");
}
Ok(None) => {
debug!(addr = %addr, "BLE connection established");
}
Err(e) => {
warn!(addr = %addr, error = %e, "BLE pool full, connection dropped");
self.stats.record_connection_rejected();
return Err(TransportError::SendFailed("pool full".into()));
}
}
self.stats.record_connection_established();
Ok(())
}
pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
{
let pool = self.pool.lock().await;
if pool.contains(addr) {
return Ok(());
}
}
{
let connecting = self.connecting.lock().await;
if connecting.contains_key(addr) {
return Ok(());
}
}
let ble_addr = BleAddr::parse(
addr.as_str()
.ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?,
)?;
let io = Arc::clone(&self.io);
let pool = Arc::clone(&self.pool);
let connecting = Arc::clone(&self.connecting);
let packet_tx = self.packet_tx.clone();
let transport_id = self.transport_id;
let stats = Arc::clone(&self.stats);
let psm = self.config.psm();
let timeout_ms = self.config.connect_timeout_ms();
let addr_clone = addr.clone();
let local_pubkey = self.local_pubkey;
let discovery_buffer = Arc::clone(&self.discovery_buffer);
let task = tokio::spawn(async move {
let result = tokio::time::timeout(
std::time::Duration::from_millis(timeout_ms),
io.connect(&ble_addr, psm),
)
.await;
connecting.lock().await.remove(&addr_clone);
match result {
Ok(Ok(stream)) => {
if let Some(ref our_pubkey) = local_pubkey {
match pubkey_exchange(&stream, our_pubkey).await {
Ok(peer_pubkey) => {
debug!(addr = %addr_clone, "BLE outbound pubkey exchange complete");
discovery_buffer.add_peer_with_pubkey(&ble_addr, peer_pubkey);
}
Err(e) => {
warn!(
addr = %addr_clone, error = %e,
"BLE outbound pubkey exchange failed"
);
return;
}
}
}
let send_mtu = stream.send_mtu();
let recv_mtu = stream.recv_mtu();
let stream = Arc::new(stream);
let recv_task = tokio::spawn(receive_loop(
Arc::clone(&stream),
addr_clone.clone(),
Arc::clone(&pool),
packet_tx,
transport_id,
Arc::clone(&stats),
recv_mtu,
));
let conn = BleConnection {
stream,
recv_task: Some(recv_task),
send_mtu,
recv_mtu,
established_at: tokio::time::Instant::now(),
is_static: false,
addr: ble_addr,
};
let mut pool = pool.lock().await;
match pool.insert(addr_clone.clone(), conn) {
Ok(Some(evicted)) => {
stats.record_pool_eviction();
debug!(addr = %addr_clone, evicted = %evicted, "BLE connection established (evicted peer)");
}
Ok(None) => {
debug!(addr = %addr_clone, "BLE connection established");
}
Err(e) => {
warn!(addr = %addr_clone, error = %e, "BLE pool full, connection dropped");
stats.record_connection_rejected();
return;
}
}
stats.record_connection_established();
}
Ok(Err(e)) => {
debug!(addr = %addr_clone, error = %e, "BLE connect failed");
}
Err(_) => {
stats.record_connect_timeout();
debug!(addr = %addr_clone, "BLE connect timeout");
}
}
});
self.connecting
.lock()
.await
.insert(addr.clone(), ConnectingEntry { task });
Ok(())
}
pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
if let Ok(pool) = self.pool.try_lock()
&& pool.contains(addr)
{
return ConnectionState::Connected;
}
if let Ok(connecting) = self.connecting.try_lock()
&& connecting.contains_key(addr)
{
return ConnectionState::Connecting;
}
ConnectionState::None
}
pub async fn close_connection_async(&self, addr: &TransportAddr) {
let mut pool = self.pool.lock().await;
if let Some(conn) = pool.remove(addr) {
debug!(addr = %addr, "BLE connection closed");
drop(conn); }
}
pub fn link_mtu(&self, addr: &TransportAddr) -> u16 {
if let Ok(pool) = self.pool.try_lock()
&& let Some(conn) = pool.get(addr)
{
return conn.effective_mtu();
}
self.config.mtu()
}
}
impl<I: BleIo> Transport for BleTransport<I> {
fn transport_id(&self) -> TransportId {
self.transport_id
}
fn transport_type(&self) -> &TransportType {
&TransportType::BLE
}
fn state(&self) -> TransportState {
self.state
}
fn mtu(&self) -> u16 {
self.config.mtu()
}
fn link_mtu(&self, addr: &TransportAddr) -> u16 {
self.link_mtu(addr)
}
fn start(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use start_async() for BLE transport".into(),
))
}
fn stop(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use stop_async() for BLE transport".into(),
))
}
fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use send_async() for BLE transport".into(),
))
}
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
Ok(self.discovery_buffer.take())
}
fn auto_connect(&self) -> bool {
self.config.auto_connect()
}
fn accept_connections(&self) -> bool {
self.config.accept_connections()
}
fn close_connection(&self, _addr: &TransportAddr) {
}
}
const PUBKEY_EXCHANGE_PREFIX: u8 = 0x00;
const PUBKEY_EXCHANGE_SIZE: usize = 33;
const PUBKEY_EXCHANGE_TIMEOUT_SECS: u64 = 5;
async fn pubkey_exchange<S: BleStream>(
stream: &S,
local_pubkey: &[u8; 32],
) -> Result<XOnlyPublicKey, TransportError> {
let mut msg = [0u8; PUBKEY_EXCHANGE_SIZE];
msg[0] = PUBKEY_EXCHANGE_PREFIX;
msg[1..].copy_from_slice(local_pubkey);
stream.send(&msg).await?;
let mut buf = [0u8; PUBKEY_EXCHANGE_SIZE];
let timeout = std::time::Duration::from_secs(PUBKEY_EXCHANGE_TIMEOUT_SECS);
let n = match tokio::time::timeout(timeout, stream.recv(&mut buf)).await {
Ok(result) => result?,
Err(_) => return Err(TransportError::Timeout),
};
if n != PUBKEY_EXCHANGE_SIZE {
return Err(TransportError::RecvFailed(format!(
"pubkey exchange: expected {} bytes, got {}",
PUBKEY_EXCHANGE_SIZE, n
)));
}
if buf[0] != PUBKEY_EXCHANGE_PREFIX {
return Err(TransportError::RecvFailed(format!(
"pubkey exchange: bad prefix 0x{:02X}",
buf[0]
)));
}
XOnlyPublicKey::from_slice(&buf[1..])
.map_err(|e| TransportError::RecvFailed(format!("pubkey exchange: invalid key: {}", e)))
}
#[allow(clippy::too_many_arguments)]
async fn accept_loop<A>(
mut acceptor: A,
pool: Arc<Mutex<ConnectionPool<Arc<A::Stream>>>>,
packet_tx: PacketTx,
transport_id: TransportId,
stats: Arc<BleStats>,
_max_conns: usize,
local_pubkey: Option<[u8; 32]>,
discovery_buffer: Arc<DiscoveryBuffer>,
local_node_addr: Option<NodeAddr>,
) where
A: io::BleAcceptor,
A::Stream: 'static,
{
loop {
match acceptor.accept().await {
Ok(stream) => {
let addr = stream.remote_addr().clone();
let ta = addr.to_transport_addr();
{
let pool_guard = pool.lock().await;
if pool_guard.contains(&ta) {
debug!(addr = %ta, "BLE inbound: already connected, skipping");
continue;
}
}
let send_mtu = stream.send_mtu();
let recv_mtu = stream.recv_mtu();
if let Some(ref our_pubkey) = local_pubkey {
match pubkey_exchange(&stream, our_pubkey).await {
Ok(peer_pubkey) => {
debug!(addr = %ta, "BLE inbound pubkey exchange complete");
discovery_buffer.add_peer_with_pubkey(&addr, peer_pubkey);
if let Some(ref our_addr) = local_node_addr {
let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
if our_addr < &peer_addr {
debug!(
addr = %ta,
"BLE inbound tie-breaker: dropping (our addr < peer, outbound wins)"
);
continue;
}
}
}
Err(e) => {
debug!(addr = %ta, error = %e, "BLE inbound pubkey exchange failed");
continue;
}
}
}
let stream = Arc::new(stream);
let recv_task = tokio::spawn(receive_loop(
Arc::clone(&stream),
ta.clone(),
Arc::clone(&pool),
packet_tx.clone(),
transport_id,
Arc::clone(&stats),
recv_mtu,
));
let conn = BleConnection {
stream,
recv_task: Some(recv_task),
send_mtu,
recv_mtu,
established_at: tokio::time::Instant::now(),
is_static: false,
addr,
};
let mut pool_guard = pool.lock().await;
match pool_guard.insert(ta.clone(), conn) {
Ok(Some(evicted)) => {
stats.record_pool_eviction();
info!(addr = %ta, evicted = %evicted, "BLE inbound accepted (evicted peer)");
}
Ok(None) => {
info!(addr = %ta, send_mtu, recv_mtu, "BLE inbound connection accepted");
}
Err(e) => {
warn!(addr = %ta, error = %e, "BLE pool full, inbound connection rejected");
stats.record_connection_rejected();
continue;
}
}
stats.record_connection_accepted();
}
Err(e) => {
warn!(error = %e, "BLE accept error");
break;
}
}
}
}
async fn receive_loop<S: BleStream>(
stream: Arc<S>,
addr: TransportAddr,
pool: Arc<Mutex<ConnectionPool<Arc<S>>>>,
packet_tx: PacketTx,
transport_id: TransportId,
stats: Arc<BleStats>,
recv_mtu: u16,
) {
let mut buf = vec![0u8; recv_mtu as usize];
loop {
match stream.recv(&mut buf).await {
Ok(0) => {
debug!(addr = %addr, "BLE connection closed by peer");
break;
}
Ok(n) => {
stats.record_recv(n);
let packet = ReceivedPacket::new(transport_id, addr.clone(), buf[..n].to_vec());
if packet_tx.send(packet).is_err() {
trace!("BLE packet_tx closed, stopping receive loop");
break;
}
}
Err(e) => {
debug!(addr = %addr, error = %e, "BLE receive error");
stats.record_recv_error();
break;
}
}
}
let mut pool = pool.lock().await;
pool.remove(&addr);
}
#[allow(clippy::too_many_arguments)]
async fn scan_probe_loop<I: io::BleIo>(
mut scanner: I::Scanner,
io: Arc<I>,
pool: Arc<Mutex<ConnectionPool<Arc<I::Stream>>>>,
buffer: Arc<DiscoveryBuffer>,
stats: Arc<BleStats>,
local_pubkey: Option<[u8; 32]>,
psm: u16,
connect_timeout_ms: u64,
cooldown_secs: u64,
local_node_addr: Option<NodeAddr>,
packet_tx: PacketTx,
transport_id: TransportId,
) {
let mut last_probed: HashMap<BleAddr, tokio::time::Instant> = HashMap::new();
let mut pending_addrs: Vec<BleAddr> = Vec::new();
let cooldown = std::time::Duration::from_secs(cooldown_secs);
let retry_interval = tokio::time::interval(std::time::Duration::from_secs(cooldown_secs));
tokio::pin!(retry_interval);
retry_interval.tick().await;
loop {
let addr = tokio::select! {
result = scanner.next() => {
match result {
Some(a) => a,
None => {
debug!("BLE scanner ended");
break;
}
}
}
_ = retry_interval.tick() => {
let pool_guard = pool.lock().await;
pending_addrs.retain(|a| !pool_guard.contains(&a.to_transport_addr()));
drop(pool_guard);
if let Some(a) = pending_addrs.first().cloned() {
a
} else {
continue;
}
}
};
trace!(addr = %addr, "BLE scan result");
stats.record_scan_result();
{
let pool_guard = pool.lock().await;
if pool_guard.contains(&addr.to_transport_addr()) {
pending_addrs.retain(|a| a != &addr);
continue;
}
}
if !pending_addrs.contains(&addr) {
pending_addrs.push(addr.clone());
}
if last_probed
.get(&addr)
.is_some_and(|last| last.elapsed() < cooldown)
{
continue;
}
last_probed.insert(addr.clone(), tokio::time::Instant::now());
let our_pubkey = match local_pubkey {
Some(pk) => pk,
None => {
buffer.add_peer(&addr);
continue;
}
};
let stream = match tokio::time::timeout(
std::time::Duration::from_millis(connect_timeout_ms),
io.connect(&addr, psm),
)
.await
{
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!(addr = %addr, error = %e, "BLE probe connect failed");
continue;
}
Err(_) => {
debug!(addr = %addr, "BLE probe connect timeout");
stats.record_connect_timeout();
continue;
}
};
let ta = addr.to_transport_addr();
match pubkey_exchange(&stream, &our_pubkey).await {
Ok(peer_pubkey) => {
debug!(addr = %addr, "BLE probe complete");
if let Some(ref our_addr) = local_node_addr {
let peer_addr = NodeAddr::from_pubkey(&peer_pubkey);
if our_addr >= &peer_addr {
debug!(
addr = %addr,
"BLE probe tie-breaker: yielding to peer's outbound"
);
buffer.add_peer_with_pubkey(&addr, peer_pubkey);
continue;
}
}
let send_mtu = stream.send_mtu();
let recv_mtu = stream.recv_mtu();
let stream = Arc::new(stream);
let recv_task = tokio::spawn(receive_loop(
Arc::clone(&stream),
ta.clone(),
Arc::clone(&pool),
packet_tx.clone(),
transport_id,
Arc::clone(&stats),
recv_mtu,
));
let conn = BleConnection {
stream,
recv_task: Some(recv_task),
send_mtu,
recv_mtu,
established_at: tokio::time::Instant::now(),
is_static: false,
addr: addr.clone(),
};
let mut pool_guard = pool.lock().await;
match pool_guard.insert(ta.clone(), conn) {
Ok(Some(evicted)) => {
stats.record_pool_eviction();
debug!(addr = %ta, evicted = %evicted, "BLE probe promoted (evicted peer)");
}
Ok(None) => {
debug!(addr = %ta, "BLE probe promoted to pool");
}
Err(e) => {
warn!(addr = %ta, error = %e, "BLE pool full, probe connection dropped");
stats.record_connection_rejected();
}
}
drop(pool_guard);
stats.record_connection_established();
pending_addrs.retain(|a| a != &addr);
buffer.add_peer_with_pubkey(&addr, peer_pubkey);
}
Err(e) => {
debug!(addr = %addr, error = %e, "BLE probe pubkey exchange failed");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use io::MockBleIo;
fn test_addr(n: u8) -> BleAddr {
BleAddr {
adapter: "hci0".to_string(),
device: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, n],
}
}
fn make_transport(io: MockBleIo) -> (BleTransport<MockBleIo>, crate::transport::PacketRx) {
let (tx, rx) = crate::transport::packet_channel(64);
let config = BleConfig::default();
let transport = BleTransport::new(TransportId::new(1), None, config, io, tx);
(transport, rx)
}
#[test]
fn test_transport_type() {
let io = MockBleIo::new("hci0", test_addr(1));
let (transport, _rx) = make_transport(io);
assert_eq!(transport.transport_type().name, "ble");
assert!(transport.transport_type().connection_oriented);
assert!(transport.transport_type().reliable);
}
#[test]
fn test_transport_initial_state() {
let io = MockBleIo::new("hci0", test_addr(1));
let (transport, _rx) = make_transport(io);
assert_eq!(transport.state(), TransportState::Configured);
}
#[test]
fn test_transport_default_mtu() {
let io = MockBleIo::new("hci0", test_addr(1));
let (transport, _rx) = make_transport(io);
assert_eq!(transport.mtu(), 2048);
}
#[tokio::test]
async fn test_transport_start_stop() {
let io = MockBleIo::new("hci0", test_addr(1));
let (mut transport, _rx) = make_transport(io);
transport.start_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Up);
transport.stop_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Down);
}
#[tokio::test(start_paused = true)]
async fn test_scan_discovers_peers() {
let io = MockBleIo::new("hci0", test_addr(1));
let (mut transport, _rx) = make_transport(io);
transport.start_async().await.unwrap();
transport.io.inject_scan_result(test_addr(2)).await;
transport.io.inject_scan_result(test_addr(3)).await;
tokio::task::yield_now().await;
tokio::time::advance(std::time::Duration::from_secs(6)).await;
tokio::task::yield_now().await;
let peers = transport.discovery_buffer.take();
assert_eq!(peers.len(), 2);
}
#[tokio::test(start_paused = true)]
async fn test_scan_deduplicates() {
let io = MockBleIo::new("hci0", test_addr(1));
let (mut transport, _rx) = make_transport(io);
transport.start_async().await.unwrap();
transport.io.inject_scan_result(test_addr(2)).await;
transport.io.inject_scan_result(test_addr(2)).await;
tokio::task::yield_now().await;
tokio::time::advance(std::time::Duration::from_secs(6)).await;
tokio::task::yield_now().await;
let peers = transport.discovery_buffer.take();
assert_eq!(peers.len(), 1);
}
#[test]
fn test_transport_auto_connect_default() {
let io = MockBleIo::new("hci0", test_addr(1));
let (transport, _rx) = make_transport(io);
assert!(!transport.auto_connect());
}
#[test]
fn test_connection_state_none() {
let io = MockBleIo::new("hci0", test_addr(1));
let (transport, _rx) = make_transport(io);
let addr = test_addr(2).to_transport_addr();
assert_eq!(
transport.connection_state_sync(&addr),
ConnectionState::None
);
}
#[test]
fn test_tiebreaker_convention() {
use secp256k1::{Secp256k1, SecretKey};
let secp = Secp256k1::new();
let sk_a = SecretKey::from_slice(&[1u8; 32]).unwrap();
let sk_b = SecretKey::from_slice(&[2u8; 32]).unwrap();
let (pk_a, _) = sk_a.public_key(&secp).x_only_public_key();
let (pk_b, _) = sk_b.public_key(&secp).x_only_public_key();
let addr_a = NodeAddr::from_pubkey(&pk_a);
let addr_b = NodeAddr::from_pubkey(&pk_b);
let (smaller, larger) = if addr_a < addr_b {
(addr_a, addr_b)
} else {
(addr_b, addr_a)
};
assert!(smaller < larger, "test setup: smaller < larger");
}
}