1use atspi_common::{object_ref::ObjectRefOwned, AtspiError};
17use atspi_proxies::{
18 accessible::{AccessibleProxy, ObjectRefExt},
19 application::{self, ApplicationProxy},
20 proxy_ext::ProxyExt,
21 registry::RegistryProxy,
22};
23use futures_lite::stream::StreamExt;
24use std::sync::{Arc, Mutex};
25use zbus::{
26 conn::Builder,
27 fdo::DBusProxy,
28 names::{
29 BusName, OwnedBusName, OwnedUniqueName, OwnedWellKnownName, UniqueName, WellKnownName,
30 },
31 proxy::{CacheProperties, Defaults},
32 zvariant::ObjectPath,
33 Address,
34};
35
36#[cfg(feature = "tracing")]
37use tracing::{debug, info, warn};
38
39use crate::AtspiResult;
40
#[derive(Clone, Debug)]
/// An application that is reachable over its own peer-to-peer (P2P) D-Bus connection.
///
/// Bundles the bus names that identify the application with the socket address and the
/// connection that was built from that address.
pub struct Peer {
    /// Unique bus name of the application.
    unique_name: OwnedUniqueName,
    /// Well-known bus name associated with the application, if one is known.
    well_known_name: Option<OwnedWellKnownName>,
    /// Address of the socket the P2P connection was built from.
    socket_address: Address,
    /// The P2P connection built from `socket_address`.
    p2p_connection: zbus::Connection,
}
49
50impl Peer {
51 pub(crate) async fn try_new<B, S>(
64 bus_name: B,
65 socket: S,
66 conn: &zbus::Connection,
67 ) -> Result<Self, AtspiError>
68 where
69 B: Into<OwnedBusName>,
70 S: TryInto<Address>,
71 {
72 let dbus_proxy = DBusProxy::new(conn).await?;
73 let owned_bus_name: OwnedBusName = bus_name.into();
74
75 let socket_address = socket
76 .try_into()
77 .map_err(|_| AtspiError::ParseError("Invalid address string"))?;
78
79 let well_known_names: Vec<OwnedWellKnownName> = dbus_proxy
84 .list_names()
85 .await?
86 .into_iter()
87 .filter_map(|name| {
88 if let BusName::WellKnown(well_nown_name) = name.clone().inner() {
89 Some(OwnedWellKnownName::from(well_nown_name.clone()))
90 } else {
91 None
92 }
93 })
94 .collect();
95
96 let mut unique_to_well_known: Vec<(OwnedUniqueName, OwnedWellKnownName)> = Vec::new();
98
99 for well_known_name in &well_known_names {
103 let bus_name = BusName::from(well_known_name.clone());
104 if let Ok(unique_name) = dbus_proxy.get_name_owner(bus_name).await {
105 unique_to_well_known.push((unique_name, well_known_name.clone()));
106 }
107 }
108
109 let (unique_name, well_known_name) = match owned_bus_name.inner() {
116 BusName::Unique(name) => {
117 let owned_well_known_name = unique_to_well_known.iter().find_map(|(u, w)| {
119 if u == name {
120 Some(w.clone())
121 } else {
122 None
123 }
124 });
125 let owned_unique_name = OwnedUniqueName::from(name.clone());
126 (owned_unique_name, owned_well_known_name)
127 }
128 BusName::WellKnown(well_known_name) => {
129 let bus_name = BusName::from(well_known_name.clone());
130 let owned_unique_name = dbus_proxy.get_name_owner(bus_name).await?;
131 let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
132 (owned_unique_name, Some(owned_well_known_name))
133 }
134 };
135
136 let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
137
138 Ok(Peer { unique_name, well_known_name, socket_address, p2p_connection })
139 }
140
141 #[must_use]
143 pub fn unique_name(&self) -> &OwnedUniqueName {
144 &self.unique_name
145 }
146
147 #[must_use]
149 pub fn well_known_name(&self) -> Option<&OwnedWellKnownName> {
150 self.well_known_name.as_ref()
151 }
152
153 #[must_use]
155 pub fn socket_address(&self) -> &Address {
156 &self.socket_address
157 }
158
159 pub fn connection(&self) -> &zbus::Connection {
161 &self.p2p_connection
162 }
163
164 pub async fn try_from_bus_name(
170 bus_name: BusName<'_>,
171 conn: &zbus::Connection,
172 ) -> AtspiResult<Self> {
173 let application_proxy = ApplicationProxy::builder(conn)
175 .destination(&bus_name)?
176 .cache_properties(CacheProperties::No)
177 .build()
178 .await?;
179
180 let socket_path = application_proxy.get_application_bus_address().await?;
181 Self::try_new(bus_name, socket_path.as_str(), conn).await
182 }
183
184 pub async fn proxies(
190 &'_ self,
191 path: &ObjectPath<'_>,
192 ) -> AtspiResult<atspi_proxies::proxy_ext::Proxies<'_>> {
193 let accessible_proxy = AccessibleProxy::builder(&self.p2p_connection)
194 .path(path.to_owned())?
195 .cache_properties(CacheProperties::No)
196 .build()
197 .await?;
198
199 accessible_proxy.proxies().await
200 }
201
202 pub async fn as_root_accessible_proxy(&self) -> AtspiResult<AccessibleProxy<'_>> {
207 AccessibleProxy::builder(&self.p2p_connection)
208 .cache_properties(CacheProperties::No)
209 .build()
210 .await
211 .map_err(AtspiError::from)
212 }
213
214 pub async fn as_accessible_proxy(
219 &self,
220 obj: &ObjectRefOwned,
221 ) -> AtspiResult<AccessibleProxy<'_>> {
222 let path = obj.path();
223
224 AccessibleProxy::builder(&self.p2p_connection)
225 .path(path)?
226 .cache_properties(CacheProperties::No)
227 .build()
228 .await
229 .map_err(AtspiError::from)
230 }
231}
232
/// Extension trait for resolving the P2P socket address behind a bus name.
pub(crate) trait BusNameExt {
    /// Returns the application bus address reported by the application behind this bus
    /// name, queried over `conn`.
    async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address>;
}
238
239impl BusNameExt for BusName<'_> {
240 async fn get_p2p_address(&self, conn: &zbus::Connection) -> AtspiResult<Address> {
241 let application_proxy = application::ApplicationProxy::builder(conn)
242 .destination(self)?
243 .cache_properties(CacheProperties::No)
244 .build()
245 .await?;
246
247 application_proxy
248 .get_application_bus_address()
249 .await
250 .map_err(|e| {
251 AtspiError::Owned(format!(
252 "Failed to get application bus address for {}: {e}",
253 &self
254 ))
255 })
256 .and_then(|address| {
257 Address::try_from(address.as_str())
258 .map_err(|_| AtspiError::ParseError("Invalid address string"))
259 })
260 }
261}
262
/// A shared, mutex-protected list of [`Peer`]s.
///
/// Cloning yields another handle to the same list, because the list lives behind an `Arc`.
#[derive(Clone, Debug)]
pub(crate) struct Peers {
    /// The peer list, shared between all clones of this handle.
    peers: Arc<Mutex<Vec<Peer>>>,
}
267
268impl Peers {
269 pub(crate) async fn initialize_peers(conn: &zbus::Connection) -> AtspiResult<Self> {
280 let registry_well_known_name = RegistryProxy::DESTINATION
281 .as_ref()
282 .expect("RegistryProxy `default_destination` is not set");
283 let reg_accessible = AccessibleProxy::builder(conn)
284 .destination(registry_well_known_name)?
285 .cache_properties(CacheProperties::No)
286 .build()
287 .await?;
288
289 let accessible_applications = reg_accessible.get_children().await?;
290 let mut peers = Vec::with_capacity(accessible_applications.len());
291
292 for app in accessible_applications {
293 let accessible_proxy = app.as_accessible_proxy(conn).await?;
294 let proxies = accessible_proxy.proxies().await?;
295 let application_proxy = proxies.application().await?;
296
297 if let Ok(address) = application_proxy.get_application_bus_address().await {
300 let name = app.name().ok_or(AtspiError::MissingName)?;
301 let bus_name = BusName::from(name.clone());
302
303 match Peer::try_new(bus_name, address.as_str(), conn).await {
304 Ok(peer) => peers.push(peer),
305
306 #[cfg(feature = "tracing")]
307 Err(e) => {
308 tracing::warn!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
309 }
310
311 #[cfg(all(debug_assertions, not(feature = "tracing")))]
312 Err(e) => {
313 eprintln!("Failed to create peer for {:?}: {}", app.name_as_str(), e);
314 }
315
316 #[cfg(not(any(feature = "tracing", debug_assertions)))]
317 Err(_) => {
318 }
320 }
321 }
322 }
323
324 Ok(Peers { peers: Arc::new(Mutex::new(peers)) })
325 }
326
327 fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
329 let peers = self.peers.lock().expect("already locked by current thread");
330
331 let matched = match bus_name {
332 BusName::Unique(unique_name) => {
333 peers.iter().find(|peer| peer.unique_name() == unique_name)
334 }
335 BusName::WellKnown(well_known_name) => {
336 let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
337 peers
338 .iter()
339 .find(|peer| peer.well_known_name() == Some(&owned_well_known_name))
340 }
341 };
342 matched.cloned()
343 }
344
345 fn inner(&self) -> Arc<Mutex<Vec<Peer>>> {
347 Arc::clone(&self.peers)
348 }
349
350 async fn insert_unique(
352 &self,
353 unique_name: &zbus::names::UniqueName<'_>,
354 conn: &zbus::Connection,
355 ) -> AtspiResult<()> {
356 let bus_name = BusName::Unique(unique_name.as_ref());
357 let address = bus_name.get_p2p_address(conn).await?;
358 let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
359
360 let unique_name = OwnedUniqueName::from(unique_name.clone());
361
362 let peer =
363 Peer { unique_name, well_known_name: None, socket_address: address, p2p_connection };
364
365 let mut guard = self.peers.lock().expect("lock already held by current thread");
366 guard.push(peer);
367 Ok(())
368 }
369
370 fn remove_unique(&self, unique_name: &zbus::names::UniqueName<'_>) {
372 let mut peers = self.peers.lock().expect("lock already held by current thread");
373 peers.retain(|peer| peer.unique_name() != unique_name);
374 }
375
376 async fn insert_well_known(
378 &self,
379 well_known_name: &WellKnownName<'_>,
380 name_owner: &UniqueName<'_>,
381 conn: &zbus::Connection,
382 ) -> AtspiResult<()> {
383 let bus_name = BusName::WellKnown(well_known_name.clone());
384 let address = bus_name.get_p2p_address(conn).await?;
385 let p2p_connection = Builder::address(address.clone())?.p2p().build().await?;
386
387 let well_known_name = OwnedWellKnownName::from(well_known_name.clone());
388 let unique_name = OwnedUniqueName::from(name_owner.clone());
389
390 let peer = Peer {
391 unique_name,
392 well_known_name: Some(well_known_name),
393 socket_address: address,
394 p2p_connection,
395 };
396
397 let mut guard = self.peers.lock().expect("lock already held by current thread");
398 guard.push(peer);
399 Ok(())
400 }
401
402 fn remove_well_known(&self, well_known_name: &WellKnownName<'_>, name_owner: &UniqueName<'_>) {
404 let mut peers = self.peers.lock().expect("lock already held by current thread");
405 let owned_well_known_name = OwnedWellKnownName::from(well_known_name.clone());
406 peers.retain(|peer| {
407 (peer.well_known_name() != Some(&owned_well_known_name))
408 && peer.unique_name() == name_owner
409 });
410 }
411
412 async fn update_well_known_owner(
414 &self,
415 well_known_name: &WellKnownName<'_>,
416 old_name_owner: &UniqueName<'_>,
417 new_name_owner: &UniqueName<'_>,
418 conn: &zbus::Connection,
419 ) -> AtspiResult<()> {
420 let socket_address = BusName::from(new_name_owner.clone()).get_p2p_address(conn).await?;
421 let p2p_connection = Builder::address(socket_address.clone())?.p2p().build().await?;
422
423 let well_known_name = Some(OwnedWellKnownName::from(well_known_name.clone()));
424 let old_name_owner = OwnedUniqueName::from(old_name_owner.clone());
425 let unique_name = OwnedUniqueName::from(new_name_owner.clone());
426
427 let peer = Peer {
428 unique_name,
429 well_known_name: well_known_name.clone(),
430 socket_address,
431 p2p_connection,
432 };
433
434 let mut peers = self.peers.lock().expect("lock already held by current thread");
435 if let Some(existing_peer) = peers.iter_mut().find(|p| {
436 p.well_known_name() == well_known_name.as_ref() && p.unique_name() == &old_name_owner
437 }) {
438 *existing_peer = peer;
439 } else {
440 return Err(AtspiError::Owned(format!(
441 "Owner swap failed: well-known name {well_known_name:?} with owner: {old_name_owner} not found"
442 )));
443 }
444 Ok(())
445 }
446
    /// Spawns a detached task that keeps the peer list in sync with `NameOwnerChanged`
    /// signals received over `conn`.
    ///
    /// The task runs on the executor of `conn` and works on a clone of this handle, which
    /// shares the same underlying list. When the signal stream cannot be obtained the task
    /// returns immediately; when the stream ends the peer list is cleared.
    ///
    /// Creating the `DBusProxy` blocks the calling thread (`block_on`).
    ///
    /// # Panics
    ///
    /// Panics when the `DBusProxy` cannot be created.
    pub(crate) fn spawn_peer_listener_task(&self, conn: &zbus::Connection) {
        // Clones are moved into the spawned task.
        let peers = self.clone();
        let conn = conn.clone();
        let dbus_proxy = futures_lite::future::block_on(DBusProxy::new(&conn))
            .expect("Failed to create DBusProxy");

        let executor = conn.executor().clone();

        executor.spawn(async move {
            // Without the signal stream there is nothing to listen to: log and stop.
            let Ok(mut name_owner_changed_stream) = dbus_proxy.receive_name_owner_changed().await.inspect_err(|#[allow(unused_variables)] err| {
                #[cfg(feature = "tracing")]
                debug!("Failed to receive `NameOwnerChanged` stream: {err}");
            }) else {
                return;
            };

            while let Some(name_owner_event) = name_owner_changed_stream.next().await {
                // Events whose arguments cannot be read are skipped.
                let Ok(args) = name_owner_event.args() else {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Received name owner changed event without args, skipping.");
                    continue;
                };
                let name = args.name().clone();
                let new = args.new_owner().clone();
                let old = args.old_owner().clone();

                // Dispatch on the kind of name, then on the (old owner, new owner) pair.
                match name {
                    BusName::Unique(unique_name) => {
                        match (&*old, &*new) {
                            // No old owner, a new owner: the unique name appeared.
                            (None, Some(new_owner)) => {
                                debug_assert_eq!(new_owner, &unique_name, "When a name appears on the bus, the new owner must be the unique name itself.");

                                if let Ok(()) = peers.insert_unique(&unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
                                    #[cfg(feature = "tracing")]
                                    warn!("Failed to insert unique name: {unique_name}: {err}");
                                }) {
                                    #[cfg(feature = "tracing")]
                                    info!("Inserted unique name: {unique_name} into the peer list.");
                                };

                            }
                            // An old owner, no new owner: the unique name left the bus.
                            (Some(old), None) => {
                                debug_assert!(old == &unique_name, "When a unique name is removed from the bus, the old owner must be the unique name itself.");
                                peers.remove_unique(&unique_name);

                                #[cfg(feature = "tracing")]
                                info!("Peer with unique name: {unique_name} left the bus - removed from peer list.");
                            }

                            // Any other combination is only logged.
                            (_, _) => {
                                #[cfg(feature = "tracing")]
                                debug!("NameOwnerChanged` with unique name: {unique_name} has unknown argument combination ({old:?}, {new:?}).");
                            }
                        }
                    }
                    BusName::WellKnown(well_known_name) => {
                        match (&*old, &*new) {
                            // Neither an old nor a new owner: only logged.
                            (None, None) => {
                                #[cfg(feature = "tracing")]
                                debug!("Received `NameOwnerChanged` event with no old or new owner for well-known name: {}", well_known_name);
                            }

                            // The well-known name gained its first owner: add a peer.
                            (None, Some(new_owner_unique_name)) => {
                                if let Ok(()) = peers.insert_well_known(
                                    &well_known_name,
                                    new_owner_unique_name,
                                    &conn,
                                ).await.inspect_err(|#[allow(unused_variables)] err| {
                                    #[cfg(feature = "tracing")]
                                    warn!("Failed to insert well-known name: {} with owner: {} - {}", &well_known_name, &new_owner_unique_name, err);
                                }) {
                                    #[cfg(feature = "tracing")]
                                    info!("Well-known name: {} with owner: {} inserted into the peer list.", &well_known_name, &new_owner_unique_name);
                                }
                            }

                            // The well-known name lost its owner: remove the peer.
                            (Some(old_owner_unique_name), None) => {
                                peers.remove_well_known(
                                    &well_known_name,
                                    old_owner_unique_name,
                                );

                                #[cfg(feature = "tracing")]
                                info!(
                                    "Well-known name: {} with owner: {} removed from the peer list.",
                                    &well_known_name,
                                    &old_owner_unique_name
                                );
                            },

                            // Ownership moved from one unique name to another: swap the peer.
                            (Some(old_owner_unique_name), Some(new_owner_unique_name)) => {
                                if let Ok(()) = peers.update_well_known_owner(&well_known_name, old_owner_unique_name, new_owner_unique_name, &conn).await.inspect_err(|#[allow(unused_variables)] err| {
                                    #[cfg(feature = "tracing")]
                                    warn!("Failed to update well-known name: {} owner from: {} to: {} - {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name, err);
                                }) {
                                    #[cfg(feature = "tracing")]
                                    info!("Well-known name: {} updated owner from: {} to: {}", &well_known_name, &old_owner_unique_name, &new_owner_unique_name);
                                };
                            }
                        }
                    }
                }
            }

            // The stream ended, so the list can no longer be kept current: drop all peers.
            #[cfg(feature = "tracing")]
            tracing::warn!("Peer listener task stopped, clearing peers list.");
            peers.clear();
        }, "PeerListenerTask")
        .detach();
    }
585
586 fn clear(&self) {
591 let mut peers = self.peers.lock().expect("lock already held by current thread");
592 peers.clear();
593 }
594}
595
/// Peer-to-peer access to accessible objects and to the list of known peers.
pub trait P2P {
    /// Returns an `AccessibleProxy` for the object referenced by `obj`.
    fn object_as_accessible(
        &'_ self,
        obj: &ObjectRefOwned,
    ) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;

    /// Returns an `AccessibleProxy` for the root accessible of the application `name`.
    fn bus_name_as_root_accessible(
        &'_ self,
        name: &BusName,
    ) -> impl std::future::Future<Output = AtspiResult<AccessibleProxy<'_>>>;

    /// Returns a handle to the shared list of peers.
    fn peers(&self) -> Arc<Mutex<Vec<Peer>>>;

    /// Returns the peer matching `bus_name`, if there is one.
    fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer>;
}
618
619impl P2P for crate::AccessibilityConnection {
620 async fn object_as_accessible(&self, obj: &ObjectRefOwned) -> AtspiResult<AccessibleProxy<'_>> {
687 if obj.is_null() {
688 return Err(AtspiError::NullRef(
689 "`p2p::object_as_accessible` called with null-reference ObjectRef",
690 ));
691 }
692
693 let name = obj.name().ok_or(AtspiError::MissingName)?.to_owned();
694 let name = OwnedUniqueName::from(name);
695 let path = obj.path();
696
697 let lookup = self
698 .peers
699 .peers
700 .lock()
701 .expect("lock already held by current thread")
702 .iter()
703 .find(|peer| &name == peer.unique_name())
704 .cloned();
705
706 if let Some(peer) = lookup {
707 AccessibleProxy::builder(peer.connection())
709 .path(path)?
710 .cache_properties(CacheProperties::No)
711 .build()
712 .await
713 .map_err(Into::into)
714 } else {
715 let conn = self.connection();
717 AccessibleProxy::builder(conn)
718 .path(path)?
719 .cache_properties(CacheProperties::No)
720 .build()
721 .await
722 .map_err(Into::into)
723 }
724 }
725
726 async fn bus_name_as_root_accessible(
749 &'_ self,
750 name: &BusName<'_>,
751 ) -> AtspiResult<AccessibleProxy<'_>> {
752 let lookup = self
754 .peers
755 .peers
756 .lock()
757 .expect("lock already held by current thread")
758 .iter()
759 .find(|peer| {
760 match name {
762 BusName::Unique(unique_name) => peer.unique_name() == unique_name,
763 BusName::WellKnown(well_known_name) => {
764 peer.well_known_name().is_some_and(|w| w == well_known_name)
765 }
766 }
767 })
768 .cloned();
769
770 if let Some(peer) = lookup {
771 AccessibleProxy::builder(peer.connection())
773 .cache_properties(CacheProperties::No)
774 .build()
775 .await
776 .map_err(Into::into)
777 } else {
778 let conn = self.connection();
780 AccessibleProxy::builder(conn)
781 .cache_properties(CacheProperties::No)
782 .build()
783 .await
784 .map_err(Into::into)
785 }
786 }
787
788 fn peers(&self) -> Arc<Mutex<Vec<Peer>>> {
806 self.peers.inner()
807 }
808
809 fn get_peer(&self, bus_name: &BusName<'_>) -> Option<Peer> {
824 self.peers.get_peer(bus_name)
825 }
826}