dbus_async/dbus.rs
1use crate::{
2 command::Command,
3 connection::Connection,
4 error::DBusResult,
5 introspect::add_introspect,
6 peer::add_peer,
7 stream::Stream,
8 {DBusError, DBusNameFlag},
9};
10use dbus_message_parser::{
11 match_rule::MatchRule,
12 message::{Message, MessageType},
13 value::{Bus, Interface, ObjectPath, Value},
14};
15use dbus_server_address_parser::Address;
16use futures::channel::{
17 mpsc::{unbounded, Receiver as MpscReceiver, Sender as MpscSender, UnboundedSender},
18 oneshot::channel,
19};
20use std::{collections::HashSet, convert::TryInto, env::var, sync::Arc};
21use tokio::{spawn, task::JoinHandle};
22
/// This struct represents an object to communicate with the DBus daemon.
///
/// The type is `Clone`: every clone holds its own clone of the command sender and shares the
/// same `Address` value through an `Arc`.
#[derive(Clone)]
pub struct DBus {
    // Unbounded channel on which the methods of this type queue their `Command`s.
    command_sender: UnboundedSender<Command>,
    // Address returned by `Stream::new` when the connection was set up (see `DBus::new`).
    address: Arc<Address>,
}
29
30impl DBus {
31 /// Connect to the session DBus.
32 ///
33 /// If the first argument (`introspectable`) is `true` then the Peer is [introspectable].
34 /// If the second argument (`peer`) is `true` then the Peer has the
35 /// [`org.freedesktop.DBus.Peer`].
36 ///
37 /// The `DBUS_SESSION_BUS_ADDRESS` environment variable **have to** be defined.
38 ///
39 /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
40 /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
41 pub async fn session(introspectable: bool, peer: bool) -> DBusResult<(DBus, JoinHandle<()>)> {
42 if let Ok(path) = var("DBUS_SESSION_BUS_ADDRESS") {
43 DBus::new(&path, introspectable, peer).await
44 } else {
45 // It could not connect to any socket
46 Err(DBusError::DBusSessionBusAddress)
47 }
48 }
49
50 /// Connect to the system DBus.
51 ///
52 /// If the first argument (`introspectable`) is `true` then the Peer is [introspectable].
53 /// If the second argument (`peer`) is `true` then the Peer has the
54 /// [`org.freedesktop.DBus.Peer`].
55 ///
56 /// If there `DBUS_SYSTEM_BUS_ADDRESS` environment variable is defined then this path will be
57 /// used, else `unix:path=/var/run/dbus/system_bus_socket`.
58 ///
59 /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
60 /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
61 pub async fn system(introspectable: bool, peer: bool) -> DBusResult<(DBus, JoinHandle<()>)> {
62 let path = if let Ok(path) = var("DBUS_SYSTEM_BUS_ADDRESS") {
63 path
64 } else {
65 "unix:path=/var/run/dbus/system_bus_socket".to_string()
66 };
67 DBus::new(&path, introspectable, peer).await
68 }
69
70 /// Connect to the specific (`addressses`) DBus daemon.
71 ///
72 /// If the second argument (`introspectable`) is `true` then the Peer is [introspectable].
73 /// If the third argument (`peer`) is `true` then the Peer has the
74 /// [`org.freedesktop.DBus.Peer`].
75 ///
76 /// [introspectable]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-introspectable
77 /// [`org.freedesktop.DBus.Peer`]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
78 pub async fn new(
79 addressses: &str,
80 introspectable: bool,
81 peer: bool,
82 ) -> DBusResult<(DBus, JoinHandle<()>)> {
83 let (command_sender, command_receiver) = unbounded::<Command>();
84
85 // Create and spawn the stream and sink task.
86 let (address, stream) = Stream::new(addressses).await?;
87 let (message_sink, message_stream) = stream.start();
88
89 // Spawn the connection task.
90 let connection = Connection::from(command_receiver, message_sink, message_stream);
91 let connection_handle = spawn(connection.run());
92
93 let address = Arc::new(address);
94 let dbus = DBus {
95 command_sender,
96 address,
97 };
98
99 if introspectable {
100 add_introspect(dbus.clone())?;
101 }
102
103 if peer {
104 add_peer(dbus.clone())?;
105 }
106
107 // Send the Hello message.
108 let msg = dbus.call_hello().await?;
109 if let MessageType::Error = msg.get_type() {
110 let error = msg.get_error_name().unwrap();
111 Err(DBusError::Hello(error.clone()))
112 } else {
113 Ok((dbus, connection_handle))
114 }
115 }
116
117 /// Send a [`Message`](dbus_message_parser::message::Message).
118 pub fn send(&self, msg: Message) -> DBusResult<()> {
119 // Try to send the message.
120 let command = Command::SendMessage(msg);
121 self.command_sender.unbounded_send(command)?;
122 Ok(())
123 }
124
125 /// Send a [`Message`] and wait for a response.
126 ///
127 /// The [`Message`] have to be a `MessageCall`.
128 ///
129 /// [`Message`]: dbus_message_parser::message::Message
130 pub async fn call(&self, msg: Message) -> DBusResult<Message> {
131 // Create a oneshot channel for the response
132 let (msg_sender, msg_receiver) = channel::<Message>();
133 // Try to send the message.
134 let command = Command::SendMessageOneshot(msg, msg_sender);
135 self.command_sender.unbounded_send(command)?;
136 let msg = msg_receiver.await?;
137 Ok(msg)
138 }
139
140 /// Send a [`Message`] and specify a channel, where the response should be send.
141 ///
142 /// This function returns the serial number of the [`Message`]. This is useful, where the the
143 /// response and signals have to be processed in order.
144 ///
145 /// [`Message`]: dbus_message_parser::message::Message
146 pub async fn call_reply_serial(
147 &self,
148 msg: Message,
149 msg_sender: MpscSender<Message>,
150 ) -> DBusResult<u32> {
151 let (reply_serial_sender, reply_serial_receiver) = channel::<u32>();
152 // Try to send the message.
153 let command = Command::SendMessageMpcs(msg, reply_serial_sender, msg_sender);
154 self.command_sender.unbounded_send(command)?;
155 let reply_serial = reply_serial_receiver.await?;
156 Ok(reply_serial)
157 }
158
159 /// Call the [`Hello()`] method of the DBus daemon.
160 ///
161 /// [`Hello()`]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-hello
162 async fn call_hello(&self) -> DBusResult<Message> {
163 let msg = Message::method_call(
164 "org.freedesktop.DBus".try_into().unwrap(),
165 "/org/freedesktop/DBus".try_into().unwrap(),
166 "org.freedesktop.DBus".try_into().unwrap(),
167 "Hello".try_into().unwrap(),
168 );
169 self.call(msg).await
170 }
171
172 /// Register a name for the peer. This calls the [`RequestName(String, UInt32)`] method of the
173 /// DBus daemon.
174 ///
175 /// [`RequestName(String, UInt32)`]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-request-name
176 pub async fn request_name(&self, name: Bus, flags: &DBusNameFlag) -> DBusResult<Message> {
177 let mut msg = Message::method_call(
178 "org.freedesktop.DBus".try_into().unwrap(),
179 "/org/freedesktop/DBus".try_into().unwrap(),
180 "org.freedesktop.DBus".try_into().unwrap(),
181 "RequestName".try_into().unwrap(),
182 );
183 msg.add_value(Value::String(name.into()));
184 msg.add_value(Value::Uint32(flags.bits()));
185 self.call(msg).await
186 }
187
188 /// Add a channel to a specific [`ObjectPath`].
189 ///
190 /// The channel will receive all [`MethodCall`] messages for the specified [`ObjectPath`].
191 ///
192 /// If there is already channel added for this [`ObjectPath`] then it will be replace. So the
193 /// old channel will not receive any [`MethodCall`] messages for the [`ObjectPath`] anymore.
194 ///
195 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
196 /// [`MethodCall`]: dbus_message_parser::message::MessageType::MethodCall
197 pub fn add_method_call(
198 &self,
199 object_path: ObjectPath,
200 sender: MpscSender<Message>,
201 ) -> DBusResult<()> {
202 let command = Command::AddMethodCall(object_path, sender);
203 self.command_sender.unbounded_send(command)?;
204 Ok(())
205 }
206
207 /// Delete the channel for a specific [`ObjectPath`] (see [`add_method_call`]).
208 ///
209 /// Even if there is no channel for this [`ObjectPath`] the function will return `Ok()`.
210 ///
211 /// [`add_method_call`]: #method.add_method_call
212 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
213 pub fn delete_object_path(&self, object_path: ObjectPath) -> DBusResult<()> {
214 let command = Command::DeleteMethodCall(object_path);
215 self.command_sender.unbounded_send(command)?;
216 Ok(())
217 }
218
219 /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
220 /// (see [`add_method_call`]).
221 ///
222 /// [`add_method_call`]: #method.add_method_call
223 pub fn delete_method_call_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
224 let command = Command::DeleteMethodCallSender(sender);
225 self.command_sender.unbounded_send(command)?;
226 Ok(())
227 }
228
229 /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
230 /// (see [`add_method_call`]).
231 ///
232 /// [`add_method_call`]: #method.add_method_call
233 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
234 pub fn delete_method_call_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
235 let command = Command::DeleteMethodCallReceiver(receiver);
236 self.command_sender.unbounded_send(command)?;
237 Ok(())
238 }
239
240 /// Add a channel to a specific [`Interface`].
241 ///
242 /// The channel will **only** receive all `MethodCall` messages for the specified [`Interface`],
243 /// if there is no channel by the [`ObjectPath`].
244 ///
245 /// If there is already channel added for this [`Interface`] then it will be replace. So the old
246 /// channel will not receive any `MethodCall` messages for the [`Interface`] anymore.
247 ///
248 /// [`Interface`]: dbus_message_parser::value::Interface
249 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
250 pub fn add_method_call_interface(
251 &self,
252 interface: Interface,
253 sender: MpscSender<Message>,
254 ) -> DBusResult<()> {
255 let command = Command::AddMethodCallInterface(interface, sender);
256 self.command_sender.unbounded_send(command)?;
257 Ok(())
258 }
259
260 /// Delete the channel for every [`Interface`], which the given sender is connected to
261 /// (see [`add_method_call_interface`]).
262 ///
263 /// [`add_method_call_interface`]: #method.add_method_call_interface
264 /// [`Interface`]: dbus_message_parser::value::Interface
265 pub fn delete_method_call_interface_sender(
266 &self,
267 sender: MpscSender<Message>,
268 ) -> DBusResult<()> {
269 let command = Command::DeleteMethodCallInterfaceSender(sender);
270 self.command_sender.unbounded_send(command)?;
271 Ok(())
272 }
273
274 /// Delete the channel for every [`Interface`], which the given sender is connected to
275 /// (see [`add_method_call_interface`]).
276 ///
277 /// [`add_method_call_interface`]: #method.add_method_call_interface
278 /// [`Interface`]: dbus_message_parser::value::Interface
279 pub fn delete_method_call_interface_receiver(
280 &self,
281 receiver: MpscReceiver<Message>,
282 ) -> DBusResult<()> {
283 let command = Command::DeleteMethodCallInterfaceReceiver(receiver);
284 self.command_sender.unbounded_send(command)?;
285 Ok(())
286 }
287
288 /// Add a channel to a specific [`ObjectPath`].
289 ///
290 /// The channel will receive all [`Signal`] messages for the specified [`ObjectPath`].
291 ///
292 /// The second argument specify a closure to filter the [`Message`]. If the closure returns true
293 /// then the [`Message`] will not be send to the channel.
294 ///
295 /// There can be multiple channels, which will receive message of the specific [`ObjectPath`].
296 ///
297 /// [`Signal`]: dbus_message_parser::message::MessageType::Signal
298 /// [`Message`]: dbus_message_parser::message::Message
299 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
300 pub fn add_signal(
301 &self,
302 object_path: ObjectPath,
303 filter: Option<fn(&Message) -> bool>,
304 sender: MpscSender<Message>,
305 ) -> DBusResult<()> {
306 let command = Command::AddSignal(object_path, filter, sender);
307 self.command_sender.unbounded_send(command)?;
308 Ok(())
309 }
310
311 /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
312 /// (see [`add_signal`]).
313 ///
314 /// [`add_signal`]: #method.add_signal
315 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
316 pub fn delete_signal_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
317 let command = Command::DeleteSignalSender(sender);
318 self.command_sender.unbounded_send(command)?;
319 Ok(())
320 }
321
322 /// Delete the channel for every [`ObjectPath`], which the given sender is connected to
323 /// (see [`add_signal`]).
324 ///
325 /// [`add_signal`]: #method.add_signal
326 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
327 pub fn delete_signal_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
328 let command = Command::DeleteSignalReceiver(receiver);
329 self.command_sender.unbounded_send(command)?;
330 Ok(())
331 }
332
333 /// Add a channel to a specific [`MatchRule`]s.
334 ///
335 /// The channel will receive all message, which match the given [`MatchRule`]s.
336 ///
337 /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
338 pub fn add_match_rules(
339 &self,
340 match_rules: Vec<MatchRule>,
341 sender: MpscSender<Message>,
342 ) -> DBusResult<()> {
343 let command = Command::AddMatchRules(match_rules, sender);
344 self.command_sender.unbounded_send(command)?;
345 Ok(())
346 }
347
348 /// Delete the channel for every [`MatchRule`]s, which the given sender is connected to
349 /// (see [`add_match_rules`]).
350 ///
351 /// [`add_match_rules`]: #method.add_match_rules
352 /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
353 pub fn delete_match_rules_sender(&self, sender: MpscSender<Message>) -> DBusResult<()> {
354 let command = Command::DeleteMatchRulesSender(sender);
355 self.command_sender.unbounded_send(command)?;
356 Ok(())
357 }
358
359 /// Delete the channel for every [`MatchRule`]s, which the given sender is connected to
360 /// (see [`add_match_rules`]).
361 ///
362 /// [`add_match_rules`]: #method.add_match_rules
363 /// [`MatchRule`]: dbus_message_parser::match_rule::MatchRule
364 pub fn delete_match_rules_receiver(&self, receiver: MpscReceiver<Message>) -> DBusResult<()> {
365 let command = Command::DeleteMatchRulesReceiver(receiver);
366 self.command_sender.unbounded_send(command)?;
367 Ok(())
368 }
369
370 /// List all [`ObjectPath`]s under the given [`ObjectPath`].
371 ///
372 /// This will only list the [`ObjectPath`] for the `MethodCall` messages
373 /// (see [`add_method_call`]).
374 ///
375 /// [`add_method_call`]: #method.add_method_call
376 /// [`ObjectPath`]: dbus_message_parser::value::ObjectPath
377 pub async fn list_method_call(&self, object_path: ObjectPath) -> DBusResult<HashSet<String>> {
378 let (sender, receiver) = channel();
379 let command = Command::ListMethodCall(object_path, sender);
380 self.command_sender.unbounded_send(command)?;
381 let list = receiver.await?;
382 Ok(list)
383 }
384
385 /// Close the DBus connection.
386 pub fn close(&self) -> DBusResult<()> {
387 self.command_sender.unbounded_send(Command::Close)?;
388 Ok(())
389 }
390
391 /// Get the current path of the DBus daemon.
392 pub fn get_address(&self) -> &Address {
393 self.address.as_ref()
394 }
395}