use async_trait::async_trait;
use std::collections::HashMap;
#[cfg(all(feature = "ble", target_os = "macos"))]
use std::ffi::OsStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, mpsc};
use super::addr::{TransportAddr, TransportType};
use super::capabilities::TransportCapabilities;
use super::provider::{
InboundDatagram, LinkQuality, TransportError, TransportProvider, TransportStats,
};
#[cfg(feature = "ble")]
#[allow(unused_imports)]
use btleplug::api::{
Central, CentralEvent, Characteristic, Manager as BtleManager, Peripheral as BtlePeripheral,
ScanFilter, WriteType,
};
#[cfg(feature = "ble")]
use btleplug::platform::{Adapter, Manager, Peripheral};
#[cfg(feature = "ble")]
#[allow(unused_imports)]
use futures_util::stream::StreamExt;
#[cfg(feature = "ble")]
use uuid::Uuid;
pub const ANT_QUIC_SERVICE_UUID: [u8; 16] = [
0xa0, 0x3d, 0x7e, 0x9f, 0x0b, 0xca, 0x12, 0xfe, 0xa6, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
];
pub const TX_CHARACTERISTIC_UUID: [u8; 16] = [
0xa0, 0x3d, 0x7e, 0x9f, 0x0b, 0xca, 0x12, 0xfe, 0xa6, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02,
];
pub const RX_CHARACTERISTIC_UUID: [u8; 16] = [
0xa0, 0x3d, 0x7e, 0x9f, 0x0b, 0xca, 0x12, 0xfe, 0xa6, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03,
];
pub const CCCD_UUID: [u8; 16] = [
0x00, 0x00, 0x29, 0x02, 0x00, 0x00, 0x10, 0x00, 0x80, 0x00, 0x00, 0x80, 0x5f, 0x9b, 0x34, 0xfb,
];
pub const CCCD_ENABLE_NOTIFICATION: [u8; 2] = [0x01, 0x00];
pub const CCCD_ENABLE_INDICATION: [u8; 2] = [0x02, 0x00];
pub const CCCD_DISABLE: [u8; 2] = [0x00, 0x00];
pub const FRAGMENT_HEADER_SIZE: usize = 4;
pub const DEFAULT_BLE_MTU: usize = 244;
#[allow(dead_code)] pub const DEFAULT_FRAGMENT_PAYLOAD_SIZE: usize = DEFAULT_BLE_MTU - FRAGMENT_HEADER_SIZE;
pub mod fragment_flags {
pub const START: u8 = 0x01;
pub const END: u8 = 0x02;
pub const SINGLE: u8 = START | END;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FragmentHeader {
pub seq_num: u8,
pub flags: u8,
pub total: u8,
pub msg_id: u8,
}
impl FragmentHeader {
pub const fn new(seq_num: u8, flags: u8, total: u8, msg_id: u8) -> Self {
Self {
seq_num,
flags,
total,
msg_id,
}
}
pub const fn single(msg_id: u8) -> Self {
Self {
seq_num: 0,
flags: fragment_flags::SINGLE,
total: 1,
msg_id,
}
}
pub const fn is_start(&self) -> bool {
self.flags & fragment_flags::START != 0
}
pub const fn is_end(&self) -> bool {
self.flags & fragment_flags::END != 0
}
pub const fn is_single(&self) -> bool {
self.is_start() && self.is_end()
}
pub const fn to_bytes(&self) -> [u8; FRAGMENT_HEADER_SIZE] {
[self.seq_num, self.flags, self.total, self.msg_id]
}
pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
if bytes.len() < FRAGMENT_HEADER_SIZE {
return None;
}
Some(Self {
seq_num: bytes[0],
flags: bytes[1],
total: bytes[2],
msg_id: bytes[3],
})
}
}
#[derive(Debug, Clone)]
pub struct BlePacketFragmenter {
#[allow(dead_code)] mtu: usize,
payload_size: usize,
}
impl BlePacketFragmenter {
pub fn new(mtu: usize) -> Self {
assert!(
mtu > FRAGMENT_HEADER_SIZE,
"MTU must be greater than fragment header size ({})",
FRAGMENT_HEADER_SIZE
);
Self {
mtu,
payload_size: mtu - FRAGMENT_HEADER_SIZE,
}
}
pub fn default_ble() -> Self {
Self::new(DEFAULT_BLE_MTU)
}
pub const fn payload_size(&self) -> usize {
self.payload_size
}
#[allow(dead_code)] pub const fn mtu(&self) -> usize {
self.mtu
}
#[allow(dead_code)] pub fn needs_fragmentation(&self, data: &[u8]) -> bool {
data.len() > self.payload_size
}
pub fn fragment(&self, data: &[u8], msg_id: u8) -> Vec<Vec<u8>> {
if data.is_empty() {
let header = FragmentHeader::single(msg_id);
return vec![header.to_bytes().to_vec()];
}
let total_fragments = data.len().div_ceil(self.payload_size);
if total_fragments > 255 {
tracing::warn!(
data_len = data.len(),
max_fragments = 255,
"Data exceeds maximum fragment count"
);
}
let total = total_fragments.min(255) as u8;
let mut fragments = Vec::with_capacity(total as usize);
for (i, chunk) in data.chunks(self.payload_size).enumerate() {
if i >= 255 {
break; }
let seq_num = i as u8;
let flags = match (i == 0, i == total_fragments - 1) {
(true, true) => fragment_flags::SINGLE,
(true, false) => fragment_flags::START,
(false, true) => fragment_flags::END,
(false, false) => 0,
};
let header = FragmentHeader::new(seq_num, flags, total, msg_id);
let mut fragment = Vec::with_capacity(FRAGMENT_HEADER_SIZE + chunk.len());
fragment.extend_from_slice(&header.to_bytes());
fragment.extend_from_slice(chunk);
fragments.push(fragment);
}
fragments
}
}
impl Default for BlePacketFragmenter {
fn default() -> Self {
Self::default_ble()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ReassemblyKey {
device_id: [u8; 6],
msg_id: u8,
}
#[derive(Debug)]
struct ReassemblyEntry {
fragments: Vec<Option<Vec<u8>>>,
received_count: usize,
expected_total: u8,
created: Instant,
}
impl ReassemblyEntry {
fn new(expected_total: u8) -> Self {
Self {
fragments: vec![None; expected_total as usize],
received_count: 0,
expected_total,
created: Instant::now(),
}
}
fn add_fragment(&mut self, seq_num: u8, payload: Vec<u8>) -> bool {
let idx = seq_num as usize;
if idx >= self.fragments.len() {
return false; }
if self.fragments[idx].is_some() {
return false; }
self.fragments[idx] = Some(payload);
self.received_count += 1;
true
}
fn is_complete(&self) -> bool {
self.received_count == self.expected_total as usize
}
fn assemble(&self) -> Vec<u8> {
let total_size: usize = self
.fragments
.iter()
.filter_map(|f| f.as_ref())
.map(|f| f.len())
.sum();
let mut result = Vec::with_capacity(total_size);
for data in self.fragments.iter().flatten() {
result.extend_from_slice(data);
}
result
}
fn is_expired(&self, timeout: Duration) -> bool {
self.created.elapsed() > timeout
}
}
#[derive(Debug)]
pub struct BleReassemblyBuffer {
entries: HashMap<ReassemblyKey, ReassemblyEntry>,
timeout: Duration,
}
impl BleReassemblyBuffer {
pub fn new(timeout: Duration) -> Self {
Self {
entries: HashMap::new(),
timeout,
}
}
pub fn default_timeout() -> Self {
Self::new(Duration::from_secs(30))
}
pub fn add_fragment(&mut self, device_id: [u8; 6], fragment: &[u8]) -> Option<Vec<u8>> {
let header = FragmentHeader::from_bytes(fragment)?;
let payload = fragment.get(FRAGMENT_HEADER_SIZE..)?.to_vec();
if header.is_single() {
return Some(payload);
}
let key = ReassemblyKey {
device_id,
msg_id: header.msg_id,
};
let entry = self
.entries
.entry(key)
.or_insert_with(|| ReassemblyEntry::new(header.total));
if entry.expected_total != header.total {
*entry = ReassemblyEntry::new(header.total);
}
entry.add_fragment(header.seq_num, payload);
if entry.is_complete() {
let complete = entry.assemble();
self.entries.remove(&key);
return Some(complete);
}
None
}
pub fn prune_stale(&mut self) -> usize {
let before = self.entries.len();
self.entries
.retain(|_, entry| !entry.is_expired(self.timeout));
before - self.entries.len()
}
pub fn pending_count(&self) -> usize {
self.entries.len()
}
#[allow(dead_code)] pub fn clear(&mut self) {
self.entries.clear();
}
}
impl Default for BleReassemblyBuffer {
fn default() -> Self {
Self::default_timeout()
}
}
#[cfg(feature = "ble")]
#[allow(dead_code)] pub(crate) fn uuid_from_bytes(bytes: &[u8; 16]) -> Uuid {
Uuid::from_bytes(*bytes)
}
#[cfg(feature = "ble")]
#[allow(dead_code)] pub(crate) fn service_uuid() -> Uuid {
uuid_from_bytes(&ANT_QUIC_SERVICE_UUID)
}
#[cfg(feature = "ble")]
#[allow(dead_code)] pub(crate) fn tx_uuid() -> Uuid {
uuid_from_bytes(&TX_CHARACTERISTIC_UUID)
}
#[cfg(feature = "ble")]
#[allow(dead_code)] pub(crate) fn rx_uuid() -> Uuid {
uuid_from_bytes(&RX_CHARACTERISTIC_UUID)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BleConnectionState {
#[default]
Discovered,
Connecting,
Connected,
Disconnecting,
Disconnected,
}
impl std::fmt::Display for BleConnectionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Discovered => write!(f, "discovered"),
Self::Connecting => write!(f, "connecting"),
Self::Connected => write!(f, "connected"),
Self::Disconnecting => write!(f, "disconnecting"),
Self::Disconnected => write!(f, "disconnected"),
}
}
}
#[derive(Debug, Clone)]
pub struct CharacteristicHandle {
pub uuid: [u8; 16],
pub write_without_response: bool,
pub notify: bool,
pub indicate: bool,
}
impl CharacteristicHandle {
pub fn tx() -> Self {
Self {
uuid: TX_CHARACTERISTIC_UUID,
write_without_response: true,
notify: false,
indicate: false,
}
}
pub fn rx() -> Self {
Self {
uuid: RX_CHARACTERISTIC_UUID,
write_without_response: false,
notify: true,
indicate: false,
}
}
}
pub struct BleConnection {
device_id: [u8; 6],
state: Arc<RwLock<BleConnectionState>>,
tx_characteristic: Option<CharacteristicHandle>,
rx_characteristic: Option<CharacteristicHandle>,
#[cfg(feature = "ble")]
peripheral: Option<Arc<Peripheral>>,
#[cfg(feature = "ble")]
btleplug_tx_char: Option<Characteristic>,
#[cfg(feature = "ble")]
btleplug_rx_char: Option<Characteristic>,
connected_at: Option<Instant>,
last_activity: Arc<RwLock<Instant>>,
shutdown_tx: mpsc::Sender<()>,
session_resumed: bool,
}
impl BleConnection {
pub fn new(device_id: [u8; 6]) -> Self {
Self::new_with_resumption(device_id, false)
}
pub fn new_with_resumption(device_id: [u8; 6], session_resumed: bool) -> Self {
let (shutdown_tx, _shutdown_rx) = mpsc::channel(1);
Self {
device_id,
state: Arc::new(RwLock::new(BleConnectionState::Discovered)),
tx_characteristic: None,
rx_characteristic: None,
#[cfg(feature = "ble")]
peripheral: None,
#[cfg(feature = "ble")]
btleplug_tx_char: None,
#[cfg(feature = "ble")]
btleplug_rx_char: None,
connected_at: None,
last_activity: Arc::new(RwLock::new(Instant::now())),
shutdown_tx,
session_resumed,
}
}
pub fn device_id(&self) -> [u8; 6] {
self.device_id
}
pub async fn state(&self) -> BleConnectionState {
*self.state.read().await
}
pub async fn is_connected(&self) -> bool {
*self.state.read().await == BleConnectionState::Connected
}
pub fn connection_duration(&self) -> Option<Duration> {
self.connected_at.map(|t| t.elapsed())
}
pub async fn idle_duration(&self) -> Duration {
self.last_activity.read().await.elapsed()
}
pub async fn touch(&self) {
*self.last_activity.write().await = Instant::now();
}
pub async fn start_connecting(&self) -> Result<(), TransportError> {
let mut state = self.state.write().await;
match *state {
BleConnectionState::Discovered | BleConnectionState::Disconnected => {
*state = BleConnectionState::Connecting;
Ok(())
}
other => Err(TransportError::Other {
message: format!("cannot connect from state: {other}"),
}),
}
}
pub async fn mark_connected(
&mut self,
tx_char: CharacteristicHandle,
rx_char: CharacteristicHandle,
) {
let mut state = self.state.write().await;
*state = BleConnectionState::Connected;
self.tx_characteristic = Some(tx_char);
self.rx_characteristic = Some(rx_char);
self.connected_at = Some(Instant::now());
*self.last_activity.write().await = Instant::now();
}
pub fn tx_characteristic(&self) -> Option<&CharacteristicHandle> {
self.tx_characteristic.as_ref()
}
pub fn rx_characteristic(&self) -> Option<&CharacteristicHandle> {
self.rx_characteristic.as_ref()
}
#[cfg(feature = "ble")]
pub fn set_peripheral(&mut self, peripheral: Arc<Peripheral>) {
self.peripheral = Some(peripheral);
}
#[cfg(feature = "ble")]
pub fn peripheral(&self) -> Option<&Arc<Peripheral>> {
self.peripheral.as_ref()
}
#[cfg(feature = "ble")]
pub fn set_btleplug_tx_char(&mut self, char: Characteristic) {
self.btleplug_tx_char = Some(char);
}
#[cfg(feature = "ble")]
pub fn btleplug_tx_char(&self) -> Option<&Characteristic> {
self.btleplug_tx_char.as_ref()
}
#[cfg(feature = "ble")]
pub fn set_btleplug_rx_char(&mut self, char: Characteristic) {
self.btleplug_rx_char = Some(char);
}
#[cfg(feature = "ble")]
pub fn btleplug_rx_char(&self) -> Option<&Characteristic> {
self.btleplug_rx_char.as_ref()
}
pub fn set_session_resumed(&mut self, resumed: bool) {
self.session_resumed = resumed;
}
pub fn was_session_resumed(&self) -> bool {
self.session_resumed
}
pub async fn start_disconnect(&self) -> Result<(), TransportError> {
let mut state = self.state.write().await;
match *state {
BleConnectionState::Connected | BleConnectionState::Connecting => {
*state = BleConnectionState::Disconnecting;
let _ = self.shutdown_tx.send(()).await;
Ok(())
}
BleConnectionState::Disconnecting | BleConnectionState::Disconnected => {
Ok(())
}
other => Err(TransportError::Other {
message: format!("cannot disconnect from state: {other}"),
}),
}
}
pub async fn mark_disconnected(&self) {
let mut state = self.state.write().await;
*state = BleConnectionState::Disconnected;
}
}
impl Drop for BleConnection {
fn drop(&mut self) {
tracing::debug!(
device_id = ?self.device_id,
"BleConnection dropped"
);
}
}
impl std::fmt::Debug for BleConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BleConnection")
.field("device_id", &format!("{:02x?}", self.device_id))
.field("tx_characteristic", &self.tx_characteristic.is_some())
.field("rx_characteristic", &self.rx_characteristic.is_some())
.field("connected_at", &self.connected_at)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct BleConfig {
pub service_uuid: [u8; 16],
pub session_cache_duration: Duration,
pub max_connections: usize,
pub scan_interval: Duration,
pub connection_timeout: Duration,
pub session_persist_path: Option<std::path::PathBuf>,
pub max_cached_sessions: usize,
pub session_cleanup_interval: Option<Duration>,
}
impl Default for BleConfig {
fn default() -> Self {
Self {
service_uuid: ANT_QUIC_SERVICE_UUID,
session_cache_duration: Duration::from_secs(24 * 60 * 60), max_connections: 5,
scan_interval: Duration::from_secs(10),
connection_timeout: Duration::from_secs(30),
session_persist_path: None,
max_cached_sessions: 100, session_cleanup_interval: Some(Duration::from_secs(10 * 60)), }
}
}
#[derive(Clone)]
struct CachedSession {
device_id: [u8; 6],
session_key: [u8; 32],
session_id: u16,
established: Instant,
last_active: Instant,
}
impl CachedSession {
fn is_expired(&self, max_age: Duration) -> bool {
self.established.elapsed() > max_age
}
#[allow(dead_code)]
fn is_idle(&self, max_idle: Duration) -> bool {
self.last_active.elapsed() > max_idle
}
}
#[derive(Clone)]
pub struct ResumeToken {
pub peer_id_hash: [u8; 16],
pub session_hash: [u8; 16],
}
impl ResumeToken {
pub fn to_bytes(&self) -> [u8; 32] {
let mut bytes = [0u8; 32];
bytes[..16].copy_from_slice(&self.peer_id_hash);
bytes[16..].copy_from_slice(&self.session_hash);
bytes
}
pub fn from_bytes(bytes: &[u8; 32]) -> Self {
let mut peer_id_hash = [0u8; 16];
let mut session_hash = [0u8; 16];
peer_id_hash.copy_from_slice(&bytes[..16]);
session_hash.copy_from_slice(&bytes[16..]);
Self {
peer_id_hash,
session_hash,
}
}
}
#[derive(Debug, Clone)]
struct PersistedSession {
device_id: String,
session_key_hash: [u8; 32],
session_id: u16,
established_unix: u64,
}
impl PersistedSession {
fn from_cached(cached: &CachedSession) -> Self {
use std::time::{SystemTime, UNIX_EPOCH};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
std::hash::Hash::hash(&cached.session_key, &mut hasher);
let hash_val = std::hash::Hasher::finish(&hasher);
let mut session_key_hash = [0u8; 32];
session_key_hash[..8].copy_from_slice(&hash_val.to_le_bytes());
for (i, chunk) in cached.session_key.chunks(8).enumerate() {
let start = 8 + i * 8;
if start + chunk.len() <= 32 {
session_key_hash[start..start + chunk.len()].copy_from_slice(chunk);
}
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let elapsed = cached.established.elapsed().as_secs();
let established_unix = now.saturating_sub(elapsed);
Self {
device_id: hex::encode(cached.device_id),
session_key_hash,
session_id: cached.session_id,
established_unix,
}
}
}
#[derive(Debug)]
struct SessionCacheFile {
version: u32,
sessions: Vec<PersistedSession>,
}
impl SessionCacheFile {
const CURRENT_VERSION: u32 = 1;
fn new() -> Self {
Self {
version: Self::CURRENT_VERSION,
sessions: Vec::new(),
}
}
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(&self.version.to_le_bytes());
let count = self.sessions.len() as u32;
bytes.extend_from_slice(&count.to_le_bytes());
for session in &self.sessions {
let device_bytes = session.device_id.as_bytes();
let len = device_bytes.len().min(12) as u8;
bytes.push(len);
bytes.extend_from_slice(&device_bytes[..len as usize]);
bytes.extend(std::iter::repeat_n(0u8, 12 - len as usize));
bytes.extend_from_slice(&session.session_key_hash);
bytes.extend_from_slice(&session.session_id.to_le_bytes());
bytes.extend_from_slice(&session.established_unix.to_le_bytes());
}
bytes
}
fn from_bytes(bytes: &[u8]) -> Option<Self> {
if bytes.len() < 8 {
return None;
}
let version = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
if version != Self::CURRENT_VERSION {
return None; }
let count = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]) as usize;
let mut sessions = Vec::with_capacity(count);
let mut offset = 8;
for _ in 0..count {
if offset + 55 > bytes.len() {
break; }
let len = bytes[offset] as usize;
offset += 1;
let device_id = String::from_utf8_lossy(&bytes[offset..offset + len]).to_string();
offset += 12;
let mut session_key_hash = [0u8; 32];
session_key_hash.copy_from_slice(&bytes[offset..offset + 32]);
offset += 32;
let session_id = u16::from_le_bytes([bytes[offset], bytes[offset + 1]]);
offset += 2;
let established_unix = u64::from_le_bytes([
bytes[offset],
bytes[offset + 1],
bytes[offset + 2],
bytes[offset + 3],
bytes[offset + 4],
bytes[offset + 5],
bytes[offset + 6],
bytes[offset + 7],
]);
offset += 8;
sessions.push(PersistedSession {
device_id,
session_key_hash,
session_id,
established_unix,
});
}
Some(Self { version, sessions })
}
}
#[derive(Debug, Clone)]
pub struct DiscoveredDevice {
pub device_id: [u8; 6],
pub local_name: Option<String>,
pub rssi: Option<i16>,
pub discovered_at: Instant,
pub last_seen: Instant,
pub has_service: bool,
#[cfg(feature = "ble")]
pub(crate) btleplug_id: Option<String>,
}
impl DiscoveredDevice {
pub fn new(device_id: [u8; 6]) -> Self {
let now = Instant::now();
Self {
device_id,
local_name: None,
rssi: None,
discovered_at: now,
last_seen: now,
has_service: false,
#[cfg(feature = "ble")]
btleplug_id: None,
}
}
#[cfg(feature = "ble")]
pub fn with_btleplug_id(device_id: [u8; 6], btleplug_id: String) -> Self {
let now = Instant::now();
Self {
device_id,
local_name: None,
rssi: None,
discovered_at: now,
last_seen: now,
has_service: false,
btleplug_id: Some(btleplug_id),
}
}
pub fn update_last_seen(&mut self) {
self.last_seen = Instant::now();
}
pub fn is_recent(&self, max_age: Duration) -> bool {
self.last_seen.elapsed() < max_age
}
pub fn age(&self) -> Duration {
self.last_seen.elapsed()
}
}
#[derive(Debug, Clone)]
pub struct ScanEvent {
pub device: DiscoveredDevice,
pub is_new: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScanState {
#[default]
Idle,
Scanning,
Stopping,
}
impl std::fmt::Display for ScanState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Idle => write!(f, "idle"),
Self::Scanning => write!(f, "scanning"),
Self::Stopping => write!(f, "stopping"),
}
}
}
pub struct BleTransport {
config: BleConfig,
capabilities: TransportCapabilities,
local_device_id: [u8; 6],
online: AtomicBool,
stats: BleTransportStats,
session_cache: Arc<RwLock<Vec<CachedSession>>>,
inbound_tx: mpsc::Sender<InboundDatagram>,
inbound_rx: Arc<RwLock<Option<mpsc::Receiver<InboundDatagram>>>>,
shutdown_tx: mpsc::Sender<()>,
scan_state: Arc<RwLock<ScanState>>,
discovered_devices: Arc<RwLock<HashMap<[u8; 6], DiscoveredDevice>>>,
scan_event_tx: mpsc::Sender<ScanEvent>,
#[allow(dead_code)]
scan_event_rx: Arc<RwLock<Option<mpsc::Receiver<ScanEvent>>>>,
active_connections: Arc<RwLock<HashMap<[u8; 6], Arc<RwLock<BleConnection>>>>>,
#[cfg(feature = "ble")]
adapter: Arc<Adapter>,
fragmenter: BlePacketFragmenter,
reassembly: Arc<RwLock<BleReassemblyBuffer>>,
next_msg_id: AtomicU8,
}
struct BleTransportStats {
datagrams_sent: AtomicU64,
datagrams_received: AtomicU64,
bytes_sent: AtomicU64,
bytes_received: AtomicU64,
send_errors: AtomicU64,
receive_errors: AtomicU64,
session_cache_hits: AtomicU64,
session_cache_misses: AtomicU64,
}
impl Default for BleTransportStats {
fn default() -> Self {
Self {
datagrams_sent: AtomicU64::new(0),
datagrams_received: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
send_errors: AtomicU64::new(0),
receive_errors: AtomicU64::new(0),
session_cache_hits: AtomicU64::new(0),
session_cache_misses: AtomicU64::new(0),
}
}
}
impl BleTransport {
pub async fn new() -> Result<Self, TransportError> {
Self::with_config(BleConfig::default()).await
}
#[cfg(feature = "ble")]
pub async fn with_config(config: BleConfig) -> Result<Self, TransportError> {
#[cfg(target_os = "macos")]
Self::ensure_macos_usage_description()?;
let (adapter, local_device_id) = Self::get_adapter_and_device_id().await?;
let (inbound_tx, inbound_rx) = mpsc::channel(256);
let (shutdown_tx, _shutdown_rx) = mpsc::channel(1);
let (scan_event_tx, scan_event_rx) = mpsc::channel(64);
let fragmenter = BlePacketFragmenter::new(TransportCapabilities::ble().mtu);
let transport = Self {
config,
capabilities: TransportCapabilities::ble(),
local_device_id,
online: AtomicBool::new(true),
stats: BleTransportStats::default(),
session_cache: Arc::new(RwLock::new(Vec::new())),
inbound_tx,
inbound_rx: Arc::new(RwLock::new(Some(inbound_rx))),
shutdown_tx,
scan_state: Arc::new(RwLock::new(ScanState::Idle)),
discovered_devices: Arc::new(RwLock::new(HashMap::new())),
scan_event_tx,
scan_event_rx: Arc::new(RwLock::new(Some(scan_event_rx))),
active_connections: Arc::new(RwLock::new(HashMap::new())),
adapter: Arc::new(adapter),
fragmenter,
reassembly: Arc::new(RwLock::new(BleReassemblyBuffer::default())),
next_msg_id: AtomicU8::new(0),
};
if transport.config.session_persist_path.is_some() {
if let Err(e) = transport.load_sessions_from_disk().await {
tracing::warn!(error = %e, "Failed to load session cache from disk");
}
}
Ok(transport)
}
#[cfg(not(feature = "ble"))]
pub async fn with_config(_config: BleConfig) -> Result<Self, TransportError> {
Err(TransportError::Other {
message: "BLE transport requires the 'ble' feature".to_string(),
})
}
#[cfg(all(feature = "ble", target_os = "macos"))]
fn ensure_macos_usage_description() -> Result<(), TransportError> {
let current_exe = std::env::current_exe().map_err(|_| TransportError::Other {
message:
"BLE transport unavailable on macOS because the current executable path could not be determined"
.to_string(),
})?;
if Self::app_bundle_info_plist(¤t_exe)
.as_deref()
.is_some_and(Self::file_declares_macos_bluetooth_usage)
{
return Ok(());
}
Err(TransportError::Other {
message: "BLE transport is compiled in on macOS but inactive for plain CLI/test binaries; run from an app bundle with NSBluetoothAlwaysUsageDescription to enable it".to_string(),
})
}
#[cfg(all(feature = "ble", target_os = "macos"))]
fn file_declares_macos_bluetooth_usage(path: &std::path::Path) -> bool {
const BLUETOOTH_USAGE_KEYS: [&[u8]; 2] = [
b"NSBluetoothAlwaysUsageDescription",
b"NSBluetoothPeripheralUsageDescription",
];
let bytes = match std::fs::read(path) {
Ok(bytes) => bytes,
Err(_) => return false,
};
BLUETOOTH_USAGE_KEYS
.iter()
.any(|needle| bytes.windows(needle.len()).any(|window| window == *needle))
}
#[cfg(all(feature = "ble", target_os = "macos"))]
fn app_bundle_info_plist(executable: &std::path::Path) -> Option<std::path::PathBuf> {
let macos_dir = executable.parent()?;
if macos_dir.file_name() != Some(OsStr::new("MacOS")) {
return None;
}
let contents_dir = macos_dir.parent()?;
if contents_dir.file_name() != Some(OsStr::new("Contents")) {
return None;
}
Some(contents_dir.join("Info.plist"))
}
#[cfg(feature = "ble")]
async fn get_adapter_and_device_id() -> Result<(Adapter, [u8; 6]), TransportError> {
let manager = Manager::new().await.map_err(|e| TransportError::Other {
message: format!("Failed to create BLE manager: {e}"),
})?;
let adapters = manager
.adapters()
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to get BLE adapters: {e}"),
})?;
let adapter = adapters
.into_iter()
.next()
.ok_or_else(|| TransportError::Other {
message: "No Bluetooth adapter found".to_string(),
})?;
let adapter_info = adapter
.adapter_info()
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to get adapter info: {e}"),
})?;
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(adapter_info.as_bytes());
let hash = hasher.finalize();
let mut device_id = [0u8; 6];
device_id.copy_from_slice(&hash[..6]);
device_id[0] |= 0x02;
tracing::info!(
adapter = %adapter_info,
device_id = ?device_id,
"BLE adapter initialized"
);
Ok((adapter, device_id))
}
#[cfg(feature = "ble")]
#[allow(dead_code)] async fn get_local_adapter_address() -> Result<[u8; 6], TransportError> {
let (_adapter, device_id) = Self::get_adapter_and_device_id().await?;
Ok(device_id)
}
#[cfg(not(feature = "ble"))]
async fn get_local_adapter_address() -> Result<[u8; 6], TransportError> {
Err(TransportError::Other {
message: "BLE transport is not supported without the 'ble' feature".to_string(),
})
}
pub async fn lookup_session(&self, device_id: &[u8; 6]) -> Option<ResumeToken> {
let cache = self.session_cache.read().await;
let max_age = self.config.session_cache_duration;
for session in cache.iter() {
if &session.device_id == device_id && !session.is_expired(max_age) {
self.stats
.session_cache_hits
.fetch_add(1, Ordering::Relaxed);
let mut peer_id_hash = [0u8; 16];
peer_id_hash[..6].copy_from_slice(device_id);
let session_hash = {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(session.session_key);
hasher.update(session.session_id.to_le_bytes());
let result = hasher.finalize();
let mut hash = [0u8; 16];
hash.copy_from_slice(&result[..16]);
hash
};
return Some(ResumeToken {
peer_id_hash,
session_hash,
});
}
}
self.stats
.session_cache_misses
.fetch_add(1, Ordering::Relaxed);
None
}
pub async fn cache_session(&self, device_id: [u8; 6], session_key: [u8; 32], session_id: u16) {
let mut cache = self.session_cache.write().await;
let max_age = self.config.session_cache_duration;
cache.retain(|s| !s.is_expired(max_age));
if let Some(session) = cache.iter_mut().find(|s| s.device_id == device_id) {
session.session_key = session_key;
session.session_id = session_id;
session.last_active = Instant::now();
return;
}
cache.push(CachedSession {
device_id,
session_key,
session_id,
established: Instant::now(),
last_active: Instant::now(),
});
while cache.len() > 100 {
if let Some(idx) = cache
.iter()
.enumerate()
.min_by_key(|(_, s)| s.established)
.map(|(i, _)| i)
{
cache.remove(idx);
}
}
}
pub fn cache_stats(&self) -> (u64, u64) {
(
self.stats.session_cache_hits.load(Ordering::Relaxed),
self.stats.session_cache_misses.load(Ordering::Relaxed),
)
}
pub async fn cache_connection_session(&self, device_id: [u8; 6], session_key: [u8; 32]) {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
std::hash::Hash::hash(&device_id, &mut hasher);
std::hash::Hash::hash(&Instant::now().elapsed().as_nanos(), &mut hasher);
let session_id = (std::hash::Hasher::finish(&hasher) & 0xFFFF) as u16;
self.cache_session(device_id, session_key, session_id).await;
tracing::debug!(
device_id = ?device_id,
session_id,
"Cached session for future resumption"
);
}
pub async fn touch_session(&self, device_id: &[u8; 6]) {
let mut cache = self.session_cache.write().await;
if let Some(session) = cache.iter_mut().find(|s| &s.device_id == device_id) {
session.last_active = Instant::now();
}
}
pub async fn cached_session_count(&self) -> usize {
self.session_cache.read().await.len()
}
pub async fn prune_expired_sessions(&self) -> usize {
let mut cache = self.session_cache.write().await;
let before = cache.len();
let max_age = self.config.session_cache_duration;
cache.retain(|s| !s.is_expired(max_age));
let expired_removed = before - cache.len();
let max_sessions = self.config.max_cached_sessions;
let lru_removed = if max_sessions > 0 && cache.len() > max_sessions {
cache.sort_by_key(|s| std::cmp::Reverse(s.last_active));
let to_remove = cache.len() - max_sessions;
cache.truncate(max_sessions);
to_remove
} else {
0
};
let total_removed = expired_removed + lru_removed;
if total_removed > 0 {
tracing::debug!(
expired = expired_removed,
lru = lru_removed,
remaining = cache.len(),
"Pruned sessions"
);
}
total_removed
}
#[allow(dead_code)]
async fn evict_lru_sessions(&self, count: usize) -> usize {
let mut cache = self.session_cache.write().await;
if cache.len() <= count {
let removed = cache.len();
cache.clear();
return removed;
}
cache.sort_by_key(|s| s.last_active);
let before = cache.len();
cache.drain(0..count);
let removed = before - cache.len();
tracing::debug!(removed, remaining = cache.len(), "Evicted LRU sessions");
removed
}
pub async fn clear_session_cache(&self) {
let mut cache = self.session_cache.write().await;
let count = cache.len();
cache.clear();
tracing::debug!(count, "Cleared session cache");
}
pub async fn save_sessions_to_disk(&self) -> Result<(), TransportError> {
let path = match &self.config.session_persist_path {
Some(p) => p.clone(),
None => return Ok(()), };
let cache = self.session_cache.read().await;
let mut file = SessionCacheFile::new();
for session in cache.iter() {
file.sessions.push(PersistedSession::from_cached(session));
}
let bytes = file.to_bytes();
std::fs::write(&path, &bytes).map_err(|e| TransportError::Other {
message: format!("Failed to save session cache to {}: {}", path.display(), e),
})?;
tracing::info!(
path = %path.display(),
sessions = cache.len(),
"Saved session cache to disk"
);
Ok(())
}
pub async fn load_sessions_from_disk(&self) -> Result<usize, TransportError> {
let path = match &self.config.session_persist_path {
Some(p) => p.clone(),
None => return Ok(0), };
if !path.exists() {
tracing::debug!(path = %path.display(), "Session cache file does not exist");
return Ok(0);
}
let bytes = std::fs::read(&path).map_err(|e| TransportError::Other {
message: format!(
"Failed to read session cache from {}: {}",
path.display(),
e
),
})?;
let file = match SessionCacheFile::from_bytes(&bytes) {
Some(f) => f,
None => {
tracing::warn!(
path = %path.display(),
"Invalid or corrupted session cache file, ignoring"
);
return Ok(0);
}
};
tracing::info!(
path = %path.display(),
sessions = file.sessions.len(),
"Loaded session cache metadata from disk (keys not restored)"
);
Ok(file.sessions.len())
}
pub fn start_cleanup_task(self: &Arc<Self>) -> Option<tokio::task::JoinHandle<()>> {
let interval = self.config.session_cleanup_interval?;
let transport = Arc::clone(self);
Some(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
let pruned = transport.prune_expired_sessions().await;
if pruned > 0 {
tracing::debug!(pruned, "Periodic session cleanup completed");
}
let stale = transport.prune_stale_reassemblies().await;
if stale > 0 {
tracing::debug!(stale, "Pruned stale reassembly buffers");
}
if transport.config.session_persist_path.is_some() {
if let Err(e) = transport.save_sessions_to_disk().await {
tracing::warn!(error = %e, "Failed to persist session cache");
}
}
}
}))
}
#[cfg(feature = "ble")]
pub fn adapter(&self) -> &Arc<Adapter> {
&self.adapter
}
pub fn estimate_handshake_time(&self) -> Duration {
Duration::from_millis(1100)
}
pub async fn has_cached_session(&self, device_id: &[u8; 6]) -> bool {
self.lookup_session(device_id).await.is_some()
}
pub fn platform_name() -> &'static str {
#[cfg(target_os = "linux")]
{
"Linux (BlueZ)"
}
#[cfg(target_os = "macos")]
{
"macOS (Core Bluetooth)"
}
#[cfg(target_os = "windows")]
{
"Windows (WinRT)"
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
"Unsupported"
}
}
pub async fn scan_state(&self) -> ScanState {
*self.scan_state.read().await
}
pub async fn is_scanning(&self) -> bool {
*self.scan_state.read().await == ScanState::Scanning
}
#[cfg(feature = "ble")]
pub async fn start_scanning(&self) -> Result<(), TransportError> {
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
let mut state = self.scan_state.write().await;
if *state == ScanState::Scanning {
return Err(TransportError::Other {
message: "Already scanning".to_string(),
});
}
*state = ScanState::Scanning;
tracing::info!(
service_uuid = ?self.config.service_uuid,
platform = %Self::platform_name(),
"Starting BLE scan"
);
let service_filter = ScanFilter {
services: vec![service_uuid()],
};
self.adapter
.start_scan(service_filter)
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to start BLE scan: {e}"),
})?;
let adapter = self.adapter.clone();
let discovered_devices = self.discovered_devices.clone();
let scan_event_tx = self.scan_event_tx.clone();
let scan_state = self.scan_state.clone();
#[allow(unused_variables)] let config_service_uuid = self.config.service_uuid;
tokio::spawn(async move {
let mut events = match adapter.events().await {
Ok(events) => events,
Err(e) => {
tracing::error!(error = %e, "Failed to get adapter events stream");
return;
}
};
while let Some(event) = events.next().await {
let state = *scan_state.read().await;
if state != ScanState::Scanning {
break;
}
match event {
CentralEvent::DeviceDiscovered(id) => {
if let Ok(peripheral) = adapter.peripheral(&id).await {
if let Ok(Some(props)) = peripheral.properties().await {
let local_name = props.local_name.clone();
let rssi = props.rssi;
let has_service =
props.services.iter().any(|s| *s == service_uuid());
let btleplug_id_str = id.to_string();
let device_id = Self::peripheral_id_to_device_id(&btleplug_id_str);
let mut device =
DiscoveredDevice::with_btleplug_id(device_id, btleplug_id_str);
device.local_name = local_name;
device.rssi = rssi;
device.has_service = has_service;
let mut devices = discovered_devices.write().await;
let is_new = !devices.contains_key(&device_id);
devices.insert(device_id, device.clone());
tracing::debug!(
device_id = ?device_id,
local_name = ?device.local_name,
rssi = ?device.rssi,
has_service = device.has_service,
is_new,
"Discovered BLE device"
);
let event = ScanEvent { device, is_new };
if scan_event_tx.send(event).await.is_err() {
tracing::debug!("Scan event receiver dropped");
}
}
}
}
CentralEvent::DeviceUpdated(id) => {
let device_id = Self::peripheral_id_to_device_id(&id.to_string());
if let Ok(peripheral) = adapter.peripheral(&id).await {
if let Ok(Some(props)) = peripheral.properties().await {
let mut devices = discovered_devices.write().await;
if let Some(device) = devices.get_mut(&device_id) {
device.update_last_seen();
device.rssi = props.rssi;
if props.local_name.is_some() {
device.local_name = props.local_name.clone();
}
let has_service =
props.services.iter().any(|s| *s == service_uuid());
if has_service {
device.has_service = true;
}
tracing::trace!(
device_id = ?device_id,
rssi = ?device.rssi,
"Updated BLE device"
);
}
}
}
}
CentralEvent::DeviceDisconnected(id) => {
let device_id = Self::peripheral_id_to_device_id(&id.to_string());
tracing::debug!(device_id = ?device_id, "BLE device disconnected");
}
_ => {
}
}
}
tracing::info!("BLE scan event processing stopped");
});
Ok(())
}
#[cfg(feature = "ble")]
fn peripheral_id_to_device_id(id_str: &str) -> [u8; 6] {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(id_str.as_bytes());
let hash = hasher.finalize();
let mut device_id = [0u8; 6];
device_id.copy_from_slice(&hash[..6]);
device_id[0] |= 0x02;
device_id
}
#[cfg(not(feature = "ble"))]
pub async fn start_scanning(&self) -> Result<(), TransportError> {
Err(TransportError::Other {
message: "BLE scanning is not supported without the 'ble' feature".to_string(),
})
}
#[cfg(feature = "ble")]
pub async fn stop_scanning(&self) -> Result<(), TransportError> {
let mut state = self.scan_state.write().await;
if *state != ScanState::Scanning {
return Ok(());
}
*state = ScanState::Stopping;
tracing::info!(
platform = %Self::platform_name(),
"Stopping BLE scan"
);
self.adapter
.stop_scan()
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to stop BLE scan: {e}"),
})?;
*state = ScanState::Idle;
Ok(())
}
#[cfg(not(feature = "ble"))]
pub async fn stop_scanning(&self) -> Result<(), TransportError> {
Ok(())
}
pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
self.discovered_devices
.read()
.await
.values()
.cloned()
.collect()
}
pub async fn get_discovered_device(&self, device_id: &[u8; 6]) -> Option<DiscoveredDevice> {
self.discovered_devices.read().await.get(device_id).cloned()
}
pub async fn discovered_device_count(&self) -> usize {
self.discovered_devices.read().await.len()
}
pub async fn clear_discovered_devices(&self) {
self.discovered_devices.write().await.clear();
}
pub async fn prune_stale_devices(&self, max_age: Duration) -> usize {
let mut devices = self.discovered_devices.write().await;
let initial_count = devices.len();
devices.retain(|_, device| device.is_recent(max_age));
initial_count - devices.len()
}
pub async fn add_discovered_device(&self, device: DiscoveredDevice) -> bool {
let mut devices = self.discovered_devices.write().await;
let is_new = !devices.contains_key(&device.device_id);
let device_id = device.device_id;
devices.insert(device_id, device.clone());
let event = ScanEvent { device, is_new };
if self.scan_event_tx.send(event).await.is_err() {
tracing::debug!("Scan event receiver dropped");
}
is_new
}
pub async fn take_scan_events(&self) -> Option<mpsc::Receiver<ScanEvent>> {
self.scan_event_rx.write().await.take()
}
#[cfg(feature = "ble")]
pub async fn connect_to_device(
&self,
device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
use btleplug::api::Peripheral as _;
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
let connections = self.active_connections.read().await;
if connections.len() >= self.config.max_connections {
return Err(TransportError::Other {
message: format!(
"Connection limit exceeded: {} / {}",
connections.len(),
self.config.max_connections
),
});
}
if connections.contains_key(&device_id) {
return Err(TransportError::Other {
message: format!("Already connected to device: {:02x?}", device_id),
});
}
drop(connections);
let btleplug_id_str = {
let discovered = self.discovered_devices.read().await;
let device = discovered
.get(&device_id)
.ok_or_else(|| TransportError::Other {
message: format!("Device not discovered: {:02x?}", device_id),
})?;
device
.btleplug_id
.clone()
.ok_or_else(|| TransportError::Other {
message: format!("Device {:02x?} has no btleplug ID", device_id),
})?
};
let resume_token = self.lookup_session(&device_id).await;
let using_session_resumption = resume_token.is_some();
if using_session_resumption {
tracing::info!(
device_id = ?device_id,
"Found cached session - using fast handshake (32 bytes vs ~8KB)"
);
} else {
tracing::info!(
device_id = ?device_id,
"No cached session - will perform full PQC handshake"
);
}
tracing::info!(
device_id = ?device_id,
btleplug_id = %btleplug_id_str,
platform = %Self::platform_name(),
session_resumption = using_session_resumption,
"Connecting to BLE device"
);
let mut connection = BleConnection::new(device_id);
connection.set_session_resumed(using_session_resumption);
connection.start_connecting().await?;
let peripheral = self
.find_peripheral_by_id(&btleplug_id_str)
.await
.ok_or_else(|| TransportError::Other {
message: format!("Peripheral not found: {}", btleplug_id_str),
})?;
peripheral
.connect()
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to connect: {e}"),
})?;
peripheral
.discover_services()
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to discover services: {e}"),
})?;
let services = peripheral.services();
let our_service = services
.iter()
.find(|s| s.uuid == service_uuid())
.ok_or_else(|| TransportError::Other {
message: format!("ant-quic service not found on device {:02x?}", device_id),
})?;
let tx_char = our_service
.characteristics
.iter()
.find(|c| c.uuid == tx_uuid())
.cloned()
.ok_or_else(|| TransportError::Other {
message: "TX characteristic not found".to_string(),
})?;
let rx_char = our_service
.characteristics
.iter()
.find(|c| c.uuid == rx_uuid())
.cloned()
.ok_or_else(|| TransportError::Other {
message: "RX characteristic not found".to_string(),
})?;
tracing::debug!(
tx_uuid = %tx_char.uuid,
rx_uuid = %rx_char.uuid,
"Found ant-quic characteristics"
);
peripheral
.subscribe(&rx_char)
.await
.map_err(|e| TransportError::Other {
message: format!("Failed to subscribe to RX notifications: {e}"),
})?;
tracing::debug!(
device_id = ?device_id,
"Subscribed to RX notifications"
);
let peripheral_arc = Arc::new(peripheral);
connection.set_peripheral(peripheral_arc.clone());
connection.set_btleplug_tx_char(tx_char.clone());
connection.set_btleplug_rx_char(rx_char.clone());
connection
.mark_connected(CharacteristicHandle::tx(), CharacteristicHandle::rx())
.await;
let connection = Arc::new(RwLock::new(connection));
self.active_connections
.write()
.await
.insert(device_id, connection.clone());
let inbound_tx = self.inbound_tx.clone();
let config_service_uuid = self.config.service_uuid;
tokio::spawn(async move {
let mut notifications = match peripheral_arc.notifications().await {
Ok(stream) => stream,
Err(e) => {
tracing::error!(
device_id = ?device_id,
error = %e,
"Failed to get notification stream"
);
return;
}
};
tracing::info!(
device_id = ?device_id,
"Started notification handler"
);
while let Some(notification) = notifications.next().await {
if notification.uuid == rx_uuid() {
let data_len = notification.value.len();
let datagram = InboundDatagram {
source: TransportAddr::ble(device_id, Some(config_service_uuid)),
data: notification.value,
received_at: Instant::now(),
link_quality: Some(LinkQuality {
rssi: None, snr: None,
hop_count: Some(1),
rtt: None,
}),
};
if inbound_tx.send(datagram).await.is_err() {
tracing::debug!(
device_id = ?device_id,
"Inbound channel closed, stopping notification handler"
);
break;
}
tracing::trace!(
device_id = ?device_id,
data_len,
"Received BLE notification"
);
}
}
tracing::info!(
device_id = ?device_id,
"Notification handler stopped"
);
});
tracing::info!(
device_id = ?device_id,
session_resumed = using_session_resumption,
"BLE device connected"
);
if !using_session_resumption {
let session_id = (Instant::now().elapsed().as_millis() & 0xFFFF) as u16;
tracing::debug!(
device_id = ?device_id,
session_id,
"New connection - session can be cached after PQC handshake"
);
}
Ok(connection)
}
#[cfg(feature = "ble")]
async fn find_peripheral_by_id(&self, id_str: &str) -> Option<Peripheral> {
use btleplug::api::Peripheral as _;
let peripherals = self.adapter.peripherals().await.ok()?;
for peripheral in peripherals {
if peripheral.id().to_string() == id_str {
return Some(peripheral);
}
}
None
}
#[cfg(not(feature = "ble"))]
pub async fn connect_to_device(
&self,
_device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
Err(TransportError::Other {
message: "BLE connections are not supported without the 'ble' feature".to_string(),
})
}
#[cfg(test)]
pub async fn connect_to_device_simulated(
&self,
device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
let connections = self.active_connections.read().await;
if connections.len() >= self.config.max_connections {
return Err(TransportError::Other {
message: format!(
"Connection limit exceeded: {} / {}",
connections.len(),
self.config.max_connections
),
});
}
if connections.contains_key(&device_id) {
return Err(TransportError::Other {
message: format!("Already connected to device: {:02x?}", device_id),
});
}
drop(connections);
{
let discovered = self.discovered_devices.read().await;
if !discovered.contains_key(&device_id) {
return Err(TransportError::Other {
message: format!("Device not discovered: {:02x?}", device_id),
});
}
}
let mut connection = BleConnection::new(device_id);
connection.start_connecting().await?;
connection
.mark_connected(CharacteristicHandle::tx(), CharacteristicHandle::rx())
.await;
let connection = Arc::new(RwLock::new(connection));
self.active_connections
.write()
.await
.insert(device_id, connection.clone());
tracing::debug!(
device_id = ?device_id,
"Created simulated BLE connection (test mode)"
);
Ok(connection)
}
pub async fn disconnect_from_device(&self, device_id: &[u8; 6]) -> Result<(), TransportError> {
let mut connections = self.active_connections.write().await;
if let Some(conn) = connections.remove(device_id) {
let conn = conn.read().await;
conn.start_disconnect().await?;
tracing::info!(
device_id = ?device_id,
"BLE device disconnected"
);
Ok(())
} else {
Err(TransportError::Other {
message: format!("No connection to device: {:02x?}", device_id),
})
}
}
pub async fn get_connection(&self, device_id: &[u8; 6]) -> Option<Arc<RwLock<BleConnection>>> {
self.active_connections.read().await.get(device_id).cloned()
}
pub async fn is_connected_to(&self, device_id: &[u8; 6]) -> bool {
if let Some(conn) = self.active_connections.read().await.get(device_id) {
conn.read().await.is_connected().await
} else {
false
}
}
pub async fn active_connection_count(&self) -> usize {
self.active_connections.read().await.len()
}
pub async fn connected_devices(&self) -> Vec<[u8; 6]> {
self.active_connections
.read()
.await
.keys()
.copied()
.collect()
}
pub async fn disconnect_all(&self) -> usize {
let mut connections = self.active_connections.write().await;
let count = connections.len();
for (device_id, conn) in connections.drain() {
let conn = conn.read().await;
if let Err(e) = conn.start_disconnect().await {
tracing::warn!(
device_id = ?device_id,
error = %e,
"Error disconnecting device"
);
}
}
tracing::info!(count, "Disconnected all BLE devices");
count
}
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
pub async fn connect_with_retry(
&self,
device_id: [u8; 6],
max_attempts: u32,
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
let mut attempts = 0;
let mut delay = Duration::from_millis(100);
let max_delay = Duration::from_secs(5);
loop {
attempts += 1;
match self.connect_to_device(device_id).await {
Ok(conn) => return Ok(conn),
Err(e) if attempts >= max_attempts => {
tracing::error!(
device_id = ?device_id,
attempts,
error = %e,
"Failed to connect after max attempts"
);
return Err(e);
}
Err(e) => {
tracing::warn!(
device_id = ?device_id,
attempt = attempts,
max_attempts,
delay_ms = delay.as_millis(),
error = %e,
"Connection failed, retrying"
);
self.active_connections.write().await.remove(&device_id);
tokio::time::sleep(delay).await;
delay = (delay * 2).min(max_delay);
}
}
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
pub async fn connect_with_retry(
&self,
_device_id: [u8; 6],
_max_attempts: u32,
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
Err(TransportError::Other {
message: "BLE connections are not supported on this platform".to_string(),
})
}
pub async fn process_notification(
&self,
device_id: [u8; 6],
data: Vec<u8>,
) -> Result<(), TransportError> {
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
let connections = self.active_connections.read().await;
if !connections.contains_key(&device_id) {
self.stats.receive_errors.fetch_add(1, Ordering::Relaxed);
return Err(TransportError::Other {
message: format!(
"Received notification from unknown device: {:02x?}",
device_id
),
});
}
if let Some(conn) = connections.get(&device_id) {
conn.read().await.touch().await;
}
drop(connections);
let fragment_len = data.len();
self.stats
.bytes_received
.fetch_add(fragment_len as u64, Ordering::Relaxed);
let complete_data = {
let mut reassembly = self.reassembly.write().await;
reassembly.add_fragment(device_id, &data)
};
let complete_data = match complete_data {
Some(data) => data,
None => {
tracing::trace!(
device_id = ?device_id,
fragment_len,
"BLE fragment received, waiting for more"
);
return Ok(());
}
};
let data_len = complete_data.len();
let datagram = InboundDatagram {
source: TransportAddr::ble(device_id, Some(self.config.service_uuid)),
data: complete_data,
received_at: Instant::now(),
link_quality: Some(LinkQuality {
rssi: None, snr: None,
hop_count: Some(1), rtt: None,
}),
};
self.inbound_tx
.send(datagram)
.await
.map_err(|_| TransportError::Other {
message: "Inbound channel closed".to_string(),
})?;
self.stats
.datagrams_received
.fetch_add(1, Ordering::Relaxed);
self.touch_session(&device_id).await;
tracing::trace!(
device_id = ?device_id,
data_len,
"Processed complete BLE message"
);
Ok(())
}
pub async fn take_inbound_receiver(&self) -> Option<mpsc::Receiver<InboundDatagram>> {
self.inbound_rx.write().await.take()
}
#[cfg(test)]
pub fn inbound_sender(&self) -> mpsc::Sender<InboundDatagram> {
self.inbound_tx.clone()
}
pub fn is_peripheral_mode_supported() -> bool {
#[cfg(target_os = "linux")]
{
true
}
#[cfg(target_os = "macos")]
{
false
}
#[cfg(target_os = "windows")]
{
false
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
false
}
}
#[cfg(target_os = "linux")]
pub async fn start_advertising(&self) -> Result<(), TransportError> {
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
tracing::info!(
service_uuid = ?self.config.service_uuid,
platform = %Self::platform_name(),
"Starting BLE advertising (peripheral mode - stub)"
);
Ok(())
}
#[cfg(not(target_os = "linux"))]
pub async fn start_advertising(&self) -> Result<(), TransportError> {
Err(TransportError::Other {
message: format!(
"Peripheral mode (advertising) is not supported on {}",
Self::platform_name()
),
})
}
pub async fn stop_advertising(&self) -> Result<(), TransportError> {
tracing::info!(
platform = %Self::platform_name(),
"Stopping BLE advertising"
);
Ok(())
}
pub async fn pool_stats(&self) -> ConnectionPoolStats {
let connections = self.active_connections.read().await;
let mut active = 0;
let mut connecting = 0;
let mut disconnecting = 0;
let mut oldest_activity = None;
for (_id, conn) in connections.iter() {
let conn = conn.read().await;
match conn.state().await {
BleConnectionState::Connected => active += 1,
BleConnectionState::Connecting => connecting += 1,
BleConnectionState::Disconnecting => disconnecting += 1,
_ => {}
}
let idle = conn.idle_duration().await;
if oldest_activity.is_none() || Some(idle) > oldest_activity {
oldest_activity = Some(idle);
}
}
ConnectionPoolStats {
max_connections: self.config.max_connections,
active,
connecting,
disconnecting,
total: connections.len(),
oldest_idle: oldest_activity,
}
}
pub async fn prune_stale_reassemblies(&self) -> usize {
let mut reassembly = self.reassembly.write().await;
reassembly.prune_stale()
}
pub async fn pending_reassemblies(&self) -> usize {
self.reassembly.read().await.pending_count()
}
pub async fn evict_lru_connection(&self) -> Option<[u8; 6]> {
let mut connections = self.active_connections.write().await;
if connections.is_empty() {
return None;
}
let mut lru_device = None;
let mut max_idle = Duration::ZERO;
for (device_id, conn) in connections.iter() {
let idle = conn.read().await.idle_duration().await;
if idle > max_idle {
max_idle = idle;
lru_device = Some(*device_id);
}
}
if let Some(device_id) = lru_device {
if let Some(conn) = connections.remove(&device_id) {
if let Err(e) = conn.read().await.start_disconnect().await {
tracing::warn!(
device_id = ?device_id,
error = %e,
"Error during LRU eviction"
);
}
tracing::info!(
device_id = ?device_id,
idle_secs = max_idle.as_secs(),
"Evicted LRU connection"
);
return Some(device_id);
}
}
None
}
pub async fn evict_idle_connections(&self, idle_threshold: Duration) -> usize {
let mut connections = self.active_connections.write().await;
let mut to_evict = Vec::new();
for (device_id, conn) in connections.iter() {
let idle = conn.read().await.idle_duration().await;
if idle > idle_threshold {
to_evict.push(*device_id);
}
}
for device_id in &to_evict {
if let Some(conn) = connections.remove(device_id) {
let _ = conn.read().await.start_disconnect().await;
}
}
if !to_evict.is_empty() {
tracing::info!(
count = to_evict.len(),
threshold_secs = idle_threshold.as_secs(),
"Evicted idle connections"
);
}
to_evict.len()
}
pub async fn maintain_pool(&self) {
let mut connections = self.active_connections.write().await;
let mut to_remove = Vec::new();
for (device_id, conn) in connections.iter() {
let state = conn.read().await.state().await;
if state == BleConnectionState::Disconnected {
to_remove.push(*device_id);
}
}
for device_id in &to_remove {
connections.remove(device_id);
}
if !to_remove.is_empty() {
tracing::debug!(
removed = to_remove.len(),
remaining = connections.len(),
"Pool maintenance: removed disconnected connections"
);
}
}
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
pub async fn connect_with_eviction(
&self,
device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
let current = self.active_connection_count().await;
if current >= self.config.max_connections {
if self.evict_lru_connection().await.is_none() {
return Err(TransportError::Other {
message: "Failed to evict connection to make room".to_string(),
});
}
}
self.connect_to_device(device_id).await
}
#[cfg(test)]
pub async fn connect_with_eviction_simulated(
&self,
device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
let current = self.active_connection_count().await;
if current >= self.config.max_connections {
if self.evict_lru_connection().await.is_none() {
return Err(TransportError::Other {
message: "Failed to evict connection to make room".to_string(),
});
}
}
self.connect_to_device_simulated(device_id).await
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
pub async fn connect_with_eviction(
&self,
_device_id: [u8; 6],
) -> Result<Arc<RwLock<BleConnection>>, TransportError> {
Err(TransportError::Other {
message: "BLE connections are not supported on this platform".to_string(),
})
}
}
#[derive(Debug, Clone, Default)]
pub struct ConnectionPoolStats {
pub max_connections: usize,
pub active: usize,
pub connecting: usize,
pub disconnecting: usize,
pub total: usize,
pub oldest_idle: Option<Duration>,
}
impl ConnectionPoolStats {
pub fn has_capacity(&self) -> bool {
self.total < self.max_connections
}
pub fn remaining_capacity(&self) -> usize {
self.max_connections.saturating_sub(self.total)
}
}
#[async_trait]
impl TransportProvider for BleTransport {
fn name(&self) -> &str {
"BLE"
}
fn transport_type(&self) -> TransportType {
TransportType::Ble
}
fn capabilities(&self) -> &TransportCapabilities {
&self.capabilities
}
fn local_addr(&self) -> Option<TransportAddr> {
Some(TransportAddr::ble(
self.local_device_id,
Some(self.config.service_uuid),
))
}
async fn send(&self, data: &[u8], dest: &TransportAddr) -> Result<(), TransportError> {
if !self.online.load(Ordering::SeqCst) {
return Err(TransportError::Offline);
}
let (device_id, _service_uuid) = match dest {
TransportAddr::Ble {
device_id,
service_uuid,
} => (*device_id, service_uuid.unwrap_or(self.config.service_uuid)),
_ => {
return Err(TransportError::AddressMismatch {
expected: TransportType::Ble,
actual: dest.transport_type(),
});
}
};
let max_size = 255 * self.fragmenter.payload_size();
if data.len() > max_size {
return Err(TransportError::MessageTooLarge {
size: data.len(),
mtu: max_size,
});
}
let (is_connected, has_tx_char) = {
let connections = self.active_connections.read().await;
let conn = connections.get(&device_id).ok_or_else(|| {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
TransportError::Other {
message: format!("No connection to device: {:02x?}", device_id),
}
})?;
let conn_guard = conn.read().await;
let is_connected = conn_guard.is_connected().await;
let has_tx_char = conn_guard.tx_characteristic().is_some();
if is_connected {
conn_guard.touch().await;
}
(is_connected, has_tx_char)
};
if !is_connected {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
return Err(TransportError::Other {
message: format!("Connection to device {:02x?} is not active", device_id),
});
}
if !has_tx_char {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
return Err(TransportError::Other {
message: format!(
"TX characteristic not available for device: {:02x?}",
device_id
),
});
}
let msg_id = self.next_msg_id.fetch_add(1, Ordering::Relaxed);
let fragments = self.fragmenter.fragment(data, msg_id);
let fragment_count = fragments.len();
#[cfg(feature = "ble")]
{
use btleplug::api::Peripheral as _;
let connections = self.active_connections.read().await;
let conn = connections.get(&device_id).ok_or_else(|| {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
TransportError::Other {
message: format!("No connection to device: {:02x?}", device_id),
}
})?;
let conn_guard = conn.read().await;
if conn_guard.peripheral().is_none() {
#[cfg(test)]
{
tracing::debug!(
device_id = ?device_id,
data_len = data.len(),
fragments = fragment_count,
"BLE fragmented write (simulated connection)"
);
}
#[cfg(not(test))]
{
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
return Err(TransportError::Other {
message: "Peripheral not available".to_string(),
});
}
} else {
let peripheral = match conn_guard.peripheral() {
Some(p) => p,
None => {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
return Err(TransportError::Other {
message: "Peripheral not available".to_string(),
});
}
};
let tx_char = conn_guard.btleplug_tx_char().ok_or_else(|| {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
TransportError::Other {
message: "TX characteristic not available".to_string(),
}
})?;
for (i, fragment) in fragments.iter().enumerate() {
peripheral
.write(tx_char, fragment, WriteType::WithoutResponse)
.await
.map_err(|e| {
self.stats.send_errors.fetch_add(1, Ordering::Relaxed);
TransportError::Other {
message: format!(
"Failed to write fragment {}/{} to TX characteristic: {e}",
i + 1,
fragment_count
),
}
})?;
}
tracing::debug!(
device_id = ?device_id,
data_len = data.len(),
fragments = fragment_count,
platform = %Self::platform_name(),
"BLE fragmented write complete"
);
}
}
#[cfg(not(feature = "ble"))]
{
let _ = &fragments; tracing::debug!(
device_id = ?device_id,
data_len = data.len(),
fragments = fragment_count,
platform = %Self::platform_name(),
"BLE fragmented write (simulated - no BLE feature)"
);
}
self.stats.datagrams_sent.fetch_add(1, Ordering::Relaxed);
self.stats
.bytes_sent
.fetch_add(data.len() as u64, Ordering::Relaxed);
self.touch_session(&device_id).await;
Ok(())
}
fn inbound(&self) -> mpsc::Receiver<InboundDatagram> {
let maybe_rx = {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
std::thread::scope(|s| {
s.spawn(|| handle.block_on(async { self.inbound_rx.write().await.take() }))
.join()
.ok()
.flatten()
})
} else {
None
}
};
maybe_rx.unwrap_or_else(|| {
let (_, rx) = mpsc::channel(256);
rx
})
}
fn is_online(&self) -> bool {
self.online.load(Ordering::SeqCst)
}
async fn shutdown(&self) -> Result<(), TransportError> {
self.online.store(false, Ordering::SeqCst);
let _ = self.shutdown_tx.send(()).await;
Ok(())
}
async fn broadcast(&self, data: &[u8]) -> Result<(), TransportError> {
if !self.capabilities.broadcast {
return Err(TransportError::BroadcastNotSupported);
}
if data.len() > 31 {
return Err(TransportError::MessageTooLarge {
size: data.len(),
mtu: 31,
});
}
tracing::debug!(
data_len = data.len(),
platform = %Self::platform_name(),
"BLE broadcast (simulated)"
);
Ok(())
}
async fn link_quality(&self, peer: &TransportAddr) -> Option<LinkQuality> {
let _device_id = match peer {
TransportAddr::Ble { device_id, .. } => device_id,
_ => return None,
};
Some(LinkQuality {
rssi: Some(-60), snr: None, hop_count: Some(1), rtt: Some(Duration::from_millis(100)),
})
}
fn stats(&self) -> TransportStats {
TransportStats {
datagrams_sent: self.stats.datagrams_sent.load(Ordering::Relaxed),
datagrams_received: self.stats.datagrams_received.load(Ordering::Relaxed),
bytes_sent: self.stats.bytes_sent.load(Ordering::Relaxed),
bytes_received: self.stats.bytes_received.load(Ordering::Relaxed),
send_errors: self.stats.send_errors.load(Ordering::Relaxed),
receive_errors: self.stats.receive_errors.load(Ordering::Relaxed),
current_rtt: Some(Duration::from_millis(100)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ble_capabilities() {
let caps = TransportCapabilities::ble();
assert!(!caps.supports_full_quic()); assert_eq!(caps.mtu, 244);
assert_eq!(caps.bandwidth_bps, 125_000);
assert!(caps.link_layer_acks);
assert!(caps.power_constrained);
assert!(caps.broadcast); }
#[test]
fn test_resume_token() {
let token = ResumeToken {
peer_id_hash: [0x01; 16],
session_hash: [0x02; 16],
};
let bytes = token.to_bytes();
let restored = ResumeToken::from_bytes(&bytes);
assert_eq!(restored.peer_id_hash, token.peer_id_hash);
assert_eq!(restored.session_hash, token.session_hash);
}
#[test]
fn test_ble_config_default() {
let config = BleConfig::default();
assert_eq!(config.service_uuid, ANT_QUIC_SERVICE_UUID);
assert_eq!(
config.session_cache_duration,
Duration::from_secs(24 * 60 * 60)
);
assert_eq!(config.max_connections, 5);
}
#[test]
fn test_handshake_estimate() {
let caps = TransportCapabilities::ble();
let handshake_bytes = 8800; let time = caps.estimate_transmission_time(handshake_bytes);
assert!(time >= Duration::from_millis(500));
assert!(time <= Duration::from_secs(3));
}
#[test]
fn test_platform_name() {
let name = BleTransport::platform_name();
#[cfg(target_os = "linux")]
assert_eq!(name, "Linux (BlueZ)");
#[cfg(target_os = "macos")]
assert_eq!(name, "macOS (Core Bluetooth)");
#[cfg(target_os = "windows")]
assert_eq!(name, "Windows (WinRT)");
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_creation() {
let result = BleTransport::new().await;
match result {
Ok(transport) => {
assert!(transport.is_online());
assert_eq!(transport.transport_type(), TransportType::Ble);
println!("BLE transport created on {}", BleTransport::platform_name());
}
Err(e) => {
println!("BLE transport error (expected without hardware): {e}");
assert!(!format!("{e}").is_empty());
}
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_session_caching() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let session_key = [0xAA; 32];
assert!(!transport.has_cached_session(&device_id).await);
transport.cache_session(device_id, session_key, 1234).await;
assert!(transport.has_cached_session(&device_id).await);
let token = transport.lookup_session(&device_id).await;
assert!(token.is_some());
let (hits, misses) = transport.cache_stats();
assert_eq!(hits, 2); assert_eq!(misses, 1); }
}
#[test]
fn test_gatt_service_uuid() {
assert_eq!(ANT_QUIC_SERVICE_UUID[0], 0xa0);
assert_eq!(ANT_QUIC_SERVICE_UUID[15], 0x01);
assert_eq!(ANT_QUIC_SERVICE_UUID.len(), 16);
}
#[test]
fn test_gatt_tx_characteristic_uuid() {
assert_eq!(TX_CHARACTERISTIC_UUID[0], 0xa0);
assert_eq!(TX_CHARACTERISTIC_UUID[15], 0x02);
assert_eq!(TX_CHARACTERISTIC_UUID.len(), 16);
assert_eq!(&TX_CHARACTERISTIC_UUID[..15], &ANT_QUIC_SERVICE_UUID[..15]);
}
#[test]
fn test_gatt_rx_characteristic_uuid() {
assert_eq!(RX_CHARACTERISTIC_UUID[0], 0xa0);
assert_eq!(RX_CHARACTERISTIC_UUID[15], 0x03);
assert_eq!(RX_CHARACTERISTIC_UUID.len(), 16);
assert_eq!(&RX_CHARACTERISTIC_UUID[..15], &ANT_QUIC_SERVICE_UUID[..15]);
}
#[test]
fn test_cccd_uuid() {
assert_eq!(CCCD_UUID[2], 0x29);
assert_eq!(CCCD_UUID[3], 0x02);
assert_eq!(CCCD_UUID.len(), 16);
}
#[test]
fn test_cccd_values() {
assert_eq!(CCCD_ENABLE_NOTIFICATION, [0x01, 0x00]);
assert_eq!(CCCD_ENABLE_INDICATION, [0x02, 0x00]);
assert_eq!(CCCD_DISABLE, [0x00, 0x00]);
}
#[test]
fn test_characteristic_uuids_unique() {
assert_ne!(ANT_QUIC_SERVICE_UUID, TX_CHARACTERISTIC_UUID);
assert_ne!(ANT_QUIC_SERVICE_UUID, RX_CHARACTERISTIC_UUID);
assert_ne!(TX_CHARACTERISTIC_UUID, RX_CHARACTERISTIC_UUID);
assert_ne!(ANT_QUIC_SERVICE_UUID, CCCD_UUID);
}
#[test]
fn test_ble_connection_state_default() {
let state = BleConnectionState::default();
assert_eq!(state, BleConnectionState::Discovered);
}
#[test]
fn test_ble_connection_state_display() {
assert_eq!(format!("{}", BleConnectionState::Discovered), "discovered");
assert_eq!(format!("{}", BleConnectionState::Connecting), "connecting");
assert_eq!(format!("{}", BleConnectionState::Connected), "connected");
assert_eq!(
format!("{}", BleConnectionState::Disconnecting),
"disconnecting"
);
assert_eq!(
format!("{}", BleConnectionState::Disconnected),
"disconnected"
);
}
#[test]
fn test_characteristic_handle_tx() {
let tx = CharacteristicHandle::tx();
assert_eq!(tx.uuid, TX_CHARACTERISTIC_UUID);
assert!(tx.write_without_response);
assert!(!tx.notify);
assert!(!tx.indicate);
}
#[test]
fn test_characteristic_handle_rx() {
let rx = CharacteristicHandle::rx();
assert_eq!(rx.uuid, RX_CHARACTERISTIC_UUID);
assert!(!rx.write_without_response);
assert!(rx.notify);
assert!(!rx.indicate);
}
#[tokio::test]
async fn test_ble_connection_lifecycle() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let mut conn = BleConnection::new(device_id);
assert_eq!(conn.state().await, BleConnectionState::Discovered);
assert_eq!(conn.device_id(), device_id);
assert!(!conn.is_connected().await);
assert!(conn.connection_duration().is_none());
conn.start_connecting().await.unwrap();
assert_eq!(conn.state().await, BleConnectionState::Connecting);
let tx = CharacteristicHandle::tx();
let rx = CharacteristicHandle::rx();
conn.mark_connected(tx, rx).await;
assert_eq!(conn.state().await, BleConnectionState::Connected);
assert!(conn.is_connected().await);
assert!(conn.connection_duration().is_some());
assert!(conn.tx_characteristic().is_some());
assert!(conn.rx_characteristic().is_some());
tokio::time::sleep(Duration::from_millis(10)).await;
let idle_before = conn.idle_duration().await;
conn.touch().await;
let idle_after = conn.idle_duration().await;
assert!(idle_after < idle_before);
conn.start_disconnect().await.unwrap();
assert_eq!(conn.state().await, BleConnectionState::Disconnecting);
conn.mark_disconnected().await;
assert_eq!(conn.state().await, BleConnectionState::Disconnected);
assert!(!conn.is_connected().await);
}
#[tokio::test]
async fn test_ble_connection_invalid_transitions() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let conn = BleConnection::new(device_id);
let result = conn.start_disconnect().await;
assert!(result.is_err());
conn.start_connecting().await.unwrap();
let result = conn.start_connecting().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_ble_connection_reconnect() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let mut conn = BleConnection::new(device_id);
conn.start_connecting().await.unwrap();
conn.mark_connected(CharacteristicHandle::tx(), CharacteristicHandle::rx())
.await;
conn.start_disconnect().await.unwrap();
conn.mark_disconnected().await;
conn.start_connecting().await.unwrap();
assert_eq!(conn.state().await, BleConnectionState::Connecting);
}
#[test]
fn test_ble_connection_debug() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let conn = BleConnection::new(device_id);
let debug_str = format!("{:?}", conn);
assert!(debug_str.contains("BleConnection"));
assert!(debug_str.contains("device_id"));
}
#[test]
fn test_discovered_device_new() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
assert_eq!(device.device_id, device_id);
assert!(device.local_name.is_none());
assert!(device.rssi.is_none());
assert!(!device.has_service);
assert!(device.is_recent(Duration::from_secs(1)));
}
#[test]
fn test_discovered_device_update_last_seen() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let mut device = DiscoveredDevice::new(device_id);
let initial_seen = device.last_seen;
std::thread::sleep(Duration::from_millis(10));
device.update_last_seen();
assert!(device.last_seen > initial_seen);
}
#[test]
fn test_discovered_device_age() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
std::thread::sleep(Duration::from_millis(50));
let age = device.age();
assert!(age >= Duration::from_millis(50));
}
#[test]
fn test_scan_state_default() {
let state = ScanState::default();
assert_eq!(state, ScanState::Idle);
}
#[test]
fn test_scan_state_display() {
assert_eq!(format!("{}", ScanState::Idle), "idle");
assert_eq!(format!("{}", ScanState::Scanning), "scanning");
assert_eq!(format!("{}", ScanState::Stopping), "stopping");
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_scanning() {
if let Ok(transport) = BleTransport::new().await {
assert!(!transport.is_scanning().await);
assert_eq!(transport.scan_state().await, ScanState::Idle);
if transport.start_scanning().await.is_ok() {
assert!(transport.is_scanning().await);
transport.stop_scanning().await.unwrap();
assert!(!transport.is_scanning().await);
}
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_discovered_devices() {
if let Ok(transport) = BleTransport::new().await {
assert_eq!(transport.discovered_device_count().await, 0);
let mut device = DiscoveredDevice::new([0x11, 0x22, 0x33, 0x44, 0x55, 0x66]);
device.local_name = Some("TestDevice".to_string());
device.rssi = Some(-60);
device.has_service = true;
let is_new = transport.add_discovered_device(device.clone()).await;
assert!(is_new);
assert_eq!(transport.discovered_device_count().await, 1);
let retrieved = transport
.get_discovered_device(&[0x11, 0x22, 0x33, 0x44, 0x55, 0x66])
.await;
assert!(retrieved.is_some());
let retrieved = retrieved.unwrap();
assert_eq!(retrieved.local_name, Some("TestDevice".to_string()));
let is_new = transport.add_discovered_device(device).await;
assert!(!is_new);
assert_eq!(transport.discovered_device_count().await, 1);
let all_devices = transport.discovered_devices().await;
assert_eq!(all_devices.len(), 1);
transport.clear_discovered_devices().await;
assert_eq!(transport.discovered_device_count().await, 0);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_prune_stale_devices() {
if let Ok(transport) = BleTransport::new().await {
let mut old_device = DiscoveredDevice::new([0x11, 0x22, 0x33, 0x44, 0x55, 0x66]);
old_device.has_service = true;
transport.add_discovered_device(old_device).await;
tokio::time::sleep(Duration::from_millis(60)).await;
let recent_device = DiscoveredDevice::new([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]);
transport.add_discovered_device(recent_device).await;
assert_eq!(transport.discovered_device_count().await, 2);
let pruned = transport
.prune_stale_devices(Duration::from_millis(50))
.await;
assert_eq!(pruned, 1);
assert_eq!(transport.discovered_device_count().await, 1);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_connect_disconnect() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
assert_eq!(transport.active_connection_count().await, 0);
assert!(!transport.is_connected_to(&device_id).await);
let conn = transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
assert!(conn.read().await.is_connected().await);
assert_eq!(transport.active_connection_count().await, 1);
assert!(transport.is_connected_to(&device_id).await);
let retrieved = transport.get_connection(&device_id).await;
assert!(retrieved.is_some());
let connected = transport.connected_devices().await;
assert_eq!(connected.len(), 1);
assert_eq!(connected[0], device_id);
transport.disconnect_from_device(&device_id).await.unwrap();
assert_eq!(transport.active_connection_count().await, 0);
assert!(!transport.is_connected_to(&device_id).await);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_connect_errors() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let result = transport.connect_to_device_simulated(device_id).await;
assert!(result.is_err());
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let result = transport.connect_to_device_simulated(device_id).await;
assert!(result.is_err());
let other_device = [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];
let result = transport.disconnect_from_device(&other_device).await;
assert!(result.is_err());
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_connection_limit() {
let config = BleConfig {
max_connections: 2,
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
}
transport
.connect_to_device_simulated([0, 0, 0, 0, 0, 0])
.await
.unwrap();
transport
.connect_to_device_simulated([1, 1, 1, 1, 1, 1])
.await
.unwrap();
let result = transport
.connect_to_device_simulated([2, 2, 2, 2, 2, 2])
.await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("limit"));
transport
.disconnect_from_device(&[0, 0, 0, 0, 0, 0])
.await
.unwrap();
transport
.connect_to_device_simulated([2, 2, 2, 2, 2, 2])
.await
.unwrap();
assert_eq!(transport.active_connection_count().await, 2);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_disconnect_all() {
if let Ok(transport) = BleTransport::new().await {
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated([i, i, i, i, i, i])
.await
.unwrap();
}
assert_eq!(transport.active_connection_count().await, 3);
let count = transport.disconnect_all().await;
assert_eq!(count, 3);
assert_eq!(transport.active_connection_count().await, 0);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_send_requires_connection() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let dest = TransportAddr::ble(device_id, None);
let data = b"Hello BLE";
let result = transport.send(data, &dest).await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("No connection"));
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let result = transport.send(data, &dest).await;
assert!(result.is_ok());
let stats = transport.stats();
assert_eq!(stats.datagrams_sent, 1);
assert_eq!(stats.bytes_sent, data.len() as u64);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_send_size_check() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let dest = TransportAddr::ble(device_id, None);
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let small_data = vec![0u8; 100];
let result = transport.send(&small_data, &dest).await;
assert!(result.is_ok());
let large_data = vec![0u8; 500];
let result = transport.send(&large_data, &dest).await;
assert!(result.is_ok());
let max_size = 255 * DEFAULT_FRAGMENT_PAYLOAD_SIZE;
let too_large_data = vec![0u8; max_size + 1];
let result = transport.send(&too_large_data, &dest).await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("MessageTooLarge"));
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_send_address_mismatch() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let udp_addr = TransportAddr::Udp("192.168.1.1:9000".parse().unwrap());
let result = transport.send(b"test", &udp_addr).await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("AddressMismatch"));
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_send_offline() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let dest = TransportAddr::ble(device_id, None);
transport.shutdown().await.unwrap();
let result = transport.send(b"test", &dest).await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("Offline"));
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_process_notification() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let mut rx = transport.take_inbound_receiver().await.unwrap();
let payload = b"Hello from peripheral".to_vec();
let mut fragment = FragmentHeader::single(0).to_bytes().to_vec();
fragment.extend_from_slice(&payload);
transport
.process_notification(device_id, fragment)
.await
.unwrap();
let stats = transport.stats();
assert_eq!(stats.datagrams_received, 1);
let received = rx.try_recv().unwrap();
assert_eq!(received.data, payload);
assert!(matches!(received.source, TransportAddr::Ble { .. }));
assert!(received.link_quality.is_some());
assert!(transport.take_inbound_receiver().await.is_none());
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_process_notification_unknown_device() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let result = transport
.process_notification(device_id, b"test".to_vec())
.await;
assert!(result.is_err());
assert!(format!("{:?}", result).contains("unknown device"));
let stats = transport.stats();
assert_eq!(stats.receive_errors, 1);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_multiple_notifications() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let device = DiscoveredDevice::new(device_id);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated(device_id)
.await
.unwrap();
let mut rx = transport.take_inbound_receiver().await.unwrap();
for i in 0..5u8 {
let payload = format!("Message {}", i).into_bytes();
let mut fragment = FragmentHeader::single(i).to_bytes().to_vec();
fragment.extend_from_slice(&payload);
transport
.process_notification(device_id, fragment)
.await
.unwrap();
}
let stats = transport.stats();
assert_eq!(stats.datagrams_received, 5);
let mut count = 0;
while rx.try_recv().is_ok() {
count += 1;
}
assert_eq!(count, 5);
}
}
#[test]
fn test_peripheral_mode_supported() {
let supported = BleTransport::is_peripheral_mode_supported();
#[cfg(target_os = "linux")]
assert!(supported);
#[cfg(not(target_os = "linux"))]
assert!(!supported);
}
#[tokio::test]
#[cfg(target_os = "linux")]
async fn test_ble_transport_advertising() {
if let Ok(transport) = BleTransport::new().await {
let result = transport.start_advertising().await;
assert!(result.is_ok());
let result = transport.stop_advertising().await;
assert!(result.is_ok());
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_connection_pool_stats() {
if let Ok(transport) = BleTransport::new().await {
let stats = transport.pool_stats().await;
assert_eq!(stats.active, 0);
assert_eq!(stats.total, 0);
assert!(stats.has_capacity());
assert_eq!(stats.remaining_capacity(), 5);
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated([i, i, i, i, i, i])
.await
.unwrap();
}
let stats = transport.pool_stats().await;
assert_eq!(stats.active, 3);
assert_eq!(stats.total, 3);
assert!(stats.has_capacity());
assert_eq!(stats.remaining_capacity(), 2);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_connection_pool_eviction() {
let config = BleConfig {
max_connections: 2,
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
}
transport
.connect_to_device_simulated([0, 0, 0, 0, 0, 0])
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
transport
.connect_to_device_simulated([1, 1, 1, 1, 1, 1])
.await
.unwrap();
let stats = transport.pool_stats().await;
assert!(!stats.has_capacity());
let evicted = transport.evict_lru_connection().await;
assert!(evicted.is_some());
let stats = transport.pool_stats().await;
assert!(stats.has_capacity());
assert_eq!(stats.total, 1);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_connect_with_eviction() {
let config = BleConfig {
max_connections: 2,
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
}
transport
.connect_to_device_simulated([0, 0, 0, 0, 0, 0])
.await
.unwrap();
transport
.connect_to_device_simulated([1, 1, 1, 1, 1, 1])
.await
.unwrap();
assert_eq!(transport.active_connection_count().await, 2);
let result = transport
.connect_with_eviction_simulated([2, 2, 2, 2, 2, 2])
.await;
assert!(result.is_ok());
assert_eq!(transport.active_connection_count().await, 2);
assert!(transport.is_connected_to(&[2, 2, 2, 2, 2, 2]).await);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_evict_idle_connections() {
if let Ok(transport) = BleTransport::new().await {
for i in 0..3u8 {
let device = DiscoveredDevice::new([i, i, i, i, i, i]);
transport.add_discovered_device(device).await;
transport
.connect_to_device_simulated([i, i, i, i, i, i])
.await
.unwrap();
}
if let Some(conn) = transport.get_connection(&[2, 2, 2, 2, 2, 2]).await {
conn.read().await.touch().await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
let evicted = transport
.evict_idle_connections(Duration::from_millis(10))
.await;
assert!(evicted >= 2);
}
}
#[test]
fn test_connection_pool_stats_default() {
let stats = ConnectionPoolStats::default();
assert_eq!(stats.max_connections, 0);
assert_eq!(stats.active, 0);
assert_eq!(stats.total, 0);
assert!(!stats.has_capacity());
assert_eq!(stats.remaining_capacity(), 0);
}
#[test]
fn test_fragment_header_serialization() {
let header = FragmentHeader::new(5, fragment_flags::START, 10, 42);
let bytes = header.to_bytes();
assert_eq!(bytes, [5, fragment_flags::START, 10, 42]);
let restored = FragmentHeader::from_bytes(&bytes).unwrap();
assert_eq!(restored, header);
}
#[test]
fn test_fragment_header_single() {
let header = FragmentHeader::single(7);
assert_eq!(header.seq_num, 0);
assert_eq!(header.flags, fragment_flags::SINGLE);
assert_eq!(header.total, 1);
assert_eq!(header.msg_id, 7);
assert!(header.is_start());
assert!(header.is_end());
assert!(header.is_single());
}
#[test]
fn test_fragment_header_flags() {
let first = FragmentHeader::new(0, fragment_flags::START, 3, 0);
assert!(first.is_start());
assert!(!first.is_end());
assert!(!first.is_single());
let middle = FragmentHeader::new(1, 0, 3, 0);
assert!(!middle.is_start());
assert!(!middle.is_end());
assert!(!middle.is_single());
let last = FragmentHeader::new(2, fragment_flags::END, 3, 0);
assert!(!last.is_start());
assert!(last.is_end());
assert!(!last.is_single());
}
#[test]
fn test_fragment_header_from_bytes_too_short() {
assert!(FragmentHeader::from_bytes(&[]).is_none());
assert!(FragmentHeader::from_bytes(&[1, 2, 3]).is_none());
assert!(FragmentHeader::from_bytes(&[1, 2, 3, 4]).is_some());
}
#[test]
fn test_fragmenter_default() {
let fragmenter = BlePacketFragmenter::default_ble();
assert_eq!(fragmenter.mtu(), DEFAULT_BLE_MTU);
assert_eq!(
fragmenter.payload_size(),
DEFAULT_BLE_MTU - FRAGMENT_HEADER_SIZE
);
}
#[test]
fn test_fragmenter_custom_mtu() {
let fragmenter = BlePacketFragmenter::new(100);
assert_eq!(fragmenter.mtu(), 100);
assert_eq!(fragmenter.payload_size(), 96); }
#[test]
#[should_panic]
fn test_fragmenter_invalid_mtu() {
BlePacketFragmenter::new(4); }
#[test]
fn test_fragmenter_empty_data() {
let fragmenter = BlePacketFragmenter::default_ble();
let fragments = fragmenter.fragment(&[], 0);
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].len(), FRAGMENT_HEADER_SIZE);
let header = FragmentHeader::from_bytes(&fragments[0]).unwrap();
assert!(header.is_single());
}
#[test]
fn test_fragmenter_single_fragment() {
let fragmenter = BlePacketFragmenter::default_ble();
let data = vec![0xAB; 100]; let fragments = fragmenter.fragment(&data, 42);
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].len(), FRAGMENT_HEADER_SIZE + 100);
let header = FragmentHeader::from_bytes(&fragments[0]).unwrap();
assert!(header.is_single());
assert_eq!(header.msg_id, 42);
assert_eq!(&fragments[0][FRAGMENT_HEADER_SIZE..], &data[..]);
}
#[test]
fn test_fragmenter_exact_fit() {
let fragmenter = BlePacketFragmenter::default_ble();
let data = vec![0xCD; fragmenter.payload_size()];
let fragments = fragmenter.fragment(&data, 5);
assert_eq!(fragments.len(), 1);
assert!(
FragmentHeader::from_bytes(&fragments[0])
.unwrap()
.is_single()
);
}
#[test]
fn test_fragmenter_multiple_fragments() {
let fragmenter = BlePacketFragmenter::default_ble();
let payload_size = fragmenter.payload_size();
let data = vec![0xEF; payload_size * 2 + 50];
let fragments = fragmenter.fragment(&data, 10);
assert_eq!(fragments.len(), 3);
let h0 = FragmentHeader::from_bytes(&fragments[0]).unwrap();
assert!(h0.is_start());
assert!(!h0.is_end());
assert_eq!(h0.seq_num, 0);
assert_eq!(h0.total, 3);
assert_eq!(h0.msg_id, 10);
assert_eq!(fragments[0].len(), fragmenter.mtu());
let h1 = FragmentHeader::from_bytes(&fragments[1]).unwrap();
assert!(!h1.is_start());
assert!(!h1.is_end());
assert_eq!(h1.seq_num, 1);
assert_eq!(h1.total, 3);
let h2 = FragmentHeader::from_bytes(&fragments[2]).unwrap();
assert!(!h2.is_start());
assert!(h2.is_end());
assert_eq!(h2.seq_num, 2);
assert_eq!(fragments[2].len(), FRAGMENT_HEADER_SIZE + 50);
}
#[test]
fn test_fragmenter_needs_fragmentation() {
let fragmenter = BlePacketFragmenter::default_ble();
let payload_size = fragmenter.payload_size();
assert!(!fragmenter.needs_fragmentation(&[0; 100]));
assert!(!fragmenter.needs_fragmentation(&vec![0u8; payload_size]));
assert!(fragmenter.needs_fragmentation(&vec![0u8; payload_size + 1]));
}
#[test]
fn test_reassembly_buffer_single_fragment() {
let mut buffer = BleReassemblyBuffer::default();
let device_id = [1, 2, 3, 4, 5, 6];
let mut fragment = FragmentHeader::single(0).to_bytes().to_vec();
fragment.extend_from_slice(b"hello world");
let result = buffer.add_fragment(device_id, &fragment);
assert_eq!(result, Some(b"hello world".to_vec()));
assert_eq!(buffer.pending_count(), 0);
}
#[test]
fn test_reassembly_buffer_multi_fragment_in_order() {
let mut buffer = BleReassemblyBuffer::default();
let device_id = [1, 2, 3, 4, 5, 6];
let msg_id = 42;
let mut frag0 = FragmentHeader::new(0, fragment_flags::START, 3, msg_id)
.to_bytes()
.to_vec();
frag0.extend_from_slice(b"hello ");
let mut frag1 = FragmentHeader::new(1, 0, 3, msg_id).to_bytes().to_vec();
frag1.extend_from_slice(b"world ");
let mut frag2 = FragmentHeader::new(2, fragment_flags::END, 3, msg_id)
.to_bytes()
.to_vec();
frag2.extend_from_slice(b"!");
assert!(buffer.add_fragment(device_id, &frag0).is_none());
assert_eq!(buffer.pending_count(), 1);
assert!(buffer.add_fragment(device_id, &frag1).is_none());
assert_eq!(buffer.pending_count(), 1);
let result = buffer.add_fragment(device_id, &frag2);
assert_eq!(result, Some(b"hello world !".to_vec()));
assert_eq!(buffer.pending_count(), 0);
}
#[test]
fn test_reassembly_buffer_multi_fragment_out_of_order() {
let mut buffer = BleReassemblyBuffer::default();
let device_id = [1, 2, 3, 4, 5, 6];
let msg_id = 7;
let mut frag2 = FragmentHeader::new(2, fragment_flags::END, 3, msg_id)
.to_bytes()
.to_vec();
frag2.extend_from_slice(b"C");
let mut frag0 = FragmentHeader::new(0, fragment_flags::START, 3, msg_id)
.to_bytes()
.to_vec();
frag0.extend_from_slice(b"A");
let mut frag1 = FragmentHeader::new(1, 0, 3, msg_id).to_bytes().to_vec();
frag1.extend_from_slice(b"B");
assert!(buffer.add_fragment(device_id, &frag2).is_none());
assert!(buffer.add_fragment(device_id, &frag0).is_none());
let result = buffer.add_fragment(device_id, &frag1);
assert_eq!(result, Some(b"ABC".to_vec()));
}
#[test]
fn test_reassembly_buffer_duplicate_fragment() {
let mut buffer = BleReassemblyBuffer::default();
let device_id = [1, 2, 3, 4, 5, 6];
let msg_id = 99;
let mut frag0 = FragmentHeader::new(0, fragment_flags::START, 2, msg_id)
.to_bytes()
.to_vec();
frag0.extend_from_slice(b"data");
assert!(buffer.add_fragment(device_id, &frag0).is_none());
assert!(buffer.add_fragment(device_id, &frag0).is_none());
assert_eq!(buffer.pending_count(), 1);
}
#[test]
fn test_reassembly_buffer_multiple_devices() {
let mut buffer = BleReassemblyBuffer::default();
let device1 = [1, 1, 1, 1, 1, 1];
let device2 = [2, 2, 2, 2, 2, 2];
let mut frag1_0 = FragmentHeader::new(0, fragment_flags::START, 2, 0)
.to_bytes()
.to_vec();
frag1_0.extend_from_slice(b"D1-");
let mut frag2_0 = FragmentHeader::new(0, fragment_flags::START, 2, 0)
.to_bytes()
.to_vec();
frag2_0.extend_from_slice(b"D2-");
assert!(buffer.add_fragment(device1, &frag1_0).is_none());
assert!(buffer.add_fragment(device2, &frag2_0).is_none());
assert_eq!(buffer.pending_count(), 2);
let mut frag2_1 = FragmentHeader::new(1, fragment_flags::END, 2, 0)
.to_bytes()
.to_vec();
frag2_1.extend_from_slice(b"done");
let result = buffer.add_fragment(device2, &frag2_1);
assert_eq!(result, Some(b"D2-done".to_vec()));
assert_eq!(buffer.pending_count(), 1); }
#[test]
fn test_reassembly_buffer_prune_stale() {
let mut buffer = BleReassemblyBuffer::new(Duration::from_millis(10));
let device_id = [1, 2, 3, 4, 5, 6];
let mut frag0 = FragmentHeader::new(0, fragment_flags::START, 2, 0)
.to_bytes()
.to_vec();
frag0.extend_from_slice(b"incomplete");
buffer.add_fragment(device_id, &frag0);
assert_eq!(buffer.pending_count(), 1);
std::thread::sleep(Duration::from_millis(20));
let pruned = buffer.prune_stale();
assert_eq!(pruned, 1);
assert_eq!(buffer.pending_count(), 0);
}
#[test]
fn test_fragmenter_and_reassembly_roundtrip() {
let fragmenter = BlePacketFragmenter::default_ble();
let mut buffer = BleReassemblyBuffer::default();
let device_id = [0xAA; 6];
let original_data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
let fragments = fragmenter.fragment(&original_data, 123);
assert!(fragments.len() > 1);
let mut result = None;
for (i, frag) in fragments.iter().enumerate().rev() {
result = buffer.add_fragment(device_id, frag);
if i > 0 {
assert!(result.is_none());
}
}
assert_eq!(result.unwrap(), original_data);
}
#[test]
fn test_ble_config_session_caching_defaults() {
let config = BleConfig::default();
assert_eq!(
config.session_cache_duration,
Duration::from_secs(24 * 60 * 60)
);
assert_eq!(config.max_cached_sessions, 100);
assert_eq!(
config.session_cleanup_interval,
Some(Duration::from_secs(600))
);
assert!(config.session_persist_path.is_none());
}
#[test]
fn test_cached_session_expiry() {
let session = CachedSession {
device_id: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66],
session_key: [0xAA; 32],
session_id: 1234,
established: Instant::now(),
last_active: Instant::now(),
};
assert!(!session.is_expired(Duration::from_secs(3600)));
assert!(session.is_expired(Duration::ZERO));
}
#[test]
fn test_persisted_session_from_cached() {
let cached = CachedSession {
device_id: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66],
session_key: [0xAA; 32],
session_id: 1234,
established: Instant::now(),
last_active: Instant::now(),
};
let persisted = PersistedSession::from_cached(&cached);
assert_eq!(persisted.device_id, "112233445566");
assert_eq!(persisted.session_id, 1234);
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(persisted.established_unix <= now_unix);
assert!(persisted.established_unix >= now_unix.saturating_sub(10));
}
#[test]
fn test_session_cache_file_serialization() {
let mut file = SessionCacheFile::new();
file.sessions.push(PersistedSession {
device_id: "112233445566".to_string(),
session_key_hash: [0xBB; 32],
session_id: 5678,
established_unix: 1234567890,
});
file.sessions.push(PersistedSession {
device_id: "AABBCCDDEEFF".to_string(),
session_key_hash: [0xCC; 32],
session_id: 9012,
established_unix: 1234567891,
});
let bytes = file.to_bytes();
let restored = SessionCacheFile::from_bytes(&bytes).unwrap();
assert_eq!(restored.version, SessionCacheFile::CURRENT_VERSION);
assert_eq!(restored.sessions.len(), 2);
assert_eq!(restored.sessions[0].device_id, "112233445566");
assert_eq!(restored.sessions[0].session_id, 5678);
assert_eq!(restored.sessions[1].device_id, "AABBCCDDEEFF");
assert_eq!(restored.sessions[1].session_id, 9012);
}
#[test]
fn test_session_cache_file_empty() {
let file = SessionCacheFile::new();
let bytes = file.to_bytes();
let restored = SessionCacheFile::from_bytes(&bytes).unwrap();
assert_eq!(restored.sessions.len(), 0);
}
#[test]
fn test_session_cache_file_invalid() {
assert!(SessionCacheFile::from_bytes(&[]).is_none());
assert!(SessionCacheFile::from_bytes(&[1, 2, 3]).is_none());
let invalid_version = [0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0];
assert!(SessionCacheFile::from_bytes(&invalid_version).is_none());
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_session_lookup_integration() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let session_key = [0xAB; 32];
assert!(!transport.has_cached_session(&device_id).await);
let (hits, misses) = transport.cache_stats();
assert_eq!(hits, 0);
assert_eq!(misses, 1);
transport.cache_session(device_id, session_key, 1234).await;
assert!(transport.has_cached_session(&device_id).await);
let (hits, misses) = transport.cache_stats();
assert_eq!(hits, 1);
assert_eq!(misses, 1);
let token = transport.lookup_session(&device_id).await;
assert!(token.is_some());
let token = token.unwrap();
assert_eq!(&token.peer_id_hash[..6], &device_id);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_session_touch() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
transport.cache_session(device_id, [0xAA; 32], 1234).await;
tokio::time::sleep(Duration::from_millis(10)).await;
transport.touch_session(&device_id).await;
assert!(transport.has_cached_session(&device_id).await);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_prune_sessions() {
let config = BleConfig {
session_cache_duration: Duration::from_millis(50),
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
transport
.cache_session([0x11, 0x22, 0x33, 0x44, 0x55, 0x66], [0xAA; 32], 1234)
.await;
assert_eq!(transport.cached_session_count().await, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
let pruned = transport.prune_expired_sessions().await;
assert_eq!(pruned, 1);
assert_eq!(transport.cached_session_count().await, 0);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_prune_enforces_max_sessions() {
let config = BleConfig {
max_cached_sessions: 3,
session_cache_duration: Duration::from_secs(3600),
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
for i in 0..5u8 {
let device_id = [i, i, i, i, i, i];
transport.cache_session(device_id, [i; 32], i as u16).await;
tokio::time::sleep(Duration::from_millis(5)).await;
}
assert_eq!(transport.cached_session_count().await, 5);
let pruned = transport.prune_expired_sessions().await;
assert_eq!(pruned, 2);
assert_eq!(transport.cached_session_count().await, 3);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_clear_session_cache() {
if let Ok(transport) = BleTransport::new().await {
for i in 0..3u8 {
let device_id = [i, i, i, i, i, i];
transport.cache_session(device_id, [i; 32], i as u16).await;
}
assert_eq!(transport.cached_session_count().await, 3);
transport.clear_session_cache().await;
assert_eq!(transport.cached_session_count().await, 0);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_cache_connection_session() {
if let Ok(transport) = BleTransport::new().await {
let device_id = [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];
let session_key = [0x12; 32];
transport
.cache_connection_session(device_id, session_key)
.await;
assert!(transport.has_cached_session(&device_id).await);
assert_eq!(transport.cached_session_count().await, 1);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_session_persistence_save_load() {
let temp_dir = std::env::temp_dir();
let persist_path = temp_dir.join("ant_quic_ble_session_test.cache");
let _ = std::fs::remove_file(&persist_path);
let config = BleConfig {
session_persist_path: Some(persist_path.clone()),
..Default::default()
};
if let Ok(transport) = BleTransport::with_config(config).await {
transport
.cache_session([0x11, 0x22, 0x33, 0x44, 0x55, 0x66], [0xAA; 32], 1234)
.await;
transport
.cache_session([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF], [0xBB; 32], 5678)
.await;
transport.save_sessions_to_disk().await.unwrap();
assert!(persist_path.exists());
let count = transport.load_sessions_from_disk().await.unwrap();
assert_eq!(count, 2);
let _ = std::fs::remove_file(&persist_path);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_transport_session_persistence_no_path() {
if let Ok(transport) = BleTransport::new().await {
transport.save_sessions_to_disk().await.unwrap();
let count = transport.load_sessions_from_disk().await.unwrap();
assert_eq!(count, 0);
}
}
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
async fn test_ble_connection_session_resumed_flag() {
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let conn = BleConnection::new(device_id);
assert!(!conn.session_resumed);
let conn_with_flag = BleConnection::new_with_resumption(device_id, true);
assert!(conn_with_flag.session_resumed);
}
}