use crate::strings::{BusName, Path, Interface, Member};
use crate::arg::{AppendAll, ReadAll, IterAppend};
use crate::{channel, Error, Message};
use crate::message::{MatchRule, SignalArgs};
use crate::channel::{Channel, BusType, Token};
use std::{cell::RefCell, time::Duration, sync::Mutex};
use crate::filters::Filters;
pub mod stdintf;
pub struct LocalConnection {
channel: Channel,
filters: RefCell<Filters<LocalFilterCb>>,
}
pub struct Connection {
channel: Channel,
filters: RefCell<Filters<FilterCb>>,
}
pub struct SyncConnection {
channel: Channel,
filters: Mutex<Filters<SyncFilterCb>>
}
use crate::blocking::stdintf::org_freedesktop_dbus;
macro_rules! connimpl {
($c: ident, $cb: ident $(, $ss:tt)*) => {
type
$cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>;
impl $c {
pub fn new_session() -> Result<Self, Error> {
Channel::get_private(BusType::Session).map(From::from)
}
pub fn new_system() -> Result<Self, Error> {
Channel::get_private(BusType::System).map(From::from)
}
pub fn unique_name(&self) -> BusName { self.channel.unique_name().unwrap().into() }
pub fn with_proxy<'a, 'b, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'b self, dest: D, path: P, timeout: Duration) ->
Proxy<'a, &'b Self> {
Proxy { connection: self, destination: dest.into(), path: path.into(), timeout }
}
pub fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
-> Result<org_freedesktop_dbus::RequestNameReply, Error> {
org_freedesktop_dbus::request_name(&self.channel, &name.into(), allow_replacement, replace_existing, do_not_queue)
}
pub fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> {
org_freedesktop_dbus::release_name(&self.channel, &name.into())
}
pub fn add_match<S: ReadAll, F>(&self, match_rule: MatchRule<'static>, f: F) -> Result<Token, Error>
where F: FnMut(S, &Self, &Message) -> bool $(+ $ss)* + 'static {
let m = match_rule.match_str();
self.add_match_no_cb(&m)?;
use channel::MatchingReceiver;
Ok(self.start_receive(match_rule, MakeSignal::make(f, m)))
}
pub fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
use crate::blocking::stdintf::org_freedesktop::DBus;
let proxy = stdintf::proxy(self);
proxy.add_match(match_str)
}
pub fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
use crate::blocking::stdintf::org_freedesktop::DBus;
let proxy = stdintf::proxy(self);
proxy.remove_match(match_str)
}
pub fn remove_match(&self, id: Token) -> Result<(), Error> {
use channel::MatchingReceiver;
let (mr, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with that id found"))?;
self.remove_match_no_cb(&mr.match_str())
}
pub fn process(&self, timeout: Duration) -> Result<bool, Error> {
if let Some(msg) = self.channel.blocking_pop_message(timeout)? {
let ff = self.filters_mut().remove_matching(&msg);
if let Some(mut ff) = ff {
if ff.2(msg, self) {
self.filters_mut().insert(ff);
}
} else if let Some(reply) = crate::channel::default_reply(&msg) {
let _ = self.channel.send(reply);
}
Ok(true)
} else {
Ok(false)
}
}
pub fn channel(&self) -> &Channel {
&self.channel
}
}
impl BlockingSender for $c {
fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
self.channel.send_with_reply_and_block(msg, timeout)
}
}
impl From<Channel> for $c {
fn from(channel: Channel) -> $c { $c {
channel, filters: Default::default(),
} }
}
impl channel::Sender for $c {
fn send(&self, msg: Message) -> Result<u32, ()> { self.channel.send(msg) }
}
impl<S: ReadAll, F: FnMut(S, &$c, &Message) -> bool $(+ $ss)* + 'static> MakeSignal<$cb, S, $c> for F {
fn make(mut self, mstr: String) -> $cb {
Box::new(move |msg: Message, conn: &$c| {
if let Ok(s) = S::read(&mut msg.iter_init()) {
if self(s, conn, &msg) { return true };
let proxy = stdintf::proxy(conn);
use crate::blocking::stdintf::org_freedesktop::DBus;
let _ = proxy.remove_match(&mstr);
false
} else { true }
})
}
}
impl channel::MatchingReceiver for $c {
type F = $cb;
fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token {
self.filters_mut().add(m, f)
}
fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> {
self.filters_mut().remove(id)
}
}
}
}
connimpl!(Connection, FilterCb, Send);
connimpl!(LocalConnection, LocalFilterCb);
connimpl!(SyncConnection, SyncFilterCb, Send, Sync);
impl Connection {
fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() }
}
impl LocalConnection {
fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() }
}
impl SyncConnection {
fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() }
}
pub trait BlockingSender {
fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error>;
}
impl BlockingSender for Channel {
fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
Channel::send_with_reply_and_block(self, msg, timeout)
}
}
#[derive(Clone, Debug)]
pub struct Proxy<'a, C> {
pub destination: BusName<'a>,
pub path: Path<'a>,
pub timeout: Duration,
pub connection: C,
}
impl<'a, C> Proxy<'a, C> {
pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self {
Proxy { destination: dest.into(), path: path.into(), timeout, connection }
}
}
impl<'a, T: BlockingSender, C: std::ops::Deref<Target=T>> Proxy<'a, C> {
pub fn method_call<'i, 'm, R: ReadAll, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A) -> Result<R, Error> {
let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into());
args.append(&mut IterAppend::new(&mut msg));
let r = self.connection.send_with_reply_and_block(msg, self.timeout)?;
Ok(R::read(&mut r.iter_init())?)
}
pub fn match_start(&self, mut mr: MatchRule<'static>, call_add_match: bool, f: <T as channel::MatchingReceiver>::F)
-> Result<Token, Error>
where T: channel::MatchingReceiver {
mr.path = Some(self.path.clone().into_static());
mr.sender = Some(self.destination.clone().into_static());
if call_add_match {
use crate::blocking::stdintf::org_freedesktop::DBus;
let proxy = stdintf::proxy(&*self.connection);
proxy.add_match(&mr.match_str())?;
}
Ok(self.connection.start_receive(mr, f))
}
pub fn match_stop(&self, id: Token, call_remove_match: bool) -> Result<(), Error>
where T: channel::MatchingReceiver {
if let Some((mr, _)) = self.connection.stop_receive(id) {
if call_remove_match {
use crate::blocking::stdintf::org_freedesktop::DBus;
let proxy = stdintf::proxy(&*self.connection);
proxy.remove_match(&mr.match_str())?;
}
}
Ok(())
}
pub fn match_signal<S: SignalArgs + ReadAll, F>(&self, f: F) -> Result<Token, Error>
where T: channel::MatchingReceiver,
F: MakeSignal<<T as channel::MatchingReceiver>::F, S, T>
{
let mr = S::match_rule(Some(&self.destination), Some(&self.path)).static_clone();
let ff = f.make(mr.match_str());
self.match_start(mr, true, ff)
}
}
pub trait MakeSignal<G, S, T> {
fn make(self, mstr: String) -> G;
}
#[test]
fn test_add_match() {
use self::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc;
let c = Connection::new_session().unwrap();
let x = c.add_match(Ppc::match_rule(None, None), |_: Ppc, _, _| { true }).unwrap();
c.remove_match(x).unwrap();
}
#[test]
fn test_conn_send_sync() {
fn is_send<T: Send>(_: &T) {}
fn is_sync<T: Sync>(_: &T) {}
let c = SyncConnection::new_session().unwrap();
is_send(&c);
is_sync(&c);
let c = Connection::new_session().unwrap();
is_send(&c);
}
#[test]
fn test_peer() {
let c = Connection::new_session().unwrap();
let c_name = c.unique_name().into_static();
use std::sync::Arc;
let done = Arc::new(false);
let d2 = done.clone();
let j = std::thread::spawn(move || {
let c2 = Connection::new_session().unwrap();
let proxy = c2.with_proxy(c_name, "/", Duration::from_secs(5));
let (s2,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
println!("{}", s2);
assert_eq!(Arc::strong_count(&d2), 2);
s2
});
assert_eq!(Arc::strong_count(&done), 2);
for _ in 0..30 {
c.process(Duration::from_millis(100)).unwrap();
if Arc::strong_count(&done) < 2 { break; }
}
let s2 = j.join().unwrap();
let proxy = c.with_proxy("org.a11y.Bus", "/org/a11y/bus", Duration::from_secs(5));
let (s1,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer", "GetMachineId", ()).unwrap();
assert_eq!(s1, s2);
}