use atspi_common::{object_ref::ObjectRefOwned, AtspiError};
use atspi_proxies::{
accessible::{AccessibleProxy, ObjectRefExt},
application::{self, ApplicationProxy},
proxy_ext::ProxyExt,
registry::RegistryProxy,
};
use futures_lite::stream::StreamExt;
use std::sync::{Arc, Mutex};
use zbus::{
conn::Builder,
fdo::DBusProxy,
names::{
BusName, OwnedBusName, OwnedUniqueName, OwnedWellKnownName, UniqueName, WellKnownName,
},
proxy::{CacheProperties, Defaults},
zvariant::ObjectPath,
Address,
};
#[cfg(feature = "tracing")]
use tracing::{debug, info, warn};
use crate::AtspiResult;
#[derive(Clone, Debug)]
pub struct Peer {
unique_name: OwnedUniqueName,
well_known_name: Option<OwnedWellKnownName>,
socket_address: Address,
p2p_connection: zbus::Connection,
}
impl Peer {
pub(crate) async fn try_new<B, S>(
bus_name: B,
socket: S,
conn: &zbus::Connection,
) -> Result<Self, AtspiError>
where
B: Into<OwnedBusName>,
S: TryInto<Address>,
{
let dbus_proxy = DBusProxy::new(conn).await?;
let owned_bus_name: OwnedBusName = bus_name.into();
let socket_address = socket
.try_into()
.map_err(|_| AtspiError::ParseError("Invalid address string"))?;
let well_known_names: Vec<OwnedWellKnownName> = dbus_proxy
.list_names()
.await?
.into_iter()
.filter_map(|name| {
if let BusName::WellKnown(well_nown_name) = name.clone().inner() {
Some(OwnedWellKnownName::from(well_nown_name.clone()))
} else {
None
}
})
.collect();
let mut unique_to_well_known: Vec<(OwnedUniqueName, OwnedWellKnownName)> = Vec::new();
for well_known_name in &well_known_names {
let bus_name = BusName::from(well_known_name.clone());
if let Ok(unique_name) = dbus_proxy.get_name_owner(bus_name).await {
unique_to_well_known.push((unique_name, well_known_name.clone()));
}
}
let (unique_name, well_known_name) = match owned_bus_name.inner() {
BusName::Unique(name) => {
let owned_well_known_name = unique_to_well_known.iter().find_map(|(u, w)| {
if u == name {
Some(w.clone())
} else {
None
}
});
let owned_unique_name = OwnedUniqueName::from(name.clone());
(owned_unique_name, owned_well_known_name)
}
BusName::WellKnown(well_known_name) => {
let bus_name = BusName::from(well_known_name.clone());
let owned_unique_name = dbus_proxy.get_name_owner(bus_name).await?;
let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
(owned_unique_name, Some(owned_well_known_name))
}
};
let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
Ok(Peer { unique_name, well_known_name, socket_address, p2p_connection })
}
#[must_use]
pub fn unique_name(&self) -> &OwnedUniqueName {
&self.unique_name
}
#[must_use]
pub fn well_known_name(&self) -> Option<&OwnedWellKnownName> {
self.well_known_name.as_ref()
}
#[must_use]
pub fn socket_address(&self) -> &Address {
&self.socket_address
}
pub fn connection(&self) -> &zbus::Connection {
&self.p2p_connection
}
pub async fn try_from_bus_name(
bus_name: BusName<'_>,
conn: &zbus::Connection,
) -> AtspiResult<Self> {
let application_proxy = ApplicationProxy::builder(conn)
.destination(&bus_name)?
.cache_properties(CacheProperties::No)
.build()
.await?;
let socket_path = application_proxy.get_application_bus_address().await?;
Self::try_new(bus_name, socket_path.as_str(), conn).await
}
pub async fn proxies(
&'_ self,
path: &ObjectPath<'_>,
) -> AtspiResult<atspi_proxies::proxy_ext::Proxies<'_>> {
let accessible_proxy = AccessibleProxy::builder(&self.p2p_connection)
.path(path.to_owned())?
.cache_properties(CacheProperties::No)
.build()
.await?;
accessible_proxy.proxies().await
}
pub async fn as_root_accessible_proxy(&self) -> AtspiResult<AccessibleProxy<'_>> {
AccessibleProxy::builder(&self.p2p_connection)
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(AtspiError::from)
}
pub async fn as_accessible_proxy(
&self,
obj: &ObjectRefOwned,
) -> AtspiResult<AccessibleProxy<'_>> {
let path = obj.path();
AccessibleProxy::builder(&self.p2p_connection)
.path(path)?
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(AtspiError::from)
}
}
pub(crate) trait BusNameExt {
async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address>;
}
impl BusNameExt for BusName<'_> {
async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address> {
let application_proxy = application::ApplicationProxy::builder(conn)
.destination(self)?
.cache_properties(CacheProperties::No)
.build()
.await?;
application_proxy
.get_application_bus_address()
.await
.map_err(|e| {
AtspiError::Owned(format!(
"Failed to get application bus address for {}: {e}",
&self
))
})
.and_then(|address| {
Address::try_from(address.as_str())
.map_err(|_| AtspiError::ParseError("Invalid address string"))
})
}
}
#[derive(Clone, Debug)]
pub(crate) struct Peers {
peers: Arc<Mutex<Vec<Peer>>>,
}
impl Peers {
pub(crate) async fn initialize_peers(conn: &zbus::Connection) -> AtspiResult<Self> {
let registry_well_known_name = RegistryProxy::DESTINATION
.as_ref()
.expect("RegistryProxy `default_destination` is not set");
let reg_accessible = AccessibleProxy::builder(conn)
.destination(registry_well_known_name)?
.cache_properties(CacheProperties::No)
.build()
.await?;
let accessible_applications = reg_accessible.get_children().await?;
let mut peers = Vec::with_capacity(accessible_applications.len());
for app in accessible_applications {
let accessible_proxy = app.as_accessible_proxy(conn).await?;
let proxies = accessible_proxy.proxies().await?;
let application_proxy = proxies.application().await?;
if let Ok(address) = application_proxy.get_application_bus_address().await {
let name = app.name().ok_or(AtspiError::MissingName)?;
let bus_name = BusName::from(name.clone());
match Peer::try_new(bus_name, address.as_str(), conn).await {
Ok(peer) => peers.push(peer),
#[cfg(feature = "tracing")]
Err(e) => {
tracing::warn!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
}
#[cfg(all(debug_assertions, not(feature = "tracing")))]
Err(e) => {
eprintln!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
}
#[cfg(not(any(feature = "tracing", debug_assertions)))]
Err(_) => {
}
}
}
}
Ok(Peers { peers: Arc::new(Mutex::new(peers)) })
}
fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
let peers = self.peers.lock().expect("already locked by current thread");
let matched = match bus_name {
BusName::Unique(unique_name) => {
peers.iter().find(|peer| peer.unique_name() == unique_name)
}
BusName::WellKnown(well_known_name) => {
let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
peers
.iter()
.find(|peer| peer.well_known_name() == Some(&owned_well_known_name))
}
};
matched.cloned()
}
fn inner(&self) -> Arc<Mutex<Vec<Peer>>> {
Arc::clone(&self.peers)
}
async fn insert_unique(
&self,
unique_name: &zbus::names::UniqueName<'_>,
conn: &zbus::Connection,
) -> AtspiResult<()> {
let bus_name = BusName::Unique(unique_name.as_ref());
let address = bus_name.get_p2p_address(conn).await?;
let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
let unique_name = OwnedUniqueName::from(unique_name.clone());
let peer =
Peer { unique_name, well_known_name: None, socket_address: address, p2p_connection };
let mut guard = self.peers.lock().expect("lock already held by current thread");
guard.push(peer);
Ok(())
}
fn remove_unique(&self, unique_name: &zbus::names::UniqueName<'_>) {
let mut peers = self.peers.lock().expect("lock already held by current thread");
peers.retain(|peer| peer.unique_name() != unique_name);
}
async fn insert_well_known(
&self,
well_known_name: &WellKnownName<'_>,
name_owner: &UniqueName<'_>,
conn: &zbus::Connection,
) -> AtspiResult<()> {
let bus_name = BusName::WellKnown(well_known_name.clone());
let address = bus_name.get_p2p_address(conn).await?;
let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
let well_known_name = OwnedWellKnownName::from(well_known_name.clone());
let unique_name = OwnedUniqueName::from(name_owner.clone());
let peer = Peer {
unique_name,
well_known_name: Some(well_known_name),
socket_address: address,
p2p_connection,
};
let mut guard = self.peers.lock().expect("lock already held by current thread");
guard.push(peer);
Ok(())
}
fn remove_well_known(&self, well_known_name: &WellKnownName<'_>, name_owner: &UniqueName<'_>) {
let mut peers = self.peers.lock().expect("lock already held by current thread");
let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
peers.retain(|peer| {
(peer.well_known_name() != Some(&owned_well_known_name))
&& peer.unique_name() == name_owner
});
}
async fn update_well_known_owner(
&self,
well_known_name: &WellKnownName<'_>,
old_name_owner: &UniqueName<'_>,
new_name_owner: &UniqueName<'_>,
conn: &zbus::Connection,
) -> AtspiResult<()> {
let socket_address = BusName::from(new_name_owner.clone()).get_p2p_address(conn).await?;
let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
let well_known_name = Some(OwnedWellKnownName::from(well_known_name.clone()));
let old_name_owner = OwnedUniqueName::from(old_name_owner.clone());
let unique_name = OwnedUniqueName::from(new_name_owner.clone());
let peer = Peer {
unique_name,
well_known_name: well_known_name.clone(),
socket_address,
p2p_connection,
};
let mut peers = self.peers.lock().expect("lock already held by current thread");
if let Some(existing_peer) = peers.iter_mut().find(|p| {
p.well_known_name() == well_known_name.as_ref() && p.unique_name() == &old_name_owner
}) {
*existing_peer = peer;
} else {
return Err(AtspiError::Owned(format!(
"Owner swap failed: well-known name {well_known_name:?} with owner: {old_name_owner} not found"
)));
}
Ok(())
}
pub(crate) fn spawn_peer_listener_task(&self, conn: &zbus::Connection) {
let peers = self.clone();
let conn = conn.clone();
let dbus_proxy = futures_lite::future::block_on(DBusProxy::new(&conn))
.expect("Failed to create DBusProxy");
let executor = conn.executor().clone();
executor.spawn(async move {
let Ok(mut name_owner_changed_stream) = dbus_proxy.receive_name_owner_changed().await.inspect_err(|#[allow(unused_variables)] err| {
#[cfg(feature = "tracing")]
debug!("Failed to receive `NameOwnerChanged` stream: {err}");
}) else {
return;
};
while let Some(name_owner_event) = name_owner_changed_stream.next().await {
let Ok(args) = name_owner_event.args() else {
#[cfg(feature = "tracing")]
tracing::debug!("Received name owner changed event without args, skipping.");
continue;
};
let name = args.name().clone();
let new = args.new_owner().clone();
let old = args.old_owner().clone();
match name {
BusName::Unique(unique_name) => {
match (&*old, &*new) {
(None, Some(new_owner)) => {
debug_assert_eq!(new_owner, &unique_name, "When a name appears on the bus, the new owner must be the unique name itself.");
if let Ok(()) = peers.insert_unique(&unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
#[cfg(feature = "tracing")]
warn!("Failed to insert unique name: {unique_name}: {err}");
}) {
#[cfg(feature = "tracing")]
info!("Inserted unique name: {unique_name} into the peer list.");
};
}
(Some(old), None) => {
debug_assert!(old == &unique_name, "When a unique name is removed from the bus, the old owner must be the unique name itself.");
peers.remove_unique(&unique_name);
#[cfg(feature = "tracing")]
info!("Peer with unique name: {unique_name} left the bus - removed from peer list.");
}
(_, _) => {
#[cfg(feature = "tracing")]
debug!("NameOwnerChanged` with unique name: {unique_name} has unknown argument combination ({old:?}, {new:?}).");
}
}
}
BusName::WellKnown(well_known_name) => {
match (&*old, &*new) {
(None, None) => {
#[cfg(feature = "tracing")]
debug!("Received `NameOwnerChanged` event with no old or new owner for well-known name: {}", well_known_name);
}
(None, Some(new_owner_unique_name)) => {
if let Ok(()) = peers.insert_well_known(
&well_known_name,
new_owner_unique_name,
&conn,
).await.inspect_err(|#[allow(unused_variables)] err| {
#[cfg(feature = "tracing")]
warn!("Failed to insert well-known name: {} with owner: {} - {}", &well_known_name, &new_owner_unique_name, err);
}) {
#[cfg(feature = "tracing")]
info!("Well-known name: {} with owner: {} inserted into the peer list.", &well_known_name, &new_owner_unique_name);
}
}
(Some(old_owner_unique_name), None) => {
peers.remove_well_known(
&well_known_name,
old_owner_unique_name,
);
#[cfg(feature = "tracing")]
info!(
"Well-known name: {} with owner: {} removed from the peer list.",
&well_known_name,
&old_owner_unique_name
);
},
(Some(old_owner_unique_name), Some(new_owner_unique_name)) => {
if let Ok(()) = peers.update_well_known_owner(&well_known_name, old_owner_unique_name, new_owner_unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
#[cfg(feature = "tracing")]
warn!("Failed to update well-known name: {} owner from: {} to: {} - {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name, err);
}) {
#[cfg(feature = "tracing")]
info!("Well-known name: {} updated owner from: {} to: {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name);
};
}
}
}
} }
#[cfg(feature = "tracing")]
tracing::warn!("Peer listener task stopped, clearing peers list.");
peers.clear();
}, "PeerListenerTask")
.detach();
}
fn clear(&self) {
let mut peers = self.peers.lock().expect("lock already held by current thread");
peers.clear();
}
}
pub trait P2P {
fn object_as_accessible(
&'_ self,
obj: &ObjectRefOwned,
) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;
fn bus_name_as_root_accessible(
&'_ self,
name: &BusName,
) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;
fn peers(&self) -> Arc<Mutex<Vec<Peer>>>;
fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer>;
}
impl P2P for crate::AccessibilityConnection {
async fn object_as_accessible(&self, obj: &ObjectRefOwned) -> AtspiResult<AccessibleProxy<'_>> {
if obj.is_null() {
return Err(AtspiError::NullRef(
"`p2p::object_as_accessible` called with null-reference ObjectRef",
));
}
let name = obj.name().ok_or(AtspiError::MissingName)?.to_owned();
let name = OwnedUniqueName::from(name);
let path = obj.path();
let lookup = self
.peers
.peers
.lock()
.expect("lock already held by current thread")
.iter()
.find(|peer| &name == peer.unique_name())
.cloned();
if let Some(peer) = lookup {
AccessibleProxy::builder(peer.connection())
.path(path)?
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(Into::into)
} else {
let conn = self.connection();
AccessibleProxy::builder(conn)
.path(path)?
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(Into::into)
}
}
async fn bus_name_as_root_accessible(
&'_ self,
name: &BusName<'_>,
) -> AtspiResult<AccessibleProxy<'_>> {
let lookup = self
.peers
.peers
.lock()
.expect("lock already held by current thread")
.iter()
.find(|peer| {
match name {
BusName::Unique(unique_name) => peer.unique_name() == unique_name,
BusName::WellKnown(well_known_name) => {
peer.well_known_name().is_some_and(|w| w == well_known_name)
}
}
})
.cloned();
if let Some(peer) = lookup {
AccessibleProxy::builder(peer.connection())
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(Into::into)
} else {
let conn = self.connection();
AccessibleProxy::builder(conn)
.cache_properties(CacheProperties::No)
.build()
.await
.map_err(Into::into)
}
}
fn peers(&self) -> Arc<Mutex<Vec<Peer>>> {
self.peers.inner()
}
fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
self.peers.get_peer(bus_name)
}
}