wl_client/
queue.rs

1pub use with_data::QueueWithData;
2use {
3    crate::{
4        Libwayland, QueueWatcher,
5        connection::Connection,
6        ffi::{interface_compatible, wl_event_queue, wl_proxy},
7        protocols::wayland::{
8            wl_callback::{WlCallbackEventHandler, WlCallbackRef},
9            wl_display::WlDisplay,
10        },
11        proxy::{
12            self, BorrowedProxy, OwnedProxy,
13            low_level::{
14                OwnedProxyRegistry, ProxyDataDestruction, UntypedOwnedProxy,
15                check_dispatching_proxy, check_new_proxy, owned::DISPATCH_PANIC,
16            },
17        },
18        utils::{
19            block_on::block_on,
20            reentrant_mutex::{ReentrantMutex, ReentrantMutexGuard},
21            sync_cell::SyncCell,
22            sync_ptr::{SyncNonNull, SyncPtr},
23        },
24    },
25    parking_lot::Mutex,
26    run_on_drop::on_drop,
27    std::{
28        any::TypeId,
29        cell::{Cell, RefCell},
30        ffi::{CStr, CString},
31        fmt::{Debug, Formatter},
32        future::poll_fn,
33        io, mem,
34        ops::Deref,
35        panic::resume_unwind,
36        pin::pin,
37        ptr::{self, NonNull},
38        sync::Arc,
39        task::{Poll, Waker},
40        thread::panicking,
41    },
42};
43
44#[cfg(test)]
45mod tests;
46mod with_data;
47
48/// The owner of an event queue.
49///
50/// This is a thin wrapper around [`Queue`] and implements `Deref<Target = Queue>`.
51///
52/// The purpose of this type is to manage the lifetimes of proxies attached to queues.
53/// This is described in more detail in the documentation of [`Queue`].
54pub struct QueueOwner {
55    queue: Queue,
56}
57
58#[expect(clippy::doc_overindented_list_items)]
59/// An event queue.
60///
61/// An event queue stores events that have been sent by the compositor but whose callbacks
62/// have not yet been invoked. Invoking these callbacks is called _dispatching_ the queue.
63///
64/// A connection can have many queues that can be dispatched in parallel, but the events
65/// in a single queue are always dispatched in series. That is, the callback for the next
66/// event will not start executing until after the callback for the previous event.
67///
68/// New queues are created by calling [`Connection::create_local_queue`] or
69/// [`Connection::create_queue`].
70///
71/// # Proxies attached to a queue
72///
73/// Each proxy is attached to a queue. This queue is determined as follows:
74///
75/// - When [`Queue::display`] is called, the returned proxy is attached to that queue.
76/// - When [`Queue::wrap_proxy`] is called, the returned proxy is attached to that queue.
77/// - When a constructor request is called on an owned proxy `P`, e.g. `WlDisplay::sync`,
78///   the returned proxy is attached to the same queue as `P`.
79/// - When a constructor request is called on a borrowed proxy, e.g. `WlDisplayRef::sync`,
80///   the request takes a queue argument and the returned proxy is attached to that queue.
81///
82/// See the documentation of the [`proxy`] module for more information about proxies.
83///
84/// # Queue ownership
85///
86/// Each queue is owned by a [`QueueOwner`] which is returned when you call
87/// [`Connection::create_local_queue`] or [`Connection::create_queue`].
88///
89/// When a [`QueueOwner`] is dropped, it will automatically destroy _some_ of the proxies
90/// attached to the queue. This ensures that, when the [`QueueOwner`] is dropped, all
91/// reference cycles between proxies and their event handlers are broken and no memory is
92/// leaked. (Normally these cycles are broken when a destructor request is sent or the
93/// proxy is destroyed with [`proxy::destroy`], but for long-lived proxies, such as
94/// `wl_registry`, it is more convenient for this to happen automatically.)
95///
96/// A queue and the attached proxies should no longer be used after its [`QueueOwner`] has
97/// been dropped. Doing so might lead to panics or memory leaks.
98///
99/// To ensure that the [`QueueOwner`] is eventually dropped, the queue owner should never
100/// be reachable from an event handler.
101///
102/// ```
103/// # use wl_client::Libwayland;
104/// # use wl_client::Queue;
105/// # use wl_client::Connection;
106/// # use wl_client::QueueOwner;
107/// #
108/// fn main() {
109///     let lib = Libwayland::open().unwrap();
110///     let con = lib.connect_to_default_display().unwrap();
111///     let queue: QueueOwner = con.create_queue(c"queue name");
112///
113///     // the queue owner is stored on the stack and will be dropped when this function
114///     // returns
115///     application_logic(&queue);
116/// }
117/// #
118/// # fn application_logic(_queue: &Queue) {
119/// # }
120/// ```
121///
122/// # Local and non-local queues
123///
124/// Each queue is either local or non-local. Local queues are created with
125/// [`Connection::create_local_queue`]. Non-local queues are created with
126/// [`Connection::create_queue`].
127///
128/// Local queues have the following advantage:
129///
130/// - Event handlers attached to local queues do not have to implement [`Send`].
131///
132/// This allows such events handlers to contain `Rc<T>` where the same event handler on
133/// a non-local queue would have to use `Arc<T>`.
134///
135/// To ensure that these event handlers are not accessed or dropped on different threads,
136/// this advantage comes with the following restrictions:
137///
138/// - If a local queue was created on a thread `X`:
139///   - Event handlers must be attached on `X`.
140///   - The queue must only be dispatched on `X`.
141///   - The [`QueueOwner`] owning the queue must be dropped on `X`.
142///   - Proxies attached to the queue must only be destroyed on `X`.
143///     - Note: Proxies are destroyed when [`proxy::destroy`] is called *or* when a
144///       destructor request is sent *or* implicitly when the last reference to the proxy is
145///       dropped.
146///
147/// These restrictions are checked at runtime and will lead to panics if violated.
148///
149/// ```
150/// # use std::cell::Cell;
151/// # use std::rc::Rc;
152/// # use wl_client::{proxy, Libwayland};
153/// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
154/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
155/// #
156/// let lib = Libwayland::open().unwrap();
157/// let con = lib.connect_to_default_display().unwrap();
158///
159/// // Create a local queue.
160/// let queue = con.create_local_queue(c"queue name");
161/// let display: WlDisplay = queue.display();
162///
163/// // Create an `Rc` and use it as an event handler.
164/// let done = Rc::new(Cell::new(false));
165/// let done2 = done.clone();
166/// let sync = display.sync();
167/// proxy::set_event_handler_local(&sync, WlCallback::on_done(move |_, _| done2.set(true)));
168///
169/// // Calling this function from another thread would panic.
170/// queue.dispatch_roundtrip_blocking().unwrap();
171///
172/// assert!(done.get());
173/// ```
174///
175/// # Locking
176///
177/// Each queue contains a re-entrant mutex. Re-entrant means that a thread that already
178/// holds the lock can acquire it again.
179///
180/// This lock can be acquired explicitly by calling [`Queue::lock_dispatch`]. You might
181/// want to use this function if you're dispatching the queue from multiple threads to
182/// avoid lock inversion. For example, consider the following situation:
183///
184/// - Thread 1: Acquires the lock of some shared state.
185/// - Thread 2: Starts dispatching and acquires the queue's re-entrant lock.
186/// - Thread 2: Calls an event handler which tries to lock the shared state and blocks.
187/// - Thread 1: Destroys a proxy which will implicitly acquire the queue's re-entrant lock.
188///             This deadlocks.
189///
190/// Instead, thread 1 could call [`Queue::lock_dispatch`] before locking the shared state
191/// to prevent the lock inversion.
192///
193/// This deadlock concern does not apply if the queue is only ever used from a single
194/// thread or if the queue is a local queue. If you need to poll the wayland socket on a
195/// separate thread, you can use [`BorrowedQueue::wait_for_events`] and send a message to
196/// the main thread when the future completes.
197///
198/// The lock is acquired implicitly in the following situations:
199///
200/// - The lock is held for the entirety of the following function calls:
201///   - [`Queue::dispatch_pending`]
202///   - [`QueueWithData::dispatch_pending`]
203/// - The lock is sometimes held during the following function calls but not while they
204///   are waiting for new events:
205///   - [`Queue::dispatch_blocking`]
206///   - [`Queue::dispatch_roundtrip_blocking`]
207///   - [`QueueWithData::dispatch_blocking`]
208///   - [`QueueWithData::dispatch_roundtrip_blocking`]
209/// - The lock is sometimes held while the futures produced by the following functions
210///   are being polled but not while they are waiting for new events:
211///   - [`Queue::dispatch_async`]
212///   - [`Queue::dispatch_roundtrip_async`]
213///   - [`QueueWithData::dispatch_async`]
214///   - [`QueueWithData::dispatch_roundtrip_async`]
215/// - The lock is held at the start and the end of the following function calls but not
216///   while invoking the callback:
217///   - [`Queue::dispatch_scope_blocking`]
218/// - The lock is sometimes held while the futures produced by the following functions
219///   are being polled but not while polling the future passed as an argument:
220///   - [`Queue::dispatch_scope_async`]
221/// - The lock is held while attaching an event handler to a proxy.
222/// - The lock is held when a proxy with an event handler is being destroyed:
223///   - When sending a destructor request.
224///   - When calling [`proxy::destroy`].
225///   - Implicitly when dropping the last reference to a proxy.
226/// - The lock is held while dropping the [`QueueOwner`] of the queue.
227#[derive(Clone)]
228pub struct Queue {
229    queue_data: Arc<QueueData>,
230}
231
232/// A borrowed event queue.
233///
234/// This type is a thin wrapper around a `wl_event_queue` pointer. If the pointer is a
235/// null pointer, this object refers to the default libwayland queue.
236///
237/// [`Queue`] implements `Deref<Target = BorrowedQueue>`.
238///
239/// This type can be used to safely interact with foreign queues. It guarantees that the
240/// contained pointer is either null or valid. This type can be passed into
241/// [`Connection::wait_for_events`] to wait for events to arrive on multiple queues
242/// in a race-free way.
243///
244/// You can construct a [`BorrowedQueue`] by calling
245///
246/// - [`Connection::borrow_default_queue`] or
247/// - [`Connection::borrow_foreign_queue`].
248///
249/// # Example
250///
251/// ```
252/// # use wl_client::Libwayland;
253/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
254/// #
255/// # tokio_test::block_on(async {
256/// let lib = Libwayland::open().unwrap();
257/// let con = lib.connect_to_default_display().unwrap();
258/// let queue1 = con.create_queue(c"queue name");
259/// let queue2 = con.create_queue(c"another queue");
260/// let default_queue = con.borrow_default_queue();
261/// # // ensure that some event arrives for this test
262/// # let sync = queue2.display::<WlDisplay>().sync();
263///
264/// con.wait_for_events(&[&queue1, &queue2, &default_queue]).await.unwrap();
265/// # });
266/// ```
267pub struct BorrowedQueue {
268    /// A reference to the connection that this queue belongs to. This also ensures
269    /// that the connection outlives the queue.
270    connection: Connection,
271    queue: Option<SyncNonNull<wl_event_queue>>,
272}
273
274/// A lock that prevents concurrent dispatching of a queue.
275///
276/// This lock can be acquired by calling [`Queue::lock_dispatch`].
277///
278/// See the description of [`Queue`] for why you might use this.
279///
280/// # Example
281///
282/// ```
283/// # use std::thread;
284/// # use wl_client::Libwayland;
285/// #
286/// let lib = Libwayland::open().unwrap();
287/// let con = lib.connect_to_default_display().unwrap();
288/// let queue = con.create_queue(c"queue name");
289///
290/// let lock = queue.lock_dispatch();
291///
292/// let thread = {
293///     let queue = queue.clone();
294///     thread::spawn(move || {
295///         // this dispatch will not start until the lock is dropped.
296///         queue.dispatch_roundtrip_blocking().unwrap();
297///     })
298/// };
299///
300/// // this dispatch starts immediately since the lock is re-entrant
301/// queue.dispatch_roundtrip_blocking().unwrap();
302///
303/// drop(lock);
304/// thread.join().unwrap();
305/// ```
306pub struct DispatchLock<'a> {
307    _lock: ReentrantMutexGuard<'a, DispatchData>,
308}
309
310struct QueueData {
311    /// A reference to the Libwayland singleton.
312    libwayland: &'static Libwayland,
313    /// The borrowed version of this queue for `Deref`.
314    borrowed: BorrowedQueue,
315    /// The name of the queue.
316    name: CString,
317    /// The native queue. This is always a valid pointer.
318    queue: SyncNonNull<wl_event_queue>,
319    /// This mutex protects
320    /// - the unsynchronized fields of any proxies attached to the queue
321    /// - the fields is_dispatching and to_destroy below
322    mutex: ReentrantMutex<DispatchData>,
323    /// The type of mutable data passed to event handlers attached to this queue. Each
324    /// event handler attached to this queue must
325    /// - not use any mutable data,
326    /// - use mutable data of type `()`, or
327    /// - use mutable data of this type.
328    mut_data_type: Option<TypeId>,
329    /// The name of mut_data_type, if any. This field is only used for panic messages.
330    mut_data_type_name: Option<&'static str>,
331    /// This field is protected by the mutex. It always contains a non-null pointer that
332    /// can be dereferenced to `&mut ()`. During dispatch, if `mut_data_type` is not
333    /// `None`, it contains a pointer to the data that was passed into `dispatch_pending`.
334    mut_data: SyncCell<SyncPtr<u8>>,
335    /// The registry for proxies that need manual destruction when the connection is
336    /// dropped.
337    owned_proxy_registry: OwnedProxyRegistry,
338}
339
340#[derive(Default)]
341struct DispatchData {
342    /// Contains whether the queue is currently dispatching. Note that, since
343    /// dispatching is protected by the mutex, this field is only ever accessed by a
344    /// single thread at a time.
345    is_dispatching: Cell<bool>,
346    /// If we are dispatching, this vector might contain work to run after the dispatch.
347    /// Protected against access from multiple threads by the mutex. These destructions
348    /// have the following invariant:
349    ///
350    /// - It must be safe to run the destruction once the queue is idle.
351    to_destroy_on_idle: RefCell<Vec<ProxyDataDestruction>>,
352}
353
354impl Deref for QueueOwner {
355    type Target = Queue;
356
357    fn deref(&self) -> &Self::Target {
358        &self.queue
359    }
360}
361
362impl Queue {
363    /// Returns a reference to the [`Libwayland`] singleton.
364    pub fn libwayland(&self) -> &'static Libwayland {
365        self.queue_data.libwayland
366    }
367
368    /// Returns the connection that this queue belongs to.
369    ///
370    /// # Example
371    ///
372    /// ```
373    /// # use wl_client::Libwayland;
374    /// #
375    /// let lib = Libwayland::open().unwrap();
376    /// let con = lib.connect_to_default_display().unwrap();
377    /// let queue = con.create_queue(c"queue name");
378    /// assert_eq!(queue.connection(), &con);
379    /// ```
380    pub fn connection(&self) -> &Connection {
381        &self.queue_data.borrowed.connection
382    }
383
384    /// Acquires the queue's re-entrant lock.
385    ///
386    /// See the description of [`Queue`] for why you might use this.
387    ///
388    /// # Example
389    ///
390    /// ```
391    /// # use std::thread;
392    /// # use wl_client::Libwayland;
393    /// #
394    /// let lib = Libwayland::open().unwrap();
395    /// let con = lib.connect_to_default_display().unwrap();
396    /// let queue = con.create_queue(c"queue name");
397    ///
398    /// let lock = queue.lock_dispatch();
399    ///
400    /// let thread = {
401    ///     let queue = queue.clone();
402    ///     thread::spawn(move || {
403    ///         // this dispatch will not start until the lock is dropped.
404    ///         queue.dispatch_roundtrip_blocking().unwrap();
405    ///     })
406    /// };
407    ///
408    /// // this dispatch starts immediately since the lock is re-entrant
409    /// queue.dispatch_roundtrip_blocking().unwrap();
410    ///
411    /// drop(lock);
412    /// thread.join().unwrap();
413    /// ```
414    pub fn lock_dispatch(&self) -> DispatchLock<'_> {
415        DispatchLock {
416            _lock: self.queue_data.mutex.lock(),
417        }
418    }
419
420    /// Creates a wrapper proxy around the singleton `wl_display` object.
421    ///
422    /// The proxy is a wrapper and no event handler can be attached to it.
423    ///
424    /// The proxy is attached to this queue.
425    ///
426    /// # Panic
427    ///
428    /// Panics if the interface of `T` is not compatible with the `wl_display` interface.
429    ///
430    /// # Example
431    ///
432    /// ```
433    /// # use wl_client::{proxy, Libwayland};
434    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
435    /// #
436    /// let lib = Libwayland::open().unwrap();
437    /// let con = lib.connect_to_default_display().unwrap();
438    /// let queue = con.create_queue(c"queue name");
439    /// let display: WlDisplay = queue.display();
440    /// assert_eq!(proxy::queue(&display), &*queue);
441    /// assert_eq!(proxy::id(&*display), 1);
442    /// ```
443    pub fn display<T>(&self) -> T
444    where
445        T: OwnedProxy,
446    {
447        // SAFETY: OwnedProxy::WL_DISPLAY is always a valid interface.
448        let compatible = unsafe { interface_compatible(WlDisplay::WL_INTERFACE, T::WL_INTERFACE) };
449        if !compatible {
450            panic!("T::WL_INTERFACE is not compatible with wl_display");
451        }
452        // SAFETY: - wl_display always returns a valid object
453        //         - we've just verified that T has a compatible interface
454        unsafe { self.wrap_wl_proxy(self.connection().wl_display().cast()) }
455    }
456
457    /// Returns whether this is a local queue.
458    ///
459    /// The documentation of the [`Queue`] type explains the difference between local and
460    /// non-local queues.
461    ///
462    /// # Example
463    ///
464    /// ```
465    /// # use wl_client::Libwayland;
466    /// #
467    /// let lib = Libwayland::open().unwrap();
468    /// let con = lib.connect_to_default_display().unwrap();
469    /// let queue1 = con.create_queue(c"non-local queue");
470    /// let queue2 = con.create_local_queue(c"local queue");
471    ///
472    /// assert!(queue1.is_non_local());
473    /// assert!(queue2.is_local());
474    /// ```
475    pub fn is_local(&self) -> bool {
476        self.queue_data.mutex.is_thread_local()
477    }
478
479    /// Returns whether this is *not* a local queue.
480    ///
481    /// This is the same as `!self.is_local()`.
482    ///
483    /// The documentation of the [`Queue`] type explains the difference between local and
484    /// non-local queues.
485    pub fn is_non_local(&self) -> bool {
486        !self.is_local()
487    }
488
489    /// Returns the `wl_event_queue` pointer of this queue.
490    ///
491    /// The returned pointer is valid for as long as this queue exists.
492    ///
493    /// You must not dispatch the queue except through the [`Queue`] interface.
494    /// Otherwise the behavior is undefined.
495    pub fn wl_event_queue(&self) -> NonNull<wl_event_queue> {
496        self.queue_data.queue.0
497    }
498
499    /// Returns the proxy registry of this queue.
500    ///
501    /// Proxies attached to this registry will be destroyed when the [`QueueOwner`] is
502    /// dropped.
503    pub(crate) fn owned_proxy_registry(&self) -> &OwnedProxyRegistry {
504        &self.queue_data.owned_proxy_registry
505    }
506
507    /// Runs the closure while holding the reentrant queue mutex.
508    pub(crate) fn run_locked<T>(&self, f: impl FnOnce() -> T) -> T {
509        self.run_locked_(|_| f())
510    }
511
512    /// Runs the closure while holding the reentrant queue mutex.
513    fn run_locked_<T>(&self, f: impl FnOnce(&DispatchData) -> T) -> T {
514        let lock = self.queue_data.mutex.lock();
515        f(&lock)
516    }
517
518    /// Runs the closure while holding the reentrant queue mutex. The is_dispatching
519    /// field will be set to true for the duration of the callback.
520    fn with_dispatch<T>(&self, f: impl FnOnce() -> T) -> T {
521        self.run_locked_(|dd| {
522            let is_dispatching = dd.is_dispatching.get();
523            dd.is_dispatching.set(true);
524            let ret = f();
525            dd.is_dispatching.set(is_dispatching);
526            if !is_dispatching {
527                let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
528                if to_destroy.len() > 0 {
529                    let mut todo = mem::take(&mut *to_destroy);
530                    drop(to_destroy);
531                    for dd in todo.drain(..) {
532                        // SAFETY: - For a thread to dispatch, it must
533                        //           1. hold the queue lock
534                        //           2. set is_dispatching
535                        //         - The queue lock is not dropped before the dispatch finishes
536                        //           and is_dispatching is not reset before then.
537                        //         - Since we're holding the queue lock and is_dispatching is false,
538                        //           we know that no dispatches are currently running.
539                        //         - If a future dispatch starts in this thread, it will happen
540                        //           after this line of code.
541                        //         - If a future dispatch starts on another thread, it will have
542                        //           to acquire the queue lock and will therefore happen after we
543                        //           release the lock below.
544                        //         - Therefore this queue is idle at this point.
545                        //         - to_destroy_on_idle contains only destructions that are safe
546                        //           to run while idle.
547                        unsafe {
548                            dd.run();
549                        }
550                    }
551                    let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
552                    mem::swap(&mut todo, &mut *to_destroy);
553                }
554            }
555            ret
556        })
557    }
558
559    /// Blocks the current thread until at least one event has been dispatched.
560    ///
561    /// If you are in an async context, then you might want to use
562    /// [`Queue::dispatch_async`] instead.
563    ///
564    /// This function should not be used when integrating with an existing, poll-based
565    /// event loop, as it might block indefinitely. Use [`Connection::create_watcher`] and
566    /// [`Queue::dispatch_pending`] instead.
567    ///
568    /// This function cannot be used if the queue was created with
569    /// [`Connection::create_queue_with_data`] or
570    /// [`Connection::create_local_queue_with_data`]. Use
571    /// [`QueueWithData::dispatch_blocking`] instead.
572    ///
573    /// The returned number is the number of events that have been dispatched by this
574    /// call. The number can be zero if another thread dispatched the events before us.
575    ///
576    /// # Panic
577    ///
578    /// - Panics if this is a [local queue](Connection::create_local_queue) and the current
579    ///   thread is not the thread that this queue was created in.
580    /// - Panics if the queue was created with [`Connection::create_queue_with_data`] or
581    ///   [`Connection::create_local_queue_with_data`].
582    ///
583    /// # Example
584    ///
585    /// ```
586    /// # use wl_client::Libwayland;
587    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
588    /// #
589    /// let lib = Libwayland::open().unwrap();
590    /// let con = lib.connect_to_default_display().unwrap();
591    /// let queue = con.create_queue(c"queue name");
592    ///
593    /// // For this example, ensure that the compositor sends an event in the near future.
594    /// let _sync = queue.display::<WlDisplay>().sync();
595    ///
596    /// queue.dispatch_blocking().unwrap();
597    /// ```
598    pub fn dispatch_blocking(&self) -> io::Result<u64> {
599        block_on(self.dispatch_async())
600    }
601
602    /// Completes when at least one event has been dispatched.
603    ///
604    /// This function is the same as [`Queue::dispatch_blocking`] except that it is async and does
605    /// not block the current thread.
606    ///
607    /// # Panic
608    ///
609    /// - Panics if this is a [local queue](Connection::create_local_queue) and the thread
610    ///   polling the future is not the thread that this queue was created in.
611    /// - Panics if the queue was created with [`Connection::create_queue_with_data`] or
612    ///   [`Connection::create_local_queue_with_data`].
613    ///
614    /// # Example
615    ///
616    /// ```
617    /// # use wl_client::Libwayland;
618    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
619    /// #
620    /// # tokio_test::block_on(async {
621    /// let lib = Libwayland::open().unwrap();
622    /// let con = lib.connect_to_default_display().unwrap();
623    /// let queue = con.create_queue(c"queue name");
624    ///
625    /// // For this example, ensure that the compositor sends an event in the near future.
626    /// let _sync = queue.display::<WlDisplay>().sync();
627    ///
628    /// queue.dispatch_async().await.unwrap();
629    /// # });
630    /// ```
631    pub async fn dispatch_async(&self) -> io::Result<u64> {
632        self.connection.wait_for_events(&[self]).await?;
633        self.dispatch_pending()
634    }
635
636    /// Dispatches enqueued events.
637    ///
638    /// This function does not read new events from the file descriptor.
639    ///
640    /// This function can be used together with [`BorrowedQueue::wait_for_events`] or
641    /// [`Queue::create_watcher`] to dispatch the queue in an async context or event loop
642    /// respectively.
643    ///
644    /// This function cannot be used if the queue was created with
645    /// [`Connection::create_queue_with_data`] or
646    /// [`Connection::create_local_queue_with_data`]. Use
647    /// [`QueueWithData::dispatch_pending`] instead.
648    ///
649    /// The returned number is the number of events that were dispatched.
650    ///
651    /// # Panic
652    ///
653    /// - Panics if this is a [local queue](Connection::create_local_queue) and the current
654    ///   thread is not the thread that this queue was created in.
655    /// - Panics if the queue was created with [`Connection::create_queue_with_data`] or
656    ///   [`Connection::create_local_queue_with_data`].
657    ///
658    /// # Example
659    ///
660    /// ```
661    /// # use std::sync::Arc;
662    /// # use std::sync::atomic::AtomicBool;
663    /// # use std::sync::atomic::Ordering::Relaxed;
664    /// # use wl_client::{proxy, Libwayland};
665    /// # use wl_client::test_protocol_helpers::callback;
666    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
667    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
668    /// #
669    /// # tokio_test::block_on(async {
670    /// let lib = Libwayland::open().unwrap();
671    /// let con = lib.connect_to_default_display().unwrap();
672    /// let queue = con.create_queue(c"queue name");
673    /// let display: WlDisplay = queue.display();
674    ///
675    /// let done = Arc::new(AtomicBool::new(false));
676    /// let done2 = done.clone();
677    /// let sync = display.sync();
678    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
679    ///     done2.store(true, Relaxed);
680    /// }));
681    ///
682    /// while !done.load(Relaxed) {
683    ///     queue.wait_for_events().await.unwrap();
684    ///     // Dispatch the events.
685    ///     queue.dispatch_pending().unwrap();
686    /// }
687    /// # });
688    /// ```
689    pub fn dispatch_pending(&self) -> io::Result<u64> {
690        let d = &*self.queue_data;
691        if d.mut_data_type.is_some() {
692            panic!(
693                "Queue requires mutable data of type `{}` to be dispatched",
694                d.mut_data_type_name.unwrap(),
695            );
696        }
697        // SAFETY: - We've just checked that `mut_data_type` is None and
698        //           `&mut () = &mut U`.
699        unsafe { self.dispatch_pending_internal(ptr::from_mut(&mut ()).cast()) }
700    }
701
702    /// # Safety
703    ///
704    /// - If `self.mut_data_type` is Some, then `mut_data` must be a `&mut T` where
705    ///   `T` has the type ID `self.mut_data_type`.
706    /// - Otherwise `mut_data` must be `&mut U` for any `U`.
707    unsafe fn dispatch_pending_internal(&self, mut_data: *mut u8) -> io::Result<u64> {
708        let d = &*self.queue_data;
709        let _resume_unwind = on_drop(|| {
710            if let Some(err) = DISPATCH_PANIC.take() {
711                if !panicking() {
712                    resume_unwind(err);
713                }
714            }
715        });
716        let res = self.with_dispatch(|| {
717            let md = &self.queue_data.mut_data;
718            // SAFETY: - We're holding the queue lock.
719            //         - If mut_data_type is Some, then mut_data is `&mut T` where T has
720            //           the type ID mut_data_type.
721            let prev_mut_data = unsafe { md.replace(SyncPtr(mut_data)) };
722            let _reset_mut_data = on_drop(|| {
723                // SAFETY: - This closure runs before exiting the with_dispatch callback,
724                //           so we're still holding the queue lock.
725                //         - If this is a nested dispatch, then that dispatch had already
726                //           started when we entered the with_dispatch callback, therefore
727                //           prev_mut_data satisfies all of the requirements.
728                unsafe { md.set(prev_mut_data) }
729            });
730            // SAFETY: - by the invariants, the display and queue are valid
731            //         - the queue was created from the display
732            //         - we're inside with_dispatch which means that we're holding the
733            //           reentrant queue mutex. by the invariants, this mutex protects
734            //           the unsynchronized fields of the proxies.
735            unsafe {
736                d.libwayland.wl_display_dispatch_queue_pending(
737                    d.borrowed.connection.wl_display().as_ptr(),
738                    d.queue.as_ptr(),
739                )
740            }
741        });
742        if res == -1 {
743            return Err(io::Error::last_os_error());
744        }
745        assert!(res >= 0);
746        Ok(res as u64)
747    }
748
749    /// Blocks the current thread until the compositor has processed all previous requests
750    /// and all of its response events have been dispatched.
751    ///
752    /// If you are in an async context, then you might want to use
753    /// [`Queue::dispatch_roundtrip_async`] instead.
754    ///
755    /// Since this function usually returns quickly, you might use this function even
756    /// when integrating a wayland connection into an existing event loop and even in an
757    /// async context. For example, a library that creates buffers might use this function
758    /// during initialization to receive the full list of supported formats before
759    /// returning.
760    ///
761    /// This function cannot be used if the queue was created with
762    /// [`Connection::create_queue_with_data`] or
763    /// [`Connection::create_local_queue_with_data`]. Use
764    /// [`QueueWithData::dispatch_roundtrip_blocking`] instead.
765    ///
766    /// If this function returns `Ok(())`, then the function returns after (in the sense
767    /// of the C++ memory model) the event handlers of all previous events have been
768    /// invoked.
769    ///
770    /// # Panic
771    ///
772    /// - Panics if this is a [local queue](Connection::create_local_queue) and the current
773    ///   thread is not the thread that this queue was created in.
774    /// - Panics if the queue was created with [`Connection::create_queue_with_data`] or
775    ///   [`Connection::create_local_queue_with_data`].
776    ///
777    /// # Example
778    ///
779    /// ```
780    /// # use std::sync::Arc;
781    /// # use std::sync::atomic::AtomicBool;
782    /// # use std::sync::atomic::Ordering::Relaxed;
783    /// # use wl_client::{proxy, Libwayland};
784    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
785    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
786    /// #
787    /// let lib = Libwayland::open().unwrap();
788    /// let con = lib.connect_to_default_display().unwrap();
789    /// let queue = con.create_queue(c"");
790    /// let display: WlDisplay = queue.display();
791    ///
792    /// // send some messages to the compositor
793    /// let done = Arc::new(AtomicBool::new(false));
794    /// let done2 = done.clone();
795    /// let sync = display.sync();
796    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
797    ///     done2.store(true, Relaxed);
798    /// }));
799    ///
800    /// // perform a roundtrip
801    /// queue.dispatch_roundtrip_blocking().unwrap();
802    ///
803    /// // assert that we've received the response
804    /// assert!(done.load(Relaxed));
805    /// ```
806    pub fn dispatch_roundtrip_blocking(&self) -> io::Result<()> {
807        block_on(self.dispatch_roundtrip_async())
808    }
809
810    /// Completes when the compositor has processed all previous requests and all of its
811    /// response events have been dispatched.
812    ///
813    /// This function is the same as [`Queue::dispatch_roundtrip_blocking`] except that it is async and does
814    /// not block the current thread.
815    ///
816    /// If the future completes with `Ok(())`, then the future completes after (in the
817    /// sense of the C++ memory model) the event handlers of all previous events have been
818    /// invoked.
819    ///
820    /// # Panic
821    ///
822    /// - Panics if this is a [local queue](Connection::create_local_queue) and the thread
823    ///   polling the future is not the thread that this queue was created in.
824    /// - Panics if the queue was created with [`Connection::create_queue_with_data`] or
825    ///   [`Connection::create_local_queue_with_data`].
826    ///
827    /// # Example
828    ///
829    /// ```
830    /// # use std::sync::Arc;
831    /// # use std::sync::atomic::AtomicBool;
832    /// # use std::sync::atomic::Ordering::Relaxed;
833    /// # use wl_client::{proxy, Libwayland};
834    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
835    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
836    /// #
837    /// # tokio_test::block_on(async {
838    /// let lib = Libwayland::open().unwrap();
839    /// let con = lib.connect_to_default_display().unwrap();
840    /// let queue = con.create_queue(c"queue name");
841    /// let display: WlDisplay = queue.display();
842    ///
843    /// // send some messages to the compositor
844    /// let done = Arc::new(AtomicBool::new(false));
845    /// let done2 = done.clone();
846    /// let sync = display.sync();
847    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
848    ///     done2.store(true, Relaxed);
849    /// }));
850    ///
851    /// // perform a roundtrip
852    /// queue.dispatch_roundtrip_async().await.unwrap();
853    ///
854    /// // assert that we've received the response
855    /// assert!(done.load(Relaxed));
856    /// # });
857    /// ```
858    pub async fn dispatch_roundtrip_async(&self) -> io::Result<()> {
859        self.dispatch_roundtrip_async_internal(|| self.dispatch_pending())
860            .await
861    }
862
863    async fn dispatch_roundtrip_async_internal(
864        &self,
865        mut dispatch_pending: impl FnMut() -> io::Result<u64>,
866    ) -> io::Result<()> {
867        #[derive(Default)]
868        struct State {
869            ready: bool,
870            waker: Option<Waker>,
871        }
872
873        struct RoundtripEventHandler(Arc<Mutex<State>>);
874        impl WlCallbackEventHandler for RoundtripEventHandler {
875            fn done(&self, _slf: &WlCallbackRef, _callback_data: u32) {
876                let waker = {
877                    let state = &mut *self.0.lock();
878                    state.ready = true;
879                    state.waker.take()
880                };
881                if let Some(waker) = waker {
882                    waker.wake();
883                }
884            }
885        }
886
887        let state = Arc::new(Mutex::new(State::default()));
888
889        let _sync = self.run_locked(|| {
890            let sync = self.display::<WlDisplay>().sync();
891            proxy::set_event_handler(&sync, RoundtripEventHandler(state.clone()));
892            sync
893        });
894
895        // NOTE: A simple 1) check flag 2) wait for events loop would be incorrect here
896        //       since another thread could dispatch the queue between these two steps,
897        //       leaving us blocked even though the flag has already been set. Therefore
898        //       we have to make sure that this task gets woken up whenever the flag is
899        //       set.
900
901        self.connection.flush()?;
902        let queues = [&**self];
903        loop {
904            let fut = self.connection.wait_for_events_without_flush(&queues);
905            let mut fut = pin!(fut);
906            let ready = poll_fn(|ctx| {
907                let mut s = state.lock();
908                if s.ready {
909                    return Poll::Ready(Ok(true));
910                }
911                if let Poll::Ready(res) = fut.as_mut().poll(ctx) {
912                    return Poll::Ready(res.map(|_| false));
913                }
914                s.waker = Some(ctx.waker().clone());
915                Poll::Pending
916            })
917            .await?;
918            if ready {
919                return Ok(());
920            }
921            dispatch_pending()?;
922        }
923    }
924
925    /// Creates a wrapper for an existing proxy.
926    ///
927    /// The wrapper will be assigned to this queue. No event handler can be assigned to
928    /// the wrapper.
929    ///
930    /// # Panic
931    ///
932    /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
933    /// - Panics if the proxy is already destroyed.
934    ///
935    /// # Example
936    ///
937    /// ```
938    /// # use wl_client::{proxy, Libwayland};
939    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
940    /// #
941    /// let lib = Libwayland::open().unwrap();
942    /// let con = lib.connect_to_default_display().unwrap();
943    ///
944    /// let queue1 = con.create_queue(c"queue name");
945    /// let display1: WlDisplay = queue1.display();
946    /// assert_eq!(proxy::queue(&display1), &*queue1);
947    ///
948    /// let queue2 = con.create_queue(c"second queue");
949    /// let display2 = queue2.wrap_proxy(&*display1);
950    /// assert_eq!(proxy::queue(&display2), &*queue2);
951    /// ```
952    pub fn wrap_proxy<P>(&self, proxy: &P) -> P::Owned
953    where
954        P: BorrowedProxy,
955    {
956        let lock = proxy::get_ref(proxy).lock();
957        let proxy = check_dispatching_proxy(lock.wl_proxy());
958        // SAFETY: - We've verified that the proxy is not null. UntypedBorrowedProxy
959        //           requires that the pointer is valid in this case.
960        //         - The interface of the proxy is compatible with P::Owned::WL_INTERFACE.
961        unsafe { self.wrap_wl_proxy(proxy) }
962    }
963
964    /// Creates a wrapper for an existing `wl_proxy`.
965    ///
966    /// The wrapper will be assigned to this queue. No event handler can be assigned to
967    /// the wrapper.
968    ///
969    /// If the `wl_proxy` already has a safe wrapper, the [`Queue::wrap_proxy`] function
970    /// can be used instead.
971    ///
972    /// # Panic
973    ///
974    /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
975    ///
976    /// # Safety
977    ///
978    /// - `proxy` must be a valid pointer.
979    /// - `proxy` must have an interface compatible with `P::WL_INTERFACE`.
980    ///
981    /// # Example
982    ///
983    /// Some frameworks, e.g. winit, expose libwayland `wl_display` and `wl_surface`
984    /// pointers. These can be imported into this crate as follows:
985    ///
986    /// ```
987    /// # use std::ffi::c_void;
988    /// # use std::ptr::NonNull;
989    /// # use wl_client::{Libwayland, Queue};
990    /// # use wl_client::test_protocols::core::wl_surface::WlSurface;
991    /// #
992    /// unsafe fn wrap_foreign_surface(display: NonNull<c_void>, wl_surface: NonNull<c_void>) {
993    ///     let lib = Libwayland::open().unwrap();
994    ///     // SAFETY: ...
995    ///     let con = unsafe { lib.wrap_borrowed_pointer(display.cast()).unwrap() };
996    ///     let queue = con.create_queue(c"queue name");
997    ///     // SAFETY: ...
998    ///     let surface: WlSurface = unsafe { queue.wrap_wl_proxy(wl_surface.cast()) };
999    /// }
1000    /// ```
1001    pub unsafe fn wrap_wl_proxy<P>(&self, proxy: NonNull<wl_proxy>) -> P
1002    where
1003        P: OwnedProxy,
1004    {
1005        let d = &*self.queue_data;
1006        // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
1007        let display = unsafe { d.libwayland.wl_proxy_get_display(proxy.as_ptr()) };
1008        assert_eq!(display, d.borrowed.connection.wl_display().as_ptr());
1009        // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
1010        let wrapper: *mut wl_proxy = unsafe {
1011            d.libwayland
1012                .wl_proxy_create_wrapper(proxy.as_ptr().cast())
1013                .cast()
1014        };
1015        let wrapper = check_new_proxy(wrapper);
1016        // SAFETY: - we just created wrapper so it is valid
1017        //         - queue is valid by the invariants
1018        //         - the UntypedOwnedProxy created below will hold a reference to this queue
1019        //         - the wrapper belongs to the same display as the proxy, and we've
1020        //           verified above that the proxy has the same display as this queue
1021        unsafe {
1022            d.libwayland
1023                .wl_proxy_set_queue(wrapper.as_ptr(), d.queue.as_ptr());
1024        }
1025        // SAFETY: - we just created wrapper so it is valid
1026        //         - wrapper is a wrapper so it doesn't have an event handler
1027        //         - we have ownership of wrapper and hand it over
1028        //         - we just assigned self as the queue of wrapper
1029        //         - the interface is none since this is a wrapper
1030        let wrapper = unsafe { UntypedOwnedProxy::from_wrapper_wl_proxy(self, wrapper) };
1031        // SAFETY: - the requirement is forwarded to the caller
1032        unsafe { proxy::low_level::from_untyped_owned(wrapper) }
1033    }
1034
1035    /// Schedules a destruction to be run once the queue has become idle.
1036    ///
1037    /// Idle here means that
1038    ///
1039    /// 1. there are no dispatches running, and
1040    /// 2. all future dispatches happen after this function call.
1041    ///
1042    /// # Panic
1043    ///
1044    /// Panics if this is a local queue and the current thread is not the thread that this
1045    /// queue was created in.
1046    ///
1047    /// # Safety
1048    ///
1049    /// - It must be safe to run the destruction once the queue is idle.
1050    pub(crate) unsafe fn run_destruction_on_idle(&self, destruction: ProxyDataDestruction) {
1051        self.run_locked_(|dd| {
1052            if dd.is_dispatching.get() {
1053                dd.to_destroy_on_idle.borrow_mut().push(destruction);
1054            } else {
1055                // SAFETY: - For a thread to dispatch, it must
1056                //           1. hold the queue lock
1057                //           2. set is_dispatching
1058                //         - The queue lock is not dropped before the dispatch finishes
1059                //           and is_dispatching is not reset before then.
1060                //         - Since we're holding the queue lock and is_dispatching is false,
1061                //           we know that no dispatches are currently running.
1062                //         - If a future dispatch starts in this thread, it will happen
1063                //           after this line of code.
1064                //         - If a future dispatch starts on another thread, it will have
1065                //           to acquire the queue lock and will therefore happen after we
1066                //           release the lock below.
1067                //         - By the pre-conditions of this function, it is safe to run
1068                //           the destruction at this point.
1069                unsafe {
1070                    destruction.run();
1071                }
1072            }
1073        });
1074    }
1075
1076    /// Creates a [`QueueWatcher`] for event-loop integration.
1077    ///
1078    /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
1079    /// containing exactly this queue.
1080    pub fn create_watcher(&self) -> io::Result<QueueWatcher> {
1081        self.connection.create_watcher(&[self], [])
1082    }
1083}
1084
1085impl BorrowedQueue {
1086    /// Creates a [`QueueWatcher`] for event-loop integration.
1087    ///
1088    /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
1089    /// containing exactly this queue.
1090    ///
1091    /// The [`BorrowedQueue`] is dropped when the last clone of the [`QueueWatcher`] is
1092    /// dropped.
1093    pub fn create_watcher(self) -> io::Result<QueueWatcher> {
1094        let con = self.connection.clone();
1095        con.create_watcher(&[], [self])
1096    }
1097
1098    /// Completes when there are new events in this queue.
1099    ///
1100    /// When this function returns `Ok(())`, this queue has an event queued.
1101    ///
1102    /// This is a shorthand for calling [`Connection::wait_for_events`] with a
1103    /// queue list consisting of exactly this queue.
1104    ///
1105    /// # Example
1106    ///
1107    /// ```
1108    /// # use std::future::pending;
1109    /// # use std::sync::Arc;
1110    /// # use std::sync::atomic::AtomicBool;
1111    /// # use std::sync::atomic::Ordering::{Acquire, Release};
1112    /// # use wl_client::{proxy, Libwayland};
1113    /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
1114    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
1115    /// #
1116    /// # tokio_test::block_on(async {
1117    /// let lib = Libwayland::open().unwrap();
1118    /// let con = lib.connect_to_default_display().unwrap();
1119    /// let queue = con.create_queue(c"queue name");
1120    /// let display: WlDisplay = queue.display();
1121    ///
1122    /// let done = Arc::new(AtomicBool::new(false));
1123    /// let done2 = done.clone();
1124    /// let sync = display.sync();
1125    /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
1126    ///     done2.store(true, Release);
1127    /// }));
1128    ///
1129    /// while !done.load(Acquire) {
1130    ///     queue.wait_for_events().await.unwrap();
1131    ///     queue.dispatch_pending().unwrap();
1132    /// }
1133    /// # });
1134    /// ```
1135    pub async fn wait_for_events(&self) -> io::Result<()> {
1136        self.connection.wait_for_events(&[self]).await
1137    }
1138
1139    /// Returns the `wl_event_queue` representing this queue.
1140    ///
1141    /// This function returns `None` if and only if this queue is the default queue of the
1142    /// connection.
1143    ///
1144    /// The returned pointer, if any, remains valid as long as this object exists.
1145    ///
1146    /// # Example
1147    ///
1148    /// ```
1149    /// # use wl_client::Libwayland;
1150    /// #
1151    /// let lib = Libwayland::open().unwrap();
1152    /// let con = lib.connect_to_default_display().unwrap();
1153    /// let queue = con.create_queue(c"queue name");
1154    /// let default_queue = con.borrow_default_queue();
1155    ///
1156    /// assert_eq!((**queue).wl_event_queue(), Some(queue.wl_event_queue()));
1157    /// assert_eq!(default_queue.wl_event_queue(), None);
1158    /// ```
1159    pub fn wl_event_queue(&self) -> Option<NonNull<wl_event_queue>> {
1160        self.queue.map(|q| q.0)
1161    }
1162
1163    pub(crate) fn connection(&self) -> &Connection {
1164        &self.connection
1165    }
1166}
1167
1168impl Connection {
1169    /// Creates a new queue.
1170    ///
1171    /// The new queue is not a local queue. It can be dispatched from any thread. Event
1172    /// handlers attached to this queue must implement [`Send`]. See the documentation of
1173    /// [`Queue`] for a description of local and non-local queues.
1174    ///
1175    /// # Example
1176    ///
1177    /// ```
1178    /// # use wl_client::Libwayland;
1179    /// #
1180    /// let lib = Libwayland::open().unwrap();
1181    /// let con = lib.connect_to_default_display().unwrap();
1182    /// let _queue = con.create_queue(c"queue name");
1183    /// ```
1184    pub fn create_queue(&self, name: &CStr) -> QueueOwner {
1185        self.create_queue2(name, false, None, None)
1186    }
1187
1188    /// Creates a new local queue.
1189    ///
1190    /// The new queue is a local queue. It can only be dispatched from the thread that
1191    /// called this function. See the documentation of [`Queue`] for a description of
1192    /// local and non-local queues.
1193    ///
1194    /// # Example
1195    ///
1196    /// ```
1197    /// # use wl_client::Libwayland;
1198    /// #
1199    /// let lib = Libwayland::open().unwrap();
1200    /// let con = lib.connect_to_default_display().unwrap();
1201    /// let _queue = con.create_local_queue(c"queue name");
1202    /// ```
1203    pub fn create_local_queue(&self, name: &CStr) -> QueueOwner {
1204        self.create_queue2(name, true, None, None)
1205    }
1206
1207    /// Creates a new queue.
1208    fn create_queue2(
1209        &self,
1210        name: &CStr,
1211        local: bool,
1212        mut_data_type: Option<TypeId>,
1213        mut_data_type_name: Option<&'static str>,
1214    ) -> QueueOwner {
1215        // SAFETY: The display is valid and queue_name is a CString.
1216        let queue = unsafe {
1217            self.libwayland()
1218                .wl_display_create_queue_with_name(self.wl_display().as_ptr(), name.as_ptr())
1219        };
1220        let queue = NonNull::new(queue).unwrap();
1221        QueueOwner {
1222            queue: Queue {
1223                queue_data: Arc::new(QueueData {
1224                    libwayland: self.libwayland(),
1225                    borrowed: BorrowedQueue {
1226                        connection: self.clone(),
1227                        queue: Some(SyncNonNull(queue)),
1228                    },
1229                    name: name.to_owned(),
1230                    queue: SyncNonNull(queue),
1231                    mutex: match local {
1232                        true => ReentrantMutex::new_thread_local(Default::default()),
1233                        false => ReentrantMutex::new_shared(Default::default()),
1234                    },
1235                    mut_data_type,
1236                    mut_data_type_name,
1237                    mut_data: SyncCell::new(SyncPtr(ptr::from_mut(&mut ()).cast())),
1238                    owned_proxy_registry: Default::default(),
1239                }),
1240            },
1241        }
1242    }
1243
1244    /// Creates a [`BorrowedQueue`] representing the default queue.
1245    pub fn borrow_default_queue(&self) -> BorrowedQueue {
1246        BorrowedQueue {
1247            connection: self.clone(),
1248            queue: None,
1249        }
1250    }
1251
1252    /// Creates a [`BorrowedQueue`] representing a `wl_event_queue` pointer.
1253    ///
1254    /// # Safety
1255    ///
1256    /// - The queue must be valid and stay valid for the lifetime of the [`BorrowedQueue`].
1257    /// - The queue must belong to this connection.
1258    pub unsafe fn borrow_foreign_queue(&self, queue: NonNull<wl_event_queue>) -> BorrowedQueue {
1259        BorrowedQueue {
1260            connection: self.clone(),
1261            queue: Some(SyncNonNull(queue)),
1262        }
1263    }
1264}
1265
1266impl Drop for QueueOwner {
1267    fn drop(&mut self) {
1268        // To catch errors early, acquire the lock unconditionally even if there are no
1269        // proxies to be destroyed.
1270        self.run_locked(|| ());
1271        self.queue.queue_data.owned_proxy_registry.destroy_all();
1272    }
1273}
1274
1275impl Drop for QueueData {
1276    fn drop(&mut self) {
1277        // SAFETY: - queue is always a valid pointer until this call
1278        //         - all proxies attached to the queue hold a reference to the queue,
1279        //           therefore this function does not run until all proxies are dropped,
1280        //           which causes the proxies to be destroyed
1281        unsafe {
1282            self.libwayland.wl_event_queue_destroy(self.queue.as_ptr());
1283        }
1284    }
1285}
1286
1287impl PartialEq for Queue {
1288    fn eq(&self, other: &Queue) -> bool {
1289        self.queue_data.queue == other.queue_data.queue
1290    }
1291}
1292
1293impl Eq for Queue {}
1294
1295impl Debug for QueueOwner {
1296    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1297        self.queue.fmt(f)
1298    }
1299}
1300
1301impl Debug for Queue {
1302    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1303        f.debug_struct("Queue")
1304            .field("wl_event_queue", &self.wl_event_queue())
1305            .field("name", &self.queue_data.name)
1306            .finish_non_exhaustive()
1307    }
1308}
1309
1310impl Debug for DispatchLock<'_> {
1311    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1312        f.debug_struct("DispatchLock").finish_non_exhaustive()
1313    }
1314}
1315
1316impl Deref for Queue {
1317    type Target = BorrowedQueue;
1318
1319    fn deref(&self) -> &Self::Target {
1320        &self.queue_data.borrowed
1321    }
1322}