dbus_async/
dbus.rs

1use crate::{
2    command::Command,
3    connection::Connection,
4    error::DBusResult,
5    introspect::add_introspect,
6    peer::add_peer,
7    stream::Stream,
8    {DBusError, DBusNameFlag},
9};
10use dbus_message_parser::{
11    match_rule::MatchRule,
12    message::{Message, MessageType},
13    value::{Bus, Interface, ObjectPath, Value},
14};
15use dbus_server_address_parser::Address;
16use futures::channel::{
17    mpsc::{unbounded, Receiver as MpscReceiver, Sender as MpscSender, UnboundedSender},
18    oneshot::channel,
19};
20use std::{collections::HashSet, convert::TryInto, env::var, sync::Arc};
21use tokio::{spawn, task::JoinHandle};
22
23/// This struct represents an object to communicate with the DBus daemon.
24#[derive(Clone)]
25pub struct DBus {
26    command_sender: UnboundedSender<Command>,
27    address: Arc<Address>,
28}
29
30impl DBus {
31    /// Connect to the session DBus.
32    ///
33    /// If the first argument (`introspectable`) is `true` then the Peer is [introspectable].
34    /// If the second argument (`peer`) is `true` then the Peer has the
35    /// [`org.freedesktop.DBus.Peer`].
36    ///
37    /// The `DBUS_SESSION_BUS_ADDRESS` environment variable **have to** be defined.
38    ///
39    /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
40    /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
41    pub async fn session(introspectable: bool, peer: bool) -> DBusResult<(DBus, JoinHandle<()>)> {
42        if let Ok(path) = var("DBUS_SESSION_BUS_ADDRESS") {
43            DBus::new(&path, introspectable, peer).await
44        } else {
45            // It could not connect to any socket
46            Err(DBusError::DBusSessionBusAddress)
47        }
48    }
49
50    /// Connect to the system DBus.
51    ///
52    /// If the first argument (`introspectable`) is `true` then the Peer is [introspectable].
53    /// If the second argument (`peer`) is `true` then the Peer has the
54    /// [`org.freedesktop.DBus.Peer`].
55    ///
56    /// If there `DBUS_SYSTEM_BUS_ADDRESS` environment variable is defined then this path will be
57    /// used, else `unix:path=/var/run/dbus/system_bus_socket`.
58    ///
59    /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
60    /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
61    pub async fn system(introspectable: bool, peer: bool) -> DBusResult<(DBus, JoinHandle<()>)> {
62        let path = if let Ok(path) = var("DBUS_SYSTEM_BUS_ADDRESS") {
63            path
64        } else {
65            "unix:path=/var/run/dbus/system_bus_socket".to_string()
66        };
67        DBus::new(&path, introspectable, peer).await
68    }
69
70    /// Connect to the specific (`addressses`) DBus daemon.
71    ///
72    /// If the second argument (`introspectable`) is `true` then the Peer is [introspectable].
73    /// If the third argument (`peer`) is `true` then the Peer has the
74    /// [`org.freedesktop.DBus.Peer`].
75    ///
76    /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
77    /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
78    pub async fn new(
79        addressses: &str,
80        introspectable: bool,
81        peer: bool,
82    ) -> DBusResult<(DBus, JoinHandle<()>)> {
83        let (command_sender, command_receiver) = unbounded::<Command>();
84
85        // Create and spawn the stream and sink task.
86        let (address, stream) = Stream::new(addressses).await?;
87        let (message_sink, message_stream) = stream.start();
88
89        // Spawn the connection task.
90        let connection = Connection::from(command_receiver, message_sink, message_stream);
91        let connection_handle = spawn(connection.run());
92
93        let address = Arc::new(address);
94        let dbus = DBus {
95            command_sender,
96            address,
97        };
98
99        if introspectable {
100            add_introspect(dbus.clone())?;
101        }
102
103        if peer {
104            add_peer(dbus.clone())?;
105        }
106
107        // Send the Hello message.
108        let msg = dbus.call_hello().await?;
109        if let MessageType::Error = msg.get_type() {
110            let error = msg.get_error_name().unwrap();
111            Err(DBusError::Hello(error.clone()))
112        } else {
113            Ok((dbus, connection_handle))
114        }
115    }
116
117    /// Send a [`Message`](dbus_message_parser::message::Message).
118    pub fn send(&self, msg: Message) -> DBusResult<()> {
119        // Try to send the message.
120        let command = Command::SendMessage(msg);
121        self.command_sender.unbounded_send(command)?;
122        Ok(())
123    }
124
125    /// Send a [`Message`] and wait for a response.
126    ///
127    /// The [`Message`] have to be a `MessageCall`.
128    ///
129    /// [`Message`]: dbus_message_parser::message::Message
130    pub async fn call(&self, msg: Message) -> DBusResult<Message> {
131        // Create a oneshot channel for the response
132        let (msg_sender, msg_receiver) = channel::<Message>();
133        // Try to send the message.
134        let command = Command::SendMessageOneshot(msg, msg_sender);
135        self.command_sender.unbounded_send(command)?;
136        let msg = msg_receiver.await?;
137        Ok(msg)
138    }
139
140    /// Send a [`Message`] and specify a channel, where the response should be send.
141    ///
142    /// This function returns the serial number of the [`Message`]. This is useful, where the the
143    /// response and signals have to be processed in order.
144    ///
145    /// [`Message`]: dbus_message_parser::message::Message
146    pub async fn call_reply_serial(
147        &self,
148        msg: Message,
149        msg_sender: MpscSender<Message>,
150    ) -> DBusResult<u32> {
151        let (reply_serial_sender, reply_serial_receiver) = channel::<u32>();
152        // Try to send the message.
153        let command = Command::SendMessageMpcs(msg, reply_serial_sender, msg_sender);
154        self.command_sender.unbounded_send(command)?;
155        let reply_serial = reply_serial_receiver.await?;
156        Ok(reply_serial)
157    }
158
159    /// Call the [`Hello()`] method of the DBus daemon.
160    ///
161    /// [`Hello()`]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-hello
162    async fn call_hello(&self) -> DBusResult<Message> {
163        let msg = Message::method_call(
164            "org.freedesktop.DBus".try_into().unwrap(),
165            "/org/freedesktop/DBus".try_into().unwrap(),
166            "org.freedesktop.DBus".try_into().unwrap(),
167            "Hello".try_into().unwrap(),
168        );
169        self.call(msg).await
170    }
171
172    /// Register a name for the peer. This calls the [`RequestName(String, UInt32)`] method of the
173    /// DBus daemon.
174    ///
175    /// [`RequestName(String, UInt32)`]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-request-name
176    pub async fn request_name(&self, name: Bus, flags: &DBusNameFlag) -> DBusResult<Message> {
177        let mut msg = Message::method_call(
178            "org.freedesktop.DBus".try_into().unwrap(),
179            "/org/freedesktop/DBus".try_into().unwrap(),
180            "org.freedesktop.DBus".try_into().unwrap(),
181            "RequestName".try_into().unwrap(),
182        );
183        msg.add_value(Value::String(name.into()));
184        msg.add_value(Value::Uint32(flags.bits()));
185        self.call(msg).await
186    }
187
188    /// Add a channel to a specific [`ObjectPath`].
189    ///
190    /// The channel will receive all [`MethodCall`] messages for the specified [`ObjectPath`].
191    ///
192    /// If there is already channel added for this [`ObjectPath`] then it will be replace. So the
193    /// old channel will not receive any [`MethodCall`] messages for the [`ObjectPath`] anymore.
194    ///
195    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
196    /// [`MethodCall`]: dbus_message_parser::message::MessageType::MethodCall
197    pub fn add_method_call(
198        &self,
199        object_path: ObjectPath,
200        sender: MpscSender<Message>,
201    ) -> DBusResult<()> {
202        let command = Command::AddMethodCall(object_path, sender);
203        self.command_sender.unbounded_send(command)?;
204        Ok(())
205    }
206
207    /// Delete the channel for a specific [`ObjectPath`] (see [`add_method_call`]).
208    ///
209    /// Even if there is no channel for this [`ObjectPath`] the function will return `Ok()`.
210    ///
211    /// [`add_method_call`]: #method.add_method_call
212    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
213    pub fn delete_object_path(&self, object_path: ObjectPath) -> DBusResult<()> {
214        let command = Command::DeleteMethodCall(object_path);
215        self.command_sender.unbounded_send(command)?;
216        Ok(())
217    }
218
219    /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
220    /// (see [`add_method_call`]).
221    ///
222    /// [`add_method_call`]: #method.add_method_call
223    pub fn delete_method_call_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
224        let command = Command::DeleteMethodCallSender(sender);
225        self.command_sender.unbounded_send(command)?;
226        Ok(())
227    }
228
229    /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
230    /// (see [`add_method_call`]).
231    ///
232    /// [`add_method_call`]: #method.add_method_call
233    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
234    pub fn delete_method_call_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
235        let command = Command::DeleteMethodCallReceiver(receiver);
236        self.command_sender.unbounded_send(command)?;
237        Ok(())
238    }
239
240    /// Add a channel to a specific [`Interface`].
241    ///
242    /// The channel will **only** receive all `MethodCall` messages for the specified [`Interface`],
243    /// if there is no channel by the [`ObjectPath`].
244    ///
245    /// If there is already channel added for this [`Interface`] then it will be replace. So the old
246    /// channel will not receive any `MethodCall` messages for the [`Interface`] anymore.
247    ///
248    /// [`Interface`]: dbus_message_parser::value::Interface
249    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
250    pub fn add_method_call_interface(
251        &self,
252        interface: Interface,
253        sender: MpscSender<Message>,
254    ) -> DBusResult<()> {
255        let command = Command::AddMethodCallInterface(interface, sender);
256        self.command_sender.unbounded_send(command)?;
257        Ok(())
258    }
259
260    /// Delete the channel for every [`Interface`], which the given sender is connected to
261    /// (see [`add_method_call_interface`]).
262    ///
263    /// [`add_method_call_interface`]: #method.add_method_call_interface
264    /// [`Interface`]: dbus_message_parser::value::Interface
265    pub fn delete_method_call_interface_sender(
266        &self,
267        sender: MpscSender<Message>,
268    ) -> DBusResult<()> {
269        let command = Command::DeleteMethodCallInterfaceSender(sender);
270        self.command_sender.unbounded_send(command)?;
271        Ok(())
272    }
273
274    /// Delete the channel for every [`Interface`], which the given sender is connected to
275    /// (see [`add_method_call_interface`]).
276    ///
277    /// [`add_method_call_interface`]: #method.add_method_call_interface
278    /// [`Interface`]: dbus_message_parser::value::Interface
279    pub fn delete_method_call_interface_receiver(
280        &self,
281        receiver: MpscReceiver<Message>,
282    ) -> DBusResult<()> {
283        let command = Command::DeleteMethodCallInterfaceReceiver(receiver);
284        self.command_sender.unbounded_send(command)?;
285        Ok(())
286    }
287
288    /// Add a channel to a specific [`ObjectPath`].
289    ///
290    /// The channel will receive all [`Signal`] messages for the specified [`ObjectPath`].
291    ///
292    /// The second argument specify a closure to filter the [`Message`]. If the closure returns true
293    /// then the [`Message`] will not be send to the channel.
294    ///
295    /// There can be multiple channels, which will receive message of the specific [`ObjectPath`].
296    ///
297    /// [`Signal`]: dbus_message_parser::message::MessageType::Signal
298    /// [`Message`]: dbus_message_parser::message::Message
299    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
300    pub fn add_signal(
301        &self,
302        object_path: ObjectPath,
303        filter: Option<fn(&Message) -> bool>,
304        sender: MpscSender<Message>,
305    ) -> DBusResult<()> {
306        let command = Command::AddSignal(object_path, filter, sender);
307        self.command_sender.unbounded_send(command)?;
308        Ok(())
309    }
310
311    /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
312    /// (see [`add_signal`]).
313    ///
314    /// [`add_signal`]: #method.add_signal
315    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
316    pub fn delete_signal_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
317        let command = Command::DeleteSignalSender(sender);
318        self.command_sender.unbounded_send(command)?;
319        Ok(())
320    }
321
322    /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
323    /// (see [`add_signal`]).
324    ///
325    /// [`add_signal`]: #method.add_signal
326    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
327    pub fn delete_signal_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
328        let command = Command::DeleteSignalReceiver(receiver);
329        self.command_sender.unbounded_send(command)?;
330        Ok(())
331    }
332
333    /// Add a channel to a specific [`MatchRule`]s.
334    ///
335    /// The channel will receive all message, which match the given [`MatchRule`]s.
336    ///
337    /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
338    pub fn add_match_rules(
339        &self,
340        match_rules: Vec<MatchRule>,
341        sender: MpscSender<Message>,
342    ) -> DBusResult<()> {
343        let command = Command::AddMatchRules(match_rules, sender);
344        self.command_sender.unbounded_send(command)?;
345        Ok(())
346    }
347
348    /// Delete the channel for every [`MatchRule`]s, which the given sender is connected to
349    /// (see [`add_match_rules`]).
350    ///
351    /// [`add_match_rules`]: #method.add_match_rules
352    /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
353    pub fn delete_match_rules_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
354        let command = Command::DeleteMatchRulesSender(sender);
355        self.command_sender.unbounded_send(command)?;
356        Ok(())
357    }
358
359    /// Delete the channel for every [`MatchRule`]s, which the given sender is connected to
360    /// (see [`add_match_rules`]).
361    ///
362    /// [`add_match_rules`]: #method.add_match_rules
363    /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
364    pub fn delete_match_rules_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
365        let command = Command::DeleteMatchRulesReceiver(receiver);
366        self.command_sender.unbounded_send(command)?;
367        Ok(())
368    }
369
370    /// List all [`ObjectPath`]s under the given [`ObjectPath`].
371    ///
372    /// This will only list the [`ObjectPath`] for the `MethodCall` messages
373    /// (see [`add_method_call`]).
374    ///
375    /// [`add_method_call`]: #method.add_method_call
376    /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
377    pub async fn list_method_call(&self, object_path: ObjectPath) -> DBusResult<HashSet<String>> {
378        let (sender, receiver) = channel();
379        let command = Command::ListMethodCall(object_path, sender);
380        self.command_sender.unbounded_send(command)?;
381        let list = receiver.await?;
382        Ok(list)
383    }
384
385    /// Close the DBus connection.
386    pub fn close(&self) -> DBusResult<()> {
387        self.command_sender.unbounded_send(Command::Close)?;
388        Ok(())
389    }
390
391    /// Get the current path of the DBus daemon.
392    pub fn get_address(&self) -> &Address {
393        self.address.as_ref()
394    }
395}