use alloc::sync::Arc;
use core::time::Duration;
use owner_monad::OwnerMut;
use super::{
handle_event, Event, EventHandle, GenericSleep, Instant, Mutex, Selectable, Semaphore,
TIMEOUT_MAX,
};
use crate::error::Error;
/// Sending half of a channel.
///
/// A thin handle around the reference-counted `ChannelShared` state that it
/// shares with the matching [`ReceiveChannel`]; cloning the handle yields
/// another sender for the same channel.
pub struct SendChannel<T>(Arc<ChannelShared<T>>);
impl<T> SendChannel<T> {
/// Builds a [`Selectable`] event that offers `value` to receivers on this
/// channel.
///
/// Polling the event places `value` in the shared slot, flips the channel's
/// sequence flag, notifies the receive event and then waits for one
/// acknowledgement per task counted on that event. If the slot is empty
/// afterwards (a receiver took the value) the poll returns `Ok(())`;
/// otherwise the value is reclaimed from the slot and handed back inside
/// `Err`.
///
/// # Panics
///
/// Polling panics if the acknowledgement semaphore's count is not zero when
/// the poll starts, or if waiting on that semaphore fails.
pub fn select(&self, value: T) -> impl '_ + Selectable {
// State of one pending send: the value on offer, the shared channel state,
// and the handle produced by `handle_event` for the channel's send event.
struct SendSelect<'b, T> {
value: T,
data: &'b ChannelShared<T>,
handle: EventHandle<SendWrapper<'b, T>>,
}
impl<'b, T> Selectable for SendSelect<'b, T> {
fn poll(self) -> Result<(), Self> {
// The send mutex is held until `poll` returns, because the guard is bound
// to a local that lives to the end of the function.
let _send_lock = self.data.send_mutex.lock();
// Invariant check: no acknowledgement may be outstanding when a send
// attempt begins.
assert_eq!(self.data.ack_sem.count(), 0);
// Publish the value under the data lock. The guard goes out of scope at
// the end of this block, i.e. before the acknowledgement wait below.
let n = {
let mut lock = self.data.data.lock();
lock.value = Some(self.value);
// Flip the flag so receivers comparing it with their snapshot see a change.
lock.seq = !lock.seq;
lock.receive_event.notify();
// The receive event's task count is the number of acknowledgements to await.
lock.receive_event.task_count()
};
// Collect `n` acknowledgements from the semaphore; a failed wait is fatal.
for _ in 0..n {
self.data
.ack_sem
.wait(Duration::from_millis(TIMEOUT_MAX as u64))
.unwrap_or_else(|err| panic!("failed to synchronize on channel: {}", err));
}
// A value still in the slot means nobody took it: reclaim it and return an
// equivalent event as the error. An empty slot means the send completed.
if let Some(value) = self.data.data.lock().value.take() {
Err(Self {
value,
data: self.data,
handle: self.handle,
})
} else {
Ok(())
}
}
// Chooses the sleep mode from the receive event's task count: with no tasks
// counted it returns `NotifyTake(None)`, otherwise a timestamp of 0 ms.
// NOTE(review): presumably the zero timestamp makes the selecting task poll
// again immediately while receivers are present; confirm against `GenericSleep`.
fn sleep(&self) -> GenericSleep {
if self.data.data.lock().receive_event.task_count() == 0 {
GenericSleep::NotifyTake(None)
} else {
GenericSleep::Timestamp(Instant::from_millis(0))
}
}
}
SendSelect {
value,
data: &self.0,
handle: handle_event(SendWrapper(&*self.0)),
}
}
}
impl<T> Clone for SendChannel<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// Receiving half of a channel.
///
/// A thin handle around the reference-counted `ChannelShared` state that it
/// shares with the matching [`SendChannel`]; cloning the handle yields
/// another receiver for the same channel.
pub struct ReceiveChannel<T>(Arc<ChannelShared<T>>);
impl<T> ReceiveChannel<T> {
/// Builds a [`Selectable`] event that resolves to a value taken from this
/// channel.
///
/// The event records the channel's sequence flag when it is created. Each
/// poll posts one acknowledgement to the channel's semaphore if the flag has
/// changed since the last recorded value, then either takes the value from
/// the shared slot (`Ok(value)`) or, if the slot is empty, notifies the send
/// event and returns itself in `Err`.
///
/// Dropping the event posts an acknowledgement for a flag change it has not
/// yet acknowledged, provided its handle is not done, and clears the handle.
pub fn select(&self) -> impl '_ + Selectable<T> {
// State of one pending receive: the shared channel state, the handle
// produced by `handle_event` for the channel's receive event, and the last
// sequence flag value this receiver has accounted for.
struct ReceiveSelect<'b, T> {
data: &'b ChannelShared<T>,
handle: EventHandle<ReceiveWrapper<'b, T>>,
seq: bool,
}
impl<'b, T> Selectable<T> for ReceiveSelect<'b, T> {
fn poll(mut self) -> core::result::Result<T, Self> {
let mut lock = self.data.data.lock();
// A flag different from the recorded one has not been acknowledged by this
// receiver yet: post once (any error from `post` is discarded) and record
// the new flag so the same change is not acknowledged twice.
if self.seq != lock.seq {
self.data.ack_sem.post().unwrap_or(());
self.seq = lock.seq;
}
if let Some(value) = lock.value.take() {
// Value obtained: clear the event handle and hand the value out.
self.handle.clear();
Ok(value)
} else {
// Slot is empty: notify the send event and stay pending.
lock.send_event.notify();
Err(self)
}
}
// Chooses the sleep mode from the send event's task count: with no tasks
// counted it returns `NotifyTake(None)`, otherwise a timestamp of 0 ms.
// NOTE(review): presumably the zero timestamp makes the selecting task poll
// again immediately while senders are present; confirm against `GenericSleep`.
fn sleep(&self) -> GenericSleep {
if self.data.data.lock().send_event.task_count() == 0 {
GenericSleep::NotifyTake(None)
} else {
GenericSleep::Timestamp(Instant::from_millis(0))
}
}
}
impl<'b, T> Drop for ReceiveSelect<'b, T> {
fn drop(&mut self) {
let lock = self.data.data.lock();
// If the flag changed since the recorded value and the handle is not done,
// post the acknowledgement this receiver has not sent yet (errors from
// `post` are discarded).
// NOTE(review): presumably this keeps a sender that counted this receiver
// from waiting on a missing acknowledgement; confirm.
if self.seq != lock.seq && !self.handle.is_done() {
self.data.ack_sem.post().unwrap_or(());
}
// The handle is cleared unconditionally.
self.handle.clear();
}
}
// The data lock is held while the handle is created and the sequence flag
// is recorded, so both happen under the same lock acquisition.
let lock = self.0.data.lock();
ReceiveSelect {
data: &self.0,
handle: handle_event(ReceiveWrapper(&*self.0)),
seq: lock.seq,
}
}
}
impl<T> Clone for ReceiveChannel<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// Creates a new channel and returns its sending and receiving halves.
///
/// This is the panicking counterpart of [`try_channel`].
///
/// # Panics
///
/// Panics with the underlying error if [`try_channel`] fails.
pub fn channel<T>() -> (SendChannel<T>, ReceiveChannel<T>) {
    match try_channel::<T>() {
        Ok(halves) => halves,
        Err(err) => panic!("failed to create channel: {}", err),
    }
}
pub fn try_channel<T>() -> Result<(SendChannel<T>, ReceiveChannel<T>), Error> {
let data = Arc::new(ChannelShared {
data: Mutex::try_new(ChannelData {
send_event: Event::new(),
receive_event: Event::new(),
value: None,
seq: false,
})?,
send_mutex: Mutex::try_new(())?,
ack_sem: Semaphore::try_new(u32::MAX, 0)?,
});
let send = SendChannel(data.clone());
let receive = ReceiveChannel(data);
Ok((send, receive))
}
/// State shared by every sending and receiving handle of one channel.
struct ChannelShared<T> {
/// Lock-protected value slot, sequence flag and send/receive events.
data: Mutex<ChannelData<T>>,
/// Locked by a sender's `poll` for the whole duration of the poll.
send_mutex: Mutex<()>,
/// Acknowledgement semaphore: receivers post to it when they observe a
/// change of the sequence flag, and a sender's `poll` waits on it once per
/// task counted on the receive event.
/// NOTE(review): created with `Semaphore::try_new(u32::MAX, 0)`, presumably
/// (maximum count, initial count); confirm against `Semaphore`.
ack_sem: Semaphore,
}
/// The part of the channel state that is guarded by `ChannelShared::data`.
struct ChannelData<T> {
/// Event that send-side selects register on (through `SendWrapper`); a
/// receiver's `poll` notifies it when the value slot is empty.
send_event: Event,
/// Event that receive-side selects register on (through `ReceiveWrapper`);
/// a sender's `poll` notifies it after placing a value in the slot.
receive_event: Event,
/// Slot for the value in flight: set by a sender's `poll`, then either
/// taken by a receiver or reclaimed by the same sender.
value: Option<T>,
/// Flag flipped on every send attempt; receivers compare it with their
/// recorded value to decide whether they owe an acknowledgement.
seq: bool,
}
/// Adapter that exposes a channel's `send_event` through [`OwnerMut<Event>`].
struct SendWrapper<'b, T>(&'b ChannelShared<T>);

impl<'b, T> OwnerMut<Event> for SendWrapper<'b, T> {
    /// Applies `f` to the channel's send event while holding the data lock.
    ///
    /// Returns `None`, without calling `f`, when `try_lock` on the data mutex
    /// fails; otherwise returns `Some` with the result of `f`.
    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
    where
        Event: 'a,
    {
        // The guard stays alive until after `f` has returned.
        let mut guard = self.0.data.try_lock().ok()?;
        let event = &mut guard.send_event;
        Some(f(event))
    }
}
/// Adapter that exposes a channel's `receive_event` through [`OwnerMut<Event>`].
struct ReceiveWrapper<'b, T>(&'b ChannelShared<T>);

impl<'b, T> OwnerMut<Event> for ReceiveWrapper<'b, T> {
    /// Applies `f` to the channel's receive event while holding the data lock.
    ///
    /// Returns `None`, without calling `f`, when `try_lock` on the data mutex
    /// fails; otherwise returns `Some` with the result of `f`.
    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
    where
        Event: 'a,
    {
        // The guard stays alive until after `f` has returned.
        let mut guard = self.0.data.try_lock().ok()?;
        let event = &mut guard.receive_event;
        Some(f(event))
    }
}