xenevtchn/
lib.rs

1pub mod error;
2pub mod raw;
3pub mod sys;
4
5use crate::error::{Error, Result};
6use crate::sys::{
7    BindInterdomainRequest, BindUnboundPortRequest, BindVirqRequest, NotifyRequest,
8    UnbindPortRequest,
9};
10
11use crate::raw::EVENT_CHANNEL_DEVICE;
12use byteorder::{LittleEndian, ReadBytesExt};
13use log::error;
14use std::collections::hash_map::Entry;
15use std::collections::HashMap;
16use std::mem::size_of;
17use std::os::fd::AsRawFd;
18use std::os::raw::c_void;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::fs::{File, OpenOptions};
22use tokio::sync::{Mutex, Notify};
23
24type WakeMap = Arc<Mutex<HashMap<u32, Arc<Notify>>>>;
25
26#[derive(Clone)]
27pub struct EventChannelService {
28    handle: Arc<Mutex<File>>,
29    wakes: WakeMap,
30    process_flag: Arc<AtomicBool>,
31}
32
33pub struct BoundEventChannel {
34    pub local_port: u32,
35    pub receiver: Arc<Notify>,
36    pub service: EventChannelService,
37}
38
39impl BoundEventChannel {
40    pub async fn unmask(&self) -> Result<()> {
41        self.service.unmask(self.local_port).await
42    }
43}
44
45impl Drop for BoundEventChannel {
46    fn drop(&mut self) {
47        let service = self.service.clone();
48        let port = self.local_port;
49        tokio::task::spawn(async move {
50            let _ = service.unbind(port).await;
51        });
52    }
53}
54
55impl EventChannelService {
56    pub async fn open() -> Result<EventChannelService> {
57        let handle = OpenOptions::new()
58            .read(true)
59            .write(true)
60            .open(EVENT_CHANNEL_DEVICE)
61            .await?;
62        let wakes = Arc::new(Mutex::new(HashMap::new()));
63        let flag = Arc::new(AtomicBool::new(false));
64        let processor = EventChannelProcessor {
65            flag: flag.clone(),
66            handle: handle.try_clone().await?.into_std().await,
67            wakes: wakes.clone(),
68        };
69        processor.launch()?;
70
71        Ok(EventChannelService {
72            handle: Arc::new(Mutex::new(handle)),
73            wakes,
74            process_flag: flag,
75        })
76    }
77
78    pub async fn bind_virq(&self, virq: u32) -> Result<u32> {
79        let handle = self.handle.lock().await;
80        let fd = handle.as_raw_fd();
81        let mut request = BindVirqRequest { virq };
82        let result =
83            tokio::task::spawn_blocking(move || unsafe { sys::bind_virq(fd, &mut request) })
84                .await
85                .map_err(|_| Error::BlockingTaskJoin)?? as u32;
86        Ok(result)
87    }
88
89    pub async fn bind_interdomain(&self, domid: u32, port: u32) -> Result<u32> {
90        let handle = self.handle.lock().await;
91        let fd = handle.as_raw_fd();
92        let mut request = BindInterdomainRequest {
93            remote_domain: domid,
94            remote_port: port,
95        };
96        let result =
97            tokio::task::spawn_blocking(move || unsafe { sys::bind_interdomain(fd, &mut request) })
98                .await
99                .map_err(|_| Error::BlockingTaskJoin)?? as u32;
100        Ok(result)
101    }
102
103    pub async fn bind_unbound_port(&self, domid: u32) -> Result<u32> {
104        let handle = self.handle.lock().await;
105        let fd = handle.as_raw_fd();
106        let mut request = BindUnboundPortRequest {
107            remote_domain: domid,
108        };
109        let result = tokio::task::spawn_blocking(move || unsafe {
110            sys::bind_unbound_port(fd, &mut request)
111        })
112        .await
113        .map_err(|_| Error::BlockingTaskJoin)?? as u32;
114        Ok(result)
115    }
116
117    pub async fn unmask(&self, port: u32) -> Result<()> {
118        let handle = self.handle.lock().await;
119        let mut port = port;
120        let fd = handle.as_raw_fd();
121        let result = tokio::task::spawn_blocking(move || unsafe {
122            libc::write(fd, &mut port as *mut u32 as *mut c_void, size_of::<u32>())
123        })
124        .await
125        .map_err(|_| Error::BlockingTaskJoin)?;
126        if result != size_of::<u32>() as isize {
127            return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
128        }
129        Ok(())
130    }
131
132    pub async fn unbind(&self, port: u32) -> Result<u32> {
133        let handle = self.handle.lock().await;
134        let mut request = UnbindPortRequest { port };
135        let fd = handle.as_raw_fd();
136        let result = tokio::task::spawn_blocking(move || unsafe { sys::unbind(fd, &mut request) })
137            .await
138            .map_err(|_| Error::BlockingTaskJoin)?? as u32;
139        self.wakes.lock().await.remove(&port);
140        Ok(result)
141    }
142
143    pub async fn notify(&self, port: u32) -> Result<u32> {
144        let handle = self.handle.lock().await;
145        let mut request = NotifyRequest { port };
146        let fd = handle.as_raw_fd();
147        let result = tokio::task::spawn_blocking(move || unsafe { sys::notify(fd, &mut request) })
148            .await
149            .map_err(|_| Error::BlockingTaskJoin)?? as u32;
150        Ok(result)
151    }
152
153    pub async fn reset(&self) -> Result<u32> {
154        let handle = self.handle.lock().await;
155        let fd = handle.as_raw_fd();
156        let result = tokio::task::spawn_blocking(move || unsafe { sys::reset(fd) })
157            .await
158            .map_err(|_| Error::BlockingTaskJoin)?? as u32;
159        Ok(result)
160    }
161
162    pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
163        let local_port = self.bind_interdomain(domid, port).await?;
164        let receiver = self.subscribe(local_port).await?;
165        let bound = BoundEventChannel {
166            local_port,
167            receiver,
168            service: self.clone(),
169        };
170        Ok(bound)
171    }
172
173    pub async fn subscribe(&self, port: u32) -> Result<Arc<Notify>> {
174        let mut wakes = self.wakes.lock().await;
175        let receiver = match wakes.entry(port) {
176            Entry::Occupied(entry) => entry.get().clone(),
177
178            Entry::Vacant(entry) => {
179                let notify = Arc::new(Notify::new());
180                entry.insert(notify.clone());
181                notify
182            }
183        };
184        Ok(receiver)
185    }
186}
187
188pub struct EventChannelProcessor {
189    flag: Arc<AtomicBool>,
190    handle: std::fs::File,
191    wakes: WakeMap,
192}
193
194impl EventChannelProcessor {
195    pub fn launch(mut self) -> Result<()> {
196        std::thread::spawn(move || {
197            while let Err(error) = self.process() {
198                if self.flag.load(Ordering::Acquire) {
199                    break;
200                }
201                error!("failed to process event channel wakes: {}", error);
202            }
203        });
204
205        Ok(())
206    }
207
208    pub fn process(&mut self) -> Result<()> {
209        loop {
210            let port = self.handle.read_u32::<LittleEndian>()?;
211            let receiver = match self.wakes.blocking_lock().entry(port) {
212                Entry::Occupied(entry) => entry.get().clone(),
213
214                Entry::Vacant(entry) => {
215                    let notify = Arc::new(Notify::new());
216                    entry.insert(notify.clone());
217                    notify
218                }
219            };
220            receiver.notify_one();
221        }
222    }
223}
224
225impl Drop for EventChannelService {
226    fn drop(&mut self) {
227        if Arc::strong_count(&self.handle) <= 1 {
228            self.process_flag.store(true, Ordering::Release);
229        }
230    }
231}