use btleplug::api::{Central, Manager as _, Peripheral as _, ScanFilter, WriteType};
use btleplug::platform::{Adapter, Manager, Peripheral};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::OnceCell;
use tokio::time;
use super::gatt_uuids;
const DEFAULT_SCAN_TIMEOUT: Duration = Duration::from_secs(15);
static SCAN_RUNNING: AtomicBool = AtomicBool::new(false);
static ADAPTER: OnceCell<Adapter> = OnceCell::const_new();
pub async fn get_adapter() -> &'static Adapter {
ADAPTER
.get_or_init(|| async {
let (tx, rx) = tokio::sync::oneshot::channel();
std::thread::Builder::new()
.name("btleplug-test-adapter".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create adapter runtime");
rt.block_on(async {
let manager = Manager::new().await.expect("failed to create BLE manager");
let adapters = manager.adapters().await.expect("failed to get adapters");
std::mem::forget(manager);
let adapter = adapters.into_iter().next().expect("no BLE adapters found");
tx.send(adapter).ok();
std::future::pending::<()>().await;
});
})
.expect("failed to spawn adapter thread");
rx.await
.expect("failed to receive adapter from background thread")
})
.await
}
async fn ensure_clean_state(adapter: &Adapter) {
if let Ok(Ok(peripherals)) =
tokio::time::timeout(Duration::from_secs(2), adapter.peripherals()).await
{
for p in peripherals {
if let Ok(Ok(true)) =
tokio::time::timeout(Duration::from_secs(1), p.is_connected()).await
{
let _ = tokio::time::timeout(Duration::from_secs(5), p.disconnect()).await;
}
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
pub async fn find_and_connect() -> Peripheral {
let peripheral_name = std::env::var("BTLEPLUG_TEST_PERIPHERAL")
.unwrap_or_else(|_| gatt_uuids::TEST_PERIPHERAL_NAME.to_string());
let adapter = get_adapter().await;
ensure_clean_state(adapter).await;
if !SCAN_RUNNING.load(Ordering::Relaxed) {
adapter
.start_scan(ScanFilter::default())
.await
.expect("failed to start scan");
SCAN_RUNNING.store(true, Ordering::Relaxed);
}
let peripheral = tokio::time::timeout(DEFAULT_SCAN_TIMEOUT, async {
let start = tokio::time::Instant::now();
let mut scan_restarted = false;
loop {
let peripherals = adapter
.peripherals()
.await
.expect("failed to list peripherals");
for p in peripherals {
if let Ok(Some(props)) = p.properties().await {
if props.local_name.as_deref() == Some(&peripheral_name) {
return p;
}
}
}
if !scan_restarted && start.elapsed() >= Duration::from_secs(5) {
let _ = adapter.start_scan(ScanFilter::default()).await;
SCAN_RUNNING.store(true, Ordering::Relaxed);
scan_restarted = true;
}
time::sleep(Duration::from_millis(200)).await;
}
})
.await
.unwrap_or_else(|_| {
panic!(
"timed out after {:?} waiting for peripheral '{}'",
DEFAULT_SCAN_TIMEOUT, peripheral_name
)
});
peripheral
.connect_with_timeout(Duration::from_secs(10))
.await
.expect("failed to connect to test peripheral");
peripheral
.discover_services_with_timeout(Duration::from_secs(10))
.await
.expect("failed to discover services");
peripheral
}
pub async fn send_control_command(peripheral: &Peripheral, opcode: u8) {
let chars = peripheral.characteristics();
let control_point = chars
.iter()
.find(|c| c.uuid == gatt_uuids::CONTROL_POINT)
.expect("Control Point characteristic not found");
peripheral
.write(control_point, &[opcode], WriteType::WithResponse)
.await
.expect("failed to write control command");
}
pub async fn reset_peripheral(peripheral: &Peripheral) {
send_control_command(peripheral, gatt_uuids::CMD_RESET_STATE).await;
time::sleep(Duration::from_millis(100)).await;
}
pub fn find_characteristic(
peripheral: &Peripheral,
uuid: uuid::Uuid,
) -> btleplug::api::Characteristic {
peripheral
.characteristics()
.into_iter()
.find(|c| c.uuid == uuid)
.unwrap_or_else(|| panic!("characteristic {} not found", uuid))
}