tokio_devd/
monitor.rs

1use crate::event::Event;
2use crate::socket::DevdStream;
3use devd_rs::parse_devd_event;
4use futures_core::stream::Stream;
5use std::io;
6use std::pin::Pin;
7use std::task::{ready, Context as TaskContext, Poll};
8use tokio::io::{AsyncBufRead, BufReader};
9
/// Builds a `MonitorSocket` that can optionally be restricted to specific subsystems.
///
/// Start from `MonitorBuilder::new()` (or `Default`), which has no filters.
#[derive(Debug, Default)]
pub struct MonitorBuilder {
    /// Subsystem names collected so far; handed unchanged to the socket on `listen`.
    subsystem_filters: Vec<String>,
}
15
16impl MonitorBuilder {
17    pub fn new() -> Self {
18        MonitorBuilder::default()
19    }
20
21    pub fn match_subsystem(mut self, subsystem: &str) -> io::Result<Self> {
22        self.subsystem_filters.push(subsystem.to_owned());
23        Ok(self)
24    }
25
26    pub fn listen(self) -> io::Result<MonitorSocket> {
27        MonitorSocket::new(self.subsystem_filters)
28    }
29}
30
/// An async monitor of `devd` events.
///
/// Implements `Stream`, yielding `io::Result<Event>` items.
#[derive(Debug)]
pub struct MonitorSocket {
    /// Buffered reader wrapped around the underlying `DevdStream`.
    reader: BufReader<DevdStream>,
    /// Subsystem names to match; when empty, every successfully parsed event is yielded.
    subsystem_filters: Vec<String>,
}
37
38impl MonitorSocket {
39    fn new(subsystem_filters: Vec<String>) -> io::Result<Self> {
40        let devd_stream = DevdStream::new()?;
41        Ok(Self {
42            reader: BufReader::new(devd_stream),
43            subsystem_filters,
44        })
45    }
46}
47
48impl Stream for MonitorSocket {
49    type Item = io::Result<Event>;
50
51    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
52        loop {
53            let mut reader = Pin::new(&mut self.reader);
54
55            let buf = ready!(reader.as_mut().poll_fill_buf(cx))?;
56
57            if buf.is_empty() {
58                return Poll::Ready(None);
59            }
60
61            let len = buf.len();
62            let event_result = match std::str::from_utf8(buf) {
63                Ok(line_str) => parse_devd_event(line_str)
64                    .map(|raw_event| Event { inner: raw_event })
65                    .map_err(|e| e.into()),
66                Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
67            };
68
69            reader.as_mut().consume(len);
70            match &event_result {
71                Ok(event) => {
72                    if self.subsystem_filters.is_empty() {
73                        return Poll::Ready(Some(event_result));
74                    }
75
76                    if let devd_rs::Event::Notify { subsystem, .. } = &event.inner {
77                        if self.subsystem_filters.contains(subsystem) {
78                            return Poll::Ready(Some(event_result));
79                        }
80                    }
81
82                    continue;
83                }
84                Err(_) => {
85                    return Poll::Ready(Some(event_result));
86                }
87            }
88        }
89    }
90}