1pub mod error;
2pub mod raw;
3pub mod sys;
4
5use crate::error::{Error, Result};
6use crate::sys::{
7 BindInterdomainRequest, BindUnboundPortRequest, BindVirqRequest, NotifyRequest,
8 UnbindPortRequest,
9};
10
11use crate::raw::EVENT_CHANNEL_DEVICE;
12use byteorder::{LittleEndian, ReadBytesExt};
13use log::error;
14use std::collections::hash_map::Entry;
15use std::collections::HashMap;
16use std::mem::size_of;
17use std::os::fd::AsRawFd;
18use std::os::raw::c_void;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::fs::{File, OpenOptions};
22use tokio::sync::{Mutex, Notify};
23
24type WakeMap = Arc<Mutex<HashMap<u32, Arc<Notify>>>>;
25
26#[derive(Clone)]
27pub struct EventChannelService {
28 handle: Arc<Mutex<File>>,
29 wakes: WakeMap,
30 process_flag: Arc<AtomicBool>,
31}
32
33pub struct BoundEventChannel {
34 pub local_port: u32,
35 pub receiver: Arc<Notify>,
36 pub service: EventChannelService,
37}
38
39impl BoundEventChannel {
40 pub async fn unmask(&self) -> Result<()> {
41 self.service.unmask(self.local_port).await
42 }
43}
44
45impl Drop for BoundEventChannel {
46 fn drop(&mut self) {
47 let service = self.service.clone();
48 let port = self.local_port;
49 tokio::task::spawn(async move {
50 let _ = service.unbind(port).await;
51 });
52 }
53}
54
55impl EventChannelService {
56 pub async fn open() -> Result<EventChannelService> {
57 let handle = OpenOptions::new()
58 .read(true)
59 .write(true)
60 .open(EVENT_CHANNEL_DEVICE)
61 .await?;
62 let wakes = Arc::new(Mutex::new(HashMap::new()));
63 let flag = Arc::new(AtomicBool::new(false));
64 let processor = EventChannelProcessor {
65 flag: flag.clone(),
66 handle: handle.try_clone().await?.into_std().await,
67 wakes: wakes.clone(),
68 };
69 processor.launch()?;
70
71 Ok(EventChannelService {
72 handle: Arc::new(Mutex::new(handle)),
73 wakes,
74 process_flag: flag,
75 })
76 }
77
78 pub async fn bind_virq(&self, virq: u32) -> Result<u32> {
79 let handle = self.handle.lock().await;
80 let fd = handle.as_raw_fd();
81 let mut request = BindVirqRequest { virq };
82 let result =
83 tokio::task::spawn_blocking(move || unsafe { sys::bind_virq(fd, &mut request) })
84 .await
85 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
86 Ok(result)
87 }
88
89 pub async fn bind_interdomain(&self, domid: u32, port: u32) -> Result<u32> {
90 let handle = self.handle.lock().await;
91 let fd = handle.as_raw_fd();
92 let mut request = BindInterdomainRequest {
93 remote_domain: domid,
94 remote_port: port,
95 };
96 let result =
97 tokio::task::spawn_blocking(move || unsafe { sys::bind_interdomain(fd, &mut request) })
98 .await
99 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
100 Ok(result)
101 }
102
103 pub async fn bind_unbound_port(&self, domid: u32) -> Result<u32> {
104 let handle = self.handle.lock().await;
105 let fd = handle.as_raw_fd();
106 let mut request = BindUnboundPortRequest {
107 remote_domain: domid,
108 };
109 let result = tokio::task::spawn_blocking(move || unsafe {
110 sys::bind_unbound_port(fd, &mut request)
111 })
112 .await
113 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
114 Ok(result)
115 }
116
117 pub async fn unmask(&self, port: u32) -> Result<()> {
118 let handle = self.handle.lock().await;
119 let mut port = port;
120 let fd = handle.as_raw_fd();
121 let result = tokio::task::spawn_blocking(move || unsafe {
122 libc::write(fd, &mut port as *mut u32 as *mut c_void, size_of::<u32>())
123 })
124 .await
125 .map_err(|_| Error::BlockingTaskJoin)?;
126 if result != size_of::<u32>() as isize {
127 return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
128 }
129 Ok(())
130 }
131
132 pub async fn unbind(&self, port: u32) -> Result<u32> {
133 let handle = self.handle.lock().await;
134 let mut request = UnbindPortRequest { port };
135 let fd = handle.as_raw_fd();
136 let result = tokio::task::spawn_blocking(move || unsafe { sys::unbind(fd, &mut request) })
137 .await
138 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
139 self.wakes.lock().await.remove(&port);
140 Ok(result)
141 }
142
143 pub async fn notify(&self, port: u32) -> Result<u32> {
144 let handle = self.handle.lock().await;
145 let mut request = NotifyRequest { port };
146 let fd = handle.as_raw_fd();
147 let result = tokio::task::spawn_blocking(move || unsafe { sys::notify(fd, &mut request) })
148 .await
149 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
150 Ok(result)
151 }
152
153 pub async fn reset(&self) -> Result<u32> {
154 let handle = self.handle.lock().await;
155 let fd = handle.as_raw_fd();
156 let result = tokio::task::spawn_blocking(move || unsafe { sys::reset(fd) })
157 .await
158 .map_err(|_| Error::BlockingTaskJoin)?? as u32;
159 Ok(result)
160 }
161
162 pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
163 let local_port = self.bind_interdomain(domid, port).await?;
164 let receiver = self.subscribe(local_port).await?;
165 let bound = BoundEventChannel {
166 local_port,
167 receiver,
168 service: self.clone(),
169 };
170 Ok(bound)
171 }
172
173 pub async fn subscribe(&self, port: u32) -> Result<Arc<Notify>> {
174 let mut wakes = self.wakes.lock().await;
175 let receiver = match wakes.entry(port) {
176 Entry::Occupied(entry) => entry.get().clone(),
177
178 Entry::Vacant(entry) => {
179 let notify = Arc::new(Notify::new());
180 entry.insert(notify.clone());
181 notify
182 }
183 };
184 Ok(receiver)
185 }
186}
187
188pub struct EventChannelProcessor {
189 flag: Arc<AtomicBool>,
190 handle: std::fs::File,
191 wakes: WakeMap,
192}
193
194impl EventChannelProcessor {
195 pub fn launch(mut self) -> Result<()> {
196 std::thread::spawn(move || {
197 while let Err(error) = self.process() {
198 if self.flag.load(Ordering::Acquire) {
199 break;
200 }
201 error!("failed to process event channel wakes: {}", error);
202 }
203 });
204
205 Ok(())
206 }
207
208 pub fn process(&mut self) -> Result<()> {
209 loop {
210 let port = self.handle.read_u32::<LittleEndian>()?;
211 let receiver = match self.wakes.blocking_lock().entry(port) {
212 Entry::Occupied(entry) => entry.get().clone(),
213
214 Entry::Vacant(entry) => {
215 let notify = Arc::new(Notify::new());
216 entry.insert(notify.clone());
217 notify
218 }
219 };
220 receiver.notify_one();
221 }
222 }
223}
224
225impl Drop for EventChannelService {
226 fn drop(&mut self) {
227 if Arc::strong_count(&self.handle) <= 1 {
228 self.process_flag.store(true, Ordering::Release);
229 }
230 }
231}