use std::ptr::{self, NonNull};
use cordyceps::{
mpsc_queue::{Links, MpscQueue},
Linked,
};
use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore, TryAcquireError};
use elfo_utils::CachePadded;
use crate::{
envelope::{Envelope, EnvelopeHeader},
errors::{SendError, TrySendError},
tracing::TraceId,
};
pub mod config {
#[derive(Debug, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct MailboxConfig {
pub capacity: usize,
}
impl Default for MailboxConfig {
fn default() -> Self {
Self { capacity: 100 }
}
}
}
pub(crate) type Link = Links<EnvelopeHeader>;
assert_not_impl_any!(EnvelopeHeader: Unpin);
unsafe impl Linked<Link> for EnvelopeHeader {
type Handle = Envelope;
fn into_ptr(handle: Self::Handle) -> NonNull<Self> {
handle.into_header_ptr()
}
unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
Self::Handle::from_header_ptr(ptr)
}
unsafe fn links(ptr: NonNull<Self>) -> NonNull<Link> {
let links = ptr::addr_of_mut!((*ptr.as_ptr()).link);
NonNull::new_unchecked(links)
}
}
pub(crate) struct Mailbox {
queue: MpscQueue<EnvelopeHeader>,
tx_semaphore: Semaphore,
rx_notify: CachePadded<Notify>,
control: Mutex<Control>,
}
struct Control {
closed_trace_id: Option<TraceId>,
capacity: usize,
}
impl Mailbox {
pub(crate) fn new(config: &config::MailboxConfig) -> Self {
let capacity = clamp_capacity(config.capacity);
Self {
queue: MpscQueue::new_with_stub(Envelope::stub()),
tx_semaphore: Semaphore::new(capacity),
rx_notify: CachePadded::new(Notify::new()),
control: Mutex::new(Control {
closed_trace_id: None,
capacity,
}),
}
}
pub(crate) fn set_capacity(&self, capacity: usize) {
let mut control = self.control.lock();
if capacity == control.capacity {
return;
}
if capacity < control.capacity {
let delta = control.capacity - capacity;
let real_delta = self.tx_semaphore.forget_permits(delta);
debug_assert!(real_delta <= delta);
control.capacity -= real_delta;
} else {
let real_delta = clamp_capacity(capacity) - control.capacity;
self.tx_semaphore.add_permits(real_delta);
control.capacity += real_delta;
}
}
pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
let permit = match self.tx_semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => return Err(SendError(envelope)),
};
permit.forget();
self.queue.enqueue(envelope);
self.rx_notify.notify_one();
Ok(())
}
pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
match self.tx_semaphore.try_acquire() {
Ok(permit) => {
permit.forget();
self.queue.enqueue(envelope);
self.rx_notify.notify_one();
Ok(())
}
Err(TryAcquireError::NoPermits) => Err(TrySendError::Full(envelope)),
Err(TryAcquireError::Closed) => Err(TrySendError::Closed(envelope)),
}
}
pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
match self.tx_semaphore.try_acquire() {
Ok(permit) => {
permit.forget();
}
Err(TryAcquireError::Closed) => return Err(SendError(envelope)),
Err(TryAcquireError::NoPermits) => {}
}
self.queue.enqueue(envelope);
self.rx_notify.notify_one();
Ok(())
}
pub(crate) async fn recv(&self) -> RecvResult {
loop {
if let Some(envelope) = self.queue.dequeue() {
self.tx_semaphore.add_permits(1);
return RecvResult::Data(envelope);
}
if self.tx_semaphore.is_closed() {
return self.on_close();
}
self.rx_notify.notified().await;
}
}
pub(crate) fn try_recv(&self) -> Option<RecvResult> {
match self.queue.dequeue() {
Some(envelope) => {
self.tx_semaphore.add_permits(1);
Some(RecvResult::Data(envelope))
}
None if self.tx_semaphore.is_closed() => Some(self.on_close()),
None => None,
}
}
#[cold]
pub(crate) fn close(&self, trace_id: TraceId) -> bool {
let mut control = self.control.lock();
if self.tx_semaphore.is_closed() {
return false;
}
control.closed_trace_id = Some(trace_id);
self.tx_semaphore.close();
self.rx_notify.notify_one();
true
}
#[cold]
pub(crate) fn drop_all(&self) {
while self.queue.dequeue().is_some() {}
}
#[cold]
fn on_close(&self) -> RecvResult {
match self.queue.dequeue() {
Some(envelope) => RecvResult::Data(envelope),
None => {
let control = self.control.lock();
let trace_id = control.closed_trace_id.expect("called before close()");
RecvResult::Closed(trace_id)
}
}
}
}
pub(crate) enum RecvResult {
Data(Envelope),
Closed(TraceId),
}
fn clamp_capacity(capacity: usize) -> usize {
capacity.min(Semaphore::MAX_PERMITS)
}