wl_client/connection/wait_for_events.rs
1use {
2 crate::{
3 BorrowedQueue, Connection, Queue,
4 utils::{eventfd::Eventfd, executor::TaskId, on_drop::on_drop, poller},
5 },
6 parking_lot::{Condvar, Mutex},
7 std::{
8 convert::Infallible,
9 future::poll_fn,
10 io::{self, ErrorKind},
11 os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
12 sync::{
13 Arc,
14 atomic::{AtomicBool, Ordering::Relaxed},
15 },
16 task::{Poll, Waker},
17 },
18};
19
20#[cfg(test)]
21mod tests;
22
23/// A file descriptor for event-loop integration.
24///
25/// This type wraps a file descriptor that will signal readability when there are events
26/// to be dispatched on one or more queues.
27///
28/// You can construct this type by calling [`Connection::create_watcher`],
29/// [`Queue::create_watcher`], or [`BorrowedQueue::create_watcher`].
30///
31/// The contained file descriptor is opaque. You should not interact with it except by
32/// polling it for readability.
33///
34/// Once the file descriptor signals readability, the application should
35///
36/// 1. dispatch the queue(s) by calling [`Queue::dispatch_pending`],
37/// 2. reset the readability by calling [`QueueWatcher::reset`],
38/// 3. flush outgoing requests by calling [`Connection::flush`],
39/// 3. wait for the file descriptor to become readable again.
40///
41/// If not all events were dispatched before calling [`QueueWatcher::reset`], then the
42/// file descriptor will become readable again immediately after being reset.
43///
44/// # Example
45///
46/// ```
47/// # use std::os::fd::AsRawFd;
48/// # use mio::Interest;
49/// # use mio::unix::SourceFd;
50/// # use wl_client::Libwayland;
51/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
52/// #
53/// let lib = Libwayland::open().unwrap();
54/// let con = lib.connect_to_default_display().unwrap();
55/// let queue = con.create_queue(c"queue name");
56/// let watcher = queue.create_watcher().unwrap();
57/// let token = mio::Token(0);
58/// # let _sync = queue.display::<WlDisplay>().sync();
59///
60/// let mut events = mio::Events::with_capacity(2);
61/// let mut poll = mio::Poll::new().unwrap();
62/// poll
63/// .registry()
64/// .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
65/// .unwrap();
66/// // register other application file descriptors
67///
68/// loop {
69/// // flush outgoing messages before going to sleep
70/// con.flush().unwrap();
71///
72/// poll.poll(&mut events, None).unwrap();
73/// for event in events.iter() {
74/// if event.token() == token {
75/// // dispatch new events
76/// queue.dispatch_pending().unwrap();
77/// // reset the watcher
78/// watcher.reset().unwrap();
79/// }
80/// // handle other file descriptors
81/// }
82/// events.clear();
83/// # break;
84/// }
85/// ```
86#[derive(Clone)]
87pub struct QueueWatcher {
88 data: Arc<QueueWatcherData>,
89}
90
91struct QueueWatcherData {
92 task_id: TaskId,
93 connection: Connection,
94 data: Arc<QueueWatcherShared>,
95}
96
97struct QueueWatcherShared {
98 eventfd: Eventfd,
99 has_error: AtomicBool,
100 data: Mutex<QueueWatcherMutable>,
101 cancellation: Condvar,
102}
103
/// The lock-protected part of [`QueueWatcherShared`].
///
/// All fields start out as `false`/`None`.
#[derive(Default)]
struct QueueWatcherMutable {
    /// Set by the task after it has signaled the eventfd and cleared by
    /// [`QueueWatcher::reset`]. While this is `true`, the task does not signal again.
    wait_for_reset: bool,
    /// The waker of the task while it is parked waiting for a reset.
    waker: Option<Waker>,
    /// The kind of the error that terminated the task, if any.
    last_error: Option<ErrorKind>,
    /// Set once the state captured by the task has been dropped.
    cancelled: bool,
}
111
112impl Connection {
113 /// Waits for events on any number of event queues.
114 ///
115 /// When this function returns `Ok(())`, at least one of the event queues has an event
116 /// queued.
117 ///
118 /// If no other thread is dispatching these queues, then this function will return
119 /// `Ok(())` as soon as this happens.
120 ///
121 /// If the list of queues is empty, this function waits indefinitely.
122 ///
123 /// This function flushes all requests made before this function call.
124 ///
125 /// # Panic
126 ///
127 /// This function panics if the queues do not all belong to this connection.
128 pub async fn wait_for_events(&self, queues: &[&BorrowedQueue]) -> io::Result<()> {
129 self.flush()?;
130 self.wait_for_events_without_flush(queues).await
131 }
132
133 pub(crate) async fn wait_for_events_without_flush(
134 &self,
135 queues: &[&BorrowedQueue],
136 ) -> io::Result<()> {
137 for queue in queues {
138 if queue.connection() != self {
139 wrong_con();
140 }
141 }
142 loop {
143 let mut lock = None;
144 if let Some((first, other)) = queues.split_first() {
145 let Some(l) = self.acquire_read_lock_async(first).await else {
146 return Ok(());
147 };
148 lock = Some(l);
149 for queue in other {
150 if self.queue_has_events(queue) {
151 return Ok(());
152 }
153 }
154 }
155 let poll_data = self.data.poller.data.clone();
156 // NOTE: We cannot hold on to the lock on this thread since that could cause
157 // a deadlock when another task on this thread calls
158 // wl_display_read_events. Instead we move the lock and the wait to
159 // the executor thread which will make progress and call
160 // wl_display_read_events as soon as there are new events, even if this
161 // thread is blocked.
162 self.data
163 .executor
164 .execute::<io::Result<()>, _>(async move {
165 poller::readable(&poll_data).await?;
166 if let Some(lock) = lock {
167 lock.read_events().await?;
168 }
169 Ok(())
170 })
171 .await?;
172 }
173 }
174
175 /// Creates a [`QueueWatcher`] for event-loop integration.
176 ///
177 /// See the documentation of [`QueueWatcher`] for details on when and how an
178 /// application would use this.
179 ///
180 /// # Panic
181 ///
182 /// This function panics if the queues do not all belong to this connection.
183 pub fn create_watcher(
184 &self,
185 owned: &[&Queue],
186 borrowed: impl IntoIterator<Item = BorrowedQueue>,
187 ) -> io::Result<QueueWatcher> {
188 self.create_watcher_(owned, borrowed.into_iter().collect())
189 }
190
191 /// Creates a [`QueueWatcher`] for event-loop integration.
192 ///
193 /// See the documentation of [`QueueWatcher`] for details on when and how an
194 /// application would use this.
195 ///
196 /// # Panic
197 ///
198 /// This function panics if the queues do not all belong to this connection.
199 fn create_watcher_(
200 &self,
201 owned: &[&Queue],
202 borrowed: Vec<BorrowedQueue>,
203 ) -> io::Result<QueueWatcher> {
204 for q in owned {
205 if q.connection() != self {
206 wrong_con();
207 }
208 }
209 for q in &borrowed {
210 if q.connection() != self {
211 wrong_con();
212 }
213 }
214 let shared = Arc::new(QueueWatcherShared {
215 eventfd: Eventfd::new()?,
216 has_error: Default::default(),
217 data: Default::default(),
218 cancellation: Default::default(),
219 });
220 /// This type contains the queues that are transferred into the task of a [`QueueWatcher`].
221 ///
222 /// This type exists so that [`QueueWatcherData::drop`] can block until these objects
223 /// have definitely been dropped by the executor. This is achieved as follows: Rust
224 /// guarantees that fields are dropped from top to bottom. Therefore, when the drop impl
225 /// of `_f` runs, `owned` and `borrowed` have already been dropped.
226 ///
227 /// Blocking is required for two reasons:
228 ///
229 /// 1. The user of [`QueueWatcher`] must keep the underlying queues alive while any of the
230 /// [`BorrowedQueue`] still exist. If the drop impl did not block, the user would be
231 /// unable to determine when the [`BorrowedQueue`] have been dropped.
232 /// 2. The queues contain clones of the [`Connection`]. The last clone of the connection
233 /// must not be dropped in the executor thread since the executor thread would then try
234 /// to join itself.
235 struct CancelData<F> {
236 connection: Connection,
237 shared: Arc<QueueWatcherShared>,
238 owned: Vec<Queue>,
239 borrowed: Vec<BorrowedQueue>,
240 _f: F,
241 }
242 let cancel_data = CancelData {
243 connection: self.clone(),
244 shared: shared.clone(),
245 owned: owned.iter().map(|q| (*q).clone()).collect(),
246 borrowed,
247 _f: on_drop({
248 let shared = shared.clone();
249 move || {
250 shared.data.lock().cancelled = true;
251 shared.cancellation.notify_all();
252 }
253 }),
254 };
255 let task_id = self.data.executor.add(async move {
256 let cancel_data = cancel_data;
257 let mut qs = vec![];
258 for q in &cancel_data.owned {
259 qs.push(&**q);
260 }
261 for q in &cancel_data.borrowed {
262 qs.push(q);
263 }
264 let res: io::Result<Infallible> = async {
265 loop {
266 cancel_data
267 .connection
268 .wait_for_events_without_flush(&qs)
269 .await?;
270 cancel_data.shared.eventfd.bump()?;
271 poll_fn(|ctx| {
272 let d = &mut *cancel_data.shared.data.lock();
273 if d.wait_for_reset {
274 d.waker = Some(ctx.waker().clone());
275 Poll::Pending
276 } else {
277 d.wait_for_reset = true;
278 d.waker = None;
279 Poll::Ready(())
280 }
281 })
282 .await;
283 }
284 }
285 .await;
286 let e = res.unwrap_err();
287 cancel_data.shared.data.lock().last_error = Some(e.kind());
288 cancel_data.shared.has_error.store(true, Relaxed);
289 });
290 let data = Arc::new(QueueWatcherData {
291 task_id,
292 connection: self.clone(),
293 data: shared,
294 });
295 Ok(QueueWatcher { data })
296 }
297}
298
299impl QueueWatcher {
300 /// Resets the file descriptor readability.
301 ///
302 /// The file descriptor will become readable again when there are events to be
303 /// dispatched.
304 pub fn reset(&self) -> io::Result<()> {
305 let data = &*self.data.data;
306 if data.has_error.load(Relaxed) {
307 if let Some(e) = data.data.lock().last_error {
308 return Err(e.into());
309 }
310 }
311 data.eventfd.clear()?;
312 let d = &mut *data.data.lock();
313 if let Some(e) = d.last_error {
314 let _ = data.eventfd.bump();
315 return Err(e.into());
316 }
317 d.wait_for_reset = false;
318 if let Some(waker) = d.waker.take() {
319 waker.wake()
320 }
321 Ok(())
322 }
323}
324
325impl Drop for QueueWatcherData {
326 fn drop(&mut self) {
327 self.connection.data.executor.cancel(self.task_id);
328 let mut lock = self.data.data.lock();
329 while !lock.cancelled {
330 self.data.cancellation.wait(&mut lock);
331 }
332 }
333}
334
335impl AsFd for QueueWatcher {
336 fn as_fd(&self) -> BorrowedFd<'_> {
337 self.data.data.eventfd.as_fd()
338 }
339}
340
341impl AsRawFd for QueueWatcher {
342 fn as_raw_fd(&self) -> RawFd {
343 self.as_fd().as_raw_fd()
344 }
345}
346
347impl AsRawFd for &'_ QueueWatcher {
348 fn as_raw_fd(&self) -> RawFd {
349 self.as_fd().as_raw_fd()
350 }
351}
352
/// Panics because a queue was used with a connection it does not belong to.
///
/// Kept as a separate `#[cold]` function so that the panic lives outside the callers'
/// bodies.
#[cold]
fn wrong_con() -> ! {
    panic!("queue does not belong to this connection")
}