async_rustbus/
lib.rs

//! Async-rustbus is an asynchronous DBus library built on top of [`rustbus`].
2//! It is a multi-threaded client allowing for asynchronous calls to services,
3//! and the creation of local services.
4//!
5//! # Missing Features
6//! * Eavesdropping using match rules is not currently supported.
7//! * Monitor mode is not currently supported. There are plans to implement it.
//! * There is no support for DBUS_COOKIE_SHA1 authentication. This makes DBus over TCP less
//!   useful, because only ANONYMOUS mode is supported when using TCP (EXTERNAL is not available for TCP).
10//! # API Stability
11//! As with most crates,
//! breaking changes will only be added after an increment of the most significant version number (SemVer).
13//! The most likely elements to change in the future is the incoming signal handling
14//! and the relation of this crate and [`rustbus`].
15//!
16//!
17//! # Examples
//! An example client that queries info about the current DBus session server connections:
19//! ```
20//! # use tokio::runtime::Builder;
21//! # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
22//! # runtime.block_on(async {
23//! use std::collections::HashMap;
24//! use futures::future::try_join_all;
25//! use async_rustbus::{RpcConn, MatchRule};
26//! use async_rustbus::rustbus_core::message_builder;
27//! use async_rustbus::rustbus_core::dbus_variant_var;
28//! use message_builder::{MessageBuilder, MarshalledMessage, MessageType};
29//!
30//! // Create the DBus connection to the session DBus.
31//! let conn = RpcConn::session_conn(false).await.unwrap();
32//!
33//! // Fetch the ID of the DBus
34//! let mut msg = MessageBuilder::new().call("GetId")
35//!     .with_interface("org.freedesktop.DBus")
36//!     .on("/org/freedesktop/DBus")
37//!     .at("org.freedesktop.DBus")
38//!     .build();
39//! let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
40//! assert!(matches!(res.typ, MessageType::Reply));
41//! let id: &str = res.body.parser().get().unwrap();
42//! println!("Info for Dbus {}:", id);
43//!
//! // Get the names of all connections.
45//! msg.dynheader.member = Some("ListNames".into());
46//! let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
47//! assert!(matches!(res.typ, MessageType::Reply));
48//! let mut names: Vec<&str> = res.body.parser().get().unwrap();
49//! // Ignore unique names
50//! names.retain(|s| !s.starts_with(":"));
51//!
//! // Get stats for each individual connection
53//! let mut dbg_msg = MessageBuilder::new().call("GetConnectionStats")
54//!     .with_interface("org.freedesktop.DBus.Debug.Stats")
55//!     .on("/org/freedesktop/DBus")
56//!     .at("org.freedesktop.DBus")
57//!     .build();
58//! let mut res_futs = Vec::with_capacity(names.len());
59//! for name in names.iter() {
60//!     dbg_msg.body.reset();
61//!     dbg_msg.body.push_param(name).unwrap();
62//!     let res_fut = conn.send_msg_w_rsp(&dbg_msg).await.unwrap();
63//!     res_futs.push(res_fut);
64//! }
65//! let stats = try_join_all(res_futs).await.unwrap();
66//!
67//! // Parse responses and print out some info
68//! dbus_variant_var!(StatVariant, U32 => u32; Str => &'buf str);
69//! for (name, stat_msg) in names.into_iter().zip(stats) {
70//!     if !matches!(stat_msg.typ, MessageType::Reply) {
71//!         continue;
72//!     }
73//!     let mut stat_map: HashMap<&str, StatVariant> = stat_msg.body.parser().get().unwrap();
74//!     let unique = match stat_map["UniqueName"] {
75//!         StatVariant::Str(s) => s,
76//!         _ => continue,
77//!     };
78//!     let peak_out = match stat_map["PeakOutgoingBytes"] {
79//!         StatVariant::U32(s) => s,
80//!         _ => continue,
81//!     };
82//!     let peak_in = match stat_map["PeakIncomingBytes"] {
83//!         StatVariant::U32(s) => s,
84//!         _ => continue,
85//!     };
86//!     println!("\t{} ({}):", name, unique);
87//!     println!("\t\t PeakIncomingBytes: {}, PeakOutgoingBytes: {}\n", peak_in, peak_out);
88//! }
89//! # });
90//! ```
91//! A simple example server that gives out the time in millis since Epoch and a reference time:
92//! ```no_run
93//! # use tokio::runtime::Builder;
94//! # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
95//! # runtime.block_on(async {
96//! use async_rustbus::{RpcConn, MatchRule, CallAction};
97//! use async_rustbus::rustbus_core;
98//! use rustbus_core::message_builder::{MessageBuilder, MessageType};
99//! use std::time::{Instant, SystemTime, UNIX_EPOCH};
100//! let conn = RpcConn::session_conn(false).await.unwrap();
101//! conn.insert_call_path("/example/TimeServer", CallAction::Exact).await.unwrap();
102//! conn.insert_call_path("/", CallAction::Intro).await.unwrap();
103//! conn.request_name("example.TimeServer").await.unwrap();
104//! let start = Instant::now();
105//! loop {
106//!     let call = match conn.get_call("/example/TimeServer").await {
107//!         Ok(c) => c,
108//!         Err(e) => {
109//!             eprintln!("Error occurred waiting for calls: {:?}", e);
110//!               break;
111//!         }
112//!        };
113//!     assert!(matches!(call.typ, MessageType::Call));
114//!     let res = match (call.dynheader.interface.as_deref().unwrap(), call.dynheader.member.as_deref().unwrap()) {
115//!         ("example.TimeServer", "GetUnixTime") => {
116//!             let mut res = call.dynheader.make_response();
117//!             let cur_time = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
118//!             res.body.push_param(cur_time).unwrap();
119//!             res
120//!         }
121//!         ("example.TimeServer", "GetRefTime") => {
122//!             let mut res = call.dynheader.make_response();
123//!             let elapsed = start.elapsed().as_millis() as u64;
124//!             res.body.push_param(elapsed).unwrap();
125//!             res
126//!         }
127//!         ("org.freedesktop.DBus.Introspectable", "Introspect") => {
128//!             todo!("We need to put a introspect impl so that other connection can discover this object.");
129//!         }
130//!         _ => {
131//!             call.dynheader.make_error_response("UnknownInterface", None)
132//!         }
133//!     };
134//!     conn.send_msg_wo_rsp(&res).await.unwrap();
135//! }
136//! # });
137//! ```
138//!
139//! [`rustbus`]: https://crates.io/crates/rustbus
140use std::collections::HashMap;
141use std::convert::TryInto;
142use std::io::ErrorKind;
143use std::num::NonZeroU32;
144use std::os::unix::io::{AsRawFd, RawFd};
145use std::pin::Pin;
146use std::sync::atomic::{AtomicU32, Ordering};
147use std::sync::Arc;
148
149use async_channel::{unbounded, Receiver as CReceiver, Sender as CSender};
150use futures::future::{Either, TryFutureExt};
151
152#[doc(hidden)]
153pub use utils::poll_once;
154
155use std::path::Path;
156use tokio::io::unix::AsyncFd;
157use tokio::net::ToSocketAddrs;
158use tokio::sync::oneshot::{
159    channel as oneshot_channel, Receiver as OneReceiver, Sender as OneSender,
160};
161// use async_std::sync::{Condvar, Mutex};
162use futures::pin_mut;
163use futures::prelude::*;
164use futures::task::{Context, Poll};
165use tokio::sync::watch::{channel as watch_channel, Sender as WatchSender};
166use tokio::sync::Mutex;
167
168pub mod rustbus_core;
169
170use rustbus_core::message_builder::{MarshalledMessage, MessageType};
171use rustbus_core::path::ObjectPath;
172use rustbus_core::standard_messages::{hello, release_name, request_name};
173use rustbus_core::standard_messages::{
174    DBUS_NAME_FLAG_DO_NOT_QUEUE, DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER,
175    DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER,
176};
177
178pub mod conn;
179
180use conn::{Conn, GenStream, RecvState, SendState};
181
182mod utils;
183
184mod routing;
185use routing::{queue_sig, CallHierarchy};
186pub use routing::{CallAction, MatchRule, EMPTY_MATCH};
187
188pub use conn::{get_session_bus_addr, get_system_bus_addr, DBusAddr};
189
// Bit value `0x01` named after the NO_REPLY_EXPECTED message flag.
// NOTE(review): presumably tested against a message's header flags by `expects_reply` — confirm at the use site.
const NO_REPLY_EXPECTED: u8 = 0x01;
191
192struct MsgQueue {
193    sender: CSender<MarshalledMessage>,
194    recv: CReceiver<MarshalledMessage>,
195}
196impl MsgQueue {
197    fn new() -> Self {
198        let (sender, recv) = unbounded::<MarshalledMessage>();
199        Self { sender, recv }
200    }
201    fn get_receiver(&self) -> CReceiver<MarshalledMessage> {
202        self.recv.clone()
203    }
204    fn send(&self, msg: MarshalledMessage) {
205        self.sender.try_send(msg).unwrap()
206    }
207}
/// Receive-side state of an `RpcConn`; it is kept behind the connection's `recv_data` mutex.
struct RecvData {
    /// Receive state taken from the underlying `Conn`; incoming messages are read through it.
    state: RecvState,
    /// One-shot senders keyed by the serial of the outgoing call whose reply they wait for.
    reply_map: HashMap<NonZeroU32, OneSender<MarshalledMessage>>,
    /// Structure that incoming method calls are handed to.
    hierarchy: CallHierarchy,
    /// Registered signal match rules, kept in sorted order so they can be binary-searched.
    sig_matches: Vec<MatchRule>,
}
/// RpcConn is used to create and interact with a DBus connection.
/// It can be used to easily connect to either session (user) or system DBus daemons.
/// `RpcConn` is thread-safe and can be used from within an [`Arc`] if desired.
///
/// [`Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html
pub struct RpcConn {
    /// The underlying stream, wrapped so readiness can be awaited.
    conn: AsyncFd<GenStream>,
    //recv_cond: Condvar,
    /// Watch channel that is updated each time an incoming message has been processed.
    recv_watch: WatchSender<()>,
    /// Receive-side state (reply map, call hierarchy and signal matches).
    recv_data: Arc<Mutex<RecvData>>,
    /// Send-side state guarded by its own lock.
    /// NOTE(review): the purpose of the second tuple element is not visible in this part of the file.
    send_data: Mutex<(SendState, Option<NonZeroU32>)>,
    /// Counter that message serials are allocated from; it starts at 1.
    serial: AtomicU32,
    /// Name parsed from the reply to the initial `Hello` call.
    auto_name: String,
}
228impl RpcConn {
229    async fn new(conn: Conn) -> std::io::Result<Self> {
230        unsafe {
231            let recvd = libc::fcntl(conn.as_raw_fd(), libc::F_GETFL);
232            if recvd == -1 {
233                return Err(std::io::Error::last_os_error());
234            }
235            if libc::O_NONBLOCK & recvd == 0
236                && libc::fcntl(conn.as_raw_fd(), libc::F_SETFL, recvd | libc::O_NONBLOCK) == -1
237            {
238                return Err(std::io::Error::last_os_error());
239            }
240        }
241        let recv_data = RecvData {
242            state: conn.recv_state,
243            reply_map: HashMap::new(),
244            hierarchy: CallHierarchy::new(),
245            sig_matches: Vec::new(),
246        };
247        let (recv_watch, _) = watch_channel(());
248        let mut ret = Self {
249            conn: AsyncFd::new(conn.stream)?,
250            send_data: Mutex::new((conn.send_state, None)),
251            recv_data: Arc::new(Mutex::new(recv_data)),
252            recv_watch,
253            serial: AtomicU32::new(1),
254            auto_name: String::new(),
255        };
256        let hello_res = ret.send_msg(&hello()).await?.unwrap().await?;
257        match hello_res.typ {
258            MessageType::Reply => {
259                ret.auto_name = hello_res.body.parser().get().map_err(|_| {
260                    std::io::Error::new(ErrorKind::ConnectionRefused, "Unable to parser name")
261                })?;
262                Ok(ret)
263            }
264            MessageType::Error => {
265                let (err, details): (&str, &str) = hello_res
266                    .body
267                    .parser()
268                    .get()
269                    .unwrap_or(("Unable to parse message", ""));
270                Err(std::io::Error::new(
271                    ErrorKind::ConnectionRefused,
272                    format!("Hello message failed with: {}: {}", err, details),
273                ))
274            }
275            _ => Err(std::io::Error::new(
276                ErrorKind::ConnectionAborted,
277                "Unexpected reply to hello message!",
278            )),
279        }
280    }
281    /// Returns the name assigned by the DBus daemon.
282    /// This name was retreived using the `org.freedesktop.DBus.Hello` call when the connection was started.
283    pub fn get_name(&self) -> &str {
284        &self.auto_name
285    }
286    /// Connect to the system bus.
287    ///
288    /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
289    /// # Notes
290    /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
291    /// # Examples
292    /// ```
293    /// # use tokio::runtime::Builder;
294    /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
295    /// # runtime.block_on(async {
296    /// use async_rustbus::RpcConn;
297    /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
298    /// let conn = RpcConn::system_conn(false).await.unwrap();
299    /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
300    ///     .at("org.freedesktop.DBus")
301    ///        .on("/org/freedesktop/DBus")
302    ///     .with_interface("org.freedesktop.DBus")
303    ///     .build();
304    /// msg.body.push_param(conn.get_name()).unwrap();
305    /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
306    /// let pid: u32 = res.body.parser().get().unwrap();
307    /// assert_eq!(pid, std::process::id());
308    /// # });
309    /// ```
310    pub async fn session_conn(with_fd: bool) -> std::io::Result<Self> {
311        let addr = get_session_bus_addr().await?;
312        Self::connect_to_addr(&addr, with_fd).await
313    }
314    /// Connect to the session (user) bus.
315    ///
316    /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
317    /// # Notes
318    /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
319    /// # Examples
320    /// ```
321    /// # use tokio::runtime::Builder;
322    /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
323    /// # runtime.block_on(async {
324    /// use async_rustbus::RpcConn;
325    /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
326    /// let conn = RpcConn::system_conn(false).await.unwrap();
327    /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
328    ///     .at("org.freedesktop.DBus")
329    ///        .on("/org/freedesktop/DBus")
330    ///     .with_interface("org.freedesktop.DBus")
331    ///     .build();
332    /// msg.body.push_param(conn.get_name()).unwrap();
333    /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
334    /// let pid: u32 = res.body.parser().get().unwrap();
335    /// assert_eq!(pid, std::process::id());
336    /// # });
337    /// ```
338    pub async fn system_conn(with_fd: bool) -> std::io::Result<Self> {
339        let addr = get_system_bus_addr().await?;
340        //let path = get_system_bus_path().await?;
341        Self::connect_to_addr(&addr, with_fd).await
342    }
343    /// Connect to the given address.
344    ///
345    /// This can be used to connect to a non-standard DBus daemon.
346    /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
347    /// In most instances users should use [`session_conn`] or [`system_conn`] to connecto to their local DBus instance.
348    /// # Notes
349    /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
350    /// # Examples
351    /// ```
352    /// # use tokio::runtime::Builder;
353    /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
354    /// # runtime.block_on(async {
355    /// use async_rustbus::{RpcConn, DBusAddr};
356    /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
357    /// let system_addr = DBusAddr::unix_path("/run/dbus/system_bus_socket");
358    /// let conn = RpcConn::connect_to_addr(&system_addr, false).await.unwrap();
359    /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
360    ///     .at("org.freedesktop.DBus")
361    ///        .on("/org/freedesktop/DBus")
362    ///     .with_interface("org.freedesktop.DBus")
363    ///     .build();
364    /// msg.body.push_param(conn.get_name()).unwrap();
365    /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
366    /// let pid: u32 = res.body.parser().get().unwrap();
367    /// assert_eq!(pid, std::process::id());
368    /// # });
369    /// ```
370    ///
371    /// [`session_conn`]: ./struct.RpcConn.html#method.session_conn
372    /// [`system_conn`]: ./struct.RpcConn.html#method.system_conn
373    pub async fn connect_to_addr<P: AsRef<Path>, S: ToSocketAddrs, B: AsRef<[u8]>>(
374        addr: &DBusAddr<P, S, B>,
375        with_fd: bool,
376    ) -> std::io::Result<Self> {
377        let conn = Conn::connect_to_addr(addr, with_fd).await?;
378        Self::new(conn).await
379    }
380    /// Connect to the given Unix sockect path.
381    ///
382    /// This can be used to connect to a non-standard DBus daemon.
383    /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
384    /// In most instances users should use [`session_conn`] or [`system_conn`] to connecto to their local DBus instance.
385    /// # Notes
386    /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
387    /// # Examples
388    /// ```
389    /// # use tokio::runtime::Builder;
390    /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
391    /// # runtime.block_on(async {
392    /// use async_rustbus::RpcConn;
393    /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
394    /// let conn = RpcConn::connect_to_path("/run/dbus/system_bus_socket", false).await.unwrap();
395    /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
396    ///     .at("org.freedesktop.DBus")
397    ///        .on("/org/freedesktop/DBus")
398    ///     .with_interface("org.freedesktop.DBus")
399    ///     .build();
400    /// msg.body.push_param(conn.get_name()).unwrap();
401    /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
402    /// let pid: u32 = res.body.parser().get().unwrap();
403    /// assert_eq!(pid, std::process::id());
404    /// # });
405    /// ```
406    ///
407    /// [`session_conn`]: ./struct.RpcConn.html#method.session_conn
408    /// [`system_conn`]: ./struct.RpcConn.html#method.system_conn
409    pub async fn connect_to_path<P: AsRef<Path>>(path: P, with_fd: bool) -> std::io::Result<Self> {
410        let conn = Conn::connect_to_path(path, with_fd).await?;
411        Self::new(conn).await
412    }
413    /*
414    /// Set a filter that determines whether a signal should be dropped for received.
415    ///
416    /// If the filter returns `true` then the message is allowed to be received, otherwise it is dropped.
417    /// The default signal filter when `RpcConn` is constructed is to drop all incoming signals.
418    /// This default was chosen primarly to prevent leaking resources for unexpected signals sent specifically to this connection (by setting the destination header field).
419    pub async fn set_sig_filter(
420        &self,
421        filter: Box<dyn Send + Sync + FnMut(&MarshalledMessage) -> bool>,
422    ) {
423        let mut recv_data = self.recv_data.lock().await;
424        recv_data.sig_filter = filter;
425    }
426    */
427    fn allocate_idx(&self) -> NonZeroU32 {
428        let mut idx = 0;
429        while idx == 0 {
430            idx = self.serial.fetch_add(1, Ordering::Relaxed);
431        }
432        NonZeroU32::new(idx).unwrap()
433    }
434    /// Make a DBus call to a remote service or send a signal.
435    ///
436    /// This function returns a future nested inside a future.
437    /// Awaiting the outer future sends the message out the DBus stream to the remote service.
438    /// The inner future, returned by the outer, waits for the response from the remote service.
439    /// # Notes
440    /// * If the message sent was a signal or has the NO_REPLY_EXPECTED flag set then the inner future will
441    ///   return immediatly when awaited.
442    /// * If two futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
443    /// * If other incoming message are received before a response is received, then they will be processed by this future while awaiting.
444    /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
445    ///
446    pub async fn send_msg(
447        &self,
448        msg: &MarshalledMessage,
449    ) -> std::io::Result<Option<impl Future<Output = std::io::Result<MarshalledMessage>> + '_>>
450    {
451        Ok(if expects_reply(msg) {
452            Some(self.send_msg_w_rsp(msg).await?)
453        } else {
454            self.send_msg_wo_rsp(msg).await?;
455            None
456        })
457    }
    /// Drives the transmission of `msg`, tagged with serial `idx`, until the send state reports it is done.
    ///
    /// Every iteration waits for the socket to be writable and then takes the send lock.
    /// Until a write attempt has succeeded, the message is handed to `write_next_message`;
    /// if that yields `None` the function returns, otherwise the yielded index is remembered.
    /// Later iterations call `finish_sending_next` until the send state's index has moved
    /// past the remembered one.
    /// NOTE(review): presumably `None` means the message was written completely and `Some(i)`
    /// means it was queued at position `i` — confirm against `SendState`.
    ///
    /// # Errors
    /// `WouldBlock` errors are handled internally by clearing the writable readiness and retrying;
    /// every other I/O error is returned to the caller.
    async fn send_msg_loop(&self, msg: &MarshalledMessage, idx: NonZeroU32) -> std::io::Result<()> {
        // `None` until `write_next_message` has succeeded once.
        let mut send_idx = None;
        loop {
            // TODO: use writeable instead of send_data lock
            let mut write_guard = self.conn.writable().await?;
            let mut send_lock = self.send_data.lock().await;
            let stream = self.conn.get_ref();
            match send_idx {
                Some(send_idx) => {
                    // The send state has already advanced beyond our index (possibly driven by another sender).
                    if send_lock.0.current_idx() > send_idx {
                        return Ok(());
                    }
                    let new_idx = match send_lock.0.finish_sending_next(stream) {
                        Ok(i) => i,
                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
                            // Socket is not actually writable: reset readiness and wait again.
                            write_guard.clear_ready();
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    if new_idx > send_idx {
                        return Ok(());
                    }
                }
                None => {
                    send_idx = match send_lock.0.write_next_message(stream, msg, idx) {
                        Ok(si) => si,
                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
                            // `send_idx` stays `None`, so this branch runs again once writable.
                            write_guard.clear_ready();
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    if send_idx.is_none() {
                        return Ok(());
                    }
                }
            }
            // Release the send lock before waiting for writability again.
            drop(send_lock);
        }
    }
499    /// Sends a signal or make a call to a remote service with the NO_REPLY_EXPECTED flag set.
500    ///
501    /// # Notes
502    /// * If multiple send futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
503    /// # Panics
504    /// * Panics if the message given expects a reply.
505    pub async fn send_msg_wo_rsp(&self, msg: &MarshalledMessage) -> std::io::Result<()> {
506        assert!(!expects_reply(msg));
507        let idx = self.allocate_idx();
508        self.send_msg_loop(msg, idx).await
509    }
510    /// Make a call to a remote service.
511    ///
512    /// `msg` must be a message the expects a call otherwise this method will panic.
513    ///
514    /// # Notes
515    /// * If multiple send futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
516    /// * If other incoming message are received before a response is received, then they will be processed by this future while awaiting.
517    /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
518    /// # Panics
519    /// * Panics if the message does not expect a reply, such as signals or calls with the NO_REPLY_EXPECTED set.
520    pub async fn send_msg_w_rsp(
521        &self,
522        msg: &MarshalledMessage,
523    ) -> std::io::Result<impl Future<Output = std::io::Result<MarshalledMessage>> + '_> {
524        assert!(expects_reply(msg));
525        let idx = self.allocate_idx();
526        let recv = self.get_recv_and_insert_sender(idx).await;
527        let msg_fut = recv.map_err(|_| panic!("Message reply channel should never be closed"));
528        self.send_msg_loop(msg, idx).await?;
529        let res_pred = move |msg: &MarshalledMessage, _: &mut RecvData| match &msg.typ {
530            MessageType::Reply | MessageType::Error => {
531                let res_idx = match msg.dynheader.response_serial {
532                    Some(res_idx) => res_idx,
533                    None => {
534                        unreachable!("Should never reply/err without res serial.")
535                    }
536                };
537                res_idx == idx
538            }
539            _ => false,
540        };
541        Ok(ResponseFuture {
542            idx,
543            rpc_conn: self,
544            fut: self.get_msg(msg_fut, res_pred).boxed(),
545        })
546    }
547    async fn get_recv_and_insert_sender(&self, idx: NonZeroU32) -> OneReceiver<MarshalledMessage> {
548        let (sender, recv) = oneshot_channel();
549        let mut recv_lock = self.recv_data.lock().await;
550        recv_lock.reply_map.insert(idx, sender);
551        recv
552    }
553    /// Add a match to retreive signals.
554    ///
555    /// A `org.freedesktop.DBus.AddMatch` call is made to tell the DBus daemon to route matching signals to this connection.
556    /// These signals are stored by the `RpcConn` and can be retreived by using the [`get_signal`] method.
557    /// If a message is received that matches multiple `sig_match`es, then the message is associated with the most specific specific [`MatchRule`].
558    /// See [`MatchRule`] for more details.
559    ///
560    /// # Panics
561    /// * Panics if both the path and path_namespace matching parameters are used in the `MatchRule`.
562    ///
563    /// # Examples
564    /// ```
565    /// # use tokio::runtime::Builder;
566    /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
567    /// # runtime.block_on(async {
568    /// use async_rustbus::{RpcConn, MatchRule};
569    /// let conn = RpcConn::session_conn(false).await.unwrap();
570    /// let rule = MatchRule::new()
571    ///     .sender("org.freedesktop.DBus")
572    ///     .interface("org.freedesktop.DBus")
573    ///     .member("NameOwnerChanged").clone();
574    /// conn.insert_sig_match(&rule).await.unwrap();
575    /// let owned = conn.request_name("example.name").await.unwrap();
576    /// if owned {
577    ///     let msg = conn.get_signal(&rule).await.unwrap();
578    ///     let (_, _, new_owner): (&str, &str, &str)  = msg.body.parser().get3().unwrap();
579    ///     assert_eq!(new_owner, conn.get_name());
580    /// }
581    /// conn.remove_sig_match(&rule).await.unwrap();
582    /// # });
583    /// ```
584    ///
585    /// [`get_signal`]: ./struct.RpcConn.html#method.get_signal
586    /// [`set_sig_filter`]: ./struct.RpcConn.html#method.set_sig_filter
587    /// [`MatchRule`]: ./struct.MatchRule.html
588    pub async fn insert_sig_match(&self, sig_match: &MatchRule) -> std::io::Result<()> {
589        assert!(!(sig_match.path.is_some() && sig_match.path_namespace.is_some()));
590        let mut recv_data = self.recv_data.lock().await;
591        let insert_idx = match recv_data.sig_matches.binary_search(sig_match) {
592            Ok(_) => {
593                return Err(std::io::Error::new(
594                    ErrorKind::InvalidInput,
595                    "Already exists",
596                ))
597            }
598            Err(i) => i,
599        };
600        let mut to_insert = sig_match.clone();
601        to_insert.queue = Some(MsgQueue::new());
602        recv_data.sig_matches.insert(insert_idx, to_insert);
603        drop(recv_data);
604        let match_str = sig_match.match_string();
605        let call = rustbus_core::standard_messages::add_match(&match_str);
606        let res = self.send_msg_w_rsp(&call).await?.await?;
607        match res.typ {
608            MessageType::Reply => Ok(()),
609            MessageType::Error => {
610                let mut recv_data = self.recv_data.lock().await;
611                if let Ok(idx) = recv_data.sig_matches.binary_search(sig_match) {
612                    recv_data.sig_matches.remove(idx);
613                }
614                let err_str: &str = res
615                    .body
616                    .parser()
617                    .get()
618                    .unwrap_or("Unknown DBus Error Type!");
619                Err(std::io::Error::new(ErrorKind::Other, err_str))
620            }
621            _ => unreachable!(),
622        }
623    }
624    /// Stop the reception of messages matching `sig_match`
625    ///
626    /// This method calls `org.freedesktop.DBus.RemoveMatch` to stop the reception of matching signals.
627    /// Any messages already received that haven't been retreived are lost.
628    /// This method will return an `InavalidInput` if the `MatchRule` is not already present in the `RpcConn`.
629    ///
630    /// # Examples
631    /// See [`insert_sig_path`] for an example.
632    ///
633    /// [`insert_sig_path`]: ./struct.RpcConn.html#method.insert_sig_path
634    pub async fn remove_sig_match(&self, sig_match: &MatchRule) -> std::io::Result<()> {
635        let mut recv_data = self.recv_data.lock().await;
636        let idx = match recv_data.sig_matches.binary_search(sig_match) {
637            Err(_) => {
638                return Err(std::io::Error::new(
639                    ErrorKind::InvalidInput,
640                    "MatchRule doesn't exist!",
641                ))
642            }
643            Ok(i) => i,
644        };
645        recv_data.sig_matches.remove(idx);
646        drop(recv_data);
647        let match_str = sig_match.match_string();
648        let call = rustbus_core::standard_messages::remove_match(&match_str);
649        let res = self.send_msg_w_rsp(&call).await?.await?;
650        match res.typ {
651            MessageType::Reply => Ok(()),
652            MessageType::Error => {
653                let err_str: &str = res
654                    .body
655                    .parser()
656                    .get()
657                    .unwrap_or("Unknown DBus Error Type!");
658                Err(std::io::Error::new(ErrorKind::Other, err_str))
659            }
660            _ => unreachable!(),
661        }
662    }
663    fn queue_msg<F>(
664        &self,
665        recv_data: &mut RecvData,
666        pred: F,
667    ) -> std::io::Result<(MarshalledMessage, bool)>
668    where
669        F: Fn(&MarshalledMessage, &mut RecvData) -> bool,
670    {
671        let stream = self.conn.get_ref();
672        loop {
673            let msg = recv_data.state.get_next_message(stream)?;
674            if pred(&msg, recv_data) {
675                return Ok((msg, false));
676            } else {
677                match &msg.typ {
678                    MessageType::Signal => queue_sig(&recv_data.sig_matches, msg),
679                    MessageType::Reply | MessageType::Error => {
680                        let idx = msg
681                            .dynheader
682                            .response_serial
683                            .expect("Reply should always have a response serial!");
684                        if let Some(sender) = recv_data.reply_map.remove(&idx) {
685                            sender.send(msg).ok();
686                        }
687                    }
688                    MessageType::Call => {
689                        if let Err(msg) = recv_data.hierarchy.send(msg) {
690                            return Ok((msg, true));
691                        }
692                    }
693                    MessageType::Invalid => unreachable!(),
694                }
695            }
696        }
697    }
698
699    async fn get_msg<F, P>(&self, msg_fut: F, pred: P) -> std::io::Result<MarshalledMessage>
700    where
701        F: Future<Output = std::io::Result<MarshalledMessage>>,
702        P: Fn(&MarshalledMessage, &mut RecvData) -> bool,
703    {
704        pin_mut!(msg_fut);
705        let mut msg_fut = match poll_once(msg_fut) {
706            Either::Left(res) => return res,
707            Either::Right(f) => f,
708        };
709
710        loop {
711            let mut read_guard = self.conn.readable().await?;
712            let recv_guard_fut = self.recv_data.lock();
713            tokio::select! {
714                biased;
715                res = &mut msg_fut => { return res }
716                mut recv_guard = recv_guard_fut => match self.queue_msg(&mut recv_guard, &pred) {
717                    Err(e) if e.kind() == ErrorKind::WouldBlock => {
718                        read_guard.clear_ready();
719                        continue;
720                    }
721                    Err(e) => { return Err(e) }
722                    Ok((msg, should_reply)) => {
723                        self.recv_watch.send_replace(());
724                        if !should_reply {
725                            return Ok(msg);
726                        }
727
728                        drop(recv_guard);
729                        self.send_msg_wo_rsp(&msg).await?;
730                    }
731                }
732            }
733        }
734    }
735
736    /// Gets the next signal not filtered by the message filter.
737    ///
738    /// Use the same `sig_match` used with [`insert_sig_match`] to wait for its associated signals.
739    /// Signals with this connection as a destination are always sent to this connection regardless
740    /// of if there is an matching [`MatchRule`]. If these signals are not filtered out and do not match
741    /// a given filter they can be retreived uisng the default `MatchRule`.
742    /// # Notes
743    /// * While awaiting for a matching signal, this future will process other incoming messages.
744    /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
745    /// # Examples
746    /// See [`insert_sig_match`] for an example.
747    ///
748    /// [`insert_sig_match`]: ./struct.RpcConn.html#method.insert_sig_match
749    pub async fn get_signal(&self, sig_match: &MatchRule) -> std::io::Result<MarshalledMessage> {
750        let recv_data = self.recv_data.lock().await;
751        let idx = recv_data
752            .sig_matches
753            .binary_search(sig_match)
754            .map_err(|_| {
755                std::io::Error::new(ErrorKind::InvalidInput, "Unknown match rule given!")
756            })?;
757        let recv = recv_data.sig_matches[idx]
758            .queue
759            .as_ref()
760            .unwrap()
761            .get_receiver();
762        drop(recv_data);
763
764        let msg_fut = recv.recv().map_err(|_| {
765            std::io::Error::new(
766                ErrorKind::Interrupted,
767                "Signal match was deleted while waiting!",
768            )
769        });
770        let sig_pred = |msg: &MarshalledMessage, _: &mut RecvData| sig_match.matches(msg);
771        self.get_msg(msg_fut, sig_pred).await
772    }
773
774    /// Gets the next call associated with the given path.
775    ///
776    /// Use `insert_call_path` to setup the `RpcConn` for receiving calls.
777    /// An `InvalidInput` error is returned if the path does not have an associated queue.
778    /// # Notes
779    /// * While awaiting for a matching call, this future will process other incoming messages.
780    /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
781    ///
782    /// [`insert_call_path`]: ./struct.RpcConn.html#method.insert_call_path
783    pub async fn get_call<'a, S, D>(&self, path: S) -> std::io::Result<MarshalledMessage>
784    where
785        S: TryInto<&'a ObjectPath, Error = D>,
786        D: std::fmt::Debug,
787    {
788        let path = path.try_into().map_err(|e| {
789            std::io::Error::new(ErrorKind::InvalidInput, format!("Invalid path: {:?}", e))
790        })?;
791        let recv_guard = self.recv_data.lock().await;
792        let msg_queue = recv_guard.hierarchy.get_queue(path).ok_or_else(|| {
793            std::io::Error::new(ErrorKind::InvalidInput, "Unknown message path given!")
794        })?;
795        let recv = msg_queue.get_receiver();
796        drop(recv_guard);
797        let call_fut = recv.recv().map_err(|_| {
798            std::io::Error::new(
799                ErrorKind::Interrupted,
800                "Call Queue was deleted while waiting!",
801            )
802        });
803        let call_pred = |msg: &MarshalledMessage, recv_data: &mut RecvData| match &msg.typ {
804            MessageType::Call => {
805                let msg_path =
806                    ObjectPath::from_str(msg.dynheader.object.as_ref().unwrap()).unwrap();
807                recv_data.hierarchy.is_match(path, msg_path)
808            }
809            _ => false,
810        };
811        self.get_msg(call_fut, call_pred).await
812    }
813
814    /// Configure what action the `RpcConn` should take when receiving calls for a path or namespace.
815    ///
816    /// See [`CallAction`] for details on what each action does.
817    /// The default action before any `insert_call_path` calls is to drop all incoming messsage calls and reply with an error.
818    ///
819    /// [`CallAction`]: ./enum.CallAction.html
820    pub async fn insert_call_path<'a, S, D>(&self, path: S, action: CallAction) -> Result<(), D>
821    where
822        S: TryInto<&'a ObjectPath, Error = D>,
823    {
824        let path = path.try_into()?;
825        let mut recv_data = self.recv_data.lock().await;
826        recv_data.hierarchy.insert_path(path, action);
827        Ok(())
828    }
829    /// Get the action for a path.
830    /// # Returns
831    /// Returns the action if there is one for that path.
832    /// If there is no action for the given path or the path is invalid `None` is returned.
833    pub async fn get_call_path_action<'a, S: TryInto<&'a ObjectPath>>(
834        &self,
835        path: S,
836    ) -> Option<CallAction> {
837        let path = path.try_into().ok()?;
838        let recv_data = self.recv_data.lock().await;
839        recv_data.hierarchy.get_action(path)
840    }
841    /// Get a receiver for the incoming call queue if there is one.
842    ///
843    /// This receiver will produce calls from this path/namespace.
844    /// Receiving messages from the `Receiver` doesn't actually cause the `RpcConn` to do any work.
845    /// It will only produce messages if another future from `get_signal` or `get_call` is being worked on.
846    /// This method can be useful if you know that there are other tasks working using this `RpcConn` that will do the work of processing incoming messages.
847    pub async fn get_call_recv<'a, S: TryInto<&'a ObjectPath>>(
848        &self,
849        path: S,
850    ) -> Option<CReceiver<MarshalledMessage>> {
851        let path = path.try_into().ok()?;
852        let recv_data = self.recv_data.lock().await;
853        Some(recv_data.hierarchy.get_queue(path)?.get_receiver())
854    }
855    /// Request a name from the DBus daemon.
856    ///
857    /// Returns `true` if the name was successfully obtained or if it was already owned by this connection.
858    /// otherwise `false` is returned (assuming an IO error did not occur).
859    /// The `DO_NOT_QUEUE` flag is used with the request so if the name is not available, this connection will not be queued to own it in the future.
860    pub async fn request_name(&self, name: &str) -> std::io::Result<bool> {
861        let req = request_name(name, DBUS_NAME_FLAG_DO_NOT_QUEUE);
862        let res = self.send_msg_w_rsp(&req).await?.await?;
863        if MessageType::Error == res.typ {
864            return Ok(false);
865        }
866        Ok(match res.body.parser().get::<u32>() {
867            Ok(ret) => matches!(
868                ret,
869                DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER | DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER
870            ),
871            Err(_) => false,
872        })
873    }
874    /// Release a name from the DBus daemon.
875    ///
876    /// An `Err` is only returned on a IO error.
877    /// If the name was not owned by the connection, or the name was invalid `Ok` is still returned.
878    pub async fn release_name(&self, name: &str) -> std::io::Result<()> {
879        let rel_name = release_name(name);
880        self.send_msg_w_rsp(&rel_name).await?.await?;
881        Ok(())
882    }
883}
884impl AsRawFd for RpcConn {
885    fn as_raw_fd(&self) -> RawFd {
886        self.conn.as_raw_fd()
887    }
888}
889
890struct ResponseFuture<'a, T>
891where
892    T: Future<Output = std::io::Result<MarshalledMessage>>,
893{
894    rpc_conn: &'a RpcConn,
895    idx: NonZeroU32,
896    fut: T,
897}
898
899impl<T> Future for ResponseFuture<'_, T>
900where
901    T: Future<Output = std::io::Result<MarshalledMessage>>,
902{
903    type Output = T::Output;
904    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
905        unsafe { self.map_unchecked_mut(|s| &mut s.fut).poll(cx) }
906    }
907}
908
909impl<T> Drop for ResponseFuture<'_, T>
910where
911    T: Future<Output = std::io::Result<MarshalledMessage>>,
912{
913    fn drop(&mut self) {
914        if let Ok(mut recv_lock) = self.rpc_conn.recv_data.try_lock() {
915            recv_lock.reply_map.remove(&self.idx);
916            return;
917        }
918        let reply_arc = Arc::clone(&self.rpc_conn.recv_data);
919
920        //TODO: Is there a better solution to this?
921        let idx = self.idx;
922        tokio::spawn(async move {
923            let mut recv_lock = reply_arc.lock().await;
924            recv_lock.reply_map.remove(&idx);
925        });
926    }
927}
928
929fn expects_reply(msg: &MarshalledMessage) -> bool {
930    msg.typ == MessageType::Call && (msg.flags & NO_REPLY_EXPECTED) == 0
931}