use dbus::nonblock::Proxy;
use dbus_crossroads::{Crossroads, IfaceBuilder, IfaceToken};
use futures::{Stream, StreamExt};
use std::{
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use strum::{Display, EnumString};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;
use crate::{
method_call, Address, DbusResult, Device, Error, ErrorKind, Result, SessionInner, SERVICE_NAME, TIMEOUT,
};
pub(crate) const INTERFACE: &str = "org.bluez.AdvertisementMonitor1";
pub(crate) const MANAGER_INTERFACE: &str = "org.bluez.AdvertisementMonitorManager1";
pub(crate) const MANAGER_PATH: &str = "/org/bluez";
pub(crate) const MONITOR_PREFIX: &str = publish_path!("monitor");
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Display, EnumString)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum Type {
#[strum(serialize = "or_patterns")]
OrPatterns,
}
impl Default for Type {
fn default() -> Self {
Self::OrPatterns
}
}
pub mod data_type {
pub const FLAGS: u8 = 0x01;
pub const INCOMPLETE_LIST_16_BIT_SERVICE_CLASS_UUIDS: u8 = 0x02;
pub const COMPLETE_LIST_16_BIT_SERVICE_CLASS_UUIDS: u8 = 0x03;
pub const INCOMPLETE_LIST_32_BIT_SERVICE_CLASS_UUIDS: u8 = 0x04;
pub const COMPLETE_LIST_32_BIT_SERVICE_CLASS_UUIDS: u8 = 0x05;
pub const INCOMPLETE_LIST_128_BIT_SERVICE_CLASS_UUIDS: u8 = 0x06;
pub const COMPLETE_LIST_128_BIT_SERVICE_CLASS_UUIDS: u8 = 0x07;
pub const SHORTENED_LOCAL_NAME: u8 = 0x08;
pub const COMPLETE_LOCAL_NAME: u8 = 0x09;
pub const TX_POWER_LEVEL: u8 = 0x0A;
pub const MANUFACTURER_SPECIFIC_DATA: u8 = 0xFF;
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Pattern {
pub data_type: u8,
pub start_position: u8,
pub content: Vec<u8>,
}
impl Pattern {
pub fn new(data_type: u8, start_position: u8, content: &[u8]) -> Self {
Self { data_type, start_position, content: content.to_vec() }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum RssiSamplingPeriod {
All,
First,
Period(Duration),
}
impl RssiSamplingPeriod {
fn to_value(self) -> u16 {
match self {
Self::All => 0,
Self::First => 255,
Self::Period(period) => (period.as_millis() / 100).clamp(1, 254) as u16,
}
}
}
#[derive(Default)]
pub struct Monitor {
pub monitor_type: Type,
pub rssi_low_threshold: Option<i16>,
pub rssi_high_threshold: Option<i16>,
pub rssi_low_timeout: Option<Duration>,
pub rssi_high_timeout: Option<Duration>,
pub rssi_sampling_period: Option<RssiSamplingPeriod>,
pub patterns: Option<Vec<Pattern>>,
#[doc(hidden)]
pub _non_exhaustive: (),
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub struct DeviceId {
pub adapter: String,
pub device: Address,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum MonitorEvent {
DeviceFound(DeviceId),
DeviceLost(DeviceId),
}
pub(crate) struct RegisteredMonitor {
am: Monitor,
activate_tx: mpsc::Sender<()>,
release_tx: mpsc::Sender<()>,
event_tx: Mutex<Option<mpsc::Sender<MonitorEvent>>>,
}
impl RegisteredMonitor {
fn parse_device_path(device: &dbus::Path<'static>) -> DbusResult<(String, Address)> {
match Device::parse_dbus_path(device) {
Some((adapter, addr)) => Ok((adapter.to_string(), addr)),
None => {
log::error!("Cannot parse device path {}", &device);
Err(dbus::MethodErr::invalid_arg("cannot parse device path"))
}
}
}
pub(crate) fn register_interface(cr: &mut Crossroads) -> IfaceToken<Arc<RegisteredMonitor>> {
cr.register(INTERFACE, |ib: &mut IfaceBuilder<Arc<RegisteredMonitor>>| {
ib.method_with_cr_async("Release", (), (), |ctx, cr, ()| {
method_call(ctx, cr, |reg: Arc<RegisteredMonitor>| async move {
*reg.event_tx.lock().await = None;
let _ = reg.release_tx.send(()).await;
Ok(())
})
});
ib.method_with_cr_async("Activate", (), (), |ctx, cr, ()| {
method_call(ctx, cr, |reg: Arc<RegisteredMonitor>| async move {
let _ = reg.activate_tx.send(()).await;
Ok(())
})
});
ib.method_with_cr_async(
"DeviceFound",
("device",),
(),
|ctx, cr, (addr,): (dbus::Path<'static>,)| {
method_call(ctx, cr, |reg: Arc<RegisteredMonitor>| async move {
let (adapter, device) = Self::parse_device_path(&addr)?;
if let Some(event_tx) = reg.event_tx.lock().await.as_ref() {
let _ = event_tx.send(MonitorEvent::DeviceFound(DeviceId { adapter, device })).await;
}
Ok(())
})
},
);
ib.method_with_cr_async("DeviceLost", ("device",), (), |ctx, cr, (addr,): (dbus::Path<'static>,)| {
method_call(ctx, cr, move |reg: Arc<RegisteredMonitor>| async move {
let (adapter, device) = Self::parse_device_path(&addr)?;
if let Some(event_tx) = reg.event_tx.lock().await.as_ref() {
let _ = event_tx.send(MonitorEvent::DeviceLost(DeviceId { adapter, device })).await;
}
Ok(())
})
});
cr_property!(ib, "Type", r => {
Some(r.am.monitor_type.to_string())
});
cr_property!(ib, "RSSILowThreshold", r => {
r.am.rssi_low_threshold
});
cr_property!(ib, "RSSIHighThreshold", r => {
r.am.rssi_high_threshold
});
cr_property!(ib, "RSSILowTimeout", r => {
r.am.rssi_low_timeout.map(|t| t.as_secs().clamp(1, 300) as u16)
});
cr_property!(ib, "RSSIHighTimeout", r => {
r.am.rssi_high_timeout.map(|t| t.as_secs().clamp(1, 300) as u16)
});
cr_property!(ib, "RSSISamplingPeriod", r => {
r.am.rssi_sampling_period.map(|v| v.to_value())
});
cr_property!(ib, "Patterns", r => {
r.am.patterns.as_ref().map(|patterns: &Vec<Pattern>| {
patterns
.iter()
.map(|p| (p.start_position, p.data_type, p.content.clone()))
.collect::<Vec<_>>()
})
});
})
}
}
#[must_use = "the MonitorHandle must be held for the monitor to be active and its events must be consumed regularly"]
pub struct MonitorHandle {
name: dbus::Path<'static>,
event_rx: ReceiverStream<MonitorEvent>,
_drop_tx: oneshot::Sender<()>,
}
impl fmt::Debug for MonitorHandle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MonitorHandle {{ {} }}", &self.name)
}
}
impl Stream for MonitorHandle {
type Item = MonitorEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).event_rx.poll_next_unpin(cx)
}
}
impl Drop for MonitorHandle {
fn drop(&mut self) {
}
}
pub struct MonitorManager {
inner: Arc<SessionInner>,
root: dbus::Path<'static>,
_drop_tx: oneshot::Sender<()>,
}
impl fmt::Debug for MonitorManager {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("MonitorManager").finish()
}
}
impl MonitorManager {
pub(crate) async fn new(inner: Arc<SessionInner>, adapter_name: &str) -> Result<Self> {
let manager_path = dbus::Path::new(format!("{}/{}", MANAGER_PATH, adapter_name)).unwrap();
let root = dbus::Path::new(format!("{}/{}", MONITOR_PREFIX, Uuid::new_v4().as_simple())).unwrap();
log::trace!("Publishing advertisement monitor root at {}", &root);
{
let mut cr = inner.crossroads.lock().await;
let object_manager_token = cr.object_manager();
let introspectable_token = cr.introspectable();
let properties_token = cr.properties();
cr.insert(root.clone(), [&object_manager_token, &introspectable_token, &properties_token], ());
}
log::trace!("Registering advertisement monitor root at {}", &root);
let proxy = Proxy::new(SERVICE_NAME, manager_path, TIMEOUT, inner.connection.clone());
let () = proxy.method_call(MANAGER_INTERFACE, "RegisterMonitor", (root.clone(),)).await?;
let (_drop_tx, drop_rx) = oneshot::channel();
let unreg_root = root.clone();
let unreg_inner = inner.clone();
tokio::spawn(async move {
let _ = drop_rx.await;
log::trace!("Unregistering advertisement monitor root at {}", &unreg_root);
let _: std::result::Result<(), dbus::Error> =
proxy.method_call(MANAGER_INTERFACE, "UnregisterMonitor", (unreg_root.clone(),)).await;
log::trace!("Unpublishing advertisement monitor root at {}", &unreg_root);
let mut cr = unreg_inner.crossroads.lock().await;
cr.remove::<()>(&unreg_root);
});
Ok(Self { inner, root, _drop_tx })
}
pub async fn register(&self, advertisement_monitor: Monitor) -> Result<MonitorHandle> {
let name = dbus::Path::new(format!("{}/{}", &self.root, Uuid::new_v4().as_simple())).unwrap();
log::trace!("Publishing advertisement monitor target at {}", &name);
let (activate_tx, mut activate_rx) = mpsc::channel(1);
let (release_tx, mut release_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::channel(1024);
let (_drop_tx, drop_rx) = oneshot::channel();
let reg = RegisteredMonitor {
am: advertisement_monitor,
activate_tx,
release_tx,
event_tx: Mutex::new(Some(event_tx)),
};
{
let mut cr = self.inner.crossroads.lock().await;
cr.insert(name.clone(), [&self.inner.monitor_token], Arc::new(reg));
}
let inner = self.inner.clone();
let unreg_name = name.clone();
tokio::spawn(async move {
let _ = drop_rx.await;
log::trace!("Unpublishing advertisement monitor target at {}", &unreg_name);
let mut cr = inner.crossroads.lock().await;
cr.remove::<Arc<RegisteredMonitor>>(&unreg_name);
});
tokio::select! {
biased;
_ = release_rx.recv() => return Err(Error::new(ErrorKind::AdvertisementMonitorRejected)),
res = activate_rx.recv() => {
if res.is_none() {
return Err(Error::new(ErrorKind::AdvertisementMonitorRejected))
}
},
}
Ok(MonitorHandle { name, event_rx: event_rx.into(), _drop_tx })
}
}
impl Drop for MonitorManager {
fn drop(&mut self) {
}
}