Struct zbus::Connection

pub struct Connection { /* private fields */ }

A D-Bus connection.

A connection to a D-Bus bus, or a direct peer.

Once created, the connection is authenticated and negotiated and messages can be sent or received, such as method calls or signals.

For higher-level message handling (typed functions, introspection, documentation, etc.), it is recommended to wrap the low-level D-Bus messages in Rust functions with the dbus_proxy and dbus_interface macros rather than handling them directly on a Connection.

Typically, a connection is made to the session bus with Connection::session, or to the system bus with Connection::system. Then the connection is used with crate::Proxy instances or the on-demand ObjectServer instance that can be accessed through Connection::object_server.

Connection implements Clone and cloning it is a very cheap operation, as the underlying data is not cloned. This makes it very convenient to share the connection between different parts of your code. Connection also implements std::marker::Sync and std::marker::Send so you can send and share a connection instance across threads as well.
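The cheap-clone behaviour described above is typical of handle types that keep their state behind a reference-counted pointer. The following is a minimal std-only sketch of that pattern, not zbus's actual implementation; `Handle`, `Inner` and `guid_from_other_thread` are hypothetical names used only for illustration.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the state a connection would own.
struct Inner {
    server_guid: String,
}

/// Hypothetical handle type: cloning copies only the `Arc` pointer,
/// never the `Inner` data behind it.
#[derive(Clone)]
struct Handle {
    inner: Arc<Inner>,
}

impl Handle {
    fn new(server_guid: &str) -> Self {
        Handle {
            inner: Arc::new(Inner {
                server_guid: server_guid.to_owned(),
            }),
        }
    }

    /// True if both handles point at the same shared state.
    fn shares_state_with(&self, other: &Handle) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    fn server_guid(&self) -> &str {
        &self.inner.server_guid
    }
}

/// Moves a clone into another thread and reads the shared state there.
fn guid_from_other_thread(handle: &Handle) -> String {
    let clone = handle.clone();
    thread::spawn(move || clone.server_guid().to_owned())
        .join()
        .expect("worker thread panicked")
}
```

Because the handle only wraps an `Arc` around thread-safe data, it is automatically `Send` and `Sync`, which is what allows the clone to move into the spawned thread.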

Connection keeps internal queues of incoming messages. The default capacity of each of these is 64. The capacity of the main (unfiltered) queue is configurable through the set_max_queued method. When the queue is full, no more messages can be received until room is made for more. This is why it’s important to ensure that all crate::MessageStream instances are continuously polled and all crate::blocking::MessageIterator instances are continuously iterated on.
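To illustrate why a full queue stalls reception, here is a small analogy built on the standard library's bounded channel. It only models the backpressure idea with the capacity of 64 mentioned above; it does not show how zbus implements its queues, and `new_queue` and `fill_until_full` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Capacity taken from the text above; the queue itself is only a model.
const QUEUE_CAPACITY: usize = 64;

/// Creates a bounded queue of message serials (a stand-in for messages).
fn new_queue() -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(QUEUE_CAPACITY)
}

/// Pushes up to `attempts` items and stops at the first refusal.
/// Returns how many items the queue accepted.
fn fill_until_full(tx: &SyncSender<u32>, attempts: u32) -> u32 {
    let mut accepted = 0;
    for serial in 0..attempts {
        match tx.try_send(serial) {
            Ok(()) => accepted += 1,
            // The queue is full: nothing more gets in until the consumer reads.
            Err(TrySendError::Full(_)) => break,
            // The consumer is gone: no point in continuing.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

In this model, a consumer that stops reading blocks all further deliveries once 64 items are pending; taking a single item off the queue makes room for exactly one more.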

To send messages, you can either use the Connection::send_message method or make use of the Sink implementation. For the latter, you might find the SinkExt API very useful. Keep in mind that Connection will not manage the serial numbers (cookies) on messages sent through the Sink implementation. If needed, you can manually assign unique serial numbers to them using the Connection::assign_serial_num method before sending them off. Having said that, the Sink is mainly useful for sending out signals: they do not expect a reply, so serial numbers are of little use for them.

Since you do not need exclusive access to a zbus::Connection to send messages on the bus, Sink is also implemented on &Connection.

Caveats

At the moment, a simultaneous flush request from multiple tasks/threads could potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the future.

Examples

Get the session bus ID

use zbus::Connection;

let connection = Connection::session().await?;

let reply = connection
    .call_method(
        Some("org.freedesktop.DBus"),
        "/org/freedesktop/DBus",
        Some("org.freedesktop.DBus"),
        "GetId",
        &(),
    )
    .await?;

let id: &str = reply.body()?;
println!("Unique ID of the bus: {}", id);

Monitoring all messages

Let’s eavesdrop on the session bus 😈 using the Monitor interface:

use futures_util::stream::TryStreamExt;
use zbus::{Connection, MessageStream};

let connection = Connection::session().await?;

connection
    .call_method(
        Some("org.freedesktop.DBus"),
        "/org/freedesktop/DBus",
        Some("org.freedesktop.DBus.Monitoring"),
        "BecomeMonitor",
        &(&[] as &[&str], 0u32),
    )
    .await?;

let mut stream = MessageStream::from(connection);
while let Some(msg) = stream.try_next().await? {
    println!("Got message: {}", msg);
}

This should print something like:

Got message: Signal NameAcquired from org.freedesktop.DBus
Got message: Signal NameLost from org.freedesktop.DBus
Got message: Method call GetConnectionUnixProcessID from :1.1324
Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
             Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
Got message: Method call AddMatch from :1.918
Got message: Method return from org.freedesktop.DBus

Implementations

impl Connection

pub async fn send_message(&self, msg: Message) -> Result<u32>

Send msg to the peer.

Unlike the Sink implementation, this method assigns the message a serial number that is unique to this connection before sending it off.

On successfully sending off msg, the assigned serial number is returned.

pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(&self, destination: Option<D>, path: P, interface: Option<I>, method_name: M, body: &B) -> Result<Arc<Message>> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: Serialize + DynamicType,

Send a method call.

Create a method-call message, send it over the connection, then wait for the reply.

On a successful reply, the reply message is returned as Ok(Arc<Message>). On error, an Err is returned. D-Bus error replies are returned as Error::MethodError.

pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(&self, destination: Option<D>, path: P, interface: I, signal_name: M, body: &B) -> Result<()> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: Serialize + DynamicType,

Emit a signal.

Create a signal message, and send it over the connection.

pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32> where B: Serialize + DynamicType,

Reply to a message.

Given an existing message (likely a method call), send a reply back to the caller with the given body.

Returns the message serial number.

pub async fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<u32> where B: Serialize + DynamicType, E: TryInto<ErrorName<'e>>, E::Error: Into<Error>,

Reply to a message with an error.

Given an existing message (likely a method call), send an error reply back to the caller with the given error_name and body.

Returns the message serial number.

pub async fn reply_dbus_error(&self, call: &MessageHeader<'_>, err: impl DBusError) -> Result<u32>

Reply to a message with an error.

Given an existing message (likely a method call), send an error reply back to the caller using one of the standard interface reply types.

Returns the message serial number.

pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,

Register a well-known name for this connection.

When connecting to a bus, the name is requested from the bus. In the case of a p2p connection, the name (if requested) is used for self-identification.

You can request multiple names for the same connection. Use Connection::release_name for deregistering names registered through this method.

Note that exclusive ownership without queueing is requested (using the RequestNameFlags::ReplaceExisting and RequestNameFlags::DoNotQueue flags) since that is the most typical case. If that is not what you want, you should use Connection::request_name_with_flags instead (but make sure that the name is requested after you’ve set up your service implementation with the ObjectServer).

Caveats

The associated ObjectServer will only handle method calls destined for the unique name of this connection or any of the registered well-known names. If no well-known name is registered, the method calls destined to all well-known names will be handled.

Since names registered through any other means than Connection or ConnectionBuilder API are not known to the connection, method calls destined to those names will only be handled by the associated ObjectServer if none of the names are registered through Connection* API. Simply put, either register all the names through Connection* API or none of them.
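The dispatch rule in these caveats can be restated as a small predicate. This is only a sketch of the rule as worded above, not zbus's code: `should_handle` is a hypothetical function, and the handling of calls that carry no destination at all is an assumption made here for completeness.

```rust
/// Decides whether a method call should be dispatched, following the rule above.
///
/// * `destination`: the destination header of the incoming call, if any.
/// * `unique_name`: the unique name of this connection, if it has one.
/// * `registered`: well-known names registered through the connection API.
fn should_handle(
    destination: Option<&str>,
    unique_name: Option<&str>,
    registered: &[&str],
) -> bool {
    let dest = match destination {
        // Assumption: calls without a destination (e.g. on a p2p link) are handled.
        None => return true,
        Some(d) => d,
    };

    if dest.starts_with(':') {
        // Unique names start with ':'; only our own unique name is accepted.
        return unique_name == Some(dest);
    }

    // Well-known name: accept any of them if none is registered,
    // otherwise accept only the registered ones.
    registered.is_empty() || registered.contains(&dest)
}
```

The last line is what makes partial registration risky: as soon as one name is in `registered`, calls addressed to a name obtained by other means no longer pass the check.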

Errors

Fails with zbus::Error::NameTaken if the name is already owned by another peer.

pub async fn request_name_with_flags<'w, W>(&self, well_known_name: W, flags: BitFlags<RequestNameFlags>) -> Result<RequestNameReply> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,

Register a well-known name for this connection.

This is the same as Connection::request_name but lets you specify the flags to use when requesting the name.

If the RequestNameFlags::DoNotQueue flag is not specified and the request ends up in the queue, you can use fdo::NameAcquiredStream to be notified when the name is acquired. A queued name request can be cancelled using Connection::release_name.

If the RequestNameFlags::AllowReplacement flag is specified, the requested name can be lost if another peer requests the same name. You can use fdo::NameLostStream to be notified when the name is lost.
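How the flags interact can be easier to follow as a decision table. The sketch below models the bus-side outcome of a name request as described in the D-Bus specification; it is a simplified std-only illustration (it ignores queue bookkeeping), and `Owner`, `Reply` and `request_name_reply` are hypothetical names rather than zbus types.

```rust
/// Flag values of the RequestName call in the D-Bus specification.
const ALLOW_REPLACEMENT: u32 = 0x1;
const REPLACE_EXISTING: u32 = 0x2;
const DO_NOT_QUEUE: u32 = 0x4;

/// Possible outcomes of a name request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Reply {
    PrimaryOwner,
    InQueue,
    Exists,
    AlreadyOwner,
}

/// Hypothetical view of the current primary owner of a name.
struct Owner<'a> {
    /// Unique name of the owning connection.
    id: &'a str,
    /// Flags the owner used when it requested the name.
    flags: u32,
}

/// Computes the reply for `requester` asking for a name with `flags`.
fn request_name_reply(current: Option<&Owner<'_>>, requester: &str, flags: u32) -> Reply {
    match current {
        // Nobody owns the name yet.
        None => Reply::PrimaryOwner,
        // The requester already owns it.
        Some(owner) if owner.id == requester => Reply::AlreadyOwner,
        // The owner allowed replacement and the requester asked to replace.
        Some(owner)
            if (owner.flags & ALLOW_REPLACEMENT) != 0 && (flags & REPLACE_EXISTING) != 0 =>
        {
            Reply::PrimaryOwner
        }
        // The name is taken and the requester refuses to wait.
        Some(_) if (flags & DO_NOT_QUEUE) != 0 => Reply::Exists,
        // The name is taken and the requester waits in the queue.
        Some(_) => Reply::InQueue,
    }
}
```

With the flags that Connection::request_name uses (ReplaceExisting together with DoNotQueue), a second connection asking for an owned name gets `Exists`, while a request without DoNotQueue lands in the queue.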

Example

use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
use enumflags2::BitFlags;
use futures_util::stream::StreamExt;

let name = "org.freedesktop.zbus.QueuedNameTest";
let conn1 = Connection::session().await?;
// This should just work right away.
conn1.request_name(name).await?;

let conn2 = Connection::session().await?;
// A second request from another connection will fail with the `DoNotQueue` flag, which is
// implicit with `request_name` method.
assert!(conn2.request_name(name).await.is_err());

// Now let's try w/o `DoNotQueue` and we should be queued.
let reply = conn2
    .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
    .await?;
assert_eq!(reply, RequestNameReply::InQueue);
// Another request should just give us the same response.
let reply = conn2
    // The flags on subsequent requests will however be ignored.
    .request_name_with_flags(name, BitFlags::empty())
    .await?;
assert_eq!(reply, RequestNameReply::InQueue);
let mut acquired_stream = DBusProxy::new(&conn2)
    .await?
    .receive_name_acquired()
    .await?;
assert!(conn1.release_name(name).await?);
// This would have waited forever if `conn1` hadn't just released the name.
let acquired = acquired_stream.next().await.unwrap();
assert_eq!(acquired.args().unwrap().name, name);

// conn2 made the mistake of being too nice and allowed name replacement, so conn1 should be
// able to take it back.
let mut lost_stream = DBusProxy::new(&conn2)
    .await?
    .receive_name_lost()
    .await?;
conn1.request_name(name).await?;
let lost = lost_stream.next().await.unwrap();
assert_eq!(lost.args().unwrap().name, name);

Caveats

  • Same as those of Connection::request_name.
  • If you wish to track changes to name ownership after this call, make sure that the fdo::NameAcquiredStream and/or fdo::NameLostStream instance(s) are created before calling this method. Otherwise, you may lose the signal if it’s emitted after this call but just before the stream instance gets created.

pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,

Deregister a previously registered well-known name for this service on the bus.

Use this method to deregister a well-known name, registered through Connection::request_name.

Unless an error is encountered, returns Ok(true) if name was previously registered with the bus through self and it has now been successfully deregistered, Ok(false) if name was not previously registered or already deregistered.

pub fn is_bus(&self) -> bool

Checks if self is a connection to a message bus.

This will return false for p2p connections.

pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32>

Assigns a serial number to msg that is unique to this connection.

This method can fail if msg is corrupted.
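For intuition, per-connection unique serials can be produced by a simple atomic counter. The sketch below is an assumption about one reasonable way to do this, not a description of zbus internals; `SerialAllocator` is a hypothetical type. The only external fact relied on is that the D-Bus specification requires message serials to be nonzero.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical per-connection serial allocator.
struct SerialAllocator {
    next: AtomicU32,
}

impl SerialAllocator {
    fn new() -> Self {
        SerialAllocator {
            next: AtomicU32::new(1),
        }
    }

    /// Returns a fresh, nonzero serial number.
    /// Safe to call from several threads at once through `&self`.
    fn assign(&self) -> u32 {
        loop {
            // `fetch_add` wraps around on overflow, so 0 can come up eventually.
            let serial = self.next.fetch_add(1, Ordering::Relaxed);
            if serial != 0 {
                return serial;
            }
            // 0 is not a valid D-Bus serial: skip it and take the next value.
        }
    }
}
```

Since the counter lives in the allocator, the numbers are unique per allocator instance only, which mirrors the "unique to this connection" wording above.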

pub fn unique_name(&self) -> Option<&OwnedUniqueName>

The unique name of the connection, if set/applicable.

The unique name is assigned by the message bus or set manually using Connection::set_unique_name.

pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()> where U: TryInto<OwnedUniqueName>, U::Error: Into<Error>,

Sets the unique name of the connection (if not already set).

Panics

This method panics if the unique name is already set. It will always panic if the connection is to a message bus as it’s the bus that assigns peers their unique names. This is mainly provided for bus implementations. All other users should not need to use this method.

pub fn max_queued(&self) -> usize

The capacity of the main (unfiltered) queue.

pub fn set_max_queued(&mut self, max: usize)

Set the capacity of the main (unfiltered) queue.

pub fn server_guid(&self) -> &str

The server’s GUID.

pub fn executor(&self) -> &Executor<'static>

The underlying executor.

When a connection is built with internal_executor set to false, zbus will not spawn a thread to run the executor. You are then responsible for continuously ticking the executor. Failure to do so will result in hangs.

Examples

Here is how one would typically run the zbus executor through async-std’s single-threaded scheduler:

use zbus::ConnectionBuilder;
use async_std::task::{block_on, spawn};

block_on(async {
    let conn = ConnectionBuilder::session()
        .unwrap()
        .internal_executor(false)
        .build()
        .await
        .unwrap();
    {
        let conn = conn.clone();
        spawn(async move {
            loop {
                conn.executor().tick().await;
            }
        });
    }

    // All your other async code goes here.
});

Note: zbus 2.1 added support for tight integration with tokio. This means that if you use zbus with tokio, you do not need to worry about this at all. All you need to do is enable the tokio feature. You should also disable the (default) async-io feature in your Cargo.toml to avoid unused dependencies. Also note that prior to zbus 3.0, disabling async-io was required to enable tight tokio integration.

pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_

Get a reference to the associated ObjectServer.

The ObjectServer is created on-demand.

Note: Once the ObjectServer is created, it will be replying to all method calls received on self. If you want to manually reply to method calls, do not use this method (or any of the ObjectServer related API).

pub async fn session() -> Result<Self>

Create a Connection to the session/user message bus.

pub async fn system() -> Result<Self>

Create a Connection to the system-wide message bus.

pub fn monitor_activity(&self) -> EventListener

Returns a listener, notified on various connection activity.

This function is meant for the caller to implement idle or timeout on inactivity.

pub fn peer_pid(&self) -> Result<Option<u32>>

Returns the peer process ID, or Ok(None) if it cannot be returned for the associated socket.

Trait Implementations

impl Clone for Connection

fn clone(&self) -> Connection

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for Connection

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl From<&Connection> for MessageStream

fn from(conn: &Connection) -> Self

Converts to this type from the input type.

impl From<&MessageStream> for Connection

fn from(stream: &MessageStream) -> Connection

Converts to this type from the input type.

impl From<Connection> for zbus::blocking::Connection

fn from(conn: Connection) -> Self

Converts to this type from the input type.

impl From<zbus::blocking::Connection> for Connection

fn from(conn: zbus::blocking::Connection) -> Self

Converts to this type from the input type.

impl From<Connection> for MessageStream

fn from(conn: Connection) -> Self

Converts to this type from the input type.

impl From<MessageStream> for Connection

fn from(stream: MessageStream) -> Connection

Converts to this type from the input type.

impl<'a, T> Sink<T> for &'a Connection where T: Into<Arc<Message>>,

type Error = Error

The type of value produced by the sink when an error occurs.

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>

Attempts to prepare the Sink to receive a value.

fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()>

Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())).

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output from this sink.

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output and close this sink, if necessary.

impl<T> Sink<T> for Connection where T: Into<Arc<Message>>,

type Error = Error

The type of value produced by the sink when an error occurs.

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Attempts to prepare the Sink to receive a value.

fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()>

Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())).

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output from this sink.

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output and close this sink, if necessary.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same<T> for T

type Output = T

Should always be Self.

impl<T, Item> SinkExt<Item> for T where T: Sink<Item> + ?Sized,

fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> where F: FnMut(U) -> Fut, Fut: Future<Output = Result<Item, E>>, E: From<Self::Error>, Self: Sized,

Composes a function in front of the sink.

fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> where F: FnMut(U) -> St, St: Stream<Item = Result<Item, Self::Error>>, Self: Sized,

Composes a function in front of the sink.

fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::Error) -> E, Self: Sized,

Transforms the error returned by the sink.

fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E> where Self: Sized, Self::Error: Into<E>,

Map this sink’s error to a different error type using the Into trait.

fn buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized,

Adds a fixed-size buffer to the current sink.

fn close(&mut self) -> Close<'_, Self, Item> where Self: Unpin,

Close the sink.

fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> where Self: Sized, Item: Clone, Si: Sink<Item, Error = Self::Error>,

Fanout items to multiple sinks.

fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin,

Flush the sink, processing all pending items.

fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin,

A future that completes after the given item has been fully processed into the sink, including flushing.

fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin,

A future that completes after the given item has been received by the sink.

fn send_all<St, 'a>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, Self: Unpin,

A future that completes after the given stream has been fully processed into the sink, including flushing.

fn left_sink<Si2>(self) -> Either<Self, Si2> where Si2: Sink<Item, Error = Self::Error>, Self: Sized,

Wrap this sink in an Either sink, making it the left-hand variant of that Either.

fn right_sink<Si1>(self) -> Either<Si1, Self> where Si1: Sink<Item, Error = Self::Error>, Self: Sized,

Wrap this sink in an Either sink, making it the right-hand variant of that Either.

fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,

A convenience method for calling Sink::poll_ready on Unpin sink types.

fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin,

A convenience method for calling Sink::start_send on Unpin sink types.

fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,

A convenience method for calling Sink::poll_flush on Unpin sink types.

fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,

A convenience method for calling Sink::poll_close on Unpin sink types.

impl<T> ToOwned for T where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.