bluez_async/
messagestream.rs

1use dbus::Message;
2use dbus::nonblock::{MsgMatch, SyncConnection};
3use futures::Stream;
4use futures::channel::mpsc::UnboundedReceiver;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::{Context, Poll};
8
9/// Wrapper for a stream of D-Bus messages which automatically removes the `MsgMatch` from the D-Bus
10/// connection when it is dropped.
11pub struct MessageStream {
12    msg_match: Option<MsgMatch>,
13    events: UnboundedReceiver<Message>,
14    connection: Arc<SyncConnection>,
15}
16
17impl MessageStream {
18    pub fn new(msg_match: MsgMatch, connection: Arc<SyncConnection>) -> Self {
19        let (msg_match, events) = msg_match.msg_stream();
20        Self {
21            msg_match: Some(msg_match),
22            events,
23            connection,
24        }
25    }
26}
27
28impl Stream for MessageStream {
29    type Item = Message;
30
31    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        Pin::new(&mut self.events).poll_next(cx)
33    }
34}
35
36impl Drop for MessageStream {
37    fn drop(&mut self) {
38        let connection = self.connection.clone();
39        let msg_match = self.msg_match.take().unwrap();
40        tokio::spawn(async move { connection.remove_match(msg_match.token()).await.unwrap() });
41    }
42}