wl_client/connection/
wait_for_events.rs

1use {
2    crate::{
3        BorrowedQueue, Connection, Queue,
4        utils::{eventfd::Eventfd, executor::TaskId, on_drop::on_drop, poller},
5    },
6    parking_lot::{Condvar, Mutex},
7    std::{
8        convert::Infallible,
9        future::poll_fn,
10        io::{self, ErrorKind},
11        os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
12        sync::{
13            Arc,
14            atomic::{AtomicBool, Ordering::Relaxed},
15        },
16        task::{Poll, Waker},
17    },
18};
19
20#[cfg(test)]
21mod tests;
22
/// A file descriptor for event-loop integration.
///
/// This type wraps a file descriptor that will signal readability when there are events
/// to be dispatched on one or more queues.
///
/// You can construct this type by calling [`Connection::create_watcher`],
/// [`Queue::create_watcher`], or [`BorrowedQueue::create_watcher`].
///
/// The contained file descriptor is opaque. You should not interact with it except by
/// polling it for readability.
///
/// Once the file descriptor signals readability, the application should
///
/// 1. dispatch the queue(s) by calling [`Queue::dispatch_pending`],
/// 2. reset the readability by calling [`QueueWatcher::reset`],
/// 3. flush outgoing requests by calling [`Connection::flush`],
/// 4. wait for the file descriptor to become readable again.
///
/// If not all events were dispatched before calling [`QueueWatcher::reset`], then the
/// file descriptor will become readable again immediately after being reset.
///
/// Cloning a `QueueWatcher` is cheap: all clones share the same underlying state and
/// the same file descriptor.
///
/// # Example
///
/// ```
/// # use std::os::fd::AsRawFd;
/// # use mio::Interest;
/// # use mio::unix::SourceFd;
/// # use wl_client::Libwayland;
/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
/// #
/// let lib = Libwayland::open().unwrap();
/// let con = lib.connect_to_default_display().unwrap();
/// let queue = con.create_queue(c"queue name");
/// let watcher = queue.create_watcher().unwrap();
/// let token = mio::Token(0);
/// # let _sync = queue.display::<WlDisplay>().sync();
///
/// let mut events = mio::Events::with_capacity(2);
/// let mut poll = mio::Poll::new().unwrap();
/// poll
///     .registry()
///     .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
///     .unwrap();
/// // register other application file descriptors
///
/// loop {
///     // flush outgoing messages before going to sleep
///     con.flush().unwrap();
///
///     poll.poll(&mut events, None).unwrap();
///     for event in events.iter() {
///         if event.token() == token {
///             // dispatch new events
///             queue.dispatch_pending().unwrap();
///             // reset the watcher
///             watcher.reset().unwrap();
///         }
///         // handle other file descriptors
///     }
///     events.clear();
///     # break;
/// }
/// ```
#[derive(Clone)]
pub struct QueueWatcher {
    // Reference-counted state; the `Drop` impl of `QueueWatcherData` runs once the
    // last clone of the watcher goes away.
    data: Arc<QueueWatcherData>,
}
90
/// The state owned by all clones of a [`QueueWatcher`].
///
/// Dropping this type cancels the watcher task and blocks until the data captured by
/// that task has been dropped.
struct QueueWatcherData {
    /// The id returned by the executor when the watcher task was added. Used to cancel
    /// the task on drop.
    task_id: TaskId,
    /// The connection whose executor runs the watcher task.
    connection: Connection,
    /// The state shared between the watcher and the watcher task.
    data: Arc<QueueWatcherShared>,
}
96
/// The state shared between a [`QueueWatcher`] and its task on the executor.
struct QueueWatcherShared {
    /// The file descriptor exposed to the application. The watcher task bumps it after
    /// waiting for events and [`QueueWatcher::reset`] clears it.
    eventfd: Eventfd,
    /// Set by the watcher task after it has stored an error in
    /// [`QueueWatcherMutable::last_error`]. Checked by [`QueueWatcher::reset`] before it
    /// touches the eventfd.
    has_error: AtomicBool,
    /// The mutable part of the shared state.
    data: Mutex<QueueWatcherMutable>,
    /// Notified after [`QueueWatcherMutable::cancelled`] has been set.
    cancellation: Condvar,
}
103
/// The lock-protected part of [`QueueWatcherShared`].
#[derive(Default)]
struct QueueWatcherMutable {
    /// While this is `true`, the watcher task parks itself after bumping the eventfd
    /// instead of waiting for more events. [`QueueWatcher::reset`] sets it to `false`.
    wait_for_reset: bool,
    /// The waker of the watcher task while it is parked waiting for a reset.
    waker: Option<Waker>,
    /// The kind of the error that terminated the watcher task, if any.
    last_error: Option<ErrorKind>,
    /// Set to `true` when the data captured by the watcher task is dropped.
    cancelled: bool,
}
111
112impl Connection {
113    /// Waits for events on any number of event queues.
114    ///
115    /// When this function returns `Ok(())`, at least one of the event queues has an event
116    /// queued.
117    ///
118    /// If no other thread is dispatching these queues, then this function will return
119    /// `Ok(())` as soon as this happens.
120    ///
121    /// If the list of queues is empty, this function waits indefinitely.
122    ///
123    /// This function flushes all requests made before this function call.
124    ///
125    /// # Panic
126    ///
127    /// This function panics if the queues do not all belong to this connection.
128    pub async fn wait_for_events(&self, queues: &[&BorrowedQueue]) -> io::Result<()> {
129        self.flush()?;
130        self.wait_for_events_without_flush(queues).await
131    }
132
133    pub(crate) async fn wait_for_events_without_flush(
134        &self,
135        queues: &[&BorrowedQueue],
136    ) -> io::Result<()> {
137        for queue in queues {
138            if queue.connection() != self {
139                wrong_con();
140            }
141        }
142        loop {
143            let mut lock = None;
144            if let Some((first, other)) = queues.split_first() {
145                let Some(l) = self.acquire_read_lock_async(first).await else {
146                    return Ok(());
147                };
148                lock = Some(l);
149                for queue in other {
150                    if self.queue_has_events(queue) {
151                        return Ok(());
152                    }
153                }
154            }
155            let poll_data = self.data.poller.data.clone();
156            // NOTE: We cannot hold on to the lock on this thread since that could cause
157            //       a deadlock when another task on this thread calls
158            //       wl_display_read_events. Instead we move the lock and the wait to
159            //       the executor thread which will make progress and call
160            //       wl_display_read_events as soon as there are new events, even if this
161            //       thread is blocked.
162            self.data
163                .executor
164                .execute::<io::Result<()>, _>(async move {
165                    poller::readable(&poll_data).await?;
166                    if let Some(lock) = lock {
167                        lock.read_events().await?;
168                    }
169                    Ok(())
170                })
171                .await?;
172        }
173    }
174
175    /// Creates a [`QueueWatcher`] for event-loop integration.
176    ///
177    /// See the documentation of [`QueueWatcher`] for details on when and how an
178    /// application would use this.
179    ///
180    /// # Panic
181    ///
182    /// This function panics if the queues do not all belong to this connection.
183    pub fn create_watcher(
184        &self,
185        owned: &[&Queue],
186        borrowed: impl IntoIterator<Item = BorrowedQueue>,
187    ) -> io::Result<QueueWatcher> {
188        self.create_watcher_(owned, borrowed.into_iter().collect())
189    }
190
191    /// Creates a [`QueueWatcher`] for event-loop integration.
192    ///
193    /// See the documentation of [`QueueWatcher`] for details on when and how an
194    /// application would use this.
195    ///
196    /// # Panic
197    ///
198    /// This function panics if the queues do not all belong to this connection.
199    fn create_watcher_(
200        &self,
201        owned: &[&Queue],
202        borrowed: Vec<BorrowedQueue>,
203    ) -> io::Result<QueueWatcher> {
204        for q in owned {
205            if q.connection() != self {
206                wrong_con();
207            }
208        }
209        for q in &borrowed {
210            if q.connection() != self {
211                wrong_con();
212            }
213        }
214        let shared = Arc::new(QueueWatcherShared {
215            eventfd: Eventfd::new()?,
216            has_error: Default::default(),
217            data: Default::default(),
218            cancellation: Default::default(),
219        });
220        /// This type contains the queues that are transferred into the task of a [`QueueWatcher`].
221        ///
222        /// This type exists so that [`QueueWatcherData::drop`] can block until these objects
223        /// have definitely been dropped by the executor. This is achieved as follows: Rust
224        /// guarantees that fields are dropped from top to bottom. Therefore, when the drop impl
225        /// of `_f` runs, `owned` and `borrowed` have already been dropped.
226        ///
227        /// Blocking is required for two reasons:
228        ///
229        /// 1. The user of [`QueueWatcher`] must keep the underlying queues alive while any of the
230        ///    [`BorrowedQueue`] still exist. If the drop impl did not block, the user would be
231        ///    unable to determine when the [`BorrowedQueue`] have been dropped.
232        /// 2. The queues contain clones of the [`Connection`]. The last clone of the connection
233        ///    must not be dropped in the executor thread since the executor thread would then try
234        ///    to join itself.
235        struct CancelData<F> {
236            connection: Connection,
237            shared: Arc<QueueWatcherShared>,
238            owned: Vec<Queue>,
239            borrowed: Vec<BorrowedQueue>,
240            _f: F,
241        }
242        let cancel_data = CancelData {
243            connection: self.clone(),
244            shared: shared.clone(),
245            owned: owned.iter().map(|q| (*q).clone()).collect(),
246            borrowed,
247            _f: on_drop({
248                let shared = shared.clone();
249                move || {
250                    shared.data.lock().cancelled = true;
251                    shared.cancellation.notify_all();
252                }
253            }),
254        };
255        let task_id = self.data.executor.add(async move {
256            let cancel_data = cancel_data;
257            let mut qs = vec![];
258            for q in &cancel_data.owned {
259                qs.push(&**q);
260            }
261            for q in &cancel_data.borrowed {
262                qs.push(q);
263            }
264            let res: io::Result<Infallible> = async {
265                loop {
266                    cancel_data
267                        .connection
268                        .wait_for_events_without_flush(&qs)
269                        .await?;
270                    cancel_data.shared.eventfd.bump()?;
271                    poll_fn(|ctx| {
272                        let d = &mut *cancel_data.shared.data.lock();
273                        if d.wait_for_reset {
274                            d.waker = Some(ctx.waker().clone());
275                            Poll::Pending
276                        } else {
277                            d.wait_for_reset = true;
278                            d.waker = None;
279                            Poll::Ready(())
280                        }
281                    })
282                    .await;
283                }
284            }
285            .await;
286            let e = res.unwrap_err();
287            cancel_data.shared.data.lock().last_error = Some(e.kind());
288            cancel_data.shared.has_error.store(true, Relaxed);
289        });
290        let data = Arc::new(QueueWatcherData {
291            task_id,
292            connection: self.clone(),
293            data: shared,
294        });
295        Ok(QueueWatcher { data })
296    }
297}
298
299impl QueueWatcher {
300    /// Resets the file descriptor readability.
301    ///
302    /// The file descriptor will become readable again when there are events to be
303    /// dispatched.
304    pub fn reset(&self) -> io::Result<()> {
305        let data = &*self.data.data;
306        if data.has_error.load(Relaxed) {
307            if let Some(e) = data.data.lock().last_error {
308                return Err(e.into());
309            }
310        }
311        data.eventfd.clear()?;
312        let d = &mut *data.data.lock();
313        if let Some(e) = d.last_error {
314            let _ = data.eventfd.bump();
315            return Err(e.into());
316        }
317        d.wait_for_reset = false;
318        if let Some(waker) = d.waker.take() {
319            waker.wake()
320        }
321        Ok(())
322    }
323}
324
325impl Drop for QueueWatcherData {
326    fn drop(&mut self) {
327        self.connection.data.executor.cancel(self.task_id);
328        let mut lock = self.data.data.lock();
329        while !lock.cancelled {
330            self.data.cancellation.wait(&mut lock);
331        }
332    }
333}
334
335impl AsFd for QueueWatcher {
336    fn as_fd(&self) -> BorrowedFd<'_> {
337        self.data.data.eventfd.as_fd()
338    }
339}
340
341impl AsRawFd for QueueWatcher {
342    fn as_raw_fd(&self) -> RawFd {
343        self.as_fd().as_raw_fd()
344    }
345}
346
347impl AsRawFd for &'_ QueueWatcher {
348    fn as_raw_fd(&self) -> RawFd {
349        self.as_fd().as_raw_fd()
350    }
351}
352
/// Panics because a queue was used with a connection it does not belong to.
///
/// This lives in its own cold function to keep the panic machinery out of the callers.
#[cold]
fn wrong_con() -> ! {
    panic!("queue does not belong to this connection")
}