1use crate::event::Event;
2use crate::socket::DevdStream;
3use devd_rs::parse_devd_event;
4use futures_core::stream::Stream;
5use std::io;
6use std::pin::Pin;
7use std::task::{ready, Context as TaskContext, Poll};
8use tokio::io::{AsyncBufRead, BufReader};
9
/// Builder that accumulates subsystem filters before a [`MonitorSocket`] is opened.
///
/// The `Default` value carries no filters.
#[derive(Default, Debug)]
pub struct MonitorBuilder {
    /// Subsystem names added through `match_subsystem`, in insertion order.
    subsystem_filters: Vec<String>,
}
15
16impl MonitorBuilder {
17 pub fn new() -> Self {
18 MonitorBuilder::default()
19 }
20
21 pub fn match_subsystem(mut self, subsystem: &str) -> io::Result<Self> {
22 self.subsystem_filters.push(subsystem.to_owned());
23 Ok(self)
24 }
25
26 pub fn listen(self) -> io::Result<MonitorSocket> {
27 MonitorSocket::new(self.subsystem_filters)
28 }
29}
30
31#[derive(Debug)]
33pub struct MonitorSocket {
34 reader: BufReader<DevdStream>,
35 subsystem_filters: Vec<String>,
36}
37
38impl MonitorSocket {
39 fn new(subsystem_filters: Vec<String>) -> io::Result<Self> {
40 let devd_stream = DevdStream::new()?;
41 Ok(Self {
42 reader: BufReader::new(devd_stream),
43 subsystem_filters,
44 })
45 }
46}
47
48impl Stream for MonitorSocket {
49 type Item = io::Result<Event>;
50
51 fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
52 loop {
53 let mut reader = Pin::new(&mut self.reader);
54
55 let buf = ready!(reader.as_mut().poll_fill_buf(cx))?;
56
57 if buf.is_empty() {
58 return Poll::Ready(None);
59 }
60
61 let len = buf.len();
62 let event_result = match std::str::from_utf8(buf) {
63 Ok(line_str) => parse_devd_event(line_str)
64 .map(|raw_event| Event { inner: raw_event })
65 .map_err(|e| e.into()),
66 Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
67 };
68
69 reader.as_mut().consume(len);
70 match &event_result {
71 Ok(event) => {
72 if self.subsystem_filters.is_empty() {
73 return Poll::Ready(Some(event_result));
74 }
75
76 if let devd_rs::Event::Notify { subsystem, .. } = &event.inner {
77 if self.subsystem_filters.contains(subsystem) {
78 return Poll::Ready(Some(event_result));
79 }
80 }
81
82 continue;
83 }
84 Err(_) => {
85 return Poll::Ready(Some(event_result));
86 }
87 }
88 }
89 }
90}