atomr-core 0.1.0

Actors, supervision, dispatch, mailboxes, scheduler, FSM, event stream, and coordinated shutdown — the core of the atomr actor runtime.
Documentation
//! Message queue implementations. akka.net: `Dispatch/MessageQueues`.
//!
//! These are plain in-memory data structures used by the mailbox. They have
//! no internal synchronization (mutation goes through `&mut self`) and are
//! treated as `!Send` outside their owning `ActorCell` — all external sending
//! goes through the typed channel held in [`crate::actor::ActorRef`].

use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

use crate::util::BoundedQueue;

/// Envelope trait used by priority queues.
///
/// Implemented by message types stored in [`PriorityQueue`] and
/// [`StablePriorityQueue`]. Both queues call [`Prioritized::priority`] once,
/// when the message is pushed, and order by that cached value afterwards.
pub trait Prioritized {
    /// Returns the priority of this message.
    ///
    /// The priority queues in this module dequeue numerically larger values
    /// first.
    fn priority(&self) -> i32;
}

/// Unbounded FIFO queue (akka.net: `UnboundedMessageQueue`).
///
/// Thin wrapper over a [`VecDeque`]: messages are appended at the back and
/// removed from the front, so they come out in insertion order.
#[derive(Debug)]
pub struct UnboundedQueue<T> {
    inner: VecDeque<T>,
}

// Written by hand instead of derived: `#[derive(Default)]` would add a
// `T: Default` bound, which an empty queue has no use for.
impl<T> Default for UnboundedQueue<T> {
    fn default() -> Self {
        Self { inner: VecDeque::new() }
    }
}

impl<T> UnboundedQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `msg` at the back of the queue.
    pub fn push(&mut self, msg: T) {
        self.inner.push_back(msg);
    }

    /// Removes and returns the oldest message, or `None` if the queue is
    /// empty.
    pub fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    /// Number of messages currently queued.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

/// Bounded FIFO queue (akka.net: `BoundedMessageQueue`).
///
/// Every operation is forwarded to the wrapped [`BoundedQueue`], which is
/// built with the capacity passed to [`BoundedMsgQueue::new`].
#[derive(Debug)]
pub struct BoundedMsgQueue<T> {
    inner: BoundedQueue<T>,
}

impl<T> BoundedMsgQueue<T> {
    /// Creates a queue backed by a `BoundedQueue` of the given `capacity`.
    pub fn new(capacity: usize) -> Self {
        let inner = BoundedQueue::new(capacity);
        Self { inner }
    }

    /// Hands `msg` to the backing queue and returns its result unchanged.
    ///
    /// NOTE(review): presumably `Err` carries the rejected message back to
    /// the caller when the queue is full — confirm against
    /// `BoundedQueue::push`.
    pub fn push(&mut self, msg: T) -> Result<(), T> {
        self.inner.push(msg)
    }

    /// Takes the next message from the backing queue, if it yields one.
    pub fn pop(&mut self) -> Option<T> {
        self.inner.pop()
    }

    /// Reports whether the backing queue considers itself full.
    pub fn is_full(&self) -> bool {
        self.inner.is_full()
    }
}

/// Deque-like queue permitting front insertion (for stash/unstash).
/// akka.net: `UnboundedDequeMessageQueue`.
///
/// Messages are always taken from the front. [`DequeQueue::push_back`]
/// enqueues behind everything already waiting, while
/// [`DequeQueue::push_front`] makes a message the next one to be popped.
#[derive(Debug)]
pub struct DequeQueue<T> {
    inner: VecDeque<T>,
}

// Hand-written so that `Default` does not require `T: Default`.
impl<T> Default for DequeQueue<T> {
    fn default() -> Self {
        Self { inner: VecDeque::new() }
    }
}

impl<T> DequeQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `msg` behind all queued messages.
    pub fn push_back(&mut self, msg: T) {
        self.inner.push_back(msg);
    }

    /// Inserts `msg` ahead of all queued messages.
    pub fn push_front(&mut self, msg: T) {
        self.inner.push_front(msg);
    }

    /// Removes and returns the message at the front, or `None` if the queue
    /// is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    /// Number of messages currently queued.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

/// Priority queue. akka.net: `UnboundedPriorityMessageQueue`.
///
/// `T` must implement [`Prioritized`]. Messages with a numerically larger
/// priority are popped first. Messages of equal priority compare as equal
/// inside the heap, so their relative order is not guaranteed; use
/// [`StablePriorityQueue`] when ties must come out in FIFO order.
pub struct PriorityQueue<T: Prioritized> {
    heap: BinaryHeap<PriItem<T>>,
}

// Manual impl: prints only the number of queued messages, so `T` does not
// have to be `Debug`.
impl<T: Prioritized> std::fmt::Debug for PriorityQueue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PriorityQueue").field("len", &self.heap.len()).finish()
    }
}

impl<T: Prioritized> Default for PriorityQueue<T> {
    fn default() -> Self {
        Self { heap: BinaryHeap::new() }
    }
}

impl<T: Prioritized> PriorityQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enqueues `msg`, reading its priority once at insertion time.
    pub fn push(&mut self, msg: T) {
        let p = msg.priority();
        self.heap.push(PriItem { prio: p, inner: msg });
    }

    /// Removes and returns a message with the highest priority, or `None`
    /// if the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.heap.pop().map(|i| i.inner)
    }

    /// Number of messages currently queued.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}

/// Heap entry pairing a message with the priority captured when it was
/// pushed. Equality and ordering look at `prio` only.
struct PriItem<T: Prioritized> {
    prio: i32,
    inner: T,
}

impl<T: Prioritized> PartialEq for PriItem<T> {
    fn eq(&self, other: &Self) -> bool {
        self.prio == other.prio
    }
}
impl<T: Prioritized> Eq for PriItem<T> {}
impl<T: Prioritized> PartialOrd for PriItem<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T: Prioritized> Ord for PriItem<T> {
    // `BinaryHeap` is a max-heap, so the greatest `prio` surfaces first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.prio.cmp(&other.prio)
    }
}

/// Stable priority queue (FIFO among equal priorities).
/// akka.net: `UnboundedStablePriorityMessageQueue`.
///
/// Messages with a numerically larger priority are popped first. Each push
/// is stamped with a monotonically increasing sequence number that breaks
/// ties, so messages of equal priority leave in the order they arrived. The
/// counter is a `u64` that wraps on overflow; tie ordering is only
/// meaningful while it has not wrapped.
pub struct StablePriorityQueue<T: Prioritized> {
    heap: BinaryHeap<StableItem<T>>,
    seq: u64,
}

// Manual impl: prints only the number of queued messages, so `T` does not
// have to be `Debug`.
impl<T: Prioritized> std::fmt::Debug for StablePriorityQueue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StablePriorityQueue").field("len", &self.heap.len()).finish()
    }
}

impl<T: Prioritized> Default for StablePriorityQueue<T> {
    fn default() -> Self {
        Self { heap: BinaryHeap::new(), seq: 0 }
    }
}

impl<T: Prioritized> StablePriorityQueue<T> {
    /// Creates an empty queue with the sequence counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enqueues `msg`, reading its priority once and stamping it with the
    /// next sequence number.
    pub fn push(&mut self, msg: T) {
        let p = msg.priority();
        let s = self.seq;
        self.seq = self.seq.wrapping_add(1);
        self.heap.push(StableItem { prio: p, seq: s, inner: msg });
    }

    /// Removes and returns the oldest message among those with the highest
    /// priority, or `None` if the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.heap.pop().map(|i| i.inner)
    }

    /// Number of messages currently queued.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}

/// Heap entry: the message plus the priority and arrival sequence number
/// captured when it was pushed.
struct StableItem<T: Prioritized> {
    prio: i32,
    seq: u64,
    inner: T,
}

impl<T: Prioritized> PartialEq for StableItem<T> {
    fn eq(&self, other: &Self) -> bool {
        self.prio == other.prio && self.seq == other.seq
    }
}
impl<T: Prioritized> Eq for StableItem<T> {}
impl<T: Prioritized> PartialOrd for StableItem<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T: Prioritized> Ord for StableItem<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher `prio` ranks greater. On a tie the `seq` comparison is
        // reversed so the earlier push ranks greater and the max-heap
        // releases it first.
        self.prio.cmp(&other.prio).then_with(|| other.seq.cmp(&self.seq))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Message whose priority is its payload.
    #[derive(Debug, PartialEq)]
    struct M(i32);
    impl Prioritized for M {
        fn priority(&self) -> i32 {
            self.0
        }
    }

    /// Message carrying a priority plus a tag, so two messages of equal
    /// priority can still be told apart when checking dequeue order.
    #[derive(Debug, PartialEq)]
    struct Tagged(i32, &'static str);
    impl Prioritized for Tagged {
        fn priority(&self) -> i32 {
            self.0
        }
    }

    #[test]
    fn unbounded_fifo() {
        let mut q = UnboundedQueue::new();
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn bounded_rejects_when_full() {
        let mut q = BoundedMsgQueue::new(1);
        q.push(1).unwrap();
        assert!(q.push(2).is_err());
    }

    #[test]
    fn deque_push_front_is_popped_first() {
        let mut q = DequeQueue::new();
        q.push_back(1);
        q.push_back(2);
        q.push_front(0);
        assert_eq!(q.pop(), Some(0));
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn priority_highest_first() {
        let mut q = PriorityQueue::new();
        q.push(M(1));
        q.push(M(5));
        q.push(M(3));
        assert_eq!(q.pop().unwrap().0, 5);
        assert_eq!(q.pop().unwrap().0, 3);
        assert_eq!(q.pop().unwrap().0, 1);
        assert!(q.pop().is_none());
    }

    #[test]
    fn stable_priority_preserves_fifo_for_ties() {
        let mut q = StablePriorityQueue::new();
        q.push(Tagged(1, "first"));
        q.push(Tagged(2, "urgent"));
        q.push(Tagged(1, "second"));
        q.push(Tagged(1, "third"));
        assert_eq!(q.pop(), Some(Tagged(2, "urgent")));
        // The remaining messages all have priority 1. The tags make the
        // arrival order observable, so this fails if ties are not FIFO.
        assert_eq!(q.pop(), Some(Tagged(1, "first")));
        assert_eq!(q.pop(), Some(Tagged(1, "second")));
        assert_eq!(q.pop(), Some(Tagged(1, "third")));
        assert!(q.pop().is_none());
    }
}