pub use with_data::QueueWithData;
use {
crate::{
Libwayland, QueueWatcher,
connection::Connection,
ffi::{interface_compatible, wl_event_queue, wl_proxy},
protocols::wayland::{
wl_callback::{WlCallbackEventHandler, WlCallbackRef},
wl_display::WlDisplay,
},
proxy::{
self, BorrowedProxy, OwnedProxy,
low_level::{
OwnedProxyRegistry, ProxyDataDestruction, UntypedOwnedProxy,
check_dispatching_proxy, check_new_proxy, owned::DISPATCH_PANIC,
},
},
utils::{
block_on::block_on,
reentrant_mutex::{ReentrantMutex, ReentrantMutexGuard},
sync_cell::SyncCell,
sync_ptr::{SyncNonNull, SyncPtr},
},
},
parking_lot::Mutex,
run_on_drop::on_drop,
std::{
any::TypeId,
cell::{Cell, RefCell},
ffi::{CStr, CString},
fmt::{Debug, Formatter},
future::poll_fn,
io, mem,
ops::Deref,
panic::resume_unwind,
pin::pin,
ptr::{self, NonNull},
sync::Arc,
task::{Poll, Waker},
thread::panicking,
},
};
#[cfg(test)]
mod tests;
mod with_data;
/// The owning handle of a [`Queue`].
///
/// Dereferences to the wrapped [`Queue`]. When dropped, it acquires the queue's
/// dispatch lock once and then calls `destroy_all` on the queue's registry of
/// owned proxies.
pub struct QueueOwner {
/// The queue wrapped by this owner.
queue: Queue,
}
/// A clonable handle to an event queue.
///
/// All clones share the same reference-counted `QueueData`. Two handles compare
/// equal when they refer to the same `wl_event_queue` pointer. A `Queue`
/// dereferences to the [`BorrowedQueue`] view of the same queue.
// NOTE(review): the `expect` below targets a lint about list indentation in doc
// comments; confirm that the documentation on this item still fulfills it.
#[expect(clippy::doc_overindented_list_items)]
#[derive(Clone)]
pub struct Queue {
/// State shared between all clones of this handle.
queue_data: Arc<QueueData>,
}
/// A connection together with an optional raw `wl_event_queue` pointer.
///
/// `queue` is `None` for the value returned by `Connection::borrow_default_queue`
/// and `Some` for queues created by this module or passed to
/// `Connection::borrow_foreign_queue`.
pub struct BorrowedQueue {
/// The connection the queue belongs to.
connection: Connection,
/// The raw queue pointer, or `None` when referring to the default queue.
queue: Option<SyncNonNull<wl_event_queue>>,
}
/// Guard returned by `Queue::lock_dispatch`.
///
/// Holds the guard of the queue's re-entrant dispatch mutex for as long as it
/// is alive.
pub struct DispatchLock<'a> {
/// The held mutex guard; never read, only kept alive.
_lock: ReentrantMutexGuard<'a, DispatchData>,
}
/// The state shared by all clones of a [`Queue`].
///
/// The wrapped `wl_event_queue` is destroyed when this value is dropped.
struct QueueData {
/// The libwayland handle used for all FFI calls made on behalf of this queue.
libwayland: &'static Libwayland,
/// The borrowed view (connection + queue pointer) that `Queue` dereferences to.
borrowed: BorrowedQueue,
/// An owned copy of the name the queue was created with; shown in `Debug` output.
name: CString,
/// The raw queue pointer returned by libwayland at creation time.
queue: SyncNonNull<wl_event_queue>,
/// The re-entrant mutex protecting dispatch state. It is created either in
/// thread-local or in shared mode, depending on how the queue was created.
mutex: ReentrantMutex<DispatchData>,
/// The type of the mutable data required for dispatching, if any. When this is
/// `Some`, `Queue::dispatch_pending` panics.
mut_data_type: Option<TypeId>,
/// The name of the required mutable data type; used in the panic message of
/// `Queue::dispatch_pending`.
mut_data_type_name: Option<&'static str>,
/// The data pointer installed for the duration of a dispatch and restored to
/// its previous value afterwards.
mut_data: SyncCell<SyncPtr<u8>>,
/// The registry whose `destroy_all` is invoked when the `QueueOwner` is dropped.
owned_proxy_registry: OwnedProxyRegistry,
}
/// The state protected by the queue's dispatch mutex.
#[derive(Default)]
struct DispatchData {
/// Whether a dispatch is currently running under the lock.
is_dispatching: Cell<bool>,
/// Destructions that were requested while a dispatch was running; they are run
/// once the outermost dispatch has finished.
to_destroy_on_idle: RefCell<Vec<ProxyDataDestruction>>,
}
/// Lets a `QueueOwner` be used wherever a `&Queue` is expected.
impl Deref for QueueOwner {
    type Target = Queue;

    /// Returns the wrapped queue.
    fn deref(&self) -> &Self::Target {
        let Self { queue } = self;
        queue
    }
}
impl Queue {
/// Returns the libwayland handle that this queue was created with.
pub fn libwayland(&self) -> &'static Libwayland {
    let data = &self.queue_data;
    data.libwayland
}
/// Returns the connection that this queue belongs to.
pub fn connection(&self) -> &Connection {
    let borrowed = &self.queue_data.borrowed;
    &borrowed.connection
}
/// Acquires the queue's dispatch mutex and returns a guard that holds it.
///
/// This is the same mutex that the dispatch functions of this queue acquire.
/// The lock is released when the returned [`DispatchLock`] is dropped.
pub fn lock_dispatch(&self) -> DispatchLock<'_> {
    let guard = self.queue_data.mutex.lock();
    DispatchLock { _lock: guard }
}
/// Creates a wrapper proxy of type `T` for the connection's `wl_display` that is
/// attached to this queue.
///
/// # Panics
///
/// Panics if the interface of `T` is not compatible with `wl_display`.
pub fn display<T>(&self) -> T
where
    T: OwnedProxy,
{
    let compatible = unsafe { interface_compatible(WlDisplay::WL_INTERFACE, T::WL_INTERFACE) };
    assert!(
        compatible,
        "T::WL_INTERFACE is not compatible with wl_display"
    );
    // The interface was verified above and the pointer is the display of this
    // queue's own connection.
    let display = self.connection().wl_display().cast();
    unsafe { self.wrap_wl_proxy(display) }
}
/// Returns whether the queue's dispatch mutex was created in thread-local mode,
/// i.e. whether this queue was created as a local queue.
pub fn is_local(&self) -> bool {
    let mutex = &self.queue_data.mutex;
    mutex.is_thread_local()
}
/// Returns the negation of [`Queue::is_local`].
pub fn is_non_local(&self) -> bool {
    let local = self.is_local();
    !local
}
/// Returns the raw `wl_event_queue` pointer wrapped by this queue.
///
/// The pointer is destroyed when the last reference to the shared queue state
/// is dropped.
pub fn wl_event_queue(&self) -> NonNull<wl_event_queue> {
    let queue = &self.queue_data.queue;
    queue.0
}
/// Returns the registry of owned proxies that belongs to this queue.
pub(crate) fn owned_proxy_registry(&self) -> &OwnedProxyRegistry {
    let data = &self.queue_data;
    &data.owned_proxy_registry
}
/// Runs `f` while holding the queue's dispatch mutex and returns its result.
pub(crate) fn run_locked<T>(&self, f: impl FnOnce() -> T) -> T {
    // The dispatch state is not needed by callers of this variant.
    self.run_locked_(move |_dispatch_data| f())
}
/// Runs `f` while holding the queue's dispatch mutex, giving it access to the
/// protected [`DispatchData`], and returns its result.
///
/// The mutex is released after `f` has returned.
fn run_locked_<T>(&self, f: impl FnOnce(&DispatchData) -> T) -> T {
    let guard = self.queue_data.mutex.lock();
    let data: &DispatchData = &guard;
    f(data)
}
/// Runs `f` under the dispatch lock with the `is_dispatching` flag set.
///
/// The previous value of the flag is restored after `f` returns. If this was
/// the outermost dispatch (the flag was previously unset), all destructions that
/// were queued in `to_destroy_on_idle` are run before returning.
fn with_dispatch<T>(&self, f: impl FnOnce() -> T) -> T {
    self.run_locked_(|data| {
        let was_dispatching = data.is_dispatching.replace(true);
        let ret = f();
        data.is_dispatching.set(was_dispatching);
        if was_dispatching {
            // A nested dispatch leaves the cleanup to the outermost one.
            return ret;
        }
        let mut queued = data.to_destroy_on_idle.borrow_mut();
        if queued.is_empty() {
            return ret;
        }
        let mut pending = mem::take(&mut *queued);
        // Release the borrow before running destructions so that they can access
        // the list themselves.
        drop(queued);
        for destruction in pending.drain(..) {
            // NOTE(review): the safety contract of `run` is not visible here;
            // presumably it requires the queue lock to be held while no dispatch
            // is in progress, which is the case at this point - confirm.
            unsafe {
                destruction.run();
            }
        }
        // Hand the drained vector back so that its allocation can be reused.
        let mut queued = data.to_destroy_on_idle.borrow_mut();
        mem::swap(&mut pending, &mut *queued);
        ret
    })
}
/// Blocks the current thread until [`Queue::dispatch_async`] completes and
/// returns its result.
pub fn dispatch_blocking(&self) -> io::Result<u64> {
    let dispatch = self.dispatch_async();
    block_on(dispatch)
}
/// Waits until the connection reports events for this queue and then dispatches
/// them with [`Queue::dispatch_pending`].
///
/// # Errors
///
/// Returns an error if waiting for events or dispatching fails.
///
/// # Panics
///
/// Panics under the same conditions as [`Queue::dispatch_pending`].
pub async fn dispatch_async(&self) -> io::Result<u64> {
    let connection = self.connection();
    connection.wait_for_events(&[self]).await?;
    self.dispatch_pending()
}
/// Dispatches the events that are already queued on this queue without waiting
/// for new ones.
///
/// On success, returns the non-negative value reported by
/// `wl_display_dispatch_queue_pending`.
///
/// # Errors
///
/// Returns the last OS error if libwayland reports a failure.
///
/// # Panics
///
/// Panics if this queue was created with a mutable data type, since no such
/// data is supplied by this function.
pub fn dispatch_pending(&self) -> io::Result<u64> {
    let data = &*self.queue_data;
    if data.mut_data_type.is_some() {
        let type_name = data.mut_data_type_name.unwrap();
        panic!(
            "Queue requires mutable data of type `{}` to be dispatched",
            type_name,
        );
    }
    // The queue has no mutable data, so a pointer to a unit value is installed.
    let mut unit = ();
    let unit_ptr = ptr::from_mut(&mut unit).cast();
    unsafe { self.dispatch_pending_internal(unit_ptr) }
}
/// Dispatches the pending events of this queue with `mut_data` installed as the
/// queue's mutable data pointer.
///
/// The previous data pointer is restored when the dispatch ends. If
/// `DISPATCH_PANIC` holds a panic payload once the dispatch is over, unwinding
/// is resumed with it unless the thread is already panicking.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible here; presumably `mut_data`
/// must point to data of the type this queue was created with and must stay
/// valid for the duration of the call - confirm.
///
/// # Errors
///
/// Returns the last OS error if libwayland returns `-1`.
unsafe fn dispatch_pending_internal(&self, mut_data: *mut u8) -> io::Result<u64> {
    let data = &*self.queue_data;
    // Declared first so that it runs last, after the result has been computed.
    let _rethrow = on_drop(|| {
        let Some(payload) = DISPATCH_PANIC.take() else {
            return;
        };
        if !panicking() {
            resume_unwind(payload);
        }
    });
    let res = self.with_dispatch(|| {
        let slot = &data.mut_data;
        let previous = unsafe { slot.replace(SyncPtr(mut_data)) };
        // Restores the previous pointer even if this is a nested dispatch.
        let _restore = on_drop(|| {
            unsafe { slot.set(previous) }
        });
        let display = data.borrowed.connection.wl_display().as_ptr();
        let queue = data.queue.as_ptr();
        unsafe {
            data.libwayland
                .wl_display_dispatch_queue_pending(display, queue)
        }
    });
    match res {
        -1 => Err(io::Error::last_os_error()),
        _ => {
            assert!(res >= 0);
            Ok(res as u64)
        }
    }
}
/// Blocks the current thread until [`Queue::dispatch_roundtrip_async`] completes
/// and returns its result.
pub fn dispatch_roundtrip_blocking(&self) -> io::Result<()> {
    let roundtrip = self.dispatch_roundtrip_async();
    block_on(roundtrip)
}
/// Performs a roundtrip: sends a `wl_display.sync` request and dispatches this
/// queue with [`Queue::dispatch_pending`] until the resulting callback has
/// fired.
///
/// # Errors
///
/// Returns an error if flushing, waiting for events, or dispatching fails.
pub async fn dispatch_roundtrip_async(&self) -> io::Result<()> {
    let dispatch = || self.dispatch_pending();
    self.dispatch_roundtrip_async_internal(dispatch).await
}
/// Sends a `wl_display.sync` request and repeatedly waits for events and calls
/// `dispatch_pending` until the `done` event of the resulting callback has been
/// handled.
///
/// `dispatch_pending` is invoked each time the connection reports new events
/// before the callback has fired; its error, if any, is returned to the caller.
///
/// # Errors
///
/// Returns an error if flushing the connection, waiting for events, or
/// `dispatch_pending` fails.
async fn dispatch_roundtrip_async_internal(
&self,
mut dispatch_pending: impl FnMut() -> io::Result<u64>,
) -> io::Result<()> {
// State shared between the callback handler and the polling loop below.
#[derive(Default)]
struct State {
// Set once the `done` event has been received.
ready: bool,
// The waker registered by the most recent pending poll, if any.
waker: Option<Waker>,
}
// Event handler that marks the roundtrip as complete.
struct RoundtripEventHandler(Arc<Mutex<State>>);
impl WlCallbackEventHandler for RoundtripEventHandler {
fn done(&self, _slf: &WlCallbackRef, _callback_data: u32) {
// Take the waker inside the lock but wake it only after the lock has been
// released.
let waker = {
let state = &mut *self.0.lock();
state.ready = true;
state.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
}
}
let state = Arc::new(Mutex::new(State::default()));
// Create the callback and attach the handler while holding the dispatch lock.
// The proxy is kept alive in `_sync` until this function returns.
let _sync = self.run_locked(|| {
let sync = self.display::<WlDisplay>().sync();
proxy::set_event_handler(&sync, RoundtripEventHandler(state.clone()));
sync
});
// Send the sync request now; the loop below waits without flushing.
self.connection.flush()?;
let queues = [&**self];
loop {
let fut = self.connection.wait_for_events_without_flush(&queues);
let mut fut = pin!(fut);
// Resolves to `true` once the callback has fired and to `false` when new
// events are available that have to be dispatched first.
let ready = poll_fn(|ctx| {
// The lock is held for the whole poll so that `done` cannot run between
// the readiness check and the registration of the waker.
let mut s = state.lock();
if s.ready {
return Poll::Ready(Ok(true));
}
if let Poll::Ready(res) = fut.as_mut().poll(ctx) {
return Poll::Ready(res.map(|_| false));
}
s.waker = Some(ctx.waker().clone());
Poll::Pending
})
.await?;
if ready {
return Ok(());
}
dispatch_pending()?;
}
}
/// Creates an owned wrapper of `proxy` that is attached to this queue.
///
/// The lock obtained from the proxy is held until the wrapper has been created.
///
/// # Panics
///
/// Panics if the display of the proxy is not the display of this queue's
/// connection. NOTE(review): `check_dispatching_proxy` presumably also panics if
/// the proxy has no underlying `wl_proxy` - confirm.
pub fn wrap_proxy<P>(&self, proxy: &P) -> P::Owned
where
    P: BorrowedProxy,
{
    let guard = proxy::get_ref(proxy).lock();
    let raw = check_dispatching_proxy(guard.wl_proxy());
    unsafe { self.wrap_wl_proxy(raw) }
}
/// Creates a wrapper of the raw `proxy`, assigns the wrapper to this queue, and
/// returns it as an owned proxy of type `P`.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible here; presumably `proxy` must
/// be a valid `wl_proxy` whose interface is compatible with `P` - confirm.
///
/// # Panics
///
/// Panics if the display of `proxy` is not the display of this queue's
/// connection.
pub unsafe fn wrap_wl_proxy<P>(&self, proxy: NonNull<wl_proxy>) -> P
where
    P: OwnedProxy,
{
    let data = &*self.queue_data;
    let lib = data.libwayland;
    let proxy_display = unsafe { lib.wl_proxy_get_display(proxy.as_ptr()) };
    assert_eq!(
        proxy_display,
        data.borrowed.connection.wl_display().as_ptr()
    );
    let raw_wrapper: *mut wl_proxy =
        unsafe { lib.wl_proxy_create_wrapper(proxy.as_ptr().cast()).cast() };
    let wrapper = check_new_proxy(raw_wrapper);
    // Events for objects created through the wrapper are delivered to this queue.
    unsafe {
        lib.wl_proxy_set_queue(wrapper.as_ptr(), data.queue.as_ptr());
    }
    let untyped = unsafe { UntypedOwnedProxy::from_wrapper_wl_proxy(self, wrapper) };
    unsafe { proxy::low_level::from_untyped_owned(untyped) }
}
/// Runs `destruction` as soon as no dispatch is in progress on this queue.
///
/// Under the dispatch lock: if no dispatch is running, the destruction is run
/// immediately; otherwise it is queued and run when the outermost dispatch
/// finishes.
///
/// # Safety
///
/// NOTE(review): the contract is not visible here; presumably the caller must
/// guarantee that running `destruction` is sound once the queue is idle -
/// confirm.
pub(crate) unsafe fn run_destruction_on_idle(&self, destruction: ProxyDataDestruction) {
    self.run_locked_(|data| {
        if !data.is_dispatching.get() {
            unsafe {
                destruction.run();
            }
            return;
        }
        data.to_destroy_on_idle.borrow_mut().push(destruction);
    });
}
/// Creates a [`QueueWatcher`] for this queue via its connection.
///
/// # Errors
///
/// Returns the error reported by the connection if the watcher cannot be
/// created.
pub fn create_watcher(&self) -> io::Result<QueueWatcher> {
    let connection = self.connection();
    connection.create_watcher(&[self], [])
}
}
impl BorrowedQueue {
/// Consumes this borrowed queue and creates a [`QueueWatcher`] for it via its
/// connection.
///
/// # Errors
///
/// Returns the error reported by the connection if the watcher cannot be
/// created.
pub fn create_watcher(self) -> io::Result<QueueWatcher> {
    // `self` is moved into the call below, so the connection is cloned first.
    let connection = self.connection().clone();
    connection.create_watcher(&[], [self])
}
/// Waits until the connection reports events for this queue.
///
/// # Errors
///
/// Returns the error reported by the connection while waiting.
pub async fn wait_for_events(&self) -> io::Result<()> {
    let connection = self.connection();
    connection.wait_for_events(&[self]).await
}
/// Returns the raw `wl_event_queue` pointer, or `None` if this value refers to
/// the default queue.
pub fn wl_event_queue(&self) -> Option<NonNull<wl_event_queue>> {
    let queue = self.queue?;
    Some(queue.0)
}
pub(crate) fn connection(&self) -> &Connection {
&self.connection
}
}
impl Connection {
/// Creates a new event queue named `name` whose dispatch mutex is shared
/// (not thread-local) and that requires no mutable data for dispatching.
///
/// # Panics
///
/// Panics if libwayland returns a null queue.
pub fn create_queue(&self, name: &CStr) -> QueueOwner {
    let local = false;
    self.create_queue2(name, local, None, None)
}
/// Creates a new event queue named `name` whose dispatch mutex is thread-local
/// and that requires no mutable data for dispatching.
///
/// # Panics
///
/// Panics if libwayland returns a null queue.
pub fn create_local_queue(&self, name: &CStr) -> QueueOwner {
    let local = true;
    self.create_queue2(name, local, None, None)
}
/// Creates a named `wl_event_queue` on this connection and wraps it in a
/// [`QueueOwner`].
///
/// * `local` selects a thread-local dispatch mutex instead of a shared one.
/// * `mut_data_type` and `mut_data_type_name` describe the mutable data that is
///   required for dispatching the queue, if any.
///
/// # Panics
///
/// Panics if libwayland returns a null queue.
fn create_queue2(
    &self,
    name: &CStr,
    local: bool,
    mut_data_type: Option<TypeId>,
    mut_data_type_name: Option<&'static str>,
) -> QueueOwner {
    let raw = unsafe {
        self.libwayland()
            .wl_display_create_queue_with_name(self.wl_display().as_ptr(), name.as_ptr())
    };
    let queue = NonNull::new(raw).unwrap();
    let mutex = if local {
        ReentrantMutex::new_thread_local(Default::default())
    } else {
        ReentrantMutex::new_shared(Default::default())
    };
    let borrowed = BorrowedQueue {
        connection: self.clone(),
        queue: Some(SyncNonNull(queue)),
    };
    let queue_data = QueueData {
        libwayland: self.libwayland(),
        borrowed,
        name: name.to_owned(),
        queue: SyncNonNull(queue),
        mutex,
        mut_data_type,
        mut_data_type_name,
        // Placeholder pointer to a unit value; replaced during dispatch.
        mut_data: SyncCell::new(SyncPtr(ptr::from_mut(&mut ()).cast())),
        owned_proxy_registry: Default::default(),
    };
    QueueOwner {
        queue: Queue {
            queue_data: Arc::new(queue_data),
        },
    }
}
/// Returns a [`BorrowedQueue`] that refers to the default queue of this
/// connection, represented by a queue pointer of `None`.
pub fn borrow_default_queue(&self) -> BorrowedQueue {
    let connection = self.clone();
    BorrowedQueue {
        connection,
        queue: None,
    }
}
/// Returns a [`BorrowedQueue`] that refers to the given raw `queue` on this
/// connection.
///
/// # Safety
///
/// NOTE(review): the contract is not visible here; presumably `queue` must be a
/// valid event queue of this connection that outlives the returned value -
/// confirm.
pub unsafe fn borrow_foreign_queue(&self, queue: NonNull<wl_event_queue>) -> BorrowedQueue {
    let connection = self.clone();
    let queue = Some(SyncNonNull(queue));
    BorrowedQueue { connection, queue }
}
}
/// Destroys all proxies registered with the queue when the owner goes away.
impl Drop for QueueOwner {
    fn drop(&mut self) {
        let queue = &self.queue;
        // Acquire and immediately release the dispatch lock.
        // NOTE(review): presumably this ensures that no dispatch is running
        // elsewhere and surfaces drops of a local queue on the wrong thread -
        // confirm.
        queue.run_locked(|| ());
        queue.queue_data.owned_proxy_registry.destroy_all();
    }
}
/// Destroys the underlying `wl_event_queue` once the last handle is gone.
impl Drop for QueueData {
    fn drop(&mut self) {
        let raw = self.queue.as_ptr();
        // This value owns the queue pointer, which is not used again after this.
        unsafe {
            self.libwayland.wl_event_queue_destroy(raw);
        }
    }
}
/// Two queues are equal when they wrap the same `wl_event_queue` pointer.
impl PartialEq for Queue {
    fn eq(&self, other: &Queue) -> bool {
        let lhs = &self.queue_data.queue;
        let rhs = &other.queue_data.queue;
        lhs == rhs
    }
}

impl Eq for Queue {}
/// Formats the owner exactly like the [`Queue`] it wraps.
impl Debug for QueueOwner {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&self.queue, f)
    }
}
/// Shows the raw queue pointer and the queue's name; all other state is elided.
impl Debug for Queue {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.wl_event_queue();
        let name = &self.queue_data.name;
        f.debug_struct("Queue")
            .field("wl_event_queue", &raw)
            .field("name", name)
            .finish_non_exhaustive()
    }
}
/// Prints only the type name; the held guard is not shown.
impl Debug for DispatchLock<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("DispatchLock");
        out.finish_non_exhaustive()
    }
}
/// Lets a `Queue` be used wherever a `&BorrowedQueue` is expected.
impl Deref for Queue {
    type Target = BorrowedQueue;

    /// Returns the borrowed view that is stored alongside the queue's state.
    fn deref(&self) -> &Self::Target {
        let data = &self.queue_data;
        &data.borrowed
    }
}