use crate::{
condvar_api::RawCondvar,
locking::{Condvar, RawMutex},
Error, Result,
};
use lock_api::RawMutex as RawMutexTrait;
use std::{sync::Arc, time::Duration};
/// State of a data cell: the stored payload (if any) and the closed flag.
///
/// Kept behind the mutex in `DataCellInner::value`, so both fields are always read and
/// modified together under the same lock.
struct CellValue<P> {
/// The payload currently held by the cell; `None` when nothing is stored or the value has
/// already been taken.
current: Option<P>,
/// Becomes `true` once the cell has been closed.
closed: bool,
}
impl<P> Default for CellValue<P> {
    /// Returns the initial state of a cell: no payload stored and not closed.
    ///
    /// Written by hand rather than derived so that no `P: Default` bound is required.
    fn default() -> Self {
        let current = None;
        let closed = false;
        Self { current, closed }
    }
}
/// State shared by all handles of one data cell.
struct DataCellInner<P, M, CV> {
/// The cell contents and closed flag, guarded by a mutex built on the raw mutex `M`.
value: lock_api::Mutex<M, CellValue<P>>,
/// Condition variable notified whenever a value is stored or the cell is closed.
data_available: CV,
}
/// A cell holding at most one value of type `P`, shared between cloned handles.
///
/// Storing a value overwrites whatever the cell held before; retrieving a value removes it
/// from the cell. The blocking retrieval methods wait until a value is stored or the cell is
/// closed.
///
/// `M` is the raw mutex type and `CV` the condition variable type; both default to the
/// crate's own `locking::RawMutex` and `locking::Condvar`.
pub struct DataCell<P, M = RawMutex, CV = Condvar> {
/// Reference-counted shared state; every clone of the handle points to the same instance.
inner: Arc<DataCellInner<P, M, CV>>,
}
impl<P, M, CV> Default for DataCell<P, M, CV>
where
    M: RawMutexTrait,
    CV: RawCondvar,
{
    /// Creates an empty, open cell with its own freshly allocated shared state.
    fn default() -> Self {
        let shared = DataCellInner {
            value: Default::default(),
            data_available: CV::new(),
        };
        Self {
            inner: Arc::new(shared),
        }
    }
}
impl<P, M, CV> Clone for DataCell<P, M, CV>
where
M: RawMutexTrait,
CV: RawCondvar,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<P, M, CV> DataCell<P, M, CV>
where
M: RawMutexTrait,
CV: RawCondvar + RawCondvar<RawMutex = M>,
{
pub fn new() -> Self {
Self::default()
}
pub fn close(&self) {
let mut value = self.inner.value.lock();
value.closed = true;
self.inner.data_available.notify_all();
}
pub fn is_closed(&self) -> bool {
let value = self.inner.value.lock();
value.closed
}
pub fn set(&self, data: P) {
let mut value = self.inner.value.lock();
value.current = Some(data);
self.inner.data_available.notify_one();
}
pub fn replace(&self, data: P) -> Option<P> {
let mut value = self.inner.value.lock();
let prev = value.current.replace(data);
self.inner.data_available.notify_one();
prev
}
pub fn get(&self) -> Result<P> {
let mut value = self.inner.value.lock();
loop {
if value.closed {
return Err(Error::ChannelClosed);
}
if let Some(current) = value.current.take() {
return Ok(current);
}
self.inner
.data_available
.wait::<CellValue<P>, M>(&mut value);
}
}
pub fn get_timeout(&self, timeout: Duration) -> Result<P> {
let mut value = self.inner.value.lock();
loop {
if value.closed {
return Err(Error::ChannelClosed);
}
if let Some(current) = value.current.take() {
return Ok(current);
}
if self
.inner
.data_available
.wait_for::<CellValue<P>, M>(&mut value, timeout)
.timed_out()
{
return Err(Error::Timeout);
}
}
}
pub fn try_get(&self) -> Result<P> {
let mut value = self.inner.value.lock();
if value.closed {
return Err(Error::ChannelClosed);
}
value.current.take().ok_or(Error::ChannelEmpty)
}
}
impl<P, M, CV> Iterator for DataCell<P, M, CV>
where
    M: RawMutexTrait,
    CV: RawCondvar + RawCondvar<RawMutex = M>,
{
    type Item = P;

    /// Blocks until a value can be taken from the cell and yields it.
    ///
    /// Any error reported by `get` ends the iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let received = self.get();
        received.ok()
    }
}
#[cfg(test)]
mod test {
    use super::DataCell;
    use crate::Error;
    use std::{thread, time::Duration};

    /// A blocked `get` returns the value stored later by another handle.
    #[test]
    fn test_datacell() {
        let consumer: DataCell<_> = DataCell::new();
        let producer = consumer.clone();
        let worker = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            producer.set(42);
        });
        let received = consumer.get().unwrap();
        assert_eq!(received, 42);
        worker.join().unwrap();
    }

    /// A closed cell reports `ChannelClosed` regardless of whether a value was stored.
    #[test]
    fn test_datacell_close() {
        let consumer: DataCell<_> = DataCell::new();
        let producer = consumer.clone();
        let worker = thread::spawn(move || {
            producer.set(42);
        });
        consumer.close();
        let outcome = consumer.get().unwrap_err();
        assert!(matches!(outcome, Error::ChannelClosed));
        worker.join().unwrap();
    }

    /// Closing an empty cell releases a thread blocked in `get`.
    #[test]
    fn test_datacell_close_empty() {
        let owner: DataCell<u8> = DataCell::new();
        let waiter = owner.clone();
        let worker = thread::spawn(move || waiter.get());
        // Give the worker time to start waiting before the cell is closed.
        thread::sleep(Duration::from_millis(100));
        owner.close();
        let worker_result = worker.join().unwrap();
        assert!(matches!(worker_result, Err(Error::ChannelClosed)));
        let outcome = owner.get().unwrap_err();
        assert!(matches!(outcome, Error::ChannelClosed));
    }

    /// `try_get` reports an empty cell and then returns a value once one has been stored.
    #[test]
    fn test_datacell_try_get() {
        let consumer: DataCell<_> = DataCell::new();
        let empty = consumer.try_get().unwrap_err();
        assert!(matches!(empty, Error::ChannelEmpty));
        let producer = consumer.clone();
        let worker = thread::spawn(move || {
            producer.set(42);
        });
        worker.join().unwrap();
        let received = consumer.try_get().unwrap();
        assert_eq!(received, 42);
    }

    /// The cell works with explicitly chosen mutex and condition variable types.
    #[test]
    fn test_datacell_other_mutex() {
        let cell: DataCell<_, parking_lot_rt::RawMutex, parking_lot_rt::Condvar> =
            DataCell::new();
        cell.set(42);
        let received = cell.get().unwrap();
        assert_eq!(received, 42);
    }
}