use tokio::sync::mpsc;
/// Selects the queueing strategy used when a mailbox is created.
///
/// The value is a small `Copy` type and can be compared for equality, which
/// makes it convenient to store in configuration structs and to assert on in
/// tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxConfig {
    /// The mailbox is backed by an unbounded channel.
    Unbounded,
    /// The mailbox is backed by a bounded channel.
    Bounded {
        /// Channel capacity handed to the bounded channel constructor.
        ///
        /// Prefer [`MailboxConfig::bounded`], which rejects a capacity of
        /// zero up front; building this variant directly skips that check.
        capacity: usize,
    },
}

impl Default for MailboxConfig {
    /// Returns a bounded configuration with a capacity of 100 messages.
    fn default() -> Self {
        Self::Bounded { capacity: 100 }
    }
}

impl MailboxConfig {
    /// Creates a configuration that selects an unbounded mailbox.
    #[must_use]
    pub const fn unbounded() -> Self {
        Self::Unbounded
    }

    /// Creates a configuration that selects a bounded mailbox holding up to
    /// `capacity` messages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[must_use]
    pub fn bounded(capacity: usize) -> Self {
        assert!(capacity > 0, "mailbox capacity must be > 0");
        Self::Bounded { capacity }
    }
}
/// Cloneable sending half of a worker mailbox.
///
/// A handle pairs the channel sender with the id of the worker it delivers
/// to; the id is reported back in "closed" errors.
#[derive(Debug, Clone)]
pub struct MailboxHandle {
    /// Sender for the underlying channel (bounded or unbounded).
    tx: MailboxSender,
    /// Identifier of the worker this handle delivers to.
    worker_id: String,
}

/// Sending half of the underlying tokio channel, one variant per channel kind.
///
/// `Clone` and `Debug` are derived: both tokio sender types implement them,
/// so no hand-written impl is needed.
#[derive(Debug, Clone)]
enum MailboxSender {
    /// Sender of a bounded `mpsc::channel`.
    Bounded(mpsc::Sender<String>),
    /// Sender of an `mpsc::unbounded_channel`.
    Unbounded(mpsc::UnboundedSender<String>),
}
impl MailboxHandle {
pub(crate) fn new_bounded(tx: mpsc::Sender<String>, worker_id: impl Into<String>) -> Self {
Self {
tx: MailboxSender::Bounded(tx),
worker_id: worker_id.into(),
}
}
pub(crate) fn new_unbounded(
tx: mpsc::UnboundedSender<String>,
worker_id: impl Into<String>,
) -> Self {
Self {
tx: MailboxSender::Unbounded(tx),
worker_id: worker_id.into(),
}
}
pub async fn send(&self, message: impl Into<String>) -> Result<(), SendError> {
let msg = message.into();
match &self.tx {
MailboxSender::Bounded(tx) => tx
.send(msg)
.await
.map_err(|_| SendError::Closed(self.worker_id.clone())),
MailboxSender::Unbounded(tx) => tx
.send(msg)
.map_err(|_| SendError::Closed(self.worker_id.clone())),
}
}
pub fn try_send(&self, message: impl Into<String>) -> Result<(), TrySendError> {
let msg = message.into();
match &self.tx {
MailboxSender::Bounded(tx) => tx.try_send(msg).map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => TrySendError::Full,
mpsc::error::TrySendError::Closed(_) => {
TrySendError::Closed(self.worker_id.clone())
}
}),
MailboxSender::Unbounded(tx) => tx
.send(msg)
.map_err(|_| TrySendError::Closed(self.worker_id.clone())),
}
}
#[must_use]
pub fn worker_id(&self) -> &str {
&self.worker_id
}
#[must_use]
pub fn is_open(&self) -> bool {
match &self.tx {
MailboxSender::Bounded(tx) => !tx.is_closed(),
MailboxSender::Unbounded(tx) => !tx.is_closed(),
}
}
}
/// Receiving half of a worker mailbox.
#[derive(Debug)]
pub struct Mailbox {
    /// Receiver for the underlying channel (bounded or unbounded).
    rx: MailboxReceiver,
}

/// Receiving half of the underlying tokio channel, one variant per channel kind.
#[derive(Debug)]
enum MailboxReceiver {
    /// Receiver of a bounded `mpsc::channel`.
    Bounded(mpsc::Receiver<String>),
    /// Receiver of an `mpsc::unbounded_channel`.
    Unbounded(mpsc::UnboundedReceiver<String>),
}
impl Mailbox {
    /// Builds a mailbox around the receiver of a bounded channel.
    pub(crate) fn new_bounded(rx: mpsc::Receiver<String>) -> Self {
        let rx = MailboxReceiver::Bounded(rx);
        Self { rx }
    }

    /// Builds a mailbox around the receiver of an unbounded channel.
    pub(crate) fn new_unbounded(rx: mpsc::UnboundedReceiver<String>) -> Self {
        let rx = MailboxReceiver::Unbounded(rx);
        Self { rx }
    }

    /// Awaits the next message.
    ///
    /// Forwards the result of the underlying receiver's `recv`, so `None`
    /// means the channel yielded no further message.
    pub async fn recv(&mut self) -> Option<String> {
        let next = match &mut self.rx {
            MailboxReceiver::Bounded(receiver) => receiver.recv().await,
            MailboxReceiver::Unbounded(receiver) => receiver.recv().await,
        };
        next
    }

    /// Takes the next message if one is immediately available.
    ///
    /// # Errors
    ///
    /// Returns [`TryRecvError::Empty`] or [`TryRecvError::Disconnected`],
    /// mirroring the error reported by the underlying channel.
    pub fn try_recv(&mut self) -> Result<String, TryRecvError> {
        // Both receiver kinds report the same tokio error type, so the
        // translation into the mailbox error is written once.
        let polled = match &mut self.rx {
            MailboxReceiver::Bounded(receiver) => receiver.try_recv(),
            MailboxReceiver::Unbounded(receiver) => receiver.try_recv(),
        };
        polled.map_err(|err| match err {
            mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
            mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
        })
    }
}
/// Creates a connected handle/mailbox pair whose worker id is `"unnamed"`.
///
/// The channel kind (bounded or unbounded) is chosen by `config`.
#[must_use]
pub fn mailbox(config: MailboxConfig) -> (MailboxHandle, Mailbox) {
    // Placeholder id used when the caller does not name the worker.
    const WORKER_ID: &str = "unnamed";

    match config {
        MailboxConfig::Bounded { capacity } => {
            let (sender, receiver) = mpsc::channel(capacity);
            let handle = MailboxHandle::new_bounded(sender, WORKER_ID);
            (handle, Mailbox::new_bounded(receiver))
        }
        MailboxConfig::Unbounded => {
            let (sender, receiver) = mpsc::unbounded_channel();
            let handle = MailboxHandle::new_unbounded(sender, WORKER_ID);
            (handle, Mailbox::new_unbounded(receiver))
        }
    }
}
/// Creates a connected handle/mailbox pair labelled with `worker_id_input`.
///
/// The channel kind (bounded or unbounded) is chosen by `config`; the worker
/// id is stored in the returned handle and reported in its "closed" errors.
///
/// # Panics
///
/// tokio's `mpsc::channel` panics when given a capacity of zero, which can
/// happen if `MailboxConfig::Bounded { capacity: 0 }` is built directly
/// instead of through `MailboxConfig::bounded`.
#[must_use]
pub fn mailbox_named(
    config: MailboxConfig,
    worker_id_input: impl Into<String>,
) -> (MailboxHandle, Mailbox) {
    let worker_id = worker_id_input.into();
    match config {
        MailboxConfig::Unbounded => {
            let (tx, rx) = mpsc::unbounded_channel();
            (
                MailboxHandle::new_unbounded(tx, worker_id),
                Mailbox::new_unbounded(rx),
            )
        }
        MailboxConfig::Bounded { capacity } => {
            let (tx, rx) = mpsc::channel(capacity);
            (
                MailboxHandle::new_bounded(tx, worker_id),
                Mailbox::new_bounded(rx),
            )
        }
    }
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum SendError {
#[error("worker '{0}' mailbox is closed")]
Closed(String),
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum TrySendError {
#[error("mailbox is full")]
Full,
#[error("worker '{0}' mailbox is closed")]
Closed(String),
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum TryRecvError {
#[error("mailbox is empty")]
Empty,
#[error("mailbox is disconnected")]
Disconnected,
}