dbus/
blocking.rs

1//! Connections and proxies that make blocking method calls.
2
3
4use crate::strings::{BusName, Path, Interface, Member};
5use crate::arg::{AppendAll, ReadAll, IterAppend};
6use crate::{channel, Error, Message};
7use crate::message::{MatchRule, SignalArgs, MessageType};
8use crate::channel::{Channel, BusType, Token};
9use std::{cell::RefCell, time::Duration, sync::Mutex};
10use std::sync::atomic::{AtomicBool, Ordering};
11use crate::filters::Filters;
12
13#[allow(missing_docs)]
14mod generated_org_freedesktop_standard_interfaces;
15#[allow(dead_code)]
16mod generated_org_freedesktop_dbus;
17
18/// This module contains some standard interfaces and an easy way to call them.
19///
20/// See the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces) for more information about these standard interfaces.
21///
22/// The code was created by dbus-codegen.
23pub mod stdintf {
24    #[allow(missing_docs)]
25    pub mod org_freedesktop_dbus {
26        pub use super::super::generated_org_freedesktop_standard_interfaces::*;
27
28        #[derive(Debug, PartialEq, Eq, Copy, Clone)]
29        pub enum RequestNameReply {
30            PrimaryOwner = 1,
31            InQueue = 2,
32            Exists = 3,
33            AlreadyOwner = 4,
34        }
35
36        #[derive(Debug, PartialEq, Eq, Copy, Clone)]
37        pub enum ReleaseNameReply {
38            Released = 1,
39            NonExistent = 2,
40            NotOwner = 3,
41        }
42
43        #[derive(Debug, PartialEq, Eq, Copy, Clone)]
44        pub enum EmitsChangedSignal {
45            True,
46            Invalidates,
47            Const,
48            False,
49        }
50
51        pub (crate) fn request_name<S: crate::blocking::BlockingSender>(s: &S, name: &str, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
52            -> Result<RequestNameReply, crate::Error> {
53            let flags: u32 =
54                if allow_replacement { 1 } else { 0 } +
55                if replace_existing { 2 } else { 0 } +
56                if do_not_queue { 4 } else { 0 };
57            let proxy = super::proxy(s);
58            use super::org_freedesktop::DBus;
59            let r = proxy.request_name(name, flags)?;
60            use RequestNameReply::*;
61            let all = [PrimaryOwner, InQueue, Exists, AlreadyOwner];
62            all.iter().find(|x| **x as u32 == r).copied().ok_or_else(||
63                crate::Error::new_failed("Invalid reply from DBus server")
64            )
65        }
66
67        pub (crate) fn release_name<S: crate::blocking::BlockingSender>(s: &S, name: &str)
68            -> Result<ReleaseNameReply, crate::Error> {
69
70            let proxy = super::proxy(s);
71            use super::org_freedesktop::DBus;
72            let r = proxy.release_name(name)?;
73            use ReleaseNameReply::*;
74            let all = [Released, NonExistent, NotOwner];
75            all.iter().find(|x| **x as u32 == r).copied().ok_or_else(||
76                crate::Error::new_failed("Invalid reply from DBus server")
77            )
78        }
79
80        use crate::arg;
81        impl PropertiesPropertiesChanged {
82            pub fn add_prop<F: FnOnce() -> Box<dyn arg::RefArg>>(&mut self, prop_name: &str, emits: EmitsChangedSignal, f: F) -> bool {
83                match emits {
84                    EmitsChangedSignal::False => { false },
85                    EmitsChangedSignal::Invalidates => {
86                        if !self.invalidated_properties.iter().any(|x| x == prop_name) {
87                            self.invalidated_properties.push(prop_name.into())
88                        }
89                        true
90                    }
91                    EmitsChangedSignal::True => {
92                        let val = f();
93                        self.changed_properties.insert(prop_name.into(), arg::Variant(val));
94                        true
95                    }
96                    EmitsChangedSignal::Const => panic!("Called add_prop with EmitsChangedSignal::Const")
97                }
98            }
99        }
100    }
101
102    // Not public yet, because of lack of named arguments
103    pub (super) mod org_freedesktop {
104        pub(crate) use super::super::generated_org_freedesktop_dbus::*;
105    }
106
107    pub (crate) fn proxy<C>(c: C) -> crate::blocking::Proxy<'static, C> {
108        super::Proxy::new("org.freedesktop.DBus", "/org/freedesktop/DBus", std::time::Duration::from_millis(5000), c)
109    }
110}
111
112/// A connection to D-Bus, thread local + non-async version
113pub struct LocalConnection {
114    channel: Channel,
115    filters: RefCell<Filters<LocalFilterCb>>,
116    all_signal_matches: AtomicBool,
117}
118
119/// A connection to D-Bus, non-async version where callbacks are Send but not Sync.
120pub struct Connection {
121    channel: Channel,
122    filters: RefCell<Filters<FilterCb>>,
123    all_signal_matches: AtomicBool,
124}
125
126/// A connection to D-Bus, Send + Sync + non-async version
127pub struct SyncConnection {
128    channel: Channel,
129    filters: Mutex<Filters<SyncFilterCb>>,
130    all_signal_matches: AtomicBool,
131}
132
133use crate::blocking::stdintf::org_freedesktop_dbus;
134
135macro_rules! connimpl {
136     ($c: ident, $cb: ident $(, $ss:tt)*) =>  {
137
138type
139    $cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>;
140
141
142impl $c {
143
144    /// Create a new connection to the session bus.
145    pub fn new_session() -> Result<Self, Error> {
146        Channel::get_private(BusType::Session).map(From::from)
147    }
148
149    /// Create a new connection to the system-wide bus.
150    pub fn new_system() -> Result<Self, Error> {
151        Channel::get_private(BusType::System).map(From::from)
152    }
153
154    /// Get the connection's unique name.
155    ///
156    /// It's usually something like ":1.54"
157    pub fn unique_name(&self) -> BusName { self.channel.unique_name().unwrap().into() }
158
159    /// Create a convenience struct for easier calling of many methods on the same destination and path.
160    pub fn with_proxy<'a, 'b, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'b self, dest: D, path: P, timeout: Duration) ->
161    Proxy<'a, &'b Self> {
162        Proxy { connection: self, destination: dest.into(), path: path.into(), timeout }
163    }
164
165
166    /// Request a name on the D-Bus.
167    ///
168    /// For detailed information on the flags and return values, see the libdbus documentation.
169    pub fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
170    -> Result<org_freedesktop_dbus::RequestNameReply, Error> {
171        org_freedesktop_dbus::request_name(&self.channel, &name.into(), allow_replacement, replace_existing, do_not_queue)
172    }
173
174    /// Release a previously requested name on the D-Bus.
175    pub fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> {
176        org_freedesktop_dbus::release_name(&self.channel, &name.into())
177    }
178
179    /// Adds a new match to the connection, and sets up a callback when this message arrives.
180    ///
181    /// If multiple [`MatchRule`]s match the same message, then by default only the first match will
182	/// get the callback. This behaviour can be changed for signal messages by calling
183	/// [`set_signal_match_mode`](Self::set_signal_match_mode).
184    ///
185    /// The returned value can be used to remove the match. The match is also removed if the callback
186    /// returns "false".
187    pub fn add_match<S: ReadAll, F>(&self, match_rule: MatchRule<'static>, f: F) -> Result<Token, Error>
188    where F: FnMut(S, &Self, &Message) -> bool $(+ $ss)* + 'static {
189        let m = match_rule.match_str();
190        self.add_match_no_cb(&m)?;
191        use channel::MatchingReceiver;
192        Ok(self.start_receive(match_rule, MakeSignal::make(f, m)))
193    }
194
195    /// Adds a new match to the connection, without setting up a callback when this message arrives.
196    pub fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
197        use crate::blocking::stdintf::org_freedesktop::DBus;
198        let proxy = stdintf::proxy(self);
199        proxy.add_match(match_str)
200    }
201
202    /// Removes a match from the connection, without removing any callbacks.
203    pub fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
204        use crate::blocking::stdintf::org_freedesktop::DBus;
205        let proxy = stdintf::proxy(self);
206        proxy.remove_match(match_str)
207    }
208
209    /// Removes a previously added match and callback from the connection.
210    pub fn remove_match(&self, id: Token) -> Result<(), Error> {
211        use channel::MatchingReceiver;
212        let (mr, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with that id found"))?;
213        self.remove_match_no_cb(&mr.match_str())
214    }
215
216    /// If true, configures the connection to send signal messages to all matching [`MatchRule`]
217    /// filters added with [`add_match`](Self::add_match) rather than just the first one. This comes
218    /// with the following gotchas:
219    ///
220    ///  * The messages might be duplicated, so the message serial might be lost (this is
221    ///    generally not a problem for signals).
222    ///  * Panicking inside a match callback might mess with other callbacks, causing them
223    ///    to be permanently dropped.
224    ///  * Removing other matches from inside a match callback is not supported.
225    ///
226    /// This is false by default, for a newly-created connection.
227    pub fn set_signal_match_mode(&self, match_all: bool) {
228        self.all_signal_matches.store(match_all, Ordering::Release);
229    }
230
231    /// Tries to handle an incoming message if there is one. If there isn't one,
232    /// it will wait up to timeout
233    ///
234    /// This method only takes "&self" instead of "&mut self", but it is a logic error to call
235    /// it recursively and might lead to panics or deadlocks.
236    ///
237    /// For `SyncConnection`: It is also a logic error to call this method from one thread, while
238    /// calling this or other methods from other threads. This can lead to messages being lost.
239    ///
240    /// Returns true when there was a message to process, and false when time out reached.
241    pub fn process(&self, timeout: Duration) -> Result<bool, Error> {
242        if let Some(msg) = self.channel.blocking_pop_message(timeout)? {
243            if self.all_signal_matches.load(Ordering::Acquire) && msg.msg_type() == MessageType::Signal {
244                // If it's a signal and the mode is enabled, send a copy of the message to all
245                // matching filters.
246                let matching_filters = self.filters_mut().remove_all_matching(&msg);
247                // `matching_filters` needs to be a separate variable and not inlined here, because
248                // if it's inline then the `MutexGuard` will live too long and we'll get a deadlock
249                // on the next call to `filters_mut()` below.
250                for mut ff in matching_filters {
251                    if let Ok(copy) = msg.duplicate() {
252                        if ff.2(copy, self) {
253                            self.filters_mut().insert(ff);
254                        }
255                    } else {
256                        // Silently drop the message, but add the filter back.
257                        self.filters_mut().insert(ff);
258                    }
259                }
260            } else {
261                // Otherwise, send the original message to only the first matching filter.
262                let ff = self.filters_mut().remove_first_matching(&msg);
263                if let Some(mut ff) = ff {
264                    if ff.2(msg, self) {
265                        self.filters_mut().insert(ff);
266                    }
267                } else if let Some(reply) = crate::channel::default_reply(&msg) {
268                    let _ = self.channel.send(reply);
269                }
270            }
271            Ok(true)
272        } else {
273            Ok(false)
274        }
275    }
276
277    /// The channel for this connection
278    pub fn channel(&self) -> &Channel {
279        &self.channel
280    }
281}
282
283impl BlockingSender for $c {
284    fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
285        self.channel.send_with_reply_and_block(msg, timeout)
286    }
287}
288
289impl From<Channel> for $c {
290    fn from(channel: Channel) -> $c { $c {
291        channel,
292        filters: Default::default(),
293        all_signal_matches: AtomicBool::new(false),
294    } }
295}
296
297impl channel::Sender for $c {
298    fn send(&self, msg: Message) -> Result<u32, ()> { self.channel.send(msg) }
299}
300
301impl<S: ReadAll, F: FnMut(S, &$c, &Message) -> bool $(+ $ss)* + 'static> MakeSignal<$cb, S, $c> for F {
302    fn make(mut self, mstr: String) -> $cb {
303        Box::new(move |msg: Message, conn: &$c| {
304            if let Ok(s) = S::read(&mut msg.iter_init()) {
305                if self(s, conn, &msg) { return true };
306                let proxy = stdintf::proxy(conn);
307                use crate::blocking::stdintf::org_freedesktop::DBus;
308                let _ = proxy.remove_match(&mstr);
309                false
310            } else { true }
311        })
312    }
313}
314
315impl channel::MatchingReceiver for $c {
316    type F = $cb;
317    fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token {
318        self.filters_mut().add(m, f)
319    }
320    fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> {
321        self.filters_mut().remove(id)
322    }
323}
324
325
326
327     }
328}
329
330connimpl!(Connection, FilterCb, Send);
331connimpl!(LocalConnection, LocalFilterCb);
332connimpl!(SyncConnection, SyncFilterCb, Send, Sync);
333
334impl Connection {
335    fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() }
336}
337
338impl LocalConnection {
339    fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() }
340}
341
342impl SyncConnection {
343    fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() }
344}
345
346/// Abstraction over different connections
347pub trait BlockingSender {
348    /// Sends a message over the D-Bus and blocks, waiting for a reply or a timeout. This is used for method calls.
349    ///
350    /// Note: In case of an error reply, this is returned as an Err(), not as a Ok(Message) with the error type.
351    fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error>;
352}
353
354impl BlockingSender for Channel {
355    fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
356        Channel::send_with_reply_and_block(self, msg, timeout)
357    }
358}
359
360/// A struct that wraps a connection, destination and path.
361///
362/// A D-Bus "Proxy" is a client-side object that corresponds to a remote object on the server side.
363/// Calling methods on the proxy object calls methods on the remote object.
364/// Read more in the [D-Bus tutorial](https://dbus.freedesktop.org/doc/dbus-tutorial.html#proxies)
365#[derive(Clone, Debug)]
366pub struct Proxy<'a, C> {
367    /// Destination, i e what D-Bus service you're communicating with
368    pub destination: BusName<'a>,
369    /// Object path on the destination
370    pub path: Path<'a>,
371    /// Timeout for method calls
372    pub timeout: Duration,
373    /// Some way to send and/or receive messages, either blocking or non-blocking.
374    pub connection: C,
375}
376
377impl<'a, C> Proxy<'a, C> {
378    /// Creates a new proxy struct.
379    pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self {
380        Proxy { destination: dest.into(), path: path.into(), timeout, connection }
381    }
382}
383
384impl<'a, T: BlockingSender, C: std::ops::Deref<Target=T>> Proxy<'a, C> {
385// impl<'a, S: std::convert::AsRef<channel::Sender>> Proxy<'a, S> {
386    /// Make a method call using typed input and output arguments, then block waiting for a reply.
387    ///
388    /// # Example
389    ///
390    /// ```
391    /// use dbus::blocking::{Connection, Proxy};
392    ///
393    /// let conn = Connection::new_session()?;
394    /// let proxy = Proxy::new("org.freedesktop.DBus", "/", std::time::Duration::from_millis(5000), &conn);
395    /// let (has_owner,): (bool,) = proxy.method_call("org.freedesktop.DBus", "NameHasOwner", ("dummy.name.without.owner",))?;
396    /// assert_eq!(has_owner, false);
397    /// # Ok::<(), Box<dyn std::error::Error>>(())
398    /// ```
399    pub fn method_call<'i, 'm, R: ReadAll, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A) -> Result<R, Error> {
400        let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into());
401        args.append(&mut IterAppend::new(&mut msg));
402        let r = self.connection.send_with_reply_and_block(msg, self.timeout)?;
403        Ok(R::read(&mut r.iter_init())?)
404    }
405
406    /// Starts matching incoming messages on this destination and path.
407    ///
408    /// For matching signals, match_signal might be more convenient.
409    ///
410    /// The match rule will be modified to include this path and destination only.
411    ///
412    /// If call_add_match is true, will notify the D-Bus server that matching should start.
413    pub fn match_start(&self, mut mr: MatchRule<'static>, call_add_match: bool, f: <T as channel::MatchingReceiver>::F)
414    -> Result<Token, Error>
415    where T: channel::MatchingReceiver {
416        mr.path = Some(self.path.clone().into_static());
417        mr.sender = Some(self.destination.clone().into_static());
418        if call_add_match {
419            use crate::blocking::stdintf::org_freedesktop::DBus;
420            let proxy = stdintf::proxy(&*self.connection);
421            proxy.add_match(&mr.match_str())?;
422        }
423
424        Ok(self.connection.start_receive(mr, f))
425    }
426
427    /// Stops matching a signal added with match_start or match_signal.
428    ///
429    /// If call_remove_match is true, will notify the D-Bus server that matching should stop,
430    /// this should be true in case match_signal was used.
431    pub fn match_stop(&self, id: Token, call_remove_match: bool) -> Result<(), Error>
432    where T: channel::MatchingReceiver {
433        if let Some((mr, _)) = self.connection.stop_receive(id) {
434            if call_remove_match {
435                use crate::blocking::stdintf::org_freedesktop::DBus;
436                let proxy = stdintf::proxy(&*self.connection);
437                proxy.remove_match(&mr.match_str())?;
438            }
439        }
440        Ok(())
441    }
442
443    /// Sets up an incoming signal match, that calls the supplied callback every time the signal is received.
444    ///
445    /// The returned value can be used to remove the match. The match is also removed if the callback
446    /// returns "false".
447    pub fn match_signal<S: SignalArgs + ReadAll, F>(&self, f: F) -> Result<Token, Error>
448    where T: channel::MatchingReceiver,
449          F: MakeSignal<<T as channel::MatchingReceiver>::F, S, T>
450    {
451        let mr = S::match_rule(Some(&self.destination), Some(&self.path)).static_clone();
452        let ff = f.make(mr.match_str());
453        self.match_start(mr, true, ff)
454    }
455}
456
457/// Internal helper trait
458pub trait MakeSignal<G, S, T> {
459    /// Internal helper trait
460    fn make(self, mstr: String) -> G;
461}
462
463#[test]
464fn test_add_match() {
465    use self::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc;
466    let c = Connection::new_session().unwrap();
467    let x = c.add_match(Ppc::match_rule(None, None), |_: Ppc, _, _| { true }).unwrap();
468    c.remove_match(x).unwrap();
469}
470
471#[test]
472fn test_conn_send_sync() {
473    fn is_send<T: Send>(_: &T) {}
474    fn is_sync<T: Sync>(_: &T) {}
475
476    let c = SyncConnection::new_session().unwrap();
477    is_send(&c);
478    is_sync(&c);
479
480    let c = Connection::new_session().unwrap();
481    is_send(&c);
482}
483
484#[test]
485fn test_peer() {
486    let c = Connection::new_session().unwrap();
487
488    let c_name = c.unique_name().into_static();
489    use std::sync::Arc;
490    let done = Arc::new(false);
491    let d2 = done.clone();
492    let j = std::thread::spawn(move || {
493        let c2 = Connection::new_session().unwrap();
494
495        let proxy = c2.with_proxy(c_name, "/", Duration::from_secs(5));
496        let (s2,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
497        println!("{}", s2);
498        assert_eq!(Arc::strong_count(&d2), 2);
499        s2
500    });
501    assert_eq!(Arc::strong_count(&done), 2);
502
503    for _ in 0..30 {
504        c.process(Duration::from_millis(100)).unwrap();
505        if Arc::strong_count(&done) < 2 { break; }
506    }
507
508    let s2 = j.join().unwrap();
509
510    #[cfg(unix)]
511    {
512        let proxy = c.with_proxy("org.a11y.Bus", "/org/a11y/bus", Duration::from_secs(5));
513        let (s1,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
514
515        assert_eq!(s1, s2);
516    }
517
518}