use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use futures_concurrency::future::Join as _;
use hidpp::{
channel::HidppChannel,
device::Device,
feature::{
CreatableFeature, device_information::DeviceInformationFeature,
device_type_and_name::DeviceTypeAndNameFeature, unified_battery::UnifiedBatteryFeature,
},
receiver::{
self, Receiver,
bolt::{
DeviceConnection as BoltDeviceConnection, Event as BoltEvent, Receiver as BoltReceiver,
},
unifying::{
DeviceConnection as UnifyingDeviceConnection, Event as UnifyingEvent,
Receiver as UnifyingReceiver,
},
},
};
use openlogi_core::device::{
BatteryInfo, Capabilities, DeviceInventory, DeviceKind, DeviceModelInfo, DeviceTransports,
PairedDevice, ReceiverInfo,
};
use thiserror::Error;
use tokio::time::timeout;
use tracing::{debug, warn};
use crate::mappings::{
map_battery_level, map_battery_status, map_device_type, map_kind, map_unifying_kind,
normalize_serial_number, resolve_device_kind,
};
use crate::node_ledger::NodeLedger;
use crate::route::DIRECT_DEVICE_INDEX;
use crate::transport::{enumerate_hidpp_devices, open_hidpp_channel};
/// Per-event wait while draining device-arrival notifications: a drain loop
/// stops as soon as no event arrives within this window.
const ARRIVAL_DRAIN: Duration = Duration::from_millis(1500);
/// Highest Bolt pairing slot that is probed; slots `1..=MAX_BOLT_SLOTS` are scanned.
const MAX_BOLT_SLOTS: u8 = 6;
/// Upper bound on the whole probe of one node (`probe_one`); a probe that
/// exceeds it is treated as a failed probe.
const PROBE_BUDGET: Duration = Duration::from_secs(5);
/// Upper bound on probing a single Unifying slot; on expiry the slot falls
/// back to cached data, or to defaults when nothing is cached.
const UNIFYING_SLOT_PROBE: Duration = Duration::from_millis(3500);
/// Errors surfaced by device enumeration.
#[derive(Debug, Error)]
pub enum InventoryError {
/// The HID layer reported an error; converted automatically via `From`.
#[error("HID transport error")]
Hid(#[from] async_hid::HidError),
}
/// Number of enumeration ticks after which a cached probe counts as stale
/// and is refreshed with a full probe (see `is_stale`).
const REFRESH_TICKS: u64 = 15;
/// Identity under which a device's probe result is cached between ticks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum CacheKey {
/// Device paired to a Bolt receiver, keyed by the unit id from its pairing information.
Bolt { unit_id: [u8; 4] },
/// Device behind a Unifying receiver, keyed by the receiver's UID (or the
/// fallback string built when the UID is unreadable) plus the slot index.
UnifyingSlot { receiver_uid: String, slot: u8 },
/// Directly attached device, keyed by its HID device id.
Direct(async_hid::DeviceId),
}
/// Number of consecutive enumerations a cache entry may go unseen and still be
/// kept; the entry is evicted on the miss after that.
const CACHE_MISS_GRACE: u8 = 3;
/// A cached probe result together with the bookkeeping needed to refresh it.
#[derive(Clone)]
struct Cached {
/// Feature data from the last successful full probe (battery may be newer,
/// see `CacheOutcome::Update`).
probe: ProbedFeatures,
/// Feature-table index of the unified battery feature, if the device has
/// one; lets the battery be re-read without a full probe.
battery_index: Option<u8>,
/// Tick at which the full probe was taken; compared against `REFRESH_TICKS`.
probed_tick: u64,
}
/// What a single device probe asks the enumerator to do with its cache entry.
enum CacheOutcome {
/// A full probe succeeded; store the new entry under the key.
Fresh(CacheKey, Cached),
/// Only the battery reading was refreshed; store the amended entry.
Update(CacheKey, Cached),
/// The device was observed; its existing cache entry (if any) is left as is.
Seen(CacheKey),
/// The device has no cache key, so nothing is recorded for it.
Unkeyed,
}
/// Builds the "observed, nothing to store" outcome for an optional cache key:
/// `Seen(key)` when a key exists, `Unkeyed` otherwise.
fn seen(id: Option<CacheKey>) -> CacheOutcome {
    match id {
        Some(key) => CacheOutcome::Seen(key),
        None => CacheOutcome::Unkeyed,
    }
}
/// Returns `true` once at least `REFRESH_TICKS` ticks have elapsed since the
/// entry was probed.
///
/// The age is computed with wrapping subtraction, so it stays correct when the
/// tick counter wraps around.
fn is_stale(cached: &Cached, tick: u64) -> bool {
    let age = tick.wrapping_sub(cached.probed_tick);
    age >= REFRESH_TICKS
}
/// Produces the feature data for one device, probing it only when necessary.
///
/// * An online device with no cache entry, or with a stale one, gets a full
///   probe. If the feature walk succeeds (capabilities are present) the result
///   is returned as `Fresh` (or `Unkeyed` when there is no key). If it fails,
///   the cached data is returned when available, otherwise the (empty) probe.
/// * Otherwise the cached data is reused. For an online device with a known
///   battery feature index and a key, the battery alone is re-read and the
///   amended entry is returned as `Update`; its `probed_tick` is not advanced.
/// * With nothing cached and no probe performed, default features are returned.
///
/// `index` is the HID++ device index the requests are addressed to.
async fn probe_or_reuse(
    channel: &Arc<HidppChannel>,
    index: u8,
    id: Option<CacheKey>,
    cached: Option<&Cached>,
    online: bool,
    tick: u64,
) -> (ProbedFeatures, CacheOutcome) {
    let needs_full_probe = online && cached.is_none_or(|entry| is_stale(entry, tick));

    if needs_full_probe {
        let (fresh, battery_index) = probe_features(channel, index).await;

        if fresh.capabilities.is_none() {
            // The walk failed: prefer whatever was cached over the empty result.
            let features = cached.map_or(fresh, |entry| entry.probe.clone());
            return (features, seen(id));
        }

        let Some(key) = id else {
            return (fresh, CacheOutcome::Unkeyed);
        };
        let entry = Cached {
            probe: fresh.clone(),
            battery_index,
            probed_tick: tick,
        };
        return (fresh, CacheOutcome::Fresh(key, entry));
    }

    let Some(entry) = cached else {
        return (ProbedFeatures::default(), seen(id));
    };

    // Cheap path: refresh only the battery reading of a still-fresh entry.
    if online
        && let Some(feature_index) = entry.battery_index
        && let Some(key) = id.clone()
        && let Some(battery) = read_battery(channel, index, feature_index).await
    {
        let mut refreshed = entry.clone();
        refreshed.probe.battery = Some(battery);
        return (refreshed.probe.clone(), CacheOutcome::Update(key, refreshed));
    }

    (entry.probe.clone(), seen(id))
}
/// Stateful enumerator that keeps probe results, open channels and per-node
/// history across successive enumeration passes.
#[derive(Default)]
pub struct Enumerator {
/// Cached probe results, keyed by device identity.
cache: HashMap<CacheKey, Cached>,
/// Consecutive-miss counters for cache entries that were not observed in
/// recent passes.
misses: HashMap<CacheKey, u8>,
/// HID++ channels kept open between passes, keyed by HID device id.
channels: HashMap<async_hid::DeviceId, CachedChannel>,
/// Per-node ledger that settles each probe's inventory and decides when a
/// node's channel should be dropped.
ledger: NodeLedger<async_hid::DeviceId>,
/// Pass counter, incremented (wrapping) at the start of every enumeration.
tick: u64,
}
/// An open HID++ channel retained between enumeration passes.
struct CachedChannel {
/// Device info returned when the channel was opened.
info: async_hid::DeviceInfo,
/// The shared channel handle reused by later probes.
channel: Arc<HidppChannel>,
}
/// One-shot enumeration using a throw-away [`Enumerator`].
///
/// Runs up to `ONESHOT_ATTEMPTS` passes, sleeping `ONESHOT_RETRY_DELAY`
/// between them, and returns as soon as a pass reports every node healthy.
/// The result of the last attempt is returned regardless of health.
///
/// # Errors
///
/// Propagates the first error returned by an enumeration pass.
pub async fn enumerate() -> Result<Vec<DeviceInventory>, InventoryError> {
    let mut enumerator = Enumerator::default();

    // Every attempt except the last one may be followed by a retry.
    for attempt in 1..ONESHOT_ATTEMPTS {
        let (inventories, all_healthy) = enumerator.enumerate_reporting_health().await?;
        if all_healthy {
            return Ok(inventories);
        }
        debug!(
            attempt,
            "one-shot enumerate saw an unhealthy node — retrying"
        );
        tokio::time::sleep(ONESHOT_RETRY_DELAY).await;
    }

    // Final attempt: its inventories are returned whether healthy or not.
    let (inventories, _) = enumerator.enumerate_reporting_health().await?;
    Ok(inventories)
}
/// Maximum number of passes the one-shot `enumerate()` makes before returning
/// whatever the last pass produced.
const ONESHOT_ATTEMPTS: u8 = 4;
/// Pause between one-shot passes when a node was reported unhealthy.
const ONESHOT_RETRY_DELAY: Duration = Duration::from_millis(300);
impl Enumerator {
/// Runs one enumeration pass and returns the settled inventories,
/// discarding the health flag.
///
/// # Errors
///
/// Propagates any error from the underlying pass.
pub async fn enumerate(&mut self) -> Result<Vec<DeviceInventory>, InventoryError> {
self.enumerate_reporting_health().await.map(|(inv, _)| inv)
}
/// Runs one enumeration pass and also reports whether every node was healthy.
///
/// The pass lists candidate interfaces, reuses or opens a channel for each,
/// probes all nodes concurrently (each bounded by `PROBE_BUDGET`), settles
/// every result through the ledger, then applies cache outcomes and evicts
/// entries that have gone unseen for too long.
///
/// The returned flag is `false` when any probe was unhealthy or timed out,
/// or when any channel failed to open.
///
/// # Errors
///
/// Propagates the error from `enumerate_hidpp_devices`.
async fn enumerate_reporting_health(
&mut self,
) -> Result<(Vec<DeviceInventory>, bool), InventoryError> {
// Advance the pass counter; cache staleness is measured against it.
self.tick = self.tick.wrapping_add(1);
let tick = self.tick;
let candidates = enumerate_hidpp_devices().await?;
debug!(count = candidates.len(), "HID++ candidate interfaces");
let mut active: Vec<(async_hid::DeviceInfo, Arc<HidppChannel>)> = Vec::new();
let mut seen_nodes: HashSet<async_hid::DeviceId> = HashSet::new();
let mut open_failures: Vec<async_hid::DeviceId> = Vec::new();
for dev in candidates {
let node = dev.id.clone();
seen_nodes.insert(node.clone());
// Reuse the channel kept open from an earlier pass instead of reopening.
if let Some(open) = self.channels.get(&node) {
active.push((open.info.clone(), Arc::clone(&open.channel)));
continue;
}
match open_hidpp_channel(dev).await {
Ok(Some((info, channel))) => {
self.channels.insert(
node,
CachedChannel {
info: info.clone(),
channel: Arc::clone(&channel),
},
);
active.push((info, channel));
}
// `Ok(None)`: no channel for this candidate and no error either —
// presumably not a usable HID++ interface; it is silently skipped.
Ok(None) => {} Err(e) => {
warn!(error = ?e, "failed to open HID++ channel — retrying next tick");
open_failures.push(node);
}
}
}
// Forget channels and ledger state for nodes that are no longer listed.
self.channels.retain(|node, _| seen_nodes.contains(node));
self.ledger.retain_nodes(&seen_nodes);
// Probe all active nodes concurrently. The cache is only borrowed
// immutably here; updates are collected as outcomes and applied below.
let results = {
let cache = &self.cache;
active
.into_iter()
.map(|(info, channel)| async move {
let node = info.id.clone();
let probe = timeout(PROBE_BUDGET, probe_one(info, channel, cache, tick)).await;
(node, probe)
})
.collect::<Vec<_>>()
.join()
.await
};
let mut inventories = Vec::new();
let mut outcomes = Vec::new();
let mut all_healthy = true;
for (node, result) in results {
// A timed-out probe is handled exactly like a failed one.
let probe = if let Ok(probe) = result {
probe
} else {
warn!(budget = ?PROBE_BUDGET, "device probe timed out — treating as a failed probe");
NodeProbe::failed()
};
all_healthy &= probe.healthy;
outcomes.extend(probe.outcomes);
// The ledger decides which inventory (if any) to report for this node
// and whether its channel should be dropped.
let settled = self.ledger.settle(&node, probe.healthy, probe.inventory);
if settled.evict_channel && self.channels.remove(&node).is_some() {
warn!("node probe keeps failing — dropping its channel to reopen next tick");
}
inventories.extend(settled.inventory);
}
// Nodes whose channel could not be opened count as unhealthy probes with
// no inventory of their own.
for node in open_failures {
all_healthy = false;
let settled = self.ledger.settle(&node, false, None);
inventories.extend(settled.inventory);
}
// Apply cache writes and note which keys were observed in this pass.
let mut seen_keys = HashSet::new();
for outcome in outcomes {
match outcome {
CacheOutcome::Fresh(key, cached) | CacheOutcome::Update(key, cached) => {
seen_keys.insert(key.clone());
self.cache.insert(key, cached);
}
CacheOutcome::Seen(key) => {
seen_keys.insert(key);
}
CacheOutcome::Unkeyed => {}
}
}
self.evict_unseen(&seen_keys);
Ok((inventories, all_healthy))
}
/// Updates the miss counters and evicts cache entries that stayed unseen
/// for more than `CACHE_MISS_GRACE` consecutive passes.
///
/// Keys in `seen_keys` have their miss counter cleared; every other cached
/// key has its counter incremented and is removed (together with the
/// counter) once it exceeds the grace value.
fn evict_unseen(&mut self, seen_keys: &HashSet<CacheKey>) {
for key in seen_keys {
self.misses.remove(key);
}
// Collect first: the cache cannot be mutated while its keys are iterated.
let missing: Vec<CacheKey> = self
.cache
.keys()
.filter(|k| !seen_keys.contains(*k))
.cloned()
.collect();
for key in missing {
let misses = self.misses.entry(key.clone()).or_insert(0);
*misses += 1;
if *misses > CACHE_MISS_GRACE {
self.cache.remove(&key);
self.misses.remove(&key);
}
}
}
}
/// Result of probing one HID node.
struct NodeProbe {
/// Inventory produced by the probe, if the node yielded one.
inventory: Option<DeviceInventory>,
/// Whether the probe considers the node healthy; fed to the ledger and to
/// the overall health flag of the pass.
healthy: bool,
/// Cache outcomes for the devices touched by this probe.
outcomes: Vec<CacheOutcome>,
}
impl NodeProbe {
    /// Probe result for a node that could not be probed: no inventory,
    /// unhealthy, and no cache outcomes.
    fn failed() -> Self {
        let outcomes = Vec::new();
        Self {
            inventory: None,
            healthy: false,
            outcomes,
        }
    }
}
/// Probes one HID node, dispatching on what `receiver::detect` finds on its
/// channel: a Bolt receiver, a Unifying receiver, or — for anything else,
/// including no receiver at all — a directly attached device.
async fn probe_one(
    info: async_hid::DeviceInfo,
    channel: Arc<HidppChannel>,
    cache: &HashMap<CacheKey, Cached>,
    tick: u64,
) -> NodeProbe {
    let detected = receiver::detect(Arc::clone(&channel));

    if let Some(Receiver::Bolt(bolt)) = detected {
        return probe_bolt_receiver(channel, info, bolt, cache, tick).await;
    }
    if let Some(Receiver::Unifying(unifying)) = detected {
        return probe_unifying_receiver(channel, info, unifying, cache, tick).await;
    }

    // No receiver, or a receiver kind without dedicated handling.
    probe_direct(channel, &info, cache, tick).await
}
/// Probes a Bolt receiver and every pairing slot `1..=MAX_BOLT_SLOTS`.
///
/// Device-arrival events are drained first and matched to slots by index so
/// that each slot probe can use the event's live state. The node is reported
/// healthy only when the receiver's pairing count could be read and equals the
/// number of slots that were successfully read.
async fn probe_bolt_receiver(
    channel: Arc<HidppChannel>,
    info: async_hid::DeviceInfo,
    bolt: BoltReceiver,
    cache: &HashMap<CacheKey, Cached>,
    tick: u64,
) -> NodeProbe {
    let unique_id = bolt.get_unique_id().await.ok();
    let pairing_count = bolt.count_pairings().await.ok();
    debug!(?pairing_count, "receiver reports pairing count");

    let arrivals = drain_device_arrival(&bolt).await;
    debug!(events = arrivals.len(), "drained device-arrival events");

    // Index events by slot; a later event for the same slot replaces an earlier one.
    let mut by_slot: HashMap<u8, BoltDeviceConnection> = HashMap::new();
    for connection in arrivals {
        by_slot.insert(connection.index, connection);
    }

    let mut paired = Vec::new();
    let mut outcomes = Vec::new();
    for slot in 1u8..=MAX_BOLT_SLOTS {
        let probed =
            probe_bolt_slot(&channel, &bolt, by_slot.get(&slot), slot, cache, tick).await;
        let Some((device, outcome)) = probed else {
            continue;
        };
        paired.push(device);
        outcomes.push(outcome);
    }

    let complete = match pairing_count {
        Some(count) if paired.len() == usize::from(count) => true,
        Some(count) => {
            warn!(
                expected = count,
                found = paired.len(),
                "paired-device count mismatch — some slots may be unreadable"
            );
            false
        }
        None => false,
    };

    let receiver = ReceiverInfo {
        name: "Logi Bolt Receiver".to_string(),
        vendor_id: info.vendor_id,
        product_id: info.product_id,
        unique_id,
    };
    NodeProbe {
        inventory: Some(DeviceInventory { receiver, paired }),
        healthy: complete,
        outcomes,
    }
}
async fn probe_unifying_receiver(
channel: Arc<HidppChannel>,
info: async_hid::DeviceInfo,
unifying: UnifyingReceiver,
cache: &HashMap<CacheKey, Cached>,
tick: u64,
) -> NodeProbe {
let unique_id = unifying.get_unique_id().await.ok();
let pairing_count = unifying.count_pairings().await.ok();
debug!(?pairing_count, "receiver reports pairing count");
let Some(connections) = drain_device_arrival_unifying(&unifying).await else {
return NodeProbe::failed();
};
debug!(events = connections.len(), "drained device-arrival events");
let receiver_uid_fallback;
let receiver_uid = if let Some(uid) = unique_id.as_deref() {
uid
} else {
tracing::warn!("Unifying receiver UID unavailable; cache isolation may be degraded");
receiver_uid_fallback = format!("pid:{:04x}", info.product_id);
&receiver_uid_fallback
};
let slot_results = connections
.iter()
.map(|conn| probe_unifying_slot(&channel, conn, receiver_uid, cache, tick))
.collect::<Vec<_>>()
.join()
.await;
let (paired, outcomes): (Vec<_>, Vec<_>) = slot_results.into_iter().flatten().unzip();
if let Some(count) = pairing_count
&& paired.len() != usize::from(count)
{
debug!(
expected = count,
found = paired.len(),
"online devices differ from pairing count; offline devices not yet surfaced for Unifying"
);
}
let healthy = pairing_count.is_some();
NodeProbe {
inventory: Some(DeviceInventory {
receiver: ReceiverInfo {
name: "Unifying Receiver".to_string(),
vendor_id: info.vendor_id,
product_id: info.product_id,
unique_id,
},
paired,
}),
healthy,
outcomes,
}
}
/// Probes one Bolt pairing slot.
///
/// Returns `None` when the slot's pairing information cannot be read (empty
/// or unreadable slot). Otherwise returns the paired device together with the
/// cache outcome of its feature probe.
///
/// Online state and device kind come from the device-arrival `event` when one
/// exists, and from the pairing information otherwise. A pairing with an
/// all-zero unit id has no cache key.
async fn probe_bolt_slot(
    channel: &Arc<HidppChannel>,
    bolt: &BoltReceiver,
    event: Option<&BoltDeviceConnection>,
    slot: u8,
    cache: &HashMap<CacheKey, Cached>,
    tick: u64,
) -> Option<(PairedDevice, CacheOutcome)> {
    let pairing = bolt
        .get_device_pairing_information(slot)
        .await
        .inspect_err(|e| {
            debug!(slot, error = ?e, "slot empty or unreadable");
        })
        .ok()?;
    let codename = read_codename(channel, slot).await;

    let (online, bolt_kind) = match event {
        Some(connection) => (connection.online, connection.kind),
        None => (pairing.online, pairing.kind),
    };
    let wpid = event.map(|connection| connection.wpid);
    debug!(
        slot,
        online,
        ?wpid,
        ?bolt_kind,
        has_event = event.is_some(),
        codename = ?codename,
        "paired slot"
    );

    let id = if pairing.unit_id == [0u8; 4] {
        None
    } else {
        Some(CacheKey::Bolt {
            unit_id: pairing.unit_id,
        })
    };
    let cached = match &id {
        Some(key) => cache.get(key),
        None => None,
    };
    let register_kind = map_kind(bolt_kind);
    let (probe, outcome) = probe_or_reuse(channel, slot, id, cached, online, tick).await;

    // Only a fresh probe is worth comparing against the receiver's register.
    if let (CacheOutcome::Fresh(..), Some(probed)) = (&outcome, probe.kind) {
        let both_known = probed != DeviceKind::Unknown && register_kind != DeviceKind::Unknown;
        if both_known && probed != register_kind {
            debug!(
                slot,
                ?register_kind,
                ?probed,
                "device-kind sources disagree — trusting 0x0005"
            );
        }
    }

    let kind = resolve_device_kind(probe.kind, register_kind);
    let device = PairedDevice {
        slot,
        codename,
        wpid,
        kind,
        online,
        battery: probe.battery,
        model_info: probe.model_info,
        capabilities: probe.capabilities,
    };
    Some((device, outcome))
}
/// Probes a node as a directly attached device (addressed at
/// `DIRECT_DEVICE_INDEX`).
///
/// The node is reported as a one-device inventory when it looks like a
/// peripheral, i.e. it has a battery reading or button, pointer or lighting
/// capabilities. Otherwise it is skipped: no inventory is produced and health
/// reflects whether capabilities were obtained.
async fn probe_direct(
    channel: Arc<HidppChannel>,
    info: &async_hid::DeviceInfo,
    cache: &HashMap<CacheKey, Cached>,
    tick: u64,
) -> NodeProbe {
    let key = CacheKey::Direct(info.id.clone());
    let cached = cache.get(&key);
    let (probe, outcome) =
        probe_or_reuse(&channel, DIRECT_DEVICE_INDEX, Some(key), cached, true, tick).await;

    let caps = probe.capabilities.unwrap_or_default();
    let looks_like_peripheral =
        probe.battery.is_some() || caps.buttons || caps.pointer || caps.lighting;

    if looks_like_peripheral {
        debug!(name = %info.name, "BT-direct / wired device recognised");
        let receiver = ReceiverInfo {
            name: info.name.clone(),
            vendor_id: info.vendor_id,
            product_id: info.product_id,
            unique_id: None,
        };
        let device = PairedDevice {
            slot: DIRECT_DEVICE_INDEX,
            codename: Some(info.name.clone()),
            wpid: None,
            kind: resolve_device_kind(probe.kind, DeviceKind::Unknown),
            online: true,
            battery: probe.battery,
            model_info: probe.model_info,
            capabilities: probe.capabilities,
        };
        return NodeProbe {
            inventory: Some(DeviceInventory {
                receiver,
                paired: vec![device],
            }),
            healthy: true,
            outcomes: vec![outcome],
        };
    }

    debug!(
        vid = format_args!("{:04x}", info.vendor_id),
        pid = format_args!("{:04x}", info.product_id),
        has_model = probe.model_info.is_some(),
        "slot 0xff exposes no battery or config feature — likely a receiver \
         secondary interface; skipping"
    );
    NodeProbe {
        inventory: None,
        healthy: probe.capabilities.is_some(),
        // NOTE(review): the probe's own outcome is not forwarded here, so
        // nothing is cached for a skipped node and it gets a full probe on
        // every pass — confirm this is intended.
        outcomes: vec![CacheOutcome::Unkeyed],
    }
}
/// Asks a Bolt receiver to announce its devices and collects the resulting
/// device-connection events.
///
/// The listener is obtained before the trigger is sent. Events are read until
/// none arrives within `ARRIVAL_DRAIN` or the receive call fails; events of
/// other kinds are ignored. If the trigger itself fails an empty list is
/// returned.
async fn drain_device_arrival(bolt: &BoltReceiver) -> Vec<BoltDeviceConnection> {
    let rx = bolt.listen();
    let mut connections = Vec::new();

    if let Err(e) = bolt.trigger_device_arrival().await {
        debug!(error = ?e, "trigger_device_arrival failed; receiver may report no devices");
        return connections;
    }

    while let Ok(Ok(event)) = timeout(ARRIVAL_DRAIN, rx.recv()).await {
        if let BoltEvent::DeviceConnection(connection) = event {
            connections.push(connection);
        }
    }
    connections
}
/// Asks a Unifying receiver to announce its devices and collects the resulting
/// device-connection events.
///
/// The listener is obtained before the trigger is sent. Returns `None` when
/// the trigger fails. Otherwise events are read until none arrives within
/// `ARRIVAL_DRAIN` or the receive call fails; events of other kinds are
/// ignored.
async fn drain_device_arrival_unifying(
    unifying: &UnifyingReceiver,
) -> Option<Vec<UnifyingDeviceConnection>> {
    let rx = unifying.listen();

    if let Err(e) = unifying.trigger_device_arrival().await {
        debug!(error = ?e, "trigger_device_arrival failed; receiver may report no devices");
        return None;
    }

    let mut connections = Vec::new();
    while let Ok(Ok(event)) = timeout(ARRIVAL_DRAIN, rx.recv()).await {
        if let UnifyingEvent::DeviceConnection(connection) = event {
            connections.push(connection);
        }
    }
    Some(connections)
}
/// Probes the device behind one Unifying device-arrival event.
///
/// The feature probe is bounded by `UNIFYING_SLOT_PROBE`; when it times out
/// the cached features (or defaults) are used and the slot is merely marked
/// as seen. Always returns `Some`.
async fn probe_unifying_slot(
    channel: &Arc<HidppChannel>,
    event: &UnifyingDeviceConnection,
    receiver_uid: &str,
    cache: &HashMap<CacheKey, Cached>,
    tick: u64,
) -> Option<(PairedDevice, CacheOutcome)> {
    let slot = event.index;
    let codename = read_codename(channel, slot).await;
    debug!(
        slot,
        online = event.online,
        wpid = format_args!("{:04x}", event.wpid),
        kind = ?event.kind,
        codename = ?codename,
        "unifying paired slot"
    );

    let id = CacheKey::UnifyingSlot {
        receiver_uid: receiver_uid.to_owned(),
        slot,
    };
    let cached = cache.get(&id);
    let register_kind = map_unifying_kind(event.kind);

    let bounded_probe = timeout(
        UNIFYING_SLOT_PROBE,
        probe_or_reuse(channel, slot, Some(id.clone()), cached, event.online, tick),
    );
    let (probe, outcome) = match bounded_probe.await {
        Ok(probed) => probed,
        Err(_) => {
            debug!(slot, budget = ?UNIFYING_SLOT_PROBE,
                "Unifying slot probe timed out; using cached data if available");
            let fallback = match cached {
                Some(entry) => entry.probe.clone(),
                None => ProbedFeatures::default(),
            };
            (fallback, CacheOutcome::Seen(id))
        }
    };

    let kind = resolve_device_kind(probe.kind, register_kind);
    let device = PairedDevice {
        slot,
        codename,
        wpid: Some(event.wpid),
        kind,
        online: event.online,
        battery: probe.battery,
        model_info: probe.model_info,
        capabilities: probe.capabilities,
    };
    Some((device, outcome))
}
/// Reads the codename stored for `slot` from the receiver's long register
/// `0xB5` (addressed at device index `0xFF`).
///
/// The response carries a length byte at offset 2 followed by the name; at
/// most 13 bytes of name are used.
///
/// Returns `None` when:
/// * `0x60 + slot` does not fit in a `u8` (no such sub-register can exist, so
///   no request is sent — previously this overflowed),
/// * the register read fails,
/// * the response is too short for the announced length, or
/// * the name bytes are not valid UTF-8.
async fn read_codename(channel: &HidppChannel, slot: u8) -> Option<String> {
    const NAME_SUBREGISTER_BASE: u8 = 0x60;
    const LENGTH_OFFSET: usize = 2;
    const NAME_OFFSET: usize = 3;
    const MAX_NAME_LEN: usize = 13;

    // Slots arrive from receiver events as raw `u8`; guard the addition
    // instead of panicking (debug) or wrapping to a wrong register (release).
    let subregister = NAME_SUBREGISTER_BASE.checked_add(slot)?;
    let response = channel
        .read_long_register(0xFF, 0xB5, [subregister, 0x01, 0x00])
        .await
        .ok()?;

    let announced = usize::from(*response.get(LENGTH_OFFSET)?);
    let len = announced.min(MAX_NAME_LEN);
    let name = response.get(NAME_OFFSET..NAME_OFFSET + len)?;
    core::str::from_utf8(name).ok().map(str::to_string)
}
/// Feature data gathered from a device probe; every field is `None` when the
/// corresponding read was not possible or failed.
#[derive(Default, Clone)]
struct ProbedFeatures {
/// Battery reading from the unified battery feature.
battery: Option<BatteryInfo>,
/// Model data from the device-information feature.
model_info: Option<DeviceModelInfo>,
/// Device kind from the device-type-and-name feature.
kind: Option<DeviceKind>,
/// Capabilities derived from the enumerated feature ids; `Some` marks a
/// successful feature walk.
capabilities: Option<Capabilities>,
}
/// Reads the battery state through the unified battery feature located at
/// `feature_index` on device `slot`.
///
/// Returns `None` when the read fails.
async fn read_battery(
    channel: &Arc<HidppChannel>,
    slot: u8,
    feature_index: u8,
) -> Option<BatteryInfo> {
    let feature = UnifiedBatteryFeature::new(Arc::clone(channel), slot, feature_index);
    let info = feature.get_battery_info().await.ok()?;
    Some(BatteryInfo {
        percentage: info.charging_percentage,
        level: map_battery_level(info.level),
        status: map_battery_status(info.status),
    })
}
/// Finds the feature index of the unified battery feature in an enumerated
/// feature-id list.
///
/// Indices are one-based: the first id in `ids` has index 1. Returns `None`
/// when the feature is absent or its index does not fit in a `u8`.
fn battery_feature_index(ids: impl IntoIterator<Item = u16>) -> Option<u8> {
    for (index, id) in (1usize..).zip(ids) {
        if id == UnifiedBatteryFeature::ID {
            return u8::try_from(index).ok();
        }
    }
    None
}
async fn probe_features(channel: &Arc<HidppChannel>, slot: u8) -> (ProbedFeatures, Option<u8>) {
let mut device = match Device::new(Arc::clone(channel), slot).await {
Ok(d) => d,
Err(e) => {
debug!(slot, error = ?e, "Device::new failed");
return (ProbedFeatures::default(), None);
}
};
let mut battery_index = None;
let capabilities = match device.enumerate_features().await {
Ok(Some(features)) => {
let ids: Vec<u16> = features.iter().map(|f| f.id).collect();
battery_index = battery_feature_index(ids.iter().copied());
Some(Capabilities::from_feature_ids(&ids))
}
Ok(None) => None,
Err(e) => {
debug!(slot, error = ?e, "enumerate_features failed");
return (ProbedFeatures::default(), None);
}
};
let battery = match battery_index {
Some(feature_index) => read_battery(channel, slot, feature_index).await,
None => None,
};
let model_info = match device.get_feature::<DeviceInformationFeature>() {
Some(feature) => match feature.get_device_info().await {
Ok(info) => {
let serial_number = if info.capabilities.serial_number {
match feature.get_serial_number().await {
Ok(serial) => normalize_serial_number(&serial),
Err(e) => {
debug!(slot, error = ?e, "DeviceInformation serial read failed");
None
}
}
} else {
None
};
Some(DeviceModelInfo {
entity_count: info.entity_count,
serial_number,
unit_id: info.unit_id,
transports: DeviceTransports {
usb: info.transport.usb,
equad: info.transport.e_quad,
btle: info.transport.btle,
bluetooth: info.transport.bluetooth,
},
model_ids: info.model_id,
extended_model_id: info.extended_model_id,
})
}
Err(e) => {
debug!(slot, error = ?e, "DeviceInformation read failed");
None
}
},
None => None,
};
let kind = match device.get_feature::<DeviceTypeAndNameFeature>() {
Some(feature) => match feature.get_device_type().await {
Ok(ty) => Some(map_device_type(ty)),
Err(e) => {
debug!(slot, error = ?e, "DeviceType read failed");
None
}
},
None => None,
};
(
ProbedFeatures {
battery,
model_info,
kind,
capabilities,
},
battery_index,
)
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::{
        CACHE_MISS_GRACE, CacheKey, Cached, Enumerator, ProbedFeatures, REFRESH_TICKS,
        UnifiedBatteryFeature, battery_feature_index, is_stale,
    };
    use hidpp::feature::CreatableFeature as _;

    /// Empty cache entry stamped with the given probe tick.
    fn cache_entry(probed_tick: u64) -> Cached {
        Cached {
            probe: ProbedFeatures::default(),
            battery_index: None,
            probed_tick,
        }
    }

    /// An unseen entry is kept for `CACHE_MISS_GRACE` passes and dropped on the next.
    #[test]
    fn cache_entry_survives_grace_then_evicts() {
        let key = CacheKey::Bolt {
            unit_id: [1, 2, 3, 4],
        };
        let mut e = Enumerator::default();
        e.cache.insert(key.clone(), cache_entry(0));

        let nobody = HashSet::new();
        for _ in 0..CACHE_MISS_GRACE {
            e.evict_unseen(&nobody);
            assert!(
                e.cache.contains_key(&key),
                "evicted inside the grace window"
            );
        }

        e.evict_unseen(&nobody);
        assert!(
            !e.cache.contains_key(&key),
            "should evict past the grace window"
        );
    }

    /// A sighting clears the miss counter, restarting the grace window.
    #[test]
    fn being_seen_resets_the_miss_counter() {
        let key = CacheKey::Bolt { unit_id: [9; 4] };
        let mut e = Enumerator::default();
        e.cache.insert(key.clone(), cache_entry(0));

        let nobody = HashSet::new();
        let seen: HashSet<CacheKey> = std::iter::once(key.clone()).collect();

        // One miss, then a sighting that resets the counter.
        e.evict_unseen(&nobody);
        e.evict_unseen(&seen);
        // A full grace window of misses must still leave the entry in place.
        for _ in 0..CACHE_MISS_GRACE {
            e.evict_unseen(&nobody);
        }

        assert!(
            e.cache.contains_key(&key),
            "counter reset by a sighting, so still within grace"
        );
    }

    /// An entry is fresh for `REFRESH_TICKS - 1` ticks after probing.
    #[test]
    fn cached_probe_is_reused_until_refresh_ticks() {
        let cached = cache_entry(10);

        assert!(!is_stale(&cached, 10), "same tick is fresh");
        assert!(
            !is_stale(&cached, 10 + REFRESH_TICKS - 1),
            "just under the window is still fresh"
        );
        assert!(
            is_stale(&cached, 10 + REFRESH_TICKS),
            "at the window the probe is refreshed"
        );
    }

    /// Feature indices count from 1 in the enumerated table.
    #[test]
    fn battery_index_is_one_based_in_the_enumerated_table() {
        let table = [0x0001, UnifiedBatteryFeature::ID, 0x2201];
        assert_eq!(battery_feature_index(table), Some(2));
        assert_eq!(
            battery_feature_index([UnifiedBatteryFeature::ID]),
            Some(1),
            "first entry maps to index 1, not 0"
        );
    }

    /// Without the battery feature in the table there is no index.
    #[test]
    fn no_battery_feature_means_no_index() {
        assert_eq!(battery_feature_index([0x0001, 0x2201, 0x1b04]), None);
        assert_eq!(battery_feature_index([]), None);
    }
}