1use std::mem::size_of;
5
6use vmm_sys_util::epoll::EpollEvent;
7#[cfg(feature = "remote_endpoint")]
8use vmm_sys_util::epoll::{ControlOperation, EventSet};
9
10#[cfg(feature = "remote_endpoint")]
11use super::endpoint::{EventManagerChannel, RemoteEndpoint};
12use super::epoll::EpollWrapper;
13use super::subscribers::Subscribers;
14#[cfg(feature = "remote_endpoint")]
15use super::Errno;
16use super::{Error, EventOps, Events, MutEventSubscriber, Result, SubscriberId, SubscriberOps};
17
18#[derive(Debug)]
20pub struct EventManager<T> {
21 subscribers: Subscribers<T>,
22 epoll_context: EpollWrapper,
23
24 #[cfg(feature = "remote_endpoint")]
25 channel: EventManagerChannel<T>,
26}
27
28pub const MAX_READY_EVENTS_CAPACITY: usize = i32::MAX as usize / size_of::<EpollEvent>();
36
37impl<T: MutEventSubscriber> SubscriberOps for EventManager<T> {
38 type Subscriber = T;
39
40 fn add_subscriber(&mut self, subscriber: T) -> SubscriberId {
42 let subscriber_id = self.subscribers.add(subscriber);
43 self.subscribers
44 .get_mut_unchecked(subscriber_id)
45 .init(&mut self.epoll_context.ops_unchecked(subscriber_id));
47 subscriber_id
48 }
49
50 fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<T> {
52 let subscriber = self
53 .subscribers
54 .remove(subscriber_id)
55 .ok_or(Error::InvalidId)?;
56 self.epoll_context.remove(subscriber_id);
57 Ok(subscriber)
58 }
59
60 fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut T> {
62 if self.subscribers.contains(subscriber_id) {
63 return Ok(self.subscribers.get_mut_unchecked(subscriber_id));
64 }
65 Err(Error::InvalidId)
66 }
67
68 fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result<EventOps> {
70 if self.subscribers.contains(subscriber_id) {
72 return Ok(self.epoll_context.ops_unchecked(subscriber_id));
74 }
75 Err(Error::InvalidId)
76 }
77}
78
79impl<S: MutEventSubscriber> EventManager<S> {
80 const DEFAULT_READY_EVENTS_CAPACITY: usize = 256;
81
82 pub fn new() -> Result<Self> {
84 Self::new_with_capacity(Self::DEFAULT_READY_EVENTS_CAPACITY)
85 }
86
87 pub fn new_with_capacity(ready_events_capacity: usize) -> Result<Self> {
95 if ready_events_capacity > MAX_READY_EVENTS_CAPACITY {
96 return Err(Error::InvalidCapacity);
97 }
98
99 let manager = EventManager {
100 subscribers: Subscribers::new(),
101 epoll_context: EpollWrapper::new(ready_events_capacity)?,
102 #[cfg(feature = "remote_endpoint")]
103 channel: EventManagerChannel::new()?,
104 };
105
106 #[cfg(feature = "remote_endpoint")]
107 manager
108 .epoll_context
109 .epoll
110 .ctl(
111 ControlOperation::Add,
112 manager.channel.fd(),
113 EpollEvent::new(EventSet::IN, manager.channel.fd() as u64),
114 )
115 .map_err(|e| Error::Epoll(Errno::from(e)))?;
116 Ok(manager)
117 }
118
119 pub fn run(&mut self) -> Result<usize> {
124 self.run_with_timeout(-1)
125 }
126
127 pub fn run_with_timeout(&mut self, milliseconds: i32) -> Result<usize> {
132 let event_count = self.epoll_context.poll(milliseconds)?;
133 self.dispatch_events(event_count);
134
135 Ok(event_count)
136 }
137
138 fn dispatch_events(&mut self, event_count: usize) {
139 let default_event: EpollEvent = EpollEvent::default();
141
142 #[cfg(feature = "remote_endpoint")]
144 let mut endpoint_event = None;
145
146 for ev_index in 0..event_count {
147 let event = self.epoll_context.ready_events[ev_index];
148 let fd = event.fd();
149
150 if event.events() == default_event.events() && fd == default_event.fd() {
153 continue;
154 }
155
156 if let Some(subscriber_id) = self.epoll_context.subscriber_id(fd) {
157 self.subscribers.get_mut_unchecked(subscriber_id).process(
158 Events::with_inner(event),
159 &mut self.epoll_context.ops_unchecked(subscriber_id),
161 );
162 } else {
163 #[cfg(feature = "remote_endpoint")]
164 {
165 if fd == self.channel.fd() {
171 endpoint_event = Some(event);
172 continue;
173 }
174 }
175
176 unreachable!("Received event on fd from subscriber that is not registered");
178 }
179 }
180
181 #[cfg(feature = "remote_endpoint")]
182 self.dispatch_endpoint_event(endpoint_event);
183 }
184}
185
186#[cfg(feature = "remote_endpoint")]
187impl<S: MutEventSubscriber> EventManager<S> {
188 pub fn remote_endpoint(&self) -> RemoteEndpoint<S> {
192 self.channel.remote_endpoint()
193 }
194
195 fn dispatch_endpoint_event(&mut self, endpoint_event: Option<EpollEvent>) {
196 if let Some(event) = endpoint_event {
197 if event.event_set() != EventSet::IN {
198 unreachable!();
201 }
202 self.handle_endpoint_calls();
203 }
204 }
205
206 fn handle_endpoint_calls(&mut self) {
207 let _ = self.channel.event_fd.read();
209
210 while let Ok(msg) = self.channel.receiver.try_recv() {
214 match msg.sender {
215 Some(sender) => {
216 let _ = sender.send((msg.fnbox)(self));
219 }
220 None => {
221 let _ = (msg.fnbox)(self);
223 }
224 }
225 }
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use super::super::Error;
232 use super::*;
233
234 use std::os::unix::io::{AsRawFd, RawFd};
235 use std::sync::{Arc, Mutex};
236
237 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
238
239 struct DummySubscriber {
240 event_fd_1: EventFd,
241 event_fd_2: EventFd,
242
243 processed_ev1_out: bool,
246 processed_ev2_out: bool,
247 processed_ev1_in: bool,
248
249 register_ev2: bool,
252 unregister_ev1: bool,
253 modify_ev1: bool,
254 }
255
256 impl DummySubscriber {
257 fn new() -> Self {
258 DummySubscriber {
259 event_fd_1: EventFd::new(0).unwrap(),
260 event_fd_2: EventFd::new(0).unwrap(),
261 processed_ev1_out: false,
262 processed_ev2_out: false,
263 processed_ev1_in: false,
264 register_ev2: false,
265 unregister_ev1: false,
266 modify_ev1: false,
267 }
268 }
269 }
270
271 impl DummySubscriber {
272 fn register_ev2(&mut self) {
273 self.register_ev2 = true;
274 }
275
276 fn unregister_ev1(&mut self) {
277 self.unregister_ev1 = true;
278 }
279
280 fn modify_ev1(&mut self) {
281 self.modify_ev1 = true;
282 }
283
284 fn processed_ev1_out(&self) -> bool {
285 self.processed_ev1_out
286 }
287
288 fn processed_ev2_out(&self) -> bool {
289 self.processed_ev2_out
290 }
291
292 fn processed_ev1_in(&self) -> bool {
293 self.processed_ev1_in
294 }
295
296 fn reset_state(&mut self) {
297 self.processed_ev1_out = false;
298 self.processed_ev2_out = false;
299 self.processed_ev1_in = false;
300 }
301
302 fn handle_updates(&mut self, event_manager: &mut EventOps) {
303 if self.register_ev2 {
304 event_manager
305 .add(Events::new(&self.event_fd_2, EventSet::OUT))
306 .unwrap();
307 self.register_ev2 = false;
308 }
309
310 if self.unregister_ev1 {
311 event_manager
312 .remove(Events::new_raw(
313 self.event_fd_1.as_raw_fd(),
314 EventSet::empty(),
315 ))
316 .unwrap();
317 self.unregister_ev1 = false;
318 }
319
320 if self.modify_ev1 {
321 event_manager
322 .modify(Events::new(&self.event_fd_1, EventSet::IN))
323 .unwrap();
324 self.modify_ev1 = false;
325 }
326 }
327
328 fn handle_in(&mut self, source: RawFd) {
329 if self.event_fd_1.as_raw_fd() == source {
330 self.processed_ev1_in = true;
331 }
332 }
333
334 fn handle_out(&mut self, source: RawFd) {
335 match source {
336 _ if self.event_fd_1.as_raw_fd() == source => {
337 self.processed_ev1_out = true;
338 }
339 _ if self.event_fd_2.as_raw_fd() == source => {
340 self.processed_ev2_out = true;
341 }
342 _ => {}
343 }
344 }
345 }
346
347 impl MutEventSubscriber for DummySubscriber {
348 fn process(&mut self, events: Events, ops: &mut EventOps) {
349 let source = events.fd();
350 let event_set = events.event_set();
351
352 let all_but_in_out = EventSet::all() - EventSet::OUT - EventSet::IN;
355 if event_set.intersects(all_but_in_out) {
356 return;
357 }
358
359 self.handle_updates(ops);
360
361 match event_set {
362 EventSet::IN => self.handle_in(source),
363 EventSet::OUT => self.handle_out(source),
364 _ => {}
365 }
366 }
367
368 fn init(&mut self, ops: &mut EventOps) {
369 let event = Events::new(&self.event_fd_1, EventSet::OUT);
370 ops.add(event).unwrap();
371 }
372 }
373
374 #[test]
375 fn test_register() {
376 use super::SubscriberOps;
377
378 let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
379 let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
380
381 event_manager.add_subscriber(dummy_subscriber.clone());
382
383 dummy_subscriber.lock().unwrap().register_ev2();
384
385 event_manager.run().unwrap();
388 assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
389 assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
390
391 dummy_subscriber.lock().unwrap().reset_state();
393 event_manager.run().unwrap();
394 assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
395 assert!(dummy_subscriber.lock().unwrap().processed_ev2_out());
396 }
397
398 #[test]
399 #[should_panic(expected = "FdAlreadyRegistered")]
400 fn test_add_invalid_subscriber() {
401 let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
402 let subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
403
404 event_manager.add_subscriber(subscriber.clone());
405 event_manager.add_subscriber(subscriber);
406 }
407
408 #[test]
410 fn test_unregister() {
411 let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
412 let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
413
414 event_manager.add_subscriber(dummy_subscriber.clone());
415
416 dummy_subscriber.lock().unwrap().unregister_ev1();
418
419 event_manager.run().unwrap();
420 assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
421
422 dummy_subscriber.lock().unwrap().reset_state();
423
424 event_manager.run_with_timeout(100).unwrap();
426 assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
427 }
428
429 #[test]
430 fn test_modify() {
431 let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
432 let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
433
434 event_manager.add_subscriber(dummy_subscriber.clone());
435
436 dummy_subscriber.lock().unwrap().modify_ev1();
438 event_manager.run().unwrap();
439 assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
440 assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
441
442 dummy_subscriber.lock().unwrap().reset_state();
443
444 dummy_subscriber
446 .lock()
447 .unwrap()
448 .event_fd_1
449 .write(1)
450 .unwrap();
451
452 event_manager.run().unwrap();
453 assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
454 assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out());
455 assert!(dummy_subscriber.lock().unwrap().processed_ev1_in());
456 }
457
458 #[test]
459 fn test_remove_subscriber() {
460 let mut event_manager = EventManager::<Arc<Mutex<dyn MutEventSubscriber>>>::new().unwrap();
461 let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new()));
462
463 let subscriber_id = event_manager.add_subscriber(dummy_subscriber.clone());
464 event_manager.run().unwrap();
465 assert!(dummy_subscriber.lock().unwrap().processed_ev1_out());
466
467 dummy_subscriber.lock().unwrap().reset_state();
468
469 event_manager.remove_subscriber(subscriber_id).unwrap();
470
471 event_manager.run_with_timeout(100).unwrap();
473 assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out());
474
475 assert_eq!(
477 event_manager
478 .remove_subscriber(subscriber_id)
479 .err()
480 .unwrap(),
481 Error::InvalidId
482 );
483 }
484}