atspi_connection/
p2p.rs

1//! Extends the `AccessibilityConnection` with P2P capabilities.
2//!
3//! # Considerations on using executors and P2P
4//!
5//! Every connection has a zbus `Executor` instance, on which tasks can be launched. Internally, zbus uses this executor for all sorts of tasks: listening for D-Bus signals,
6//! keeping property caches up to date, handling timeout errors, etc.
7//!
8//! When zbus users use `tokio` (zbus feature "tokio" set), zbus will latch onto the tokio runtime.
9//! The `Executor` instance will be empty and all zbus tasks are run on the user's tokio runtime.
10//! However, when using any other executor (smol, glommio, etc.), each `Connection` will spin up a thread with an `async_executor::Executor`.
11//!
12//! Typically an application will have a single connection, but with P2P, your application will have a connection with each application that supports it.
13//! Consequently, on anything but tokio, applications will get an extra thread with an `async_executor` for each connection!
14//! (So picking smol won't necessarily make your application small in the context of P2P.)
15
16use atspi_common::{object_ref::ObjectRefOwned, AtspiError};
17use atspi_proxies::{
18	accessible::{AccessibleProxy, ObjectRefExt},
19	application::{self, ApplicationProxy},
20	proxy_ext::ProxyExt,
21	registry::RegistryProxy,
22};
23use futures_lite::stream::StreamExt;
24use std::sync::{Arc, Mutex};
25use zbus::{
26	conn::Builder,
27	fdo::DBusProxy,
28	names::{
29		BusName, OwnedBusName, OwnedUniqueName, OwnedWellKnownName, UniqueName, WellKnownName,
30	},
31	proxy::{CacheProperties, Defaults},
32	zvariant::ObjectPath,
33	Address,
34};
35
36#[cfg(feature = "tracing")]
37use tracing::{debug, info, warn};
38
39use crate::AtspiResult;
40
41/// Represents a peer with the name, path and connection for the P2P peer.
42#[derive(Clone, Debug)]
43pub struct Peer {
44	unique_name: OwnedUniqueName,
45	well_known_name: Option<OwnedWellKnownName>,
46	socket_address: Address,
47	p2p_connection: zbus::Connection,
48}
49
50impl Peer {
51	/// Creates a new `Peer` with the given bus name and socket path.
52	///
53	/// # Note
54	/// This function is intended for use in building the initial list of peers.
55	///
56	/// If given a `UniqueName`, it will check if the peer also owns a well-known name.
57	/// If given a `WellKnownName`, it will query the D-Bus for the unique name of the peer.
58	///
59	/// # Errors
60	/// - `DBusProxy` cannot be created.
61	/// - The socket address cannot be parsed.
62	///
63	pub(crate) async fn try_new<B, S>(
64		bus_name: B,
65		socket: S,
66		conn: &zbus::Connection,
67	) -> Result<Self, AtspiError>
68	where
69		B: Into<OwnedBusName>,
70		S: TryInto<Address>,
71	{
72		let dbus_proxy = DBusProxy::new(conn).await?;
73		let owned_bus_name: OwnedBusName = bus_name.into();
74
75		let socket_address = socket
76			.try_into()
77			.map_err(|_| AtspiError::ParseError("Invalid address string"))?;
78
79		// Because D-Bus does not let us query whether a unique name is the owner of a well-known name,
80		// we need to query all well-known names and their owners, and then check if the unique name is one of them.
81
82		// Get all well-known names from D-Bus
83		let well_known_names: Vec<OwnedWellKnownName> = dbus_proxy
84			.list_names()
85			.await?
86			.into_iter()
87			.filter_map(|name| {
88				if let BusName::WellKnown(well_nown_name) = name.clone().inner() {
89					Some(OwnedWellKnownName::from(well_nown_name.clone()))
90				} else {
91					None
92				}
93			})
94			.collect();
95
96		// We are creating a mapping of unique names to well-known names.
97		let mut unique_to_well_known: Vec<(OwnedUniqueName, OwnedWellKnownName)> = Vec::new();
98
99		// For each well-known name, we get the unique name of the owner.
100		// Note: not all well-known names on the bus will have an accessible owner.
101		// For instance, the `org.freedesktop.DBus` well-known name does not have an accessible connection.
102		for well_known_name in &well_known_names {
103			let bus_name = BusName::from(well_known_name.clone());
104			if let Ok(unique_name) = dbus_proxy.get_name_owner(bus_name).await {
105				unique_to_well_known.push((unique_name, well_known_name.clone()));
106			}
107		}
108
109		// Now we have a mapping of unique names to well-known names.
110		//
111		// A `Peer` instance requires a unique name which may or may not have a well-known name.
112		// We can build a `Peer` instance from either a unique name or a well-known name.
113		// If the argument name is a unique name, we look up whether this peer also owns a well-known name in our mapping.
114		// If the argument name is a well-known name, we _must_ get its unique owner too to create a `Peer`.
115		let (unique_name, well_known_name) = match owned_bus_name.inner() {
116			BusName::Unique(name) => {
117				// The argument name is the mandatory `UniqueName`, we do want check whether this peer also owns a well-known name.
118				let owned_well_known_name = unique_to_well_known.iter().find_map(|(u, w)| {
119					if u == name {
120						Some(w.clone())
121					} else {
122						None
123					}
124				});
125				let owned_unique_name = OwnedUniqueName::from(name.clone());
126				(owned_unique_name, owned_well_known_name)
127			}
128			BusName::WellKnown(well_known_name) => {
129				let bus_name = BusName::from(well_known_name.clone());
130				let owned_unique_name = dbus_proxy.get_name_owner(bus_name).await?;
131				let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
132				(owned_unique_name, Some(owned_well_known_name))
133			}
134		};
135
136		let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
137
138		Ok(Peer { unique_name, well_known_name, socket_address, p2p_connection })
139	}
140
141	/// Returns the bus name of the peer.
142	#[must_use]
143	pub fn unique_name(&self) -> &OwnedUniqueName {
144		&self.unique_name
145	}
146
147	/// Returns the well-known bus name of the peer, if it has one.
148	#[must_use]
149	pub fn well_known_name(&self) -> Option<&OwnedWellKnownName> {
150		self.well_known_name.as_ref()
151	}
152
153	/// Returns the socket [`Address`] of the peer.
154	#[must_use]
155	pub fn socket_address(&self) -> &Address {
156		&self.socket_address
157	}
158
159	/// Returns the p2p [`Connection`][zbus::Connection] of the peer.
160	pub fn connection(&self) -> &zbus::Connection {
161		&self.p2p_connection
162	}
163
164	/// Try to create a new `Peer` from a bus name.
165	///
166	/// # Errors
167	/// Returns an error if the application proxy cannot be created or if it does not support `get_application_bus_address`.\
168	/// A non-existent bus name will also return an error.
169	pub async fn try_from_bus_name(
170		bus_name: BusName<'_>,
171		conn: &zbus::Connection,
172	) -> AtspiResult<Self> {
173		// Get the application proxy for the bus name
174		let application_proxy = ApplicationProxy::builder(conn)
175			.destination(&bus_name)?
176			.cache_properties(CacheProperties::No)
177			.build()
178			.await?;
179
180		let socket_path = application_proxy.get_application_bus_address().await?;
181		Self::try_new(bus_name, socket_path.as_str(), conn).await
182	}
183
184	/// Returns a [`Proxies`][atspi_proxies::proxy_ext::Proxies] object for the given object path.\
185	/// A `Proxies` object is used to obtain any of the proxies the object supports.
186	///
187	/// # Errors
188	/// On invalid object path.
189	pub async fn proxies(
190		&'_ self,
191		path: &ObjectPath<'_>,
192	) -> AtspiResult<atspi_proxies::proxy_ext::Proxies<'_>> {
193		let accessible_proxy = AccessibleProxy::builder(&self.p2p_connection)
194			.path(path.to_owned())?
195			.cache_properties(CacheProperties::No)
196			.build()
197			.await?;
198
199		accessible_proxy.proxies().await
200	}
201
202	/// Returns an `AccessibleProxy` for the root accessible object of the peer.
203	///
204	/// # Errors
205	/// In case of an invalid connection.
206	pub async fn as_root_accessible_proxy(&self) -> AtspiResult<AccessibleProxy<'_>> {
207		AccessibleProxy::builder(&self.p2p_connection)
208			.cache_properties(CacheProperties::No)
209			.build()
210			.await
211			.map_err(AtspiError::from)
212	}
213
214	/// Returns an [`AccessibleProxy`] for the accessible object of the peer.
215	///
216	/// # Errors
217	/// In case of an invalid connection or object path.
218	pub async fn as_accessible_proxy(
219		&self,
220		obj: &ObjectRefOwned,
221	) -> AtspiResult<AccessibleProxy<'_>> {
222		let path = obj.path();
223
224		AccessibleProxy::builder(&self.p2p_connection)
225			.path(path)?
226			.cache_properties(CacheProperties::No)
227			.build()
228			.await
229			.map_err(AtspiError::from)
230	}
231}
232
233// A trait is needed to extend functionality on `BusName` for P2P address lookup.
234pub(crate) trait BusNameExt {
235	/// Looks up a `BusName`'s P2P address, if available.
236	async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address>;
237}
238
239impl BusNameExt for BusName<'_> {
240	async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address> {
241		let application_proxy = application::ApplicationProxy::builder(conn)
242			.destination(self)?
243			.cache_properties(CacheProperties::No)
244			.build()
245			.await?;
246
247		application_proxy
248			.get_application_bus_address()
249			.await
250			.map_err(|e| {
251				AtspiError::Owned(format!(
252					"Failed to get application bus address for {}: {e}",
253					&self
254				))
255			})
256			.and_then(|address| {
257				Address::try_from(address.as_str())
258					.map_err(|_| AtspiError::ParseError("Invalid address string"))
259			})
260	}
261}
262
263#[derive(Clone, Debug)]
264pub(crate) struct Peers {
265	peers: Arc<Mutex<Vec<Peer>>>,
266}
267
268impl Peers {
269	/// Returns a `Peers` containing the initial peers that support P2P connections.
270	///
271	/// # Note
272	/// Intended for internal use with `AccessibilityConnection::new()`.
273	///
274	/// # Errors
275	/// This function can return an error in the following cases:
276	/// - the `AccessibleProxy` to the registry cannot be created.
277	/// - the registry returns an error when querying for children.
278	/// - for any child, the `AccessibleProxy` cannot be created or the `ApplicationProxy` cannot be created.
279	pub(crate) async fn initialize_peers(conn: &zbus::Connection) -> AtspiResult<Self> {
280		let registry_well_known_name = RegistryProxy::DESTINATION
281			.as_ref()
282			.expect("RegistryProxy `default_destination` is not set");
283		let reg_accessible = AccessibleProxy::builder(conn)
284			.destination(registry_well_known_name)?
285			.cache_properties(CacheProperties::No)
286			.build()
287			.await?;
288
289		let accessible_applications = reg_accessible.get_children().await?;
290		let mut peers = Vec::with_capacity(accessible_applications.len());
291
292		for app in accessible_applications {
293			let accessible_proxy = app.as_accessible_proxy(conn).await?;
294			let proxies = accessible_proxy.proxies().await?;
295			let application_proxy = proxies.application().await?;
296
297			// Get the application bus address
298			// aka: Does the application support P2P connections?
299			if let Ok(address) = application_proxy.get_application_bus_address().await {
300				let name = app.name().ok_or(AtspiError::MissingName)?;
301				let bus_name = BusName::from(name.clone());
302
303				match Peer::try_new(bus_name, address.as_str(), conn).await {
304					Ok(peer) => peers.push(peer),
305
306					#[cfg(feature = "tracing")]
307					Err(e) => {
308						tracing::warn!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
309					}
310
311					#[cfg(all(debug_assertions, not(feature = "tracing")))]
312					Err(e) => {
313						eprintln!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
314					}
315
316					#[cfg(not(any(feature = "tracing", debug_assertions)))]
317					Err(_) => {
318						// Ignore error creating peer
319					}
320				}
321			}
322		}
323
324		Ok(Peers { peers: Arc::new(Mutex::new(peers)) })
325	}
326
327	/// Returns a [`Peer`] by its bus name.
328	fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
329		let peers = self.peers.lock().expect("already locked by current thread");
330
331		let matched = match bus_name {
332			BusName::Unique(unique_name) => {
333				peers.iter().find(|peer| peer.unique_name() == unique_name)
334			}
335			BusName::WellKnown(well_known_name) => {
336				let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
337				peers
338					.iter()
339					.find(|peer| peer.well_known_name() == Some(&owned_well_known_name))
340			}
341		};
342		matched.cloned()
343	}
344
345	/// Returns the inner `Arc<Mutex<Vec<Peer>>>`.
346	fn inner(&self) -> Arc<Mutex<Vec<Peer>>> {
347		Arc::clone(&self.peers)
348	}
349
350	/// Inserts a new `Peer` into the list of peers.
351	async fn insert_unique(
352		&self,
353		unique_name: &zbus::names::UniqueName<'_>,
354		conn: &zbus::Connection,
355	) -> AtspiResult<()> {
356		let bus_name = BusName::Unique(unique_name.as_ref());
357		let address = bus_name.get_p2p_address(conn).await?;
358		let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
359
360		let unique_name = OwnedUniqueName::from(unique_name.clone());
361
362		let peer =
363			Peer { unique_name, well_known_name: None, socket_address: address, p2p_connection };
364
365		let mut guard = self.peers.lock().expect("lock already held by current thread");
366		guard.push(peer);
367		Ok(())
368	}
369
370	/// Removes a `Peer` from the list of peers by its unique name.
371	fn remove_unique(&self, unique_name: &zbus::names::UniqueName<'_>) {
372		let mut peers = self.peers.lock().expect("lock already held by current thread");
373		peers.retain(|peer| peer.unique_name() != unique_name);
374	}
375
376	/// Inserts a new `Peer` with a well-known name into the list of peers.
377	async fn insert_well_known(
378		&self,
379		well_known_name: &WellKnownName<'_>,
380		name_owner: &UniqueName<'_>,
381		conn: &zbus::Connection,
382	) -> AtspiResult<()> {
383		let bus_name = BusName::WellKnown(well_known_name.clone());
384		let address = bus_name.get_p2p_address(conn).await?;
385		let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
386
387		let well_known_name = OwnedWellKnownName::from(well_known_name.clone());
388		let unique_name = OwnedUniqueName::from(name_owner.clone());
389
390		let peer = Peer {
391			unique_name,
392			well_known_name: Some(well_known_name),
393			socket_address: address,
394			p2p_connection,
395		};
396
397		let mut guard = self.peers.lock().expect("lock already held by current thread");
398		guard.push(peer);
399		Ok(())
400	}
401
402	/// Removes a `Peer` with a well-known name from the list of peers.
403	fn remove_well_known(&self, well_known_name: &WellKnownName<'_>, name_owner: &UniqueName<'_>) {
404		let mut peers = self.peers.lock().expect("lock already held by current thread");
405		let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
406		peers.retain(|peer| {
407			(peer.well_known_name() != Some(&owned_well_known_name))
408				&& peer.unique_name() == name_owner
409		});
410	}
411
412	/// Update a `Peer` with a new owner of it's well-known name in the list of peers.
413	async fn update_well_known_owner(
414		&self,
415		well_known_name: &WellKnownName<'_>,
416		old_name_owner: &UniqueName<'_>,
417		new_name_owner: &UniqueName<'_>,
418		conn: &zbus::Connection,
419	) -> AtspiResult<()> {
420		let socket_address = BusName::from(new_name_owner.clone()).get_p2p_address(conn).await?;
421		let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
422
423		let well_known_name = Some(OwnedWellKnownName::from(well_known_name.clone()));
424		let old_name_owner = OwnedUniqueName::from(old_name_owner.clone());
425		let unique_name = OwnedUniqueName::from(new_name_owner.clone());
426
427		let peer = Peer {
428			unique_name,
429			well_known_name: well_known_name.clone(),
430			socket_address,
431			p2p_connection,
432		};
433
434		let mut peers = self.peers.lock().expect("lock already held by current thread");
435		if let Some(existing_peer) = peers.iter_mut().find(|p| {
436			p.well_known_name() == well_known_name.as_ref() && p.unique_name() == &old_name_owner
437		}) {
438			*existing_peer = peer;
439		} else {
440			return Err(AtspiError::Owned(format!(
441                "Owner swap failed: well-known name {well_known_name:?} with owner: {old_name_owner} not found"
442            )));
443		}
444		Ok(())
445	}
446
447	/// Spawns a task which listens for peer mutations.
448	///
449	/// This task listens for `NameOwnerChanged` signals and updates the list of peers accordingly.
450	///
451	/// # executor
452	/// The task is spawned on the executor of the `zbus::Connection`.
453	///
454	/// # Note
455	/// This function is called internally by `AccessibilityConnection::new()`.
456	pub(crate) fn spawn_peer_listener_task(&self, conn: &zbus::Connection) {
457		// Clone the `Peers` and `Connection` to move them into the async task.
458		// This is necessary because the async task needs to own these values.
459		let peers = self.clone();
460		let conn = conn.clone();
461		let dbus_proxy = futures_lite::future::block_on(DBusProxy::new(&conn))
462			.expect("Failed to create DBusProxy");
463
464		let executor = conn.executor().clone();
465
466		executor.spawn(async move {
467			let Ok(mut name_owner_changed_stream) = dbus_proxy.receive_name_owner_changed().await.inspect_err(|#[allow(unused_variables)] err| {
468				#[cfg(feature = "tracing")]
469				debug!("Failed to receive `NameOwnerChanged` stream: {err}");
470			}) else {
471				return;
472			};
473
474			while let Some(name_owner_event) = name_owner_changed_stream.next().await {
475					let Ok(args) = name_owner_event.args() else {
476						#[cfg(feature = "tracing")]
477						tracing::debug!("Received name owner changed event without args, skipping.");
478						continue;
479					};
480					let name = args.name().clone();
481					let new = args.new_owner().clone();
482					let old = args.old_owner().clone();
483
484					// `NameOwnerChanged` table (U = Unique, W = Well-Known):
485					// | Name | Old Owner | New Owner | Operation |
486					// |------|-----------|-----------|----------|
487					// |   U  |   None    |  Some(U)  |  Add     |
488					// |   U  |   Some(U) |    None   |  Remove  |
489					// |   W  |   None    |  Some(U)  |  Add     |
490					// |   W  |   Some(U) |    None   |  Remove  |
491					// |   W  |   Some(U) |  Some(U)  |  Replace |
492
493					match name {
494						BusName::Unique(unique_name) => {
495							// `zvariant:Optional` has deref target `Option`.
496							match (&*old, &*new) {
497								// Application appeared on the bus.
498								(None, Some(new_owner)) => {
499									debug_assert_eq!(new_owner, &unique_name, "When a name appears on the bus, the new owner must be the unique name itself.");
500
501									if let Ok(()) = peers.insert_unique(&unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
502										#[cfg(feature = "tracing")]
503										warn!("Failed to insert unique name: {unique_name}: {err}");
504									}) {
505										#[cfg(feature = "tracing")]
506										info!("Inserted unique name: {unique_name} into the peer list.");
507									};
508
509								}
510								// Unique name left the bus.
511								(Some(old), None) => {
512									debug_assert!(old == &unique_name, "When a unique name is removed from the bus, the old owner must be the unique name itself.");
513									peers.remove_unique(&unique_name);
514
515									#[cfg(feature = "tracing")]
516									info!("Peer with unique name: {unique_name} left the bus - removed from peer list.");
517								}
518
519								// Unknown combination.
520								(_, _) => {
521									#[cfg(feature = "tracing")]
522									debug!("NameOwnerChanged` with unique name: {unique_name} has unknown argument combination ({old:?}, {new:?}).");
523								}
524							}
525						}
526						BusName::WellKnown(well_known_name) => {
527							match (&*old, &*new) {
528								// Unknown mutatuion. Well-known names should always have at least a new or old owner.
529								(None, None) => {
530									#[cfg(feature = "tracing")]
531									debug!("Received `NameOwnerChanged` event with no old or new owner for well-known name: {}", well_known_name);
532								}
533
534								// Well-known name appeared on the bus.
535								(None, Some(new_owner_unique_name)) => {
536									if let Ok(()) = peers.insert_well_known(
537										&well_known_name,
538										new_owner_unique_name,
539										&conn,
540									).await.inspect_err(|#[allow(unused_variables)] err| {
541										#[cfg(feature = "tracing")]
542										warn!("Failed to insert well-known name: {} with owner: {} - {}", &well_known_name, &new_owner_unique_name, err);
543									}) {
544										#[cfg(feature = "tracing")]
545										info!("Well-known name: {} with owner: {} inserted into the peer list.", &well_known_name, &new_owner_unique_name);
546									}
547								}
548
549								// Well-known name left the bus.
550								(Some(old_owner_unique_name), None) => {
551									peers.remove_well_known(
552										&well_known_name,
553										old_owner_unique_name,
554									);
555
556									#[cfg(feature = "tracing")]
557									info!(
558										"Well-known name: {} with owner: {} removed from the peer list.",
559										&well_known_name,
560										&old_owner_unique_name
561									);
562								},
563
564								// Well-known name received a new owner on the bus.
565								(Some(old_owner_unique_name), Some(new_owner_unique_name)) => {
566									if let Ok(()) = peers.update_well_known_owner(&well_known_name, old_owner_unique_name, new_owner_unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
567										#[cfg(feature = "tracing")]
568										warn!("Failed to update well-known name: {} owner from: {} to: {} - {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name, err);
569									}) {
570										#[cfg(feature = "tracing")]
571										info!("Well-known name: {} updated owner from: {} to: {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name);
572									};
573								}
574							}
575						}
576					} // End of match on `name`
577				} // End of while loop
578
579				#[cfg(feature = "tracing")]
580				tracing::warn!("Peer listener task stopped, clearing peers list.");
581				peers.clear();
582			}, "PeerListenerTask")
583			.detach();
584	}
585
586	/// Clears the list of peers.
587	///
588	/// # Note
589	/// This is used to reset the list of peers when the D-Bus connection is lost.
590	fn clear(&self) {
591		let mut peers = self.peers.lock().expect("lock already held by current thread");
592		peers.clear();
593	}
594}
595
596/// Trait for P2P connection handling.
597pub trait P2P {
598	/// Returns a P2P connected `AccessibleProxy`for object, _if available_.\
599	/// If the application does not support P2P, this returns an `AccessibleProxy` for the object with a bus connection.
600	fn object_as_accessible(
601		&'_ self,
602		obj: &ObjectRefOwned,
603	) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;
604
605	/// Returns a P2P connected `AccessibleProxy` to the root  accessible object for the given bus name, _if available_.\
606	/// If the P2P connection is not available, it returns an `AccessibleProxy` with a bus connection.
607	fn bus_name_as_root_accessible(
608		&'_ self,
609		name: &BusName,
610	) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;
611
612	/// Return a list of peers that are currently connected.
613	fn peers(&self) -> Arc<Mutex<Vec<Peer>>>;
614
615	/// Returns a [`Peer`] by its bus name.
616	fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer>;
617}
618
619impl P2P for crate::AccessibilityConnection {
620	/// Returns a P2P connected `AccessibleProxy` for the object, _if available_.\
621	/// If the application does not support P2P, an `AccessibleProxy` with a bus connection is returned.
622	///
623	/// # Examples
624	/// ```rust
625	/// # use tokio_test::block_on;
626	/// use zbus::names::UniqueName;
627	/// use zbus::zvariant::ObjectPath;
628	/// use atspi_proxies::accessible::AccessibleProxy;
629	/// use atspi_common::ObjectRef;
630	/// use atspi_connection::{P2P, Peer};
631	/// use atspi_connection::AccessibilityConnection;
632	///
633	/// # block_on(async {
634	/// let conn = AccessibilityConnection::new().await.unwrap();
635	///
636	/// let name = UniqueName::from_static_str_unchecked(":1.1");
637	/// let path = ObjectPath::from_static_str_unchecked("/org/freedesktop/accessible/root");
638	///
639	/// let object_ref = ObjectRef::new_owned(name, path);
640	/// let accessible_proxy = conn.object_as_accessible(&object_ref).await;
641	/// assert!(
642	///    accessible_proxy.is_ok(),
643	///    "Failed to get accessible proxy: {:?}",
644	///    accessible_proxy.err()
645	/// );
646	/// # });
647	/// ```
648	///
649	/// Handling `ObjectRef::Null` case:
650	///
651	/// ```rust
652	/// # use tokio_test::block_on;
653	/// use atspi_proxies::accessible::AccessibleProxy;
654	/// use atspi_common::{AtspiError, ObjectRef, ObjectRefOwned};
655	/// use atspi_connection::P2P;
656	/// use atspi_connection::AccessibilityConnection;
657	///
658	/// # block_on(async {
659	/// let conn = AccessibilityConnection::new().await.unwrap();
660	/// let object_ref = ObjectRef::Null;
661	/// let object_ref = ObjectRefOwned::new(object_ref); // Assume we received this from `Accessible.Parent`
662	///
663	/// let res = conn.object_as_accessible(&object_ref).await;
664	/// match res {
665	///     Ok(proxy) => {
666	///         // Use the proxy
667	///         let _proxy: AccessibleProxy<'_> = proxy;
668	///     }
669	///     Err(AtspiError::NullRef(_msg)) => {
670	///         // Handle null-reference case
671	///     }
672	///     Err(_other) => {
673	///         // Handle other error types
674	///     }
675	/// }
676	/// # });
677	/// ```
678	///
679	/// # Errors
680	/// If the method is called with a null-reference `ObjectRef`, it will return an `AtspiError::NullRef`.
681	/// Users should ensure that the `ObjectRef` is non-null before calling this method or handle the result.
682	/// If the `AccessibleProxy` cannot be created, or if the object path is invalid.
683	///
684	/// # Note
685	/// This function will first try to find a [`Peer`] with a P2P connection
686	async fn object_as_accessible(&self, obj: &ObjectRefOwned) -> AtspiResult<AccessibleProxy<'_>> {
687		if obj.is_null() {
688			return Err(AtspiError::NullRef(
689				"`p2p::object_as_accessible` called with null-reference ObjectRef",
690			));
691		}
692
693		let name = obj.name().ok_or(AtspiError::MissingName)?.to_owned();
694		let name = OwnedUniqueName::from(name);
695		let path = obj.path();
696
697		let lookup = self
698			.peers
699			.peers
700			.lock()
701			.expect("lock already held by current thread")
702			.iter()
703			.find(|peer| &name == peer.unique_name())
704			.cloned();
705
706		if let Some(peer) = lookup {
707			// If a peer is found, create an `AccessibleProxy` with a P2P connection
708			AccessibleProxy::builder(peer.connection())
709				.path(path)?
710				.cache_properties(CacheProperties::No)
711				.build()
712				.await
713				.map_err(Into::into)
714		} else {
715			// If _no_ peer was found, fall back to the bus connection
716			let conn = self.connection();
717			AccessibleProxy::builder(conn)
718				.path(path)?
719				.cache_properties(CacheProperties::No)
720				.build()
721				.await
722				.map_err(Into::into)
723		}
724	}
725
726	/// Returns a P2P connected [`AccessibleProxy`] to the root accessible object for the given bus name _if available_.\
727	/// If the P2P connection is not available, it returns an `AccessibleProxy` with a bus connection.
728	///
729	/// # Examples
730	///
731	/// ```rust
732	/// # use tokio_test::block_on;
733	/// use zbus::names::BusName;
734	/// use atspi_proxies::accessible::AccessibleProxy;
735	/// use atspi_common::ObjectRef;
736	/// use atspi_connection::{AccessibilityConnection, P2P};
737	///
738	/// # block_on(async {
739	///   let conn = AccessibilityConnection::new().await.unwrap();
740	///   let bus_name = BusName::from_static_str("org.a11y.atspi.Registry").unwrap();
741	///   let _accessible_proxy = conn.bus_name_as_root_accessible(&bus_name).await.unwrap();
742	///   // Use the accessible proxy as needed
743	/// # });
744	/// ```
745	///
746	/// # Errors
747	/// In case of an invalid connection or object path.
748	async fn bus_name_as_root_accessible(
749		&'_ self,
750		name: &BusName<'_>,
751	) -> AtspiResult<AccessibleProxy<'_>> {
752		// Look up peer by bus name
753		let lookup = self
754			.peers
755			.peers
756			.lock()
757			.expect("lock already held by current thread")
758			.iter()
759			.find(|peer| {
760				// Check if the peer's unique name matches the bus name
761				match name {
762					BusName::Unique(unique_name) => peer.unique_name() == unique_name,
763					BusName::WellKnown(well_known_name) => {
764						peer.well_known_name().is_some_and(|w| w == well_known_name)
765					}
766				}
767			})
768			.cloned();
769
770		if let Some(peer) = lookup {
771			// If a peer is found, create an AccessibleProxy with a P2P connection
772			AccessibleProxy::builder(peer.connection())
773				.cache_properties(CacheProperties::No)
774				.build()
775				.await
776				.map_err(Into::into)
777		} else {
778			// If no peer is found, fall back to the bus connection
779			let conn = self.connection();
780			AccessibleProxy::builder(conn)
781				.cache_properties(CacheProperties::No)
782				.build()
783				.await
784				.map_err(Into::into)
785		}
786	}
787
788	/// Get the currently connected P2P capable peers.
789	///
790	/// # Examples
791	/// ```rust
792	/// # use tokio_test::block_on;
793	/// use atspi_connection::AccessibilityConnection;
794	/// use atspi_connection::{P2P, Peer};
795	///
796	/// # block_on(async {
797	///   let conn = AccessibilityConnection::new().await.unwrap();
798	///   let locked_peers = conn.peers();
799	///   let peers = locked_peers.lock().expect("lock already held by current thread");
800	///   for peer in &*peers {
801	///       println!("Peer: {} at {}", peer.unique_name(), peer.socket_address());
802	///   }
803	/// # });
804	/// ```
805	fn peers(&self) -> Arc<Mutex<Vec<Peer>>> {
806		self.peers.inner()
807	}
808
809	/// Returns a [`Peer`] by its bus name.
810	///
811	/// # Examples
812	/// ```rust
813	/// # use tokio_test::block_on;
814	/// use atspi_connection::{AccessibilityConnection, P2P, Peer};
815	/// use zbus::names::BusName;
816	///
817	/// # block_on(async {
818	///   let a11y = AccessibilityConnection::new().await.unwrap();
819	///   let bus_name = BusName::from_static_str(":1.42").unwrap();
820	///   let peer: Option<Peer> = a11y.get_peer(&bus_name);
821	/// # });
822	/// ```
823	fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
824		self.peers.get_peer(bus_name)
825	}
826}