use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{debug, instrument, trace, trace_span, Instrument};
use zbus_names::InterfaceName;
use zvariant::{ObjectPath, Value};
use crate::{
async_lock::RwLock,
connection::WeakConnection,
fdo,
fdo::ObjectManager,
message::{Header, Message},
Connection, Error, Result,
};
mod interface;
pub(crate) use interface::ArcInterface;
pub use interface::{DispatchResult, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
mod signal_emitter;
pub use signal_emitter::SignalEmitter;
/// Deprecated alias for `SignalEmitter`; new code should name `SignalEmitter` directly.
#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
pub type SignalContext<'s> = SignalEmitter<'s>;
mod dispatch_notifier;
pub use dispatch_notifier::ResponseDispatchNotifier;
mod node;
pub(crate) use node::Node;
/// Holds the tree of nodes served for a connection and dispatches incoming method calls to the
/// interfaces registered in that tree.
///
/// Clones share the same root node, since it is stored behind an `Arc`.
#[derive(Debug, Clone)]
pub struct ObjectServer {
// Weak handle to the associated connection; upgraded on demand by `connection()`.
conn: WeakConnection,
// Root node of the object tree (created for path `/`), guarded by an async `RwLock`.
root: Arc<RwLock<Node>>,
}
impl ObjectServer {
pub(crate) fn new(conn: &Connection) -> Self {
Self {
conn: conn.into(),
root: Arc::new(RwLock::new(Node::new(
"/".try_into().expect("zvariant bug"),
))),
}
}
/// Returns the lock that guards the root node of the object tree.
pub(crate) fn root(&self) -> &RwLock<Node> {
    self.root.as_ref()
}
/// Registers `iface` at `path`.
///
/// The interface is wrapped in an `ArcInterface` and passed, together with `I::name()`, to
/// `add_arc_interface`; that call's result is returned unchanged.
///
/// # Errors
///
/// Propagates any error from `add_arc_interface`, e.g. when `path` cannot be converted into an
/// object path.
pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
where
    I: Interface,
    P: TryInto<ObjectPath<'p>>,
    P::Error: Into<Error>,
{
    let name = I::name();
    let arc_iface = ArcInterface::new(iface);

    self.add_arc_interface(path, name, arc_iface).await
}
/// Adds an already-wrapped interface under `name` to the node at `path`.
///
/// Returns `Ok(false)` when the node reports that the interface was not added, `Ok(true)`
/// otherwise. After a successful add, `ObjectManager::interfaces_added` is called:
///
/// * if the added interface is the `ObjectManager` itself, once per entry returned by the node's
///   `get_managed_objects`, using an emitter created for `path`;
/// * otherwise, if `get_child_mut` reported a manager path, once for the new interface with its
///   current properties, using an emitter created for that manager path.
///
/// The write lock on the root node is held for the whole call, including the awaits.
///
/// # Errors
///
/// Fails if `path` cannot be converted, if the emitter cannot be created, if properties cannot
/// be fetched or converted into `Value`s, or if `interfaces_added` fails.
pub(crate) async fn add_arc_interface<'p, P>(
    &self,
    path: P,
    name: InterfaceName<'static>,
    arc_iface: ArcInterface,
) -> Result<bool>
where
    P: TryInto<ObjectPath<'p>>,
    P::Error: Into<Error>,
{
    let path = path.try_into().map_err(Into::into)?;
    let mut root = self.root().write().await;
    let (node, manager_path) = root.get_child_mut(&path, true);
    // NOTE(review): the `true` flag presumably asks for missing nodes to be created, which is
    // why the lookup is unwrapped -- confirm against `Node::get_child_mut`.
    let node = node.unwrap();

    if !node.add_arc_interface(name.clone(), arc_iface) {
        return Ok(false);
    }

    if name == ObjectManager::name() {
        // A manager was just added here: report every object it manages.
        let conn = self.connection();
        let emitter = SignalEmitter::new(&conn, path)?;
        let managed = node.get_managed_objects(self, &conn).await?;
        for (object_path, owned_interfaces) in managed {
            let interfaces = owned_interfaces
                .iter()
                .map(|(iface_name, owned_props)| {
                    let props = owned_props
                        .iter()
                        .map(|(key, value)| Ok((key.as_str(), Value::try_from(value)?)))
                        .collect::<Result<_>>()?;
                    Ok((iface_name.into(), props))
                })
                .collect::<Result<_>>()?;
            ObjectManager::interfaces_added(&emitter, object_path.into(), interfaces).await?;
        }
    } else if let Some(manager_path) = manager_path {
        // A manager path was reported for this node: report the new interface from there.
        let conn = self.connection();
        let emitter = SignalEmitter::new(&conn, manager_path.clone())?;
        let owned_props = node.get_properties(self, &conn, name.clone()).await?;
        let props = owned_props
            .iter()
            .map(|(key, value)| Ok((key.as_str(), Value::try_from(value)?)))
            .collect::<Result<_>>()?;
        let mut interfaces = HashMap::new();
        interfaces.insert(name, props);
        ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
    }

    Ok(true)
}
/// Removes the interface `I` from the node at `path`.
///
/// If `get_child_mut` reported a manager path for the node,
/// `ObjectManager::interfaces_removed` is called with an emitter created for that path. If the
/// node reports itself empty afterwards, it is removed from its parent node.
///
/// Returns `Ok(true)` when the node was removed from its parent and `Ok(false)` otherwise. The
/// root node (`/`) has no parent and is never removed, so `Ok(false)` is returned for it.
///
/// The write lock on the root node is held for the whole call, including the awaits.
///
/// # Errors
///
/// * `Error::InterfaceNotFound` if there is no node at `path` or the node does not have `I`.
/// * Any error from converting `path`, creating the emitter, or `interfaces_removed`.
pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
where
    I: Interface,
    P: TryInto<ObjectPath<'p>>,
    P::Error: Into<Error>,
{
    let path = path.try_into().map_err(Into::into)?;
    let mut root = self.root.write().await;
    let (node, manager_path) = root.get_child_mut(&path, false);
    let node = node.ok_or(Error::InterfaceNotFound)?;
    if !node.remove_interface(I::name()) {
        return Err(Error::InterfaceNotFound);
    }
    if let Some(manager_path) = manager_path {
        let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
        ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
    }
    if node.is_empty() {
        // Walk the components right-to-left: the first one names the node to drop, the
        // remaining ones are re-joined into the parent's path.
        let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
        // The root path has no components at all. Unwrapping here used to panic for `/`; the
        // root has no parent to be removed from, so it is simply left in place.
        if let Some(last_part) = path_parts.next() {
            let ppath = ObjectPath::from_string_unchecked(
                path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
            );
            root.get_child_mut(&ppath, false)
                .0
                .expect("parent of an existing node must exist")
                .remove_node(last_part);
            return Ok(true);
        }
    }
    Ok(false)
}
/// Returns an `InterfaceRef` for the interface `I` registered at `path`.
///
/// The returned reference carries a clone of the interface's lock and an owned `SignalEmitter`
/// created for `path`.
///
/// # Errors
///
/// * `Error::InterfaceNotFound` if there is no node at `path`, the node has no interface named
///   `I::name()`, or the stored instance cannot be downcast to `I`.
/// * Any error from converting `path` or from creating the emitter.
pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
where
    I: Interface,
    P: TryInto<ObjectPath<'p>>,
    P::Error: Into<Error>,
{
    let path = path.try_into().map_err(Into::into)?;
    let root = self.root().read().await;
    let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
    let lock = node
        .interface_lock(I::name())
        .ok_or(Error::InterfaceNotFound)?
        .instance
        .clone();

    // Check the concrete type now so a mismatch is reported here rather than on first use.
    // The read guard is a temporary and is released at the end of this statement.
    lock.read()
        .await
        .downcast_ref::<I>()
        .ok_or(Error::InterfaceNotFound)?;

    let conn = self.connection();
    // Propagate a failure instead of panicking, as the other registration methods do.
    let emitter = SignalEmitter::new(&conn, path)?.into_owned();

    Ok(InterfaceRef {
        emitter,
        lock,
        phantom: PhantomData,
    })
}
async fn dispatch_call_to_iface(
&self,
iface: Arc<RwLock<dyn Interface>>,
connection: &Connection,
msg: &Message,
hdr: &Header<'_>,
) -> fdo::Result<()> {
let member = hdr
.member()
.ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
let iface_name = hdr
.interface()
.ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
trace!("acquiring read lock on interface `{}`", iface_name);
let read_lock = iface.read().await;
trace!("acquired read lock on interface `{}`", iface_name);
match read_lock.call(self, connection, msg, member.as_ref()) {
DispatchResult::NotFound => {
return Err(fdo::Error::UnknownMethod(format!(
"Unknown method '{member}'"
)));
}
DispatchResult::Async(f) => {
return f.await.map_err(|e| match e {
Error::FDO(e) => *e,
e => fdo::Error::Failed(format!("{e}")),
});
}
DispatchResult::RequiresMut => {}
}
drop(read_lock);
trace!("acquiring write lock on interface `{}`", iface_name);
let mut write_lock = iface.write().await;
trace!("acquired write lock on interface `{}`", iface_name);
match write_lock.call_mut(self, connection, msg, member.as_ref()) {
DispatchResult::NotFound => {}
DispatchResult::RequiresMut => {}
DispatchResult::Async(f) => {
return f.await.map_err(|e| match e {
Error::FDO(e) => *e,
e => fdo::Error::Failed(format!("{e}")),
});
}
}
drop(write_lock);
Err(fdo::Error::UnknownMethod(format!(
"Unknown method '{member}'"
)))
}
/// Looks up the interface targeted by `msg` and dispatches the call to it.
///
/// The root read lock is only held while the interface is looked up. If the interface has
/// `spawn_tasks_for_methods` set, the call is dispatched in a detached task on the connection's
/// executor and `Ok(())` is returned immediately; errors from that task are replied to the
/// caller from within the task. Otherwise the call is dispatched inline and its result returned.
///
/// # Errors
///
/// * `fdo::Error::Failed` if the header lacks a path, an interface or a member.
/// * `fdo::Error::UnknownObject` if no node exists at the path.
/// * `fdo::Error::UnknownInterface` if the node does not have the interface.
/// * For inline dispatch, any error from `dispatch_call_to_iface`.
async fn dispatch_method_call_try(
    &self,
    connection: &Connection,
    msg: &Message,
    hdr: &Header<'_>,
) -> fdo::Result<()> {
    let path = hdr
        .path()
        .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
    let iface_name = hdr
        .interface()
        .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
    // The member itself is read again during dispatch; here only its presence is validated.
    let _ = hdr
        .member()
        .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;

    // Scope the root lock so it is released before the (possibly long-running) dispatch.
    let (iface, with_spawn) = {
        let root = self.root.read().await;
        let node = root
            .get_child(path)
            .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
        let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
            fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
        })?;
        (iface.instance, iface.spawn_tasks_for_methods)
    };

    if !with_spawn {
        return self
            .dispatch_call_to_iface(iface, connection, msg, hdr)
            .await;
    }

    let executor = connection.executor().clone();
    let task_name = format!("`{msg}` method dispatcher");
    // The task outlives this call, so it needs its own handles to the connection and message.
    let connection = connection.clone();
    let msg = msg.clone();
    // The first argument of `trace_span!` is the span's name, not a format string, so the
    // previous `"{}"` produced a span literally named `{}`. Use a real name and record the
    // task name as a field instead.
    let span = trace_span!("method_dispatcher", task = %task_name);
    executor
        .spawn(
            async move {
                let server = connection.object_server();
                let hdr = msg.header();
                if let Err(e) = server
                    .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
                    .await
                {
                    debug!("Returning error: {}", e);
                    if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
                        debug!(
                            "Error dispatching message. Message: {:?}, error: {:?}",
                            msg, e
                        );
                    }
                }
            }
            .instrument(span),
            &task_name,
        )
        .detach();

    Ok(())
}
#[instrument(skip(self))]
pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
let conn = self.connection();
if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
debug!("Returning error: {}", e);
conn.reply_dbus_error(hdr, e).await?;
}
trace!("Handled: {}", msg);
Ok(())
}
/// Returns the connection this object server is associated with.
///
/// # Panics
///
/// Panics if the stored weak handle can no longer be upgraded.
pub(crate) fn connection(&self) -> Connection {
    let upgraded = self.conn.upgrade();

    upgraded.expect("ObjectServer can't exist w/o an associated Connection")
}
}
/// Converts a blocking `ObjectServer` into this type by calling its `into_inner`.
#[cfg(feature = "blocking-api")]
impl From<crate::blocking::ObjectServer> for ObjectServer {
    fn from(blocking: crate::blocking::ObjectServer) -> Self {
        blocking.into_inner()
    }
}