use crate::state::State;
use crate::types::{ActorId, ThreadId, WorkflowId};
use super::{KernelOpError, StepError};
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "kernel operations must be executed"]
pub enum KernelOp {
RouteOne {
dst: ActorId,
},
WorkflowTick {
wid: WorkflowId,
},
TimeTick,
UnblockSend {
dst: ActorId,
},
ThreadSwitch {
from: ThreadId,
to: ThreadId,
},
}
impl KernelOp {
pub fn execute(&self, state: &State) -> Result<State, StepError> {
match self {
KernelOp::RouteOne { dst } => execute_route_one(state, *dst),
KernelOp::WorkflowTick { wid } => execute_workflow_tick(state, *wid),
KernelOp::TimeTick => execute_time_tick(state),
KernelOp::UnblockSend { dst } => execute_unblock_send(state, *dst),
KernelOp::ThreadSwitch { from, to } => execute_thread_switch(state, *from, *to),
}
}
pub fn execute_mut(&self, state: &mut State) -> Result<(), StepError> {
match self {
KernelOp::RouteOne { dst } => execute_route_one_mut(state, *dst),
KernelOp::WorkflowTick { wid } => execute_workflow_tick_mut(state, *wid),
KernelOp::TimeTick => execute_time_tick_mut(state),
KernelOp::UnblockSend { dst } => execute_unblock_send_mut(state, *dst),
KernelOp::ThreadSwitch { from, to } => execute_thread_switch_mut(state, *from, *to),
}
}
}
fn execute_route_one(state: &State, dst: ActorId) -> Result<State, StepError> {
let actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
if actor.pending_len() == 0 {
return Err(StepError::KernelOpFailed(
KernelOpError::NoPendingMessages { dst },
));
}
if actor.mailbox_len() >= actor.capacity() {
return Err(StepError::KernelOpFailed(
KernelOpError::MailboxAtCapacity { dst },
));
}
let mut new_state = state.clone();
if let Some(actor) = new_state.get_actor_mut(dst) {
let _ = actor.deliver_mut();
}
Ok(new_state)
}
fn execute_workflow_tick(state: &State, wid: WorkflowId) -> Result<State, StepError> {
let workflow = state.get_workflow(wid).ok_or(StepError::KernelOpFailed(
KernelOpError::WorkflowNotFound { wid },
))?;
if !workflow.is_running() {
return Err(StepError::KernelOpFailed(
KernelOpError::WorkflowNotRunning { wid },
));
}
Ok(state.clone())
}
fn execute_time_tick(state: &State) -> Result<State, StepError> {
let mut new_state = state.clone();
new_state
.tick()
.map_err(|e| StepError::KernelOpFailed(KernelOpError::CounterOverflow(e.to_string())))?;
Ok(new_state)
}
fn execute_unblock_send(state: &State, dst: ActorId) -> Result<State, StepError> {
let _actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
let mut new_state = state.clone();
if let Some(actor) = new_state.get_actor_mut(dst) {
actor.unblock_mut();
}
Ok(new_state)
}
fn execute_thread_switch(
_state: &State,
_from: ThreadId,
_to: ThreadId,
) -> Result<State, StepError> {
Err(StepError::KernelOpFailed(KernelOpError::NotImplemented {
operation: "thread_switch",
}))
}
fn execute_route_one_mut(state: &mut State, dst: ActorId) -> Result<(), StepError> {
let actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
if actor.pending_len() == 0 {
return Err(StepError::KernelOpFailed(
KernelOpError::NoPendingMessages { dst },
));
}
if actor.mailbox_len() >= actor.capacity() {
return Err(StepError::KernelOpFailed(
KernelOpError::MailboxAtCapacity { dst },
));
}
if let Some(actor) = state.get_actor_mut(dst) {
let _ = actor.deliver_mut();
}
Ok(())
}
fn execute_workflow_tick_mut(state: &mut State, wid: WorkflowId) -> Result<(), StepError> {
let workflow = state.get_workflow(wid).ok_or(StepError::KernelOpFailed(
KernelOpError::WorkflowNotFound { wid },
))?;
if !workflow.is_running() {
return Err(StepError::KernelOpFailed(
KernelOpError::WorkflowNotRunning { wid },
));
}
Ok(())
}
fn execute_time_tick_mut(state: &mut State) -> Result<(), StepError> {
state
.tick()
.map_err(|e| StepError::KernelOpFailed(KernelOpError::CounterOverflow(e.to_string())))?;
Ok(())
}
fn execute_unblock_send_mut(state: &mut State, dst: ActorId) -> Result<(), StepError> {
let _actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
if let Some(actor) = state.get_actor_mut(dst) {
actor.unblock_mut();
}
Ok(())
}
fn execute_thread_switch_mut(
_state: &mut State,
_from: ThreadId,
_to: ThreadId,
) -> Result<(), StepError> {
Err(StepError::KernelOpFailed(KernelOpError::NotImplemented {
operation: "thread_switch",
}))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::{ActorRuntime, Message, PluginState, WorkflowInstance};
use crate::types::SecurityLevel;
fn make_test_message(id: u128) -> Message {
Message::new(id, 1, 2, SecurityLevel::Public, vec![1, 2, 3])
}
#[test]
fn test_time_tick_increments_time() {
let state = State::empty();
assert_eq!(state.time(), 0);
let op = KernelOp::TimeTick;
let new_state = op.execute(&state).expect("time_tick should succeed");
assert_eq!(new_state.time(), 1);
}
#[test]
fn test_time_tick_preserves_plugins() {
let mut state = State::empty();
let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Public, 100));
let op = KernelOp::TimeTick;
let new_state = op.execute(&state).expect("time_tick should succeed");
assert!(new_state.get_plugin(1).is_some());
assert_eq!(
new_state.get_plugin(1).map(|p| p.memory_bounds()),
Some(100)
);
}
#[test]
fn test_time_tick_preserves_actors() {
let mut state = State::empty();
let _ = state.insert_actor(1, ActorRuntime::empty(10));
let op = KernelOp::TimeTick;
let new_state = op.execute(&state).expect("time_tick should succeed");
assert!(new_state.get_actor(1).is_some());
assert_eq!(new_state.get_actor(1).map(|a| a.capacity()), Some(10));
}
#[test]
fn test_route_one_moves_message() {
let mut state = State::empty();
let mut actor = ActorRuntime::empty(10);
actor.enqueue_pending_mut(make_test_message(42));
state.insert_actor(1, actor).unwrap();
assert_eq!(state.get_actor(1).map(|a| a.pending_len()), Some(1));
assert_eq!(state.get_actor(1).map(|a| a.mailbox_len()), Some(0));
let op = KernelOp::RouteOne { dst: 1 };
let new_state = op.execute(&state).expect("route_one should succeed");
assert_eq!(new_state.get_actor(1).map(|a| a.pending_len()), Some(0));
assert_eq!(new_state.get_actor(1).map(|a| a.mailbox_len()), Some(1));
}
#[test]
fn test_route_one_preserves_other_actors() {
let mut state = State::empty();
let mut actor1 = ActorRuntime::empty(10);
actor1.enqueue_pending_mut(make_test_message(1));
state.insert_actor(1, actor1).unwrap();
let actor2 = ActorRuntime::empty(5);
state.insert_actor(2, actor2).unwrap();
let op = KernelOp::RouteOne { dst: 1 };
let new_state = op.execute(&state).expect("route_one should succeed");
assert_eq!(new_state.get_actor(2).map(|a| a.capacity()), Some(5));
assert_eq!(new_state.get_actor(2).map(|a| a.pending_len()), Some(0));
}
#[test]
fn test_route_one_preserves_plugins() {
let mut state = State::empty();
let mut actor = ActorRuntime::empty(10);
actor.enqueue_pending_mut(make_test_message(1));
state.insert_actor(1, actor).unwrap();
let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Secret, 1024));
let op = KernelOp::RouteOne { dst: 1 };
let new_state = op.execute(&state).expect("route_one should succeed");
assert_eq!(new_state.plugin_level(1), Some(SecurityLevel::Secret));
}
#[test]
fn test_route_one_no_pending_fails() {
let mut state = State::empty();
let _ = state.insert_actor(1, ActorRuntime::empty(10));
let op = KernelOp::RouteOne { dst: 1 };
let result = op.execute(&state);
assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
}
#[test]
fn test_route_one_mailbox_full_fails() {
let mut state = State::empty();
let mut actor = ActorRuntime::empty(1);
actor.enqueue_pending_mut(make_test_message(1));
let _ = actor.deliver_mut(); actor.enqueue_pending_mut(make_test_message(2)); state.insert_actor(1, actor).unwrap();
let op = KernelOp::RouteOne { dst: 1 };
let result = op.execute(&state);
assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
}
#[test]
fn test_unblock_send_clears_blocked() {
let mut state = State::empty();
let mut actor = ActorRuntime::empty(10);
actor.set_blocked_mut(42);
state.insert_actor(1, actor).unwrap();
assert!(state.get_actor(1).map(|a| a.is_blocked()).unwrap_or(false));
let op = KernelOp::UnblockSend { dst: 1 };
let new_state = op.execute(&state).expect("unblock_send should succeed");
assert!(!new_state
.get_actor(1)
.map(|a| a.is_blocked())
.unwrap_or(true));
}
#[test]
fn test_unblock_send_preserves_mailbox() {
let mut state = State::empty();
let mut actor = ActorRuntime::empty(10);
actor.enqueue_pending_mut(make_test_message(1));
let _ = actor.deliver_mut();
actor.set_blocked_mut(42);
state.insert_actor(1, actor).unwrap();
let initial_mailbox_len = state.get_actor(1).map(|a| a.mailbox_len()).unwrap_or(0);
let op = KernelOp::UnblockSend { dst: 1 };
let new_state = op.execute(&state).expect("unblock_send should succeed");
let final_mailbox_len = new_state.get_actor(1).map(|a| a.mailbox_len()).unwrap_or(0);
assert_eq!(initial_mailbox_len, final_mailbox_len);
}
#[test]
fn test_workflow_tick_not_found() {
let state = State::empty();
let op = KernelOp::WorkflowTick { wid: 999 };
let result = op.execute(&state);
assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
}
#[test]
fn test_workflow_tick_preserves_other_workflows() {
let mut state = State::empty();
let _ = state.insert_workflow(1, WorkflowInstance::running(100));
let _ = state.insert_workflow(2, WorkflowInstance::running(200));
let op = KernelOp::WorkflowTick { wid: 1 };
let new_state = op.execute(&state).expect("workflow_tick should succeed");
assert!(new_state
.get_workflow(2)
.map(|w| w.is_running())
.unwrap_or(false));
}
}