use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};
pub use event_manager::{
Error, EventManager, EventOps, EventSet, Events, MutEventSubscriber, RemoteEndpoint,
SubscriberId, SubscriberOps,
};
pub type EpollSubscriber = Box<dyn MutEventSubscriber + Send>;
type EpollManagerImpl = Arc<Mutex<EventManager<EpollSubscriber>>>;
#[derive(Clone)]
pub struct EpollManager {
pub mgr: EpollManagerImpl,
endpoint: Arc<Mutex<RemoteEndpoint<EpollSubscriber>>>,
}
impl EpollManager {
pub fn add_subscriber(&self, handler: EpollSubscriber) -> SubscriberId {
let _ = self.endpoint.lock().unwrap().kick();
if let Ok(mut mgr) = self.mgr.try_lock() {
mgr.add_subscriber(handler)
} else {
return self
.endpoint
.lock()
.unwrap()
.call_blocking::<_, _, Error>(move |mgr| Ok(mgr.add_subscriber(handler)))
.unwrap();
}
}
pub fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<EpollSubscriber> {
let mut mgr = self
.mgr
.lock()
.map_err(|e| anyhow!("EventManager lock fail. {:?}", e))?;
mgr.remove_subscriber(subscriber_id)
.map_err(|e| anyhow!("remove subscriber err. {:?}", e))
}
pub fn add_event(
&self,
subscriber_id: SubscriberId,
events: Events,
) -> std::result::Result<(), Error> {
loop {
let _ = self.endpoint.lock().unwrap().kick();
if let Ok(mut mgr) = self.mgr.try_lock() {
let mut ops = mgr.event_ops(subscriber_id)?;
return ops.add(events);
}
}
}
pub fn handle_events(&self, timeout: i32) -> std::result::Result<usize, Error> {
let mut guard = self.mgr.lock().unwrap();
guard.run_with_timeout(timeout)
}
}
impl Default for EpollManager {
fn default() -> Self {
let mgr = EventManager::new().expect("epoll_manager: failed create new instance");
let endpoint = Arc::new(Mutex::new(mgr.remote_endpoint()));
EpollManager {
mgr: Arc::new(Mutex::new(mgr)),
endpoint,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::io::AsRawFd;
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
struct DummySubscriber {
pub event: EventFd,
}
impl DummySubscriber {
fn new() -> Self {
Self {
event: EventFd::new(0).unwrap(),
}
}
}
impl MutEventSubscriber for DummySubscriber {
fn process(&mut self, events: Events, _ops: &mut EventOps) {
let source = events.fd();
let event_set = events.event_set();
assert_ne!(source, self.event.as_raw_fd());
match event_set {
EventSet::IN => {
unreachable!()
}
EventSet::OUT => {
self.event.read().unwrap();
}
_ => {
unreachable!()
}
}
}
fn init(&mut self, _ops: &mut EventOps) {}
}
#[test]
fn test_epoll_manager() {
let mut epoll_manager = EpollManager::default();
let epoll_manager_clone = epoll_manager.clone();
let thread = std::thread::spawn(move || loop {
let count = epoll_manager_clone.handle_events(-1).unwrap();
if count == 0 {
continue;
}
assert_eq!(count, 1);
break;
});
let handler = DummySubscriber::new();
let event = handler.event.try_clone().unwrap();
let id = epoll_manager.add_subscriber(Box::new(handler));
thread.join().unwrap();
epoll_manager
.add_event(id, Events::new(&event, EventSet::OUT))
.unwrap();
event.write(1).unwrap();
let epoll_manager_clone = epoll_manager.clone();
let thread = std::thread::spawn(move || loop {
let count = epoll_manager_clone.handle_events(-1).unwrap();
if count == 0 {
continue;
}
assert_eq!(count, 2);
break;
});
thread.join().unwrap();
epoll_manager.remove_subscriber(id).unwrap();
}
}