wl_client/connection/
wait_for_events.rs

1use {
2    crate::{
3        BorrowedQueue, Connection, Queue,
4        utils::{eventfd::Eventfd, executor::TaskId, os_error::OsError, poller},
5    },
6    parking_lot::{Condvar, Mutex},
7    run_on_drop::on_drop,
8    std::{
9        convert::Infallible,
10        future::poll_fn,
11        io,
12        os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
13        sync::{
14            Arc,
15            atomic::{AtomicBool, Ordering::Relaxed},
16        },
17        task::{Poll, Waker},
18    },
19};
20
21#[cfg(test)]
22mod tests;
23
24/// A file descriptor for event-loop integration.
25///
26/// This type wraps a file descriptor that will signal readability when there are events
27/// to be dispatched on one or more queues.
28///
29/// You can construct this type by calling [`Connection::create_watcher`],
30/// [`Queue::create_watcher`], or [`BorrowedQueue::create_watcher`].
31///
32/// The contained file descriptor is opaque. You should not interact with it except by
33/// polling it for readability.
34///
35/// Once the file descriptor signals readability, the application should
36///
37/// 1. dispatch the queue(s) by calling [`Queue::dispatch_pending`],
38/// 2. reset the readability by calling [`QueueWatcher::reset`],
39/// 3. flush outgoing requests by calling [`Connection::flush`],
40/// 3. wait for the file descriptor to become readable again.
41///
42/// If not all events were dispatched before calling [`QueueWatcher::reset`], then the
43/// file descriptor will become readable again immediately after being reset.
44///
45/// # Example
46///
47/// ```
48/// # use std::os::fd::AsRawFd;
49/// # use mio::Interest;
50/// # use mio::unix::SourceFd;
51/// # use wl_client::Libwayland;
52/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
53/// #
54/// let lib = Libwayland::open().unwrap();
55/// let con = lib.connect_to_default_display().unwrap();
56/// let queue = con.create_queue(c"queue name");
57/// let watcher = queue.create_watcher().unwrap();
58/// let token = mio::Token(0);
59/// # let _sync = queue.display::<WlDisplay>().sync();
60///
61/// let mut events = mio::Events::with_capacity(2);
62/// let mut poll = mio::Poll::new().unwrap();
63/// poll
64///     .registry()
65///     .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
66///     .unwrap();
67/// // register other application file descriptors
68///
69/// loop {
70///     // flush outgoing messages before going to sleep
71///     con.flush().unwrap();
72///
73///     poll.poll(&mut events, None).unwrap();
74///     for event in events.iter() {
75///         if event.token() == token {
76///             // dispatch new events
77///             queue.dispatch_pending().unwrap();
78///             // reset the watcher
79///             watcher.reset().unwrap();
80///         }   
81///         // handle other file descriptors
82///     }
83///     events.clear();
84///     # break;
85/// }
86/// ```
87#[derive(Clone)]
88pub struct QueueWatcher {
89    data: Arc<QueueWatcherData>,
90}
91
92struct QueueWatcherData {
93    task_id: TaskId,
94    connection: Connection,
95    data: Arc<QueueWatcherShared>,
96}
97
98struct QueueWatcherShared {
99    eventfd: Eventfd,
100    has_error: AtomicBool,
101    data: Mutex<QueueWatcherMutable>,
102    cancellation: Condvar,
103}
104
105#[derive(Default)]
106struct QueueWatcherMutable {
107    wait_for_reset: bool,
108    waker: Option<Waker>,
109    last_error: Option<OsError>,
110    cancelled: bool,
111}
112
113impl Connection {
114    /// Waits for events on any number of event queues.
115    ///
116    /// When this function returns `Ok(())`, at least one of the event queues has an event
117    /// queued.
118    ///
119    /// If no other thread is dispatching these queues, then this function will return
120    /// `Ok(())` as soon as this happens.
121    ///
122    /// If the list of queues is empty, this function waits indefinitely.
123    ///
124    /// This function flushes all requests made before this function call.
125    ///
126    /// # Panic
127    ///
128    /// This function panics if the queues do not all belong to this connection.
129    pub async fn wait_for_events(&self, queues: &[&BorrowedQueue]) -> io::Result<()> {
130        self.flush()?;
131        self.wait_for_events_without_flush(queues).await
132    }
133
134    pub(crate) async fn wait_for_events_without_flush(
135        &self,
136        queues: &[&BorrowedQueue],
137    ) -> io::Result<()> {
138        for queue in queues {
139            if queue.connection() != self {
140                wrong_con();
141            }
142        }
143        loop {
144            let mut lock = None;
145            if let Some((first, other)) = queues.split_first() {
146                let Some(l) = self.acquire_read_lock_async(first).await else {
147                    return Ok(());
148                };
149                lock = Some(l);
150                for queue in other {
151                    if self.queue_has_events(queue) {
152                        return Ok(());
153                    }
154                }
155            }
156            self.data.data.ensure_no_error()?;
157            let poll_data = self.data.poller.data.clone();
158            // NOTE: We cannot hold on to the lock on this thread since that could cause
159            //       a deadlock when another task on this thread calls
160            //       wl_display_read_events. Instead we move the lock and the wait to
161            //       the executor thread which will make progress and call
162            //       wl_display_read_events as soon as there are new events, even if this
163            //       thread is blocked.
164            self.data
165                .executor
166                .execute::<io::Result<()>, _>(async move {
167                    poller::readable(&poll_data).await?;
168                    if let Some(lock) = lock {
169                        lock.read_events().await?;
170                    }
171                    Ok(())
172                })
173                .await?;
174        }
175    }
176
177    /// Creates a [`QueueWatcher`] for event-loop integration.
178    ///
179    /// See the documentation of [`QueueWatcher`] for details on when and how an
180    /// application would use this.
181    ///
182    /// # Panic
183    ///
184    /// This function panics if the queues do not all belong to this connection.
185    pub fn create_watcher(
186        &self,
187        owned: &[&Queue],
188        borrowed: impl IntoIterator<Item = BorrowedQueue>,
189    ) -> io::Result<QueueWatcher> {
190        self.create_watcher_(owned, borrowed.into_iter().collect())
191    }
192
193    /// Creates a [`QueueWatcher`] for event-loop integration.
194    ///
195    /// See the documentation of [`QueueWatcher`] for details on when and how an
196    /// application would use this.
197    ///
198    /// # Panic
199    ///
200    /// This function panics if the queues do not all belong to this connection.
201    fn create_watcher_(
202        &self,
203        owned: &[&Queue],
204        borrowed: Vec<BorrowedQueue>,
205    ) -> io::Result<QueueWatcher> {
206        for q in owned {
207            if q.connection() != self {
208                wrong_con();
209            }
210        }
211        for q in &borrowed {
212            if q.connection() != self {
213                wrong_con();
214            }
215        }
216        let shared = Arc::new(QueueWatcherShared {
217            eventfd: Eventfd::new()?,
218            has_error: Default::default(),
219            data: Default::default(),
220            cancellation: Default::default(),
221        });
222        /// This type contains the queues that are transferred into the task of a [`QueueWatcher`].
223        ///
224        /// This type exists so that [`QueueWatcherData::drop`] can block until these objects
225        /// have definitely been dropped by the executor. This is achieved as follows: Rust
226        /// guarantees that fields are dropped from top to bottom. Therefore, when the drop impl
227        /// of `_f` runs, `owned` and `borrowed` have already been dropped.
228        ///
229        /// Blocking is required for two reasons:
230        ///
231        /// 1. The user of [`QueueWatcher`] must keep the underlying queues alive while any of the
232        ///    [`BorrowedQueue`] still exist. If the drop impl did not block, the user would be
233        ///    unable to determine when the [`BorrowedQueue`] have been dropped.
234        /// 2. The queues contain clones of the [`Connection`]. The last clone of the connection
235        ///    must not be dropped in the executor thread since the executor thread would then try
236        ///    to join itself.
237        struct CancelData<F> {
238            connection: Connection,
239            shared: Arc<QueueWatcherShared>,
240            owned: Vec<Queue>,
241            borrowed: Vec<BorrowedQueue>,
242            _f: F,
243        }
244        let cancel_data = CancelData {
245            connection: self.clone(),
246            shared: shared.clone(),
247            owned: owned.iter().map(|q| (*q).clone()).collect(),
248            borrowed,
249            _f: on_drop({
250                let shared = shared.clone();
251                move || {
252                    shared.data.lock().cancelled = true;
253                    shared.cancellation.notify_all();
254                }
255            }),
256        };
257        let task_id = self.data.executor.add(async move {
258            let cancel_data = cancel_data;
259            let mut qs = vec![];
260            for q in &cancel_data.owned {
261                qs.push(&**q);
262            }
263            for q in &cancel_data.borrowed {
264                qs.push(q);
265            }
266            let res: io::Result<Infallible> = async {
267                loop {
268                    cancel_data
269                        .connection
270                        .wait_for_events_without_flush(&qs)
271                        .await?;
272                    cancel_data.shared.eventfd.bump()?;
273                    poll_fn(|ctx| {
274                        let d = &mut *cancel_data.shared.data.lock();
275                        if d.wait_for_reset {
276                            d.waker = Some(ctx.waker().clone());
277                            Poll::Pending
278                        } else {
279                            d.wait_for_reset = true;
280                            d.waker = None;
281                            Poll::Ready(())
282                        }
283                    })
284                    .await;
285                }
286            }
287            .await;
288            let e = res.unwrap_err();
289            cancel_data.shared.data.lock().last_error = Some(e.into());
290            cancel_data.shared.has_error.store(true, Relaxed);
291        });
292        let data = Arc::new(QueueWatcherData {
293            task_id,
294            connection: self.clone(),
295            data: shared,
296        });
297        Ok(QueueWatcher { data })
298    }
299}
300
301impl QueueWatcher {
302    /// Resets the file descriptor readability.
303    ///
304    /// The file descriptor will become readable again when there are events to be
305    /// dispatched.
306    pub fn reset(&self) -> io::Result<()> {
307        let data = &*self.data.data;
308        if data.has_error.load(Relaxed) {
309            if let Some(e) = data.data.lock().last_error {
310                return Err(e.into());
311            }
312        }
313        data.eventfd.clear()?;
314        let d = &mut *data.data.lock();
315        if let Some(e) = d.last_error {
316            let _ = data.eventfd.bump();
317            return Err(e.into());
318        }
319        d.wait_for_reset = false;
320        if let Some(waker) = d.waker.take() {
321            waker.wake()
322        }
323        Ok(())
324    }
325}
326
327impl Drop for QueueWatcherData {
328    fn drop(&mut self) {
329        self.connection.data.executor.cancel(self.task_id);
330        let mut lock = self.data.data.lock();
331        while !lock.cancelled {
332            self.data.cancellation.wait(&mut lock);
333        }
334    }
335}
336
337impl AsFd for QueueWatcher {
338    fn as_fd(&self) -> BorrowedFd<'_> {
339        self.data.data.eventfd.as_fd()
340    }
341}
342
343impl AsRawFd for QueueWatcher {
344    fn as_raw_fd(&self) -> RawFd {
345        self.as_fd().as_raw_fd()
346    }
347}
348
349impl AsRawFd for &'_ QueueWatcher {
350    fn as_raw_fd(&self) -> RawFd {
351        self.as_fd().as_raw_fd()
352    }
353}
354
355#[cold]
356fn wrong_con() -> ! {
357    panic!("queue does not belong to this connection");
358}