use enumflags2::{bitflags, BitFlags};
use event_listener::{Event, EventListener};
use futures_core::{ready, stream};
use futures_util::{future::Either, stream::Map};
use once_cell::sync::OnceCell;
use ordered_stream::{
join as join_streams, FromFuture, JoinMultiple, OrderedStream, Peekable, PollResult,
};
use static_assertions::assert_impl_all;
use std::{
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
future::Future,
marker::PhantomData,
ops::Deref,
pin::Pin,
sync::{Arc, RwLock, RwLockReadGuard},
task::{Context, Poll},
};
use tracing::{debug, info_span, instrument, Instrument};
use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
use zvariant::{ObjectPath, OwnedValue, Str, Value};
use crate::{
fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesProxy},
AsyncDrop, CacheProperties, Connection, Error, Executor, MatchRule, Message, MessageFlags,
MessageSequence, MessageStream, MessageType, OwnedMatchRule, ProxyBuilder, Result, Task,
};
/// Client-side handle for one interface of a remote D-Bus object.
///
/// All state lives behind an `Arc`, so cloning a `Proxy` is cheap and every clone shares the same
/// connection and property cache.
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
/// Shared proxy state (connection, destination, path, interface and property cache).
pub(crate) inner: Arc<ProxyInner<'a>>,
}
// Compile-time check that `Proxy` is `Send`, `Sync` and `Unpin`.
assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);
/// The part of the proxy state that does not borrow anything from the caller.
///
/// Its `Drop` implementation queues removal of the match rule stored in
/// `dest_owner_change_match_rule`, if one was registered.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub(crate) struct ProxyInnerStatic {
/// Connection the proxy uses for calls and signal subscriptions (skipped in `Debug` output).
#[derivative(Debug = "ignore")]
pub(crate) conn: Connection,
/// `NameOwnerChanged` match rule for the destination; set at most once by
/// `ProxyInner::subscribe_dest_owner_change`.
dest_owner_change_match_rule: OnceCell<OwnedMatchRule>,
}
/// Shared state behind a [`Proxy`].
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
/// Connection and owner-change match rule (the parts without borrowed data).
inner_without_borrows: ProxyInnerStatic,
/// Bus name of the service the proxy talks to.
pub(crate) destination: BusName<'a>,
/// Object path of the remote object.
pub(crate) path: ObjectPath<'a>,
/// Interface the proxy targets.
pub(crate) interface: InterfaceName<'a>,
/// `None` when property caching is disabled; otherwise a cell that is filled with the cache and
/// the handle of its background task on first use (see `Proxy::get_property_cache`).
property_cache: Option<OnceCell<(Arc<PropertiesCache>, Task<()>)>>,
/// Names of properties whose updates and invalidations are ignored by the cache.
uncached_properties: HashSet<Str<'a>>,
}
impl Drop for ProxyInnerStatic {
    /// Queues removal of the destination owner-change match rule, if one was ever stored.
    fn drop(&mut self) {
        match self.dest_owner_change_match_rule.take() {
            Some(rule) => {
                self.conn.queue_remove_match(rule);
            }
            // No rule was registered, so there is nothing to undo.
            None => {}
        }
    }
}
/// Item yielded by [`PropertyStream`]: a notification that a property changed.
///
/// `T` is the type the property value is converted to by `get`.
pub struct PropertyChanged<'a, T> {
/// Name of the property that changed.
name: &'a str,
/// Cache holding the property's value.
properties: Arc<PropertiesCache>,
/// Proxy used to fetch the value when the cache holds none.
proxy: Proxy<'a>,
/// Ties the notification to the value type `T` without storing one.
phantom: std::marker::PhantomData<T>,
}
impl<'a, T> PropertyChanged<'a, T> {
    /// The name of the property that changed.
    pub fn name(&self) -> &str {
        self.name
    }

    /// Returns the current raw value of the property.
    ///
    /// If the cache holds a value, a read guard over the cache is returned directly. Otherwise
    /// (the cached value was cleared) the value is fetched with a `Get` call on the properties
    /// interface, stored in the cache, and then returned.
    ///
    /// The returned object keeps a read lock on the property cache for as long as it is alive.
    ///
    /// # Errors
    ///
    /// Returns the error of the `Get` call when the value has to be fetched and that fails.
    ///
    /// # Panics
    ///
    /// Panics if the cache lock is poisoned or if the cache has no entry for this property.
    pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> {
        /// Read guard over the cache plus the name of the entry to dereference to.
        struct Wrapper<'w> {
            name: &'w str,
            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
        }

        impl<'w> Deref for Wrapper<'w> {
            type Target = Value<'static>;

            fn deref(&self) -> &Self::Target {
                self.values
                    .get(self.name)
                    .expect("PropertyStream with no corresponding property")
                    .value
                    .as_ref()
                    .expect("PropertyStream with no corresponding property")
            }
        }

        // Fast path: the value is cached, hand out the read guard as-is.
        {
            let values = self.properties.values.read().expect("lock poisoned");
            if values
                .get(self.name)
                .expect("PropertyStream with no corresponding property")
                .value
                .is_some()
            {
                return Ok(Wrapper {
                    name: self.name,
                    values,
                });
            }
        }

        // The cached value is gone, so fetch the current one from the service.
        let properties_proxy = self.proxy.properties_proxy();
        let value = properties_proxy
            .get(self.proxy.inner.interface.clone(), self.name)
            .await
            .map_err(crate::Error::from)?;

        // Store the fetched value. The write guard MUST be released before the read lock below is
        // taken: acquiring a read lock while this thread still holds the write lock on the same
        // `RwLock` deadlocks (or panics).
        {
            let mut values = self.properties.values.write().expect("lock poisoned");
            values
                .get_mut(self.name)
                .expect("PropertyStream with no corresponding property")
                .value = Some(value);
        }

        Ok(Wrapper {
            name: self.name,
            values: self.properties.values.read().expect("lock poisoned"),
        })
    }
}
impl<T> PropertyChanged<'_, T>
where
    T: TryFrom<zvariant::OwnedValue>,
    T::Error: Into<crate::Error>,
{
    /// Returns the current value of the property, converted to `T`.
    ///
    /// # Errors
    ///
    /// Fails if the raw value cannot be obtained (see `get_raw`) or cannot be converted to `T`.
    pub async fn get(&self) -> Result<T> {
        let raw = self.get_raw().await?;
        T::try_from(OwnedValue::from(&*raw)).map_err(Into::into)
    }
}
/// Stream of [`PropertyChanged`] notifications for a single property.
///
/// Created by `Proxy::receive_property_changed`.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct PropertyStream<'a, T> {
/// Name of the watched property.
name: &'a str,
/// Proxy whose property cache is observed.
proxy: Proxy<'a>,
/// Listener that completes on the next notification of the property's cache entry.
event: EventListener,
/// Ties the stream to the value type `T` without storing one.
phantom: std::marker::PhantomData<T>,
}
impl<'a, T> stream::Stream for PropertyStream<'a, T>
where
T: Unpin,
{
type Item = PropertyChanged<'a, T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let m = self.get_mut();
let properties = match m.proxy.get_property_cache() {
Some(properties) => properties.clone(),
None => return Poll::Ready(None),
};
ready!(Pin::new(&mut m.event).poll(cx));
m.event = properties
.values
.read()
.expect("lock poisoned")
.get(m.name)
.expect("PropertyStream with no corresponding property")
.event
.listen();
Poll::Ready(Some(PropertyChanged {
name: m.name,
properties,
proxy: m.proxy.clone(),
phantom: std::marker::PhantomData,
}))
}
}
/// Cache of property values for one interface, filled and updated by a background task.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
/// Cached entries keyed by property name.
values: RwLock<HashMap<String, PropertyValue>>,
/// Whether the initial population is still running or has finished (and with what result).
caching_result: RwLock<CachingResult>,
}
/// State of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
/// Population is still in progress; `ready` is notified once it finishes.
Caching { ready: Event },
/// Population has finished with `result`.
Cached { result: Result<()> },
}
impl PropertiesCache {
/// Creates an empty cache and spawns the task that fills it and keeps it up to date.
///
/// The spawned task first runs `init`, records the outcome in `caching_result` (notifying
/// everyone waiting in `ready`), and, only if `init` succeeded, runs `keep_updated`.
///
/// Returns the cache together with the handle of the spawned task.
#[instrument]
fn new(
proxy: PropertiesProxy<'static>,
interface: InterfaceName<'static>,
executor: &Executor<'_>,
uncached_properties: HashSet<zvariant::Str<'static>>,
) -> (Arc<Self>, Task<()>) {
let cache = Arc::new(PropertiesCache {
values: Default::default(),
caching_result: RwLock::new(CachingResult::Caching {
ready: Event::new(),
}),
});
// The background task gets its own handle to the cache.
let cache_clone = cache.clone();
let task_name = format!("{interface} proxy caching");
let proxy_caching = async move {
let result = cache_clone
.init(proxy, interface, uncached_properties)
.await;
// Publish the outcome under the write lock. The guard lives only inside this block, so it
// is released before the `keep_updated` await below.
let (proxy, interface, uncached_properties) = {
let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
let ready = match &*caching_result {
CachingResult::Caching { ready } => ready,
// Within this file the state is only ever replaced by the assignments just below.
_ => unreachable!(),
};
match result {
Ok((proxy, interface, uncached_properties)) => {
ready.notify(usize::MAX);
*caching_result = CachingResult::Cached { result: Ok(()) };
(proxy, interface, uncached_properties)
}
Err(e) => {
ready.notify(usize::MAX);
*caching_result = CachingResult::Cached { result: Err(e) };
// Initial population failed: the task ends without keeping the cache updated.
return;
}
}
};
if let Err(e) = cache_clone
.keep_updated(proxy, interface, uncached_properties)
.await
{
debug!("Error keeping properties cache updated: {e}");
}
}
.instrument(info_span!("{}", task_name));
let task = executor.spawn(proxy_caching, &task_name);
(cache, task)
}
/// Performs the initial population of the cache.
///
/// Subscribes to `PropertiesChanged` first and only then issues `GetAll`, joining both as an
/// ordered stream. Updates for `interface` that are delivered before the `GetAll` reply are
/// applied as they arrive; population finishes once the reply has been applied or the joined
/// stream ends.
///
/// On success the arguments are handed back so the caller can reuse them for `keep_updated`.
///
/// # Errors
///
/// Fails if subscribing or sending `GetAll` fails, if the call returns an error, or if the reply
/// body cannot be deserialized.
async fn init(
    &self,
    proxy: PropertiesProxy<'static>,
    interface: InterfaceName<'static>,
    uncached_properties: HashSet<zvariant::Str<'static>>,
) -> Result<(
    PropertiesProxy<'static>,
    InterfaceName<'static>,
    HashSet<zvariant::Str<'static>>,
)> {
    use ordered_stream::OrderedStreamExt;

    let prop_changes = proxy
        .receive_properties_changed()
        .await
        .map(|s| s.map(Either::Left))?;

    let get_all = proxy
        .connection()
        .call_method_raw(
            Some(proxy.destination()),
            proxy.path(),
            Some(proxy.interface()),
            "GetAll",
            BitFlags::empty(),
            &interface,
        )
        .await
        .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

    let mut join = join_streams(prop_changes, get_all);

    // Keep polling until the `GetAll` reply has been processed. Stopping after the first item
    // would leave the cache unpopulated whenever a `PropertiesChanged` signal happens to be
    // delivered before the reply.
    loop {
        match join.next().await {
            Some(Either::Left(update)) => {
                if let Ok(args) = update.args() {
                    if args.interface_name == interface {
                        self.update_cache(
                            &uncached_properties,
                            &args.changed_properties,
                            args.invalidated_properties,
                            &interface,
                        );
                    }
                }
            }
            Some(Either::Right(populate)) => {
                populate?.body().map(|values| {
                    self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                })?;
                break;
            }
            None => break,
        }
    }

    Ok((proxy, interface, uncached_properties))
}
/// Applies every `PropertiesChanged` signal for `interface` to the cache until the signal stream
/// ends.
///
/// Signals whose arguments cannot be parsed, or that concern another interface, are skipped.
///
/// # Errors
///
/// Fails only if subscribing to the signal fails.
async fn keep_updated(
    &self,
    proxy: PropertiesProxy<'static>,
    interface: InterfaceName<'static>,
    uncached_properties: HashSet<zvariant::Str<'static>>,
) -> Result<()> {
    use futures_util::StreamExt;

    let mut changes = proxy.receive_properties_changed().await?;
    while let Some(signal) = changes.next().await {
        let args = match signal.args() {
            Ok(args) if args.interface_name == interface => args,
            _ => continue,
        };
        self.update_cache(
            &uncached_properties,
            &args.changed_properties,
            args.invalidated_properties,
            &interface,
        );
    }

    Ok(())
}
/// Applies one batch of property changes to the cache.
///
/// Invalidated properties are processed first: an existing entry has its value cleared and its
/// listeners notified. Changed properties then get their new value stored (creating the entry if
/// needed) and their listeners notified. Names listed in `uncached_properties` are skipped in
/// both passes, with a debug log line.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
fn update_cache(
    &self,
    uncached_properties: &HashSet<Str<'_>>,
    changed: &HashMap<&str, Value<'_>>,
    invalidated: Vec<&str>,
    interface: &InterfaceName<'_>,
) {
    let mut cache = self.values.write().expect("lock poisoned");

    for name in invalidated {
        if uncached_properties.contains(&Str::from(name)) {
            debug!(
                "Ignoring invalidation of uncached property `{}.{}`",
                interface, name
            );
        } else if let Some(entry) = cache.get_mut(name) {
            entry.value = None;
            entry.event.notify(usize::MAX);
        }
    }

    for (name, new_value) in changed.iter() {
        if uncached_properties.contains(&Str::from(*name)) {
            debug!(
                "Ignoring update of uncached property `{}.{}`",
                interface, name
            );
        } else {
            let entry = cache.entry(name.to_string()).or_default();
            entry.value = Some(OwnedValue::from(new_value));
            entry.event.notify(usize::MAX);
        }
    }
}
/// Waits until the initial population of the cache has finished and returns its result.
///
/// Returns immediately when population has already completed.
pub(crate) async fn ready(&self) -> Result<()> {
    let pending = {
        // The read guard is confined to this block so it is released before awaiting.
        let state = self.caching_result.read().expect("lock poisoned");
        match &*state {
            CachingResult::Cached { result } => return result.clone(),
            CachingResult::Caching { ready } => ready.listen(),
        }
    };
    pending.await;

    // The listener fired, so the state has been switched to `Cached`.
    let state = self.caching_result.read().expect("lock poisoned");
    match &*state {
        CachingResult::Cached { result } => result.clone(),
        CachingResult::Caching { .. } => unreachable!(),
    }
}
}
impl<'a> ProxyInner<'a> {
/// Assembles the shared proxy state.
///
/// A (still empty) property-cache cell is created unless `cache` is `CacheProperties::No`; the
/// owner-change match rule starts out unset.
pub(crate) fn new(
    conn: Connection,
    destination: BusName<'a>,
    path: ObjectPath<'a>,
    interface: InterfaceName<'a>,
    cache: CacheProperties,
    uncached_properties: HashSet<Str<'a>>,
) -> Self {
    let inner_without_borrows = ProxyInnerStatic {
        conn,
        dest_owner_change_match_rule: OnceCell::new(),
    };
    let property_cache = match cache {
        CacheProperties::No => None,
        CacheProperties::Lazily | CacheProperties::Yes => Some(OnceCell::new()),
    };

    Self {
        inner_without_borrows,
        destination,
        path,
        interface,
        property_cache,
        uncached_properties,
    }
}
/// Subscribes (once) to `NameOwnerChanged` signals about the destination's well-known name.
///
/// Does nothing when the connection is not a bus connection, when the destination is a unique
/// name, or when the match rule has already been registered.
///
/// # Errors
///
/// Fails if the match rule cannot be built, added or (after a lost race) removed.
pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
if !self.inner_without_borrows.conn.is_bus() {
return Ok(());
}
let well_known_name = match &self.destination {
BusName::WellKnown(well_known_name) => well_known_name,
BusName::Unique(_) => return Ok(()),
};
// Already subscribed by an earlier call.
if self
.inner_without_borrows
.dest_owner_change_match_rule
.get()
.is_some()
{
return Ok(());
}
let conn = &self.inner_without_borrows.conn;
let signal_rule: OwnedMatchRule = MatchRule::builder()
.msg_type(MessageType::Signal)
.sender("org.freedesktop.DBus")?
.path("/org/freedesktop/DBus")?
.interface("org.freedesktop.DBus")?
.member("NameOwnerChanged")?
.add_arg(well_known_name.as_str())?
.build()
.to_owned()
.into();
conn.add_match(
signal_rule.clone(),
Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
)
.await?;
// `set` fails if the rule was stored by a concurrent caller between the `get()` check above
// and now; in that case undo the extra `add_match` made by this call.
if self
.inner_without_borrows
.dest_owner_change_match_rule
.set(signal_rule.clone())
.is_err()
{
conn.remove_match(signal_rule).await?;
}
Ok(())
}
}
/// Queue limit passed along when subscribing to `NameOwnerChanged` signals.
// NOTE(review): presumably the maximum number of such signals queued per subscription; confirm
// against `Connection::add_match` / `MessageStream::for_match_rule`.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
impl<'a> Proxy<'a> {
/// Creates a proxy for `interface` on the object at `path` of the service `destination`.
///
/// Shorthand for `ProxyBuilder::new_bare(conn)` with destination, path and interface set.
///
/// # Errors
///
/// Fails if the builder rejects the destination, path or interface, or if building the proxy
/// fails.
pub async fn new<D, P, I>(
conn: &Connection,
destination: D,
path: P,
interface: I,
) -> Result<Proxy<'a>>
where
D: TryInto<BusName<'a>>,
P: TryInto<ObjectPath<'a>>,
I: TryInto<InterfaceName<'a>>,
D::Error: Into<Error>,
P::Error: Into<Error>,
I::Error: Into<Error>,
{
ProxyBuilder::new_bare(conn)
.destination(destination)?
.path(path)?
.interface(interface)?
.build()
.await
}
/// Like `new`, but takes the connection by value and requires the destination, path and
/// interface to convert into their `'static` forms.
///
/// # Errors
///
/// Fails if the builder rejects the destination, path or interface, or if building the proxy
/// fails.
pub async fn new_owned<D, P, I>(
conn: Connection,
destination: D,
path: P,
interface: I,
) -> Result<Proxy<'a>>
where
D: TryInto<BusName<'static>>,
P: TryInto<ObjectPath<'static>>,
I: TryInto<InterfaceName<'static>>,
D::Error: Into<Error>,
P::Error: Into<Error>,
I::Error: Into<Error>,
{
ProxyBuilder::new_bare(&conn)
.destination(destination)?
.path(path)?
.interface(interface)?
.build()
.await
}
/// The connection this proxy uses.
pub fn connection(&self) -> &Connection {
&self.inner.inner_without_borrows.conn
}
/// The bus name of the service this proxy talks to.
pub fn destination(&self) -> &BusName<'_> {
&self.inner.destination
}
/// The object path of the remote object.
pub fn path(&self) -> &ObjectPath<'_> {
&self.inner.path
}
/// The interface this proxy targets.
pub fn interface(&self) -> &InterfaceName<'_> {
&self.inner.interface
}
/// Introspects the remote object and returns the result of its `introspect` call as a string.
///
/// A temporary `IntrospectableProxy` for this proxy's destination and path is built for the call.
///
/// # Errors
///
/// Fails if the temporary proxy cannot be built or the call fails.
pub async fn introspect(&self) -> fdo::Result<String> {
    let conn = &self.inner.inner_without_borrows.conn;
    let introspectable = IntrospectableProxy::builder(conn)
        .destination(&self.inner.destination)?
        .path(&self.inner.path)?
        .build()
        .await?;

    introspectable.introspect().await
}
/// Builds a `PropertiesProxy` for this proxy's destination and path, borrowing both from `self`.
///
/// Property caching is disabled on the returned proxy.
fn properties_proxy(&self) -> PropertiesProxy<'_> {
PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
// NOTE(review): the unwraps presumably cannot fail because destination and path are already
// typed, validated values; confirm against the builder.
.destination(self.inner.destination.as_ref())
.unwrap()
.path(self.inner.path.as_ref())
.unwrap()
.cache_properties(CacheProperties::No)
.build_internal()
.into()
}
/// Like `properties_proxy`, but with owned copies of the destination and path so the result is
/// `'static` and can be moved into a spawned task.
fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
// NOTE(review): the unwraps presumably cannot fail because destination and path are already
// typed, validated values; confirm against the builder.
.destination(self.inner.destination.to_owned())
.unwrap()
.path(self.inner.path.to_owned())
.unwrap()
.cache_properties(CacheProperties::No)
.build_internal()
.into()
}
/// Returns the property cache, creating it (and spawning its background task) on first use.
///
/// Returns `None` when property caching is disabled for this proxy.
pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
    let cell = self.inner.property_cache.as_ref()?;

    let (cache, _task) = cell.get_or_init(|| {
        // The cache task outlives any borrow of `self`, so it needs owned copies.
        let uncached_properties: HashSet<zvariant::Str<'static>> = self
            .inner
            .uncached_properties
            .iter()
            .map(|name| name.to_owned())
            .collect();

        PropertiesCache::new(
            self.owned_properties_proxy(),
            self.interface().to_owned(),
            self.connection().executor(),
            uncached_properties,
        )
    });

    Some(cache)
}
/// Returns the cached value of `property_name`, converted to `T`.
///
/// Yields `Ok(None)` when no value is cached for the property (see `cached_property_raw`).
///
/// # Errors
///
/// Fails if the cached value cannot be converted to `T`.
pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
where
    T: TryFrom<OwnedValue>,
    T::Error: Into<Error>,
{
    match self.cached_property_raw(property_name) {
        Some(raw) => T::try_from(OwnedValue::from(&*raw))
            .map(Some)
            .map_err(Into::into),
        None => Ok(None),
    }
}
/// Returns the cached raw value of `property_name` without converting it.
///
/// Yields `None` when caching is disabled, when the cache has not been created yet, or when no
/// value is currently stored for the property. The returned object holds a read lock on the
/// cache for as long as it is alive.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
pub fn cached_property_raw<'p>(
&'p self,
property_name: &'p str,
) -> Option<impl Deref<Target = Value<'static>> + 'p> {
if let Some(values) = self
.inner
.property_cache
.as_ref()
.and_then(OnceCell::get)
.map(|c| c.0.values.read().expect("lock poisoned"))
{
// Bail out with `None` unless the entry exists and currently holds a value.
values
.get(property_name)
.and_then(|e| e.value.as_ref())?;
// Read guard over the cache plus the name of the entry to dereference to.
struct Wrapper<'a> {
values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
property_name: &'a str,
}
impl Deref for Wrapper<'_> {
type Target = Value<'static>;
fn deref(&self) -> &Self::Target {
// Presence was checked above while holding this same read guard.
self.values
.get(self.property_name)
.and_then(|e| e.value.as_ref())
.map(|v| v.deref())
.expect("inexistent property")
}
}
Some(Wrapper {
values,
property_name,
})
} else {
None
}
}
/// Fetches `property_name` from the service with a `get` call on the properties proxy,
/// bypassing the cache.
async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
Ok(self
.properties_proxy()
.get(self.inner.interface.as_ref(), property_name)
.await?)
}
/// Returns the value of `property_name`, converted to `T`.
///
/// With caching enabled this first waits for the initial cache population and returns the
/// cached value if there is one; otherwise the value is fetched from the service.
///
/// # Errors
///
/// Fails if the initial cache population failed, if fetching the value fails, or if the value
/// cannot be converted to `T`.
pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
where
T: TryFrom<OwnedValue>,
T::Error: Into<Error>,
{
if let Some(cache) = self.get_property_cache() {
cache.ready().await?;
}
if let Some(value) = self.cached_property(property_name)? {
return Ok(value);
}
// Not cached (or caching disabled): ask the service directly.
let value = self.get_proxy_property(property_name).await?;
value.try_into().map_err(Into::into)
}
/// Sets `property_name` on the remote object to `value` with a `set` call on the properties
/// proxy.
///
/// The local cache is not touched here.
pub async fn set_property<'t, T: 't>(&self, property_name: &str, value: T) -> fdo::Result<()>
where
T: Into<Value<'t>>,
{
self.properties_proxy()
.set(self.inner.interface.as_ref(), property_name, &value.into())
.await
}
/// Calls `method_name` with `body` on this proxy's destination, path and interface and returns
/// the reply message.
pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Arc<Message>>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
{
self.inner
.inner_without_borrows
.conn
.call_method(
Some(&self.inner.destination),
self.inner.path.as_str(),
Some(&self.inner.interface),
method_name,
body,
)
.await
}
/// Calls `method_name` with `body` and deserializes the reply body into `R`.
///
/// # Errors
///
/// Fails if the call fails or the reply body cannot be deserialized as `R`.
pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
R: serde::de::DeserializeOwned + zvariant::Type,
{
let reply = self.call_method(method_name, body).await?;
reply.body()
}
/// Calls `method_name` with `body`, applying the given method `flags`.
///
/// Returns `Ok(Some(reply))` with the deserialized reply body when the connection hands back a
/// reply to wait for, and `Ok(None)` when it does not.
// NOTE(review): presumably no reply is handed back exactly when `NoReplyExpected` is set;
// confirm against `Connection::call_method_raw`.
pub async fn call_with_flags<'m, M, B, R>(
&self,
method_name: M,
flags: BitFlags<MethodFlags>,
body: &B,
) -> Result<Option<R>>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
R: serde::de::DeserializeOwned + zvariant::Type,
{
// Translate each method flag into the corresponding message flag.
let flags = flags
.iter()
.map(MessageFlags::from)
.collect::<BitFlags<_>>();
match self
.inner
.inner_without_borrows
.conn
.call_method_raw(
Some(self.destination()),
self.path(),
Some(self.interface()),
method_name,
flags,
body,
)
.await?
{
Some(reply) => reply.await?.body().map(Some),
None => Ok(None),
}
}
/// Calls `method_name` with `body` using the `NoReplyExpected` flag and discards whatever
/// `call_with_flags` returns.
pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
{
self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
.await?;
Ok(())
}
/// Creates a stream of `signal_name` signals from this proxy's destination, path and interface.
///
/// Equivalent to `receive_signal_with_args` with no argument matches.
pub async fn receive_signal<'m: 'a, M>(&self, signal_name: M) -> Result<SignalStream<'a>>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
{
self.receive_signal_with_args(signal_name, &[]).await
}
/// Creates a stream of `signal_name` signals, additionally matching on signal arguments.
///
/// Each `(index, value)` pair in `args` is added to the match rule as an argument match.
///
/// # Errors
///
/// Fails if `signal_name` is not a valid member name or the stream cannot be set up.
pub async fn receive_signal_with_args<'m: 'a, M>(
&self,
signal_name: M,
args: &[(u8, &str)],
) -> Result<SignalStream<'a>>
where
M: TryInto<MemberName<'m>>,
M::Error: Into<Error>,
{
let signal_name = signal_name.try_into().map_err(Into::into)?;
self.receive_signals(Some(signal_name), args).await
}
/// Shared implementation of the `receive_*signal*` methods.
///
/// Makes sure the destination owner-change subscription exists, then builds the signal stream.
/// `signal_name` of `None` means the match rule gets no member restriction.
async fn receive_signals<'m: 'a>(
&self,
signal_name: Option<MemberName<'m>>,
args: &[(u8, &str)],
) -> Result<SignalStream<'a>> {
self.inner.subscribe_dest_owner_change().await?;
SignalStream::new(self.clone(), signal_name, args).await
}
/// Creates a stream of all signals from this proxy's destination, path and interface,
/// regardless of signal name.
pub async fn receive_all_signals(&self) -> Result<SignalStream<'a>> {
self.receive_signals(None, &[]).await
}
/// Creates a stream that yields an item every time the cache entry of property `name` is
/// notified of a change.
///
/// The cache entry for `name` is created if it does not exist yet. When property caching is
/// disabled, the returned stream ends as soon as it is polled.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
pub async fn receive_property_changed<'name: 'a, T>(
&self,
name: &'name str,
) -> PropertyStream<'a, T> {
let properties = self.get_property_cache();
let event = if let Some(properties) = &properties {
let mut values = properties.values.write().expect("lock poisoned");
let entry = values
.entry(name.to_string())
.or_insert_with(PropertyValue::default);
entry.event.listen()
} else {
// No cache: a placeholder listener is enough, the stream's `poll_next` returns
// `None` before ever polling it.
Event::new().listen()
};
PropertyStream {
name,
proxy: self.clone(),
event,
phantom: std::marker::PhantomData,
}
}
/// Creates a stream that yields the new owner of the destination name each time a
/// `NameOwnerChanged` signal for it is received.
///
/// An item of `None` means the signal carried no new owner.
///
/// # Panics
///
/// Polling the returned stream panics if a received signal's arguments cannot be parsed.
pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
use futures_util::StreamExt;
let dbus_proxy = fdo::DBusProxy::builder(self.connection())
.cache_properties(CacheProperties::No)
.build()
.await?;
Ok(OwnerChangedStream {
stream: dbus_proxy
// Only signals whose first argument is the destination name.
.receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
.await?
.map(Box::new(move |signal| {
let args = signal.args().unwrap();
let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());
new_owner
})),
name: self.destination().clone(),
})
}
}
/// One entry of the property cache.
#[derive(Debug, Default)]
struct PropertyValue {
/// The cached value; `None` when nothing is cached (never fetched, or invalidated).
value: Option<OwnedValue>,
/// Notified whenever the entry is updated or invalidated.
event: Event,
}
/// Flags that can be applied to a method call made through `Proxy::call_with_flags`.
///
/// Each variant converts to the `MessageFlags` variant of the same name.
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
/// Converts to `MessageFlags::NoReplyExpected`.
NoReplyExpected = 0x1,
/// Converts to `MessageFlags::NoAutoStart`.
NoAutoStart = 0x2,
/// Converts to `MessageFlags::AllowInteractiveAuth`.
AllowInteractiveAuth = 0x4,
}
// Compile-time check that `MethodFlags` is `Send`, `Sync` and `Unpin`.
assert_impl_all!(MethodFlags: Send, Sync, Unpin);
impl From<MethodFlags> for MessageFlags {
fn from(method_flag: MethodFlags) -> Self {
match method_flag {
MethodFlags::NoReplyExpected => Self::NoReplyExpected,
MethodFlags::NoAutoStart => Self::NoAutoStart,
MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
}
}
}
/// A `NameOwnerChanged` signal stream mapped through a boxed closure that extracts the new
/// owner from each signal.
type OwnerChangedStreamMap<'a> = Map<
fdo::NameOwnerChangedStream<'a>,
Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;
/// Stream of owner changes for a bus name, created by `Proxy::receive_owner_changed`.
pub struct OwnerChangedStream<'a> {
/// Underlying signal stream, already mapped to the new owner.
stream: OwnerChangedStreamMap<'a>,
/// The bus name being watched.
name: BusName<'a>,
}
// Compile-time check that `OwnerChangedStream` is `Send`, `Sync` and `Unpin`.
assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);
impl OwnerChangedStream<'_> {
/// The bus name whose owner is being watched.
pub fn name(&self) -> &BusName<'_> {
&self.name
}
}
impl<'a> stream::Stream for OwnerChangedStream<'a> {
    type Item = Option<UniqueName<'static>>;

    /// Forwards to the wrapped, already mapped signal stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        stream::Stream::poll_next(Pin::new(&mut this.stream), cx)
    }
}
/// Stream of signal messages received through a [`Proxy`].
///
/// Only messages whose sender equals the tracked unique name of the destination are yielded;
/// see `SignalStream::filter`.
#[derive(Debug)]
pub struct SignalStream<'a> {
/// Ordered join of the signal stream and, for well-known destinations, the
/// `NameOwnerChanged` stream.
stream: JoinMultiple<Vec<Peekable<MessageStream>>>,
/// Unique name currently considered the owner of the destination, if any.
src_unique_name: Option<UniqueName<'static>>,
/// Carries the lifetime `'a` without storing borrowed data.
phantom: PhantomData<&'a ()>,
}
impl<'a> SignalStream<'a> {
/// Builds a signal stream for `proxy`.
///
/// The match rule covers signals from the proxy's destination, path and interface, optionally
/// restricted to `signal_name` and to the `(index, value)` argument matches in `args`.
///
/// For a unique-name destination the stream consists of the signal stream alone. For a
/// well-known name, a `NameOwnerChanged` stream is set up first and `GetNameOwner` is called to
/// learn the current owner; the owner stream is then joined with the signal stream so that
/// owner changes are seen in order with the signals.
///
/// # Errors
///
/// Fails if a match rule cannot be built, a stream cannot be created, the `GetNameOwner` reply
/// body cannot be parsed, or the connection closes while waiting for the owner.
async fn new<'m: 'a>(
proxy: Proxy<'a>,
signal_name: Option<MemberName<'m>>,
args: &[(u8, &str)],
) -> Result<SignalStream<'a>> {
let mut rule_builder = MatchRule::builder()
.msg_type(MessageType::Signal)
.sender(proxy.destination())?
.path(proxy.path())?
.interface(proxy.interface())?;
if let Some(name) = &signal_name {
rule_builder = rule_builder.member(name)?;
}
for (i, arg) in args {
rule_builder = rule_builder.arg(*i, *arg)?;
}
let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
let conn = proxy.connection();
let (src_unique_name, stream) = match proxy.destination().to_owned() {
// A unique name is its own owner; no owner tracking is needed.
BusName::Unique(name) => (
Some(name),
JoinMultiple(vec![ordered_stream::OrderedStreamExt::peekable(
MessageStream::for_match_rule(signal_rule, conn, None).await?,
)]),
),
BusName::WellKnown(name) => {
use ordered_stream::OrderedStreamExt;
let name_owner_changed_rule = MatchRule::builder()
.msg_type(MessageType::Signal)
.sender("org.freedesktop.DBus")?
.path("/org/freedesktop/DBus")?
.interface("org.freedesktop.DBus")?
.member("NameOwnerChanged")?
.add_arg(name.as_str())?
.build();
// Subscribe to owner changes before asking for the current owner.
let name_owner_changed_stream = MessageStream::for_match_rule(
name_owner_changed_rule,
conn,
Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
)
.await?
.map(Either::Left);
let get_name_owner = conn
.call_method_raw(
Some("org.freedesktop.DBus"),
"/org/freedesktop/DBus",
Some("org.freedesktop.DBus"),
"GetNameOwner",
BitFlags::empty(),
&name,
)
.await
.map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
let mut join = join_streams(name_owner_changed_stream, get_name_owner);
// Wait until the owner is known, either from an owner-change signal naming a new owner
// or from the `GetNameOwner` reply.
let mut src_unique_name = loop {
match join.next().await {
Some(Either::Left(msg)) => {
if let Some(signal) =
msg.map(NameOwnerChanged::from_message).ok().flatten()
{
if let Ok(args) = signal.args() {
match (args.name(), args.new_owner().deref()) {
(BusName::WellKnown(n), Some(new_owner)) if n == &name => {
break Some(new_owner.to_owned());
}
_ => (),
}
}
}
}
Some(Either::Right(Ok(response))) => {
break Some(response.body::<UniqueName<'_>>()?.to_owned())
}
Some(Either::Right(Err(e))) => {
// The call failed; carry on without a known owner.
debug!("Failed to get owner of {name}: {e}");
break None;
}
None => {
return Err(Error::InputOutput(
std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"connection closed",
)
.into(),
))
}
}
};
// Take the join apart; an owner-change signal it had already queued is applied here.
let (stream, _, queued) = join.into_inner();
if let Some(msg) = queued.and_then(|e| match e.0 {
Either::Left(Ok(msg)) => Some(msg),
Either::Left(Err(_)) | Either::Right(_) => None,
}) {
if let Some(signal) = NameOwnerChanged::from_message(msg) {
if let Ok(args) = signal.args() {
match (args.name(), args.new_owner().deref()) {
(BusName::WellKnown(n), Some(new_owner)) if n == &name => {
src_unique_name = Some(new_owner.to_owned());
}
_ => (),
}
}
}
}
// Undo the `Either::Left` mapping to get the plain message stream back.
let name_owner_changed_stream = stream.into_inner();
let stream = JoinMultiple(vec![
MessageStream::for_match_rule(signal_rule, conn, None)
.await?
.peekable(),
name_owner_changed_stream.peekable(),
]);
(src_unique_name, stream)
}
};
Ok(Self {
stream,
src_unique_name,
phantom: PhantomData,
})
}
/// Decides whether `msg` should be yielded by the stream.
///
/// Returns `Ok(true)` when the message's sender equals the tracked owner name. Otherwise, if the
/// message is a `NameOwnerChanged` signal, the tracked owner is replaced by the signal's new
/// owner, and `Ok(false)` is returned.
///
/// # Errors
///
/// Fails if the message header, its sender field, or the signal arguments cannot be read.
fn filter(&mut self, msg: &Arc<Message>) -> Result<bool> {
    let header = msg.header()?;
    let sender = header.sender()?;

    if sender != self.src_unique_name.as_ref() {
        // Not from the tracked owner; it may still announce a new owner to track.
        if let Some(owner_change) = NameOwnerChanged::from_message(msg.clone()) {
            let args = owner_change.args()?;
            self.src_unique_name = args.new_owner().as_ref().map(|owner| owner.to_owned());
        }
        return Ok(false);
    }

    Ok(true)
}
}
// Compile-time check that `SignalStream` is `Send`, `Sync` and `Unpin`.
assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);
impl<'a> stream::Stream for SignalStream<'a> {
    type Item = Arc<Message>;

    /// Polls the ordered stream without an ordering bound and strips the ordering information.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let outcome = ready!(OrderedStream::poll_next_before(self.as_mut(), cx, None));

        Poll::Ready(match outcome {
            PollResult::Item { data, ordering: _ } => Some(data),
            PollResult::Terminated => None,
            // `before` is `None` in the call above.
            PollResult::NoneBefore => unreachable!(),
        })
    }
}
impl<'a> OrderedStream for SignalStream<'a> {
    type Data = Arc<Message>;
    type Ordering = MessageSequence;

    /// Polls the joined streams, yielding only messages accepted by `filter`.
    ///
    /// Items that are errors, or that `filter` rejects (or fails on), are skipped and polling
    /// continues.
    fn poll_next_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
        let this = self.get_mut();

        loop {
            let polled = ready!(OrderedStream::poll_next_before(
                Pin::new(&mut this.stream),
                cx,
                before
            ));

            match polled {
                PollResult::Item {
                    data: Ok(msg),
                    ordering,
                } => {
                    if matches!(this.filter(&msg), Ok(true)) {
                        return Poll::Ready(PollResult::Item {
                            data: msg,
                            ordering,
                        });
                    }
                }
                // Errors from the underlying stream are dropped.
                PollResult::Item { data: Err(_), .. } => {}
                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
            }
        }
    }
}
impl<'a> stream::FusedStream for SignalStream<'a> {
/// Reports whether the underlying joined stream has terminated.
fn is_terminated(&self) -> bool {
ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
}
}
#[async_trait::async_trait]
impl AsyncDrop for SignalStream<'_> {
/// Asynchronously drops every underlying message stream, one after the other.
async fn async_drop(self) {
for stream in self.stream.0 {
// Unwrap the `Peekable` to reach the `MessageStream` itself.
stream.into_inner().0.async_drop().await
}
}
}
impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
/// Converts a blocking proxy into the value returned by its `into_inner`.
fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
proxy.into_inner()
}
}
// Integration tests; they connect to the session bus.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
dbus_interface, dbus_proxy, utils::block_on, AsyncDrop, ConnectionBuilder, SignalContext,
};
use futures_util::StreamExt;
use ntest::timeout;
use test_log::test;
#[test]
#[timeout(15000)]
fn signal() {
block_on(test_signal()).unwrap();
}
// Checks owner-change and signal streams around acquiring and losing a well-known name.
async fn test_signal() -> Result<()> {
// Register a well-known name on one connection and listen for its owner changes on another.
let conn = Connection::session().await?;
let dest_conn = Connection::session().await?;
let unique_name = dest_conn.unique_name().unwrap().clone();
let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
let proxy: Proxy<'_> = ProxyBuilder::new_bare(&conn)
.destination(well_known)?
.path("/does/not/matter")?
.interface("does.not.matter")?
.build()
.await?;
let mut owner_changed_stream = proxy.receive_owner_changed().await?;
let proxy = fdo::DBusProxy::new(&dest_conn).await?;
let mut name_acquired_stream = proxy
.receive_signal_with_args("NameAcquired", &[(0, well_known)])
.await?;
// Create a property stream and drop it, together with its proxy, before anything is
// received on it.
let prop_stream =
proxy
.receive_property_changed("SomeProp")
.await
.filter_map(|changed| async move {
let v: Option<u32> = changed.get().await.ok();
dbg!(v)
});
drop(proxy);
drop(prop_stream);
dest_conn.request_name(well_known).await?;
let (new_owner, acquired_signal) =
futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);
assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);
let acquired_signal = acquired_signal.unwrap();
assert_eq!(acquired_signal.body::<&str>().unwrap(), well_known);
let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;
drop(dest_conn);
name_acquired_stream.async_drop().await;
// With the owning connection gone, both names must be reported as having no owner.
let new_owner = owner_changed_stream.next().await;
assert!(new_owner.unwrap().is_none());
let new_unique_owner = unique_name_changed_stream.next().await;
assert!(new_unique_owner.unwrap().is_none());
Ok(())
}
#[test]
#[timeout(15000)]
fn signal_stream_deadlock() {
block_on(test_signal_stream_deadlock()).unwrap();
}
// Runs a signal stream and a new `PropertiesChanged` subscription concurrently on a client
// connection limited to one queued message, while a server keeps emitting signals. The
// enclosing test has a 15 s timeout, so a hang here fails the test.
async fn test_signal_stream_deadlock() -> Result<()> {
#[dbus_proxy(
gen_blocking = false,
default_path = "/org/zbus/Test",
default_service = "org.zbus.Test.MR501",
interface = "org.zbus.Test"
)]
trait Test {
#[dbus_proxy(signal)]
fn my_signal(&self, msg: &str) -> Result<()>;
}
struct TestIface;
#[dbus_interface(name = "org.zbus.Test")]
impl TestIface {
#[dbus_interface(signal)]
async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
}
let test_iface = TestIface;
let server_conn = ConnectionBuilder::session()?
.name("org.zbus.Test.MR501")?
.serve_at("/org/zbus/Test", test_iface)?
.build()
.await?;
let client_conn = ConnectionBuilder::session()?.max_queued(1).build().await?;
let test_proxy = TestProxy::new(&client_conn).await?;
let test_prop_proxy = PropertiesProxy::builder(&client_conn)
.destination("org.zbus.Test.MR501")?
.path("/org/zbus/Test")?
.build()
.await?;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let handle = {
let tx = tx.clone();
let conn = server_conn.clone();
// Server task: emit bursts of ten signals, pausing 5 ms between bursts, until the
// channel is closed.
let server_fut = async move {
use std::time::Duration;
#[cfg(not(feature = "tokio"))]
use async_io::Timer;
#[cfg(feature = "tokio")]
use tokio::time::sleep;
let iface_ref = conn
.object_server()
.interface::<_, TestIface>("/org/zbus/Test")
.await
.unwrap();
let context = iface_ref.signal_context();
while !tx.is_closed() {
for _ in 0..10 {
TestIface::my_signal(context, "This is a test")
.await
.unwrap();
}
#[cfg(not(feature = "tokio"))]
Timer::after(Duration::from_millis(5)).await;
#[cfg(feature = "tokio")]
sleep(Duration::from_millis(5)).await;
}
};
server_conn.executor().spawn(server_fut, "server_task")
};
// Consumes signals indefinitely once the stream is set up, after telling `prop_fut` to go.
let signal_fut = async {
let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();
tx.send(()).await.unwrap();
while let Some(_signal) = signal_stream.next().await {}
};
// Subscribes to `PropertiesChanged` only after the signal stream exists.
let prop_fut = async move {
rx.recv().await.unwrap();
let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
};
futures_util::pin_mut!(signal_fut);
futures_util::pin_mut!(prop_fut);
// `select` returns as soon as either future completes.
futures_util::future::select(signal_fut, prop_fut).await;
handle.await;
Ok(())
}
}