dbs_utils/
epoll_manager.rs1use anyhow::{anyhow, Result};
7use std::sync::{Arc, Mutex};
8
9pub use event_manager::{
10 Error, EventManager, EventOps, EventSet, Events, MutEventSubscriber, RemoteEndpoint,
11 SubscriberId, SubscriberOps,
12};
13
/// Subscriber type stored by the epoll manager: a boxed `MutEventSubscriber`
/// trait object that is also `Send`.
14pub type EpollSubscriber = Box<dyn MutEventSubscriber + Send>;
16
// Reference-counted, mutex-guarded `EventManager` specialised to `EpollSubscriber`.
17type EpollManagerImpl = Arc<Mutex<EventManager<EpollSubscriber>>>;
18
/// Handle bundling an `EventManager` with a `RemoteEndpoint` obtained from it.
///
/// Both fields sit behind `Arc<Mutex<..>>`, so the derived `Clone` yields another
/// handle to the same manager and endpoint rather than an independent copy.
19#[derive(Clone)]
31pub struct EpollManager {
    /// The shared, mutex-guarded `EventManager`.
32 pub mgr: EpollManagerImpl,
    // Remote endpoint of `mgr`; the methods use it to `kick()` the manager and to
    // submit closures through `call_blocking`.
33 endpoint: Arc<Mutex<RemoteEndpoint<EpollSubscriber>>>,
34}
35
36impl EpollManager {
37 pub fn add_subscriber(&self, handler: EpollSubscriber) -> SubscriberId {
39 let _ = self.endpoint.lock().unwrap().kick();
40 if let Ok(mut mgr) = self.mgr.try_lock() {
41 mgr.add_subscriber(handler)
42 } else {
43 return self
44 .endpoint
45 .lock()
46 .unwrap()
47 .call_blocking::<_, _, Error>(move |mgr| Ok(mgr.add_subscriber(handler)))
48 .unwrap();
49 }
50 }
51
52 pub fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result<EpollSubscriber> {
54 let mut mgr = self
55 .mgr
56 .lock()
57 .map_err(|e| anyhow!("EventManager lock fail. {:?}", e))?;
58 mgr.remove_subscriber(subscriber_id)
59 .map_err(|e| anyhow!("remove subscriber err. {:?}", e))
60 }
61
62 pub fn add_event(
64 &self,
65 subscriber_id: SubscriberId,
66 events: Events,
67 ) -> std::result::Result<(), Error> {
68 loop {
69 let _ = self.endpoint.lock().unwrap().kick();
70 if let Ok(mut mgr) = self.mgr.try_lock() {
71 let mut ops = mgr.event_ops(subscriber_id)?;
72 return ops.add(events);
73 }
74 }
75 }
76
77 pub fn handle_events(&self, timeout: i32) -> std::result::Result<usize, Error> {
79 let mut guard = self.mgr.lock().unwrap();
81
82 guard.run_with_timeout(timeout)
83 }
84}
85
86impl Default for EpollManager {
87 fn default() -> Self {
89 let mgr = EventManager::new().expect("epoll_manager: failed create new instance");
90 let endpoint = Arc::new(Mutex::new(mgr.remote_endpoint()));
91
92 EpollManager {
93 mgr: Arc::new(Mutex::new(mgr)),
94 endpoint,
95 }
96 }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::io::AsRawFd;
    use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};

    /// Minimal subscriber that owns a single eventfd.
    struct DummySubscriber {
        pub event: EventFd,
    }

    impl DummySubscriber {
        /// Creates a subscriber around a new eventfd with an initial value of 0.
        fn new() -> Self {
            let event = EventFd::new(0).unwrap();
            Self { event }
        }
    }

    impl MutEventSubscriber for DummySubscriber {
        /// Accepts only `EventSet::OUT` notifications, draining the eventfd on
        /// each one; any other event set is treated as unreachable.
        fn process(&mut self, events: Events, _ops: &mut EventOps) {
            // The test registers a clone of the eventfd, so the reported fd is
            // expected to differ from the one held by the subscriber.
            assert_ne!(events.fd(), self.event.as_raw_fd());

            if events.event_set() == EventSet::OUT {
                self.event.read().unwrap();
            } else {
                unreachable!()
            }
        }

        fn init(&mut self, _ops: &mut EventOps) {}
    }

    /// Spawns a thread that calls `handle_events(-1)` until it reports a
    /// non-zero count, then asserts that the count equals `expected`.
    fn spawn_event_loop(manager: EpollManager, expected: usize) -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            let dispatched = loop {
                let n = manager.handle_events(-1).unwrap();
                if n != 0 {
                    break n;
                }
            };
            assert_eq!(dispatched, expected);
        })
    }

    #[test]
    fn test_epoll_manager() {
        let mut epoll_manager = EpollManager::default();

        // First round: a subscriber is added while the loop thread is running.
        let worker = spawn_event_loop(epoll_manager.clone(), 1);
        let subscriber = DummySubscriber::new();
        let event = subscriber.event.try_clone().unwrap();
        let id = epoll_manager.add_subscriber(Box::new(subscriber));

        worker.join().unwrap();

        // Second round: register the cloned eventfd, signal it, then run the loop.
        let registration = Events::new(&event, EventSet::OUT);
        epoll_manager.add_event(id, registration).unwrap();
        event.write(1).unwrap();

        let worker = spawn_event_loop(epoll_manager.clone(), 2);

        worker.join().unwrap();
        epoll_manager.remove_subscriber(id).unwrap();
    }
}