event_manager/
lib.rs

1// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause
3
4//! Event Manager traits and implementation.
5#![deny(missing_debug_implementations)]
6#![deny(missing_docs)]
7#![cfg_attr(docsrs, feature(doc_auto_cfg))]
8
9use std::cell::RefCell;
10use std::ops::{Deref, DerefMut};
11use std::rc::Rc;
12use std::result;
13use std::sync::{Arc, Mutex};
14
15use vmm_sys_util::errno::Error as Errno;
16
17/// The type of epoll events we can monitor a file descriptor for.
18pub use vmm_sys_util::epoll::EventSet;
19
20mod epoll;
21mod events;
22mod manager;
23mod subscribers;
24#[doc(hidden)]
25#[cfg(feature = "test_utilities")]
26pub mod utilities;
27
28pub use events::{EventOps, Events};
29pub use manager::{EventManager, MAX_READY_EVENTS_CAPACITY};
30
31#[cfg(feature = "remote_endpoint")]
32mod endpoint;
33#[cfg(feature = "remote_endpoint")]
34pub use endpoint::RemoteEndpoint;
35
/// Error conditions that may appear during `EventManager` related operations.
///
/// The `ChannelSend`, `ChannelRecv` and `EventFd` variants are only compiled in when the
/// `remote_endpoint` feature is enabled. Variants that wrap an `Errno` expose it through
/// `std::error::Error::source`.
#[derive(Debug, Eq, PartialEq)]
pub enum Error {
    #[cfg(feature = "remote_endpoint")]
    /// Cannot send message on channel.
    ChannelSend,
    #[cfg(feature = "remote_endpoint")]
    /// Cannot receive message on channel.
    ChannelRecv,
    #[cfg(feature = "remote_endpoint")]
    /// Operation on `eventfd` failed; the wrapped value is the reported `Errno`.
    EventFd(Errno),
    /// Operation on `libc::epoll` failed; the wrapped value is the reported `Errno`.
    Epoll(Errno),
    // TODO: should we allow fds to be registered multiple times?
    /// The fd is already associated with an existing subscriber.
    FdAlreadyRegistered,
    /// The Subscriber ID does not exist or is no longer associated with a Subscriber.
    InvalidId,
    /// The ready list capacity passed to `EventManager::new` is invalid.
    InvalidCapacity,
}
58
59impl std::fmt::Display for Error {
60    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
61        match self {
62            #[cfg(feature = "remote_endpoint")]
63            Error::ChannelSend => write!(
64                f,
65                "event_manager: failed to send message to remote endpoint"
66            ),
67            #[cfg(feature = "remote_endpoint")]
68            Error::ChannelRecv => write!(
69                f,
70                "event_manager: failed to receive message from remote endpoint"
71            ),
72            #[cfg(feature = "remote_endpoint")]
73            Error::EventFd(e) => write!(
74                f,
75                "event_manager: failed to manage EventFd file descriptor: {}",
76                e
77            ),
78            Error::Epoll(e) => write!(
79                f,
80                "event_manager: failed to manage epoll file descriptor: {}",
81                e
82            ),
83            Error::FdAlreadyRegistered => write!(
84                f,
85                "event_manager: file descriptor has already been registered"
86            ),
87            Error::InvalidId => write!(f, "event_manager: invalid subscriber Id"),
88            Error::InvalidCapacity => write!(f, "event_manager: invalid ready_list capacity"),
89        }
90    }
91}
92
93impl std::error::Error for Error {
94    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
95        match self {
96            #[cfg(feature = "remote_endpoint")]
97            Error::ChannelSend => None,
98            #[cfg(feature = "remote_endpoint")]
99            Error::ChannelRecv => None,
100            #[cfg(feature = "remote_endpoint")]
101            Error::EventFd(e) => Some(e),
102            Error::Epoll(e) => Some(e),
103            Error::FdAlreadyRegistered => None,
104            Error::InvalidId => None,
105            Error::InvalidCapacity => None,
106        }
107    }
108}
109
/// Generic result type that may return `EventManager` errors.
///
/// Shorthand for `std::result::Result<T, Error>` with this crate's [`Error`] as the error type.
pub type Result<T> = result::Result<T, Error>;
112
/// Opaque object that uniquely represents a subscriber registered with an `EventManager`.
///
/// The wrapped `u64` is private, so code outside this crate cannot build or inspect an id
/// directly. The type is `Copy`, comparable and hashable, so it can be used as a map key.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct SubscriberId(u64);
116
/// Allows the interaction between an `EventManager` and different event subscribers that do not
/// require a `&mut self` borrow to perform `init` and `process`.
///
/// Any type implementing this also trivially implements `MutEventSubscriber`. The main role of
/// `EventSubscriber` is to allow wrappers such as `Arc` and `Rc` to implement `EventSubscriber`
/// themselves when the inner type is also an implementor.
///
/// Both methods take `&self`, so an implementor that needs to change its own state while
/// handling events has to rely on interior mutability.
pub trait EventSubscriber {
    /// Process `events` triggered in the event manager loop.
    ///
    /// Optionally, the subscriber can use `ops` to update the events it monitors.
    fn process(&self, events: Events, ops: &mut EventOps);

    /// Initialization called by the [EventManager](struct.EventManager.html) when the subscriber
    /// is registered.
    ///
    /// The subscriber is expected to use `ops` to register the events it wants to monitor.
    fn init(&self, ops: &mut EventOps);
}
135
/// Allows the interaction between an `EventManager` and different event subscribers. Methods
/// are invoked with a mutable `self` borrow.
///
/// This is the bound required of `SubscriberOps::Subscriber`, i.e. the trait a type must
/// implement to be handed to `SubscriberOps::add_subscriber`.
pub trait MutEventSubscriber {
    /// Process `events` triggered in the event manager loop.
    ///
    /// Optionally, the subscriber can use `ops` to update the events it monitors.
    fn process(&mut self, events: Events, ops: &mut EventOps);

    /// Initialization called by the [EventManager](struct.EventManager.html) when the subscriber
    /// is registered.
    ///
    /// The subscriber is expected to use `ops` to register the events it wants to monitor.
    fn init(&mut self, ops: &mut EventOps);
}
150
/// API that allows users to add, remove, and interact with registered subscribers.
pub trait SubscriberOps {
    /// Subscriber type for which the operations apply.
    type Subscriber: MutEventSubscriber;

    /// Registers a new subscriber and returns the ID associated with it.
    ///
    /// # Panics
    ///
    /// This function might panic if the subscriber is already registered. Whether a panic
    /// is triggered depends on the implementation of
    /// [Subscriber::init()](trait.MutEventSubscriber.html#tymethod.init).
    ///
    /// Typically, in the `init` function, the subscriber adds fds to its interest list. The same
    /// fd cannot be added twice and the `EventManager` will return
    /// [Error::FdAlreadyRegistered](enum.Error.html). Using `unwrap` in init in this situation
    /// triggers a panic.
    fn add_subscriber(&mut self, subscriber: Self::Subscriber) -> SubscriberId;

    /// Removes the subscriber corresponding to `subscriber_id` from the watch list.
    ///
    /// On success the removed subscriber is handed back to the caller.
    // NOTE(review): the error for an unknown id is presumably `Error::InvalidId`; confirm
    // against the implementors, which are not visible in this file.
    fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<Self::Subscriber>;

    /// Returns a mutable reference to the subscriber corresponding to `subscriber_id`.
    fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut Self::Subscriber>;

    /// Creates an event operations wrapper for the subscriber corresponding to `subscriber_id`.
    ///
    /// The event operations can be used to update the events monitored by the subscriber.
    fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result<EventOps>;
}
181
182impl<T: EventSubscriber + ?Sized> EventSubscriber for Arc<T> {
183    fn process(&self, events: Events, ops: &mut EventOps) {
184        self.deref().process(events, ops);
185    }
186
187    fn init(&self, ops: &mut EventOps) {
188        self.deref().init(ops);
189    }
190}
191
192impl<T: EventSubscriber + ?Sized> MutEventSubscriber for Arc<T> {
193    fn process(&mut self, events: Events, ops: &mut EventOps) {
194        self.deref().process(events, ops);
195    }
196
197    fn init(&mut self, ops: &mut EventOps) {
198        self.deref().init(ops);
199    }
200}
201
202impl<T: EventSubscriber + ?Sized> EventSubscriber for Rc<T> {
203    fn process(&self, events: Events, ops: &mut EventOps) {
204        self.deref().process(events, ops);
205    }
206
207    fn init(&self, ops: &mut EventOps) {
208        self.deref().init(ops);
209    }
210}
211
212impl<T: EventSubscriber + ?Sized> MutEventSubscriber for Rc<T> {
213    fn process(&mut self, events: Events, ops: &mut EventOps) {
214        self.deref().process(events, ops);
215    }
216
217    fn init(&mut self, ops: &mut EventOps) {
218        self.deref().init(ops);
219    }
220}
221
222impl<T: MutEventSubscriber + ?Sized> EventSubscriber for RefCell<T> {
223    fn process(&self, events: Events, ops: &mut EventOps) {
224        self.borrow_mut().process(events, ops);
225    }
226
227    fn init(&self, ops: &mut EventOps) {
228        self.borrow_mut().init(ops);
229    }
230}
231
232impl<T: MutEventSubscriber + ?Sized> MutEventSubscriber for RefCell<T> {
233    fn process(&mut self, events: Events, ops: &mut EventOps) {
234        self.borrow_mut().process(events, ops);
235    }
236
237    fn init(&mut self, ops: &mut EventOps) {
238        self.borrow_mut().init(ops);
239    }
240}
241
242impl<T: MutEventSubscriber + ?Sized> EventSubscriber for Mutex<T> {
243    fn process(&self, events: Events, ops: &mut EventOps) {
244        self.lock().unwrap().process(events, ops);
245    }
246
247    fn init(&self, ops: &mut EventOps) {
248        self.lock().unwrap().init(ops);
249    }
250}
251
252impl<T: MutEventSubscriber + ?Sized> MutEventSubscriber for Mutex<T> {
253    fn process(&mut self, events: Events, ops: &mut EventOps) {
254        // If another user of this mutex panicked while holding the mutex, then
255        // we terminate the process.
256        self.get_mut().unwrap().process(events, ops);
257    }
258
259    fn init(&mut self, ops: &mut EventOps) {
260        // If another user of this mutex panicked while holding the mutex, then
261        // we terminate the process.
262        self.get_mut().unwrap().init(ops);
263    }
264}
265
266impl<T: EventSubscriber + ?Sized> EventSubscriber for Box<T> {
267    fn process(&self, events: Events, ops: &mut EventOps) {
268        self.deref().process(events, ops);
269    }
270
271    fn init(&self, ops: &mut EventOps) {
272        self.deref().init(ops);
273    }
274}
275
276impl<T: MutEventSubscriber + ?Sized> MutEventSubscriber for Box<T> {
277    fn process(&mut self, events: Events, ops: &mut EventOps) {
278        self.deref_mut().process(events, ops);
279    }
280
281    fn init(&mut self, ops: &mut EventOps) {
282        self.deref_mut().init(ops);
283    }
284}