use std::os::unix::io::{AsRawFd, RawFd};
use libudev::Event;
use nix::poll::{poll, PollFd, PollFlags};
use tokio::{
sync::{
broadcast::{error::TryRecvError, Receiver},
mpsc::UnboundedSender,
},
task::spawn_blocking,
};
use crate::{
engine::UdevEngineEvent,
stratis::errors::{StratisError, StratisResult},
};
/// Watch the udev monitor socket and forward every received event to the engine.
///
/// The monitoring loop is handed to `spawn_blocking`, so the repeated `poll`
/// calls do not run on the async executor.
///
/// * `sender` - channel on which each udev event, converted to a
///   `UdevEngineEvent`, is sent.
/// * `should_exit` - broadcast receiver consulted (without blocking) each time
///   a poll of the monitor socket times out with nothing ready.
///
/// Returns `Ok(())` once an exit notification has been received.
///
/// # Errors
///
/// Returns an error if the udev context or monitor cannot be created, if
/// polling the socket fails, if the exit channel reports `Closed` or `Lagged`,
/// or if awaiting the blocking task itself fails.
pub async fn udev_thread(
    sender: UnboundedSender<UdevEngineEvent>,
    mut should_exit: Receiver<()>,
) -> StratisResult<()> {
    const EXIT_CHANNEL_LOST: &str =
        "udev processing thread can no longer be notified to exit; shutting down...";

    let monitor_loop = move || -> StratisResult<()> {
        let context = libudev::Context::new()?;
        let mut monitor = UdevMonitor::create(&context)?;
        let mut fds = [PollFd::new(monitor.as_raw_fd(), PollFlags::POLLIN)];

        loop {
            // A nonzero result means the monitor socket reported readiness:
            // try to pull one event and pass it on. A failed send is only
            // logged; the loop keeps running.
            if poll(&mut fds, 100)? != 0 {
                if let Some(event) = monitor.poll() {
                    if let Err(send_err) = sender.send(UdevEngineEvent::from(&event)) {
                        warn!(
                            "udev event could not be sent to engine thread: {}; the \
                             engine was not notified of this udev event",
                            send_err,
                        );
                    }
                }
                continue;
            }

            // The poll timed out. This is the only point at which the exit
            // channel is checked.
            match should_exit.try_recv() {
                Err(TryRecvError::Closed | TryRecvError::Lagged(_)) => {
                    break Err(StratisError::Msg(EXIT_CHANNEL_LOST.to_string()));
                }
                Ok(()) => {
                    info!("udev thread was notified to exit");
                    break Ok(());
                }
                // Nothing pending on the exit channel; poll again.
                _ => {}
            }
        }
    };

    // The first `?` handles a failure of the blocking task itself; the
    // remaining result is the loop's own outcome.
    spawn_blocking(monitor_loop).await?
}
/// Thin wrapper around a libudev monitor socket.
///
/// Instances are built by `UdevMonitor::create`; the wrapper exposes the
/// socket's raw file descriptor through its `AsRawFd` implementation so that
/// it can be polled for readiness.
struct UdevMonitor {
    /// Listening socket from which udev events are received.
    socket: libudev::MonitorSocket,
}
impl UdevMonitor {
    /// Build a monitor on `context` that matches the "block" subsystem and
    /// start listening on it.
    ///
    /// # Errors
    ///
    /// Returns an error if the monitor cannot be created, the subsystem
    /// filter cannot be installed, or listening cannot be started.
    fn create(context: &libudev::Context) -> StratisResult<UdevMonitor> {
        let mut block_monitor = libudev::Monitor::new(context)?;
        block_monitor.match_subsystem("block")?;
        Ok(Self {
            socket: block_monitor.listen()?,
        })
    }

    /// Ask the underlying socket for an event.
    ///
    /// Returns whatever `receive_event` yields: `Some(event)` when an event
    /// was obtained, `None` otherwise.
    pub fn poll(&mut self) -> Option<Event> {
        self.socket.receive_event()
    }
}
impl AsRawFd for UdevMonitor {
    /// Return the raw file descriptor of the wrapped monitor socket.
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.socket)
    }
}