calloop_dbus/
lib.rs

1use calloop::{
2    channel,
3    generic::{Fd, Generic},
4    {EventSource, InsertError, Interest, Mode, Poll, Readiness, Source},
5};
6use dbus::{
7    arg::ReadAll,
8    blocking::stdintf::org_freedesktop_dbus,
9    blocking::{BlockingSender, Connection, LocalConnection, Proxy, SyncConnection},
10    channel::{BusType, Channel, MatchingReceiver, Sender, Token},
11    message::{MatchRule, MessageType},
12    strings::{BusName, Interface, Member, Path},
13    Error, Message,
14};
15use log::{trace, warn};
16
17use std::io;
18use std::time::Duration;
19
20pub use dbus;
21mod filters;
22use filters::Filters;
23
24/// A event source connection to D-Bus, non-async version where callbacks are Send but not Sync.
25pub struct DBusSource<Data: 'static> {
26    conn: Connection,
27    watch: Generic<Fd>,
28    filters: std::cell::RefCell<Filters<FilterCb<Data>>>,
29    channel: channel::Channel<Message>,
30}
31
32/// A event source conncetion to D-Bus, thread local + non-async version
33pub struct LocalDBusSource<Data: 'static> {
34    conn: LocalConnection,
35    watch: Generic<Fd>,
36    filters: std::cell::RefCell<Filters<LocalFilterCb<Data>>>,
37    channel: channel::Channel<Message>,
38}
39
40/// A event source connection to D-Bus, Send + Sync + non-async version
41pub struct SyncDBusSource<Data: 'static> {
42    conn: SyncConnection,
43    watch: Generic<Fd>,
44    filters: std::sync::Mutex<Filters<SyncFilterCb<Data>>>,
45    channel: std::sync::Mutex<channel::Channel<Message>>,
46}
47
48macro_rules! sourceimpl {
49    ($source: ident, $connection: ident, $callback: ident $(, $ss:tt)*) => {
50
51type $callback<Data> = Box<dyn FnMut(Message, &$source<Data>, &mut Data) -> bool $(+ $ss)* + 'static>;
52
53impl<Data> $source<Data> {
54    /// Create a new connection to the session bus.
55    pub fn new_session() -> io::Result<(Self, channel::Sender<Message>)> {
56        Self::new(Channel::get_private(BusType::Session))
57    }
58
59    /// Create a new connection to the system-wide bus.
60    pub fn new_system() -> io::Result<(Self, channel::Sender<Message>)> {
61        Self::new(Channel::get_private(BusType::System))
62    }
63
64    fn new(c: Result<Channel, Error>) -> io::Result<(Self, channel::Sender<Message>)> {
65        let mut channel = c.map_err(|_| {
66            io::Error::new(io::ErrorKind::ConnectionRefused, "Failed to connet to DBus")
67        })?;
68
69        channel.set_watch_enabled(true);
70
71        let watch_fd = channel.watch();
72
73        let interest = match (watch_fd.read, watch_fd.write) {
74            (true, true) => Interest::Both,
75            (false, true) => Interest::Writable,
76            (true, false) => Interest::Readable,
77            (false, false) => {
78                return Err(io::Error::new(
79                    io::ErrorKind::Other,
80                    "fd nether read nor write",
81                ))
82            }
83        };
84
85        let watch = Generic::from_fd(watch_fd.fd, interest, Mode::Level);
86
87        let conn: $connection = channel.into();
88
89        // lets a default match rule to catch the NameAcuierd messages
90        let mut match_rule_nameacquired = MatchRule::default();
91        match_rule_nameacquired.msg_type = Some(MessageType::Signal);
92        match_rule_nameacquired.path = Some(Path::new("/org/freedesktop/DBus").unwrap());
93        match_rule_nameacquired.interface = Some(Interface::new("org.freedesktop.DBus").unwrap());
94        match_rule_nameacquired.member = Some(Member::new("NameAcquired").unwrap());
95
96        let (sender, channel) = channel::channel::<Message>();
97
98        let source = Self {
99            conn,
100            watch,
101            filters: Default::default(),
102            channel: Self::pack_channel(channel)
103        };
104
105        source.add_match(match_rule_nameacquired, |_: (), _, _| true).unwrap();
106        Ok((source, sender))
107
108    }
109
110    /// Get the connection's unique name.
111    ///
112    /// It's usually something like ":1.54"
113    pub fn unique_name(&self) -> BusName {
114        self.conn.unique_name()
115    }
116
117    pub fn with_proxy<'a, 'b, Dest: Into<BusName<'a>>, BusPath: Into<Path<'a>>>(
118        &'b self,
119        dest: Dest,
120        path: BusPath,
121        timeout: Duration
122    ) -> Proxy<'a, &'b Self> {
123        Proxy { connection: self, destination: dest.into(), path: path.into(), timeout }
124    }
125
126    /// Request a name on the D-Bus.
127    ///
128    /// For detailed information on the flags and return values, see the libdbus documentation.
129    pub fn request_name<'a, Name: Into<BusName<'a>>>(
130        &self,
131        name: Name,
132        allow_replacement: bool,
133        replace_existing: bool,
134        do_not_queue: bool,
135    ) -> Result<org_freedesktop_dbus::RequestNameReply, Error> {
136        self.conn
137            .request_name(name, allow_replacement, replace_existing, do_not_queue)
138    }
139
140    /// Release a previously requested name on the D-Bus.
141    pub fn release_name<'a, Name: Into<BusName<'a>>>(&self, name: Name) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> {
142        self.conn.release_name(name)
143    }
144
145    /// Adds a new match to the connection, and sets up a callback when this message arrives.
146    ///
147    /// The returned value can be used to remove the match. The match is also removed if the callback
148    /// returns "false".
149    pub fn add_match<Args: ReadAll, Callback>(
150        &self,
151        match_rule: MatchRule<'static>,
152        callback: Callback,
153    ) -> Result<dbus::channel::Token, dbus::Error>
154    where
155        Callback: FnMut(Args, &Self, &Message) -> bool $(+ $ss)* + 'static,
156    {
157        let match_str = match_rule.match_str();
158        self.conn.add_match_no_cb(&match_str)?;
159        Ok(self.start_receive(match_rule, MakeSignal::make(callback, match_str)))
160    }
161
162    /// Adds a new match to the connection, and sets up a callback when this message arrives. This
163    /// callback will be able to access the calloop user data.
164    ///
165    /// The returned value can be used to remove the match. The match is also removed if the callback
166    /// returns "false".
167    pub fn add_match_data<Args: ReadAll, Callback>(
168        &self,
169        match_rule: MatchRule<'static>,
170        callback: Callback,
171    ) -> Result<dbus::channel::Token, dbus::Error>
172    where
173        Callback: FnMut(Args, &Self, &Message, &mut Data) -> bool $(+ $ss)* + 'static,
174    {
175        let match_str = match_rule.match_str();
176        self.conn.add_match_no_cb(&match_str)?;
177        Ok(self.start_receive(match_rule, MakeDataSignal::make(callback, match_str)))
178    }
179
180    /// Removes a previously added match and callback from the connection.
181    pub fn remove_match(&self, id: Token) -> Result<(), Error> {
182        let (match_rule, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with id found"))?;
183        self.conn.remove_match_no_cb(&match_rule.match_str())
184    }
185
186    pub fn process(&mut self, timeout: Duration) -> Result<bool, Error> {
187        self.conn.process(timeout)
188    }
189
190    /// The Channel for this connection
191    pub fn channel(&self) -> &Channel {
192        self.conn.channel()
193    }
194
195    /// Insert this source into the given event loop with an adapder that ether panics on orphan
196    /// events or just logs it at warn level. You probably only what this if you set eavesdrop on a
197    /// MatchRule.
198    pub fn quick_insert(
199        self,
200        handle: calloop::LoopHandle<Data>,
201        panic_on_orphan: bool,
202    ) -> Result<Source<$source<Data>>, InsertError<$source<Data>>> {
203        handle.insert_source(self, move |msg, connection, data| {
204            match connection.filters_mut().get_matches(&msg) {
205                Some((token, (_, callback))) => {
206                    trace!("match on {:?}", &msg);
207                    if !callback(msg, &connection, data) {
208                        return Some(*token)
209                    }
210                }
211                None => {
212                    if let Some(reply) = dbus::channel::default_reply(&msg) {
213                        let _ = connection.send(reply);
214                        return None;
215                    }
216                    if panic_on_orphan {
217                        panic!("[calloop-dbus] Encountered an orphan event: {:#?}", msg);
218                    }
219                    warn!("orphan {:#?}", msg);
220                }
221            }
222            None
223        })
224    }
225}
226
227impl<Data> MatchingReceiver for $source<Data> {
228    type F = $callback<Data>;
229
230    fn start_receive(&self, match_rule: MatchRule<'static>, callback: Self::F) -> dbus::channel::Token {
231        self.filters_mut().add(match_rule, callback)
232    }
233
234    fn stop_receive(&self, id: dbus::channel::Token) -> Option<(MatchRule<'static>, Self::F)> {
235        self.filters_mut().remove(id)
236    }
237}
238
239impl<Data> BlockingSender for $source<Data> {
240    fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
241        self.conn.send_with_reply_and_block(msg, timeout)
242    }
243}
244
245impl<Data> Sender for $source<Data> {
246    fn send(&self, msg: Message) -> Result<u32, ()> {
247        trace!("sending {:?}", &msg);
248        self.conn.send(msg)
249    }
250}
251
252impl<Args: ReadAll, Callback: FnMut(Args, &$source<Data>, &Message, &mut Data) -> bool $(+ $ss)* + 'static, Data>
253    MakeDataSignal<$callback<Data>, Args, $source<Data>> for Callback
254{
255    fn make(mut self, match_str: String) -> $callback<Data> {
256        Box::new(move |msg: Message, event_source: &$source<Data>, data: &mut Data| {
257            if let Ok(args) = Args::read(&mut msg.iter_init()) {
258                if self(args, event_source, &msg, data) {
259                    return true
260                };
261                let _ = event_source.conn.remove_match_no_cb(&match_str);
262                false
263            } else {
264                true
265            }
266        })
267    }
268}
269
270impl<Args: ReadAll, Callback: FnMut(Args, &$source<Data>, &Message) -> bool $(+ $ss)* + 'static, Data>
271    MakeSignal<$callback<Data>, Args, $source<Data>> for Callback
272{
273    fn make(mut self, match_str: String) -> $callback<Data> {
274        Box::new(move |msg: Message, event_source: &$source<Data>, _| {
275            if let Ok(args) = Args::read(&mut msg.iter_init()) {
276                if self(args, event_source, &msg) {
277                    return true
278                };
279                let _ = event_source.conn.remove_match_no_cb(&match_str);
280                false
281            } else {
282                true
283            }
284        })
285    }
286}
287
288impl<Data> EventSource for $source<Data> {
289    type Event = Message;
290    type Metadata = $source<Data>;
291    type Ret = Option<Token>;
292
293    fn process_events<Callback>(
294        &mut self,
295        readiness: Readiness,
296        token: calloop::Token,
297        mut callback: Callback,
298    ) -> io::Result<()>
299    where
300        Callback: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
301    {
302        let mut signals: Vec<Message> = Vec::new();
303        self.channel_mut().process_events(readiness, token, |event, _| {
304            if let channel::Event::Msg(msg) = event {
305                signals.push(msg);
306            }
307        }).unwrap();
308
309        for s in signals {
310            self.send(s).unwrap();
311        }
312
313        // read in all message and send queued ones
314        self.conn
315            .channel()
316            .read_write(Some(Duration::from_millis(0)))
317            .map_err(|()| {
318                io::Error::new(io::ErrorKind::NotConnected, "DBus connection is closed")
319            })?;
320
321        // process each message
322        while let Some(message) = self.conn.channel().pop_message() {
323            trace!("recieved {:?}", &message);
324            if let Some(token) = callback(message, self) {
325                self.remove_match(token)
326                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
327            }
328        }
329
330        self.conn.channel().flush();
331        Ok(())
332    }
333
334    fn register(&mut self, poll: &mut Poll, token: calloop::Token) -> io::Result<()> {
335        self.watch.register(poll, token)
336            .and_then(|_| self.channel_mut().register(poll, token))
337    }
338
339    fn reregister(&mut self, poll: &mut Poll, token: calloop::Token) -> io::Result<()> {
340        self.watch.reregister(poll, token)
341            .and_then(|_| self.channel_mut().reregister(poll, token))
342    }
343
344    fn unregister(&mut self, poll: &mut Poll) -> io::Result<()> {
345        self.watch.unregister(poll)
346            .and_then(|_| self.channel_mut().unregister(poll))
347    }
348}
349
350    }
351}
352
353sourceimpl!(DBusSource, Connection, FilterCb, Send);
354sourceimpl!(LocalDBusSource, LocalConnection, LocalFilterCb);
355sourceimpl!(SyncDBusSource, SyncConnection, SyncFilterCb, Send, Sync);
356
357impl<Data> DBusSource<Data> {
358    fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb<Data>>> {
359        self.filters.borrow_mut()
360    }
361
362    fn channel_mut(&mut self) -> &mut channel::Channel<Message> {
363        &mut self.channel
364    }
365
366    fn pack_channel(channel: channel::Channel<Message>) -> channel::Channel<Message> {
367        channel
368    }
369}
370
371impl<Data> LocalDBusSource<Data> {
372    fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb<Data>>> {
373        self.filters.borrow_mut()
374    }
375
376    fn channel_mut(&mut self) -> &mut channel::Channel<Message> {
377        &mut self.channel
378    }
379
380    fn pack_channel(channel: channel::Channel<Message>) -> channel::Channel<Message> {
381        channel
382    }
383}
384
385impl<Data> SyncDBusSource<Data> {
386    fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb<Data>>> {
387        self.filters.lock().unwrap()
388    }
389
390    fn channel_mut(&self) -> std::sync::MutexGuard<channel::Channel<Message>> {
391        self.channel.lock().unwrap()
392    }
393
394    fn pack_channel(
395        channel: channel::Channel<Message>,
396    ) -> std::sync::Mutex<channel::Channel<Message>> {
397        std::sync::Mutex::new(channel)
398    }
399}
400
/// Internal helper trait: turns a typed callback into a boxed filter callback.
///
/// In this crate's implementations `Boxed` is the boxed callback type, `Args` the message
/// argument type and `Source` the event source type.
pub trait MakeSignal<Boxed, Args, Source> {
    /// Consumes the callback and produces the boxed form; `match_str` is the string form of
    /// the match rule the callback is registered for.
    fn make(self, match_str: String) -> Boxed;
}
/// Internal helper trait: turns a typed callback that also receives the event loop's user
/// data into a boxed filter callback.
///
/// In this crate's implementations `Boxed` is the boxed callback type, `Args` the message
/// argument type and `Source` the event source type.
pub trait MakeDataSignal<Boxed, Args, Source> {
    /// Consumes the callback and produces the boxed form; `match_str` is the string form of
    /// the match rule the callback is registered for.
    fn make(self, match_str: String) -> Boxed;
}
412
/// Adding a match and removing it again by token succeeds on a session bus connection.
#[test]
fn test_add_match() {
    use dbus::blocking::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc;
    use dbus::message::SignalArgs;

    let (source, _) = DBusSource::<usize>::new_session().unwrap();

    let rule = Ppc::match_rule(None, None);
    let token = source.add_match(rule, |_: Ppc, _, _| true).unwrap();

    source.remove_match(token).unwrap();
}
423
/// Compile-time check of the auto traits: `SyncDBusSource` is `Send + Sync` and
/// `DBusSource` is `Send`.
#[test]
fn test_conn_send_sync() {
    fn assert_send<T: Send>(_: &T) {}
    fn assert_sync<T: Sync>(_: &T) {}

    let (sync_source, _) = SyncDBusSource::<usize>::new_session().unwrap();
    assert_send(&sync_source);
    assert_sync(&sync_source);

    let (plain_source, _) = DBusSource::<usize>::new_session().unwrap();
    assert_send(&plain_source);
}
436
/// A second connection asks this one for `GetMachineId`; the answer must equal the machine
/// id reported by `org.a11y.Bus`.
#[test]
fn test_peer() {
    use std::sync::Arc;

    let (mut source, _) = DBusSource::<usize>::new_session().unwrap();
    let source_name = source.unique_name().into_static();

    // The Arc's strong count doubles as a "worker finished" flag: it drops below 2 once the
    // worker thread has released its clone.
    let done = Arc::new(false);
    let done2 = Arc::clone(&done);

    let thread = std::thread::spawn(move || {
        let (source2, _) = DBusSource::<usize>::new_session().unwrap();

        let proxy = source2.with_proxy(source_name, "/", Duration::from_secs(5));
        let (signal2,): (String,) = proxy
            .method_call("org.freedesktop.DBus.Peer", "GetMachineId", ())
            .unwrap();
        println!("{}", signal2);
        assert_eq!(Arc::strong_count(&done2), 2);
        signal2
    });
    assert_eq!(Arc::strong_count(&done), 2);

    // Serve the worker's request: process for at most 30 rounds of 100 ms, stopping early
    // once the worker is gone.
    for _round in 0..30 {
        source.process(Duration::from_millis(100)).unwrap();
        if Arc::strong_count(&done) < 2 {
            break;
        }
    }

    let s2 = thread.join().unwrap();

    let a11y_proxy = source.with_proxy("org.a11y.Bus", "/org/a11y/bus", Duration::from_secs(5));
    let (s1,): (String,) = a11y_proxy
        .method_call("org.freedesktop.DBus.Peer", "GetMachineId", ())
        .unwrap();

    assert_eq!(s1, s2);
}