from __future__ import annotations
import argparse
import asyncio
import csv
import enum
import hashlib
import hmac
import io
import logging
import os
import struct
import sys
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Callable, Optional
import bleak
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
AWEAR_SERVICE_UUID = "FC740001-0291-41FC-A9B0-45951B5B01D7"
AWEAR_TX_CHARACTERISTIC_UUID = "FC740002-0291-41FC-A9B0-45951B5B01D7" AWEAR_RX_CHARACTERISTIC_UUID = "FC740003-0291-41FC-A9B0-45951B5B01D7"
NORDIC_DFU_SERVICE_UUID = "8EC90001-F315-4F60-9FB8-838830DAEA50"
NORDIC_DFU_CONTROL_CHARACTERISTIC_UUID = "8EC90001-F315-4F60-9FB8-838830DAEA50"
NORDIC_DFU_PACKET_CHARACTERISTIC_UUID = "8EC90002-F315-4F60-9FB8-838830DAEA50"
AWEAR_CONNECTED_PREFIX = "AWEAR_CONNECTED:"
LUCA_MAGIC = b"LUCA" CHALLENGE_REPLY_PREFIX = "CRPL:"
CHALLENGE_CONSTANT = 0xD4C3B2A1 CHALLENGE_SYMMETRIC_KEY = bytes.fromhex("4ee1fbef8798e94a17bb945898c1898f")
def compute_challenge_reply(challenge_hex: str) -> str:
challenge_val = int(challenge_hex, 16)
reversed_bytes = struct.pack(">I", challenge_val)
constant_bytes = struct.pack("<I", CHALLENGE_CONSTANT)
message = reversed_bytes + constant_bytes
mac = hmac.new(CHALLENGE_SYMMETRIC_KEY, message, hashlib.sha256).digest()
reply_hex = mac[:8].hex().upper()
return f"{CHALLENGE_REPLY_PREFIX}{reply_hex}"
DEFAULTS: dict[str, Any] = {
"AwearConnectAutomaticallyAfterStartup": False,
"AwearReconnectAutomaticallyAfterCrash": True,
"AwearReconnectAutomatically": True,
"AwearConnectTimeout": 30.0,
"AwearReconnectTimeout": 10.0,
"AwearMaxEEGValueCheck": 8388607, "AwearMinEEGValueCheck": -8388608,
"AwearDiscoveryMinSignal": -80,
"AwearDiscoveryShowSignal": True,
"AwearDiscoverySortBySignal": True,
"AwearLastConnectedDeviceName": "",
"AwearLastConnectedDevice": "",
"SaveSettingsToICloud": False,
"ShowLastConnectedDevice": True,
"WarningSoundWithErrors": True,
"SaveDataAsOpenBCI": True,
"StartAfterConnected": False,
"SaveBTData": False,
}
SEARCH_TIMEOUT = 10.0
STALE_DEVICE_TIMEOUT = 15.0
DEVICE_CONNECTION_TIMEOUT = 30.0
DFU_CONNECTION_TIMEOUT = 60.0
STREAMING_DATA_TIMER_INTERVAL = 1.0
CHECK_DATA_TIMER_INTERVAL = 5.0
FREQUENCY_COUNTER_TIMER_INTERVAL = 1.0
BOARD_LIST_UPDATE_INTERVAL = 1.0
LAST_WARNING_SOUND_TIMEOUT = 5.0
RECONNECTION_MAX_RETRIES = 10
log = logging.getLogger("awear.runner")
class ThreadSafeArray:
def __init__(self) -> None:
self._lock = Lock()
self._items: list[Any] = []
def append(self, item: Any) -> None:
with self._lock:
self._items.append(item)
def remove(self, item: Any) -> None:
with self._lock:
try:
self._items.remove(item)
except ValueError:
pass
def clear(self) -> None:
with self._lock:
self._items.clear()
def snapshot(self) -> list[Any]:
with self._lock:
return list(self._items)
def __len__(self) -> int:
with self._lock:
return len(self._items)
def __iter__(self):
return iter(self.snapshot())
class DeviceStatus(enum.Enum):
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
READY = "ready"
STREAMING = "streaming"
RECONNECTING = "reconnecting"
DFU = "dfu"
class DataPacketType(enum.IntEnum):
EEG = 0x01
BATTERY = 0x02
SIGNAL = 0x03
MISC = 0x04
@dataclass
class DiscoveredDevice:
name: str
address: str
rssi: int
ble_device: BLEDevice
advertisement: AdvertisementData
last_seen: float = field(default_factory=time.monotonic)
@dataclass
class PacketStats:
eeg_packet_count: int = 0
battery_packet_count: int = 0
signal_packet_count: int = 0
misc_packet_count: int = 0
lost_packets: int = 0
prev_lost_packets: int = 0
packet_counter: int = 0
prev_packet_counter: int = 0
frequency_counter: int = 0
last_frequency: float = 0.0
@dataclass
class DeviceInfo:
name: str = ""
peripheral_uuid: str = ""
firmware: str = ""
bootloader: str = ""
battery: int = -1
signal: int = -127
is_awear: bool = False
is_dk: bool = False
OPENBCI_HEADER = "AWEAR_OpenBCI_Header"
class FileWriter:
def __init__(self, output_dir: Path, openbci_format: bool = True) -> None:
self.output_dir = output_dir
self.openbci_format = openbci_format
self._file: Optional[io.BufferedWriter | io.TextIOWrapper] = None
self._csv_writer: Optional[csv.writer] = None
self._filename: str = ""
self._lock = Lock()
self._sample_index: int = 0
output_dir.mkdir(parents=True, exist_ok=True)
def open(self) -> str:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
if self.openbci_format:
self._filename = f"AWEAR_{ts}.csv"
path = self.output_dir / self._filename
self._file = open(path, "w", newline="")
self._csv_writer = csv.writer(self._file)
self._write_openbci_header()
else:
self._filename = f"AWEAR_{ts}.bin"
path = self.output_dir / self._filename
self._file = open(path, "wb")
self._sample_index = 0
log.info("Opened data file: %s", path)
return str(path)
def close(self) -> None:
with self._lock:
if self._file:
self._file.close()
self._file = None
self._csv_writer = None
def write_eeg(self, channels: list[int], timestamp: Optional[float] = None) -> None:
with self._lock:
if self._file is None:
return
if self.openbci_format:
assert self._csv_writer is not None
row = [self._sample_index] + channels
if timestamp is not None:
row.append(f"{timestamp:.6f}")
self._csv_writer.writerow(row)
else:
buf = struct.pack("<I", self._sample_index & 0xFFFFFFFF)
for ch in channels:
b = ch.to_bytes(3, byteorder="big", signed=True)
buf += b
if timestamp is not None:
buf += struct.pack("<d", timestamp)
self._file.write(buf)
self._sample_index += 1
def write_raw(self, data: bytes, timestamp: Optional[float] = None) -> None:
with self._lock:
if self._file is None:
return
if self.openbci_format:
assert self._csv_writer is not None
hex_str = data.hex()
row: list[Any] = [self._sample_index, hex_str]
if timestamp is not None:
row.append(f"{timestamp:.6f}")
self._csv_writer.writerow(row)
else:
if timestamp is not None:
self._file.write(struct.pack("<d", timestamp))
self._file.write(data)
self._sample_index += 1
def flush(self) -> None:
with self._lock:
if self._file:
self._file.flush()
def _write_openbci_header(self) -> None:
assert self._csv_writer is not None
self._csv_writer.writerow(["%%AWEAR Raw EXG Data"])
self._csv_writer.writerow(["%%Number of channels = 1"])
self._csv_writer.writerow(["%%Sample Rate = 256 Hz"])
self._csv_writer.writerow(["Sample Index", "EXG Channel 0", "Timestamp"])
class DFUStatus(enum.Enum):
IDLE = "idle"
STARTING = "starting"
UPLOADING = "uploading"
VALIDATING = "validating"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class DFUProgress:
status: DFUStatus = DFUStatus.IDLE
bytes_sent: int = 0
total_bytes: int = 0
speed_bps: float = 0.0
avg_speed_bps: float = 0.0
@property
def percent(self) -> float:
if self.total_bytes == 0:
return 0.0
return (self.bytes_sent / self.total_bytes) * 100.0
class DFUController:
DFU_PACKET_SIZE = 20
def __init__(self, client: BleakClient) -> None:
self._client = client
self.progress = DFUProgress()
self._control_char: Optional[BleakGATTCharacteristic] = None
self._packet_char: Optional[BleakGATTCharacteristic] = None
async def discover(self) -> bool:
services = self._client.services
dfu_service = services.get_service(NORDIC_DFU_SERVICE_UUID)
if dfu_service is None:
log.warning("Nordic DFU service not found")
return False
for char in dfu_service.characteristics:
uuid_upper = char.uuid.upper()
if uuid_upper == NORDIC_DFU_CONTROL_CHARACTERISTIC_UUID.upper():
self._control_char = char
elif uuid_upper == NORDIC_DFU_PACKET_CHARACTERISTIC_UUID.upper():
self._packet_char = char
found = self._control_char is not None and self._packet_char is not None
if not found:
log.warning("DFU characteristics not fully discovered")
return found
async def update_firmware(
self,
firmware_path: str,
on_progress: Optional[Callable[[DFUProgress], None]] = None,
) -> bool:
if self._control_char is None or self._packet_char is None:
log.error("DFU not discovered; call discover() first")
self.progress.status = DFUStatus.FAILED
return False
firmware_data = Path(firmware_path).read_bytes()
self.progress = DFUProgress(
status=DFUStatus.STARTING,
total_bytes=len(firmware_data),
)
if on_progress:
on_progress(self.progress)
try:
await self._client.start_notify(
self._control_char, self._on_dfu_control_notify
)
await self._client.write_gatt_char(
self._control_char, bytearray([0x01, 0x04]), response=True
)
size_bytes = struct.pack("<I", len(firmware_data))
await self._client.write_gatt_char(
self._packet_char, bytearray(size_bytes), response=False
)
await asyncio.sleep(0.1)
await self._client.write_gatt_char(
self._control_char, bytearray([0x03]), response=True
)
self.progress.status = DFUStatus.UPLOADING
offset = 0
start_time = time.monotonic()
while offset < len(firmware_data):
chunk = firmware_data[offset : offset + self.DFU_PACKET_SIZE]
await self._client.write_gatt_char(
self._packet_char, bytearray(chunk), response=False
)
offset += len(chunk)
self.progress.bytes_sent = offset
elapsed = time.monotonic() - start_time
if elapsed > 0:
self.progress.speed_bps = offset / elapsed
self.progress.avg_speed_bps = offset / elapsed
if on_progress:
on_progress(self.progress)
self.progress.status = DFUStatus.VALIDATING
await self._client.write_gatt_char(
self._control_char, bytearray([0x04]), response=True
)
await asyncio.sleep(0.5)
await self._client.write_gatt_char(
self._control_char, bytearray([0x05]), response=True
)
self.progress.status = DFUStatus.COMPLETED
if on_progress:
on_progress(self.progress)
log.info("DFU completed successfully")
return True
except Exception as exc:
log.error("DFU failed: %s", exc)
self.progress.status = DFUStatus.FAILED
if on_progress:
on_progress(self.progress)
return False
finally:
try:
await self._client.stop_notify(self._control_char)
except Exception:
pass
def _on_dfu_control_notify(
self, _char: BleakGATTCharacteristic, data: bytearray
) -> None:
log.debug("DFU control notification: %s", data.hex())
class AwearControllerDelegate:
def on_status_update(self, status: str) -> None: ...
def on_status_set(self, status: DeviceStatus) -> None: ...
def on_data_update(self, key: str, value: Any) -> None: ...
def on_data_received(self, packet_type: DataPacketType, data: bytes) -> None: ...
def on_ready(self) -> None: ...
def on_central_state_changed(self, state: str) -> None: ...
def on_eeg_data(self, channels: list[int], counter: int) -> None: ...
def on_battery_data(self, level: int) -> None: ...
def on_signal_data(self, rssi: int) -> None: ...
def on_misc_data(self, data: bytes) -> None: ...
def on_disconnected(self) -> None: ...
def on_connected(self) -> None: ...
class AwearController:
def __init__(self, config: Optional[dict[str, Any]] = None) -> None:
self.config: dict[str, Any] = {**DEFAULTS, **(config or {})}
self.delegate: Optional[AwearControllerDelegate] = None
self.device_info = DeviceInfo()
self.status = DeviceStatus.DISCONNECTED
self.stats = PacketStats()
self.found_devices = ThreadSafeArray()
self._searching = False
self._client: Optional[BleakClient] = None
self._scanner: Optional[BleakScanner] = None
self._tx_char: Optional[BleakGATTCharacteristic] = None
self._rx_char: Optional[BleakGATTCharacteristic] = None
self._started = False
self._device_ready = False
self._command_output: deque[str] = deque(maxlen=256)
self._reconnection_counter = 0
self._reconnection_requested = False
self._resume_streaming_after_reconnect = False
self._ready_event: Optional[asyncio.Event] = None
self._file_writer: Optional[FileWriter] = None
self._dfu: Optional[DFUController] = None
self._timer_tasks: dict[str, asyncio.Task] = {}
self._freq_counter = 0
self._last_freq_time = time.monotonic()
self._current_frequency: float = 0.0
self._luca_expecting_data = False
self._luca_last_chunk_size = 0 self._luca_block_type = 0
self._luca_block_counter = 0
self._luca_eeg_buffer = bytearray()
self._haptic_engine_available = True
self._device_listing_active = False
self.benchmark_latencies: list[float] = []
self._bench_mode = False
@property
def device_name(self) -> str:
return self.device_info.name
@property
def device_connected(self) -> bool:
return self.status in (
DeviceStatus.CONNECTED,
DeviceStatus.READY,
DeviceStatus.STREAMING,
)
@property
def device_connecting(self) -> bool:
return self.status == DeviceStatus.CONNECTING
@property
def device_disconnected(self) -> bool:
return self.status == DeviceStatus.DISCONNECTED
@property
def device_ready(self) -> bool:
return self._device_ready
@property
def device_streaming(self) -> bool:
return self.status == DeviceStatus.STREAMING
@property
def device_reconnecting(self) -> bool:
return self.status == DeviceStatus.RECONNECTING
@property
def is_searching(self) -> bool:
return self._searching
@property
def is_started(self) -> bool:
return self._started
@property
def battery_level(self) -> int:
return self.device_info.battery
@property
def signal_level(self) -> int:
return self.device_info.signal
@property
def firmware_version(self) -> str:
return self.device_info.firmware
@property
def bootloader_version(self) -> str:
return self.device_info.bootloader
@property
def last_connected_device_name(self) -> str:
return self.config["AwearLastConnectedDeviceName"]
@last_connected_device_name.setter
def last_connected_device_name(self, value: str) -> None:
self.config["AwearLastConnectedDeviceName"] = value
@property
def save_bt_data(self) -> bool:
return self.config.get("SaveBTData", False)
@save_bt_data.setter
def save_bt_data(self, value: bool) -> None:
self.config["SaveBTData"] = value
async def search_devices(self, timeout: Optional[float] = None) -> list[DiscoveredDevice]:
if self._searching:
log.debug("Already searching")
return self.found_devices.snapshot()
timeout = timeout or SEARCH_TIMEOUT
self._searching = True
self.found_devices.clear()
log.info("Starting AWEAR device search (timeout=%.1fs)", timeout)
if self.delegate:
self.delegate.on_status_update("Searching for AWEAR devices...")
min_rssi = self.config["AwearDiscoveryMinSignal"]
def _detection_callback(device: BLEDevice, adv: AdvertisementData) -> None:
name = device.name or adv.local_name or ""
if not name:
return
rssi = adv.rssi if adv.rssi is not None else -127
if rssi < min_rssi:
return
is_awear = (
"AWEAR" in name.upper()
or AWEAR_SERVICE_UUID.lower() in [u.lower() for u in (adv.service_uuids or [])]
)
if not is_awear:
return
existing = [d for d in self.found_devices if d.address == device.address]
if existing:
existing[0].rssi = rssi
existing[0].last_seen = time.monotonic()
return
disc = DiscoveredDevice(
name=name,
address=device.address,
rssi=rssi,
ble_device=device,
advertisement=adv,
)
self.found_devices.append(disc)
log.info("Found AWEAR device: %s (%s) RSSI=%d", name, device.address, rssi)
self._scanner = BleakScanner(detection_callback=_detection_callback)
await self._scanner.start()
self._start_timer("stale_device", self._stale_device_cleanup, STALE_DEVICE_TIMEOUT)
await asyncio.sleep(timeout)
await self.stop_search()
devices = self.found_devices.snapshot()
if self.config["AwearDiscoverySortBySignal"]:
devices.sort(key=lambda d: d.rssi, reverse=True)
log.info("AWEAR boards found: %d", len(devices))
if self.delegate:
self.delegate.on_status_update(f"AWEAR devices found: {len(devices)}")
return devices
async def stop_search(self) -> None:
if self._scanner:
try:
await self._scanner.stop()
except Exception:
pass
self._scanner = None
self._searching = False
self._cancel_timer("stale_device")
log.info("Stopped AWEAR device search")
async def _stale_device_cleanup(self) -> None:
while self._searching:
await asyncio.sleep(STALE_DEVICE_TIMEOUT)
now = time.monotonic()
stale = [
d
for d in self.found_devices
if (now - d.last_seen) > STALE_DEVICE_TIMEOUT
]
for d in stale:
self.found_devices.remove(d)
log.debug("Removed stale device: %s", d.name)
async def connect(self, device: str | DiscoveredDevice | BLEDevice) -> bool:
if self.device_connected:
log.info("Already connected to AWEAR board: %s", self.device_info.name)
return True
ble_device: Optional[BLEDevice] = None
device_name = ""
if isinstance(device, DiscoveredDevice):
ble_device = device.ble_device
device_name = device.name
elif isinstance(device, BLEDevice):
ble_device = device
device_name = device.name or device.address
elif isinstance(device, str):
device_name = device
for d in self.found_devices:
if d.name == device or d.address == device:
ble_device = d.ble_device
break
if ble_device is None:
log.info("Device %s not in discovered list, scanning...", device)
await self.search_devices(timeout=5.0)
for d in self.found_devices:
if d.name == device or d.address == device:
ble_device = d.ble_device
break
if ble_device is None:
log.error("Could not find AWEAR device: %s", device)
return False
self._set_status(DeviceStatus.CONNECTING)
log.info("@@@@ Bluetooth.beginConnecting-1: %s", device_name)
connect_timeout = self.config["AwearConnectTimeout"]
self._client = BleakClient(
ble_device,
disconnected_callback=self._on_disconnected_callback,
)
try:
await asyncio.wait_for(
self._client.connect(), timeout=connect_timeout
)
except (asyncio.TimeoutError, Exception) as exc:
log.error("Connection failed: %s", exc)
self._set_status(DeviceStatus.DISCONNECTED)
self._client = None
return False
log.info("@@@@ Bluetooth.beginConnecting-2: connected to %s", device_name)
if not await self._discover_services():
await self.disconnect()
return False
self.device_info.name = device_name
self.device_info.peripheral_uuid = ble_device.address
self.config["AwearLastConnectedDeviceName"] = device_name
self.config["AwearLastConnectedDevice"] = ble_device.address
self._set_status(DeviceStatus.CONNECTED)
self._reconnection_counter = 0
log.info("Connected to AWEAR board: %s", device_name)
if self._rx_char:
self._challenge_event = asyncio.Event()
self._ready_event = asyncio.Event()
self._pending_challenge = None
await self._client.start_notify(self._rx_char, self._on_rx_notify)
log.info("Notifications enabled — waiting for AWEAR_CONNECTED...")
try:
await asyncio.wait_for(self._challenge_event.wait(), timeout=10.0)
except asyncio.TimeoutError:
log.warning("AWEAR_CONNECTED not received (timeout)")
challenge = getattr(self, "_pending_challenge", None)
if challenge and self._client and self._client.is_connected and self._tx_char:
reply = compute_challenge_reply(challenge)
log.info("Sending challenge reply: %s", reply)
await self._client.write_gatt_char(
self._tx_char, reply.encode("utf-8"), response=False
)
log.info("CRPL sent — waiting for AWEAR_READY...")
try:
await asyncio.wait_for(self._ready_event.wait(), timeout=10.0)
except asyncio.TimeoutError:
log.warning("AWEAR_READY not received (timeout)")
if not self._client or not self._client.is_connected:
log.error("Device disconnected during handshake")
self._set_status(DeviceStatus.DISCONNECTED)
return False
self._device_ready = True
self._set_status(DeviceStatus.READY)
log.info("Device ready — authenticated and streaming")
if self.delegate:
self.delegate.on_ready()
if self.delegate:
self.delegate.on_connected()
if self.config["StartAfterConnected"]:
log.info("Starting after connected")
await self.start()
return True
async def disconnect(self) -> None:
log.info("@@@@ AwearController.disconnect")
self._cancel_all_timers()
self._started = False
self._device_ready = False
if self._client and self._client.is_connected:
try:
if self._rx_char:
await self._client.stop_notify(self._rx_char)
except Exception:
pass
try:
await self._client.disconnect()
except Exception:
pass
self._client = None
self._tx_char = None
self._rx_char = None
self._set_status(DeviceStatus.DISCONNECTED)
if self._file_writer:
self._file_writer.close()
self._file_writer = None
log.info("@@@@ Bluetooth.disconnect: Disconnected from AWEAR board: %s",
self.device_info.name)
if self.delegate:
self.delegate.on_disconnected()
async def _discover_services(self) -> bool:
assert self._client is not None
services = self._client.services
awear_svc = services.get_service(AWEAR_SERVICE_UUID)
if awear_svc is None:
for svc in services:
if svc.uuid.upper() == AWEAR_SERVICE_UUID.upper():
awear_svc = svc
break
if awear_svc is None:
log.error("AWEAR service not found on device")
return False
for char in awear_svc.characteristics:
uuid_upper = char.uuid.upper()
log.info(" Characteristic %s properties=%s", char.uuid, char.properties)
if uuid_upper == AWEAR_TX_CHARACTERISTIC_UUID.upper():
self._tx_char = char
elif uuid_upper == AWEAR_RX_CHARACTERISTIC_UUID.upper():
self._rx_char = char
if self._tx_char is None or self._rx_char is None:
log.error("AWEAR TX/RX characteristics not found (tx=%s, rx=%s)",
self._tx_char, self._rx_char)
return False
log.info("Discovered AWEAR service — TX=%s RX=%s",
self._tx_char.uuid, self._rx_char.uuid)
return True
def _on_disconnected_callback(self, _client: BleakClient) -> None:
log.warning("AWEAR disconnected (callback)")
was_connected = self.device_connected
was_streaming = self._started
self._device_ready = False
self._started = False
self._tx_char = None
self._rx_char = None
self._set_status(DeviceStatus.DISCONNECTED)
if self.delegate:
self.delegate.on_disconnected()
if was_connected and self.config["AwearReconnectAutomatically"]:
self._reconnection_requested = True
self._resume_streaming_after_reconnect = was_streaming
try:
loop = asyncio.get_running_loop()
loop.create_task(self._reconnect())
except RuntimeError:
log.warning("No running event loop for reconnection")
async def _reconnect(self) -> None:
if not self._reconnection_requested:
return
device_name = self.config.get("AwearLastConnectedDeviceName", "")
if not device_name:
log.warning("No last connected device name for reconnection")
self._reconnection_requested = False
return
self._set_status(DeviceStatus.RECONNECTING)
reconnect_timeout = self.config["AwearReconnectTimeout"]
while (
self._reconnection_requested
and self._reconnection_counter < RECONNECTION_MAX_RETRIES
):
self._reconnection_counter += 1
log.info(
"@@@@ Bluetooth.beginReconnecting: %s (attempt %d)",
device_name,
self._reconnection_counter,
)
if self.delegate:
self.delegate.on_status_update(
f"Reconnecting to {device_name} (attempt {self._reconnection_counter})"
)
success = await self.connect(device_name)
if success:
log.info("Reconnection to AWEAR board %s successful", device_name)
self._reconnection_requested = False
if getattr(self, "_resume_streaming_after_reconnect", False):
self._resume_streaming_after_reconnect = False
log.info("Resuming streaming after reconnection")
await self.start()
return
log.info("Reconnection attempt %d failed, waiting %.1fs",
self._reconnection_counter, reconnect_timeout)
await asyncio.sleep(reconnect_timeout)
log.warning("Canceled reconnection to device %s after %d attempts",
device_name, self._reconnection_counter)
self._reconnection_requested = False
self._set_status(DeviceStatus.DISCONNECTED)
async def start(self) -> None:
if not self.device_ready:
log.warning("Device not ready; cannot start")
return
if not self._client or not self._client.is_connected:
log.warning("Device not connected; cannot start")
return
log.info("AwearController:start")
await self._send_command("START")
log.info("START command sent to connected AWEAR board")
self._started = True
self._set_status(DeviceStatus.STREAMING)
self.stats = PacketStats()
self._freq_counter = 0
self._last_freq_time = time.monotonic()
if self.save_bt_data:
openbci = self.config.get("SaveDataAsOpenBCI", True)
output_dir = Path.cwd() / "awear_data"
self._file_writer = FileWriter(output_dir, openbci_format=openbci)
self._file_writer.open()
self._start_timer("frequency_counter", self._frequency_counter_tick,
FREQUENCY_COUNTER_TIMER_INTERVAL)
self._start_timer("check_data", self._check_data_tick,
CHECK_DATA_TIMER_INTERVAL)
if self.delegate:
self.delegate.on_status_update("Streaming started")
async def stop(self) -> None:
log.info("AwearController:stop")
if self._client and self._client.is_connected and self._tx_char:
await self._send_command("STOP")
log.info("STOP command sent to connected AWEAR board")
self._started = False
if self.status == DeviceStatus.STREAMING:
self._set_status(DeviceStatus.READY)
self._cancel_timer("frequency_counter")
self._cancel_timer("check_data")
if self._file_writer:
self._file_writer.flush()
self._file_writer.close()
self._file_writer = None
if self.delegate:
self.delegate.on_status_update("Streaming stopped")
async def send_command(self, command: str) -> None:
if not command:
log.warning("sendCommandAWEAR: invalid argument")
return
await self._send_command(command)
def get_command_output(self) -> Optional[str]:
if self._command_output:
return self._command_output.popleft()
return None
def has_command_output(self) -> bool:
return len(self._command_output) > 0
async def _send_command(self, command: str) -> None:
if not self._client or not self._client.is_connected or not self._tx_char:
log.warning("Cannot send command — not connected")
return
data = command.encode("utf-8")
try:
await self._client.write_gatt_char(self._tx_char, data, response=False)
log.debug("Sent command: %s (%d bytes, write-without-response)",
command, len(data))
except Exception as exc:
log.error("Failed to send command '%s': %s", command, exc)
def _on_rx_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
recv_time = time.monotonic()
if len(data) < 1:
return
if self._started:
log.debug("RX [%d bytes] first=0x%02X last=0x%02X luca_expecting=%s",
len(data), data[0], data[-1], self._luca_expecting_data)
if data[-1:] == b"\n":
try:
text = data.decode("utf-8", errors="ignore").strip()
if text.startswith(AWEAR_CONNECTED_PREFIX):
board_info = text[len(AWEAR_CONNECTED_PREFIX):].strip()
log.info("Received handshake: AWEAR_CONNECTED:%s", board_info)
self.device_info.is_awear = True
self._pending_challenge = board_info if len(board_info) == 8 else None
ev = getattr(self, "_challenge_event", None)
if ev and not ev.is_set():
ev.set()
return
if text.startswith("AWEAR_READY:"):
parts = text.split(":")
log.info("Device authenticated: %s", text)
if len(parts) >= 6:
self._awear_packet_size = int(parts[4]) ev = getattr(self, "_ready_event", None)
if ev and not ev.is_set():
ev.set()
return
if text.startswith("Battery mV:"):
try:
mv = int(text.split(":")[1].strip())
pct = max(0, min(100, int((mv - 3200) / 10)))
self.device_info.battery = pct
log.debug("Battery: %d mV (%d%%)", mv, pct)
if self.delegate:
self.delegate.on_battery_data(pct)
except (ValueError, IndexError):
pass
return
if text.startswith("RSSI DBm:"):
try:
rssi = int(text.split(":")[1].strip())
self.device_info.signal = rssi
log.debug("Signal: %d dBm", rssi)
if self.delegate:
self.delegate.on_signal_data(rssi)
except (ValueError, IndexError):
pass
return
if text:
log.debug("RX text: %s", text)
self._command_output.append(text)
return
except Exception:
pass
if len(data) >= 4 and data[:4] == LUCA_MAGIC:
if self._device_ready:
self._handle_luca_header(data, recv_time)
return
if self._luca_expecting_data and self._device_ready:
self._handle_luca_data(data, recv_time)
return
packet_type_byte = data[0]
try:
ptype = DataPacketType(packet_type_byte)
except ValueError:
log.debug("Unknown packet: 0x%02X (%d bytes)", packet_type_byte, len(data))
return
if self.delegate:
self.delegate.on_data_received(ptype, bytes(data))
if ptype == DataPacketType.EEG:
self._received_eeg_data(data, recv_time)
elif ptype == DataPacketType.BATTERY:
self._received_battery_data(data)
elif ptype == DataPacketType.SIGNAL:
self._received_signal_data(data)
elif ptype == DataPacketType.MISC:
self._received_misc_data(data)
def _handle_luca_header(self, data: bytearray, recv_time: float) -> None:
if len(data) < 36:
return
data_type = struct.unpack_from(">I", data, 4)[0]
seq = struct.unpack_from("<I", data, 28)[0]
payload_hint = struct.unpack_from("<H", data, 32)[0]
self._luca_block_type = data_type
self._luca_block_counter = seq
if data_type in (1, 7):
if self._luca_expecting_data and self._luca_eeg_buffer:
self._finalize_eeg_block(time.monotonic())
self._luca_expecting_data = True
self._luca_eeg_buffer = bytearray()
self._luca_last_chunk_size = payload_hint log.debug("LUCA EEG header: type=%d seq=%d last_chunk=%d", data_type, seq, payload_hint)
else:
if self._luca_expecting_data and self._luca_eeg_buffer:
self._finalize_eeg_block(recv_time)
self._luca_expecting_data = False
log.debug("LUCA info header: type=%d seq=%d", data_type, seq)
def _handle_luca_data(self, data: bytearray, recv_time: float) -> None:
self._luca_eeg_buffer.extend(data)
self._freq_counter += 1
last_chunk = getattr(self, "_luca_last_chunk_size", 0)
if last_chunk > 0 and len(data) == last_chunk:
self._finalize_eeg_block(recv_time)
def _finalize_eeg_block(self, recv_time: float) -> None:
self._luca_expecting_data = False
self.stats.eeg_packet_count += 1
raw_buf = self._luca_eeg_buffer
try:
hex_str = raw_buf.decode("ascii").strip()
buf = bytes.fromhex(hex_str)
except (UnicodeDecodeError, ValueError):
buf = bytes(raw_buf)
bytes_per_sample = 2
num_samples = len(buf) // bytes_per_sample
for s in range(num_samples):
offset = s * bytes_per_sample
if offset + 1 < len(buf):
raw = struct.unpack_from(">h", buf, offset)[0] channels = [raw]
if channels:
if self.delegate:
self.delegate.on_eeg_data(channels, self._luca_block_counter)
if self._file_writer and self.save_bt_data:
ts = time.time() if self.config.get("AdjustTimestamp", False) else None
self._file_writer.write_eeg(channels, timestamp=ts)
if self._bench_mode:
self.benchmark_latencies.append(time.monotonic() - recv_time)
if self.delegate:
self.delegate.on_data_update("eeg_block", {
"samples": num_samples,
"block_seq": self._luca_block_counter,
"block_bytes": len(buf),
})
log.debug("EEG block: %d bytes, %d samples", len(buf), num_samples)
self._luca_eeg_buffer = bytearray()
def _received_eeg_data(self, data: bytearray, recv_time: float) -> None:
if len(data) < 2:
return
self.stats.eeg_packet_count += 1
self._freq_counter += 1
counter = data[1]
expected = (self.stats.prev_packet_counter + 1) & 0xFF
if self.stats.eeg_packet_count > 1 and counter != expected:
if counter > expected:
lost = counter - expected
else:
lost = (256 - expected) + counter
self.stats.lost_packets += lost
log.debug("Lost EEG packets: %d (total lost: %d)", lost, self.stats.lost_packets)
self.stats.prev_packet_counter = counter
self.stats.packet_counter = counter
payload = data[2:]
channels: list[int] = []
i = 0
while i + 2 < len(payload):
raw = (payload[i] << 16) | (payload[i + 1] << 8) | payload[i + 2]
if raw & 0x800000: raw -= 0x1000000
channels.append(raw)
i += 3
min_val = self.config["AwearMinEEGValueCheck"]
max_val = self.config["AwearMaxEEGValueCheck"]
for ch_val in channels:
if ch_val < min_val or ch_val > max_val:
log.debug("EEG value out of range: %d", ch_val)
if self._bench_mode:
self.benchmark_latencies.append(time.monotonic() - recv_time)
if self._file_writer and self.save_bt_data:
ts = time.time() if self.config.get("AdjustTimestamp", False) else None
self._file_writer.write_eeg(channels, timestamp=ts)
if self.delegate:
self.delegate.on_eeg_data(channels, counter)
self.delegate.on_data_update("eeg", channels)
def _received_battery_data(self, data: bytearray) -> None:
self.stats.battery_packet_count += 1
if len(data) >= 2:
level = data[1]
self.device_info.battery = level
log.debug("AwearController receivedBatteryData: %d%%", level)
if self.delegate:
self.delegate.on_battery_data(level)
self.delegate.on_data_update("battery", level)
if self._file_writer and self.save_bt_data:
self._file_writer.write_raw(bytes(data), timestamp=time.time())
def _received_signal_data(self, data: bytearray) -> None:
self.stats.signal_packet_count += 1
if len(data) >= 2:
rssi = data[1] if data[1] < 128 else data[1] - 256
self.device_info.signal = rssi
log.debug("AwearController receivedSignalData: %d dBm", rssi)
if self.delegate:
self.delegate.on_signal_data(rssi)
self.delegate.on_data_update("signal", rssi)
if self._file_writer and self.save_bt_data:
self._file_writer.write_raw(bytes(data), timestamp=time.time())
def _received_misc_data(self, data: bytearray) -> None:
self.stats.misc_packet_count += 1
log.debug("AwearController receivedMiscData: %d bytes", len(data))
try:
text = data[1:].decode("utf-8").strip()
if text:
self._command_output.append(text)
except UnicodeDecodeError:
pass
if self.delegate:
self.delegate.on_misc_data(bytes(data))
self.delegate.on_data_update("misc", bytes(data))
if self._file_writer and self.save_bt_data:
self._file_writer.write_raw(bytes(data), timestamp=time.time())
async def update_firmware(self, firmware_path: str,
on_progress: Optional[Callable[[DFUProgress], None]] = None) -> bool:
if not self._client or not self._client.is_connected:
log.error("Cannot update firmware — not connected")
return False
self._set_status(DeviceStatus.DFU)
self._dfu = DFUController(self._client)
if not await self._dfu.discover():
log.error("DFU service discovery failed")
self._set_status(DeviceStatus.READY if self._device_ready else DeviceStatus.CONNECTED)
return False
success = await self._dfu.update_firmware(firmware_path, on_progress=on_progress)
if success:
log.info("Firmware update complete; device will reboot")
self._dfu = None
return success
def write_file_data(self, data: bytes) -> None:
if self._file_writer:
self._file_writer.write_raw(data)
def write_file_data_with_timestamp(self, data: bytes) -> None:
if self._file_writer:
self._file_writer.write_raw(data, timestamp=time.time())
def _start_timer(self, name: str, coro_func, interval: float) -> None:
self._cancel_timer(name)
async def _loop():
try:
await coro_func()
except asyncio.CancelledError:
pass
self._timer_tasks[name] = asyncio.ensure_future(_loop())
def _cancel_timer(self, name: str) -> None:
task = self._timer_tasks.pop(name, None)
if task and not task.done():
task.cancel()
def _cancel_all_timers(self) -> None:
for name in list(self._timer_tasks):
self._cancel_timer(name)
async def _frequency_counter_tick(self) -> None:
while True:
await asyncio.sleep(FREQUENCY_COUNTER_TIMER_INTERVAL)
now = time.monotonic()
elapsed = now - self._last_freq_time
if elapsed > 0:
self._current_frequency = self._freq_counter / elapsed
self.stats.last_frequency = self._current_frequency
if self._current_frequency > 0:
log.debug("EEG packet frequency: %.1f Hz", self._current_frequency)
else:
log.debug("EEG packet frequency: 0")
self._freq_counter = 0
self._last_freq_time = now
async def _check_data_tick(self) -> None:
while True:
await asyncio.sleep(CHECK_DATA_TIMER_INTERVAL)
if self._started and self.stats.eeg_packet_count == 0:
log.warning("No EEG data received from AWEAR board")
if self.delegate:
self.delegate.on_status_update(
"Warning: not received from AWEAR board"
)
new_lost = self.stats.lost_packets - self.stats.prev_lost_packets
if new_lost > 0:
log.warning("Lost %d EEG packets since last check", new_lost)
self.stats.prev_lost_packets = self.stats.lost_packets
def _set_status(self, status: DeviceStatus) -> None:
old = self.status
self.status = status
if old != status:
log.info("AWEAR StatusSet: %s → %s", old.value, status.value)
if self.delegate:
self.delegate.on_status_set(status)
def get_connection_key(self) -> str:
return self.device_info.peripheral_uuid
def get_devices(self) -> list[dict[str, Any]]:
devices = self.found_devices.snapshot()
result = []
for d in devices:
result.append({
"name": d.name,
"address": d.address,
"rssi": d.rssi,
})
log.debug("AWEAR device list data sent to Flutter")
return result
async def stop_connecting(self) -> None:
log.info("@@@@ Bluetooth.stopConnecting")
if self.status == DeviceStatus.CONNECTING and self._client:
try:
await self._client.disconnect()
except Exception:
pass
self._client = None
self._set_status(DeviceStatus.DISCONNECTED)
def start_device_listing(self) -> None:
log.debug("Flutter method startDeviceListing() called")
self._device_listing_active = True
def stop_device_listing(self) -> None:
log.debug("Flutter method stopDeviceListing() called")
self._device_listing_active = False
def device_list_updated(self) -> bool:
log.debug("Flutter method deviceListUpdatedAWEAR() called")
return self._device_listing_active and len(self.found_devices) > 0
async def transmit_cloud_data(self, collection: str, data: dict[str, Any]) -> bool:
log.debug("Flutter method transmit_cloud_data() called")
if not collection or not data:
log.error("Invalid transmit_cloud_data")
return False
if not isinstance(data, dict):
log.error("Invalid transmit_cloud_data: data must be a dict")
return False
import json
try:
payload = json.dumps(data, default=str)
except (TypeError, ValueError) as exc:
log.error("transmit_cloud_data serialization failed: %s", exc)
return False
cloud_endpoint = self.config.get("CloudEndpoint")
if cloud_endpoint:
import urllib.request
try:
req = urllib.request.Request(
cloud_endpoint,
data=payload.encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=10)
except Exception as exc:
log.error("Cloud transmit failed: %s", exc)
return False
log.info("transmit_cloud_data: collection=%s, payload_size=%d bytes",
collection, len(payload))
return True
async def trigger_breathing_cycle(self, duration: float = 4.0) -> bool:
log.info("Flutter method triggerBreathingCycle(%.1f) called", duration)
if not self._haptic_engine_available:
log.warning("#### No haptic engine available")
return False
log.info("#### Breathing haptic cycle started (%.1fs)", duration)
events = []
steps = int(duration / 0.1)
for i in range(steps):
t = i * 0.1
import math
intensity = 0.5 + 0.5 * math.sin(2 * math.pi * t / duration)
sharpness = 0.3
events.append({
"type": "hapticContinuous",
"time": t,
"duration": 0.1,
"intensity": intensity,
"sharpness": sharpness,
})
await asyncio.sleep(duration)
log.info("Breathing haptic cycle completed")
return True
async def trigger_stress_vibration(self) -> bool:
log.info("Flutter method triggerStressVibration() called")
log.debug("#### triggerStressVibrationBackground() start")
for _ in range(3):
await asyncio.sleep(0.2)
log.debug("#### triggerStressVibrationBackground() done")
return True
async def cleanup(self) -> None:
log.info("AwearController deinit")
self._reconnection_requested = False
self._cancel_all_timers()
if self.device_connected:
await self.disconnect()
class BenchmarkRunner:
def __init__(self, controller: AwearController) -> None:
self.ctrl = controller
self.results: dict[str, dict[str, float]] = {}
async def run_all(self, device_name: Optional[str] = None, duration: float = 30.0) -> dict:
log.info("=" * 60)
log.info("AWEAR Runner Python Benchmark")
log.info("=" * 60)
await self._bench("scan_devices", self._bench_scan)
if device_name:
await self._bench("connect", self._bench_connect, device_name)
elif len(self.ctrl.found_devices) > 0:
dev = self.ctrl.found_devices.snapshot()[0]
device_name = dev.name
await self._bench("connect", self._bench_connect, dev)
if not self.ctrl.device_connected:
log.warning("No device connected — skipping connected benchmarks")
self._print_results()
return self.results
await self._bench("service_discovery", self._bench_service_discovery)
await self._bench("start_streaming", self._bench_start)
await self._bench("data_streaming", self._bench_streaming, duration)
await self._bench("command_roundtrip", self._bench_command)
await self._bench("stop_streaming", self._bench_stop)
await self._bench("file_write_openbci", self._bench_file_write, True)
await self._bench("file_write_binary", self._bench_file_write, False)
await self._bench("packet_parse_eeg", self._bench_packet_parse)
await self._bench("breathing_cycle", self._bench_breathing_cycle)
await self._bench("stress_vibration", self._bench_stress_vibration)
await self._bench("transmit_cloud_data", self._bench_cloud_transmit)
await self._bench("device_listing_api", self._bench_device_listing)
await self._bench("disconnect", self._bench_disconnect)
if device_name:
await self._bench("reconnection", self._bench_reconnect, device_name)
if self.ctrl.device_connected:
await self.ctrl.disconnect()
self._print_results()
return self.results
async def _bench(self, name: str, func, *args) -> None:
log.info("--- Benchmark: %s ---", name)
t0 = time.monotonic()
try:
result = await func(*args)
except Exception as exc:
log.error("Benchmark %s failed: %s", name, exc)
result = {"error": str(exc)}
elapsed = time.monotonic() - t0
entry = {"elapsed_s": round(elapsed, 6)}
if isinstance(result, dict):
entry.update(result)
self.results[name] = entry
log.info(" %s: %.4fs %s", name, elapsed,
" ".join(f"{k}={v}" for k, v in entry.items() if k != "elapsed_s"))
async def _bench_scan(self) -> dict:
devices = await self.ctrl.search_devices(timeout=SEARCH_TIMEOUT)
return {"devices_found": len(devices)}
async def _bench_connect(self, device) -> dict:
ok = await self.ctrl.connect(device)
return {"success": ok}
async def _bench_service_discovery(self) -> dict:
has_tx = self.ctrl._tx_char is not None
has_rx = self.ctrl._rx_char is not None
return {"tx_found": has_tx, "rx_found": has_rx}
async def _bench_start(self) -> dict:
await self.ctrl.start()
return {"started": self.ctrl.is_started}
async def _bench_streaming(self, duration: float) -> dict:
self.ctrl._bench_mode = True
self.ctrl.benchmark_latencies.clear()
initial_count = self.ctrl.stats.eeg_packet_count
await asyncio.sleep(duration)
total_packets = self.ctrl.stats.eeg_packet_count - initial_count
lost = self.ctrl.stats.lost_packets
freq = self.ctrl._current_frequency
latencies = list(self.ctrl.benchmark_latencies)
self.ctrl._bench_mode = False
avg_latency = sum(latencies) / len(latencies) if latencies else 0
max_latency = max(latencies) if latencies else 0
min_latency = min(latencies) if latencies else 0
p95_latency = sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0
return {
"total_packets": total_packets,
"lost_packets": lost,
"frequency_hz": round(freq, 2),
"avg_latency_us": round(avg_latency * 1e6, 2),
"min_latency_us": round(min_latency * 1e6, 2),
"max_latency_us": round(max_latency * 1e6, 2),
"p95_latency_us": round(p95_latency * 1e6, 2),
"throughput_pps": round(total_packets / duration, 2) if duration > 0 else 0,
}
async def _bench_command(self) -> dict:
if not self.ctrl._tx_char:
return {"error": "no TX characteristic"}
t0 = time.monotonic()
await self.ctrl.send_command("PING")
for _ in range(200):
if self.ctrl.has_command_output():
break
await asyncio.sleep(0.01)
rtt = time.monotonic() - t0
resp = self.ctrl.get_command_output()
return {"rtt_ms": round(rtt * 1000, 2), "response": resp or "timeout"}
async def _bench_stop(self) -> dict:
await self.ctrl.stop()
return {"stopped": not self.ctrl.is_started}
async def _bench_file_write(self, openbci: bool) -> dict:
n = 10_000
output_dir = Path.cwd() / "awear_benchmark_data"
fw = FileWriter(output_dir, openbci_format=openbci)
fw.open()
channels = [100000, -200000, 350000, -450000, 500000, -600000, 700000, -800000]
t0 = time.monotonic()
for i in range(n):
fw.write_eeg(channels, timestamp=time.time())
fw.flush()
elapsed = time.monotonic() - t0
fw.close()
return {
"samples": n,
"throughput_sps": round(n / elapsed, 2),
"format": "openbci" if openbci else "binary",
}
async def _bench_packet_parse(self) -> dict:
n = 100_000
channels_raw = b""
for val in [100000, -200000, 350000, -450000, 500000, -600000, 700000, -800000]:
channels_raw += val.to_bytes(3, "big", signed=True)
base_packet = bytearray([0x01, 0x00]) + bytearray(channels_raw)
t0 = time.monotonic()
ctrl = self.ctrl
ctrl._bench_mode = False old_delegate = ctrl.delegate
ctrl.delegate = None old_writer = ctrl._file_writer
ctrl._file_writer = None
for i in range(n):
pkt = bytearray(base_packet)
pkt[1] = i & 0xFF
ctrl._received_eeg_data(pkt, time.monotonic())
elapsed = time.monotonic() - t0
ctrl.delegate = old_delegate
ctrl._file_writer = old_writer
return {
"packets": n,
"throughput_pps": round(n / elapsed, 2),
"parse_us_per_packet": round((elapsed / n) * 1e6, 2),
}
async def _bench_breathing_cycle(self) -> dict:
ok = await self.ctrl.trigger_breathing_cycle(duration=1.0)
return {"success": ok}
async def _bench_stress_vibration(self) -> dict:
ok = await self.ctrl.trigger_stress_vibration()
return {"success": ok}
async def _bench_cloud_transmit(self) -> dict:
n = 1_000
sample_data = {
"timestamp": time.time(),
"eeg_channels": [100000, -200000, 350000, -450000],
"battery": 85,
"signal": -45,
"device": "AWEAR_TEST",
"session_id": "bench-001",
}
t0 = time.monotonic()
for _ in range(n):
await self.ctrl.transmit_cloud_data("benchmark", sample_data)
elapsed = time.monotonic() - t0
return {
"payloads": n,
"throughput_pps": round(n / elapsed, 2),
}
async def _bench_device_listing(self) -> dict:
self.ctrl.start_device_listing()
updated = self.ctrl.device_list_updated()
devices = self.ctrl.get_devices()
self.ctrl.stop_device_listing()
return {"devices": len(devices), "updated": updated}
async def _bench_disconnect(self) -> dict:
await self.ctrl.disconnect()
return {"disconnected": self.ctrl.device_disconnected}
async def _bench_reconnect(self, device_name: str) -> dict:
t0 = time.monotonic()
ok = await self.ctrl.connect(device_name)
elapsed = time.monotonic() - t0
return {"success": ok, "reconnect_time_s": round(elapsed, 4)}
def _print_results(self) -> None:
print("\n" + "=" * 70)
print("BENCHMARK RESULTS — Python AWEAR Runner")
print("=" * 70)
for name, metrics in self.results.items():
elapsed = metrics.get("elapsed_s", 0)
detail = ", ".join(
f"{k}={v}" for k, v in metrics.items() if k != "elapsed_s"
)
print(f" {name:30s} {elapsed:10.4f}s {detail}")
print("=" * 70)
async def main() -> None:
parser = argparse.ArgumentParser(
description="AWEAR Runner — Python BLE parity benchmark"
)
parser.add_argument("--benchmark", action="store_true", help="Run benchmark suite")
parser.add_argument("--device", type=str, default=None, help="Device name to connect to")
parser.add_argument("--scan-timeout", type=float, default=SEARCH_TIMEOUT,
help="Scan timeout in seconds")
parser.add_argument("--connect-timeout", type=float, default=DEVICE_CONNECTION_TIMEOUT,
help="Connection timeout in seconds")
parser.add_argument("--stream-duration", type=float, default=30.0,
help="Streaming benchmark duration in seconds")
parser.add_argument("--save-bt-data", action="store_true",
help="Save BT data to file during streaming")
parser.add_argument("--openbci", action="store_true", default=True,
help="Save in OpenBCI CSV format (default)")
parser.add_argument("--binary", action="store_true",
help="Save in binary format instead of OpenBCI")
parser.add_argument("--min-rssi", type=int, default=-80,
help="Minimum RSSI for device discovery")
parser.add_argument("--auto-start", action="store_true",
help="Auto-start streaming after connection")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
args = parser.parse_args()
level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
config = {
**DEFAULTS,
"AwearConnectTimeout": args.connect_timeout,
"AwearDiscoveryMinSignal": args.min_rssi,
"StartAfterConnected": args.auto_start,
"SaveBTData": args.save_bt_data,
"SaveDataAsOpenBCI": not args.binary,
}
controller = AwearController(config=config)
try:
if args.benchmark:
bench = BenchmarkRunner(controller)
await bench.run_all(device_name=args.device, duration=args.stream_duration)
else:
devices = await controller.search_devices(timeout=args.scan_timeout)
if not devices:
print("No AWEAR devices found.")
return
target = args.device
if not target:
print("\nDiscovered devices:")
for i, d in enumerate(devices):
print(f" [{i}] {d.name} ({d.address}) RSSI={d.rssi}")
choice = input("\nSelect device [0]: ").strip()
idx = int(choice) if choice else 0
target = devices[idx]
ok = await controller.connect(target)
if not ok:
print("Connection failed.")
return
print(f"\nConnected to {controller.device_name}")
print("Streaming... (Ctrl+C to stop)")
await controller.start()
try:
while True:
await asyncio.sleep(1.0)
freq = controller._current_frequency
pkts = controller.stats.eeg_packet_count
lost = controller.stats.lost_packets
batt = controller.battery_level
sig = controller.signal_level
print(
f"\r Freq={freq:.1f}Hz Packets={pkts} "
f"Lost={lost} Battery={batt}% Signal={sig}dBm",
end="", flush=True,
)
except KeyboardInterrupt:
print("\nStopping...")
await controller.stop()
finally:
await controller.cleanup()
if __name__ == "__main__":
asyncio.run(main())