async_rustbus/lib.rs
1//! Async-rustbus is an async-rustbus library built on top of [`rustbus`].
2//! It is a multi-threaded client allowing for asynchronous calls to services,
3//! and the creation of local services.
4//!
5//! # Missing Features
6//! * Eavesdropping using match rules is not currently supported.
7//! * Monitor mode is not currently supported. There are plans to implement it.
8//! * There is no support for DBUS_COOKIE_SHA1 authentication. This makes DBus over TCP not
9//! as useful with only ANOYNMOUS mode supported when using TCP (EXTERNAL is not available for TCP).
10//! # API Stability
11//! As with most crates,
12//! breaking changes will only be added after an increment of the most sigificant version number (SemVer).
13//! The most likely elements to change in the future is the incoming signal handling
14//! and the relation of this crate and [`rustbus`].
15//!
16//!
17//! # Examples
18//! An example client that queues info about the current DBus session server connections:
19//! ```
20//! # use tokio::runtime::Builder;
21//! # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
22//! # runtime.block_on(async {
23//! use std::collections::HashMap;
24//! use futures::future::try_join_all;
25//! use async_rustbus::{RpcConn, MatchRule};
26//! use async_rustbus::rustbus_core::message_builder;
27//! use async_rustbus::rustbus_core::dbus_variant_var;
28//! use message_builder::{MessageBuilder, MarshalledMessage, MessageType};
29//!
30//! // Create the DBus connection to the session DBus.
31//! let conn = RpcConn::session_conn(false).await.unwrap();
32//!
33//! // Fetch the ID of the DBus
34//! let mut msg = MessageBuilder::new().call("GetId")
35//! .with_interface("org.freedesktop.DBus")
36//! .on("/org/freedesktop/DBus")
37//! .at("org.freedesktop.DBus")
38//! .build();
39//! let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
40//! assert!(matches!(res.typ, MessageType::Reply));
41//! let id: &str = res.body.parser().get().unwrap();
42//! println!("Info for Dbus {}:", id);
43//!
44//! // Get call of the names of all connections.
45//! msg.dynheader.member = Some("ListNames".into());
46//! let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
47//! assert!(matches!(res.typ, MessageType::Reply));
48//! let mut names: Vec<&str> = res.body.parser().get().unwrap();
49//! // Ignore unique names
50//! names.retain(|s| !s.starts_with(":"));
51//!
52//! // Get stats for each individual message
53//! let mut dbg_msg = MessageBuilder::new().call("GetConnectionStats")
54//! .with_interface("org.freedesktop.DBus.Debug.Stats")
55//! .on("/org/freedesktop/DBus")
56//! .at("org.freedesktop.DBus")
57//! .build();
58//! let mut res_futs = Vec::with_capacity(names.len());
59//! for name in names.iter() {
60//! dbg_msg.body.reset();
61//! dbg_msg.body.push_param(name).unwrap();
62//! let res_fut = conn.send_msg_w_rsp(&dbg_msg).await.unwrap();
63//! res_futs.push(res_fut);
64//! }
65//! let stats = try_join_all(res_futs).await.unwrap();
66//!
67//! // Parse responses and print out some info
68//! dbus_variant_var!(StatVariant, U32 => u32; Str => &'buf str);
69//! for (name, stat_msg) in names.into_iter().zip(stats) {
70//! if !matches!(stat_msg.typ, MessageType::Reply) {
71//! continue;
72//! }
73//! let mut stat_map: HashMap<&str, StatVariant> = stat_msg.body.parser().get().unwrap();
74//! let unique = match stat_map["UniqueName"] {
75//! StatVariant::Str(s) => s,
76//! _ => continue,
77//! };
78//! let peak_out = match stat_map["PeakOutgoingBytes"] {
79//! StatVariant::U32(s) => s,
80//! _ => continue,
81//! };
82//! let peak_in = match stat_map["PeakIncomingBytes"] {
83//! StatVariant::U32(s) => s,
84//! _ => continue,
85//! };
86//! println!("\t{} ({}):", name, unique);
87//! println!("\t\t PeakIncomingBytes: {}, PeakOutgoingBytes: {}\n", peak_in, peak_out);
88//! }
89//! # });
90//! ```
91//! A simple example server that gives out the time in millis since Epoch and a reference time:
92//! ```no_run
93//! # use tokio::runtime::Builder;
94//! # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
95//! # runtime.block_on(async {
96//! use async_rustbus::{RpcConn, MatchRule, CallAction};
97//! use async_rustbus::rustbus_core;
98//! use rustbus_core::message_builder::{MessageBuilder, MessageType};
99//! use std::time::{Instant, SystemTime, UNIX_EPOCH};
100//! let conn = RpcConn::session_conn(false).await.unwrap();
101//! conn.insert_call_path("/example/TimeServer", CallAction::Exact).await.unwrap();
102//! conn.insert_call_path("/", CallAction::Intro).await.unwrap();
103//! conn.request_name("example.TimeServer").await.unwrap();
104//! let start = Instant::now();
105//! loop {
106//! let call = match conn.get_call("/example/TimeServer").await {
107//! Ok(c) => c,
108//! Err(e) => {
109//! eprintln!("Error occurred waiting for calls: {:?}", e);
110//! break;
111//! }
112//! };
113//! assert!(matches!(call.typ, MessageType::Call));
114//! let res = match (call.dynheader.interface.as_deref().unwrap(), call.dynheader.member.as_deref().unwrap()) {
115//! ("example.TimeServer", "GetUnixTime") => {
116//! let mut res = call.dynheader.make_response();
117//! let cur_time = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
118//! res.body.push_param(cur_time).unwrap();
119//! res
120//! }
121//! ("example.TimeServer", "GetRefTime") => {
122//! let mut res = call.dynheader.make_response();
123//! let elapsed = start.elapsed().as_millis() as u64;
124//! res.body.push_param(elapsed).unwrap();
125//! res
126//! }
127//! ("org.freedesktop.DBus.Introspectable", "Introspect") => {
128//! todo!("We need to put a introspect impl so that other connection can discover this object.");
129//! }
130//! _ => {
131//! call.dynheader.make_error_response("UnknownInterface", None)
132//! }
133//! };
134//! conn.send_msg_wo_rsp(&res).await.unwrap();
135//! }
136//! # });
137//! ```
138//!
139//! [`rustbus`]: https://crates.io/crates/rustbus
140use std::collections::HashMap;
141use std::convert::TryInto;
142use std::io::ErrorKind;
143use std::num::NonZeroU32;
144use std::os::unix::io::{AsRawFd, RawFd};
145use std::pin::Pin;
146use std::sync::atomic::{AtomicU32, Ordering};
147use std::sync::Arc;
148
149use async_channel::{unbounded, Receiver as CReceiver, Sender as CSender};
150use futures::future::{Either, TryFutureExt};
151
152#[doc(hidden)]
153pub use utils::poll_once;
154
155use std::path::Path;
156use tokio::io::unix::AsyncFd;
157use tokio::net::ToSocketAddrs;
158use tokio::sync::oneshot::{
159 channel as oneshot_channel, Receiver as OneReceiver, Sender as OneSender,
160};
161// use async_std::sync::{Condvar, Mutex};
162use futures::pin_mut;
163use futures::prelude::*;
164use futures::task::{Context, Poll};
165use tokio::sync::watch::{channel as watch_channel, Sender as WatchSender};
166use tokio::sync::Mutex;
167
168pub mod rustbus_core;
169
170use rustbus_core::message_builder::{MarshalledMessage, MessageType};
171use rustbus_core::path::ObjectPath;
172use rustbus_core::standard_messages::{hello, release_name, request_name};
173use rustbus_core::standard_messages::{
174 DBUS_NAME_FLAG_DO_NOT_QUEUE, DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER,
175 DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER,
176};
177
178pub mod conn;
179
180use conn::{Conn, GenStream, RecvState, SendState};
181
182mod utils;
183
184mod routing;
185use routing::{queue_sig, CallHierarchy};
186pub use routing::{CallAction, MatchRule, EMPTY_MATCH};
187
188pub use conn::{get_session_bus_addr, get_system_bus_addr, DBusAddr};
189
190const NO_REPLY_EXPECTED: u8 = 0x01;
191
192struct MsgQueue {
193 sender: CSender<MarshalledMessage>,
194 recv: CReceiver<MarshalledMessage>,
195}
196impl MsgQueue {
197 fn new() -> Self {
198 let (sender, recv) = unbounded::<MarshalledMessage>();
199 Self { sender, recv }
200 }
201 fn get_receiver(&self) -> CReceiver<MarshalledMessage> {
202 self.recv.clone()
203 }
204 fn send(&self, msg: MarshalledMessage) {
205 self.sender.try_send(msg).unwrap()
206 }
207}
208struct RecvData {
209 state: RecvState,
210 reply_map: HashMap<NonZeroU32, OneSender<MarshalledMessage>>,
211 hierarchy: CallHierarchy,
212 sig_matches: Vec<MatchRule>,
213}
214/// RpcConn is used to create and interact with a DBus connection.
215/// It can be used to easily connect to either session (user) or system DBus daemons.
216/// `RpcConn` is thread-safe and can be used from within an [`Arc`] if desired.
217///
218/// [`Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html
219pub struct RpcConn {
220 conn: AsyncFd<GenStream>,
221 //recv_cond: Condvar,
222 recv_watch: WatchSender<()>,
223 recv_data: Arc<Mutex<RecvData>>,
224 send_data: Mutex<(SendState, Option<NonZeroU32>)>,
225 serial: AtomicU32,
226 auto_name: String,
227}
228impl RpcConn {
229 async fn new(conn: Conn) -> std::io::Result<Self> {
230 unsafe {
231 let recvd = libc::fcntl(conn.as_raw_fd(), libc::F_GETFL);
232 if recvd == -1 {
233 return Err(std::io::Error::last_os_error());
234 }
235 if libc::O_NONBLOCK & recvd == 0
236 && libc::fcntl(conn.as_raw_fd(), libc::F_SETFL, recvd | libc::O_NONBLOCK) == -1
237 {
238 return Err(std::io::Error::last_os_error());
239 }
240 }
241 let recv_data = RecvData {
242 state: conn.recv_state,
243 reply_map: HashMap::new(),
244 hierarchy: CallHierarchy::new(),
245 sig_matches: Vec::new(),
246 };
247 let (recv_watch, _) = watch_channel(());
248 let mut ret = Self {
249 conn: AsyncFd::new(conn.stream)?,
250 send_data: Mutex::new((conn.send_state, None)),
251 recv_data: Arc::new(Mutex::new(recv_data)),
252 recv_watch,
253 serial: AtomicU32::new(1),
254 auto_name: String::new(),
255 };
256 let hello_res = ret.send_msg(&hello()).await?.unwrap().await?;
257 match hello_res.typ {
258 MessageType::Reply => {
259 ret.auto_name = hello_res.body.parser().get().map_err(|_| {
260 std::io::Error::new(ErrorKind::ConnectionRefused, "Unable to parser name")
261 })?;
262 Ok(ret)
263 }
264 MessageType::Error => {
265 let (err, details): (&str, &str) = hello_res
266 .body
267 .parser()
268 .get()
269 .unwrap_or(("Unable to parse message", ""));
270 Err(std::io::Error::new(
271 ErrorKind::ConnectionRefused,
272 format!("Hello message failed with: {}: {}", err, details),
273 ))
274 }
275 _ => Err(std::io::Error::new(
276 ErrorKind::ConnectionAborted,
277 "Unexpected reply to hello message!",
278 )),
279 }
280 }
281 /// Returns the name assigned by the DBus daemon.
282 /// This name was retreived using the `org.freedesktop.DBus.Hello` call when the connection was started.
283 pub fn get_name(&self) -> &str {
284 &self.auto_name
285 }
286 /// Connect to the system bus.
287 ///
288 /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
289 /// # Notes
290 /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
291 /// # Examples
292 /// ```
293 /// # use tokio::runtime::Builder;
294 /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
295 /// # runtime.block_on(async {
296 /// use async_rustbus::RpcConn;
297 /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
298 /// let conn = RpcConn::system_conn(false).await.unwrap();
299 /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
300 /// .at("org.freedesktop.DBus")
301 /// .on("/org/freedesktop/DBus")
302 /// .with_interface("org.freedesktop.DBus")
303 /// .build();
304 /// msg.body.push_param(conn.get_name()).unwrap();
305 /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
306 /// let pid: u32 = res.body.parser().get().unwrap();
307 /// assert_eq!(pid, std::process::id());
308 /// # });
309 /// ```
310 pub async fn session_conn(with_fd: bool) -> std::io::Result<Self> {
311 let addr = get_session_bus_addr().await?;
312 Self::connect_to_addr(&addr, with_fd).await
313 }
314 /// Connect to the session (user) bus.
315 ///
316 /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
317 /// # Notes
318 /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
319 /// # Examples
320 /// ```
321 /// # use tokio::runtime::Builder;
322 /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
323 /// # runtime.block_on(async {
324 /// use async_rustbus::RpcConn;
325 /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
326 /// let conn = RpcConn::system_conn(false).await.unwrap();
327 /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
328 /// .at("org.freedesktop.DBus")
329 /// .on("/org/freedesktop/DBus")
330 /// .with_interface("org.freedesktop.DBus")
331 /// .build();
332 /// msg.body.push_param(conn.get_name()).unwrap();
333 /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
334 /// let pid: u32 = res.body.parser().get().unwrap();
335 /// assert_eq!(pid, std::process::id());
336 /// # });
337 /// ```
338 pub async fn system_conn(with_fd: bool) -> std::io::Result<Self> {
339 let addr = get_system_bus_addr().await?;
340 //let path = get_system_bus_path().await?;
341 Self::connect_to_addr(&addr, with_fd).await
342 }
343 /// Connect to the given address.
344 ///
345 /// This can be used to connect to a non-standard DBus daemon.
346 /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
347 /// In most instances users should use [`session_conn`] or [`system_conn`] to connecto to their local DBus instance.
348 /// # Notes
349 /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
350 /// # Examples
351 /// ```
352 /// # use tokio::runtime::Builder;
353 /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
354 /// # runtime.block_on(async {
355 /// use async_rustbus::{RpcConn, DBusAddr};
356 /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
357 /// let system_addr = DBusAddr::unix_path("/run/dbus/system_bus_socket");
358 /// let conn = RpcConn::connect_to_addr(&system_addr, false).await.unwrap();
359 /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
360 /// .at("org.freedesktop.DBus")
361 /// .on("/org/freedesktop/DBus")
362 /// .with_interface("org.freedesktop.DBus")
363 /// .build();
364 /// msg.body.push_param(conn.get_name()).unwrap();
365 /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
366 /// let pid: u32 = res.body.parser().get().unwrap();
367 /// assert_eq!(pid, std::process::id());
368 /// # });
369 /// ```
370 ///
371 /// [`session_conn`]: ./struct.RpcConn.html#method.session_conn
372 /// [`system_conn`]: ./struct.RpcConn.html#method.system_conn
373 pub async fn connect_to_addr<P: AsRef<Path>, S: ToSocketAddrs, B: AsRef<[u8]>>(
374 addr: &DBusAddr<P, S, B>,
375 with_fd: bool,
376 ) -> std::io::Result<Self> {
377 let conn = Conn::connect_to_addr(addr, with_fd).await?;
378 Self::new(conn).await
379 }
380 /// Connect to the given Unix sockect path.
381 ///
382 /// This can be used to connect to a non-standard DBus daemon.
383 /// If `with_fd` is true then sending and receiving file descriptors is enabled for this connection.
384 /// In most instances users should use [`session_conn`] or [`system_conn`] to connecto to their local DBus instance.
385 /// # Notes
386 /// * Like all the `RpcConn` constructors, this method handles sending the initial `org.freedesktop.DBus.Hello` message and handles the response.
387 /// # Examples
388 /// ```
389 /// # use tokio::runtime::Builder;
390 /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
391 /// # runtime.block_on(async {
392 /// use async_rustbus::RpcConn;
393 /// use async_rustbus::rustbus_core::message_builder::MessageBuilder;
394 /// let conn = RpcConn::connect_to_path("/run/dbus/system_bus_socket", false).await.unwrap();
395 /// let mut msg = MessageBuilder::new().call("GetConnectionUnixProcessID")
396 /// .at("org.freedesktop.DBus")
397 /// .on("/org/freedesktop/DBus")
398 /// .with_interface("org.freedesktop.DBus")
399 /// .build();
400 /// msg.body.push_param(conn.get_name()).unwrap();
401 /// let res = conn.send_msg_w_rsp(&msg).await.unwrap().await.unwrap();
402 /// let pid: u32 = res.body.parser().get().unwrap();
403 /// assert_eq!(pid, std::process::id());
404 /// # });
405 /// ```
406 ///
407 /// [`session_conn`]: ./struct.RpcConn.html#method.session_conn
408 /// [`system_conn`]: ./struct.RpcConn.html#method.system_conn
409 pub async fn connect_to_path<P: AsRef<Path>>(path: P, with_fd: bool) -> std::io::Result<Self> {
410 let conn = Conn::connect_to_path(path, with_fd).await?;
411 Self::new(conn).await
412 }
413 /*
414 /// Set a filter that determines whether a signal should be dropped for received.
415 ///
416 /// If the filter returns `true` then the message is allowed to be received, otherwise it is dropped.
417 /// The default signal filter when `RpcConn` is constructed is to drop all incoming signals.
418 /// This default was chosen primarly to prevent leaking resources for unexpected signals sent specifically to this connection (by setting the destination header field).
419 pub async fn set_sig_filter(
420 &self,
421 filter: Box<dyn Send + Sync + FnMut(&MarshalledMessage) -> bool>,
422 ) {
423 let mut recv_data = self.recv_data.lock().await;
424 recv_data.sig_filter = filter;
425 }
426 */
427 fn allocate_idx(&self) -> NonZeroU32 {
428 let mut idx = 0;
429 while idx == 0 {
430 idx = self.serial.fetch_add(1, Ordering::Relaxed);
431 }
432 NonZeroU32::new(idx).unwrap()
433 }
434 /// Make a DBus call to a remote service or send a signal.
435 ///
436 /// This function returns a future nested inside a future.
437 /// Awaiting the outer future sends the message out the DBus stream to the remote service.
438 /// The inner future, returned by the outer, waits for the response from the remote service.
439 /// # Notes
440 /// * If the message sent was a signal or has the NO_REPLY_EXPECTED flag set then the inner future will
441 /// return immediatly when awaited.
442 /// * If two futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
443 /// * If other incoming message are received before a response is received, then they will be processed by this future while awaiting.
444 /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
445 ///
446 pub async fn send_msg(
447 &self,
448 msg: &MarshalledMessage,
449 ) -> std::io::Result<Option<impl Future<Output = std::io::Result<MarshalledMessage>> + '_>>
450 {
451 Ok(if expects_reply(msg) {
452 Some(self.send_msg_w_rsp(msg).await?)
453 } else {
454 self.send_msg_wo_rsp(msg).await?;
455 None
456 })
457 }
458 async fn send_msg_loop(&self, msg: &MarshalledMessage, idx: NonZeroU32) -> std::io::Result<()> {
459 let mut send_idx = None;
460 loop {
461 // TODO: use writeable instead of send_data lock
462 let mut write_guard = self.conn.writable().await?;
463 let mut send_lock = self.send_data.lock().await;
464 let stream = self.conn.get_ref();
465 match send_idx {
466 Some(send_idx) => {
467 if send_lock.0.current_idx() > send_idx {
468 return Ok(());
469 }
470 let new_idx = match send_lock.0.finish_sending_next(stream) {
471 Ok(i) => i,
472 Err(e) if e.kind() == ErrorKind::WouldBlock => {
473 write_guard.clear_ready();
474 continue;
475 }
476 Err(e) => return Err(e),
477 };
478 if new_idx > send_idx {
479 return Ok(());
480 }
481 }
482 None => {
483 send_idx = match send_lock.0.write_next_message(stream, msg, idx) {
484 Ok(si) => si,
485 Err(e) if e.kind() == ErrorKind::WouldBlock => {
486 write_guard.clear_ready();
487 continue;
488 }
489 Err(e) => return Err(e),
490 };
491 if send_idx.is_none() {
492 return Ok(());
493 }
494 }
495 }
496 drop(send_lock);
497 }
498 }
499 /// Sends a signal or make a call to a remote service with the NO_REPLY_EXPECTED flag set.
500 ///
501 /// # Notes
502 /// * If multiple send futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
503 /// # Panics
504 /// * Panics if the message given expects a reply.
505 pub async fn send_msg_wo_rsp(&self, msg: &MarshalledMessage) -> std::io::Result<()> {
506 assert!(!expects_reply(msg));
507 let idx = self.allocate_idx();
508 self.send_msg_loop(msg, idx).await
509 }
510 /// Make a call to a remote service.
511 ///
512 /// `msg` must be a message the expects a call otherwise this method will panic.
513 ///
514 /// # Notes
515 /// * If multiple send futures are simultanously being awaited (like via `futures::future::join` or across tasks) then outgoing order of messages is not guaranteed.
516 /// * If other incoming message are received before a response is received, then they will be processed by this future while awaiting.
517 /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
518 /// # Panics
519 /// * Panics if the message does not expect a reply, such as signals or calls with the NO_REPLY_EXPECTED set.
520 pub async fn send_msg_w_rsp(
521 &self,
522 msg: &MarshalledMessage,
523 ) -> std::io::Result<impl Future<Output = std::io::Result<MarshalledMessage>> + '_> {
524 assert!(expects_reply(msg));
525 let idx = self.allocate_idx();
526 let recv = self.get_recv_and_insert_sender(idx).await;
527 let msg_fut = recv.map_err(|_| panic!("Message reply channel should never be closed"));
528 self.send_msg_loop(msg, idx).await?;
529 let res_pred = move |msg: &MarshalledMessage, _: &mut RecvData| match &msg.typ {
530 MessageType::Reply | MessageType::Error => {
531 let res_idx = match msg.dynheader.response_serial {
532 Some(res_idx) => res_idx,
533 None => {
534 unreachable!("Should never reply/err without res serial.")
535 }
536 };
537 res_idx == idx
538 }
539 _ => false,
540 };
541 Ok(ResponseFuture {
542 idx,
543 rpc_conn: self,
544 fut: self.get_msg(msg_fut, res_pred).boxed(),
545 })
546 }
547 async fn get_recv_and_insert_sender(&self, idx: NonZeroU32) -> OneReceiver<MarshalledMessage> {
548 let (sender, recv) = oneshot_channel();
549 let mut recv_lock = self.recv_data.lock().await;
550 recv_lock.reply_map.insert(idx, sender);
551 recv
552 }
553 /// Add a match to retreive signals.
554 ///
555 /// A `org.freedesktop.DBus.AddMatch` call is made to tell the DBus daemon to route matching signals to this connection.
556 /// These signals are stored by the `RpcConn` and can be retreived by using the [`get_signal`] method.
557 /// If a message is received that matches multiple `sig_match`es, then the message is associated with the most specific specific [`MatchRule`].
558 /// See [`MatchRule`] for more details.
559 ///
560 /// # Panics
561 /// * Panics if both the path and path_namespace matching parameters are used in the `MatchRule`.
562 ///
563 /// # Examples
564 /// ```
565 /// # use tokio::runtime::Builder;
566 /// # let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
567 /// # runtime.block_on(async {
568 /// use async_rustbus::{RpcConn, MatchRule};
569 /// let conn = RpcConn::session_conn(false).await.unwrap();
570 /// let rule = MatchRule::new()
571 /// .sender("org.freedesktop.DBus")
572 /// .interface("org.freedesktop.DBus")
573 /// .member("NameOwnerChanged").clone();
574 /// conn.insert_sig_match(&rule).await.unwrap();
575 /// let owned = conn.request_name("example.name").await.unwrap();
576 /// if owned {
577 /// let msg = conn.get_signal(&rule).await.unwrap();
578 /// let (_, _, new_owner): (&str, &str, &str) = msg.body.parser().get3().unwrap();
579 /// assert_eq!(new_owner, conn.get_name());
580 /// }
581 /// conn.remove_sig_match(&rule).await.unwrap();
582 /// # });
583 /// ```
584 ///
585 /// [`get_signal`]: ./struct.RpcConn.html#method.get_signal
586 /// [`set_sig_filter`]: ./struct.RpcConn.html#method.set_sig_filter
587 /// [`MatchRule`]: ./struct.MatchRule.html
588 pub async fn insert_sig_match(&self, sig_match: &MatchRule) -> std::io::Result<()> {
589 assert!(!(sig_match.path.is_some() && sig_match.path_namespace.is_some()));
590 let mut recv_data = self.recv_data.lock().await;
591 let insert_idx = match recv_data.sig_matches.binary_search(sig_match) {
592 Ok(_) => {
593 return Err(std::io::Error::new(
594 ErrorKind::InvalidInput,
595 "Already exists",
596 ))
597 }
598 Err(i) => i,
599 };
600 let mut to_insert = sig_match.clone();
601 to_insert.queue = Some(MsgQueue::new());
602 recv_data.sig_matches.insert(insert_idx, to_insert);
603 drop(recv_data);
604 let match_str = sig_match.match_string();
605 let call = rustbus_core::standard_messages::add_match(&match_str);
606 let res = self.send_msg_w_rsp(&call).await?.await?;
607 match res.typ {
608 MessageType::Reply => Ok(()),
609 MessageType::Error => {
610 let mut recv_data = self.recv_data.lock().await;
611 if let Ok(idx) = recv_data.sig_matches.binary_search(sig_match) {
612 recv_data.sig_matches.remove(idx);
613 }
614 let err_str: &str = res
615 .body
616 .parser()
617 .get()
618 .unwrap_or("Unknown DBus Error Type!");
619 Err(std::io::Error::new(ErrorKind::Other, err_str))
620 }
621 _ => unreachable!(),
622 }
623 }
624 /// Stop the reception of messages matching `sig_match`
625 ///
626 /// This method calls `org.freedesktop.DBus.RemoveMatch` to stop the reception of matching signals.
627 /// Any messages already received that haven't been retreived are lost.
628 /// This method will return an `InavalidInput` if the `MatchRule` is not already present in the `RpcConn`.
629 ///
630 /// # Examples
631 /// See [`insert_sig_path`] for an example.
632 ///
633 /// [`insert_sig_path`]: ./struct.RpcConn.html#method.insert_sig_path
634 pub async fn remove_sig_match(&self, sig_match: &MatchRule) -> std::io::Result<()> {
635 let mut recv_data = self.recv_data.lock().await;
636 let idx = match recv_data.sig_matches.binary_search(sig_match) {
637 Err(_) => {
638 return Err(std::io::Error::new(
639 ErrorKind::InvalidInput,
640 "MatchRule doesn't exist!",
641 ))
642 }
643 Ok(i) => i,
644 };
645 recv_data.sig_matches.remove(idx);
646 drop(recv_data);
647 let match_str = sig_match.match_string();
648 let call = rustbus_core::standard_messages::remove_match(&match_str);
649 let res = self.send_msg_w_rsp(&call).await?.await?;
650 match res.typ {
651 MessageType::Reply => Ok(()),
652 MessageType::Error => {
653 let err_str: &str = res
654 .body
655 .parser()
656 .get()
657 .unwrap_or("Unknown DBus Error Type!");
658 Err(std::io::Error::new(ErrorKind::Other, err_str))
659 }
660 _ => unreachable!(),
661 }
662 }
663 fn queue_msg<F>(
664 &self,
665 recv_data: &mut RecvData,
666 pred: F,
667 ) -> std::io::Result<(MarshalledMessage, bool)>
668 where
669 F: Fn(&MarshalledMessage, &mut RecvData) -> bool,
670 {
671 let stream = self.conn.get_ref();
672 loop {
673 let msg = recv_data.state.get_next_message(stream)?;
674 if pred(&msg, recv_data) {
675 return Ok((msg, false));
676 } else {
677 match &msg.typ {
678 MessageType::Signal => queue_sig(&recv_data.sig_matches, msg),
679 MessageType::Reply | MessageType::Error => {
680 let idx = msg
681 .dynheader
682 .response_serial
683 .expect("Reply should always have a response serial!");
684 if let Some(sender) = recv_data.reply_map.remove(&idx) {
685 sender.send(msg).ok();
686 }
687 }
688 MessageType::Call => {
689 if let Err(msg) = recv_data.hierarchy.send(msg) {
690 return Ok((msg, true));
691 }
692 }
693 MessageType::Invalid => unreachable!(),
694 }
695 }
696 }
697 }
698
699 async fn get_msg<F, P>(&self, msg_fut: F, pred: P) -> std::io::Result<MarshalledMessage>
700 where
701 F: Future<Output = std::io::Result<MarshalledMessage>>,
702 P: Fn(&MarshalledMessage, &mut RecvData) -> bool,
703 {
704 pin_mut!(msg_fut);
705 let mut msg_fut = match poll_once(msg_fut) {
706 Either::Left(res) => return res,
707 Either::Right(f) => f,
708 };
709
710 loop {
711 let mut read_guard = self.conn.readable().await?;
712 let recv_guard_fut = self.recv_data.lock();
713 tokio::select! {
714 biased;
715 res = &mut msg_fut => { return res }
716 mut recv_guard = recv_guard_fut => match self.queue_msg(&mut recv_guard, &pred) {
717 Err(e) if e.kind() == ErrorKind::WouldBlock => {
718 read_guard.clear_ready();
719 continue;
720 }
721 Err(e) => { return Err(e) }
722 Ok((msg, should_reply)) => {
723 self.recv_watch.send_replace(());
724 if !should_reply {
725 return Ok(msg);
726 }
727
728 drop(recv_guard);
729 self.send_msg_wo_rsp(&msg).await?;
730 }
731 }
732 }
733 }
734 }
735
736 /// Gets the next signal not filtered by the message filter.
737 ///
738 /// Use the same `sig_match` used with [`insert_sig_match`] to wait for its associated signals.
739 /// Signals with this connection as a destination are always sent to this connection regardless
740 /// of if there is an matching [`MatchRule`]. If these signals are not filtered out and do not match
741 /// a given filter they can be retreived uisng the default `MatchRule`.
742 /// # Notes
743 /// * While awaiting for a matching signal, this future will process other incoming messages.
744 /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
745 /// # Examples
746 /// See [`insert_sig_match`] for an example.
747 ///
748 /// [`insert_sig_match`]: ./struct.RpcConn.html#method.insert_sig_match
749 pub async fn get_signal(&self, sig_match: &MatchRule) -> std::io::Result<MarshalledMessage> {
750 let recv_data = self.recv_data.lock().await;
751 let idx = recv_data
752 .sig_matches
753 .binary_search(sig_match)
754 .map_err(|_| {
755 std::io::Error::new(ErrorKind::InvalidInput, "Unknown match rule given!")
756 })?;
757 let recv = recv_data.sig_matches[idx]
758 .queue
759 .as_ref()
760 .unwrap()
761 .get_receiver();
762 drop(recv_data);
763
764 let msg_fut = recv.recv().map_err(|_| {
765 std::io::Error::new(
766 ErrorKind::Interrupted,
767 "Signal match was deleted while waiting!",
768 )
769 });
770 let sig_pred = |msg: &MarshalledMessage, _: &mut RecvData| sig_match.matches(msg);
771 self.get_msg(msg_fut, sig_pred).await
772 }
773
774 /// Gets the next call associated with the given path.
775 ///
776 /// Use `insert_call_path` to setup the `RpcConn` for receiving calls.
777 /// An `InvalidInput` error is returned if the path does not have an associated queue.
778 /// # Notes
779 /// * While awaiting for a matching call, this future will process other incoming messages.
780 /// This processing may include placing messages in their correct queue or sending simple responses out the connection.
781 ///
782 /// [`insert_call_path`]: ./struct.RpcConn.html#method.insert_call_path
783 pub async fn get_call<'a, S, D>(&self, path: S) -> std::io::Result<MarshalledMessage>
784 where
785 S: TryInto<&'a ObjectPath, Error = D>,
786 D: std::fmt::Debug,
787 {
788 let path = path.try_into().map_err(|e| {
789 std::io::Error::new(ErrorKind::InvalidInput, format!("Invalid path: {:?}", e))
790 })?;
791 let recv_guard = self.recv_data.lock().await;
792 let msg_queue = recv_guard.hierarchy.get_queue(path).ok_or_else(|| {
793 std::io::Error::new(ErrorKind::InvalidInput, "Unknown message path given!")
794 })?;
795 let recv = msg_queue.get_receiver();
796 drop(recv_guard);
797 let call_fut = recv.recv().map_err(|_| {
798 std::io::Error::new(
799 ErrorKind::Interrupted,
800 "Call Queue was deleted while waiting!",
801 )
802 });
803 let call_pred = |msg: &MarshalledMessage, recv_data: &mut RecvData| match &msg.typ {
804 MessageType::Call => {
805 let msg_path =
806 ObjectPath::from_str(msg.dynheader.object.as_ref().unwrap()).unwrap();
807 recv_data.hierarchy.is_match(path, msg_path)
808 }
809 _ => false,
810 };
811 self.get_msg(call_fut, call_pred).await
812 }
813
814 /// Configure what action the `RpcConn` should take when receiving calls for a path or namespace.
815 ///
816 /// See [`CallAction`] for details on what each action does.
817 /// The default action before any `insert_call_path` calls is to drop all incoming messsage calls and reply with an error.
818 ///
819 /// [`CallAction`]: ./enum.CallAction.html
820 pub async fn insert_call_path<'a, S, D>(&self, path: S, action: CallAction) -> Result<(), D>
821 where
822 S: TryInto<&'a ObjectPath, Error = D>,
823 {
824 let path = path.try_into()?;
825 let mut recv_data = self.recv_data.lock().await;
826 recv_data.hierarchy.insert_path(path, action);
827 Ok(())
828 }
829 /// Get the action for a path.
830 /// # Returns
831 /// Returns the action if there is one for that path.
832 /// If there is no action for the given path or the path is invalid `None` is returned.
833 pub async fn get_call_path_action<'a, S: TryInto<&'a ObjectPath>>(
834 &self,
835 path: S,
836 ) -> Option<CallAction> {
837 let path = path.try_into().ok()?;
838 let recv_data = self.recv_data.lock().await;
839 recv_data.hierarchy.get_action(path)
840 }
841 /// Get a receiver for the incoming call queue if there is one.
842 ///
843 /// This receiver will produce calls from this path/namespace.
844 /// Receiving messages from the `Receiver` doesn't actually cause the `RpcConn` to do any work.
845 /// It will only produce messages if another future from `get_signal` or `get_call` is being worked on.
846 /// This method can be useful if you know that there are other tasks working using this `RpcConn` that will do the work of processing incoming messages.
847 pub async fn get_call_recv<'a, S: TryInto<&'a ObjectPath>>(
848 &self,
849 path: S,
850 ) -> Option<CReceiver<MarshalledMessage>> {
851 let path = path.try_into().ok()?;
852 let recv_data = self.recv_data.lock().await;
853 Some(recv_data.hierarchy.get_queue(path)?.get_receiver())
854 }
855 /// Request a name from the DBus daemon.
856 ///
857 /// Returns `true` if the name was successfully obtained or if it was already owned by this connection.
858 /// otherwise `false` is returned (assuming an IO error did not occur).
859 /// The `DO_NOT_QUEUE` flag is used with the request so if the name is not available, this connection will not be queued to own it in the future.
860 pub async fn request_name(&self, name: &str) -> std::io::Result<bool> {
861 let req = request_name(name, DBUS_NAME_FLAG_DO_NOT_QUEUE);
862 let res = self.send_msg_w_rsp(&req).await?.await?;
863 if MessageType::Error == res.typ {
864 return Ok(false);
865 }
866 Ok(match res.body.parser().get::<u32>() {
867 Ok(ret) => matches!(
868 ret,
869 DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER | DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER
870 ),
871 Err(_) => false,
872 })
873 }
874 /// Release a name from the DBus daemon.
875 ///
876 /// An `Err` is only returned on a IO error.
877 /// If the name was not owned by the connection, or the name was invalid `Ok` is still returned.
878 pub async fn release_name(&self, name: &str) -> std::io::Result<()> {
879 let rel_name = release_name(name);
880 self.send_msg_w_rsp(&rel_name).await?.await?;
881 Ok(())
882 }
883}
884impl AsRawFd for RpcConn {
885 fn as_raw_fd(&self) -> RawFd {
886 self.conn.as_raw_fd()
887 }
888}
889
890struct ResponseFuture<'a, T>
891where
892 T: Future<Output = std::io::Result<MarshalledMessage>>,
893{
894 rpc_conn: &'a RpcConn,
895 idx: NonZeroU32,
896 fut: T,
897}
898
899impl<T> Future for ResponseFuture<'_, T>
900where
901 T: Future<Output = std::io::Result<MarshalledMessage>>,
902{
903 type Output = T::Output;
904 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
905 unsafe { self.map_unchecked_mut(|s| &mut s.fut).poll(cx) }
906 }
907}
908
909impl<T> Drop for ResponseFuture<'_, T>
910where
911 T: Future<Output = std::io::Result<MarshalledMessage>>,
912{
913 fn drop(&mut self) {
914 if let Ok(mut recv_lock) = self.rpc_conn.recv_data.try_lock() {
915 recv_lock.reply_map.remove(&self.idx);
916 return;
917 }
918 let reply_arc = Arc::clone(&self.rpc_conn.recv_data);
919
920 //TODO: Is there a better solution to this?
921 let idx = self.idx;
922 tokio::spawn(async move {
923 let mut recv_lock = reply_arc.lock().await;
924 recv_lock.reply_map.remove(&idx);
925 });
926 }
927}
928
929fn expects_reply(msg: &MarshalledMessage) -> bool {
930 msg.typ == MessageType::Call && (msg.flags & NO_REPLY_EXPECTED) == 0
931}