wl_client/queue.rs
1use {
2 crate::{
3 Libwayland, QueueWatcher,
4 connection::Connection,
5 ffi::{interface_compatible, wl_event_queue, wl_proxy},
6 protocols::wayland::{
7 wl_callback::{WlCallbackEventHandler, WlCallbackRef},
8 wl_display::WlDisplay,
9 },
10 proxy::{
11 self, BorrowedProxy, OwnedProxy,
12 low_level::{
13 OwnedProxyRegistry, ProxyDataDestruction, UntypedOwnedProxy,
14 check_dispatching_proxy, check_new_proxy, owned::DISPATCH_PANIC,
15 },
16 },
17 utils::{
18 block_on::block_on,
19 on_drop::on_drop,
20 reentrant_mutex::{ReentrantMutex, ReentrantMutexGuard},
21 sync_ptr::SyncNonNull,
22 },
23 },
24 parking_lot::Mutex,
25 std::{
26 cell::{Cell, RefCell},
27 ffi::{CStr, CString},
28 fmt::{Debug, Formatter},
29 future::poll_fn,
30 io, mem,
31 ops::Deref,
32 panic::resume_unwind,
33 pin::pin,
34 ptr::NonNull,
35 sync::Arc,
36 task::{Poll, Waker},
37 thread::panicking,
38 },
39};
40
// Unit tests for this module; the submodule is only compiled in test builds.
#[cfg(test)]
mod tests;
43
/// The owner of an event queue.
///
/// This is a thin wrapper around [`Queue`] and implements `Deref<Target = Queue>`.
///
/// The purpose of this type is to manage the lifetimes of proxies attached to queues.
/// This is described in more detail in the documentation of [`Queue`].
pub struct QueueOwner {
    /// The owned queue. This is the value that the `Deref` implementation hands out.
    queue: Queue,
}
53
54#[expect(clippy::doc_overindented_list_items)]
55/// An event queue.
56///
57/// An event queue stores events that have been sent by the compositor but whose callbacks
58/// have not yet been invoked. Invoking these callbacks is called _dispatching_ the queue.
59///
60/// A connection can have many queues that can be dispatched in parallel, but the events
61/// in a single queue are always dispatched in series. That is, the callback for the next
62/// event will not start executing until after the callback for the previous event.
63///
64/// New queues are created by calling [`Connection::create_local_queue`] or
65/// [`Connection::create_queue`].
66///
67/// # Proxies attached to a queue
68///
69/// Each proxy is attached to a queue. This queue is determined as follows:
70///
71/// - When [`Queue::display`] is called, the returned proxy is attached to that queue.
72/// - When [`Queue::wrap_proxy`] is called, the returned proxy is attached to that queue.
73/// - When a constructor request is called on an owned proxy `P`, e.g. `WlDisplay::sync`,
74/// the returned proxy is attached to the same queue as `P`.
75/// - When a constructor request is called on a borrowed proxy, e.g. `WlDisplayRef::sync`,
76/// the request takes a queue argument and the returned proxy is attached to that queue.
77///
78/// See the documentation of the [`proxy`] module for more information about proxies.
79///
80/// # Queue ownership
81///
82/// Each queue is owned by a [`QueueOwner`] which is returned when you call
83/// [`Connection::create_local_queue`] or [`Connection::create_queue`].
84///
85/// When a [`QueueOwner`] is dropped, it will automatically destroy _some_ of the proxies
86/// attached to the queue. This ensures that, when the [`QueueOwner`] is dropped, all
87/// reference cycles between proxies and their event handlers are broken and no memory is
88/// leaked. (Normally these cycles are broken when a destructor request is sent or the
89/// proxy is destroyed with [`proxy::destroy`], but for long-lived proxies, such as
90/// `wl_registry`, it is more convenient for this to happen automatically.)
91///
92/// A queue and the attached proxies should no longer be used after its [`QueueOwner`] has
93/// been dropped. Doing so might lead to panics or memory leaks.
94///
95/// To ensure that the [`QueueOwner`] is eventually dropped, the queue owner should never
96/// be reachable from an event handler.
97///
98/// ```
99/// # use wl_client::Libwayland;
100/// # use wl_client::Queue;
101/// # use wl_client::Connection;
102/// # use wl_client::QueueOwner;
103/// #
104/// fn main() {
105/// let lib = Libwayland::open().unwrap();
106/// let con = lib.connect_to_default_display().unwrap();
107/// let queue: QueueOwner = con.create_queue(c"queue name");
108///
109/// // the queue owner is stored on the stack and will be dropped when this function
110/// // returns
111/// application_logic(&queue);
112/// }
113/// #
114/// # fn application_logic(_queue: &Queue) {
115/// # }
116/// ```
117///
118/// # Local and non-local queues
119///
120/// Each queue is either local or non-local. Local queues are created with
121/// [`Connection::create_local_queue`]. Non-local queues are created with
122/// [`Connection::create_queue`].
123///
124/// Local queues have the following advantage:
125///
126/// - Event handlers attached to local queues do not have to implement [`Send`].
127///
/// This allows such event handlers to contain `Rc<T>` where the same event handler on
/// a non-local queue would have to use `Arc<T>`.
130///
131/// To ensure that these event handlers are not accessed or dropped on different threads,
132/// this advantage comes with the following restrictions:
133///
134/// - If a local queue was created on a thread `X`:
135/// - Event handlers must be attached on `X`.
136/// - The queue must only be dispatched on `X`.
137/// - The [`QueueOwner`] owning the queue must be dropped on `X`.
138/// - Proxies attached to the queue must only be destroyed on `X`.
139/// - Note: Proxies are destroyed when [`proxy::destroy`] is called *or* when a
140/// destructor request is sent *or* implicitly when the last reference to the proxy is
141/// dropped.
142///
143/// These restrictions are checked at runtime and will lead to panics if violated.
144///
145/// ```
146/// # use std::cell::Cell;
147/// # use std::rc::Rc;
148/// # use wl_client::{proxy, Libwayland};
149/// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
150/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
151/// #
152/// let lib = Libwayland::open().unwrap();
153/// let con = lib.connect_to_default_display().unwrap();
154///
155/// // Create a local queue.
156/// let queue = con.create_local_queue(c"queue name");
157/// let display: WlDisplay = queue.display();
158///
159/// // Create an `Rc` and use it as an event handler.
160/// let done = Rc::new(Cell::new(false));
161/// let done2 = done.clone();
162/// let sync = display.sync();
163/// proxy::set_event_handler_local(&sync, WlCallback::on_done(move |_, _| done2.set(true)));
164///
165/// // Calling this function from another thread would panic.
166/// queue.dispatch_roundtrip_blocking().unwrap();
167///
168/// assert!(done.get());
169/// ```
170///
171/// # Locking
172///
173/// Each queue contains a re-entrant mutex. Re-entrant means that a thread that already
174/// holds the lock can acquire it again.
175///
176/// This lock can be acquired explicitly by calling [`Queue::lock_dispatch`]. You might
177/// want to use this function if you're dispatching the queue from multiple threads to
178/// avoid lock inversion. For example, consider the following situation:
179///
180/// - Thread 1: Acquires the lock of some shared state.
181/// - Thread 2: Starts dispatching and acquires the queue's re-entrant lock.
182/// - Thread 2: Calls an event handler which tries to lock the shared state and blocks.
183/// - Thread 1: Destroys a proxy which will implicitly acquire the queue's re-entrant lock.
184/// This deadlocks.
185///
186/// Instead, thread 1 could call [`Queue::lock_dispatch`] before locking the shared state
187/// to prevent the lock inversion.
188///
189/// This deadlock concern does not apply if the queue is only ever used from a single
190/// thread or if the queue is a local queue. If you need to poll the wayland socket on a
191/// separate thread, you can use [`BorrowedQueue::wait_for_events`] and send a message to
192/// the main thread when the future completes.
193///
194/// The lock is acquired implicitly in the following situations:
195///
196/// - The lock is held for the entirety of the following function calls:
197/// - [`Queue::dispatch_pending`]
198/// - The lock is sometimes held during the following function calls but not while they
199/// are waiting for new events:
200/// - [`Queue::dispatch_blocking`]
201/// - [`Queue::dispatch_roundtrip_blocking`]
202/// - The lock is sometimes held while the futures produced by the following functions
203/// are being polled but not while they are waiting for new events:
204/// - [`Queue::dispatch_async`]
205/// - [`Queue::dispatch_roundtrip_async`]
206/// - The lock is held at the start and the end of the following function calls but not
207/// while invoking the callback:
208/// - [`Queue::dispatch_scope_blocking`]
209/// - The lock is sometimes held while the futures produced by the following functions
210/// are being polled but not while polling the future passed as an argument:
211/// - [`Queue::dispatch_scope_async`]
212/// - The lock is held while attaching an event handler to a proxy.
213/// - The lock is held when a proxy with an event handler is being destroyed:
214/// - When sending a destructor request.
215/// - When calling [`proxy::destroy`].
216/// - Implicitly when dropping the last reference to a proxy.
217/// - The lock is held while dropping the [`QueueOwner`] of the queue.
#[derive(Clone)]
pub struct Queue {
    /// The shared state of the queue. Since this is an `Arc`, cloning a `Queue` only
    /// creates another handle to the same state.
    queue_data: Arc<QueueData>,
}
222
/// A borrowed event queue.
///
/// This type is a thin wrapper around a `wl_event_queue` pointer. If the pointer is a
/// null pointer, this object refers to the default libwayland queue.
///
/// [`Queue`] implements `Deref<Target = BorrowedQueue>`.
///
/// This type can be used to safely interact with foreign queues. It guarantees that the
/// contained pointer is either null or valid. This type can be passed into
/// [`Connection::wait_for_events`] to wait for events to arrive on multiple queues
/// in a race-free way.
///
/// You can construct a [`BorrowedQueue`] by calling
///
/// - [`Connection::borrow_default_queue`] or
/// - [`Connection::borrow_foreign_queue`].
///
/// # Example
///
/// ```
/// # use wl_client::Libwayland;
/// # use wl_client::test_protocols::core::wl_display::WlDisplay;
/// #
/// # tokio_test::block_on(async {
/// let lib = Libwayland::open().unwrap();
/// let con = lib.connect_to_default_display().unwrap();
/// let queue1 = con.create_queue(c"queue name");
/// let queue2 = con.create_queue(c"another queue");
/// let default_queue = con.borrow_default_queue();
/// # // ensure that some event arrives for this test
/// # let sync = queue2.display::<WlDisplay>().sync();
///
/// con.wait_for_events(&[&queue1, &queue2, &default_queue]).await.unwrap();
/// # });
/// ```
pub struct BorrowedQueue {
    /// A reference to the connection that this queue belongs to. This also ensures
    /// that the connection outlives the queue.
    connection: Connection,
    /// The native queue pointer. `None` presumably stands for the null pointer, i.e.
    /// the default libwayland queue mentioned above — TODO confirm against the
    /// constructors.
    queue: Option<SyncNonNull<wl_event_queue>>,
}
264
/// A lock that prevents concurrent dispatching of a queue.
///
/// This lock can be acquired by calling [`Queue::lock_dispatch`].
///
/// See the description of [`Queue`] for why you might use this.
///
/// # Example
///
/// ```
/// # use std::thread;
/// # use wl_client::Libwayland;
/// #
/// let lib = Libwayland::open().unwrap();
/// let con = lib.connect_to_default_display().unwrap();
/// let queue = con.create_queue(c"queue name");
///
/// let lock = queue.lock_dispatch();
///
/// let thread = {
///     let queue = queue.clone();
///     thread::spawn(move || {
///         // this dispatch will not start until the lock is dropped.
///         queue.dispatch_roundtrip_blocking().unwrap();
///     })
/// };
///
/// // this dispatch starts immediately since the lock is re-entrant
/// queue.dispatch_roundtrip_blocking().unwrap();
///
/// drop(lock);
/// thread.join().unwrap();
/// ```
pub struct DispatchLock<'a> {
    /// The guard of the queue's re-entrant mutex. The field is never read; it only
    /// exists so that the guard lives exactly as long as this object.
    _lock: ReentrantMutexGuard<'a, DispatchData>,
}
300
/// The state shared by all clones of a [`Queue`].
struct QueueData {
    /// A reference to the Libwayland singleton.
    libwayland: &'static Libwayland,
    /// The borrowed version of this queue for `Deref`.
    borrowed: BorrowedQueue,
    /// The name of the queue.
    name: CString,
    /// The native queue. This is always a valid pointer.
    queue: SyncNonNull<wl_event_queue>,
    /// This mutex protects
    /// - the unsynchronized fields of any proxies attached to the queue
    /// - the fields `is_dispatching` and `to_destroy_on_idle` of the contained
    ///   [`DispatchData`]
    mutex: ReentrantMutex<DispatchData>,
    /// The registry for proxies that need manual destruction when the owner of the
    /// queue is dropped (see `Queue::owned_proxy_registry`).
    owned_proxy_registry: OwnedProxyRegistry,
}
318
/// The data protected by the re-entrant mutex of a queue.
///
/// The fields use `Cell`/`RefCell`; exclusive access is provided by the queue mutex
/// that wraps this struct.
#[derive(Default)]
struct DispatchData {
    /// Contains whether the queue is currently dispatching. Note that, since
    /// dispatching is protected by the mutex, this field is only ever accessed by a
    /// single thread at a time.
    is_dispatching: Cell<bool>,
    /// If we are dispatching, this vector might contain work to run after the dispatch.
    /// Protected against access from multiple threads by the mutex. These destructions
    /// have the following invariant:
    ///
    /// - It must be safe to run the destruction once the queue is idle.
    to_destroy_on_idle: RefCell<Vec<ProxyDataDestruction>>,
}
332
333impl Deref for QueueOwner {
334 type Target = Queue;
335
336 fn deref(&self) -> &Self::Target {
337 &self.queue
338 }
339}
340
341impl Queue {
342 /// Returns a reference to the [`Libwayland`] singleton.
343 pub fn libwayland(&self) -> &'static Libwayland {
344 self.queue_data.libwayland
345 }
346
347 /// Returns the connection that this queue belongs to.
348 ///
349 /// # Example
350 ///
351 /// ```
352 /// # use wl_client::Libwayland;
353 /// #
354 /// let lib = Libwayland::open().unwrap();
355 /// let con = lib.connect_to_default_display().unwrap();
356 /// let queue = con.create_queue(c"queue name");
357 /// assert_eq!(queue.connection(), &con);
358 /// ```
359 pub fn connection(&self) -> &Connection {
360 &self.queue_data.borrowed.connection
361 }
362
363 /// Acquires the queue's re-entrant lock.
364 ///
365 /// See the description of [`Queue`] for why you might use this.
366 ///
367 /// # Example
368 ///
369 /// ```
370 /// # use std::thread;
371 /// # use wl_client::Libwayland;
372 /// #
373 /// let lib = Libwayland::open().unwrap();
374 /// let con = lib.connect_to_default_display().unwrap();
375 /// let queue = con.create_queue(c"queue name");
376 ///
377 /// let lock = queue.lock_dispatch();
378 ///
379 /// let thread = {
380 /// let queue = queue.clone();
381 /// thread::spawn(move || {
382 /// // this dispatch will not start until the lock is dropped.
383 /// queue.dispatch_roundtrip_blocking().unwrap();
384 /// })
385 /// };
386 ///
387 /// // this dispatch starts immediately since the lock is re-entrant
388 /// queue.dispatch_roundtrip_blocking().unwrap();
389 ///
390 /// drop(lock);
391 /// thread.join().unwrap();
392 /// ```
393 pub fn lock_dispatch(&self) -> DispatchLock<'_> {
394 DispatchLock {
395 _lock: self.queue_data.mutex.lock(),
396 }
397 }
398
399 /// Creates a wrapper proxy around the singleton `wl_display` object.
400 ///
401 /// The proxy is a wrapper and no event handler can be attached to it.
402 ///
403 /// The proxy is attached to this queue.
404 ///
405 /// # Panic
406 ///
407 /// Panics if the interface of `T` is not compatible with the `wl_display` interface.
408 ///
409 /// # Example
410 ///
411 /// ```
412 /// # use wl_client::{proxy, Libwayland};
413 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
414 /// #
415 /// let lib = Libwayland::open().unwrap();
416 /// let con = lib.connect_to_default_display().unwrap();
417 /// let queue = con.create_queue(c"queue name");
418 /// let display: WlDisplay = queue.display();
419 /// assert_eq!(proxy::queue(&display), &*queue);
420 /// assert_eq!(proxy::id(&*display), 1);
421 /// ```
422 pub fn display<T>(&self) -> T
423 where
424 T: OwnedProxy,
425 {
426 // SAFETY: OwnedProxy::WL_DISPLAY is always a valid interface.
427 let compatible = unsafe { interface_compatible(WlDisplay::WL_INTERFACE, T::WL_INTERFACE) };
428 if !compatible {
429 panic!("T::WL_INTERFACE is not compatible with wl_display");
430 }
431 // SAFETY: - wl_display always returns a valid object
432 // - we've just verified that T has a compatible interface
433 unsafe { self.wrap_wl_proxy(self.connection().wl_display().cast()) }
434 }
435
436 /// Returns whether this is a local queue.
437 ///
438 /// The documentation of the [`Queue`] type explains the difference between local and
439 /// non-local queues.
440 ///
441 /// # Example
442 ///
443 /// ```
444 /// # use wl_client::Libwayland;
445 /// #
446 /// let lib = Libwayland::open().unwrap();
447 /// let con = lib.connect_to_default_display().unwrap();
448 /// let queue1 = con.create_queue(c"non-local queue");
449 /// let queue2 = con.create_local_queue(c"local queue");
450 ///
451 /// assert!(queue1.is_non_local());
452 /// assert!(queue2.is_local());
453 /// ```
454 pub fn is_local(&self) -> bool {
455 self.queue_data.mutex.is_thread_local()
456 }
457
458 /// Returns whether this is *not* a local queue.
459 ///
460 /// This is the same as `!self.is_local()`.
461 ///
462 /// The documentation of the [`Queue`] type explains the difference between local and
463 /// non-local queues.
464 pub fn is_non_local(&self) -> bool {
465 !self.is_local()
466 }
467
468 /// Returns the `wl_event_queue` pointer of this queue.
469 ///
470 /// The returned pointer is valid for as long as this queue exists.
471 ///
472 /// You must not dispatch the queue except through the [`Queue`] interface.
473 /// Otherwise the behavior is undefined.
474 pub fn wl_event_queue(&self) -> NonNull<wl_event_queue> {
475 self.queue_data.queue.0
476 }
477
478 /// Returns the proxy registry of this queue.
479 ///
480 /// Proxies attached to this registry will be destroyed when the [`QueueOwner`] is
481 /// dropped.
482 pub(crate) fn owned_proxy_registry(&self) -> &OwnedProxyRegistry {
483 &self.queue_data.owned_proxy_registry
484 }
485
486 /// Runs the closure while holding the reentrant queue mutex.
487 pub(crate) fn run_locked<T>(&self, f: impl FnOnce() -> T) -> T {
488 self.run_locked_(|_| f())
489 }
490
491 /// Runs the closure while holding the reentrant queue mutex.
492 fn run_locked_<T>(&self, f: impl FnOnce(&DispatchData) -> T) -> T {
493 let lock = self.queue_data.mutex.lock();
494 f(&lock)
495 }
496
497 /// Runs the closure while holding the reentrant queue mutex. The is_dispatching
498 /// field will be set to true for the duration of the callback.
499 fn with_dispatch<T>(&self, f: impl FnOnce() -> T) -> T {
500 self.run_locked_(|dd| {
501 let is_dispatching = dd.is_dispatching.get();
502 dd.is_dispatching.set(true);
503 let ret = f();
504 dd.is_dispatching.set(is_dispatching);
505 if !is_dispatching {
506 let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
507 if to_destroy.len() > 0 {
508 let mut todo = mem::take(&mut *to_destroy);
509 drop(to_destroy);
510 for dd in todo.drain(..) {
511 // SAFETY: - For a thread to dispatch, it must
512 // 1. hold the queue lock
513 // 2. set is_dispatching
514 // - The queue lock is not dropped before the dispatch finishes
515 // and is_dispatching is not reset before then.
516 // - Since we're holding the queue lock and is_dispatching is false,
517 // we know that no dispatches are currently running.
518 // - If a future dispatch starts in this thread, it will happen
519 // after this line of code.
520 // - If a future dispatch starts on another thread, it will have
521 // to acquire the queue lock and will therefore happen after we
522 // release the lock below.
523 // - Therefore this queue is idle at this point.
524 // - to_destroy_on_idle contains only destructions that are safe
525 // to run while idle.
526 unsafe {
527 dd.run();
528 }
529 }
530 let mut to_destroy = dd.to_destroy_on_idle.borrow_mut();
531 mem::swap(&mut todo, &mut *to_destroy);
532 }
533 }
534 ret
535 })
536 }
537
538 /// Blocks the current thread until at least one event has been dispatched.
539 ///
540 /// If you are in an async context, then you might want to use
541 /// [`Queue::dispatch_async`] instead.
542 ///
543 /// This function should not be used when integrating with an existing, poll-based
544 /// event loop, as it might block indefinitely. Use [`Connection::create_watcher`] and
545 /// [`Queue::dispatch_pending`] instead.
546 ///
547 /// The returned number is the number of events that have been dispatched by this
548 /// call. The number can be zero if another thread dispatched the events before us.
549 ///
550 /// # Panic
551 ///
552 /// Panics if this is a [local queue](Connection::create_local_queue) and the current
553 /// thread is not the thread that this queue was created in.
554 ///
555 /// # Example
556 ///
557 /// ```
558 /// # use wl_client::Libwayland;
559 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
560 /// #
561 /// let lib = Libwayland::open().unwrap();
562 /// let con = lib.connect_to_default_display().unwrap();
563 /// let queue = con.create_queue(c"queue name");
564 ///
565 /// // For this example, ensure that the compositor sends an event in the near future.
566 /// let _sync = queue.display::<WlDisplay>().sync();
567 ///
568 /// queue.dispatch_blocking().unwrap();
569 /// ```
570 pub fn dispatch_blocking(&self) -> io::Result<u64> {
571 block_on(self.dispatch_async())
572 }
573
574 /// Completes when at least one event has been dispatched.
575 ///
576 /// This function is the same as [`Queue::dispatch_blocking`] except that it is async and does
577 /// not block the current thread.
578 ///
579 /// # Panic
580 ///
581 /// Panics if this is a [local queue](Connection::create_local_queue) and the thread
582 /// polling the future is not the thread that this queue was created in.
583 ///
584 /// # Example
585 ///
586 /// ```
587 /// # use wl_client::Libwayland;
588 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
589 /// #
590 /// # tokio_test::block_on(async {
591 /// let lib = Libwayland::open().unwrap();
592 /// let con = lib.connect_to_default_display().unwrap();
593 /// let queue = con.create_queue(c"queue name");
594 ///
595 /// // For this example, ensure that the compositor sends an event in the near future.
596 /// let _sync = queue.display::<WlDisplay>().sync();
597 ///
598 /// queue.dispatch_async().await.unwrap();
599 /// # });
600 /// ```
601 pub async fn dispatch_async(&self) -> io::Result<u64> {
602 self.connection.wait_for_events(&[self]).await?;
603 self.dispatch_pending()
604 }
605
606 /// Dispatches enqueued events.
607 ///
608 /// This function does not read new events from the file descriptor.
609 ///
610 /// This function can be used together with [`BorrowedQueue::wait_for_events`] or
611 /// [`Queue::create_watcher`] to dispatch the queue in an async context or event loop
612 /// respectively.
613 ///
614 /// The returned number is the number of events that were dispatched.
615 ///
616 /// # Panic
617 ///
618 /// Panics if this is a [local queue](Connection::create_local_queue) and the current
619 /// thread is not the thread that this queue was created in.
620 ///
621 /// # Example
622 ///
623 /// ```
624 /// # use std::sync::Arc;
625 /// # use std::sync::atomic::AtomicBool;
626 /// # use std::sync::atomic::Ordering::Relaxed;
627 /// # use wl_client::{proxy, Libwayland};
628 /// # use wl_client::test_protocol_helpers::callback;
629 /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
630 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
631 /// #
632 /// # tokio_test::block_on(async {
633 /// let lib = Libwayland::open().unwrap();
634 /// let con = lib.connect_to_default_display().unwrap();
635 /// let queue = con.create_queue(c"queue name");
636 /// let display: WlDisplay = queue.display();
637 ///
638 /// let done = Arc::new(AtomicBool::new(false));
639 /// let done2 = done.clone();
640 /// let sync = display.sync();
641 /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
642 /// done2.store(true, Relaxed);
643 /// }));
644 ///
645 /// while !done.load(Relaxed) {
646 /// queue.wait_for_events().await.unwrap();
647 /// // Dispatch the events.
648 /// queue.dispatch_pending().unwrap();
649 /// }
650 /// # });
651 /// ```
652 pub fn dispatch_pending(&self) -> io::Result<u64> {
653 let d = &*self.queue_data;
654 let _resume_unwind = on_drop(|| {
655 if let Some(err) = DISPATCH_PANIC.take() {
656 if !panicking() {
657 resume_unwind(err);
658 }
659 }
660 });
661 let res = self.with_dispatch(|| {
662 // SAFETY: - by the invariants, the display and queue are valid
663 // - the queue was created from the display
664 // - we're inside with_dispatch which means that we're holding the
665 // reentrant queue mutex. by the invariants, this mutex protects
666 // the unsynchronized fields of the proxies.
667 unsafe {
668 d.libwayland.wl_display_dispatch_queue_pending(
669 d.borrowed.connection.wl_display().as_ptr(),
670 d.queue.as_ptr(),
671 )
672 }
673 });
674 if res == -1 {
675 return Err(io::Error::last_os_error());
676 }
677 assert!(res >= 0);
678 Ok(res as u64)
679 }
680
681 /// Blocks the current thread until the compositor has processed all previous requests
682 /// and all of its response events have been dispatched.
683 ///
684 /// If you are in an async context, then you might want to use
685 /// [`Queue::dispatch_roundtrip_async`] instead.
686 ///
687 /// Since this function usually returns quickly, you might use this function even
688 /// when integrating a wayland connection into an existing event loop and even in an
689 /// async context. For example, a library that creates buffers might use this function
690 /// during initialization to receive the full list of supported formats before
691 /// returning.
692 ///
693 /// If this function returns `Ok(())`, then the function returns after (in the sense
694 /// of the C++ memory model) the event handlers of all previous events have been
695 /// invoked.
696 ///
697 /// # Panic
698 ///
699 /// Panics if this is a [local queue](Connection::create_local_queue) and the current
700 /// thread is not the thread that this queue was created in.
701 ///
702 /// # Example
703 ///
704 /// ```
705 /// # use std::sync::Arc;
706 /// # use std::sync::atomic::AtomicBool;
707 /// # use std::sync::atomic::Ordering::Relaxed;
708 /// # use wl_client::{proxy, Libwayland};
709 /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
710 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
711 /// #
712 /// let lib = Libwayland::open().unwrap();
713 /// let con = lib.connect_to_default_display().unwrap();
714 /// let queue = con.create_queue(c"");
715 /// let display: WlDisplay = queue.display();
716 ///
717 /// // send some messages to the compositor
718 /// let done = Arc::new(AtomicBool::new(false));
719 /// let done2 = done.clone();
720 /// let sync = display.sync();
721 /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
722 /// done2.store(true, Relaxed);
723 /// }));
724 ///
725 /// // perform a roundtrip
726 /// queue.dispatch_roundtrip_blocking().unwrap();
727 ///
728 /// // assert that we've received the response
729 /// assert!(done.load(Relaxed));
730 /// ```
731 pub fn dispatch_roundtrip_blocking(&self) -> io::Result<()> {
732 block_on(self.dispatch_roundtrip_async())
733 }
734
735 /// Completes when the compositor has processed all previous requests and all of its
736 /// response events have been dispatched.
737 ///
738 /// This function is the same as [`Queue::dispatch_roundtrip_blocking`] except that it is async and does
739 /// not block the current thread.
740 ///
741 /// If the future completes with `Ok(())`, then the future completes after (in the
742 /// sense of the C++ memory model) the event handlers of all previous events have been
743 /// invoked.
744 ///
745 /// # Panic
746 ///
747 /// Panics if this is a [local queue](Connection::create_local_queue) and the thread
748 /// polling the future is not the thread that this queue was created in.
749 ///
750 /// # Example
751 ///
752 /// ```
753 /// # use std::sync::Arc;
754 /// # use std::sync::atomic::AtomicBool;
755 /// # use std::sync::atomic::Ordering::Relaxed;
756 /// # use wl_client::{proxy, Libwayland};
757 /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
758 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
759 /// #
760 /// # tokio_test::block_on(async {
761 /// let lib = Libwayland::open().unwrap();
762 /// let con = lib.connect_to_default_display().unwrap();
763 /// let queue = con.create_queue(c"queue name");
764 /// let display: WlDisplay = queue.display();
765 ///
766 /// // send some messages to the compositor
767 /// let done = Arc::new(AtomicBool::new(false));
768 /// let done2 = done.clone();
769 /// let sync = display.sync();
770 /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
771 /// done2.store(true, Relaxed);
772 /// }));
773 ///
774 /// // perform a roundtrip
775 /// queue.dispatch_roundtrip_async().await.unwrap();
776 ///
777 /// // assert that we've received the response
778 /// assert!(done.load(Relaxed));
779 /// # });
780 /// ```
781 pub async fn dispatch_roundtrip_async(&self) -> io::Result<()> {
782 #[derive(Default)]
783 struct State {
784 ready: bool,
785 waker: Option<Waker>,
786 }
787
788 struct RoundtripEventHandler(Arc<Mutex<State>>);
789 impl WlCallbackEventHandler for RoundtripEventHandler {
790 fn done(&self, _slf: &WlCallbackRef, _callback_data: u32) {
791 let waker = {
792 let state = &mut *self.0.lock();
793 state.ready = true;
794 state.waker.take()
795 };
796 if let Some(waker) = waker {
797 waker.wake();
798 }
799 }
800 }
801
802 let state = Arc::new(Mutex::new(State::default()));
803
804 let _sync = self.run_locked(|| {
805 let sync = self.display::<WlDisplay>().sync();
806 proxy::set_event_handler(&sync, RoundtripEventHandler(state.clone()));
807 sync
808 });
809
810 // NOTE: A simple 1) check flag 2) wait for events loop would be incorrect here
811 // since another thread could dispatch the queue between these two steps,
812 // leaving us blocked even though the flag has already been set. Therefore
813 // we have to make sure that this task gets woken up whenever the flag is
814 // set.
815
816 self.connection.flush()?;
817 let queues = [&**self];
818 loop {
819 let fut = self.connection.wait_for_events_without_flush(&queues);
820 let mut fut = pin!(fut);
821 let ready = poll_fn(|ctx| {
822 let mut s = state.lock();
823 if s.ready {
824 return Poll::Ready(Ok(true));
825 }
826 if let Poll::Ready(res) = fut.as_mut().poll(ctx) {
827 return Poll::Ready(res.map(|_| false));
828 }
829 s.waker = Some(ctx.waker().clone());
830 Poll::Pending
831 })
832 .await?;
833 if ready {
834 return Ok(());
835 }
836 self.dispatch_pending()?;
837 }
838 }
839
840 /// Creates a wrapper for an existing proxy.
841 ///
842 /// The wrapper will be assigned to this queue. No event handler can be assigned to
843 /// the wrapper.
844 ///
845 /// # Panic
846 ///
847 /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
848 /// - Panics if the proxy is already destroyed.
849 ///
850 /// # Example
851 ///
852 /// ```
853 /// # use wl_client::{proxy, Libwayland};
854 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
855 /// #
856 /// let lib = Libwayland::open().unwrap();
857 /// let con = lib.connect_to_default_display().unwrap();
858 ///
859 /// let queue1 = con.create_queue(c"queue name");
860 /// let display1: WlDisplay = queue1.display();
861 /// assert_eq!(proxy::queue(&display1), &*queue1);
862 ///
863 /// let queue2 = con.create_queue(c"second queue");
864 /// let display2 = queue2.wrap_proxy(&*display1);
865 /// assert_eq!(proxy::queue(&display2), &*queue2);
866 /// ```
867 pub fn wrap_proxy<P>(&self, proxy: &P) -> P::Owned
868 where
869 P: BorrowedProxy,
870 {
871 let lock = proxy::get_ref(proxy).lock();
872 let proxy = check_dispatching_proxy(lock.wl_proxy());
873 // SAFETY: - We've verified that the proxy is not null. UntypedBorrowedProxy
874 // requires that the pointer is valid in this case.
875 // - The interface of the proxy is compatible with P::Owned::WL_INTERFACE.
876 unsafe { self.wrap_wl_proxy(proxy) }
877 }
878
879 /// Creates a wrapper for an existing `wl_proxy`.
880 ///
881 /// The wrapper will be assigned to this queue. No event handler can be assigned to
882 /// the wrapper.
883 ///
884 /// If the `wl_proxy` already has a safe wrapper, the [`Queue::wrap_proxy`] function
885 /// can be used instead.
886 ///
887 /// # Panic
888 ///
889 /// - Panics if the proxy and this queue don't belong to the same `wl_display`.
890 ///
891 /// # Safety
892 ///
893 /// - `proxy` must be a valid pointer.
894 /// - `proxy` must have an interface compatible with `P::WL_INTERFACE`.
895 ///
896 /// # Example
897 ///
898 /// Some frameworks, e.g. winit, expose libwayland `wl_display` and `wl_surface`
899 /// pointers. These can be imported into this crate as follows:
900 ///
901 /// ```
902 /// # use std::ffi::c_void;
903 /// # use std::ptr::NonNull;
904 /// # use wl_client::{Libwayland, Queue};
905 /// # use wl_client::test_protocols::core::wl_surface::WlSurface;
906 /// #
907 /// unsafe fn wrap_foreign_surface(display: NonNull<c_void>, wl_surface: NonNull<c_void>) {
908 /// let lib = Libwayland::open().unwrap();
909 /// // SAFETY: ...
910 /// let con = unsafe { lib.wrap_borrowed_pointer(display.cast()).unwrap() };
911 /// let queue = con.create_queue(c"queue name");
912 /// // SAFETY: ...
913 /// let surface: WlSurface = unsafe { queue.wrap_wl_proxy(wl_surface.cast()) };
914 /// }
915 /// ```
916 pub unsafe fn wrap_wl_proxy<P>(&self, proxy: NonNull<wl_proxy>) -> P
917 where
918 P: OwnedProxy,
919 {
920 let d = &*self.queue_data;
921 // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
922 let display = unsafe { d.libwayland.wl_proxy_get_display(proxy.as_ptr()) };
923 assert_eq!(display, d.borrowed.connection.wl_display().as_ptr());
924 // SAFETY: - It's a requirement of this function that the proxy is a valid pointer.
925 let wrapper: *mut wl_proxy = unsafe {
926 d.libwayland
927 .wl_proxy_create_wrapper(proxy.as_ptr().cast())
928 .cast()
929 };
930 let wrapper = check_new_proxy(wrapper);
931 // SAFETY: - we just created wrapper so it is valid
932 // - queue is valid by the invariants
933 // - the UntypedOwnedProxy created below will hold a reference to this queue
934 // - the wrapper belongs to the same display as the proxy, and we've
935 // verified above that the proxy has the same display as this queue
936 unsafe {
937 d.libwayland
938 .wl_proxy_set_queue(wrapper.as_ptr(), d.queue.as_ptr());
939 }
940 // SAFETY: - we just created wrapper so it is valid
941 // - wrapper is a wrapper so it doesn't have an event handler
942 // - we have ownership of wrapper and hand it over
943 // - we just assigned self as the queue of wrapper
944 // - the interface is none since this is a wrapper
945 let wrapper = unsafe { UntypedOwnedProxy::from_wrapper_wl_proxy(self, wrapper) };
946 // SAFETY: - the requirement is forwarded to the caller
947 unsafe { proxy::low_level::from_untyped_owned(wrapper) }
948 }
949
950 /// Schedules a destruction to be run once the queue has become idle.
951 ///
952 /// Idle here means that
953 ///
954 /// 1. there are no dispatches running, and
955 /// 2. all future dispatches happen after this function call.
956 ///
957 /// # Panic
958 ///
959 /// Panics if this is a local queue and the current thread is not the thread that this
960 /// queue was created in.
961 ///
962 /// # Safety
963 ///
964 /// - It must be safe to run the destruction once the queue is idle.
965 pub(crate) unsafe fn run_destruction_on_idle(&self, destruction: ProxyDataDestruction) {
966 self.run_locked_(|dd| {
967 if dd.is_dispatching.get() {
968 dd.to_destroy_on_idle.borrow_mut().push(destruction);
969 } else {
970 // SAFETY: - For a thread to dispatch, it must
971 // 1. hold the queue lock
972 // 2. set is_dispatching
973 // - The queue lock is not dropped before the dispatch finishes
974 // and is_dispatching is not reset before then.
975 // - Since we're holding the queue lock and is_dispatching is false,
976 // we know that no dispatches are currently running.
977 // - If a future dispatch starts in this thread, it will happen
978 // after this line of code.
979 // - If a future dispatch starts on another thread, it will have
980 // to acquire the queue lock and will therefore happen after we
981 // release the lock below.
982 // - By the pre-conditions of this function, it is safe to run
983 // the destruction at this point.
984 unsafe {
985 destruction.run();
986 }
987 }
988 });
989 }
990
991 /// Creates a [`QueueWatcher`] for event-loop integration.
992 ///
993 /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
994 /// containing exactly this queue.
995 pub fn create_watcher(&self) -> io::Result<QueueWatcher> {
996 self.connection.create_watcher(&[self], [])
997 }
998}
999
1000impl BorrowedQueue {
1001 /// Creates a [`QueueWatcher`] for event-loop integration.
1002 ///
1003 /// This is a shorthand for calling [`Connection::create_watcher`] with a queue list
1004 /// containing exactly this queue.
1005 ///
1006 /// The [`BorrowedQueue`] is dropped when the last clone of the [`QueueWatcher`] is
1007 /// dropped.
1008 pub fn create_watcher(self) -> io::Result<QueueWatcher> {
1009 let con = self.connection.clone();
1010 con.create_watcher(&[], [self])
1011 }
1012
1013 /// Completes when there are new events in this queue.
1014 ///
1015 /// When this function returns `Ok(())`, this queue has an event queued.
1016 ///
1017 /// This is a shorthand for calling [`Connection::wait_for_events`] with a
1018 /// queue list consisting of exactly this queue.
1019 ///
1020 /// # Example
1021 ///
1022 /// ```
1023 /// # use std::future::pending;
1024 /// # use std::sync::Arc;
1025 /// # use std::sync::atomic::AtomicBool;
1026 /// # use std::sync::atomic::Ordering::{Acquire, Release};
1027 /// # use wl_client::{proxy, Libwayland};
1028 /// # use wl_client::test_protocols::core::wl_callback::{WlCallback, WlCallbackEventHandler, WlCallbackRef};
1029 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
1030 /// #
1031 /// # tokio_test::block_on(async {
1032 /// let lib = Libwayland::open().unwrap();
1033 /// let con = lib.connect_to_default_display().unwrap();
1034 /// let queue = con.create_queue(c"queue name");
1035 /// let display: WlDisplay = queue.display();
1036 ///
1037 /// let done = Arc::new(AtomicBool::new(false));
1038 /// let done2 = done.clone();
1039 /// let sync = display.sync();
1040 /// proxy::set_event_handler(&sync, WlCallback::on_done(move |_, _| {
1041 /// done2.store(true, Release);
1042 /// }));
1043 ///
1044 /// while !done.load(Acquire) {
1045 /// queue.wait_for_events().await.unwrap();
1046 /// queue.dispatch_pending().unwrap();
1047 /// }
1048 /// # });
1049 /// ```
1050 pub async fn wait_for_events(&self) -> io::Result<()> {
1051 self.connection.wait_for_events(&[self]).await
1052 }
1053
1054 /// Returns the `wl_event_queue` representing this queue.
1055 ///
1056 /// This function returns `None` if and only if this queue is the default queue of the
1057 /// connection.
1058 ///
1059 /// The returned pointer, if any, remains valid as long as this object exists.
1060 ///
1061 /// # Example
1062 ///
1063 /// ```
1064 /// # use wl_client::Libwayland;
1065 /// #
1066 /// let lib = Libwayland::open().unwrap();
1067 /// let con = lib.connect_to_default_display().unwrap();
1068 /// let queue = con.create_queue(c"queue name");
1069 /// let default_queue = con.borrow_default_queue();
1070 ///
1071 /// assert_eq!((**queue).wl_event_queue(), Some(queue.wl_event_queue()));
1072 /// assert_eq!(default_queue.wl_event_queue(), None);
1073 /// ```
1074 pub fn wl_event_queue(&self) -> Option<NonNull<wl_event_queue>> {
1075 self.queue.map(|q| q.0)
1076 }
1077
1078 pub(crate) fn connection(&self) -> &Connection {
1079 &self.connection
1080 }
1081}
1082
1083impl Connection {
1084 /// Creates a new queue.
1085 ///
1086 /// The new queue is not a local queue. It can be dispatched from any thread. Event
1087 /// handlers attached to this queue must implement [`Send`]. See the documentation of
1088 /// [`Queue`] for a description of local and non-local queues.
1089 ///
1090 /// # Example
1091 ///
1092 /// ```
1093 /// # use wl_client::Libwayland;
1094 /// #
1095 /// let lib = Libwayland::open().unwrap();
1096 /// let con = lib.connect_to_default_display().unwrap();
1097 /// let _queue = con.create_queue(c"queue name");
1098 /// ```
1099 pub fn create_queue(&self, name: &CStr) -> QueueOwner {
1100 self.create_queue2(name, false)
1101 }
1102
1103 /// Creates a new local queue.
1104 ///
1105 /// The new queue is a local queue. It can only be dispatched from the thread that
1106 /// called this function. See the documentation of [`Queue`] for a description of
1107 /// local and non-local queues.
1108 ///
1109 /// # Example
1110 ///
1111 /// ```
1112 /// # use wl_client::Libwayland;
1113 /// #
1114 /// let lib = Libwayland::open().unwrap();
1115 /// let con = lib.connect_to_default_display().unwrap();
1116 /// let _queue = con.create_local_queue(c"queue name");
1117 /// ```
1118 pub fn create_local_queue(&self, name: &CStr) -> QueueOwner {
1119 self.create_queue2(name, true)
1120 }
1121
1122 /// Creates a new queue.
1123 fn create_queue2(&self, name: &CStr, local: bool) -> QueueOwner {
1124 // SAFETY: The display is valid and queue_name is a CString.
1125 let queue = unsafe {
1126 self.libwayland()
1127 .wl_display_create_queue_with_name(self.wl_display().as_ptr(), name.as_ptr())
1128 };
1129 let queue = NonNull::new(queue).unwrap();
1130 QueueOwner {
1131 queue: Queue {
1132 queue_data: Arc::new(QueueData {
1133 libwayland: self.libwayland(),
1134 borrowed: BorrowedQueue {
1135 connection: self.clone(),
1136 queue: Some(SyncNonNull(queue)),
1137 },
1138 name: name.to_owned(),
1139 queue: SyncNonNull(queue),
1140 mutex: match local {
1141 true => ReentrantMutex::new_thread_local(Default::default()),
1142 false => ReentrantMutex::new_shared(Default::default()),
1143 },
1144 owned_proxy_registry: Default::default(),
1145 }),
1146 },
1147 }
1148 }
1149
1150 /// Creates a [`BorrowedQueue`] representing the default queue.
1151 pub fn borrow_default_queue(&self) -> BorrowedQueue {
1152 BorrowedQueue {
1153 connection: self.clone(),
1154 queue: None,
1155 }
1156 }
1157
1158 /// Creates a [`BorrowedQueue`] representing a `wl_event_queue` pointer.
1159 ///
1160 /// # Safety
1161 ///
1162 /// - The queue must be valid and stay valid for the lifetime of the [`BorrowedQueue`].
1163 /// - The queue must belong to this connection.
1164 pub unsafe fn borrow_foreign_queue(&self, queue: NonNull<wl_event_queue>) -> BorrowedQueue {
1165 BorrowedQueue {
1166 connection: self.clone(),
1167 queue: Some(SyncNonNull(queue)),
1168 }
1169 }
1170}
1171
1172impl Drop for QueueOwner {
1173 fn drop(&mut self) {
1174 // To catch errors early, acquire the lock unconditionally even if there are no
1175 // proxies to be destroyed.
1176 self.run_locked(|| ());
1177 self.queue.queue_data.owned_proxy_registry.destroy_all();
1178 }
1179}
1180
1181impl Drop for QueueData {
1182 fn drop(&mut self) {
1183 // SAFETY: - queue is always a valid pointer until this call
1184 // - all proxies attached to the queue hold a reference to the queue,
1185 // therefore this function does not run until all proxies are dropped,
1186 // which causes the proxies to be destroyed
1187 unsafe {
1188 self.libwayland.wl_event_queue_destroy(self.queue.as_ptr());
1189 }
1190 }
1191}
1192
1193impl PartialEq for Queue {
1194 fn eq(&self, other: &Queue) -> bool {
1195 self.queue_data.queue == other.queue_data.queue
1196 }
1197}
1198
1199impl Eq for Queue {}
1200
1201impl Debug for QueueOwner {
1202 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1203 self.queue.fmt(f)
1204 }
1205}
1206
1207impl Debug for Queue {
1208 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1209 f.debug_struct("Queue")
1210 .field("wl_event_queue", &self.wl_event_queue())
1211 .field("name", &self.queue_data.name)
1212 .finish_non_exhaustive()
1213 }
1214}
1215
1216impl Debug for DispatchLock<'_> {
1217 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1218 f.debug_struct("DispatchLock").finish_non_exhaustive()
1219 }
1220}
1221
1222impl Deref for Queue {
1223 type Target = BorrowedQueue;
1224
1225 fn deref(&self) -> &Self::Target {
1226 &self.queue_data.borrowed
1227 }
1228}