1use crate::strings::{BusName, Path, Interface, Member};
5use crate::arg::{AppendAll, ReadAll, IterAppend};
6use crate::{channel, Error, Message};
7use crate::message::{MatchRule, SignalArgs, MessageType};
8use crate::channel::{Channel, BusType, Token};
9use std::{cell::RefCell, time::Duration, sync::Mutex};
10use std::sync::atomic::{AtomicBool, Ordering};
11use crate::filters::Filters;
12
13#[allow(missing_docs)]
14mod generated_org_freedesktop_standard_interfaces;
15#[allow(dead_code)]
16mod generated_org_freedesktop_dbus;
17
18pub mod stdintf {
24 #[allow(missing_docs)]
25 pub mod org_freedesktop_dbus {
26 pub use super::super::generated_org_freedesktop_standard_interfaces::*;
27
28 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
29 pub enum RequestNameReply {
30 PrimaryOwner = 1,
31 InQueue = 2,
32 Exists = 3,
33 AlreadyOwner = 4,
34 }
35
36 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
37 pub enum ReleaseNameReply {
38 Released = 1,
39 NonExistent = 2,
40 NotOwner = 3,
41 }
42
43 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
44 pub enum EmitsChangedSignal {
45 True,
46 Invalidates,
47 Const,
48 False,
49 }
50
51 pub (crate) fn request_name<S: crate::blocking::BlockingSender>(s: &S, name: &str, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
52 -> Result<RequestNameReply, crate::Error> {
53 let flags: u32 =
54 if allow_replacement { 1 } else { 0 } +
55 if replace_existing { 2 } else { 0 } +
56 if do_not_queue { 4 } else { 0 };
57 let proxy = super::proxy(s);
58 use super::org_freedesktop::DBus;
59 let r = proxy.request_name(name, flags)?;
60 use RequestNameReply::*;
61 let all = [PrimaryOwner, InQueue, Exists, AlreadyOwner];
62 all.iter().find(|x| **x as u32 == r).copied().ok_or_else(||
63 crate::Error::new_failed("Invalid reply from DBus server")
64 )
65 }
66
67 pub (crate) fn release_name<S: crate::blocking::BlockingSender>(s: &S, name: &str)
68 -> Result<ReleaseNameReply, crate::Error> {
69
70 let proxy = super::proxy(s);
71 use super::org_freedesktop::DBus;
72 let r = proxy.release_name(name)?;
73 use ReleaseNameReply::*;
74 let all = [Released, NonExistent, NotOwner];
75 all.iter().find(|x| **x as u32 == r).copied().ok_or_else(||
76 crate::Error::new_failed("Invalid reply from DBus server")
77 )
78 }
79
80 use crate::arg;
81 impl PropertiesPropertiesChanged {
82 pub fn add_prop<F: FnOnce() -> Box<dyn arg::RefArg>>(&mut self, prop_name: &str, emits: EmitsChangedSignal, f: F) -> bool {
83 match emits {
84 EmitsChangedSignal::False => { false },
85 EmitsChangedSignal::Invalidates => {
86 if !self.invalidated_properties.iter().any(|x| x == prop_name) {
87 self.invalidated_properties.push(prop_name.into())
88 }
89 true
90 }
91 EmitsChangedSignal::True => {
92 let val = f();
93 self.changed_properties.insert(prop_name.into(), arg::Variant(val));
94 true
95 }
96 EmitsChangedSignal::Const => panic!("Called add_prop with EmitsChangedSignal::Const")
97 }
98 }
99 }
100 }
101
102 pub (super) mod org_freedesktop {
104 pub(crate) use super::super::generated_org_freedesktop_dbus::*;
105 }
106
107 pub (crate) fn proxy<C>(c: C) -> crate::blocking::Proxy<'static, C> {
108 super::Proxy::new("org.freedesktop.DBus", "/org/freedesktop/DBus", std::time::Duration::from_millis(5000), c)
109 }
110}
111
112pub struct LocalConnection {
114 channel: Channel,
115 filters: RefCell<Filters<LocalFilterCb>>,
116 all_signal_matches: AtomicBool,
117}
118
119pub struct Connection {
121 channel: Channel,
122 filters: RefCell<Filters<FilterCb>>,
123 all_signal_matches: AtomicBool,
124}
125
126pub struct SyncConnection {
128 channel: Channel,
129 filters: Mutex<Filters<SyncFilterCb>>,
130 all_signal_matches: AtomicBool,
131}
132
133use crate::blocking::stdintf::org_freedesktop_dbus;
134
135macro_rules! connimpl {
136 ($c: ident, $cb: ident $(, $ss:tt)*) => {
137
138type
139 $cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>;
140
141
142impl $c {
143
144 pub fn new_session() -> Result<Self, Error> {
146 Channel::get_private(BusType::Session).map(From::from)
147 }
148
149 pub fn new_system() -> Result<Self, Error> {
151 Channel::get_private(BusType::System).map(From::from)
152 }
153
154 pub fn unique_name(&self) -> BusName { self.channel.unique_name().unwrap().into() }
158
159 pub fn with_proxy<'a, 'b, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'b self, dest: D, path: P, timeout: Duration) ->
161 Proxy<'a, &'b Self> {
162 Proxy { connection: self, destination: dest.into(), path: path.into(), timeout }
163 }
164
165
166 pub fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
170 -> Result<org_freedesktop_dbus::RequestNameReply, Error> {
171 org_freedesktop_dbus::request_name(&self.channel, &name.into(), allow_replacement, replace_existing, do_not_queue)
172 }
173
174 pub fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> {
176 org_freedesktop_dbus::release_name(&self.channel, &name.into())
177 }
178
179 pub fn add_match<S: ReadAll, F>(&self, match_rule: MatchRule<'static>, f: F) -> Result<Token, Error>
188 where F: FnMut(S, &Self, &Message) -> bool $(+ $ss)* + 'static {
189 let m = match_rule.match_str();
190 self.add_match_no_cb(&m)?;
191 use channel::MatchingReceiver;
192 Ok(self.start_receive(match_rule, MakeSignal::make(f, m)))
193 }
194
195 pub fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
197 use crate::blocking::stdintf::org_freedesktop::DBus;
198 let proxy = stdintf::proxy(self);
199 proxy.add_match(match_str)
200 }
201
202 pub fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
204 use crate::blocking::stdintf::org_freedesktop::DBus;
205 let proxy = stdintf::proxy(self);
206 proxy.remove_match(match_str)
207 }
208
209 pub fn remove_match(&self, id: Token) -> Result<(), Error> {
211 use channel::MatchingReceiver;
212 let (mr, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with that id found"))?;
213 self.remove_match_no_cb(&mr.match_str())
214 }
215
216 pub fn set_signal_match_mode(&self, match_all: bool) {
228 self.all_signal_matches.store(match_all, Ordering::Release);
229 }
230
231 pub fn process(&self, timeout: Duration) -> Result<bool, Error> {
242 if let Some(msg) = self.channel.blocking_pop_message(timeout)? {
243 if self.all_signal_matches.load(Ordering::Acquire) && msg.msg_type() == MessageType::Signal {
244 let matching_filters = self.filters_mut().remove_all_matching(&msg);
247 for mut ff in matching_filters {
251 if let Ok(copy) = msg.duplicate() {
252 if ff.2(copy, self) {
253 self.filters_mut().insert(ff);
254 }
255 } else {
256 self.filters_mut().insert(ff);
258 }
259 }
260 } else {
261 let ff = self.filters_mut().remove_first_matching(&msg);
263 if let Some(mut ff) = ff {
264 if ff.2(msg, self) {
265 self.filters_mut().insert(ff);
266 }
267 } else if let Some(reply) = crate::channel::default_reply(&msg) {
268 let _ = self.channel.send(reply);
269 }
270 }
271 Ok(true)
272 } else {
273 Ok(false)
274 }
275 }
276
277 pub fn channel(&self) -> &Channel {
279 &self.channel
280 }
281}
282
283impl BlockingSender for $c {
284 fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
285 self.channel.send_with_reply_and_block(msg, timeout)
286 }
287}
288
289impl From<Channel> for $c {
290 fn from(channel: Channel) -> $c { $c {
291 channel,
292 filters: Default::default(),
293 all_signal_matches: AtomicBool::new(false),
294 } }
295}
296
297impl channel::Sender for $c {
298 fn send(&self, msg: Message) -> Result<u32, ()> { self.channel.send(msg) }
299}
300
301impl<S: ReadAll, F: FnMut(S, &$c, &Message) -> bool $(+ $ss)* + 'static> MakeSignal<$cb, S, $c> for F {
302 fn make(mut self, mstr: String) -> $cb {
303 Box::new(move |msg: Message, conn: &$c| {
304 if let Ok(s) = S::read(&mut msg.iter_init()) {
305 if self(s, conn, &msg) { return true };
306 let proxy = stdintf::proxy(conn);
307 use crate::blocking::stdintf::org_freedesktop::DBus;
308 let _ = proxy.remove_match(&mstr);
309 false
310 } else { true }
311 })
312 }
313}
314
315impl channel::MatchingReceiver for $c {
316 type F = $cb;
317 fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token {
318 self.filters_mut().add(m, f)
319 }
320 fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> {
321 self.filters_mut().remove(id)
322 }
323}
324
325
326
327 }
328}
329
330connimpl!(Connection, FilterCb, Send);
331connimpl!(LocalConnection, LocalFilterCb);
332connimpl!(SyncConnection, SyncFilterCb, Send, Sync);
333
334impl Connection {
335 fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() }
336}
337
338impl LocalConnection {
339 fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() }
340}
341
342impl SyncConnection {
343 fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() }
344}
345
346pub trait BlockingSender {
348 fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error>;
352}
353
354impl BlockingSender for Channel {
355 fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
356 Channel::send_with_reply_and_block(self, msg, timeout)
357 }
358}
359
360#[derive(Clone, Debug)]
366pub struct Proxy<'a, C> {
367 pub destination: BusName<'a>,
369 pub path: Path<'a>,
371 pub timeout: Duration,
373 pub connection: C,
375}
376
377impl<'a, C> Proxy<'a, C> {
378 pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self {
380 Proxy { destination: dest.into(), path: path.into(), timeout, connection }
381 }
382}
383
384impl<'a, T: BlockingSender, C: std::ops::Deref<Target=T>> Proxy<'a, C> {
385pub fn method_call<'i, 'm, R: ReadAll, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A) -> Result<R, Error> {
400 let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into());
401 args.append(&mut IterAppend::new(&mut msg));
402 let r = self.connection.send_with_reply_and_block(msg, self.timeout)?;
403 Ok(R::read(&mut r.iter_init())?)
404 }
405
406 pub fn match_start(&self, mut mr: MatchRule<'static>, call_add_match: bool, f: <T as channel::MatchingReceiver>::F)
414 -> Result<Token, Error>
415 where T: channel::MatchingReceiver {
416 mr.path = Some(self.path.clone().into_static());
417 mr.sender = Some(self.destination.clone().into_static());
418 if call_add_match {
419 use crate::blocking::stdintf::org_freedesktop::DBus;
420 let proxy = stdintf::proxy(&*self.connection);
421 proxy.add_match(&mr.match_str())?;
422 }
423
424 Ok(self.connection.start_receive(mr, f))
425 }
426
427 pub fn match_stop(&self, id: Token, call_remove_match: bool) -> Result<(), Error>
432 where T: channel::MatchingReceiver {
433 if let Some((mr, _)) = self.connection.stop_receive(id) {
434 if call_remove_match {
435 use crate::blocking::stdintf::org_freedesktop::DBus;
436 let proxy = stdintf::proxy(&*self.connection);
437 proxy.remove_match(&mr.match_str())?;
438 }
439 }
440 Ok(())
441 }
442
443 pub fn match_signal<S: SignalArgs + ReadAll, F>(&self, f: F) -> Result<Token, Error>
448 where T: channel::MatchingReceiver,
449 F: MakeSignal<<T as channel::MatchingReceiver>::F, S, T>
450 {
451 let mr = S::match_rule(Some(&self.destination), Some(&self.path)).static_clone();
452 let ff = f.make(mr.match_str());
453 self.match_start(mr, true, ff)
454 }
455}
456
457pub trait MakeSignal<G, S, T> {
459 fn make(self, mstr: String) -> G;
461}
462
463#[test]
464fn test_add_match() {
465 use self::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc;
466 let c = Connection::new_session().unwrap();
467 let x = c.add_match(Ppc::match_rule(None, None), |_: Ppc, _, _| { true }).unwrap();
468 c.remove_match(x).unwrap();
469}
470
471#[test]
472fn test_conn_send_sync() {
473 fn is_send<T: Send>(_: &T) {}
474 fn is_sync<T: Sync>(_: &T) {}
475
476 let c = SyncConnection::new_session().unwrap();
477 is_send(&c);
478 is_sync(&c);
479
480 let c = Connection::new_session().unwrap();
481 is_send(&c);
482}
483
484#[test]
485fn test_peer() {
486 let c = Connection::new_session().unwrap();
487
488 let c_name = c.unique_name().into_static();
489 use std::sync::Arc;
490 let done = Arc::new(false);
491 let d2 = done.clone();
492 let j = std::thread::spawn(move || {
493 let c2 = Connection::new_session().unwrap();
494
495 let proxy = c2.with_proxy(c_name, "/", Duration::from_secs(5));
496 let (s2,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
497 println!("{}", s2);
498 assert_eq!(Arc::strong_count(&d2), 2);
499 s2
500 });
501 assert_eq!(Arc::strong_count(&done), 2);
502
503 for _ in 0..30 {
504 c.process(Duration::from_millis(100)).unwrap();
505 if Arc::strong_count(&done) < 2 { break; }
506 }
507
508 let s2 = j.join().unwrap();
509
510 #[cfg(unix)]
511 {
512 let proxy = c.with_proxy("org.a11y.Bus", "/org/a11y/bus", Duration::from_secs(5));
513 let (s1,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
514
515 assert_eq!(s1, s2);
516 }
517
518}