use dbus::{
arg::{OwnedFd, PropMap, RefArg, Variant},
nonblock::Proxy,
};
use dbus_crossroads::{Crossroads, IfaceBuilder, IfaceToken};
use futures::Future;
use pin_project::{pin_project, pinned_drop};
use std::{
collections::HashMap,
fmt,
os::unix::io::IntoRawFd,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use strum::{Display, EnumString, IntoStaticStr};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;
use super::{Socket, Stream};
use crate::{method_call, read_dict, Address, Device, Result, SessionInner, ERR_PREFIX, SERVICE_NAME, TIMEOUT};
pub(crate) const MANAGER_INTERFACE: &str = "org.bluez.ProfileManager1";
pub(crate) const MANAGER_PATH: &str = "/org/bluez";
pub(crate) const PROFILE_INTERFACE: &str = "org.bluez.Profile1";
pub(crate) const PROFILE_PREFIX: &str = publish_path!("profile/");
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
#[derive(Clone, Copy, Debug, displaydoc::Display, Eq, PartialEq, Ord, PartialOrd, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ReqError {
Rejected,
Canceled,
}
impl std::error::Error for ReqError {}
impl Default for ReqError {
fn default() -> Self {
Self::Canceled
}
}
impl From<ReqError> for dbus::MethodErr {
fn from(err: ReqError) -> Self {
let name: &'static str = err.into();
Self::from((ERR_PREFIX.to_string() + name, &err.to_string()))
}
}
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
pub type ReqResult<T> = std::result::Result<T, ReqError>;
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Display, EnumString)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Role {
#[strum(serialize = "client")]
Client,
#[strum(serialize = "server")]
Server,
}
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Profile {
pub uuid: Uuid,
pub name: Option<String>,
pub service: Option<Uuid>,
pub role: Option<Role>,
pub channel: Option<u16>,
pub psm: Option<u16>,
pub require_authentication: Option<bool>,
pub require_authorization: Option<bool>,
pub auto_connect: Option<bool>,
pub service_record: Option<String>,
pub version: Option<u16>,
pub features: Option<u16>,
#[doc(hidden)]
pub _non_exhaustive: (),
}
impl Profile {
fn to_dict(&self) -> PropMap {
let mut pm = PropMap::new();
if let Some(name) = &self.name {
pm.insert("Name".to_string(), Variant(name.box_clone()));
}
if let Some(service) = &self.service {
pm.insert("Service".to_string(), Variant(service.to_string().box_clone()));
}
if let Some(role) = &self.role {
pm.insert("Role".to_string(), Variant(role.to_string().box_clone()));
}
if let Some(channel) = &self.channel {
pm.insert("Channel".to_string(), Variant(channel.box_clone()));
}
if let Some(psm) = &self.psm {
pm.insert("PSM".to_string(), Variant(psm.box_clone()));
}
if let Some(require_authentication) = &self.require_authentication {
pm.insert("RequireAuthentication".to_string(), Variant(require_authentication.box_clone()));
}
if let Some(require_authorization) = &self.require_authorization {
pm.insert("RequireAuthorization".to_string(), Variant(require_authorization.box_clone()));
}
if let Some(auto_connect) = &self.auto_connect {
pm.insert("AutoConnect".to_string(), Variant(auto_connect.box_clone()));
}
if let Some(service_record) = &self.service_record {
pm.insert("ServiceRecord".to_string(), Variant(service_record.box_clone()));
}
if let Some(version) = &self.version {
pm.insert("Version".to_string(), Variant(version.box_clone()));
}
if let Some(features) = &self.features {
pm.insert("Features".to_string(), Variant(features.box_clone()));
}
pm
}
}
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
pub struct ConnectRequest {
device: Address,
fd: OwnedFd,
props: ConnectRequestProps,
tx: oneshot::Sender<ReqResult<()>>,
closed_tx: mpsc::Sender<()>,
}
impl fmt::Debug for ConnectRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ConnectRequest")
.field("device", &self.device)
.field("version", &self.version())
.field("features", &self.features())
.finish_non_exhaustive()
}
}
impl ConnectRequest {
pub fn device(&self) -> Address {
self.device
}
pub fn version(&self) -> Option<u16> {
self.props.version
}
pub fn features(&self) -> Option<u16> {
self.props.features
}
pub fn closed(&self) -> impl Future<Output = ()> {
let closed_tx = self.closed_tx.clone();
async move { closed_tx.closed().await }
}
pub fn accept(self) -> Result<Stream> {
let Self { fd, tx, .. } = self;
let fd = fd.into_raw_fd();
if unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) } == -1 {
return Err(std::io::Error::last_os_error().into());
}
let socket = unsafe { Socket::from_raw_fd(fd) }?;
let stream = Stream::from_socket(socket)?;
let _ = tx.send(Ok(()));
Ok(stream)
}
pub fn reject(self, reason: ReqError) {
let _ = self.tx.send(Err(reason));
}
}
#[derive(Clone, Debug)]
struct ConnectRequestProps {
pub version: Option<u16>,
pub features: Option<u16>,
}
impl ConnectRequestProps {
fn from_dict(dict: &HashMap<String, Variant<Box<dyn RefArg + 'static>>>) -> Self {
Self {
version: read_dict(dict, "Version").ok().cloned(),
features: read_dict(dict, "Features").ok().cloned(),
}
}
}
pub(crate) struct RegisteredProfile {
req_tx: mpsc::Sender<ConnectRequest>,
device_closed_rx: Mutex<HashMap<Address, Vec<mpsc::Receiver<()>>>>,
}
impl RegisteredProfile {
pub(crate) fn new(req_tx: mpsc::Sender<ConnectRequest>) -> Self {
Self { req_tx, device_closed_rx: Mutex::new(HashMap::new()) }
}
pub(crate) fn register_interface(cr: &mut Crossroads) -> IfaceToken<Arc<Self>> {
cr.register(PROFILE_INTERFACE, |ib: &mut IfaceBuilder<Arc<Self>>| {
ib.method_with_cr_async(
"NewConnection",
("device", "fd", "fd_properties"),
(),
|ctx, cr, (device_path, fd, props): (dbus::Path<'static>, OwnedFd, PropMap)| {
method_call(ctx, cr, |reg: Arc<Self>| async move {
let device = if let Some((_, device)) = Device::parse_dbus_path(&device_path) {
device
} else {
log::error!("Cannot parse device path: {}", &device_path);
return Err(ReqError::Rejected.into());
};
let props = ConnectRequestProps::from_dict(&props);
let (tx, rx) = oneshot::channel();
let (closed_tx, closed_rx) = mpsc::channel(1);
let cr = ConnectRequest { device, fd, props, tx, closed_tx };
let _ = reg.req_tx.send(cr).await;
match rx.await {
Ok(Ok(())) => {
let mut device_closed_rx = reg.device_closed_rx.lock().await;
device_closed_rx.entry(device).or_default().push(closed_rx);
Ok(())
}
Ok(Err(err)) => Err(err.into()),
Err(_) => Err(ReqError::Rejected.into()),
}
})
},
);
ib.method_with_cr_async(
"RequestDisconnection",
("device",),
(),
|ctx, cr, (device_path,): (dbus::Path<'static>,)| {
method_call(ctx, cr, |reg: Arc<Self>| async move {
let device = if let Some((_, device)) = Device::parse_dbus_path(&device_path) {
device
} else {
log::error!("Cannot parse device path: {}", &device_path);
return Err(ReqError::Rejected.into());
};
let mut device_closed_rx = reg.device_closed_rx.lock().await;
device_closed_rx.remove(&device);
Ok(())
})
},
);
})
}
pub(crate) async fn register(
self, inner: Arc<SessionInner>, profile: Profile, req_rx: mpsc::Receiver<ConnectRequest>,
) -> Result<ProfileHandle> {
let name = dbus::Path::new(format!("{}{}", PROFILE_PREFIX, Uuid::new_v4().as_simple())).unwrap();
log::trace!("Publishing profile at {}", &name);
{
let mut cr = inner.crossroads.lock().await;
cr.insert(name.clone(), &[inner.profile_token], Arc::new(self));
}
log::trace!("Registering profile at {}", &name);
let proxy = Proxy::new(SERVICE_NAME, MANAGER_PATH, TIMEOUT, inner.connection.clone());
proxy
.method_call(
MANAGER_INTERFACE,
"RegisterProfile",
(name.clone(), profile.uuid.to_string(), profile.to_dict()),
)
.await?;
let (drop_tx, drop_rx) = oneshot::channel();
let unreg_name = name.clone();
tokio::spawn(async move {
let _ = drop_rx.await;
log::trace!("Unregistering profile at {}", &unreg_name);
let _: std::result::Result<(), dbus::Error> =
proxy.method_call(MANAGER_INTERFACE, "UnregisterProfile", (unreg_name.clone(),)).await;
log::trace!("Unpublishing profile at {}", &unreg_name);
let mut cr = inner.crossroads.lock().await;
let _: Option<Self> = cr.remove(&unreg_name);
});
Ok(ProfileHandle { name, req_rx: ReceiverStream::new(req_rx), _drop_tx: drop_tx })
}
}
#[cfg_attr(docsrs, doc(cfg(all(feature = "rfcomm", feature = "bluetoothd"))))]
#[pin_project(PinnedDrop)]
pub struct ProfileHandle {
name: dbus::Path<'static>,
#[pin]
req_rx: ReceiverStream<ConnectRequest>,
_drop_tx: oneshot::Sender<()>,
}
impl futures::stream::Stream for ProfileHandle {
type Item = ConnectRequest;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().req_rx.poll_next(cx)
}
}
#[pinned_drop]
impl PinnedDrop for ProfileHandle {
fn drop(self: Pin<&mut Self>) {
}
}
impl fmt::Debug for ProfileHandle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ProfileHandle {{ {} }}", &self.name)
}
}