wl_client/connection/wait_for_events.rs
1use {
2 crate::{
3 BorrowedQueue, Connection, Queue,
4 utils::{eventfd::Eventfd, executor::TaskId, os_error::OsError, poller},
5 },
6 parking_lot::{Condvar, Mutex},
7 run_on_drop::on_drop,
8 std::{
9 convert::Infallible,
10 future::poll_fn,
11 io,
12 os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
13 sync::{
14 Arc,
15 atomic::{AtomicBool, Ordering::Relaxed},
16 },
17 task::{Poll, Waker},
18 },
19};
20
21#[cfg(test)]
22mod tests;
23
24/// A file descriptor for event-loop integration.
25///
26/// This type wraps a file descriptor that will signal readability when there are events
27/// to be dispatched on one or more queues.
28///
29/// You can construct this type by calling [`Connection::create_watcher`],
30/// [`Queue::create_watcher`], or [`BorrowedQueue::create_watcher`].
31///
32/// The contained file descriptor is opaque. You should not interact with it except by
33/// polling it for readability.
34///
35/// Once the file descriptor signals readability, the application should
36///
37/// 1. dispatch the queue(s) by calling [`Queue::dispatch_pending`],
38/// 2. reset the readability by calling [`QueueWatcher::reset`],
39/// 3. flush outgoing requests by calling [`Connection::flush`],
40/// 3. wait for the file descriptor to become readable again.
41///
42/// If not all events were dispatched before calling [`QueueWatcher::reset`], then the
43/// file descriptor will become readable again immediately after being reset.
44///
45/// # Example
46///
47/// ```
48/// # use std::os::fd::AsRawFd;
49/// # use mio::Interest;
50/// # use mio::unix::SourceFd;
51/// # use wl_client::Libwayland;
52/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
53/// #
54/// let lib = Libwayland::open().unwrap();
55/// let con = lib.connect_to_default_display().unwrap();
56/// let queue = con.create_queue(c"queue name");
57/// let watcher = queue.create_watcher().unwrap();
58/// let token = mio::Token(0);
59/// # let _sync = queue.display::<WlDisplay>().sync();
60///
61/// let mut events = mio::Events::with_capacity(2);
62/// let mut poll = mio::Poll::new().unwrap();
63/// poll
64/// .registry()
65/// .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
66/// .unwrap();
67/// // register other application file descriptors
68///
69/// loop {
70/// // flush outgoing messages before going to sleep
71/// con.flush().unwrap();
72///
73/// poll.poll(&mut events, None).unwrap();
74/// for event in events.iter() {
75/// if event.token() == token {
76/// // dispatch new events
77/// queue.dispatch_pending().unwrap();
78/// // reset the watcher
79/// watcher.reset().unwrap();
80/// }
81/// // handle other file descriptors
82/// }
83/// events.clear();
84/// # break;
85/// }
86/// ```
87#[derive(Clone)]
88pub struct QueueWatcher {
89 data: Arc<QueueWatcherData>,
90}
91
92struct QueueWatcherData {
93 task_id: TaskId,
94 connection: Connection,
95 data: Arc<QueueWatcherShared>,
96}
97
98struct QueueWatcherShared {
99 eventfd: Eventfd,
100 has_error: AtomicBool,
101 data: Mutex<QueueWatcherMutable>,
102 cancellation: Condvar,
103}
104
105#[derive(Default)]
106struct QueueWatcherMutable {
107 wait_for_reset: bool,
108 waker: Option<Waker>,
109 last_error: Option<OsError>,
110 cancelled: bool,
111}
112
113impl Connection {
114 /// Waits for events on any number of event queues.
115 ///
116 /// When this function returns `Ok(())`, at least one of the event queues has an event
117 /// queued.
118 ///
119 /// If no other thread is dispatching these queues, then this function will return
120 /// `Ok(())` as soon as this happens.
121 ///
122 /// If the list of queues is empty, this function waits indefinitely.
123 ///
124 /// This function flushes all requests made before this function call.
125 ///
126 /// # Panic
127 ///
128 /// This function panics if the queues do not all belong to this connection.
129 pub async fn wait_for_events(&self, queues: &[&BorrowedQueue]) -> io::Result<()> {
130 self.flush()?;
131 self.wait_for_events_without_flush(queues).await
132 }
133
134 pub(crate) async fn wait_for_events_without_flush(
135 &self,
136 queues: &[&BorrowedQueue],
137 ) -> io::Result<()> {
138 for queue in queues {
139 if queue.connection() != self {
140 wrong_con();
141 }
142 }
143 loop {
144 let mut lock = None;
145 if let Some((first, other)) = queues.split_first() {
146 let Some(l) = self.acquire_read_lock_async(first).await else {
147 return Ok(());
148 };
149 lock = Some(l);
150 for queue in other {
151 if self.queue_has_events(queue) {
152 return Ok(());
153 }
154 }
155 }
156 self.data.data.ensure_no_error()?;
157 let poll_data = self.data.poller.data.clone();
158 // NOTE: We cannot hold on to the lock on this thread since that could cause
159 // a deadlock when another task on this thread calls
160 // wl_display_read_events. Instead we move the lock and the wait to
161 // the executor thread which will make progress and call
162 // wl_display_read_events as soon as there are new events, even if this
163 // thread is blocked.
164 self.data
165 .executor
166 .execute::<io::Result<()>, _>(async move {
167 poller::readable(&poll_data).await?;
168 if let Some(lock) = lock {
169 lock.read_events().await?;
170 }
171 Ok(())
172 })
173 .await?;
174 }
175 }
176
177 /// Creates a [`QueueWatcher`] for event-loop integration.
178 ///
179 /// See the documentation of [`QueueWatcher`] for details on when and how an
180 /// application would use this.
181 ///
182 /// # Panic
183 ///
184 /// This function panics if the queues do not all belong to this connection.
185 pub fn create_watcher(
186 &self,
187 owned: &[&Queue],
188 borrowed: impl IntoIterator<Item = BorrowedQueue>,
189 ) -> io::Result<QueueWatcher> {
190 self.create_watcher_(owned, borrowed.into_iter().collect())
191 }
192
193 /// Creates a [`QueueWatcher`] for event-loop integration.
194 ///
195 /// See the documentation of [`QueueWatcher`] for details on when and how an
196 /// application would use this.
197 ///
198 /// # Panic
199 ///
200 /// This function panics if the queues do not all belong to this connection.
201 fn create_watcher_(
202 &self,
203 owned: &[&Queue],
204 borrowed: Vec<BorrowedQueue>,
205 ) -> io::Result<QueueWatcher> {
206 for q in owned {
207 if q.connection() != self {
208 wrong_con();
209 }
210 }
211 for q in &borrowed {
212 if q.connection() != self {
213 wrong_con();
214 }
215 }
216 let shared = Arc::new(QueueWatcherShared {
217 eventfd: Eventfd::new()?,
218 has_error: Default::default(),
219 data: Default::default(),
220 cancellation: Default::default(),
221 });
222 /// This type contains the queues that are transferred into the task of a [`QueueWatcher`].
223 ///
224 /// This type exists so that [`QueueWatcherData::drop`] can block until these objects
225 /// have definitely been dropped by the executor. This is achieved as follows: Rust
226 /// guarantees that fields are dropped from top to bottom. Therefore, when the drop impl
227 /// of `_f` runs, `owned` and `borrowed` have already been dropped.
228 ///
229 /// Blocking is required for two reasons:
230 ///
231 /// 1. The user of [`QueueWatcher`] must keep the underlying queues alive while any of the
232 /// [`BorrowedQueue`] still exist. If the drop impl did not block, the user would be
233 /// unable to determine when the [`BorrowedQueue`] have been dropped.
234 /// 2. The queues contain clones of the [`Connection`]. The last clone of the connection
235 /// must not be dropped in the executor thread since the executor thread would then try
236 /// to join itself.
237 struct CancelData<F> {
238 connection: Connection,
239 shared: Arc<QueueWatcherShared>,
240 owned: Vec<Queue>,
241 borrowed: Vec<BorrowedQueue>,
242 _f: F,
243 }
244 let cancel_data = CancelData {
245 connection: self.clone(),
246 shared: shared.clone(),
247 owned: owned.iter().map(|q| (*q).clone()).collect(),
248 borrowed,
249 _f: on_drop({
250 let shared = shared.clone();
251 move || {
252 shared.data.lock().cancelled = true;
253 shared.cancellation.notify_all();
254 }
255 }),
256 };
257 let task_id = self.data.executor.add(async move {
258 let cancel_data = cancel_data;
259 let mut qs = vec![];
260 for q in &cancel_data.owned {
261 qs.push(&**q);
262 }
263 for q in &cancel_data.borrowed {
264 qs.push(q);
265 }
266 let res: io::Result<Infallible> = async {
267 loop {
268 cancel_data
269 .connection
270 .wait_for_events_without_flush(&qs)
271 .await?;
272 cancel_data.shared.eventfd.bump()?;
273 poll_fn(|ctx| {
274 let d = &mut *cancel_data.shared.data.lock();
275 if d.wait_for_reset {
276 d.waker = Some(ctx.waker().clone());
277 Poll::Pending
278 } else {
279 d.wait_for_reset = true;
280 d.waker = None;
281 Poll::Ready(())
282 }
283 })
284 .await;
285 }
286 }
287 .await;
288 let e = res.unwrap_err();
289 cancel_data.shared.data.lock().last_error = Some(e.into());
290 cancel_data.shared.has_error.store(true, Relaxed);
291 });
292 let data = Arc::new(QueueWatcherData {
293 task_id,
294 connection: self.clone(),
295 data: shared,
296 });
297 Ok(QueueWatcher { data })
298 }
299}
300
301impl QueueWatcher {
302 /// Resets the file descriptor readability.
303 ///
304 /// The file descriptor will become readable again when there are events to be
305 /// dispatched.
306 pub fn reset(&self) -> io::Result<()> {
307 let data = &*self.data.data;
308 if data.has_error.load(Relaxed) {
309 if let Some(e) = data.data.lock().last_error {
310 return Err(e.into());
311 }
312 }
313 data.eventfd.clear()?;
314 let d = &mut *data.data.lock();
315 if let Some(e) = d.last_error {
316 let _ = data.eventfd.bump();
317 return Err(e.into());
318 }
319 d.wait_for_reset = false;
320 if let Some(waker) = d.waker.take() {
321 waker.wake()
322 }
323 Ok(())
324 }
325}
326
327impl Drop for QueueWatcherData {
328 fn drop(&mut self) {
329 self.connection.data.executor.cancel(self.task_id);
330 let mut lock = self.data.data.lock();
331 while !lock.cancelled {
332 self.data.cancellation.wait(&mut lock);
333 }
334 }
335}
336
337impl AsFd for QueueWatcher {
338 fn as_fd(&self) -> BorrowedFd<'_> {
339 self.data.data.eventfd.as_fd()
340 }
341}
342
343impl AsRawFd for QueueWatcher {
344 fn as_raw_fd(&self) -> RawFd {
345 self.as_fd().as_raw_fd()
346 }
347}
348
349impl AsRawFd for &'_ QueueWatcher {
350 fn as_raw_fd(&self) -> RawFd {
351 self.as_fd().as_raw_fd()
352 }
353}
354
355#[cold]
356fn wrong_con() -> ! {
357 panic!("queue does not belong to this connection");
358}