use dbus::Message;
use dbus::nonblock::{MsgMatch, SyncConnection};
use futures::Stream;
use futures::channel::mpsc::UnboundedReceiver;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
pub struct MessageStream {
msg_match: Option<MsgMatch>,
events: UnboundedReceiver<Message>,
connection: Arc<SyncConnection>,
}
impl MessageStream {
pub fn new(msg_match: MsgMatch, connection: Arc<SyncConnection>) -> Self {
let (msg_match, events) = msg_match.msg_stream();
Self {
msg_match: Some(msg_match),
events,
connection,
}
}
}
impl Stream for MessageStream {
type Item = Message;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.events).poll_next(cx)
}
}
impl Drop for MessageStream {
fn drop(&mut self) {
let connection = self.connection.clone();
let msg_match = self.msg_match.take().unwrap();
tokio::spawn(async move { connection.remove_match(msg_match.token()).await.unwrap() });
}
}