1use calloop::{
2 channel,
3 generic::{Fd, Generic},
4 {EventSource, InsertError, Interest, Mode, Poll, Readiness, Source},
5};
6use dbus::{
7 arg::ReadAll,
8 blocking::stdintf::org_freedesktop_dbus,
9 blocking::{BlockingSender, Connection, LocalConnection, Proxy, SyncConnection},
10 channel::{BusType, Channel, MatchingReceiver, Sender, Token},
11 message::{MatchRule, MessageType},
12 strings::{BusName, Interface, Member, Path},
13 Error, Message,
14};
15use log::{trace, warn};
16
17use std::io;
18use std::time::Duration;
19
20pub use dbus;
21mod filters;
22use filters::Filters;
23
/// Calloop event source backed by a `dbus::blocking::Connection`.
///
/// `Data` is the type handed to filter callbacks as `&mut Data` when the
/// event loop dispatches a message.
24pub struct DBusSource<Data: 'static> {
    /// Blocking D-Bus connection that all bus operations are delegated to.
26 conn: Connection,
    /// Poll registration for the connection's watch file descriptor.
27 watch: Generic<Fd>,
    /// Registered match rules and their callbacks; `RefCell` allows mutation
    /// through `&self`.
28 filters: std::cell::RefCell<Filters<FilterCb<Data>>>,
    /// Receiving end of the calloop channel; messages arriving here are sent
    /// over `conn` while processing events.
29 channel: channel::Channel<Message>,
30}
31
/// Calloop event source backed by a `dbus::blocking::LocalConnection`.
///
/// Same layout as the other sources in this file, but its callbacks carry no
/// extra auto-trait bounds (see the `sourceimpl!` invocation).
32pub struct LocalDBusSource<Data: 'static> {
    /// Blocking D-Bus connection that all bus operations are delegated to.
34 conn: LocalConnection,
    /// Poll registration for the connection's watch file descriptor.
35 watch: Generic<Fd>,
    /// Registered match rules and their callbacks; `RefCell` allows mutation
    /// through `&self`.
36 filters: std::cell::RefCell<Filters<LocalFilterCb<Data>>>,
    /// Receiving end of the calloop channel; messages arriving here are sent
    /// over `conn` while processing events.
37 channel: channel::Channel<Message>,
38}
39
/// Calloop event source backed by a `dbus::blocking::SyncConnection`.
///
/// Interior state is guarded by `std::sync::Mutex` instead of `RefCell`, and
/// callbacks must be `Send + Sync` (see the `sourceimpl!` invocation).
40pub struct SyncDBusSource<Data: 'static> {
    /// Blocking D-Bus connection that all bus operations are delegated to.
42 conn: SyncConnection,
    /// Poll registration for the connection's watch file descriptor.
43 watch: Generic<Fd>,
    /// Registered match rules and their callbacks, behind a mutex.
44 filters: std::sync::Mutex<Filters<SyncFilterCb<Data>>>,
    /// Receiving end of the calloop channel, behind a mutex; messages arriving
    /// here are sent over `conn` while processing events.
45 channel: std::sync::Mutex<channel::Channel<Message>>,
46}
47
48macro_rules! sourceimpl {
49 ($source: ident, $connection: ident, $callback: ident $(, $ss:tt)*) => {
50
// Boxed filter callback stored per match rule. It receives the message, the
// source and the event-loop data; `quick_insert` treats a `false` return as a
// request to remove the corresponding match.
51type $callback<Data> = Box<dyn FnMut(Message, &$source<Data>, &mut Data) -> bool $(+ $ss)* + 'static>;
52
53impl<Data> $source<Data> {
/// Opens a private channel to the session bus and wraps it in a new source.
///
/// Returns the source together with the sending half of a calloop channel;
/// messages pushed into that sender are sent over the bus when the source
/// processes events.
///
/// # Errors
/// Returns an `io::Error` when the source cannot be constructed (see `new`).
54 pub fn new_session() -> io::Result<(Self, channel::Sender<Message>)> {
56 Self::new(Channel::get_private(BusType::Session))
57 }
58
/// Opens a private channel to the system bus and wraps it in a new source.
///
/// Returns the source together with the sending half of a calloop channel;
/// messages pushed into that sender are sent over the bus when the source
/// processes events.
///
/// # Errors
/// Returns an `io::Error` when the source cannot be constructed (see `new`).
59 pub fn new_system() -> io::Result<(Self, channel::Sender<Message>)> {
61 Self::new(Channel::get_private(BusType::System))
62 }
63
64 fn new(c: Result<Channel, Error>) -> io::Result<(Self, channel::Sender<Message>)> {
65 let mut channel = c.map_err(|_| {
66 io::Error::new(io::ErrorKind::ConnectionRefused, "Failed to connet to DBus")
67 })?;
68
69 channel.set_watch_enabled(true);
70
71 let watch_fd = channel.watch();
72
73 let interest = match (watch_fd.read, watch_fd.write) {
74 (true, true) => Interest::Both,
75 (false, true) => Interest::Writable,
76 (true, false) => Interest::Readable,
77 (false, false) => {
78 return Err(io::Error::new(
79 io::ErrorKind::Other,
80 "fd nether read nor write",
81 ))
82 }
83 };
84
85 let watch = Generic::from_fd(watch_fd.fd, interest, Mode::Level);
86
87 let conn: $connection = channel.into();
88
89 let mut match_rule_nameacquired = MatchRule::default();
91 match_rule_nameacquired.msg_type = Some(MessageType::Signal);
92 match_rule_nameacquired.path = Some(Path::new("/org/freedesktop/DBus").unwrap());
93 match_rule_nameacquired.interface = Some(Interface::new("org.freedesktop.DBus").unwrap());
94 match_rule_nameacquired.member = Some(Member::new("NameAcquired").unwrap());
95
96 let (sender, channel) = channel::channel::<Message>();
97
98 let source = Self {
99 conn,
100 watch,
101 filters: Default::default(),
102 channel: Self::pack_channel(channel)
103 };
104
105 source.add_match(match_rule_nameacquired, |_: (), _, _| true).unwrap();
106 Ok((source, sender))
107
108 }
109
/// Returns the unique bus name reported by the underlying connection.
110 pub fn unique_name(&self) -> BusName {
114 self.conn.unique_name()
115 }
116
/// Creates a `Proxy` that uses this source as its connection.
///
/// `dest` and `path` are converted into the proxy's destination bus name and
/// object path; `timeout` is stored in the proxy unchanged.
117 pub fn with_proxy<'a, 'b, Dest: Into<BusName<'a>>, BusPath: Into<Path<'a>>>(
118 &'b self,
119 dest: Dest,
120 path: BusPath,
121 timeout: Duration
122 ) -> Proxy<'a, &'b Self> {
123 Proxy { connection: self, destination: dest.into(), path: path.into(), timeout }
124 }
125
/// Requests `name` on the bus by delegating to the underlying connection.
///
/// The three flags are forwarded unchanged to the connection's
/// `request_name`; its reply or error is returned as-is.
126 pub fn request_name<'a, Name: Into<BusName<'a>>>(
130 &self,
131 name: Name,
132 allow_replacement: bool,
133 replace_existing: bool,
134 do_not_queue: bool,
135 ) -> Result<org_freedesktop_dbus::RequestNameReply, Error> {
136 self.conn
137 .request_name(name, allow_replacement, replace_existing, do_not_queue)
138 }
139
/// Releases `name` on the bus by delegating to the underlying connection;
/// its reply or error is returned as-is.
140 pub fn release_name<'a, Name: Into<BusName<'a>>>(&self, name: Name) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> {
142 self.conn.release_name(name)
143 }
144
145 pub fn add_match<Args: ReadAll, Callback>(
150 &self,
151 match_rule: MatchRule<'static>,
152 callback: Callback,
153 ) -> Result<dbus::channel::Token, dbus::Error>
154 where
155 Callback: FnMut(Args, &Self, &Message) -> bool $(+ $ss)* + 'static,
156 {
157 let match_str = match_rule.match_str();
158 self.conn.add_match_no_cb(&match_str)?;
159 Ok(self.start_receive(match_rule, MakeSignal::make(callback, match_str)))
160 }
161
162 pub fn add_match_data<Args: ReadAll, Callback>(
168 &self,
169 match_rule: MatchRule<'static>,
170 callback: Callback,
171 ) -> Result<dbus::channel::Token, dbus::Error>
172 where
173 Callback: FnMut(Args, &Self, &Message, &mut Data) -> bool $(+ $ss)* + 'static,
174 {
175 let match_str = match_rule.match_str();
176 self.conn.add_match_no_cb(&match_str)?;
177 Ok(self.start_receive(match_rule, MakeDataSignal::make(callback, match_str)))
178 }
179
180 pub fn remove_match(&self, id: Token) -> Result<(), Error> {
182 let (match_rule, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with id found"))?;
183 self.conn.remove_match_no_cb(&match_rule.match_str())
184 }
185
/// Delegates to the underlying connection's `process` with the given
/// `timeout` and returns its result unchanged.
186 pub fn process(&mut self, timeout: Duration) -> Result<bool, Error> {
187 self.conn.process(timeout)
188 }
189
/// Returns the low-level D-Bus `Channel` of the underlying connection.
190 pub fn channel(&self) -> &Channel {
192 self.conn.channel()
193 }
194
/// Inserts this source into `handle` with a default dispatch closure.
///
/// For every incoming message the closure looks up a matching filter:
/// * if one matches, its callback is run; when the callback returns `false`
///   the filter's token is returned so the match gets removed;
/// * if none matches and `dbus::channel::default_reply` produces a reply,
///   that reply is sent (send errors are ignored);
/// * otherwise the message is an orphan: the closure panics when
///   `panic_on_orphan` is set, and logs a warning when it is not.
195 pub fn quick_insert(
199 self,
200 handle: calloop::LoopHandle<Data>,
201 panic_on_orphan: bool,
202 ) -> Result<Source<$source<Data>>, InsertError<$source<Data>>> {
203 handle.insert_source(self, move |msg, connection, data| {
    // NOTE(review): the guard returned by `filters_mut()` appears to stay
    // alive for the whole match, i.e. while the callback runs; a callback
    // that adds or removes matches on the same source would then try to
    // borrow/lock the filters again — confirm.
204 match connection.filters_mut().get_matches(&msg) {
205 Some((token, (_, callback))) => {
206 trace!("match on {:?}", &msg);
207 if !callback(msg, &connection, data) {
    // Callback asked to be unregistered.
208 return Some(*token)
209 }
210 }
211 None => {
212 if let Some(reply) = dbus::channel::default_reply(&msg) {
    // Best effort: a failed send of the default reply is ignored.
213 let _ = connection.send(reply);
214 return None;
215 }
216 if panic_on_orphan {
217 panic!("[calloop-dbus] Encountered an orphan event: {:#?}", msg);
218 }
219 warn!("orphan {:#?}", msg);
220 }
221 }
222 None
223 })
224 }
225}
226
// Stores and removes filter callbacks in this source's own `Filters` table.
// Neither method touches the bus; they only edit the local table.
227impl<Data> MatchingReceiver for $source<Data> {
228 type F = $callback<Data>;
229
    // Adds `callback` under `match_rule` and returns the token assigned by the table.
230 fn start_receive(&self, match_rule: MatchRule<'static>, callback: Self::F) -> dbus::channel::Token {
231 self.filters_mut().add(match_rule, callback)
232 }
233
    // Removes the entry for `id`, returning its rule and callback if it existed.
234 fn stop_receive(&self, id: dbus::channel::Token) -> Option<(MatchRule<'static>, Self::F)> {
235 self.filters_mut().remove(id)
236 }
237}
238
// Lets the source be used wherever a blocking sender is expected (e.g. as
// the connection of a `Proxy`) by delegating to the underlying connection.
239impl<Data> BlockingSender for $source<Data> {
240 fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
241 self.conn.send_with_reply_and_block(msg, timeout)
242 }
243}
244
// Sends a message over the underlying connection, logging it at trace level
// first; the connection's result is returned unchanged.
245impl<Data> Sender for $source<Data> {
246 fn send(&self, msg: Message) -> Result<u32, ()> {
247 trace!("sending {:?}", &msg);
248 self.conn.send(msg)
249 }
250}
251
252impl<Args: ReadAll, Callback: FnMut(Args, &$source<Data>, &Message, &mut Data) -> bool $(+ $ss)* + 'static, Data>
253 MakeDataSignal<$callback<Data>, Args, $source<Data>> for Callback
254{
255 fn make(mut self, match_str: String) -> $callback<Data> {
256 Box::new(move |msg: Message, event_source: &$source<Data>, data: &mut Data| {
257 if let Ok(args) = Args::read(&mut msg.iter_init()) {
258 if self(args, event_source, &msg, data) {
259 return true
260 };
261 let _ = event_source.conn.remove_match_no_cb(&match_str);
262 false
263 } else {
264 true
265 }
266 })
267 }
268}
269
270impl<Args: ReadAll, Callback: FnMut(Args, &$source<Data>, &Message) -> bool $(+ $ss)* + 'static, Data>
271 MakeSignal<$callback<Data>, Args, $source<Data>> for Callback
272{
273 fn make(mut self, match_str: String) -> $callback<Data> {
274 Box::new(move |msg: Message, event_source: &$source<Data>, _| {
275 if let Ok(args) = Args::read(&mut msg.iter_init()) {
276 if self(args, event_source, &msg) {
277 return true
278 };
279 let _ = event_source.conn.remove_match_no_cb(&match_str);
280 false
281 } else {
282 true
283 }
284 })
285 }
286}
287
288impl<Data> EventSource for $source<Data> {
289 type Event = Message;
290 type Metadata = $source<Data>;
291 type Ret = Option<Token>;
292
293 fn process_events<Callback>(
294 &mut self,
295 readiness: Readiness,
296 token: calloop::Token,
297 mut callback: Callback,
298 ) -> io::Result<()>
299 where
300 Callback: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
301 {
302 let mut signals: Vec<Message> = Vec::new();
303 self.channel_mut().process_events(readiness, token, |event, _| {
304 if let channel::Event::Msg(msg) = event {
305 signals.push(msg);
306 }
307 }).unwrap();
308
309 for s in signals {
310 self.send(s).unwrap();
311 }
312
313 self.conn
315 .channel()
316 .read_write(Some(Duration::from_millis(0)))
317 .map_err(|()| {
318 io::Error::new(io::ErrorKind::NotConnected, "DBus connection is closed")
319 })?;
320
321 while let Some(message) = self.conn.channel().pop_message() {
323 trace!("recieved {:?}", &message);
324 if let Some(token) = callback(message, self) {
325 self.remove_match(token)
326 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
327 }
328 }
329
330 self.conn.channel().flush();
331 Ok(())
332 }
333
334 fn register(&mut self, poll: &mut Poll, token: calloop::Token) -> io::Result<()> {
335 self.watch.register(poll, token)
336 .and_then(|_| self.channel_mut().register(poll, token))
337 }
338
339 fn reregister(&mut self, poll: &mut Poll, token: calloop::Token) -> io::Result<()> {
340 self.watch.reregister(poll, token)
341 .and_then(|_| self.channel_mut().reregister(poll, token))
342 }
343
344 fn unregister(&mut self, poll: &mut Poll) -> io::Result<()> {
345 self.watch.unregister(poll)
346 .and_then(|_| self.channel_mut().unregister(poll))
347 }
348}
349
350 }
351}
352
// Instantiate the shared implementation for each source flavour. The
// trailing identifiers are extra auto-trait bounds required of callbacks:
// `Send` for `DBusSource`, none for `LocalDBusSource`, and `Send + Sync` for
// `SyncDBusSource`.
353sourceimpl!(DBusSource, Connection, FilterCb, Send);
354sourceimpl!(LocalDBusSource, LocalConnection, LocalFilterCb);
355sourceimpl!(SyncDBusSource, SyncConnection, SyncFilterCb, Send, Sync);
356
// Storage accessors used by the `sourceimpl!`-generated code; this flavour
// keeps its filters in a `RefCell` and its channel as a plain field.
357impl<Data> DBusSource<Data> {
    // Mutably borrows the filter table; `RefCell::borrow_mut` panics if it is
    // already borrowed.
358 fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb<Data>>> {
359 self.filters.borrow_mut()
360 }
361
    // Mutable access to the calloop channel.
362 fn channel_mut(&mut self) -> &mut channel::Channel<Message> {
363 &mut self.channel
364 }
365
    // Converts a fresh channel into the field's storage type (identity here).
366 fn pack_channel(channel: channel::Channel<Message>) -> channel::Channel<Message> {
367 channel
368 }
369}
370
// Storage accessors used by the `sourceimpl!`-generated code; this flavour
// keeps its filters in a `RefCell` and its channel as a plain field.
371impl<Data> LocalDBusSource<Data> {
    // Mutably borrows the filter table; `RefCell::borrow_mut` panics if it is
    // already borrowed.
372 fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb<Data>>> {
373 self.filters.borrow_mut()
374 }
375
    // Mutable access to the calloop channel.
376 fn channel_mut(&mut self) -> &mut channel::Channel<Message> {
377 &mut self.channel
378 }
379
    // Converts a fresh channel into the field's storage type (identity here).
380 fn pack_channel(channel: channel::Channel<Message>) -> channel::Channel<Message> {
381 channel
382 }
383}
384
// Storage accessors used by the `sourceimpl!`-generated code; this flavour
// keeps both its filters and its channel behind `std::sync::Mutex`.
385impl<Data> SyncDBusSource<Data> {
    // Locks the filter table; panics if the mutex is poisoned.
386 fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb<Data>>> {
387 self.filters.lock().unwrap()
388 }
389
    // Locks the calloop channel; panics if the mutex is poisoned.
390 fn channel_mut(&self) -> std::sync::MutexGuard<channel::Channel<Message>> {
391 self.channel.lock().unwrap()
392 }
393
    // Wraps a fresh channel in the mutex used by the `channel` field.
394 fn pack_channel(
395 channel: channel::Channel<Message>,
396 ) -> std::sync::Mutex<channel::Channel<Message>> {
397 std::sync::Mutex::new(channel)
398 }
399}
400
/// Conversion of a user callback into a stored filter of type `G`.
///
/// `S` and `T` do not appear in the method signature; the implementations in
/// this file set them to the callback's argument type and the source type.
401pub trait MakeSignal<G, S, T> {
    /// Consumes the callback and returns the filter; `match_str` is the
    /// textual form of the match rule the filter belongs to.
403 fn make(self, match_str: String) -> G;
405}
/// Conversion of a user callback that also receives the event-loop data into
/// a stored filter of type `G`.
///
/// `S` and `T` do not appear in the method signature; the implementations in
/// this file set them to the callback's argument type and the source type.
406pub trait MakeDataSignal<G, S, T> {
    /// Consumes the callback and returns the filter; `match_str` is the
    /// textual form of the match rule the filter belongs to.
409 fn make(self, match_str: String) -> G;
411}
412
/// Adds a `PropertiesChanged` match on a session-bus source and removes it
/// again; both operations must succeed.
#[test]
fn test_add_match() {
    use dbus::blocking::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc;
    use dbus::message::SignalArgs;

    let (source, _) = DBusSource::<usize>::new_session().unwrap();

    let rule = Ppc::match_rule(None, None);
    let token = source.add_match(rule, |_: Ppc, _, _| true).unwrap();

    source.remove_match(token).unwrap();
}
423
/// Compile-time style check: `SyncDBusSource` must be `Send + Sync` and
/// `DBusSource` must be `Send`.
#[test]
fn test_conn_send_sync() {
    fn assert_send<T: Send>(_: &T) {}
    fn assert_sync<T: Sync>(_: &T) {}

    let (sync_source, _) = SyncDBusSource::<usize>::new_session().unwrap();
    assert_send(&sync_source);
    assert_sync(&sync_source);

    let (plain_source, _) = DBusSource::<usize>::new_session().unwrap();
    assert_send(&plain_source);
}
436
// Checks that a second connection can call `GetMachineId` on this source
// while it is being processed, and that the id equals the one returned by
// another peer on the same bus.
437#[test]
438fn test_peer() {
439 let (mut source, _): (DBusSource<usize>, _) = DBusSource::new_session().unwrap();
440
441 let source_name = source.unique_name().into_static();
442 use std::sync::Arc;
    // The `Arc`'s value is irrelevant; its strong count signals when the
    // thread has finished, because `done2` is dropped with the thread closure.
443 let done = Arc::new(false);
444 let done2 = done.clone();
445 let thread = std::thread::spawn(move || {
446 let (source2, _): (DBusSource<usize>, _) = DBusSource::new_session().unwrap();
447
    // Call the first source by its unique name from a separate connection.
448 let proxy = source2.with_proxy(source_name, "/", Duration::from_secs(5));
449 let (signal2,): (String,) = proxy
450 .method_call("org.freedesktop.DBus.Peer", "GetMachineId", ())
451 .unwrap();
452 println!("{}", signal2);
453 assert_eq!(Arc::strong_count(&done2), 2);
454 signal2
455 });
456 assert_eq!(Arc::strong_count(&done), 2);
457
    // Process the first source for up to 30 x 100 ms, stopping early once the
    // thread has dropped its `Arc` clone.
458 for _ in 0..30 {
459 source.process(Duration::from_millis(100)).unwrap();
460 if Arc::strong_count(&done) < 2 {
461 break;
462 }
463 }
464
465 let s2 = thread.join().unwrap();
466
    // NOTE(review): assumes a peer named `org.a11y.Bus` is reachable on the
    // session bus of the test environment — confirm.
467 let proxy = source.with_proxy("org.a11y.Bus", "/org/a11y/bus", Duration::from_secs(5));
468 let (s1,): (String,) = proxy
469 .method_call("org.freedesktop.DBus.Peer", "GetMachineId", ())
470 .unwrap();
471
472 assert_eq!(s1, s2);
473}