use std::os::unix::io::{AsRawFd, RawFd};
use libudev::Event;
use nix::poll::{poll, PollFd, PollFlags};
use tokio::sync::{
broadcast::{error::TryRecvError, Receiver},
mpsc::UnboundedSender,
};
use crate::{
engine::UdevEngineEvent,
stratis::errors::{StratisError, StratisResult},
};
pub fn udev_thread(
sender: UnboundedSender<UdevEngineEvent>,
mut should_exit: Receiver<()>,
) -> StratisResult<()> {
let context = libudev::Context::new()?;
let mut udev = UdevMonitor::create(&context)?;
let mut pollers = [PollFd::new(udev.as_raw_fd(), PollFlags::POLLIN)];
loop {
match poll(&mut pollers, 100)? {
0 => {
match should_exit.try_recv() {
Ok(()) => {
info!("udev thread was notified to exit");
return Ok(());
}
Err(TryRecvError::Closed) | Err(TryRecvError::Lagged(_)) => {
return Err(StratisError::Error(
"udev processing thread can no longer be notified to exit; shutting down...".to_string()
));
}
_ => (),
};
}
_ => {
if let Some(ref e) = udev.poll() {
if let Err(e) = sender.send(UdevEngineEvent::from(e)) {
warn!(
"udev event could not be sent to engine thread: {}; the \
engine was not notified of this udev event",
e,
);
}
}
}
}
}
}
struct UdevMonitor<'a> {
socket: libudev::MonitorSocket<'a>,
}
impl<'a> UdevMonitor<'a> {
fn create(context: &'a libudev::Context) -> StratisResult<UdevMonitor<'a>> {
let mut monitor = libudev::Monitor::new(&context)?;
monitor.match_subsystem("block")?;
let socket = monitor.listen()?;
Ok(UdevMonitor { socket })
}
pub fn poll(&mut self) -> Option<Event<'a>> {
self.socket.receive_event()
}
}
impl<'a> AsRawFd for UdevMonitor<'a> {
fn as_raw_fd(&self) -> RawFd {
self.socket.as_raw_fd()
}
}