dbs_utils/epoll_manager.rs

// Copyright 2020 Alibaba Cloud. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! A simple wrapper over event_manager::EventManager to avoid a possible deadlock.

use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};

pub use event_manager::{
    Error, EventManager, EventOps, EventSet, Events, MutEventSubscriber, RemoteEndpoint,
    SubscriberId, SubscriberOps,
};

/// Type of an epoll event subscriber.
pub type EpollSubscriber = Box<dyn MutEventSubscriber + Send>;

type EpollManagerImpl = Arc<Mutex<EventManager<EpollSubscriber>>>;

/// A wrapper struct over EventManager that avoids a possible deadlock.
///
/// Dealing with an epoll event manager in an idiomatic Rust way is a rather tough topic:
/// event_manager::EventManager is designed for single-threaded use and leaves concurrent
/// access to its clients. Two kinds of threads are involved here, the epoll worker thread
/// and the vCPU threads. To reduce overhead, the epoll worker thread calls epoll::wait()
/// without a timeout, so it may hold the EpollManagerImpl mutex for an indefinite period.
/// When a vCPU thread tries to activate a virtio device, it needs to acquire that same
/// mutex and may therefore block for an indefinite time. To solve this issue, we perform a
/// kick()/try_lock() loop that wakes the epoll worker thread out of epoll::wait() before
/// taking the lock.
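///
/// # Example
///
/// A minimal usage sketch, modeled on the unit test at the bottom of this file: one thread
/// runs the polling loop while other threads register subscribers and events, kicking the
/// sleeping worker awake as needed. `subscriber` and `event_fd` stand in for a
/// MutEventSubscriber implementation and the file descriptor it watches.
///
/// ```ignore
/// let manager = EpollManager::default();
///
/// // Run the polling loop on a dedicated worker thread.
/// let poller = manager.clone();
/// std::thread::spawn(move || loop {
///     // Blocks in epoll::wait(); add_subscriber()/add_event() kick it awake when needed.
///     let _ = poller.handle_events(-1);
/// });
///
/// // Safe to call from other threads, e.g. while activating a virtio device.
/// let id = manager.add_subscriber(Box::new(subscriber));
/// manager.add_event(id, Events::new(&event_fd, EventSet::IN)).unwrap();
/// ```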
#[derive(Clone)]
pub struct EpollManager {
    /// The wrapped event manager, protected by a mutex.
    pub mgr: EpollManagerImpl,
    /// Remote endpoint used to kick the worker thread and to run calls on its behalf.
    endpoint: Arc<Mutex<RemoteEndpoint<EpollSubscriber>>>,
}

impl EpollManager {
    /// Add a new epoll event subscriber.
    pub fn add_subscriber(&self, handler: EpollSubscriber) -> SubscriberId {
        // Wake the worker thread out of epoll::wait() so the try_lock() below has a chance
        // to succeed.
        let _ = self.endpoint.lock().unwrap().kick();
        if let Ok(mut mgr) = self.mgr.try_lock() {
            mgr.add_subscriber(handler)
        } else {
            // Someone else (typically the worker thread) holds the lock, so ask the worker
            // to add the subscriber on our behalf through the remote endpoint.
            self.endpoint
                .lock()
                .unwrap()
                .call_blocking::<_, _, Error>(move |mgr| Ok(mgr.add_subscriber(handler)))
                .unwrap()
        }
    }

    /// Remove a given epoll event subscriber.
    pub fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<EpollSubscriber> {
        let mut mgr = self
            .mgr
            .lock()
            .map_err(|e| anyhow!("failed to lock EventManager: {:?}", e))?;
        mgr.remove_subscriber(subscriber_id)
            .map_err(|e| anyhow!("failed to remove subscriber: {:?}", e))
    }

    /// Add an epoll event to be monitored.
    pub fn add_event(
        &self,
        subscriber_id: SubscriberId,
        events: Events,
    ) -> std::result::Result<(), Error> {
        // Keep kicking the worker thread until we manage to grab the lock between two
        // epoll::wait() calls.
        loop {
            let _ = self.endpoint.lock().unwrap().kick();
            if let Ok(mut mgr) = self.mgr.try_lock() {
                let mut ops = mgr.event_ops(subscriber_id)?;
                return ops.add(events);
            }
        }
    }

    /// Run the epoll polling loop.
    pub fn handle_events(&self, timeout: i32) -> std::result::Result<usize, Error> {
        // Do not expect a poisoned lock.
        let mut guard = self.mgr.lock().unwrap();

        guard.run_with_timeout(timeout)
    }
}

impl Default for EpollManager {
    /// Create a new epoll manager.
    fn default() -> Self {
        let mgr = EventManager::new().expect("epoll_manager: failed to create new instance");
        let endpoint = Arc::new(Mutex::new(mgr.remote_endpoint()));

        EpollManager {
            mgr: Arc::new(Mutex::new(mgr)),
            endpoint,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::io::AsRawFd;
    use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};

    struct DummySubscriber {
        pub event: EventFd,
    }

    impl DummySubscriber {
        fn new() -> Self {
            Self {
                event: EventFd::new(0).unwrap(),
            }
        }
    }

    impl MutEventSubscriber for DummySubscriber {
        fn process(&mut self, events: Events, _ops: &mut EventOps) {
            let source = events.fd();
            let event_set = events.event_set();
            // The registered fd is a duplicate of self.event, so the raw fds differ.
            assert_ne!(source, self.event.as_raw_fd());
            match event_set {
                EventSet::IN => {
                    unreachable!()
                }
                EventSet::OUT => {
                    self.event.read().unwrap();
                }
                _ => {
                    unreachable!()
                }
            }
        }

        fn init(&mut self, _ops: &mut EventOps) {}
    }

    #[test]
    fn test_epoll_manager() {
        let mut epoll_manager = EpollManager::default();

        // Run the polling loop on a worker thread until at least one event is dispatched.
        let epoll_manager_clone = epoll_manager.clone();
        let thread = std::thread::spawn(move || loop {
            let count = epoll_manager_clone.handle_events(-1).unwrap();
            if count == 0 {
                continue;
            }
            assert_eq!(count, 1);
            break;
        });
        let handler = DummySubscriber::new();
        // Keep a duplicate of the eventfd so the test can register and signal it.
        let event = handler.event.try_clone().unwrap();
        let id = epoll_manager.add_subscriber(Box::new(handler));

        thread.join().unwrap();

        epoll_manager
            .add_event(id, Events::new(&event, EventSet::OUT))
            .unwrap();
        event.write(1).unwrap();

        // Run the polling loop again on a new worker thread until events are dispatched.
        let epoll_manager_clone = epoll_manager.clone();
        let thread = std::thread::spawn(move || loop {
            let count = epoll_manager_clone.handle_events(-1).unwrap();
            if count == 0 {
                continue;
            }
            assert_eq!(count, 2);
            break;
        });

        thread.join().unwrap();
        epoll_manager.remove_subscriber(id).unwrap();
    }
}