1use crate::state::State;
10use crate::types::{ActorId, ThreadId, WorkflowId};
11
12use super::{KernelOpError, StepError};
13
14#[derive(Debug, Clone, PartialEq, Eq)]
18#[must_use = "kernel operations must be executed"]
19pub enum KernelOp {
20 RouteOne {
24 dst: ActorId,
26 },
27
28 WorkflowTick {
32 wid: WorkflowId,
34 },
35
36 TimeTick,
40
41 UnblockSend {
45 dst: ActorId,
47 },
48
49 ThreadSwitch {
53 from: ThreadId,
55 to: ThreadId,
57 },
58}
59
60impl KernelOp {
61 pub fn execute(&self, state: &State) -> Result<State, StepError> {
77 match self {
78 KernelOp::RouteOne { dst } => execute_route_one(state, *dst),
79 KernelOp::WorkflowTick { wid } => execute_workflow_tick(state, *wid),
80 KernelOp::TimeTick => execute_time_tick(state),
81 KernelOp::UnblockSend { dst } => execute_unblock_send(state, *dst),
82 KernelOp::ThreadSwitch { from, to } => execute_thread_switch(state, *from, *to),
83 }
84 }
85
86 pub fn execute_mut(&self, state: &mut State) -> Result<(), StepError> {
90 match self {
91 KernelOp::RouteOne { dst } => execute_route_one_mut(state, *dst),
92 KernelOp::WorkflowTick { wid } => execute_workflow_tick_mut(state, *wid),
93 KernelOp::TimeTick => execute_time_tick_mut(state),
94 KernelOp::UnblockSend { dst } => execute_unblock_send_mut(state, *dst),
95 KernelOp::ThreadSwitch { from, to } => execute_thread_switch_mut(state, *from, *to),
96 }
97 }
98}
99
100fn execute_route_one(state: &State, dst: ActorId) -> Result<State, StepError> {
107 let actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
108
109 if actor.pending_len() == 0 {
111 return Err(StepError::KernelOpFailed(
112 KernelOpError::NoPendingMessages { dst },
113 ));
114 }
115
116 if actor.mailbox_len() >= actor.capacity() {
118 return Err(StepError::KernelOpFailed(
119 KernelOpError::MailboxAtCapacity { dst },
120 ));
121 }
122
123 let mut new_state = state.clone();
125 if let Some(actor) = new_state.get_actor_mut(dst) {
126 let _ = actor.deliver_mut();
127 }
128
129 Ok(new_state)
130}
131
132fn execute_workflow_tick(state: &State, wid: WorkflowId) -> Result<State, StepError> {
143 let workflow = state.get_workflow(wid).ok_or(StepError::KernelOpFailed(
144 KernelOpError::WorkflowNotFound { wid },
145 ))?;
146
147 if !workflow.is_running() {
149 return Err(StepError::KernelOpFailed(
150 KernelOpError::WorkflowNotRunning { wid },
151 ));
152 }
153
154 Ok(state.clone())
157}
158
159fn execute_time_tick(state: &State) -> Result<State, StepError> {
165 let mut new_state = state.clone();
166 new_state
167 .tick()
168 .map_err(|e| StepError::KernelOpFailed(KernelOpError::CounterOverflow(e.to_string())))?;
169 Ok(new_state)
170}
171
172fn execute_unblock_send(state: &State, dst: ActorId) -> Result<State, StepError> {
178 let _actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
179
180 let mut new_state = state.clone();
181 if let Some(actor) = new_state.get_actor_mut(dst) {
182 actor.unblock_mut();
183 }
184
185 Ok(new_state)
186}
187
188fn execute_thread_switch(
198 _state: &State,
199 _from: ThreadId,
200 _to: ThreadId,
201) -> Result<State, StepError> {
202 Err(StepError::KernelOpFailed(KernelOpError::NotImplemented {
203 operation: "thread_switch",
204 }))
205}
206
207fn execute_route_one_mut(state: &mut State, dst: ActorId) -> Result<(), StepError> {
210 let actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
211
212 if actor.pending_len() == 0 {
213 return Err(StepError::KernelOpFailed(
214 KernelOpError::NoPendingMessages { dst },
215 ));
216 }
217
218 if actor.mailbox_len() >= actor.capacity() {
219 return Err(StepError::KernelOpFailed(
220 KernelOpError::MailboxAtCapacity { dst },
221 ));
222 }
223
224 if let Some(actor) = state.get_actor_mut(dst) {
225 let _ = actor.deliver_mut();
226 }
227
228 Ok(())
229}
230
231fn execute_workflow_tick_mut(state: &mut State, wid: WorkflowId) -> Result<(), StepError> {
232 let workflow = state.get_workflow(wid).ok_or(StepError::KernelOpFailed(
233 KernelOpError::WorkflowNotFound { wid },
234 ))?;
235
236 if !workflow.is_running() {
237 return Err(StepError::KernelOpFailed(
238 KernelOpError::WorkflowNotRunning { wid },
239 ));
240 }
241
242 Ok(())
244}
245
246fn execute_time_tick_mut(state: &mut State) -> Result<(), StepError> {
247 state
248 .tick()
249 .map_err(|e| StepError::KernelOpFailed(KernelOpError::CounterOverflow(e.to_string())))?;
250 Ok(())
251}
252
253fn execute_unblock_send_mut(state: &mut State, dst: ActorId) -> Result<(), StepError> {
254 let _actor = state.get_actor(dst).ok_or(StepError::ActorNotFound(dst))?;
255
256 if let Some(actor) = state.get_actor_mut(dst) {
257 actor.unblock_mut();
258 }
259
260 Ok(())
261}
262
263fn execute_thread_switch_mut(
264 _state: &mut State,
265 _from: ThreadId,
266 _to: ThreadId,
267) -> Result<(), StepError> {
268 Err(StepError::KernelOpFailed(KernelOpError::NotImplemented {
269 operation: "thread_switch",
270 }))
271}
272
273#[cfg(test)]
279mod tests {
280 use super::*;
281 use crate::state::{ActorRuntime, Message, PluginState, WorkflowInstance};
282 use crate::types::SecurityLevel;
283
284 fn make_test_message(id: u128) -> Message {
285 Message::new(id, 1, 2, SecurityLevel::Public, vec![1, 2, 3])
286 }
287
288 #[test]
289 fn test_time_tick_increments_time() {
290 let state = State::empty();
291 assert_eq!(state.time(), 0);
292
293 let op = KernelOp::TimeTick;
294 let new_state = op.execute(&state).expect("time_tick should succeed");
295
296 assert_eq!(new_state.time(), 1);
297 }
298
299 #[test]
300 fn test_time_tick_preserves_plugins() {
301 let mut state = State::empty();
303 let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Public, 100));
304
305 let op = KernelOp::TimeTick;
306 let new_state = op.execute(&state).expect("time_tick should succeed");
307
308 assert!(new_state.get_plugin(1).is_some());
310 assert_eq!(
311 new_state.get_plugin(1).map(|p| p.memory_bounds()),
312 Some(100)
313 );
314 }
315
316 #[test]
317 fn test_time_tick_preserves_actors() {
318 let mut state = State::empty();
320 let _ = state.insert_actor(1, ActorRuntime::empty(10));
321
322 let op = KernelOp::TimeTick;
323 let new_state = op.execute(&state).expect("time_tick should succeed");
324
325 assert!(new_state.get_actor(1).is_some());
327 assert_eq!(new_state.get_actor(1).map(|a| a.capacity()), Some(10));
328 }
329
330 #[test]
331 fn test_route_one_moves_message() {
332 let mut state = State::empty();
333 let mut actor = ActorRuntime::empty(10);
334 actor.enqueue_pending_mut(make_test_message(42));
335 state.insert_actor(1, actor).unwrap();
336
337 assert_eq!(state.get_actor(1).map(|a| a.pending_len()), Some(1));
338 assert_eq!(state.get_actor(1).map(|a| a.mailbox_len()), Some(0));
339
340 let op = KernelOp::RouteOne { dst: 1 };
341 let new_state = op.execute(&state).expect("route_one should succeed");
342
343 assert_eq!(new_state.get_actor(1).map(|a| a.pending_len()), Some(0));
344 assert_eq!(new_state.get_actor(1).map(|a| a.mailbox_len()), Some(1));
345 }
346
347 #[test]
348 fn test_route_one_preserves_other_actors() {
349 let mut state = State::empty();
351
352 let mut actor1 = ActorRuntime::empty(10);
353 actor1.enqueue_pending_mut(make_test_message(1));
354 state.insert_actor(1, actor1).unwrap();
355
356 let actor2 = ActorRuntime::empty(5);
357 state.insert_actor(2, actor2).unwrap();
358
359 let op = KernelOp::RouteOne { dst: 1 };
360 let new_state = op.execute(&state).expect("route_one should succeed");
361
362 assert_eq!(new_state.get_actor(2).map(|a| a.capacity()), Some(5));
364 assert_eq!(new_state.get_actor(2).map(|a| a.pending_len()), Some(0));
365 }
366
367 #[test]
368 fn test_route_one_preserves_plugins() {
369 let mut state = State::empty();
371
372 let mut actor = ActorRuntime::empty(10);
373 actor.enqueue_pending_mut(make_test_message(1));
374 state.insert_actor(1, actor).unwrap();
375
376 let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Secret, 1024));
377
378 let op = KernelOp::RouteOne { dst: 1 };
379 let new_state = op.execute(&state).expect("route_one should succeed");
380
381 assert_eq!(new_state.plugin_level(1), Some(SecurityLevel::Secret));
383 }
384
385 #[test]
386 fn test_route_one_no_pending_fails() {
387 let mut state = State::empty();
388 let _ = state.insert_actor(1, ActorRuntime::empty(10));
389
390 let op = KernelOp::RouteOne { dst: 1 };
391 let result = op.execute(&state);
392
393 assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
394 }
395
396 #[test]
397 fn test_route_one_mailbox_full_fails() {
398 let mut state = State::empty();
399
400 let mut actor = ActorRuntime::empty(1);
402 actor.enqueue_pending_mut(make_test_message(1));
403 let _ = actor.deliver_mut(); actor.enqueue_pending_mut(make_test_message(2)); state.insert_actor(1, actor).unwrap();
406
407 let op = KernelOp::RouteOne { dst: 1 };
408 let result = op.execute(&state);
409
410 assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
411 }
412
413 #[test]
414 fn test_unblock_send_clears_blocked() {
415 let mut state = State::empty();
416 let mut actor = ActorRuntime::empty(10);
417 actor.set_blocked_mut(42);
418 state.insert_actor(1, actor).unwrap();
419
420 assert!(state.get_actor(1).map(|a| a.is_blocked()).unwrap_or(false));
421
422 let op = KernelOp::UnblockSend { dst: 1 };
423 let new_state = op.execute(&state).expect("unblock_send should succeed");
424
425 assert!(!new_state
426 .get_actor(1)
427 .map(|a| a.is_blocked())
428 .unwrap_or(true));
429 }
430
431 #[test]
432 fn test_unblock_send_preserves_mailbox() {
433 let mut state = State::empty();
435 let mut actor = ActorRuntime::empty(10);
436 actor.enqueue_pending_mut(make_test_message(1));
437 let _ = actor.deliver_mut();
438 actor.set_blocked_mut(42);
439 state.insert_actor(1, actor).unwrap();
440
441 let initial_mailbox_len = state.get_actor(1).map(|a| a.mailbox_len()).unwrap_or(0);
442
443 let op = KernelOp::UnblockSend { dst: 1 };
444 let new_state = op.execute(&state).expect("unblock_send should succeed");
445
446 let final_mailbox_len = new_state.get_actor(1).map(|a| a.mailbox_len()).unwrap_or(0);
447 assert_eq!(initial_mailbox_len, final_mailbox_len);
448 }
449
450 #[test]
451 fn test_workflow_tick_not_found() {
452 let state = State::empty();
453
454 let op = KernelOp::WorkflowTick { wid: 999 };
455 let result = op.execute(&state);
456
457 assert!(matches!(result, Err(StepError::KernelOpFailed(_))));
458 }
459
460 #[test]
461 fn test_workflow_tick_preserves_other_workflows() {
462 let mut state = State::empty();
464
465 let _ = state.insert_workflow(1, WorkflowInstance::running(100));
466 let _ = state.insert_workflow(2, WorkflowInstance::running(200));
467
468 let op = KernelOp::WorkflowTick { wid: 1 };
469 let new_state = op.execute(&state).expect("workflow_tick should succeed");
470
471 assert!(new_state
473 .get_workflow(2)
474 .map(|w| w.is_running())
475 .unwrap_or(false));
476 }
477}