event_manager/
manager.rs

1// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause
3
4use std::mem::size_of;
5
6use vmm_sys_util::epoll::EpollEvent;
7#[cfg(feature = "remote_endpoint")]
8use vmm_sys_util::epoll::{ControlOperation, EventSet};
9
10#[cfg(feature = "remote_endpoint")]
11use super::endpoint::{EventManagerChannel, RemoteEndpoint};
12use super::epoll::EpollWrapper;
13use super::subscribers::Subscribers;
14#[cfg(feature = "remote_endpoint")]
15use super::Errno;
16use super::{Error, EventOps, Events, MutEventSubscriber, Result, SubscriberId, SubscriberOps};
17
18/// Allows event subscribers to be registered, connected to the event loop, and later removed.
19#[derive(Debug)]
20pub struct EventManager<T> {
21    subscribers: Subscribers<T>,
22    epoll_context: EpollWrapper,
23
24    #[cfg(feature = "remote_endpoint")]
25    channel: EventManagerChannel<T>,
26}
27
28/// Maximum capacity of ready events that can be passed when initializing the `EventManager`.
29// This constant is not defined inside the `EventManager` implementation because it would
30// make it really weird to use as the `EventManager` uses generics (S: MutEventSubscriber).
31// That means that when using this const, you could not write
32// EventManager::MAX_READY_EVENTS_CAPACITY because the type `S` could not be inferred.
33//
34// This value is taken from: https://elixir.bootlin.com/linux/latest/source/fs/eventpoll.c#L101
35pub const MAX_READY_EVENTS_CAPACITY: usize = i32::MAX as usize / size_of::<EpollEvent>();
36
37impl<T: MutEventSubscriber> SubscriberOps for EventManager<T> {
38    type Subscriber = T;
39
40    /// Register a subscriber with the event event_manager and returns the associated ID.
41    fn add_subscriber(&mut self, subscriber: T) -> SubscriberId {
42        let subscriber_id = self.subscribers.add(subscriber);
43        self.subscribers
44            .get_mut_unchecked(subscriber_id)
45            // The index is valid because we've just added the subscriber.
46            .init(&mut self.epoll_context.ops_unchecked(subscriber_id));
47        subscriber_id
48    }
49
50    /// Unregisters and returns the subscriber associated with the provided ID.
51    fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<T> {
52        let subscriber = self
53            .subscribers
54            .remove(subscriber_id)
55            .ok_or(Error::InvalidId)?;
56        self.epoll_context.remove(subscriber_id);
57        Ok(subscriber)
58    }
59
60    /// Return a mutable reference to the subscriber associated with the provided id.
61    fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut T> {
62        if self.subscribers.contains(subscriber_id) {
63            return Ok(self.subscribers.get_mut_unchecked(subscriber_id));
64        }
65        Err(Error::InvalidId)
66    }
67
68    /// Returns a `EventOps` object for the subscriber associated with the provided ID.
69    fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result<EventOps> {
70        // Check if the subscriber_id is valid.
71        if self.subscribers.contains(subscriber_id) {
72            // The index is valid because the result of `find` was not `None`.
73            return Ok(self.epoll_context.ops_unchecked(subscriber_id));
74        }
75        Err(Error::InvalidId)
76    }
77}
78
79impl<S: MutEventSubscriber> EventManager<S> {
80    const DEFAULT_READY_EVENTS_CAPACITY: usize = 256;
81
82    /// Create a new `EventManger` object.
83    pub fn new() -> Result<Self> {
84        Self::new_with_capacity(Self::DEFAULT_READY_EVENTS_CAPACITY)
85    }
86
87    /// Creates a new `EventManger` object with specified event capacity.
88    ///
89    /// # Arguments
90    ///
91    /// * `ready_events_capacity`: maximum number of ready events to be
92    ///                            processed a single `run`. The maximum value of this
93    ///                            parameter is `EventManager::MAX_READY_EVENTS_CAPACITY`.
94    pub fn new_with_capacity(ready_events_capacity: usize) -> Result<Self> {
95        if ready_events_capacity > MAX_READY_EVENTS_CAPACITY {
96            return Err(Error::InvalidCapacity);
97        }
98
99        let manager = EventManager {
100            subscribers: Subscribers::new(),
101            epoll_context: EpollWrapper::new(ready_events_capacity)?,
102            #[cfg(feature = "remote_endpoint")]
103            channel: EventManagerChannel::new()?,
104        };
105
106        #[cfg(feature = "remote_endpoint")]
107        manager
108            .epoll_context
109            .epoll
110            .ctl(
111                ControlOperation::Add,
112                manager.channel.fd(),
113                EpollEvent::new(EventSet::IN, manager.channel.fd() as u64),
114            )
115            .map_err(|e| Error::Epoll(Errno::from(e)))?;
116        Ok(manager)
117    }
118
119    /// Run the event loop blocking until events are triggered or an error is returned.
120    /// Calls [subscriber.process()](trait.EventSubscriber.html#tymethod.process) for each event.
121    ///
122    /// On success, it returns number of dispatched events or 0 when interrupted by a signal.
123    pub fn run(&mut self) -> Result<usize> {
124        self.run_with_timeout(-1)
125    }
126
127    /// Wait for events for a maximum timeout of `miliseconds` or until an error is returned.
128    /// Calls [subscriber.process()](trait.EventSubscriber.html#tymethod.process) for each event.
129    ///
130    /// On success, it returns number of dispatched events or 0 when interrupted by a signal.
131    pub fn run_with_timeout(&mut self, milliseconds: i32) -> Result<usize> {
132        let event_count = self.epoll_context.poll(milliseconds)?;
133        self.dispatch_events(event_count);
134
135        Ok(event_count)
136    }
137
138    fn dispatch_events(&mut self, event_count: usize) {
139        // EpollEvent doesn't implement Eq or PartialEq, so...
140        let default_event: EpollEvent = EpollEvent::default();
141
142        // Used to record whether there's an endpoint event that needs to be handled.
143        #[cfg(feature = "remote_endpoint")]
144        let mut endpoint_event = None;
145
146        for ev_index in 0..event_count {
147            let event = self.epoll_context.ready_events[ev_index];
148            let fd = event.fd();
149
150            // Check whether this event has been discarded.
151            // EpollWrapper::remove_event() discards an IO event by setting it to default value.
152            if event.events() == default_event.events() && fd == default_event.fd() {
153                continue;
154            }
155
156            if let Some(subscriber_id) = self.epoll_context.subscriber_id(fd) {
157                self.subscribers.get_mut_unchecked(subscriber_id).process(
158                    Events::with_inner(event),
159                    // The `subscriber_id` is valid because we checked it before.
160                    &mut self.epoll_context.ops_unchecked(subscriber_id),
161                );
162            } else {
163                #[cfg(feature = "remote_endpoint")]
164                {
165                    // If we got here, there's a chance the event was triggered by the remote
166                    // endpoint fd. Only check for incoming endpoint events right now, and defer
167                    // actually handling them until all subscriber events have been handled first.
168                    // This prevents subscribers whose events are about to be handled from being
169                    // removed by an endpoint request (or other similar situations).
170                    if fd == self.channel.fd() {
171                        endpoint_event = Some(event);
172                        continue;
173                    }
174                }
175
176                // This should not occur during normal operation.
177                unreachable!("Received event on fd from subscriber that is not registered");
178            }
179        }
180
181        #[cfg(feature = "remote_endpoint")]
182        self.dispatch_endpoint_event(endpoint_event);
183    }
184}
185
186#[cfg(feature = "remote_endpoint")]
187impl<S: MutEventSubscriber> EventManager<S> {
188    /// Return a `RemoteEndpoint` object, that allows interacting with the `EventManager` from a
189    /// different thread. Using `RemoteEndpoint::call_blocking` on the same thread the event loop
190    /// runs on leads to a deadlock.
191    pub fn remote_endpoint(&self) -> RemoteEndpoint<S> {
192        self.channel.remote_endpoint()
193    }
194
195    fn dispatch_endpoint_event(&mut self, endpoint_event: Option<EpollEvent>) {
196        if let Some(event) = endpoint_event {
197            if event.event_set() != EventSet::IN {
198                // This situation is virtually impossible to occur. If it does we have
199                // a programming error in our code.
200                unreachable!();
201            }
202            self.handle_endpoint_calls();
203        }
204    }
205
206    fn handle_endpoint_calls(&mut self) {
207        // Clear the inner event_fd. We don't do anything about an error here at this point.
208        let _ = self.channel.event_fd.read();
209
210        // Process messages. We consider only `Empty` errors can appear here; we don't check
211        // for `Disconnected` errors because we keep at least one clone of `channel.sender` alive
212        // at all times ourselves.
213        while let Ok(msg) = self.channel.receiver.try_recv() {
214            match msg.sender {
215                Some(sender) => {
216                    // We call the inner closure and attempt to send back the result, but can't really do
217                    // anything in case of error here.
218                    let _ = sender.send((msg.fnbox)(self));
219                }
220                None => {
221                    // Just call the function and discard the result.
222                    let _ = (msg.fnbox)(self);
223                }
224            }
225        }
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use super::super::Error;
232    use super::*;
233
234    use std::os::unix::io::{AsRawFd, RawFd};
235    use std::sync::{Arc, Mutex};
236
237    use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
238
239    struct DummySubscriber {
240        event_fd_1: EventFd,
241        event_fd_2: EventFd,
242
243        // Flags used for checking that the event event_manager called the `process`
244        // function for ev1/ev2.
245        processed_ev1_out: bool,
246        processed_ev2_out: bool,
247        processed_ev1_in: bool,
248
249        // Flags used for driving register/unregister/modify of events from
250        // outside of the `process` function.
251        register_ev2: bool,
252        unregister_ev1: bool,
253        modify_ev1: bool,
254    }
255
256    impl DummySubscriber {
257        fn new() -> Self {
258            DummySubscriber {
259                event_fd_1: EventFd::new(0).unwrap(),
260                event_fd_2: EventFd::new(0).unwrap(),
261                processed_ev1_out: false,
262                processed_ev2_out: false,
263                processed_ev1_in: false,
264                register_ev2: false,
265                unregister_ev1: false,
266                modify_ev1: false,
267            }
268        }
269    }
270
271    impl DummySubscriber {
272        fn register_ev2(&mut self) {
273            self.register_ev2 = true;
274        }
275
276        fn unregister_ev1(&mut self) {
277            self.unregister_ev1 = true;
278        }
279
280        fn modify_ev1(&mut self) {
281            self.modify_ev1 = true;
282        }
283
284        fn processed_ev1_out(&self) -> bool {
285            self.processed_ev1_out
286        }
287
288        fn processed_ev2_out(&self) -> bool {
289            self.processed_ev2_out
290        }
291
292        fn processed_ev1_in(&self) -> bool {
293            self.processed_ev1_in
294        }
295
296        fn reset_state(&mut self) {
297            self.processed_ev1_out = false;
298            self.processed_ev2_out = false;
299            self.processed_ev1_in = false;
300        }
301
302        fn handle_updates(&mut self, event_manager: &mut EventOps) {
303            if self.register_ev2 {
304                event_manager
305                    .add(Events::new(&self.event_fd_2, EventSet::OUT))
306                    .unwrap();
307                self.register_ev2 = false;
308            }
309
310            if self.unregister_ev1 {
311                event_manager
312                    .remove(Events::new_raw(
313                        self.event_fd_1.as_raw_fd(),
314                        EventSet::empty(),
315                    ))
316                    .unwrap();
317                self.unregister_ev1 = false;
318            }
319
320            if self.modify_ev1 {
321                event_manager
322                    .modify(Events::new(&self.event_fd_1, EventSet::IN))
323                    .unwrap();
324                self.modify_ev1 = false;
325            }
326        }
327
328        fn handle_in(&mut self, source: RawFd) {
329            if self.event_fd_1.as_raw_fd() == source {
330                self.processed_ev1_in = true;
331            }
332        }
333
334        fn handle_out(&mut self, source: RawFd) {
335            match source {
336                _ if self.event_fd_1.as_raw_fd() == source => {
337                    self.processed_ev1_out = true;
338                }
339                _ if self.event_fd_2.as_raw_fd() == source => {
340                    self.processed_ev2_out = true;
341                }
342                _ => {}
343            }
344        }
345    }
346
347    impl MutEventSubscriber for DummySubscriber {
348        fn process(&mut self, events: Events, ops: &mut EventOps) {
349            let source = events.fd();
350            let event_set = events.event_set();
351
352            // We only know how to treat EPOLLOUT and EPOLLIN.
353            // If we received anything else just stop processing the event.
354            let all_but_in_out = EventSet::all() - EventSet::OUT - EventSet::IN;
355            if event_set.intersects(all_but_in_out) {
356                return;
357            }
358
359            self.handle_updates(ops);
360
361            match event_set {
362                EventSet::IN => self.handle_in(source),
363                EventSet::OUT => self.handle_out(source),
364                _ => {}
365            }
366        }
367
368        fn init(&mut self, ops: &mut EventOps) {
369            let event = Events::new(&self.event_fd_1, EventSet::OUT);
370            ops.add(event).unwrap();
371        }
372    }
373
374    #[test]
375    fn test_register() {
376        use super::SubscriberOps;
377
378        let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
379        let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
380
381        event_manager.add_subscriber(dummy_subscriber.clone());
382
383        dummy_subscriber.lock().unwrap().register_ev2();
384
385        // When running the loop the first time, ev1 should be processed, but ev2 shouldn't
386        // because it was just added as part of processing ev1.
387        event_manager.run().unwrap();
388        assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
389        assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
390
391        // Check that both ev1 and ev2 are processed.
392        dummy_subscriber.lock().unwrap().reset_state();
393        event_manager.run().unwrap();
394        assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
395        assert!(dummy_subscriber.lock().unwrap().processed_ev2_out());
396    }
397
398    #[test]
399    #[should_panic(expected = "FdAlreadyRegistered")]
400    fn test_add_invalid_subscriber() {
401        let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
402        let subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
403
404        event_manager.add_subscriber(subscriber.clone());
405        event_manager.add_subscriber(subscriber);
406    }
407
408    // Test that unregistering an event while processing another one works.
409    #[test]
410    fn test_unregister() {
411        let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
412        let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
413
414        event_manager.add_subscriber(dummy_subscriber.clone());
415
416        // Disable ev1. We should only receive this event once.
417        dummy_subscriber.lock().unwrap().unregister_ev1();
418
419        event_manager.run().unwrap();
420        assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
421
422        dummy_subscriber.lock().unwrap().reset_state();
423
424        // We expect no events to be available. Let's run with timeout so that run exists.
425        event_manager.run_with_timeout(100).unwrap();
426        assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
427    }
428
429    #[test]
430    fn test_modify() {
431        let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
432        let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
433
434        event_manager.add_subscriber(dummy_subscriber.clone());
435
436        // Modify ev1 so that it waits for EPOLL_IN.
437        dummy_subscriber.lock().unwrap().modify_ev1();
438        event_manager.run().unwrap();
439        assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
440        assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
441
442        dummy_subscriber.lock().unwrap().reset_state();
443
444        // Make sure ev1 is ready for IN so that we don't loop forever.
445        dummy_subscriber
446            .lock()
447            .unwrap()
448            .event_fd_1
449            .write(1)
450            .unwrap();
451
452        event_manager.run().unwrap();
453        assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
454        assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
455        assert!(dummy_subscriber.lock().unwrap().processed_ev1_in());
456    }
457
458    #[test]
459    fn test_remove_subscriber() {
460        let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
461        let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
462
463        let subscriber_id = event_manager.add_subscriber(dummy_subscriber.clone());
464        event_manager.run().unwrap();
465        assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
466
467        dummy_subscriber.lock().unwrap().reset_state();
468
469        event_manager.remove_subscriber(subscriber_id).unwrap();
470
471        // We expect no events to be available. Let's run with timeout so that run exits.
472        event_manager.run_with_timeout(100).unwrap();
473        assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
474
475        // Removing the subscriber twice should return an error.
476        assert_eq!(
477            event_manager
478                .remove_subscriber(subscriber_id)
479                .err()
480                .unwrap(),
481            Error::InvalidId
482        );
483    }
484}