use std::{
collections::VecDeque,
fmt,
sync::{Arc, Mutex},
};
use jacquard_macros::public_model;
use serde::{Deserialize, Serialize};
#[public_model]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchSendOutcome {
Enqueued,
}
#[public_model]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DispatchOverflow;
impl fmt::Display for DispatchOverflow {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("dispatch queue is full")
}
}
impl std::error::Error for DispatchOverflow {}
struct SharedDispatch<T> {
queue: Mutex<VecDeque<T>>,
capacity: usize,
}
#[derive(Clone)]
pub struct DispatchSender<T> {
shared: Arc<SharedDispatch<T>>,
}
pub struct DispatchReceiver<T> {
shared: Arc<SharedDispatch<T>>,
}
#[must_use]
pub fn dispatch_mailbox<T>(capacity: usize) -> (DispatchSender<T>, DispatchReceiver<T>) {
assert!(capacity > 0, "dispatch mailbox capacity must be non-zero");
let shared = Arc::new(SharedDispatch {
queue: Mutex::new(VecDeque::new()),
capacity,
});
(
DispatchSender {
shared: Arc::clone(&shared),
},
DispatchReceiver { shared },
)
}
impl<T> DispatchSender<T> {
pub fn send(&self, item: T) -> Result<DispatchSendOutcome, DispatchOverflow> {
let mut guard = self.shared.queue.lock().expect("dispatch queue lock");
if guard.len() >= self.shared.capacity {
return Err(DispatchOverflow);
}
guard.push_back(item);
Ok(DispatchSendOutcome::Enqueued)
}
}
impl<T> DispatchReceiver<T> {
#[must_use]
pub fn drain(&mut self) -> Vec<T> {
self.shared
.queue
.lock()
.expect("dispatch queue lock")
.drain(..)
.collect()
}
#[must_use]
pub fn pending_len(&self) -> usize {
self.shared.queue.lock().expect("dispatch queue lock").len()
}
}
#[cfg(test)]
mod tests {
use super::{dispatch_mailbox, DispatchSendOutcome};
#[test]
fn dispatch_mailbox_fails_closed_when_full() {
let (sender, _) = dispatch_mailbox(1);
assert_eq!(
sender.send(1).expect("enqueue command"),
DispatchSendOutcome::Enqueued
);
let error = sender
.send(2)
.expect_err("queue should fail closed when full");
assert_eq!(error.to_string(), "dispatch queue is full");
}
#[test]
fn dispatch_mailbox_drains_in_fifo_order() {
let (sender, mut receiver) = dispatch_mailbox(4);
sender.send(1).expect("enqueue first");
sender.send(2).expect("enqueue second");
let drained = receiver.drain();
assert_eq!(drained, vec![1, 2]);
assert_eq!(receiver.pending_len(), 0);
}
}