#[expect(unused_imports)]
use crate::queue::QueueData;
use {
crate::{Connection, Queue, QueueOwner, utils::block_on::block_on},
std::{
any::{TypeId, type_name},
ffi::CStr,
fmt::{Debug, Formatter},
io,
marker::PhantomData,
ops::Deref,
ptr,
},
};
// Unit tests for this module live in the sibling `tests` module, which is
// only compiled when building with `cfg(test)`.
#[cfg(test)]
mod tests;
/// A [`Queue`] handle that is statically tagged with the type `T` of the
/// mutable data passed to its dispatch functions.
///
/// The wrapper stores nothing besides the queue itself; `T` only exists at the
/// type level. Instances are produced by [`Queue::with_data`] and by the
/// `create_*_queue_with_data` constructors on [`Connection`].
pub struct QueueWithData<T: 'static> {
    /// The wrapped queue.
    queue: Queue,
    /// Records `T` in the type without storing (or owning) a value of `T`.
    _phantom: PhantomData<fn(&mut T)>,
}
impl Connection {
    /// Creates a new queue whose mutable-data type is fixed to `T`.
    ///
    /// The queue is created through `create_queue2` with the [`TypeId`] and the
    /// type name of `T` recorded on it, and with `false` as the second
    /// argument (presumably the "local" flag — compare
    /// [`Connection::create_local_queue_with_data`]).
    ///
    /// Returns the owner of the queue together with a [`QueueWithData<T>`]
    /// handle obtained from that owner.
    pub fn create_queue_with_data<T>(&self, name: &CStr) -> (QueueOwner, QueueWithData<T>)
    where
        T: 'static,
    {
        let data_type = TypeId::of::<T>();
        let data_type_name = type_name::<T>();
        let owner = self.create_queue2(name, false, Some(data_type), Some(data_type_name));
        let handle: QueueWithData<T> = owner.with_data();
        (owner, handle)
    }

    /// Creates a new queue whose mutable-data type is fixed to `T`, passing
    /// `true` as the second argument of `create_queue2`.
    ///
    /// Apart from that flag (presumably marking the queue as local — confirm
    /// against `create_queue2`), this behaves exactly like
    /// [`Connection::create_queue_with_data`].
    ///
    /// Returns the owner of the queue together with a [`QueueWithData<T>`]
    /// handle obtained from that owner.
    pub fn create_local_queue_with_data<T>(&self, name: &CStr) -> (QueueOwner, QueueWithData<T>)
    where
        T: 'static,
    {
        let data_type = TypeId::of::<T>();
        let data_type_name = type_name::<T>();
        let owner = self.create_queue2(name, true, Some(data_type), Some(data_type_name));
        let handle: QueueWithData<T> = owner.with_data();
        (owner, handle)
    }
}
impl Queue {
pub fn with_data<T>(&self) -> QueueWithData<T>
where
T: 'static,
{
let d = &*self.queue_data;
if d.mut_data_type.is_some() && d.mut_data_type != Some(TypeId::of::<T>()) {
let rn = type_name::<T>();
panic!(
"This queue only supports mutable data of type `{}` but the \
requested type is `{rn}`",
d.mut_data_type_name.unwrap(),
);
}
QueueWithData {
queue: self.clone(),
_phantom: Default::default(),
}
}
pub(crate) fn mut_data_type(&self) -> (Option<TypeId>, Option<&'static str>) {
let d = &*self.queue_data;
(d.mut_data_type, d.mut_data_type_name)
}
pub(crate) unsafe fn data(&self) -> *mut u8 {
unsafe { self.queue_data.mut_data.get().0 }
}
}
impl<T> QueueWithData<T>
where
    T: 'static,
{
    /// Blocking variant of [`QueueWithData::dispatch_async`]: drives that
    /// future to completion with `block_on` and returns its result.
    pub fn dispatch_blocking(&self, data: &mut T) -> io::Result<u64> {
        let dispatch = self.dispatch_async(data);
        block_on(dispatch)
    }

    /// Waits for events on this queue and then dispatches the pending ones
    /// with `data`.
    ///
    /// Any error from waiting is returned before dispatching is attempted.
    /// On success the value is the one produced by
    /// [`QueueWithData::dispatch_pending`].
    pub async fn dispatch_async(&self, data: &mut T) -> io::Result<u64> {
        self.connection.wait_for_events(&[self]).await?;
        self.dispatch_pending(data)
    }

    /// Blocking variant of [`QueueWithData::dispatch_roundtrip_async`]: drives
    /// that future to completion with `block_on` and returns its result.
    pub fn dispatch_roundtrip_blocking(&self, data: &mut T) -> io::Result<()> {
        let roundtrip = self.dispatch_roundtrip_async(data);
        block_on(roundtrip)
    }

    /// Performs a roundtrip via `dispatch_roundtrip_async_internal`, using
    /// [`QueueWithData::dispatch_pending`] with `data` as the dispatch step.
    pub async fn dispatch_roundtrip_async(&self, data: &mut T) -> io::Result<()> {
        let dispatch = || self.dispatch_pending(data);
        self.dispatch_roundtrip_async_internal(dispatch).await
    }

    /// Dispatches the pending events of this queue, making `data` available
    /// to the dispatch machinery as a type-erased pointer.
    ///
    /// The returned `u64` is whatever `dispatch_pending_internal` reports
    /// (presumably the number of dispatched events — confirm).
    pub fn dispatch_pending(&self, data: &mut T) -> io::Result<u64> {
        let raw = ptr::from_mut(data);
        // SAFETY: `raw` is derived from the exclusive borrow `data`, which is
        // held for the whole call. NOTE(review): this presumably relies on the
        // type check in `Queue::with_data` so that consumers of the pointer
        // interpret it as `T` — confirm against `dispatch_pending_internal`.
        unsafe { self.dispatch_pending_internal(raw.cast()) }
    }
}
impl<T> Clone for QueueWithData<T>
where
T: 'static,
{
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
_phantom: Default::default(),
}
}
}
/// Exposes the wrapped [`Queue`], so all of its methods can be called directly
/// on a `QueueWithData<T>`.
impl<T> Deref for QueueWithData<T>
where
    T: 'static,
{
    type Target = Queue;

    fn deref(&self) -> &Self::Target {
        let Self { queue, .. } = self;
        queue
    }
}
impl<T> PartialEq for QueueWithData<T>
where
T: 'static,
{
fn eq(&self, other: &Self) -> bool {
self.queue == other.queue
}
}
/// Marker impl: equality is delegated entirely to the wrapped `Queue`, so no
/// bound on `T` beyond `'static` is needed.
impl<T: 'static> Eq for QueueWithData<T> {}
impl<T> Debug for QueueWithData<T>
where
T: 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.queue, f)
}
}