use std::any::Any;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use crate::{BusInner, MessageStore};
/// A typed handle to the payload of a single bus message.
///
/// A `Message` is built from a shared [`DynMessage`]: construction marks the
/// underlying message as borrowed and moves the payload out of it into this
/// handle. The payload is readable through `Deref`, can be moved out with
/// `consume`, and is handed back to the underlying message when the handle is
/// dropped without having been consumed.
pub struct Message<'bus, T>
where
T: 'static + Send,
{
/// The shared, type-erased message this handle was created from.
message: Arc<DynMessage>,
/// The payload taken out of `message`; becomes `None` once it has been
/// moved out by `consume` or returned on drop.
data: Option<Box<T>>,
/// Zero-sized marker that carries the `'bus` lifetime (a borrow of
/// `BusInner`) without storing an actual reference.
_phantom_data: PhantomData<&'bus BusInner>,
}
impl<'bus, T> Message<'bus, T>
where
    T: 'static + Send,
{
    /// Borrows `message` and moves its payload into a new typed handle.
    ///
    /// # Panics
    ///
    /// * If `message` is already borrowed by another handle.
    /// * If `message` holds no payload, or its payload is not a `T`. In that
    ///   case the borrow flag is released again before panicking so the
    ///   message is not left permanently marked as borrowed.
    pub(crate) fn new(message: Arc<DynMessage>) -> Self {
        // Claim the flag with a single atomic read-modify-write. A separate
        // `load` followed by `store` would let two threads both observe
        // `false` and both believe they own the borrow.
        let was_borrowed = message.is_borrowed.swap(true, Ordering::SeqCst);
        assert!(!was_borrowed, "data race: cannot borrow a message twice");

        let data = match message.take::<T>() {
            Some(data) => data,
            None => {
                // Nothing was taken, so give the borrow back before failing.
                message.is_borrowed.store(false, Ordering::SeqCst);
                panic!("message payload is missing or is not of the requested type");
            }
        };

        Self {
            message,
            data: Some(data),
            _phantom_data: PhantomData,
        }
    }

    /// Moves the payload out of the handle and releases the borrow.
    ///
    /// Because the payload is gone afterwards, dropping the handle does not
    /// put anything back into the underlying message.
    pub fn consume(mut self) -> T {
        let data = *self
            .data
            .take()
            .expect("a live Message always holds its payload");
        self.message.is_borrowed.store(false, Ordering::SeqCst);
        data
    }
}
impl<T> Deref for Message<'_, T>
where
    T: 'static + Send,
{
    type Target = T;

    /// Gives shared access to the payload held by this handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle no longer holds a payload.
    fn deref(&self) -> &Self::Target {
        // Look through both the `Option` and the `Box` in one step.
        self.data.as_deref().unwrap()
    }
}
impl<T> Drop for Message<'_, T>
where
    T: 'static + Send,
{
    /// Hands an unconsumed payload back to the underlying message.
    ///
    /// When the payload has already been moved out there is nothing to
    /// return and the borrow flag is left untouched.
    fn drop(&mut self) {
        let payload = match self.data.take() {
            Some(payload) => payload,
            None => return,
        };

        let shared = &self.message;
        // The flag has to be cleared first: `put` asserts that the message is
        // not borrowed.
        // NOTE(review): between these two calls the message looks unborrowed
        // but still has no payload — confirm no other thread can try to
        // borrow it in that window.
        shared.is_borrowed.store(false, Ordering::SeqCst);
        shared.put(payload);
    }
}
/// Type-erased storage for the payload of a single message.
///
/// The payload lives behind a mutex as a `Box<dyn Any + Send>` and can be
/// taken out and put back; a separate atomic flag records whether the message
/// is currently borrowed.
pub(crate) struct DynMessage {
/// The payload, or `None` while it has been taken out.
data: Mutex<Option<Box<dyn Any + Send>>>,
/// `true` while the message is marked as borrowed.
is_borrowed: AtomicBool,
/// Weak reference to the store whose `message_count` is decremented when
/// the payload is taken.
store: Weak<MessageStore>,
}
impl DynMessage {
pub fn new<T>(data: T, store: Weak<MessageStore>) -> DynMessage
where
T: 'static + Send,
{
Self {
data: Mutex::new(Some(Box::new(data))),
store,
is_borrowed: AtomicBool::new(false),
}
}
pub fn is_free(&self) -> bool {
let data = self.data.lock().unwrap();
!self.is_borrowed.load(Ordering::SeqCst) && data.is_none()
}
pub fn is<T>(&self) -> bool
where
T: 'static + Send,
{
let data = self.data.lock().unwrap();
match data.deref() {
Some(x) => x.is::<T>(),
None => false,
}
}
pub fn take<T>(&self) -> Option<Box<T>>
where
T: 'static + Send,
{
if self.is::<T>() {
let mut data = self.data.lock().unwrap();
let boxed = data.take()?;
self.store.upgrade().unwrap().message_count.fetch_sub(1, Ordering::SeqCst);
Some(boxed.downcast::<T>().ok()?)
} else {
None
}
}
pub fn put<T>(&self, data: Box<T>)
where
T: 'static + Send,
{
assert!(
!self.is_borrowed.load(Ordering::SeqCst),
"data race: attempt to put() a value in a borrowed message"
);
let mut locked = self.data.lock().unwrap();
locked.replace(data);
}
}