wl_client/
queue.rs

1use {
2    crate::{
3        Libwayland, QueueWatcher,
4        connection::Connection,
5        ffi::{interface_compatible, wl_event_queue, wl_proxy},
6        protocols::wayland::{
7            wl_callback::{WlCallbackEventHandler, WlCallbackRef},
8            wl_display::WlDisplay,
9        },
10        proxy::{
11            self, BorrowedProxy, OwnedProxy,
12            low_level::{
13                OwnedProxyRegistry, ProxyDataDestruction, UntypedOwnedProxy,
14                check_dispatching_proxy, check_new_proxy, owned::DISPATCH_PANIC,
15            },
16        },
17        utils::{
18            block_on::block_on,
19            on_drop::on_drop,
20            reentrant_mutex::{ReentrantMutex, ReentrantMutexGuard},
21            sync_ptr::SyncNonNull,
22        },
23    },
24    parking_lot::Mutex,
25    std::{
26        cell::{Cell, RefCell},
27        ffi::{CStr, CString},
28        fmt::{Debug, Formatter},
29        future::poll_fn,
30        io, mem,
31        ops::Deref,
32        panic::resume_unwind,
33        pin::pin,
34        ptr::NonNull,
35        sync::Arc,
36        task::{Poll, Waker},
37        thread::panicking,
38    },
39};
40
41#[cfg(test)]
42mod tests;
43
44/// The owner of an event queue.
45///
46/// This is a thin wrapper around [`Queue`] and implements `Deref<Target = Queue>`.
47///
48/// The purpose of this type is to manage the lifetimes of proxies attached to queues.
49/// This is described in more detail in the documentation of [`Queue`].
pub struct QueueOwner {
    /// The wrapped queue. This is the value handed out by the `Deref` implementation.
    queue: Queue,
}
53
54#[expect(clippy::doc_overindented_list_items)]
55/// An event queue.
56///
57/// An event queue stores events that have been sent by the compositor but whose callbacks
58/// have not yet been invoked. Invoking these callbacks is called _dispatching_ the queue.
59///
60/// A connection can have many queues that can be dispatched in parallel, but the events
61/// in a single queue are always dispatched in series. That is, the callback for the next
62/// event will not start executing until after the callback for the previous event.
63///
64/// New queues are created by calling [`Connection::create_local_queue`] or
65/// [`Connection::create_queue`].
66///
67/// # Proxies attached to a queue
68///
69/// Each proxy is attached to a queue. This queue is determined as follows:
70///
71/// - When [`Queue::display`] is called, the returned proxy is attached to that queue.
72/// - When [`Queue::wrap_proxy`] is called, the returned proxy is attached to that queue.
73/// - When a constructor request is called on an owned proxy `P`, e.g. `WlDisplay::sync`,
74///   the returned proxy is attached to the same queue as `P`.
75/// - When a constructor request is called on a borrowed proxy, e.g. `WlDisplayRef::sync`,
76///   the request takes a queue argument and the returned proxy is attached to that queue.
77///
78/// See the documentation of the [`proxy`] module for more information about proxies.
79///
80/// # Queue ownership
81///
82/// Each queue is owned by a [`QueueOwner`] which is returned when you call
83/// [`Connection::create_local_queue`] or [`Connection::create_queue`].
84///
85/// When a [`QueueOwner`] is dropped, it will automatically destroy _some_ of the proxies
86/// attached to the queue. This ensures that, when the [`QueueOwner`] is dropped, all
87/// reference cycles between proxies and their event handlers are broken and no memory is
88/// leaked. (Normally these cycles are broken when a destructor request is sent or the
89/// proxy is destroyed with [`proxy::destroy`], but for long-lived proxies, such as
90/// `wl_registry`, it is more convenient for this to happen automatically.)
91///
92/// A queue and the attached proxies should no longer be used after its [`QueueOwner`] has
93/// been dropped. Doing so might lead to panics or memory leaks.
94///
95/// To ensure that the [`QueueOwner`] is eventually dropped, the queue owner should never
96/// be reachable from an event handler.
97///
98/// ```
99/// # use wl_client::Libwayland;
100/// # use wl_client::Queue;
101/// # use wl_client::Connection;
102/// # use wl_client::QueueOwner;
103/// #
104/// fn main() {
105///     let lib = Libwayland::open().unwrap();
106///     let con = lib.connect_to_default_display().unwrap();
107///     let queue: QueueOwner = con.create_queue(c"queue name");
108///
109///     // the queue owner is stored on the stack and will be dropped when this function
110///     // returns
111///     application_logic(&queue);
112/// }
113/// #
114/// # fn application_logic(_queue: &Queue) {
115/// # }
116/// ```
117///
118/// # Local and non-local queues
119///
120/// Each queue is either local or non-local. Local queues are created with
121/// [`Connection::create_local_queue`]. Non-local queues are created with
122/// [`Connection::create_queue`].
123///
124/// Local queues have the following advantage:
125///
126/// - Event handlers attached to local queues do not have to implement [`Send`].
127///
/// This allows such event handlers to contain `Rc<T>` where the same event handler on
/// a non-local queue would have to use `Arc<T>`.
130///
131/// To ensure that these event handlers are not accessed or dropped on different threads,
132/// this advantage comes with the following restrictions:
133///
134/// - If a local queue was created on a thread `X`:
135///   - Event handlers must be attached on `X`.
136///   - The queue must only be dispatched on `X`.
137///   - The [`QueueOwner`] owning the queue must be dropped on `X`.
138///   - Proxies attached to the queue must only be destroyed on `X`.
139///     - Note: Proxies are destroyed when [`proxy::destroy`] is called *or* when a
140///       destructor request is sent *or* implicitly when the last reference to the proxy is
141///       dropped.
142///
143/// These restrictions are checked at runtime and will lead to panics if violated.
144///
145/// ```
146/// # use std::cell::Cell;
147/// # use std::rc::Rc;
148/// # use wl_client::{proxy, Libwayland};
149/// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
150/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
151/// #
152/// let lib = Libwayland::open().unwrap();
153/// let con = lib.connect_to_default_display().unwrap();
154///
155/// // Create a local queue.
156/// let queue = con.create_local_queue(c"queue name");
157/// let display: WlDisplay = queue.display();
158///
159/// // Create an `Rc` and use it as an event handler.
160/// let done = Rc::new(Cell::new(false));
161/// let done2 = done.clone();
162/// let sync = display.sync();
163/// proxy::set_event_handler_local(&sync, WlCallback::on_done(move |_, _| done2.set(true)));
164///
165/// // Calling this function from another thread would panic.
166/// queue.dispatch_roundtrip_blocking().unwrap();
167///
168/// assert!(done.get());
169/// ```
170///
171/// # Locking
172///
173/// Each queue contains a re-entrant mutex. Re-entrant means that a thread that already
174/// holds the lock can acquire it again.
175///
176/// This lock can be acquired explicitly by calling [`Queue::lock_dispatch`]. You might
177/// want to use this function if you're dispatching the queue from multiple threads to
178/// avoid lock inversion. For example, consider the following situation:
179///
180/// - Thread 1: Acquires the lock of some shared state.
181/// - Thread 2: Starts dispatching and acquires the queue's re-entrant lock.
182/// - Thread 2: Calls an event handler which tries to lock the shared state and blocks.
183/// - Thread 1: Destroys a proxy which will implicitly acquire the queue's re-entrant lock.
184///             This deadlocks.
185///
186/// Instead, thread 1 could call [`Queue::lock_dispatch`] before locking the shared state
187/// to prevent the lock inversion.
188///
189/// This deadlock concern does not apply if the queue is only ever used from a single
190/// thread or if the queue is a local queue. If you need to poll the wayland socket on a
191/// separate thread, you can use [`BorrowedQueue::wait_for_events`] and send a message to
192/// the main thread when the future completes.
193///
194/// The lock is acquired implicitly in the following situations:
195///
196/// - The lock is held for the entirety of the following function calls:
197///   - [`Queue::dispatch_pending`]
198/// - The lock is sometimes held during the following function calls but not while they
199///   are waiting for new events:
200///   - [`Queue::dispatch_blocking`]
201///   - [`Queue::dispatch_roundtrip_blocking`]
202/// - The lock is sometimes held while the futures produced by the following functions
203///   are being polled but not while they are waiting for new events:
204///   - [`Queue::dispatch_async`]
205///   - [`Queue::dispatch_roundtrip_async`]
206/// - The lock is held at the start and the end of the following function calls but not
207///   while invoking the callback:
208///   - [`Queue::dispatch_scope_blocking`]
209/// - The lock is sometimes held while the futures produced by the following functions
210///   are being polled but not while polling the future passed as an argument:
211///   - [`Queue::dispatch_scope_async`]
212/// - The lock is held while attaching an event handler to a proxy.
213/// - The lock is held when a proxy with an event handler is being destroyed:
214///   - When sending a destructor request.
215///   - When calling [`proxy::destroy`].
216///   - Implicitly when dropping the last reference to a proxy.
217/// - The lock is held while dropping the [`QueueOwner`] of the queue.
#[derive(Clone)]
pub struct Queue {
    /// The shared state of the queue. Since it is stored in an `Arc`, cloning a `Queue`
    /// produces another handle to the same queue data.
    queue_data: Arc<QueueData>,
}
222
223/// A borrowed event queue.
224///
225/// This type is a thin wrapper around a `wl_event_queue` pointer. If the pointer is a
226/// null pointer, this object refers to the default libwayland queue.
227///
228/// [`Queue`] implements `Deref<Target = BorrowedQueue>`.
229///
230/// This type can be used to safely interact with foreign queues. It guarantees that the
231/// contained pointer is either null or valid. This type can be passed into
232/// [`Connection::wait_for_events`] to wait for events to arrive on multiple queues
233/// in a race-free way.
234///
235/// You can construct a [`BorrowedQueue`] by calling
236///
237/// - [`Connection::borrow_default_queue`] or
238/// - [`Connection::borrow_foreign_queue`].
239///
240/// # Example
241///
242/// ```
243/// # use wl_client::Libwayland;
244/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
245/// #
246/// # tokio_test::block_on(async {
247/// let lib = Libwayland::open().unwrap();
248/// let con = lib.connect_to_default_display().unwrap();
249/// let queue1 = con.create_queue(c"queue name");
250/// let queue2 = con.create_queue(c"another queue");
251/// let default_queue = con.borrow_default_queue();
252/// # // ensure that some event arrives for this test
253/// # let sync = queue2.display::<WlDisplay>().sync();
254///
255/// con.wait_for_events(&[&queue1, &queue2, &default_queue]).await.unwrap();
256/// # });
257/// ```
pub struct BorrowedQueue {
    /// A reference to the connection that this queue belongs to. This also ensures
    /// that the connection outlives the queue.
    connection: Connection,
    /// The native queue pointer, if any.
    ///
    /// NOTE(review): the type-level documentation says that a null pointer refers to
    /// the default libwayland queue, so `None` presumably encodes that case - confirm.
    queue: Option<SyncNonNull<wl_event_queue>>,
}
264
265/// A lock that prevents concurrent dispatching of a queue.
266///
267/// This lock can be acquired by calling [`Queue::lock_dispatch`].
268///
269/// See the description of [`Queue`] for why you might use this.
270///
271/// # Example
272///
273/// ```
274/// # use std::thread;
275/// # use wl_client::Libwayland;
276/// #
277/// let lib = Libwayland::open().unwrap();
278/// let con = lib.connect_to_default_display().unwrap();
279/// let queue = con.create_queue(c"queue name");
280///
281/// let lock = queue.lock_dispatch();
282///
283/// let thread = {
284///     let queue = queue.clone();
285///     thread::spawn(move || {
286///         // this dispatch will not start until the lock is dropped.
287///         queue.dispatch_roundtrip_blocking().unwrap();
288///     })
289/// };
290///
291/// // this dispatch starts immediately since the lock is re-entrant
292/// queue.dispatch_roundtrip_blocking().unwrap();
293///
294/// drop(lock);
295/// thread.join().unwrap();
296/// ```
pub struct DispatchLock<'a> {
    /// The guard of the queue's re-entrant mutex. The guard is never read; it is only
    /// stored so that the mutex stays locked for as long as this object exists.
    _lock: ReentrantMutexGuard<'a, DispatchData>,
}
300
/// The state of a queue. It is stored behind an `Arc` in [`Queue`] and therefore shared
/// between all clones of a [`Queue`].
struct QueueData {
    /// A reference to the Libwayland singleton.
    libwayland: &'static Libwayland,
    /// The borrowed version of this queue for `Deref`.
    borrowed: BorrowedQueue,
    /// The name of the queue.
    name: CString,
    /// The native queue. This is always a valid pointer.
    queue: SyncNonNull<wl_event_queue>,
    /// This mutex protects
    /// - the unsynchronized fields of any proxies attached to the queue
    /// - the fields `is_dispatching` and `to_destroy_on_idle` of [`DispatchData`]
    mutex: ReentrantMutex<DispatchData>,
    /// The registry for proxies that need manual destruction. According to the
    /// documentation of `Queue::owned_proxy_registry`, proxies attached to this registry
    /// are destroyed when the [`QueueOwner`] is dropped.
    owned_proxy_registry: OwnedProxyRegistry,
}
318
/// The data that is protected by the re-entrant mutex of a queue.
#[derive(Default)]
struct DispatchData {
    /// Contains whether the queue is currently dispatching. Note that, since
    /// dispatching is protected by the mutex, this field is only ever accessed by a
    /// single thread at a time.
    ///
    /// `Queue::with_dispatch` sets this to `true` for the duration of its callback and
    /// restores the previous value afterwards.
    is_dispatching: Cell<bool>,
    /// If we are dispatching, this vector might contain work to run after the dispatch.
    /// Protected against access from multiple threads by the mutex. These destructions
    /// have the following invariant:
    ///
    /// - It must be safe to run the destruction once the queue is idle.
    ///
    /// The vector is drained by `Queue::with_dispatch` once the outermost dispatch has
    /// finished.
    to_destroy_on_idle: RefCell<Vec<ProxyDataDestruction>>,
}
332
333impl Deref for QueueOwner {
334    type Target = Queue;
335
336    fn deref(&self) -> &Self::Target {
337        &self.queue
338    }
339}
340
341impl Queue {
342    /// Returns a reference to the [`Libwayland`] singleton.
343    pub fn libwayland(&self) -> &'static Libwayland {
344        self.queue_data.libwayland
345    }
346
347    /// Returns the connection that this queue belongs to.
348    ///
349    /// # Example
350    ///
351    /// ```
352    /// # use wl_client::Libwayland;
353    /// #
354    /// let lib = Libwayland::open().unwrap();
355    /// let con = lib.connect_to_default_display().unwrap();
356    /// let queue = con.create_queue(c"queue name");
357    /// assert_eq!(queue.connection(), &con);
358    /// ```
359    pub fn connection(&self) -> &Connection {
360        &self.queue_data.borrowed.connection
361    }
362
363    /// Acquires the queue's re-entrant lock.
364    ///
365    /// See the description of [`Queue`] for why you might use this.
366    ///
367    /// # Example
368    ///
369    /// ```
370    /// # use std::thread;
371    /// # use wl_client::Libwayland;
372    /// #
373    /// let lib = Libwayland::open().unwrap();
374    /// let con = lib.connect_to_default_display().unwrap();
375    /// let queue = con.create_queue(c"queue name");
376    ///
377    /// let lock = queue.lock_dispatch();
378    ///
379    /// let thread = {
380    ///     let queue = queue.clone();
381    ///     thread::spawn(move || {
382    ///         // this dispatch will not start until the lock is dropped.
383    ///         queue.dispatch_roundtrip_blocking().unwrap();
384    ///     })
385    /// };
386    ///
387    /// // this dispatch starts immediately since the lock is re-entrant
388    /// queue.dispatch_roundtrip_blocking().unwrap();
389    ///
390    /// drop(lock);
391    /// thread.join().unwrap();
392    /// ```
393    pub fn lock_dispatch(&self) -> DispatchLock<'_> {
394        DispatchLock {
395            _lock: self.queue_data.mutex.lock(),
396        }
397    }
398
399    /// Creates a wrapper proxy around the singleton `wl_display` object.
400    ///
401    /// The proxy is a wrapper and no event handler can be attached to it.
402    ///
403    /// The proxy is attached to this queue.
404    ///
405    /// # Panic
406    ///
407    /// Panics if the interface of `T` is not compatible with the `wl_display` interface.
408    ///
409    /// # Example
410    ///
411    /// ```
412    /// # use wl_client::{proxy, Libwayland};
413    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
414    /// #
415    /// let lib = Libwayland::open().unwrap();
416    /// let con = lib.connect_to_default_display().unwrap();
417    /// let queue = con.create_queue(c"queue name");
418    /// let display: WlDisplay = queue.display();
419    /// assert_eq!(proxy::queue(&display), &*queue);
420    /// assert_eq!(proxy::id(&*display), 1);
421    /// ```
422    pub fn display<T>(&self) -> T
423    where
424        T: OwnedProxy,
425    {
426        // SAFETY: OwnedProxy::WL_DISPLAY is always a valid interface.
427        let compatible = unsafe { interface_compatible(WlDisplay::WL_INTERFACE, T::WL_INTERFACE) };
428        if !compatible {
429            panic!("T::WL_INTERFACE is not compatible with wl_display");
430        }
431        // SAFETY: - wl_display always returns a valid object
432        //         - we've just verified that T has a compatible interface
433        unsafe { self.wrap_wl_proxy(self.connection().wl_display().cast()) }
434    }
435
436    /// Returns whether this is a local queue.
437    ///
438    /// The documentation of the [`Queue`] type explains the difference between local and
439    /// non-local queues.
440    ///
441    /// # Example
442    ///
443    /// ```
444    /// # use wl_client::Libwayland;
445    /// #
446    /// let lib = Libwayland::open().unwrap();
447    /// let con = lib.connect_to_default_display().unwrap();
448    /// let queue1 = con.create_queue(c"non-local queue");
449    /// let queue2 = con.create_local_queue(c"local queue");
450    ///
451    /// assert!(queue1.is_non_local());
452    /// assert!(queue2.is_local());
453    /// ```
454    pub fn is_local(&self) -> bool {
455        self.queue_data.mutex.is_thread_local()
456    }
457
458    /// Returns whether this is *not* a local queue.
459    ///
460    /// This is the same as `!self.is_local()`.
461    ///
462    /// The documentation of the [`Queue`] type explains the difference between local and
463    /// non-local queues.
464    pub fn is_non_local(&self) -> bool {
465        !self.is_local()
466    }
467
468    /// Returns the `wl_event_queue` pointer of this queue.
469    ///
470    /// The returned pointer is valid for as long as this queue exists.
471    ///
472    /// You must not dispatch the queue except through the [`Queue`] interface.
473    /// Otherwise the behavior is undefined.
474    pub fn wl_event_queue(&self) -> NonNull<wl_event_queue> {
475        self.queue_data.queue.0
476    }
477
478    /// Returns the proxy registry of this queue.
479    ///
480    /// Proxies attached to this registry will be destroyed when the [`QueueOwner`] is
481    /// dropped.
482    pub(crate) fn owned_proxy_registry(&self) -> &OwnedProxyRegistry {
483        &self.queue_data.owned_proxy_registry
484    }
485
486    /// Runs the closure while holding the reentrant queue mutex.
487    pub(crate) fn run_locked<T>(&self, f: impl FnOnce() -> T) -> T {
488        self.run_locked_(|_| f())
489    }
490
491    /// Runs the closure while holding the reentrant queue mutex.
492    fn run_locked_<T>(&self, f: impl FnOnce(&DispatchData) -> T) -> T {
493        let lock = self.queue_data.mutex.lock();
494        f(&lock)
495    }
496
497    /// Runs the closure while holding the reentrant queue mutex. The is_dispatching
498    /// field will be set to true for the duration of the callback.
499    fn with_dispatch<T>(&self, f: impl FnOnce() -> T) -> T {
500        self.run_locked_(|dd| {
501            let is_dispatching = dd.is_dispatching.get();
502            dd.is_dispatching.set(true);
503            let ret = f();
504            dd.is_dispatching.set(is_dispatching);
505            if !is_dispatching {
506                let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
507                if to_destroy.len() > 0 {
508                    let mut todo = mem::take(&mut *to_destroy);
509                    drop(to_destroy);
510                    for dd in todo.drain(..) {
511                        // SAFETY: - For a thread to dispatch, it must
512                        //           1. hold the queue lock
513                        //           2. set is_dispatching
514                        //         - The queue lock is not dropped before the dispatch finishes
515                        //           and is_dispatching is not reset before then.
516                        //         - Since we're holding the queue lock and is_dispatching is false,
517                        //           we know that no dispatches are currently running.
518                        //         - If a future dispatch starts in this thread, it will happen
519                        //           after this line of code.
520                        //         - If a future dispatch starts on another thread, it will have
521                        //           to acquire the queue lock and will therefore happen after we
522                        //           release the lock below.
523                        //         - Therefore this queue is idle at this point.
524                        //         - to_destroy_on_idle contains only destructions that are safe
525                        //           to run while idle.
526                        unsafe {
527                            dd.run();
528                        }
529                    }
530                    let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
531                    mem::swap(&mut todo, &mut *to_destroy);
532                }
533            }
534            ret
535        })
536    }
537
538    /// Blocks the current thread until at least one event has been dispatched.
539    ///
540    /// If you are in an async context, then you might want to use
541    /// [`Queue::dispatch_async`] instead.
542    ///
543    /// This function should not be used when integrating with an existing, poll-based
544    /// event loop, as it might block indefinitely. Use [`Connection::create_watcher`] and
545    /// [`Queue::dispatch_pending`] instead.
546    ///
547    /// The returned number is the number of events that have been dispatched by this
548    /// call. The number can be zero if another thread dispatched the events before us.
549    ///
550    /// # Panic
551    ///
552    /// Panics if this is a [local queue](Connection::create_local_queue) and the current
553    /// thread is not the thread that this queue was created in.
554    ///
555    /// # Example
556    ///
557    /// ```
558    /// # use wl_client::Libwayland;
559    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
560    /// #
561    /// let lib = Libwayland::open().unwrap();
562    /// let con = lib.connect_to_default_display().unwrap();
563    /// let queue = con.create_queue(c"queue name");
564    ///
565    /// // For this example, ensure that the compositor sends an event in the near future.
566    /// let _sync = queue.display::<WlDisplay>().sync();
567    ///
568    /// queue.dispatch_blocking().unwrap();
569    /// ```
570    pub fn dispatch_blocking(&self) -> io::Result<u64> {
571        block_on(self.dispatch_async())
572    }
573
574    /// Completes when at least one event has been dispatched.
575    ///
576    /// This function is the same as [`Queue::dispatch_blocking`] except that it is async and does
577    /// not block the current thread.
578    ///
579    /// # Panic
580    ///
581    /// Panics if this is a [local queue](Connection::create_local_queue) and the thread
582    /// polling the future is not the thread that this queue was created in.
583    ///
584    /// # Example
585    ///
586    /// ```
587    /// # use wl_client::Libwayland;
588    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
589    /// #
590    /// # tokio_test::block_on(async {
591    /// let lib = Libwayland::open().unwrap();
592    /// let con = lib.connect_to_default_display().unwrap();
593    /// let queue = con.create_queue(c"queue name");
594    ///
595    /// // For this example, ensure that the compositor sends an event in the near future.
596    /// let _sync = queue.display::<WlDisplay>().sync();
597    ///
598    /// queue.dispatch_async().await.unwrap();
599    /// # });
600    /// ```
601    pub async fn dispatch_async(&self) -> io::Result<u64> {
602        self.connection.wait_for_events(&[self]).await?;
603        self.dispatch_pending()
604    }
605
606    /// Dispatches enqueued events.
607    ///
608    /// This function does not read new events from the file descriptor.
609    ///
610    /// This function can be used together with [`BorrowedQueue::wait_for_events`] or
611    /// [`Queue::create_watcher`] to dispatch the queue in an async context or event loop
612    /// respectively.
613    ///
614    /// The returned number is the number of events that were dispatched.
615    ///
616    /// # Panic
617    ///
618    /// Panics if this is a [local queue](Connection::create_local_queue) and the current
619    /// thread is not the thread that this queue was created in.
620    ///
621    /// # Example
622    ///
623    /// ```
624    /// # use std::sync::Arc;
625    /// # use std::sync::atomic::AtomicBool;
626    /// # use std::sync::atomic::Ordering::Relaxed;
627    /// # use wl_client::{proxy, Libwayland};
628    /// # use wl_client::test_protocol_helpers::callback;
629    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
630    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
631    /// #
632    /// # tokio_test::block_on(async {
633    /// let lib = Libwayland::open().unwrap();
634    /// let con = lib.connect_to_default_display().unwrap();
635    /// let queue = con.create_queue(c"queue name");
636    /// let display: WlDisplay = queue.display();
637    ///
638    /// let done = Arc::new(AtomicBool::new(false));
639    /// let done2 = done.clone();
640    /// let sync = display.sync();
641    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
642    ///     done2.store(true, Relaxed);
643    /// }));
644    ///
645    /// while !done.load(Relaxed) {
646    ///     queue.wait_for_events().await.unwrap();
647    ///     // Dispatch the events.
648    ///     queue.dispatch_pending().unwrap();
649    /// }
650    /// # });
651    /// ```
652    pub fn dispatch_pending(&self) -> io::Result<u64> {
653        let d = &*self.queue_data;
654        let _resume_unwind = on_drop(|| {
655            if let Some(err) = DISPATCH_PANIC.take() {
656                if !panicking() {
657                    resume_unwind(err);
658                }
659            }
660        });
661        let res = self.with_dispatch(|| {
662            // SAFETY: - by the invariants, the display and queue are valid
663            //         - the queue was created from the display
664            //         - we're inside with_dispatch which means that we're holding the
665            //           reentrant queue mutex. by the invariants, this mutex protects
666            //           the unsynchronized fields of the proxies.
667            unsafe {
668                d.libwayland.wl_display_dispatch_queue_pending(
669                    d.borrowed.connection.wl_display().as_ptr(),
670                    d.queue.as_ptr(),
671                )
672            }
673        });
674        if res == -1 {
675            return Err(io::Error::last_os_error());
676        }
677        assert!(res >= 0);
678        Ok(res as u64)
679    }
680
681    /// Blocks the current thread until the compositor has processed all previous requests
682    /// and all of its response events have been dispatched.
683    ///
684    /// If you are in an async context, then you might want to use
685    /// [`Queue::dispatch_roundtrip_async`] instead.
686    ///
687    /// Since this function usually returns quickly, you might use this function even
688    /// when integrating a wayland connection into an existing event loop and even in an
689    /// async context. For example, a library that creates buffers might use this function
690    /// during initialization to receive the full list of supported formats before
691    /// returning.
692    ///
693    /// If this function returns `Ok(())`, then the function returns after (in the sense
694    /// of the C++ memory model) the event handlers of all previous events have been
695    /// invoked.
696    ///
697    /// # Panic
698    ///
699    /// Panics if this is a [local queue](Connection::create_local_queue) and the current
700    /// thread is not the thread that this queue was created in.
701    ///
702    /// # Example
703    ///
704    /// ```
705    /// # use std::sync::Arc;
706    /// # use std::sync::atomic::AtomicBool;
707    /// # use std::sync::atomic::Ordering::Relaxed;
708    /// # use wl_client::{proxy, Libwayland};
709    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
710    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
711    /// #
712    /// let lib = Libwayland::open().unwrap();
713    /// let con = lib.connect_to_default_display().unwrap();
714    /// let queue = con.create_queue(c"");
715    /// let display: WlDisplay = queue.display();
716    ///
717    /// // send some messages to the compositor
718    /// let done = Arc::new(AtomicBool::new(false));
719    /// let done2 = done.clone();
720    /// let sync = display.sync();
721    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
722    ///     done2.store(true, Relaxed);
723    /// }));
724    ///
725    /// // perform a roundtrip
726    /// queue.dispatch_roundtrip_blocking().unwrap();
727    ///
728    /// // assert that we've received the response
729    /// assert!(done.load(Relaxed));
730    /// ```
731    pub fn dispatch_roundtrip_blocking(&self) -> io::Result<()> {
732        block_on(self.dispatch_roundtrip_async())
733    }
734
735    /// Completes when the compositor has processed all previous requests and all of its
736    /// response events have been dispatched.
737    ///
738    /// This function is the same as [`Queue::dispatch_roundtrip_blocking`] except that it is async and does
739    /// not block the current thread.
740    ///
741    /// If the future completes with `Ok(())`, then the future completes after (in the
742    /// sense of the C++ memory model) the event handlers of all previous events have been
743    /// invoked.
744    ///
745    /// # Panic
746    ///
747    /// Panics if this is a [local queue](Connection::create_local_queue) and the thread
748    /// polling the future is not the thread that this queue was created in.
749    ///
750    /// # Example
751    ///
752    /// ```
753    /// # use std::sync::Arc;
754    /// # use std::sync::atomic::AtomicBool;
755    /// # use std::sync::atomic::Ordering::Relaxed;
756    /// # use wl_client::{proxy, Libwayland};
757    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
758    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
759    /// #
760    /// # tokio_test::block_on(async {
761    /// let lib = Libwayland::open().unwrap();
762    /// let con = lib.connect_to_default_display().unwrap();
763    /// let queue = con.create_queue(c"queue name");
764    /// let display: WlDisplay = queue.display();
765    ///
766    /// // send some messages to the compositor
767    /// let done = Arc::new(AtomicBool::new(false));
768    /// let done2 = done.clone();
769    /// let sync = display.sync();
770    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
771    ///     done2.store(true, Relaxed);
772    /// }));
773    ///
774    /// // perform a roundtrip
775    /// queue.dispatch_roundtrip_async().await.unwrap();
776    ///
777    /// // assert that we've received the response
778    /// assert!(done.load(Relaxed));
779    /// # });
780    /// ```
    pub async fn dispatch_roundtrip_async(&self) -> io::Result<()> {
        // State shared between this future and the `wl_callback` handler below.
        // - `ready` is set once the `done` event of the sync callback was dispatched.
        // - `waker` is the waker of the task polling this future, if it is parked.
        #[derive(Default)]
        struct State {
            ready: bool,
            waker: Option<Waker>,
        }

        // Event handler for the sync callback: marks the roundtrip as complete and
        // wakes the parked task, if any.
        struct RoundtripEventHandler(Arc<Mutex<State>>);
        impl WlCallbackEventHandler for RoundtripEventHandler {
            fn done(&self, _slf: &WlCallbackRef, _callback_data: u32) {
                // The waker is taken while the state is locked but invoked only after
                // the lock guard has been dropped at the end of this inner block.
                let waker = {
                    let state = &mut *self.0.lock();
                    state.ready = true;
                    state.waker.take()
                };
                if let Some(waker) = waker {
                    waker.wake();
                }
            }
        }

        let state = Arc::new(Mutex::new(State::default()));

        // Send `wl_display.sync` and attach the handler inside `run_locked`.
        // NOTE(review): presumably this keeps another thread from dispatching the
        // `done` event before the handler is attached; confirm against `run_locked`.
        // The proxy is bound to `_sync` so that it stays alive until this function
        // returns.
        let _sync = self.run_locked(|| {
            let sync = self.display::<WlDisplay>().sync();
            proxy::set_event_handler(&sync, RoundtripEventHandler(state.clone()));
            sync
        });

        // NOTE: A simple 1) check flag 2) wait for events loop would be incorrect here
        //       since another thread could dispatch the queue between these two steps,
        //       leaving us blocked even though the flag has already been set. Therefore
        //       we have to make sure that this task gets woken up whenever the flag is
        //       set.

        // Flush once up front; the loop below waits without flushing again.
        self.connection.flush()?;
        let queues = [&**self];
        loop {
            let fut = self.connection.wait_for_events_without_flush(&queues);
            let mut fut = pin!(fut);
            // Resolves to `Ok(true)` if the sync callback has fired, to `Ok(false)` if
            // the wait for events completed successfully, and to `Err(_)` if waiting
            // failed.
            let ready = poll_fn(|ctx| {
                // The state lock is held for the whole poll. `done` takes the same
                // lock, so it cannot set `ready` between the check below and the
                // registration of the waker.
                let mut s = state.lock();
                if s.ready {
                    return Poll::Ready(Ok(true));
                }
                if let Poll::Ready(res) = fut.as_mut().poll(ctx) {
                    return Poll::Ready(res.map(|_| false));
                }
                // Neither condition holds yet: register the waker so that `done` can
                // wake this task.
                s.waker = Some(ctx.waker().clone());
                Poll::Pending
            })
            .await?;
            if ready {
                return Ok(());
            }
            // The wait completed; dispatch the pending events and then re-check the
            // flag in the next iteration.
            self.dispatch_pending()?;
        }
    }
839
840    /// Creates a wrapper for an existing proxy.
841    ///
842    /// The wrapper will be assigned to this queue. No event handler can be assigned to
843    /// the wrapper.
844    ///
845    /// # Panic
846    ///
847    /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
848    /// - Panics if the proxy is already destroyed.
849    ///
850    /// # Example
851    ///
852    /// ```
853    /// # use wl_client::{proxy, Libwayland};
854    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
855    /// #
856    /// let lib = Libwayland::open().unwrap();
857    /// let con = lib.connect_to_default_display().unwrap();
858    ///
859    /// let queue1 = con.create_queue(c"queue name");
860    /// let display1: WlDisplay = queue1.display();
861    /// assert_eq!(proxy::queue(&display1), &*queue1);
862    ///
863    /// let queue2 = con.create_queue(c"second queue");
864    /// let display2 = queue2.wrap_proxy(&*display1);
865    /// assert_eq!(proxy::queue(&display2), &*queue2);
866    /// ```
867    pub fn wrap_proxy<P>(&self, proxy: &P) -> P::Owned
868    where
869        P: BorrowedProxy,
870    {
871        let lock = proxy::get_ref(proxy).lock();
872        let proxy = check_dispatching_proxy(lock.wl_proxy());
873        // SAFETY: - We've verified that the proxy is not null. UntypedBorrowedProxy
874        //           requires that the pointer is valid in this case.
875        //         - The interface of the proxy is compatible with P::Owned::WL_INTERFACE.
876        unsafe { self.wrap_wl_proxy(proxy) }
877    }
878
879    /// Creates a wrapper for an existing `wl_proxy`.
880    ///
881    /// The wrapper will be assigned to this queue. No event handler can be assigned to
882    /// the wrapper.
883    ///
884    /// If the `wl_proxy` already has a safe wrapper, the [`Queue::wrap_proxy`] function
885    /// can be used instead.
886    ///
887    /// # Panic
888    ///
889    /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
890    ///
891    /// # Safety
892    ///
893    /// - `proxy` must be a valid pointer.
894    /// - `proxy` must have an interface compatible with `P::WL_INTERFACE`.
895    ///
896    /// # Example
897    ///
898    /// Some frameworks, e.g. winit, expose libwayland `wl_display` and `wl_surface`
899    /// pointers. These can be imported into this crate as follows:
900    ///
901    /// ```
902    /// # use std::ffi::c_void;
903    /// # use std::ptr::NonNull;
904    /// # use wl_client::{Libwayland, Queue};
905    /// # use wl_client::test_protocols::core::wl_surface::WlSurface;
906    /// #
907    /// unsafe fn wrap_foreign_surface(display: NonNull<c_void>, wl_surface: NonNull<c_void>) {
908    ///     let lib = Libwayland::open().unwrap();
909    ///     // SAFETY: ...
910    ///     let con = unsafe { lib.wrap_borrowed_pointer(display.cast()).unwrap() };
911    ///     let queue = con.create_queue(c"queue name");
912    ///     // SAFETY: ...
913    ///     let surface: WlSurface = unsafe { queue.wrap_wl_proxy(wl_surface.cast()) };
914    /// }
915    /// ```
916    pub unsafe fn wrap_wl_proxy<P>(&self, proxy: NonNull<wl_proxy>) -> P
917    where
918        P: OwnedProxy,
919    {
920        let d = &*self.queue_data;
921        // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
922        let display = unsafe { d.libwayland.wl_proxy_get_display(proxy.as_ptr()) };
923        assert_eq!(display, d.borrowed.connection.wl_display().as_ptr());
924        // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
925        let wrapper: *mut wl_proxy = unsafe {
926            d.libwayland
927                .wl_proxy_create_wrapper(proxy.as_ptr().cast())
928                .cast()
929        };
930        let wrapper = check_new_proxy(wrapper);
931        // SAFETY: - we just created wrapper so it is valid
932        //         - queue is valid by the invariants
933        //         - the UntypedOwnedProxy created below will hold a reference to this queue
934        //         - the wrapper belongs to the same display as the proxy, and we've
935        //           verified above that the proxy has the same display as this queue
936        unsafe {
937            d.libwayland
938                .wl_proxy_set_queue(wrapper.as_ptr(), d.queue.as_ptr());
939        }
940        // SAFETY: - we just created wrapper so it is valid
941        //         - wrapper is a wrapper so it doesn't have an event handler
942        //         - we have ownership of wrapper and hand it over
943        //         - we just assigned self as the queue of wrapper
944        //         - the interface is none since this is a wrapper
945        let wrapper = unsafe { UntypedOwnedProxy::from_wrapper_wl_proxy(self, wrapper) };
946        // SAFETY: - the requirement is forwarded to the caller
947        unsafe { proxy::low_level::from_untyped_owned(wrapper) }
948    }
949
950    /// Schedules a destruction to be run once the queue has become idle.
951    ///
952    /// Idle here means that
953    ///
954    /// 1. there are no dispatches running, and
955    /// 2. all future dispatches happen after this function call.
956    ///
957    /// # Panic
958    ///
959    /// Panics if this is a local queue and the current thread is not the thread that this
960    /// queue was created in.
961    ///
962    /// # Safety
963    ///
964    /// - It must be safe to run the destruction once the queue is idle.
965    pub(crate) unsafe fn run_destruction_on_idle(&self, destruction: ProxyDataDestruction) {
966        self.run_locked_(|dd| {
967            if dd.is_dispatching.get() {
968                dd.to_destroy_on_idle.borrow_mut().push(destruction);
969            } else {
970                // SAFETY: - For a thread to dispatch, it must
971                //           1. hold the queue lock
972                //           2. set is_dispatching
973                //         - The queue lock is not dropped before the dispatch finishes
974                //           and is_dispatching is not reset before then.
975                //         - Since we're holding the queue lock and is_dispatching is false,
976                //           we know that no dispatches are currently running.
977                //         - If a future dispatch starts in this thread, it will happen
978                //           after this line of code.
979                //         - If a future dispatch starts on another thread, it will have
980                //           to acquire the queue lock and will therefore happen after we
981                //           release the lock below.
982                //         - By the pre-conditions of this function, it is safe to run
983                //           the destruction at this point.
984                unsafe {
985                    destruction.run();
986                }
987            }
988        });
989    }
990
991    /// Creates a [`QueueWatcher`] for event-loop integration.
992    ///
993    /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
994    /// containing exactly this queue.
995    pub fn create_watcher(&self) -> io::Result<QueueWatcher> {
996        self.connection.create_watcher(&[self], [])
997    }
998}
999
1000impl BorrowedQueue {
1001    /// Creates a [`QueueWatcher`] for event-loop integration.
1002    ///
1003    /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
1004    /// containing exactly this queue.
1005    ///
1006    /// The [`BorrowedQueue`] is dropped when the last clone of the [`QueueWatcher`] is
1007    /// dropped.
1008    pub fn create_watcher(self) -> io::Result<QueueWatcher> {
1009        let con = self.connection.clone();
1010        con.create_watcher(&[], [self])
1011    }
1012
1013    /// Completes when there are new events in this queue.
1014    ///
1015    /// When this function returns `Ok(())`, this queue has an event queued.
1016    ///
1017    /// This is a shorthand for calling [`Connection::wait_for_events`] with a
1018    /// queue list consisting of exactly this queue.
1019    ///
1020    /// # Example
1021    ///
1022    /// ```
1023    /// # use std::future::pending;
1024    /// # use std::sync::Arc;
1025    /// # use std::sync::atomic::AtomicBool;
1026    /// # use std::sync::atomic::Ordering::{Acquire, Release};
1027    /// # use wl_client::{proxy, Libwayland};
1028    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
1029    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
1030    /// #
1031    /// # tokio_test::block_on(async {
1032    /// let lib = Libwayland::open().unwrap();
1033    /// let con = lib.connect_to_default_display().unwrap();
1034    /// let queue = con.create_queue(c"queue name");
1035    /// let display: WlDisplay = queue.display();
1036    ///
1037    /// let done = Arc::new(AtomicBool::new(false));
1038    /// let done2 = done.clone();
1039    /// let sync = display.sync();
1040    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
1041    ///     done2.store(true, Release);
1042    /// }));
1043    ///
1044    /// while !done.load(Acquire) {
1045    ///     queue.wait_for_events().await.unwrap();
1046    ///     queue.dispatch_pending().unwrap();
1047    /// }
1048    /// # });
1049    /// ```
1050    pub async fn wait_for_events(&self) -> io::Result<()> {
1051        self.connection.wait_for_events(&[self]).await
1052    }
1053
1054    /// Returns the `wl_event_queue` representing this queue.
1055    ///
1056    /// This function returns `None` if and only if this queue is the default queue of the
1057    /// connection.
1058    ///
1059    /// The returned pointer, if any, remains valid as long as this object exists.
1060    ///
1061    /// # Example
1062    ///
1063    /// ```
1064    /// # use wl_client::Libwayland;
1065    /// #
1066    /// let lib = Libwayland::open().unwrap();
1067    /// let con = lib.connect_to_default_display().unwrap();
1068    /// let queue = con.create_queue(c"queue name");
1069    /// let default_queue = con.borrow_default_queue();
1070    ///
1071    /// assert_eq!((**queue).wl_event_queue(), Some(queue.wl_event_queue()));
1072    /// assert_eq!(default_queue.wl_event_queue(), None);
1073    /// ```
1074    pub fn wl_event_queue(&self) -> Option<NonNull<wl_event_queue>> {
1075        self.queue.map(|q| q.0)
1076    }
1077
1078    pub(crate) fn connection(&self) -> &Connection {
1079        &self.connection
1080    }
1081}
1082
1083impl Connection {
1084    /// Creates a new queue.
1085    ///
1086    /// The new queue is not a local queue. It can be dispatched from any thread. Event
1087    /// handlers attached to this queue must implement [`Send`]. See the documentation of
1088    /// [`Queue`] for a description of local and non-local queues.
1089    ///
1090    /// # Example
1091    ///
1092    /// ```
1093    /// # use wl_client::Libwayland;
1094    /// #
1095    /// let lib = Libwayland::open().unwrap();
1096    /// let con = lib.connect_to_default_display().unwrap();
1097    /// let _queue = con.create_queue(c"queue name");
1098    /// ```
1099    pub fn create_queue(&self, name: &CStr) -> QueueOwner {
1100        self.create_queue2(name, false)
1101    }
1102
1103    /// Creates a new local queue.
1104    ///
1105    /// The new queue is a local queue. It can only be dispatched from the thread that
1106    /// called this function. See the documentation of [`Queue`] for a description of
1107    /// local and non-local queues.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// # use wl_client::Libwayland;
1113    /// #
1114    /// let lib = Libwayland::open().unwrap();
1115    /// let con = lib.connect_to_default_display().unwrap();
1116    /// let _queue = con.create_local_queue(c"queue name");
1117    /// ```
1118    pub fn create_local_queue(&self, name: &CStr) -> QueueOwner {
1119        self.create_queue2(name, true)
1120    }
1121
1122    /// Creates a new queue.
1123    fn create_queue2(&self, name: &CStr, local: bool) -> QueueOwner {
1124        // SAFETY: The display is valid and queue_name is a CString.
1125        let queue = unsafe {
1126            self.libwayland()
1127                .wl_display_create_queue_with_name(self.wl_display().as_ptr(), name.as_ptr())
1128        };
1129        let queue = NonNull::new(queue).unwrap();
1130        QueueOwner {
1131            queue: Queue {
1132                queue_data: Arc::new(QueueData {
1133                    libwayland: self.libwayland(),
1134                    borrowed: BorrowedQueue {
1135                        connection: self.clone(),
1136                        queue: Some(SyncNonNull(queue)),
1137                    },
1138                    name: name.to_owned(),
1139                    queue: SyncNonNull(queue),
1140                    mutex: match local {
1141                        true => ReentrantMutex::new_thread_local(Default::default()),
1142                        false => ReentrantMutex::new_shared(Default::default()),
1143                    },
1144                    owned_proxy_registry: Default::default(),
1145                }),
1146            },
1147        }
1148    }
1149
1150    /// Creates a [`BorrowedQueue`] representing the default queue.
1151    pub fn borrow_default_queue(&self) -> BorrowedQueue {
1152        BorrowedQueue {
1153            connection: self.clone(),
1154            queue: None,
1155        }
1156    }
1157
1158    /// Creates a [`BorrowedQueue`] representing a `wl_event_queue` pointer.
1159    ///
1160    /// # Safety
1161    ///
1162    /// - The queue must be valid and stay valid for the lifetime of the [`BorrowedQueue`].
1163    /// - The queue must belong to this connection.
1164    pub unsafe fn borrow_foreign_queue(&self, queue: NonNull<wl_event_queue>) -> BorrowedQueue {
1165        BorrowedQueue {
1166            connection: self.clone(),
1167            queue: Some(SyncNonNull(queue)),
1168        }
1169    }
1170}
1171
1172impl Drop for QueueOwner {
1173    fn drop(&mut self) {
1174        // To catch errors early, acquire the lock unconditionally even if there are no
1175        // proxies to be destroyed.
1176        self.run_locked(|| ());
1177        self.queue.queue_data.owned_proxy_registry.destroy_all();
1178    }
1179}
1180
1181impl Drop for QueueData {
1182    fn drop(&mut self) {
1183        // SAFETY: - queue is always a valid pointer until this call
1184        //         - all proxies attached to the queue hold a reference to the queue,
1185        //           therefore this function does not run until all proxies are dropped,
1186        //           which causes the proxies to be destroyed
1187        unsafe {
1188            self.libwayland.wl_event_queue_destroy(self.queue.as_ptr());
1189        }
1190    }
1191}
1192
1193impl PartialEq for Queue {
1194    fn eq(&self, other: &Queue) -> bool {
1195        self.queue_data.queue == other.queue_data.queue
1196    }
1197}
1198
1199impl Eq for Queue {}
1200
1201impl Debug for QueueOwner {
1202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1203        self.queue.fmt(f)
1204    }
1205}
1206
1207impl Debug for Queue {
1208    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1209        f.debug_struct("Queue")
1210            .field("wl_event_queue", &self.wl_event_queue())
1211            .field("name", &self.queue_data.name)
1212            .finish_non_exhaustive()
1213    }
1214}
1215
1216impl Debug for DispatchLock<'_> {
1217    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1218        f.debug_struct("DispatchLock").finish_non_exhaustive()
1219    }
1220}
1221
1222impl Deref for Queue {
1223    type Target = BorrowedQueue;
1224
1225    fn deref(&self) -> &Self::Target {
1226        &self.queue_data.borrowed
1227    }
1228}