use crate::error::{Error, Result};
use crate::sys;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
struct Inner<T> {
raw: *mut sys::llam_channel_t,
senders: AtomicUsize,
_marker: PhantomData<fn() -> T>,
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
if !self.raw.is_null() {
unsafe {
let _ = sys::llam_channel_close(self.raw);
loop {
let mut ptr = ptr::null_mut();
if sys::llam_channel_recv_until_result(self.raw, 0, &mut ptr) != 0 {
break;
}
if !ptr.is_null() {
drop(Box::from_raw(ptr as *mut T));
}
}
let _ = sys::llam_channel_destroy(self.raw);
}
}
}
}
pub struct Sender<T> {
inner: Arc<Inner<T>>,
_marker: PhantomData<fn(T)>,
}
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
_marker: PhantomData<fn() -> T>,
}
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.inner.senders.fetch_add(1, Ordering::Relaxed);
Self {
inner: Arc::clone(&self.inner),
_marker: PhantomData,
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if self.inner.senders.fetch_sub(1, Ordering::AcqRel) == 1 {
unsafe {
let _ = sys::llam_channel_close(self.inner.raw);
}
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
_marker: PhantomData,
}
}
}
pub fn bounded<T: Send>(capacity: usize) -> Result<(Sender<T>, Receiver<T>)> {
let raw = unsafe { sys::llam_channel_create(capacity) };
if raw.is_null() {
return Err(Error::last());
}
let inner = Arc::new(Inner {
raw,
senders: AtomicUsize::new(1),
_marker: PhantomData,
});
Ok((
Sender {
inner: Arc::clone(&inner),
_marker: PhantomData,
},
Receiver {
inner,
_marker: PhantomData,
},
))
}
pub struct SendError<T> {
pub value: T,
pub error: Error,
}
impl<T: std::fmt::Debug> std::fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendError")
.field("value", &self.value)
.field("error", &self.error)
.finish()
}
}
impl<T> std::fmt::Display for SendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "send failed: {}", self.error)
}
}
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> std::result::Result<(), SendError<T>> {
self.send_ptr(value, None)
}
pub fn send_timeout(
&self,
value: T,
timeout: Duration,
) -> std::result::Result<(), SendError<T>> {
self.send_ptr(value, Some(crate::time::deadline_after(timeout)))
}
pub fn try_send(&self, value: T) -> std::result::Result<(), SendError<T>> {
self.send_ptr(value, Some(0))
}
fn send_ptr(&self, value: T, deadline: Option<u64>) -> std::result::Result<(), SendError<T>> {
let ptr = Box::into_raw(Box::new(value)) as *mut c_void;
let rc = unsafe {
match deadline {
Some(deadline) => sys::llam_channel_send_until(self.inner.raw, ptr, deadline),
None => sys::llam_channel_send(self.inner.raw, ptr),
}
};
if rc == 0 {
Ok(())
} else {
let value = unsafe { *Box::from_raw(ptr as *mut T) };
Err(SendError {
value,
error: Error::last(),
})
}
}
pub fn close(&self) -> Result<()> {
let rc = unsafe { sys::llam_channel_close(self.inner.raw) };
if rc == 0 {
Ok(())
} else {
Err(Error::last())
}
}
pub(crate) fn raw(&self) -> *mut sys::llam_channel_t {
self.inner.raw
}
}
impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T> {
self.recv_deadline(None)
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<T> {
self.recv_deadline(Some(crate::time::deadline_after(timeout)))
}
pub fn try_recv(&self) -> Result<T> {
self.recv_deadline(Some(0))
}
pub fn recv_option(&self) -> Result<Option<T>> {
self.recv_option_deadline(None)
}
pub fn recv_timeout_option(&self, timeout: Duration) -> Result<Option<T>> {
self.recv_option_deadline(Some(crate::time::deadline_after(timeout)))
}
pub fn try_recv_option(&self) -> Result<Option<T>> {
self.recv_option_deadline(Some(0))
}
fn recv_deadline(&self, deadline: Option<u64>) -> Result<T> {
match self.recv_option_deadline(deadline)? {
Some(value) => Ok(value),
None => Err(Error::from_errno(libc::EPIPE)),
}
}
fn recv_option_deadline(&self, deadline: Option<u64>) -> Result<Option<T>> {
let mut ptr = ptr::null_mut();
let rc = unsafe {
match deadline {
Some(deadline) => {
sys::llam_channel_recv_until_result(self.inner.raw, deadline, &mut ptr)
}
None => sys::llam_channel_recv_result(self.inner.raw, &mut ptr),
}
};
if rc != 0 {
let error = Error::last();
if error.is_closed() {
return Ok(None);
}
return Err(error);
}
if ptr.is_null() {
Ok(None)
} else {
Ok(Some(unsafe { *Box::from_raw(ptr as *mut T) }))
}
}
#[doc(hidden)]
pub unsafe fn __take_selected(&self, ptr: *mut c_void) -> Result<T> {
if ptr.is_null() {
return Err(Error::from_errno(libc::EPIPE));
}
Ok(*Box::from_raw(ptr as *mut T))
}
pub(crate) fn raw(&self) -> *mut sys::llam_channel_t {
self.inner.raw
}
}
#[doc(hidden)]
pub fn __recv_op<T>(rx: &Receiver<T>, out: &mut *mut c_void) -> sys::llam_select_op_t {
sys::llam_select_op_t {
kind: sys::LLAM_SELECT_OP_RECV,
reserved0: 0,
channel: rx.raw(),
send_value: ptr::null_mut(),
recv_out: out,
result_errno: 0,
}
}
#[doc(hidden)]
pub struct __PendingSend<T> {
ptr: *mut c_void,
_marker: PhantomData<T>,
}
#[doc(hidden)]
impl<T> __PendingSend<T> {
pub fn new(value: T) -> Self {
Self {
ptr: Box::into_raw(Box::new(value)) as *mut c_void,
_marker: PhantomData,
}
}
pub fn op(&mut self, tx: &Sender<T>) -> sys::llam_select_op_t {
sys::llam_select_op_t {
kind: sys::LLAM_SELECT_OP_SEND,
reserved0: 0,
channel: tx.raw(),
send_value: self.ptr,
recv_out: ptr::null_mut(),
result_errno: 0,
}
}
pub fn commit(&mut self) {
self.ptr = ptr::null_mut();
}
}
#[doc(hidden)]
impl<T> Drop for __PendingSend<T> {
fn drop(&mut self) {
if !self.ptr.is_null() {
unsafe {
drop(Box::from_raw(self.ptr as *mut T));
}
self.ptr = ptr::null_mut();
}
}
}
#[doc(hidden)]
pub unsafe fn __select_raw(ops: &mut [sys::llam_select_op_t], deadline_ns: u64) -> Result<usize> {
let mut selected = 0usize;
let rc = sys::llam_channel_select(ops.as_mut_ptr(), ops.len(), deadline_ns, &mut selected);
if rc == 0 {
Ok(selected)
} else {
Err(Error::last())
}
}
pub struct Select<'a, R> {
ops: Vec<sys::llam_select_op_t>,
arms: Vec<Box<dyn SelectArm<'a, R> + 'a>>,
pending_closed: Vec<PendingClosed<'a, R>>,
after: Option<(Duration, Box<dyn FnOnce() -> R + 'a>)>,
default: Option<Box<dyn FnOnce() -> R + 'a>>,
}
impl<'a, R: 'a> Select<'a, R> {
pub fn new() -> Self {
Self {
ops: Vec::new(),
arms: Vec::new(),
pending_closed: Vec::new(),
after: None,
default: None,
}
}
pub fn recv<T, F>(&mut self, rx: &'a Receiver<T>, on_recv: F)
where
T: 'a,
F: FnOnce(Result<T>) -> R + 'a,
{
let raw = rx.raw();
let closed = take_pending_closed(&mut self.pending_closed, raw);
let mut arm = Box::new(RecvSelectArm {
rx,
out: ptr::null_mut(),
on_recv: Some(on_recv),
on_closed: closed,
});
let op = arm.op();
self.ops.push(op);
self.arms.push(arm);
}
pub fn send<T, F>(&mut self, tx: &'a Sender<T>, value: T, on_send: F)
where
T: 'a,
F: FnOnce() -> R + 'a,
{
let mut arm = Box::new(SendSelectArm {
pending: __PendingSend::new(value),
tx,
on_send: Some(on_send),
});
let op = arm.op();
self.ops.push(op);
self.arms.push(arm);
}
pub fn closed<T, F>(&mut self, rx: &'a Receiver<T>, on_closed: F)
where
T: 'a,
F: FnOnce() -> R + 'a,
{
let raw = rx.raw();
let mut handler = Some(Box::new(on_closed) as Box<dyn FnOnce() -> R + 'a>);
for arm in self.arms.iter_mut().rev() {
arm.try_set_closed(raw, &mut handler);
if handler.is_none() {
return;
}
}
self.pending_closed.push(PendingClosed { raw, handler });
}
pub fn after<F>(&mut self, duration: Duration, on_timeout: F)
where
F: FnOnce() -> R + 'a,
{
assert!(
self.default.is_none(),
"LLAM select cannot combine after(...) and default arms"
);
assert!(
self.after.is_none(),
"LLAM select accepts at most one after(...) arm"
);
self.after = Some((duration, Box::new(on_timeout)));
}
pub fn default<F>(&mut self, on_default: F)
where
F: FnOnce() -> R + 'a,
{
assert!(
self.after.is_none(),
"LLAM select cannot combine after(...) and default arms"
);
assert!(
self.default.is_none(),
"LLAM select accepts at most one default arm"
);
self.default = Some(Box::new(on_default));
}
pub fn run(mut self) -> R {
assert!(
!self.ops.is_empty(),
"LLAM select requires at least one recv or send arm"
);
assert!(
self.pending_closed.is_empty(),
"LLAM select closed(...) has no matching recv arm"
);
let deadline = if let Some((duration, _)) = self.after.as_ref() {
crate::time::deadline_after(*duration)
} else if self.default.is_some() {
0
} else {
u64::MAX
};
match unsafe { __select_raw(&mut self.ops, deadline) } {
Ok(index) => {
let arm = self.arms.swap_remove(index);
arm.complete()
}
Err(error) if error.is_timed_out() => {
if let Some(default) = self.default.take() {
default()
} else if let Some((_, after)) = self.after.take() {
after()
} else {
panic!("LLAM select failed: {error}");
}
}
Err(error) => panic!("LLAM select failed: {error}"),
}
}
}
impl<'a, R: 'a> Default for Select<'a, R> {
fn default() -> Self {
Self::new()
}
}
struct PendingClosed<'a, R> {
raw: *mut sys::llam_channel_t,
handler: Option<Box<dyn FnOnce() -> R + 'a>>,
}
fn take_pending_closed<'a, R>(
pending: &mut Vec<PendingClosed<'a, R>>,
raw: *mut sys::llam_channel_t,
) -> Option<Box<dyn FnOnce() -> R + 'a>> {
let index = pending.iter().position(|closed| closed.raw == raw)?;
pending.swap_remove(index).handler
}
trait SelectArm<'a, R> {
fn op(&mut self) -> sys::llam_select_op_t;
fn try_set_closed(
&mut self,
raw: *mut sys::llam_channel_t,
handler: &mut Option<Box<dyn FnOnce() -> R + 'a>>,
);
fn complete(self: Box<Self>) -> R;
}
struct RecvSelectArm<'a, T, R, F>
where
F: FnOnce(Result<T>) -> R + 'a,
{
rx: &'a Receiver<T>,
out: *mut c_void,
on_recv: Option<F>,
on_closed: Option<Box<dyn FnOnce() -> R + 'a>>,
}
impl<'a, T: 'a, R, F> SelectArm<'a, R> for RecvSelectArm<'a, T, R, F>
where
F: FnOnce(Result<T>) -> R + 'a,
{
fn op(&mut self) -> sys::llam_select_op_t {
__recv_op(self.rx, &mut self.out)
}
fn try_set_closed(
&mut self,
raw: *mut sys::llam_channel_t,
handler: &mut Option<Box<dyn FnOnce() -> R + 'a>>,
) {
if self.rx.raw() == raw && self.on_closed.is_none() {
self.on_closed = handler.take();
}
}
fn complete(mut self: Box<Self>) -> R {
if self.out.is_null() {
if let Some(on_closed) = self.on_closed.take() {
return on_closed();
}
}
let value = unsafe { self.rx.__take_selected(self.out) };
(self
.on_recv
.take()
.expect("LLAM select recv arm already used"))(value)
}
}
struct SendSelectArm<'a, T, R, F>
where
F: FnOnce() -> R + 'a,
{
pending: __PendingSend<T>,
tx: &'a Sender<T>,
on_send: Option<F>,
}
impl<'a, T: 'a, R, F> SelectArm<'a, R> for SendSelectArm<'a, T, R, F>
where
F: FnOnce() -> R + 'a,
{
fn op(&mut self) -> sys::llam_select_op_t {
self.pending.op(self.tx)
}
fn try_set_closed(
&mut self,
_raw: *mut sys::llam_channel_t,
_handler: &mut Option<Box<dyn FnOnce() -> R + 'a>>,
) {
}
fn complete(mut self: Box<Self>) -> R {
self.pending.commit();
(self
.on_send
.take()
.expect("LLAM select send arm already used"))()
}
}