use std::collections::VecDeque;
use crate::types::{ActorId, MsgId, PluginId, SecurityLevel};
pub type Queue<T> = VecDeque<T>;
#[derive(Debug)]
pub enum ActorError {
MailboxFull(usize, usize),
NoMessage,
Blocked(ActorId),
}
pub type Data = Vec<u8>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
pub(crate) id: MsgId,
pub(crate) src: PluginId,
pub(crate) dst: PluginId,
pub(crate) level: SecurityLevel,
pub(crate) body: Data,
}
impl Message {
pub fn new(id: MsgId, src: PluginId, dst: PluginId, level: SecurityLevel, body: Data) -> Self {
Message {
id,
src,
dst,
level,
body,
}
}
#[inline]
pub fn id(&self) -> MsgId {
self.id
}
#[inline]
pub fn src(&self) -> PluginId {
self.src
}
#[inline]
pub fn dst(&self) -> PluginId {
self.dst
}
#[inline]
pub fn level(&self) -> SecurityLevel {
self.level
}
#[inline]
pub fn body(&self) -> &Data {
&self.body
}
pub fn into_body(self) -> Data {
self.body
}
}
#[derive(Debug, Clone)]
pub struct ActorRuntime {
pub(crate) pending: Queue<Message>,
pub(crate) mailbox: Queue<Message>,
pub(crate) capacity: usize,
pub(crate) blocked_on: Option<ActorId>,
pub(crate) pending_send: Option<Message>,
}
impl ActorRuntime {
pub fn empty(capacity: usize) -> Self {
ActorRuntime {
pending: VecDeque::new(),
mailbox: VecDeque::new(),
capacity,
blocked_on: None,
pending_send: None,
}
}
#[inline]
pub fn can_receive(&self) -> bool {
!self.mailbox.is_empty()
}
#[inline]
pub fn can_send(&self, target_capacity: usize) -> bool {
self.pending.len() < target_capacity
}
#[inline]
pub fn mailbox_has_space(&self) -> bool {
self.mailbox.len() < self.capacity
}
pub fn enqueue_pending(&self, msg: Message) -> Self {
let mut new_pending = self.pending.clone();
new_pending.push_back(msg);
ActorRuntime {
pending: new_pending,
mailbox: self.mailbox.clone(),
capacity: self.capacity,
blocked_on: self.blocked_on,
pending_send: self.pending_send.clone(),
}
}
pub fn enqueue_pending_mut(&mut self, msg: Message) {
self.pending.push_back(msg);
}
pub fn deliver(&self) -> Self {
if self.pending.is_empty() {
return self.clone();
}
if self.mailbox.len() >= self.capacity {
return self.clone();
}
let mut new_pending = self.pending.clone();
let first_msg = new_pending.pop_front().expect("checked non-empty");
let mut new_mailbox = self.mailbox.clone();
new_mailbox.push_back(first_msg);
ActorRuntime {
pending: new_pending,
mailbox: new_mailbox,
capacity: self.capacity,
blocked_on: self.blocked_on,
pending_send: self.pending_send.clone(),
}
}
pub fn deliver_mut(&mut self) -> Result<bool, ActorError> {
if self.pending.is_empty() {
return Err(ActorError::NoMessage);
}
if self.mailbox.len() < self.capacity {
let msg = self.pending.pop_front().expect("checked non-empty");
self.mailbox.push_back(msg);
Ok(true)
} else {
Ok(false)
}
}
pub fn consume(&self) -> (Self, Option<Message>) {
if self.mailbox.is_empty() {
return (self.clone(), None);
}
let mut new_mailbox = self.mailbox.clone();
let msg = new_mailbox.pop_front().expect("checked non-empty");
(
ActorRuntime {
pending: self.pending.clone(),
mailbox: new_mailbox,
capacity: self.capacity,
blocked_on: self.blocked_on,
pending_send: self.pending_send.clone(),
},
Some(msg),
)
}
pub fn consume_mut(&mut self) -> Option<Message> {
self.mailbox.pop_front()
}
pub fn set_blocked(&self, on: ActorId) -> Self {
ActorRuntime {
pending: self.pending.clone(),
mailbox: self.mailbox.clone(),
capacity: self.capacity,
blocked_on: Some(on),
pending_send: self.pending_send.clone(),
}
}
pub fn set_blocked_mut(&mut self, on: ActorId) {
self.blocked_on = Some(on);
}
pub fn unblock(&self) -> Self {
ActorRuntime {
pending: self.pending.clone(),
mailbox: self.mailbox.clone(),
capacity: self.capacity,
blocked_on: None,
pending_send: self.pending_send.clone(),
}
}
pub fn unblock_mut(&mut self) {
self.blocked_on = None;
}
#[inline]
pub fn blocked_on(&self) -> Option<ActorId> {
self.blocked_on
}
#[inline]
pub fn is_blocked(&self) -> bool {
self.blocked_on.is_some()
}
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
#[inline]
pub fn pending_len(&self) -> usize {
self.pending.len()
}
#[inline]
pub fn mailbox_len(&self) -> usize {
self.mailbox.len()
}
#[inline]
pub fn pending_send(&self) -> Option<&Message> {
self.pending_send.as_ref()
}
pub fn set_pending_send(&mut self, msg: Option<Message>) {
self.pending_send = msg;
}
}
impl Default for ActorRuntime {
fn default() -> Self {
ActorRuntime::empty(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_test_message(id: MsgId) -> Message {
Message::new(id, 1, 2, SecurityLevel::Public, vec![1, 2, 3])
}
#[test]
fn test_actor_runtime_empty() {
let ar = ActorRuntime::empty(10);
assert_eq!(ar.capacity(), 10);
assert_eq!(ar.pending_len(), 0);
assert_eq!(ar.mailbox_len(), 0);
assert!(!ar.can_receive());
assert!(!ar.is_blocked());
}
#[test]
fn test_actor_runtime_enqueue_pending() {
let mut ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
ar.enqueue_pending_mut(msg);
assert_eq!(ar.pending_len(), 1);
}
#[test]
fn test_actor_runtime_enqueue_pending_increases() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar2 = ar.enqueue_pending(msg);
assert_eq!(ar2.pending_len(), ar.pending_len() + 1);
}
#[test]
fn test_actor_runtime_enqueue_pending_preserves_mailbox() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar2 = ar.enqueue_pending(msg);
assert_eq!(ar2.mailbox_len(), ar.mailbox_len());
}
#[test]
fn test_actor_runtime_deliver() {
let mut ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
ar.enqueue_pending_mut(msg);
assert_eq!(ar.pending_len(), 1);
assert_eq!(ar.mailbox_len(), 0);
let result = ar.deliver_mut();
assert!(matches!(result, Ok(true)));
assert_eq!(ar.pending_len(), 0);
assert_eq!(ar.mailbox_len(), 1);
}
#[test]
fn test_actor_runtime_deliver_decreases_pending() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar = ar.enqueue_pending(msg);
let ar2 = ar.deliver();
assert!(ar2.pending_len() < ar.pending_len());
}
#[test]
fn test_actor_runtime_deliver_increases_mailbox() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar = ar.enqueue_pending(msg);
let ar2 = ar.deliver();
assert_eq!(ar2.mailbox_len(), ar.mailbox_len() + 1);
}
#[test]
fn test_actor_runtime_consume() {
let mut ar = ActorRuntime::empty(10);
let msg = make_test_message(42);
ar.enqueue_pending_mut(msg);
ar.deliver_mut().expect("deliver should succeed");
let consumed = ar.consume_mut();
assert!(consumed.is_some());
assert_eq!(consumed.map(|m| m.id()), Some(42));
assert_eq!(ar.mailbox_len(), 0);
}
#[test]
fn test_actor_runtime_consume_decreases_mailbox() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar = ar.enqueue_pending(msg);
let ar = ar.deliver();
let (ar2, _) = ar.consume();
assert!(ar2.mailbox_len() < ar.mailbox_len());
}
#[test]
fn test_actor_runtime_consume_preserves_capacity() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar = ar.enqueue_pending(msg);
let ar = ar.deliver();
let (ar2, _) = ar.consume();
assert_eq!(ar2.capacity(), ar.capacity());
}
#[test]
fn test_actor_runtime_consume_preserves_blocked_on() {
let ar = ActorRuntime::empty(10);
let msg = make_test_message(1);
let ar = ar.enqueue_pending(msg);
let ar = ar.deliver();
let ar = ar.set_blocked(99);
let (ar2, _) = ar.consume();
assert_eq!(ar2.blocked_on(), ar.blocked_on());
}
#[test]
fn test_actor_runtime_set_blocked() {
let ar = ActorRuntime::empty(10);
let ar2 = ar.set_blocked(42);
assert_eq!(ar2.blocked_on(), Some(42));
}
#[test]
fn test_actor_runtime_set_blocked_preserves_mailbox() {
let ar = ActorRuntime::empty(10);
let ar2 = ar.set_blocked(42);
assert_eq!(ar2.mailbox_len(), ar.mailbox_len());
}
#[test]
fn test_actor_runtime_unblock_clears() {
let ar = ActorRuntime::empty(10);
let ar = ar.set_blocked(42);
let ar2 = ar.unblock();
assert_eq!(ar2.blocked_on(), None);
}
#[test]
fn test_actor_runtime_unblock_preserves_mailbox() {
let ar = ActorRuntime::empty(10);
let ar = ar.set_blocked(42);
let ar2 = ar.unblock();
assert_eq!(ar2.mailbox_len(), ar.mailbox_len());
}
#[test]
fn test_actor_runtime_mailbox_full() {
let mut ar = ActorRuntime::empty(2);
ar.enqueue_pending_mut(make_test_message(1));
ar.deliver_mut().expect("deliver 1");
ar.enqueue_pending_mut(make_test_message(2));
ar.deliver_mut().expect("deliver 2");
assert!(!ar.mailbox_has_space());
assert_eq!(ar.mailbox_len(), 2);
ar.enqueue_pending_mut(make_test_message(3));
let result = ar.deliver_mut();
assert!(matches!(result, Ok(false))); }
#[test]
fn test_message_accessors() {
let msg = Message::new(42, 1, 2, SecurityLevel::Confidential, vec![10, 20, 30]);
assert_eq!(msg.id(), 42);
assert_eq!(msg.src(), 1);
assert_eq!(msg.dst(), 2);
assert_eq!(msg.level(), SecurityLevel::Confidential);
assert_eq!(msg.body(), &vec![10, 20, 30]);
}
}