use vmm_sys_util::epoll::EpollEvent;
#[cfg(feature = "remote_endpoint")]
use vmm_sys_util::epoll::{ControlOperation, EventSet};
#[cfg(feature = "remote_endpoint")]
use super::endpoint::{EventManagerChannel, RemoteEndpoint};
use super::epoll::EpollWrapper;
use super::subscribers::Subscribers;
#[cfg(feature = "remote_endpoint")]
use super::Errno;
use super::{Error, EventOps, Events, MutEventSubscriber, Result, SubscriberId, SubscriberOps};
pub struct EventManager<T> {
subscribers: Subscribers<T>,
epoll_context: EpollWrapper,
#[cfg(feature = "remote_endpoint")]
channel: EventManagerChannel<T>,
}
impl<T: MutEventSubscriber> SubscriberOps for EventManager<T> {
type Subscriber = T;
fn add_subscriber(&mut self, subscriber: T) -> SubscriberId {
let subscriber_id = self.subscribers.add(subscriber);
self.subscribers
.get_mut_unchecked(subscriber_id)
.init(&mut self.epoll_context.ops_unchecked(subscriber_id));
subscriber_id
}
fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<T> {
let subscriber = self
.subscribers
.remove(subscriber_id)
.ok_or(Error::InvalidId)?;
self.epoll_context.remove(subscriber_id);
Ok(subscriber)
}
fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut T> {
if self.subscribers.contains(subscriber_id) {
return Ok(self.subscribers.get_mut_unchecked(subscriber_id));
}
Err(Error::InvalidId)
}
fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result<EventOps> {
if self.subscribers.contains(subscriber_id) {
return Ok(self.epoll_context.ops_unchecked(subscriber_id));
}
Err(Error::InvalidId)
}
}
impl<S: MutEventSubscriber> EventManager<S> {
const DEFAULT_READY_EVENTS_CAPACITY: usize = 256;
pub fn new() -> Result<Self> {
Self::new_with_capacity(Self::DEFAULT_READY_EVENTS_CAPACITY)
}
pub fn new_with_capacity(ready_events_capacity: usize) -> Result<Self> {
let manager = EventManager {
subscribers: Subscribers::new(),
epoll_context: EpollWrapper::new(ready_events_capacity)?,
#[cfg(feature = "remote_endpoint")]
channel: EventManagerChannel::new()?,
};
#[cfg(feature = "remote_endpoint")]
manager
.epoll_context
.epoll
.ctl(
ControlOperation::Add,
manager.channel.fd(),
EpollEvent::new(EventSet::IN, manager.channel.fd() as u64),
)
.map_err(|e| Error::Epoll(Errno::from(e)))?;
Ok(manager)
}
pub fn run(&mut self) -> Result<usize> {
self.run_with_timeout(-1)
}
pub fn run_with_timeout(&mut self, milliseconds: i32) -> Result<usize> {
let event_count = self.epoll_context.poll(milliseconds)?;
self.dispatch_events(event_count);
Ok(event_count)
}
fn dispatch_events(&mut self, event_count: usize) {
let default_event: EpollEvent = EpollEvent::default();
#[cfg(feature = "remote_endpoint")]
let mut endpoint_event = None;
for ev_index in 0..event_count {
let event = self.epoll_context.ready_events[ev_index];
let fd = event.fd();
if event.events() == default_event.events() && fd == default_event.fd() {
continue;
}
if let Some(subscriber_id) = self.epoll_context.subscriber_id(fd) {
self.subscribers.get_mut_unchecked(subscriber_id).process(
Events::with_inner(event),
&mut self.epoll_context.ops_unchecked(subscriber_id),
);
} else {
#[cfg(feature = "remote_endpoint")]
{
if fd == self.channel.fd() {
endpoint_event = Some(event);
continue;
}
}
unreachable!("Received event on fd from subscriber that is not registered");
}
}
#[cfg(feature = "remote_endpoint")]
self.dispatch_endpoint_event(endpoint_event);
}
}
#[cfg(feature = "remote_endpoint")]
impl<S: MutEventSubscriber> EventManager<S> {
pub fn remote_endpoint(&self) -> RemoteEndpoint<S> {
self.channel.remote_endpoint()
}
fn dispatch_endpoint_event(&mut self, endpoint_event: Option<EpollEvent>) {
if let Some(event) = endpoint_event {
if event.event_set() != EventSet::IN {
unreachable!();
}
self.handle_endpoint_calls();
}
}
fn handle_endpoint_calls(&mut self) {
let _ = self.channel.event_fd.read();
while let Ok(msg) = self.channel.receiver.try_recv() {
match msg.sender {
Some(sender) => {
let _ = sender.send((msg.fnbox)(self));
}
None => {
let _ = (msg.fnbox)(self);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::super::Error;
use super::*;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{Arc, Mutex};
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
struct DummySubscriber {
event_fd_1: EventFd,
event_fd_2: EventFd,
processed_ev1_out: bool,
processed_ev2_out: bool,
processed_ev1_in: bool,
register_ev2: bool,
unregister_ev1: bool,
modify_ev1: bool,
}
impl DummySubscriber {
fn new() -> Self {
DummySubscriber {
event_fd_1: EventFd::new(0).unwrap(),
event_fd_2: EventFd::new(0).unwrap(),
processed_ev1_out: false,
processed_ev2_out: false,
processed_ev1_in: false,
register_ev2: false,
unregister_ev1: false,
modify_ev1: false,
}
}
}
impl DummySubscriber {
fn register_ev2(&mut self) {
self.register_ev2 = true;
}
fn unregister_ev1(&mut self) {
self.unregister_ev1 = true;
}
fn modify_ev1(&mut self) {
self.modify_ev1 = true;
}
fn processed_ev1_out(&self) -> bool {
self.processed_ev1_out
}
fn processed_ev2_out(&self) -> bool {
self.processed_ev2_out
}
fn processed_ev1_in(&self) -> bool {
self.processed_ev1_in
}
fn reset_state(&mut self) {
self.processed_ev1_out = false;
self.processed_ev2_out = false;
self.processed_ev1_in = false;
}
fn handle_updates(&mut self, event_manager: &mut EventOps) {
if self.register_ev2 {
event_manager
.add(Events::new(&self.event_fd_2, EventSet::OUT))
.unwrap();
self.register_ev2 = false;
}
if self.unregister_ev1 {
event_manager
.remove(Events::new_raw(
self.event_fd_1.as_raw_fd(),
EventSet::empty(),
))
.unwrap();
self.unregister_ev1 = false;
}
if self.modify_ev1 {
event_manager
.modify(Events::new(&self.event_fd_1, EventSet::IN))
.unwrap();
self.modify_ev1 = false;
}
}
fn handle_in(&mut self, source: RawFd) {
if self.event_fd_1.as_raw_fd() == source {
self.processed_ev1_in = true;
}
}
fn handle_out(&mut self, source: RawFd) {
match source {
_ if self.event_fd_1.as_raw_fd() == source => {
self.processed_ev1_out = true;
}
_ if self.event_fd_2.as_raw_fd() == source => {
self.processed_ev2_out = true;
}
_ => {}
}
}
}
impl MutEventSubscriber for DummySubscriber {
fn process(&mut self, events: Events, ops: &mut EventOps) {
let source = events.fd();
let event_set = events.event_set();
let all_but_in_out = EventSet::all() - EventSet::OUT - EventSet::IN;
if event_set.intersects(all_but_in_out) {
return;
}
self.handle_updates(ops);
match event_set {
EventSet::IN => self.handle_in(source),
EventSet::OUT => self.handle_out(source),
_ => {}
}
}
fn init(&mut self, ops: &mut EventOps) {
let event = Events::new(&self.event_fd_1, EventSet::OUT);
ops.add(event).unwrap();
}
}
#[test]
fn test_register() {
use super::SubscriberOps;
let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
event_manager.add_subscriber(dummy_subscriber.clone());
dummy_subscriber.lock().unwrap().register_ev2();
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), true);
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev2_out(), false);
dummy_subscriber.lock().unwrap().reset_state();
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), true);
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev2_out(), true);
}
#[test]
#[should_panic(expected = "FdAlreadyRegistered")]
fn test_add_invalid_subscriber() {
use std::os::unix::io::FromRawFd;
let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
let subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
event_manager.add_subscriber(subscriber.clone());
let invalid_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
invalid_subscriber.lock().unwrap().event_fd_1 = unsafe {
EventFd::from_raw_fd(subscriber.lock().unwrap().event_fd_1.as_raw_fd() as RawFd)
};
event_manager.add_subscriber(invalid_subscriber);
}
#[test]
fn test_unregister() {
let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
event_manager.add_subscriber(dummy_subscriber.clone());
dummy_subscriber.lock().unwrap().unregister_ev1();
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), true);
dummy_subscriber.lock().unwrap().reset_state();
event_manager.run_with_timeout(100).unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), false);
}
#[test]
fn test_modify() {
let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
event_manager.add_subscriber(dummy_subscriber.clone());
dummy_subscriber.lock().unwrap().modify_ev1();
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), true);
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev2_out(), false);
dummy_subscriber.lock().unwrap().reset_state();
dummy_subscriber
.lock()
.unwrap()
.event_fd_1
.write(1)
.unwrap();
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), false);
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev2_out(), false);
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_in(), true);
}
#[test]
fn test_remove_subscriber() {
let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
let subscriber_id = event_manager.add_subscriber(dummy_subscriber.clone());
event_manager.run().unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), true);
dummy_subscriber.lock().unwrap().reset_state();
event_manager.remove_subscriber(subscriber_id).unwrap();
event_manager.run_with_timeout(100).unwrap();
assert_eq!(dummy_subscriber.lock().unwrap().processed_ev1_out(), false);
assert_eq!(
event_manager
.remove_subscriber(subscriber_id)
.err()
.unwrap(),
Error::InvalidId
);
}
}