// bluez_async/messagestream.rs
use dbus::Message;
use dbus::nonblock::{MsgMatch, SyncConnection};
use futures::Stream;
use futures::channel::mpsc::UnboundedReceiver;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
8
9pub struct MessageStream {
12 msg_match: Option<MsgMatch>,
13 events: UnboundedReceiver<Message>,
14 connection: Arc<SyncConnection>,
15}
16
17impl MessageStream {
18 pub fn new(msg_match: MsgMatch, connection: Arc<SyncConnection>) -> Self {
19 let (msg_match, events) = msg_match.msg_stream();
20 Self {
21 msg_match: Some(msg_match),
22 events,
23 connection,
24 }
25 }
26}
27
28impl Stream for MessageStream {
29 type Item = Message;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 Pin::new(&mut self.events).poll_next(cx)
33 }
34}
35
36impl Drop for MessageStream {
37 fn drop(&mut self) {
38 let connection = self.connection.clone();
39 let msg_match = self.msg_match.take().unwrap();
40 tokio::spawn(async move { connection.remove_match(msg_match.token()).await.unwrap() });
41 }
42}