#![allow(clippy::non_std_lazy_statics)]
use crate::runtime::actor_runtime::{ActorFieldValue, ActorMessage};
use crate::runtime::InterpreterError;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
#[derive(Debug, Clone)]
pub enum SupervisionStrategy {
OneForOne { max_restarts: u32, within: Duration },
AllForOne { max_restarts: u32, within: Duration },
RestForOne { max_restarts: u32, within: Duration },
}
#[derive(Debug, Clone, PartialEq)]
pub enum ActorState {
Starting,
Running,
Stopping,
Stopped,
Restarting,
Failed(String),
}
#[derive(Debug)]
pub enum Envelope {
UserMessage {
from: Option<String>,
message: ActorMessage,
},
SystemMessage(SystemMessage),
}
#[derive(Debug)]
pub enum SystemMessage {
Start,
Stop,
Restart,
Supervise(String, String), }
pub struct ConcurrentActor {
pub id: String,
pub actor_type: String,
pub state: Arc<RwLock<HashMap<String, ActorFieldValue>>>,
pub lifecycle_state: Arc<RwLock<ActorState>>,
pub mailbox_sender: Sender<Envelope>,
pub thread_handle: Option<JoinHandle<()>>,
pub children: Arc<RwLock<Vec<String>>>,
pub supervisor: Option<String>,
pub supervision_strategy: SupervisionStrategy,
pub restart_count: Arc<Mutex<u32>>,
pub last_restart: Arc<Mutex<std::time::Instant>>,
}
impl ConcurrentActor {
pub fn new(
id: String,
actor_type: String,
initial_state: HashMap<String, ActorFieldValue>,
supervisor: Option<String>,
) -> Self {
let (tx, _rx) = channel();
Self {
id,
actor_type,
state: Arc::new(RwLock::new(initial_state)),
lifecycle_state: Arc::new(RwLock::new(ActorState::Starting)),
mailbox_sender: tx,
thread_handle: None,
children: Arc::new(RwLock::new(Vec::new())),
supervisor,
supervision_strategy: SupervisionStrategy::OneForOne {
max_restarts: 3,
within: Duration::from_secs(60),
},
restart_count: Arc::new(Mutex::new(0)),
last_restart: Arc::new(Mutex::new(std::time::Instant::now())),
}
}
pub fn start(
&mut self,
receive_handlers: HashMap<String, String>,
) -> Result<(), InterpreterError> {
let (tx, rx) = channel();
self.mailbox_sender = tx;
let id = self.id.clone();
let state = Arc::clone(&self.state);
let lifecycle_state = Arc::clone(&self.lifecycle_state);
let children = Arc::clone(&self.children);
{
let mut ls = lifecycle_state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
*ls = ActorState::Running;
}
let handle = thread::spawn(move || {
Self::actor_loop(id, rx, state, lifecycle_state, children, receive_handlers);
});
self.thread_handle = Some(handle);
Ok(())
}
fn actor_loop(
id: String,
receiver: Receiver<Envelope>,
state: Arc<RwLock<HashMap<String, ActorFieldValue>>>,
lifecycle_state: Arc<RwLock<ActorState>>,
children: Arc<RwLock<Vec<String>>>,
receive_handlers: HashMap<String, String>,
) {
loop {
{
let ls = lifecycle_state
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
match *ls {
ActorState::Stopping | ActorState::Stopped => break,
ActorState::Failed(_) => {
thread::sleep(Duration::from_millis(100));
continue;
}
_ => {}
}
}
if let Ok(envelope) = receiver.recv_timeout(Duration::from_millis(100)) {
Self::process_envelope(
&id,
envelope,
&state,
&lifecycle_state,
&children,
&receive_handlers,
);
} else {
}
}
let mut ls = lifecycle_state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
*ls = ActorState::Stopped;
}
fn process_envelope(
id: &str,
envelope: Envelope,
state: &Arc<RwLock<HashMap<String, ActorFieldValue>>>,
lifecycle_state: &Arc<RwLock<ActorState>>,
children: &Arc<RwLock<Vec<String>>>,
receive_handlers: &HashMap<String, String>,
) {
match envelope {
Envelope::UserMessage { from: _, message } => {
if receive_handlers.contains_key(&message.message_type) {
if message.message_type == "Increment" {
let mut state_guard = state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
if let Some(ActorFieldValue::Integer(count)) = state_guard.get("count") {
let new_count = count + 1;
state_guard
.insert("count".to_string(), ActorFieldValue::Integer(new_count));
}
}
}
}
Envelope::SystemMessage(sys_msg) => {
Self::handle_system_message(id, sys_msg, lifecycle_state, children);
}
}
}
fn handle_system_message(
_id: &str,
message: SystemMessage,
lifecycle_state: &Arc<RwLock<ActorState>>,
_children: &Arc<RwLock<Vec<String>>>,
) {
match message {
SystemMessage::Stop => {
let mut ls = lifecycle_state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
*ls = ActorState::Stopping;
}
SystemMessage::Restart => {
let mut ls = lifecycle_state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
*ls = ActorState::Restarting;
}
SystemMessage::Start => {
let mut ls = lifecycle_state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
*ls = ActorState::Running;
}
SystemMessage::Supervise(child_id, error) => {
println!("Child {child_id} failed: {error}");
}
}
}
pub fn stop(&mut self) -> Result<(), InterpreterError> {
self.mailbox_sender
.send(Envelope::SystemMessage(SystemMessage::Stop))
.map_err(|_| {
InterpreterError::RuntimeError("Failed to send stop message".to_string())
})?;
if let Some(handle) = self.thread_handle.take() {
handle.join().map_err(|_| {
InterpreterError::RuntimeError("Failed to join actor thread".to_string())
})?;
}
Ok(())
}
pub fn send(
&self,
message: ActorMessage,
from: Option<String>,
) -> Result<(), InterpreterError> {
self.mailbox_sender
.send(Envelope::UserMessage { from, message })
.map_err(|_| InterpreterError::RuntimeError("Actor mailbox closed".to_string()))
}
pub fn should_restart(&self) -> bool {
match self.supervision_strategy {
SupervisionStrategy::OneForOne {
max_restarts,
within,
} => {
let count = *self
.restart_count
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
let last = *self
.last_restart
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
if last.elapsed() > within {
*self
.restart_count
.lock()
.expect("Mutex poisoned: actor lock is corrupted") = 0;
true
} else {
count < max_restarts
}
}
_ => true, }
}
pub fn restart(
&mut self,
receive_handlers: HashMap<String, String>,
) -> Result<(), InterpreterError> {
self.stop()?;
{
let mut count = self
.restart_count
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
*count += 1;
let mut last = self
.last_restart
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
*last = std::time::Instant::now();
}
{
let mut state_guard = self
.state
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
state_guard.clear();
state_guard.insert("count".to_string(), ActorFieldValue::Integer(0));
}
self.start(receive_handlers)
}
}
pub struct ConcurrentActorSystem {
actors: Arc<RwLock<HashMap<String, Arc<Mutex<ConcurrentActor>>>>>,
supervision_tree: Arc<RwLock<HashMap<String, Vec<String>>>>, }
impl Default for ConcurrentActorSystem {
fn default() -> Self {
Self {
actors: Arc::new(RwLock::new(HashMap::new())),
supervision_tree: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl ConcurrentActorSystem {
pub fn new() -> Self {
Self::default()
}
pub fn spawn_actor(
&self,
actor_type: String,
initial_state: HashMap<String, ActorFieldValue>,
receive_handlers: HashMap<String, String>,
supervisor: Option<String>,
) -> Result<String, InterpreterError> {
let id = format!("actor_{}_{}", actor_type, uuid::Uuid::new_v4());
let mut actor =
ConcurrentActor::new(id.clone(), actor_type, initial_state, supervisor.clone());
actor.start(receive_handlers)?;
{
let mut actors = self
.actors
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
actors.insert(id.clone(), Arc::new(Mutex::new(actor)));
}
if let Some(sup_id) = supervisor {
let mut tree = self
.supervision_tree
.write()
.expect("RwLock poisoned: actor write lock is corrupted");
tree.entry(sup_id).or_default().push(id.clone());
}
Ok(id)
}
pub fn send_message(
&self,
actor_id: &str,
message: ActorMessage,
from: Option<String>,
) -> Result<(), InterpreterError> {
let actors = self
.actors
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
if let Some(actor) = actors.get(actor_id) {
let actor = actor
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
actor.send(message, from)
} else {
Err(InterpreterError::RuntimeError(format!(
"Actor not found: {actor_id}"
)))
}
}
pub fn handle_failure(
&self,
failed_id: &str,
_error: String,
supervisor_id: &str,
) -> Result<(), InterpreterError> {
let actors = self
.actors
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
if let Some(supervisor) = actors.get(supervisor_id) {
let sup = supervisor
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
let should_restart = sup.should_restart();
if should_restart {
if let Some(failed) = actors.get(failed_id) {
let mut failed_actor = failed
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
match &sup.supervision_strategy {
SupervisionStrategy::OneForOne { .. } => {
failed_actor.restart(HashMap::new())?;
}
SupervisionStrategy::AllForOne { .. } => {
let tree = self
.supervision_tree
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
if let Some(children) = tree.get(supervisor_id) {
for child_id in children {
if let Some(child) = actors.get(child_id) {
let mut child_actor = child
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
child_actor.restart(HashMap::new())?;
}
}
}
}
SupervisionStrategy::RestForOne { .. } => {
failed_actor.restart(HashMap::new())?;
}
}
}
} else {
if let Some(failed) = actors.get(failed_id) {
let mut failed_actor = failed
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
failed_actor.stop()?;
}
}
}
Ok(())
}
pub fn shutdown(&self) -> Result<(), InterpreterError> {
let actors = self
.actors
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
for (_id, actor) in actors.iter() {
let mut actor = actor
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
actor.stop()?;
}
Ok(())
}
}
lazy_static::lazy_static! {
pub static ref CONCURRENT_ACTOR_SYSTEM: ConcurrentActorSystem = ConcurrentActorSystem::new();
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_concurrent_actor_creation() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor =
ConcurrentActor::new("test_actor".to_string(), "Counter".to_string(), state, None);
let handlers = HashMap::new();
assert!(actor.start(handlers).is_ok());
{
let ls = actor
.lifecycle_state
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
assert_eq!(*ls, ActorState::Running);
}
assert!(actor.stop().is_ok());
}
#[test]
fn test_concurrent_message_sending() {
let system = ConcurrentActorSystem::new();
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut handlers = HashMap::new();
handlers.insert("Increment".to_string(), "handler".to_string());
let actor_id = system
.spawn_actor("Counter".to_string(), state, handlers, None)
.expect("spawn_actor should succeed in test");
let msg = ActorMessage {
message_type: "Increment".to_string(),
data: vec![],
};
assert!(system.send_message(&actor_id, msg, None).is_ok());
thread::sleep(Duration::from_millis(200));
let actors = system
.actors
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
let actor = actors.get(&actor_id).expect("actor should exist in test");
let actor = actor
.lock()
.expect("Mutex poisoned: actor lock is corrupted");
let state = actor
.state
.read()
.expect("RwLock poisoned: actor read lock is corrupted");
assert_eq!(state.get("count"), Some(&ActorFieldValue::Integer(1)));
}
#[test]
fn test_supervision_strategy_one_for_one() {
let strategy = SupervisionStrategy::OneForOne {
max_restarts: 3,
within: Duration::from_secs(60),
};
if let SupervisionStrategy::OneForOne {
max_restarts,
within,
} = strategy
{
assert_eq!(max_restarts, 3);
assert_eq!(within, Duration::from_secs(60));
} else {
panic!("Expected OneForOne");
}
}
#[test]
fn test_supervision_strategy_all_for_one() {
let strategy = SupervisionStrategy::AllForOne {
max_restarts: 5,
within: Duration::from_secs(120),
};
if let SupervisionStrategy::AllForOne {
max_restarts,
within,
} = strategy
{
assert_eq!(max_restarts, 5);
assert_eq!(within, Duration::from_secs(120));
} else {
panic!("Expected AllForOne");
}
}
#[test]
fn test_supervision_strategy_rest_for_one() {
let strategy = SupervisionStrategy::RestForOne {
max_restarts: 2,
within: Duration::from_secs(30),
};
if let SupervisionStrategy::RestForOne {
max_restarts,
within,
} = strategy
{
assert_eq!(max_restarts, 2);
assert_eq!(within, Duration::from_secs(30));
} else {
panic!("Expected RestForOne");
}
}
#[test]
fn test_actor_state_variants() {
assert_eq!(ActorState::Starting, ActorState::Starting);
assert_eq!(ActorState::Running, ActorState::Running);
assert_eq!(ActorState::Stopping, ActorState::Stopping);
assert_eq!(ActorState::Stopped, ActorState::Stopped);
assert_eq!(ActorState::Restarting, ActorState::Restarting);
assert_ne!(ActorState::Starting, ActorState::Running);
}
#[test]
fn test_actor_state_failed() {
let failed = ActorState::Failed("test error".to_string());
if let ActorState::Failed(msg) = failed {
assert_eq!(msg, "test error");
} else {
panic!("Expected Failed state");
}
}
#[test]
fn test_envelope_user_message() {
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec![],
};
let envelope = Envelope::UserMessage {
from: Some("sender".to_string()),
message: msg,
};
if let Envelope::UserMessage { from, message } = envelope {
assert_eq!(from, Some("sender".to_string()));
assert_eq!(message.message_type, "Test");
} else {
panic!("Expected UserMessage");
}
}
#[test]
fn test_envelope_system_message() {
let envelope = Envelope::SystemMessage(SystemMessage::Stop);
if let Envelope::SystemMessage(SystemMessage::Stop) = envelope {
} else {
panic!("Expected SystemMessage::Stop");
}
}
#[test]
fn test_system_message_variants() {
let _ = SystemMessage::Start;
let _ = SystemMessage::Stop;
let _ = SystemMessage::Restart;
let supervise = SystemMessage::Supervise("child1".to_string(), "error".to_string());
if let SystemMessage::Supervise(child, error) = supervise {
assert_eq!(child, "child1");
assert_eq!(error, "error");
} else {
panic!("Expected Supervise");
}
}
#[test]
fn test_concurrent_actor_new_default_state() {
let state = HashMap::new();
let actor =
ConcurrentActor::new("test_id".to_string(), "TestType".to_string(), state, None);
assert_eq!(actor.id, "test_id");
assert_eq!(actor.actor_type, "TestType");
assert!(actor.supervisor.is_none());
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Starting);
}
#[test]
fn test_concurrent_actor_with_supervisor() {
let state = HashMap::new();
let actor = ConcurrentActor::new(
"child".to_string(),
"Child".to_string(),
state,
Some("parent".to_string()),
);
assert_eq!(actor.supervisor, Some("parent".to_string()));
}
#[test]
fn test_concurrent_actor_should_restart_within_limit() {
let state = HashMap::new();
let actor = ConcurrentActor::new("test".to_string(), "Test".to_string(), state, None);
assert!(actor.should_restart());
}
#[test]
fn test_concurrent_actor_system_new() {
let system = ConcurrentActorSystem::new();
let actors = system.actors.read().unwrap();
assert!(actors.is_empty());
}
#[test]
fn test_concurrent_actor_system_default() {
let system = ConcurrentActorSystem::default();
let tree = system.supervision_tree.read().unwrap();
assert!(tree.is_empty());
}
#[test]
fn test_send_message_to_nonexistent_actor() {
let system = ConcurrentActorSystem::new();
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec![],
};
let result = system.send_message("nonexistent", msg, None);
assert!(result.is_err());
}
#[test]
fn test_spawn_actor_with_supervisor() {
let system = ConcurrentActorSystem::new();
let parent_state = HashMap::new();
let parent_handlers = HashMap::new();
let parent_id = system
.spawn_actor("Parent".to_string(), parent_state, parent_handlers, None)
.expect("should spawn parent");
let child_state = HashMap::new();
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
child_handlers,
Some(parent_id.clone()),
)
.expect("should spawn child");
let tree = system.supervision_tree.read().unwrap();
assert!(tree.get(&parent_id).unwrap().contains(&child_id));
system.shutdown().ok();
}
#[test]
fn test_actor_system_shutdown() {
let system = ConcurrentActorSystem::new();
let state = HashMap::new();
let handlers = HashMap::new();
let _ = system
.spawn_actor("Test".to_string(), state, handlers, None)
.expect("should spawn");
assert!(system.shutdown().is_ok());
}
#[test]
fn test_actor_stop_lifecycle() {
let mut state = HashMap::new();
state.insert("value".to_string(), ActorFieldValue::Integer(42));
let mut actor = ConcurrentActor::new(
"lifecycle_test".to_string(),
"Test".to_string(),
state,
None,
);
let handlers = HashMap::new();
actor.start(handlers).expect("should start");
{
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Running);
}
actor.stop().expect("should stop");
{
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Stopped);
}
}
#[test]
fn test_actor_initial_state_preserved() {
let mut state = HashMap::new();
state.insert(
"name".to_string(),
ActorFieldValue::String("test".to_string()),
);
state.insert("count".to_string(), ActorFieldValue::Integer(100));
let actor = ConcurrentActor::new("state_test".to_string(), "Test".to_string(), state, None);
let s = actor.state.read().unwrap();
assert_eq!(
s.get("name"),
Some(&ActorFieldValue::String("test".to_string()))
);
assert_eq!(s.get("count"), Some(&ActorFieldValue::Integer(100)));
}
#[test]
fn test_actor_children_initially_empty() {
let state = HashMap::new();
let actor = ConcurrentActor::new("test".to_string(), "Test".to_string(), state, None);
let children = actor.children.read().unwrap();
assert!(children.is_empty());
}
#[test]
fn test_actor_restart_count_initially_zero() {
let state = HashMap::new();
let actor = ConcurrentActor::new("test".to_string(), "Test".to_string(), state, None);
let count = actor.restart_count.lock().unwrap();
assert_eq!(*count, 0);
}
#[test]
fn test_supervision_strategy_clone() {
let strategy = SupervisionStrategy::OneForOne {
max_restarts: 5,
within: Duration::from_secs(60),
};
let cloned = strategy.clone();
if let SupervisionStrategy::OneForOne { max_restarts, .. } = cloned {
assert_eq!(max_restarts, 5);
}
}
#[test]
fn test_actor_state_clone() {
let state = ActorState::Running;
let cloned = state.clone();
assert_eq!(cloned, ActorState::Running);
}
#[test]
fn test_actor_state_debug() {
let state = ActorState::Starting;
let debug_str = format!("{:?}", state);
assert!(debug_str.contains("Starting"));
}
#[test]
fn test_envelope_debug() {
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec![],
};
let envelope = Envelope::UserMessage {
from: None,
message: msg,
};
let debug_str = format!("{:?}", envelope);
assert!(debug_str.contains("UserMessage"));
}
#[test]
fn test_system_message_debug() {
let msg = SystemMessage::Start;
let debug_str = format!("{:?}", msg);
assert!(debug_str.contains("Start"));
}
#[test]
fn test_supervision_strategy_debug() {
let strategy = SupervisionStrategy::AllForOne {
max_restarts: 3,
within: Duration::from_secs(30),
};
let debug_str = format!("{:?}", strategy);
assert!(debug_str.contains("AllForOne"));
}
#[test]
fn test_actor_send_without_from() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor =
ConcurrentActor::new("send_test".to_string(), "Test".to_string(), state, None);
let handlers = HashMap::new();
actor.start(handlers).expect("should start");
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec![],
};
assert!(actor.send(msg, None).is_ok());
actor.stop().ok();
}
#[test]
fn test_actor_thread_handle_none_initially() {
let state = HashMap::new();
let actor =
ConcurrentActor::new("handle_test".to_string(), "Test".to_string(), state, None);
assert!(actor.thread_handle.is_none());
}
#[test]
fn test_actor_default_supervision_strategy() {
let state = HashMap::new();
let actor =
ConcurrentActor::new("strategy_test".to_string(), "Test".to_string(), state, None);
if let SupervisionStrategy::OneForOne {
max_restarts,
within,
} = actor.supervision_strategy
{
assert_eq!(max_restarts, 3);
assert_eq!(within, Duration::from_secs(60));
} else {
panic!("Expected OneForOne default strategy");
}
}
#[test]
fn test_multiple_actors_in_system() {
let system = ConcurrentActorSystem::new();
let state1 = HashMap::new();
let state2 = HashMap::new();
let handlers = HashMap::new();
let id1 = system
.spawn_actor("Type1".to_string(), state1, handlers.clone(), None)
.unwrap();
let id2 = system
.spawn_actor("Type2".to_string(), state2, handlers, None)
.unwrap();
let actors = system.actors.read().unwrap();
assert_eq!(actors.len(), 2);
assert!(actors.contains_key(&id1));
assert!(actors.contains_key(&id2));
drop(actors);
system.shutdown().ok();
}
#[test]
fn test_actor_id_format() {
let system = ConcurrentActorSystem::new();
let state = HashMap::new();
let handlers = HashMap::new();
let id = system
.spawn_actor("MyType".to_string(), state, handlers, None)
.unwrap();
assert!(id.starts_with("actor_MyType_"));
system.shutdown().ok();
}
#[test]
fn test_actor_last_restart_initialized() {
let state = HashMap::new();
let actor =
ConcurrentActor::new("restart_test".to_string(), "Test".to_string(), state, None);
let last = actor.last_restart.lock().unwrap();
assert!(last.elapsed() < Duration::from_secs(5));
}
#[test]
fn test_supervision_tree_initially_empty() {
let system = ConcurrentActorSystem::new();
let tree = system.supervision_tree.read().unwrap();
assert!(tree.is_empty());
}
#[test]
fn test_process_envelope_with_handler() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor =
ConcurrentActor::new("envelope_test".to_string(), "Test".to_string(), state, None);
let mut handlers = HashMap::new();
handlers.insert("Increment".to_string(), "handler".to_string());
actor.start(handlers.clone()).expect("should start");
let msg = ActorMessage {
message_type: "Increment".to_string(),
data: vec![],
};
actor
.send(msg, Some("sender".to_string()))
.expect("should send");
thread::sleep(Duration::from_millis(200));
let s = actor.state.read().unwrap();
assert_eq!(s.get("count"), Some(&ActorFieldValue::Integer(1)));
drop(s);
actor.stop().ok();
}
#[test]
fn test_process_envelope_no_handler() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"no_handler_test".to_string(),
"Test".to_string(),
state,
None,
);
let handlers = HashMap::new(); actor.start(handlers).expect("should start");
let msg = ActorMessage {
message_type: "Unknown".to_string(),
data: vec![],
};
actor.send(msg, None).expect("should send");
thread::sleep(Duration::from_millis(100));
actor.stop().ok();
}
#[test]
fn test_handle_system_message_start() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor = ConcurrentActor::new(
"sys_start_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.start(HashMap::new()).expect("should start");
actor
.mailbox_sender
.send(Envelope::SystemMessage(SystemMessage::Start))
.expect("should send");
thread::sleep(Duration::from_millis(100));
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Running);
drop(ls);
actor.stop().ok();
}
#[test]
fn test_handle_system_message_restart() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"sys_restart_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.start(HashMap::new()).expect("should start");
actor
.mailbox_sender
.send(Envelope::SystemMessage(SystemMessage::Restart))
.expect("should send");
thread::sleep(Duration::from_millis(100));
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Restarting);
drop(ls);
}
#[test]
fn test_handle_system_message_supervise() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"sys_supervise_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.start(HashMap::new()).expect("should start");
actor
.mailbox_sender
.send(Envelope::SystemMessage(SystemMessage::Supervise(
"child1".to_string(),
"error".to_string(),
)))
.expect("should send");
thread::sleep(Duration::from_millis(100));
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Running);
drop(ls);
actor.stop().ok();
}
#[test]
fn test_should_restart_resets_counter() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"restart_reset_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.supervision_strategy = SupervisionStrategy::OneForOne {
max_restarts: 2,
within: Duration::from_millis(1), };
*actor.restart_count.lock().unwrap() = 2;
*actor.last_restart.lock().unwrap() =
std::time::Instant::now() - Duration::from_millis(100);
assert!(actor.should_restart());
assert_eq!(*actor.restart_count.lock().unwrap(), 0);
}
#[test]
fn test_should_restart_at_limit() {
let state = HashMap::new();
let mut actor =
ConcurrentActor::new("at_limit_test".to_string(), "Test".to_string(), state, None);
actor.supervision_strategy = SupervisionStrategy::OneForOne {
max_restarts: 2,
within: Duration::from_secs(60),
};
*actor.restart_count.lock().unwrap() = 2;
*actor.last_restart.lock().unwrap() = std::time::Instant::now();
assert!(!actor.should_restart());
}
#[test]
fn test_should_restart_all_for_one() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"all_for_one_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.supervision_strategy = SupervisionStrategy::AllForOne {
max_restarts: 2,
within: Duration::from_secs(60),
};
assert!(actor.should_restart());
}
#[test]
fn test_should_restart_rest_for_one() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"rest_for_one_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.supervision_strategy = SupervisionStrategy::RestForOne {
max_restarts: 2,
within: Duration::from_secs(60),
};
assert!(actor.should_restart());
}
#[test]
fn test_actor_restart() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(10));
let mut actor = ConcurrentActor::new(
"restart_method_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.start(HashMap::new()).expect("should start");
{
let s = actor.state.read().unwrap();
assert_eq!(s.get("count"), Some(&ActorFieldValue::Integer(10)));
}
let handlers = HashMap::new();
actor.restart(handlers).expect("should restart");
{
let s = actor.state.read().unwrap();
assert_eq!(s.get("count"), Some(&ActorFieldValue::Integer(0)));
}
assert_eq!(*actor.restart_count.lock().unwrap(), 1);
actor.stop().ok();
}
#[test]
fn test_handle_failure_one_for_one() {
let system = ConcurrentActorSystem::new();
let mut sup_state = HashMap::new();
sup_state.insert("count".to_string(), ActorFieldValue::Integer(0));
let sup_handlers = HashMap::new();
let sup_id = system
.spawn_actor("Supervisor".to_string(), sup_state, sup_handlers, None)
.expect("should spawn supervisor");
let mut child_state = HashMap::new();
child_state.insert("count".to_string(), ActorFieldValue::Integer(0));
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
child_handlers,
Some(sup_id.clone()),
)
.expect("should spawn child");
let result = system.handle_failure(&child_id, "test error".to_string(), &sup_id);
assert!(result.is_ok());
thread::sleep(Duration::from_millis(200));
system.shutdown().ok();
}
#[test]
fn test_handle_failure_supervisor_not_found() {
let system = ConcurrentActorSystem::new();
let mut child_state = HashMap::new();
child_state.insert("count".to_string(), ActorFieldValue::Integer(0));
let child_id = system
.spawn_actor("Child".to_string(), child_state, HashMap::new(), None)
.expect("should spawn");
let result = system.handle_failure(&child_id, "error".to_string(), "nonexistent_sup");
assert!(result.is_ok());
system.shutdown().ok();
}
#[test]
fn test_handle_failure_stop_after_max_restarts() {
let system = ConcurrentActorSystem::new();
let sup_state = HashMap::new();
let sup_id = system
.spawn_actor("Supervisor".to_string(), sup_state, HashMap::new(), None)
.expect("should spawn supervisor");
{
let actors = system.actors.read().unwrap();
let sup = actors.get(&sup_id).unwrap();
let mut sup_actor = sup.lock().unwrap();
sup_actor.supervision_strategy = SupervisionStrategy::OneForOne {
max_restarts: 0, within: Duration::from_secs(60),
};
}
let child_state = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
HashMap::new(),
Some(sup_id.clone()),
)
.expect("should spawn child");
let result = system.handle_failure(&child_id, "error".to_string(), &sup_id);
assert!(result.is_ok());
thread::sleep(Duration::from_millis(200));
system.shutdown().ok();
}
#[test]
fn test_envelope_user_message_from_none() {
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec!["arg".to_string()],
};
let envelope = Envelope::UserMessage {
from: None,
message: msg,
};
if let Envelope::UserMessage { from, message } = envelope {
assert!(from.is_none());
assert_eq!(message.message_type, "Test");
assert_eq!(message.data, vec!["arg".to_string()]);
} else {
panic!("Expected UserMessage");
}
}
#[test]
fn test_actor_message_with_data() {
let msg = ActorMessage {
message_type: "Command".to_string(),
data: vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()],
};
assert_eq!(msg.message_type, "Command");
assert_eq!(msg.data.len(), 3);
}
#[test]
fn test_actor_message_clone() {
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec!["data".to_string()],
};
let cloned = msg.clone();
assert_eq!(cloned.message_type, msg.message_type);
assert_eq!(cloned.data, msg.data);
}
#[test]
fn test_actor_field_value_types() {
let int_val = ActorFieldValue::Integer(42);
let float_val = ActorFieldValue::Float(3.14);
let str_val = ActorFieldValue::String("hello".to_string());
let bool_val = ActorFieldValue::Bool(true);
let nil_val = ActorFieldValue::Nil;
assert_eq!(int_val, ActorFieldValue::Integer(42));
assert_eq!(float_val, ActorFieldValue::Float(3.14));
assert_eq!(str_val, ActorFieldValue::String("hello".to_string()));
assert_eq!(bool_val, ActorFieldValue::Bool(true));
assert_eq!(nil_val, ActorFieldValue::Nil);
}
#[test]
fn test_supervision_tree_multiple_children() {
let system = ConcurrentActorSystem::new();
let parent_id = system
.spawn_actor("Parent".to_string(), HashMap::new(), HashMap::new(), None)
.expect("should spawn parent");
let child1_id = system
.spawn_actor(
"Child1".to_string(),
HashMap::new(),
HashMap::new(),
Some(parent_id.clone()),
)
.expect("should spawn child1");
let child2_id = system
.spawn_actor(
"Child2".to_string(),
HashMap::new(),
HashMap::new(),
Some(parent_id.clone()),
)
.expect("should spawn child2");
let tree = system.supervision_tree.read().unwrap();
let children = tree.get(&parent_id).unwrap();
assert!(children.contains(&child1_id));
assert!(children.contains(&child2_id));
assert_eq!(children.len(), 2);
drop(tree);
system.shutdown().ok();
}
#[test]
fn test_actor_state_multiple_fields() {
let mut state = HashMap::new();
state.insert(
"name".to_string(),
ActorFieldValue::String("actor1".to_string()),
);
state.insert("count".to_string(), ActorFieldValue::Integer(0));
state.insert("active".to_string(), ActorFieldValue::Bool(true));
state.insert("rate".to_string(), ActorFieldValue::Float(1.5));
let actor =
ConcurrentActor::new("multi_field".to_string(), "Test".to_string(), state, None);
let s = actor.state.read().unwrap();
assert_eq!(s.len(), 4);
assert!(s.contains_key("name"));
assert!(s.contains_key("count"));
assert!(s.contains_key("active"));
assert!(s.contains_key("rate"));
}
#[test]
fn test_actor_state_nil_field() {
let mut state = HashMap::new();
state.insert("value".to_string(), ActorFieldValue::Nil);
let actor = ConcurrentActor::new("nil_field".to_string(), "Test".to_string(), state, None);
let s = actor.state.read().unwrap();
assert_eq!(s.get("value"), Some(&ActorFieldValue::Nil));
}
#[test]
fn test_global_actor_system_exists() {
let actors = CONCURRENT_ACTOR_SYSTEM.actors.read().unwrap();
let _ = actors.len();
}
#[test]
fn test_actor_state_inequality() {
assert_ne!(ActorState::Starting, ActorState::Stopped);
assert_ne!(ActorState::Running, ActorState::Restarting);
assert_ne!(
ActorState::Stopping,
ActorState::Failed("error".to_string())
);
}
#[test]
fn test_actor_state_failed_equality() {
let f1 = ActorState::Failed("error1".to_string());
let f2 = ActorState::Failed("error1".to_string());
let f3 = ActorState::Failed("error2".to_string());
assert_eq!(f1, f2);
assert_ne!(f1, f3);
}
#[test]
fn test_stop_not_started_actor() {
let state = HashMap::new();
let mut actor =
ConcurrentActor::new("not_started".to_string(), "Test".to_string(), state, None);
let result = actor.stop();
assert!(result.is_err());
}
#[test]
fn test_send_to_stopped_actor() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"stop_send_test".to_string(),
"Test".to_string(),
state,
None,
);
actor.start(HashMap::new()).expect("should start");
actor.stop().expect("should stop");
let msg = ActorMessage {
message_type: "Test".to_string(),
data: vec![],
};
let result = actor.send(msg, None);
assert!(result.is_err());
}
#[test]
fn test_restart_tracking() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor =
ConcurrentActor::new("tracking_test".to_string(), "Test".to_string(), state, None);
actor.start(HashMap::new()).expect("should start");
assert_eq!(*actor.restart_count.lock().unwrap(), 0);
actor.restart(HashMap::new()).expect("should restart");
assert_eq!(*actor.restart_count.lock().unwrap(), 1);
actor.restart(HashMap::new()).expect("should restart");
assert_eq!(*actor.restart_count.lock().unwrap(), 2);
actor.stop().ok();
}
#[test]
fn test_lifecycle_during_start() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"lifecycle_start_test".to_string(),
"Test".to_string(),
state,
None,
);
{
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Starting);
}
actor.start(HashMap::new()).expect("should start");
{
let ls = actor.lifecycle_state.read().unwrap();
assert_eq!(*ls, ActorState::Running);
}
actor.stop().ok();
}
#[test]
fn test_send_multiple_messages() {
let mut state = HashMap::new();
state.insert("count".to_string(), ActorFieldValue::Integer(0));
let mut actor = ConcurrentActor::new(
"multi_msg_test".to_string(),
"Test".to_string(),
state,
None,
);
let mut handlers = HashMap::new();
handlers.insert("Increment".to_string(), "handler".to_string());
actor.start(handlers).expect("should start");
for _ in 0..5 {
let msg = ActorMessage {
message_type: "Increment".to_string(),
data: vec![],
};
actor.send(msg, None).expect("should send");
}
thread::sleep(Duration::from_millis(500));
let s = actor.state.read().unwrap();
assert_eq!(s.get("count"), Some(&ActorFieldValue::Integer(5)));
drop(s);
actor.stop().ok();
}
#[test]
fn test_actor_type_preserved() {
let state = HashMap::new();
let actor = ConcurrentActor::new(
"type_test".to_string(),
"CustomActorType".to_string(),
state,
None,
);
assert_eq!(actor.actor_type, "CustomActorType");
}
#[test]
fn test_supervision_strategy_changeable() {
let state = HashMap::new();
let mut actor = ConcurrentActor::new(
"strategy_change_test".to_string(),
"Test".to_string(),
state,
None,
);
if let SupervisionStrategy::OneForOne { .. } = actor.supervision_strategy {
} else {
panic!("Expected default OneForOne");
}
actor.supervision_strategy = SupervisionStrategy::AllForOne {
max_restarts: 5,
within: Duration::from_secs(120),
};
if let SupervisionStrategy::AllForOne { max_restarts, .. } = actor.supervision_strategy {
assert_eq!(max_restarts, 5);
} else {
panic!("Expected AllForOne");
}
}
#[test]
fn test_handle_failure_one_for_one_restarts_child() {
let system = ConcurrentActorSystem::new();
let parent_state = HashMap::new();
let parent_handlers = HashMap::new();
let supervisor_id = system
.spawn_actor("Supervisor".to_string(), parent_state, parent_handlers, None)
.expect("should spawn supervisor");
let mut child_state = HashMap::new();
child_state.insert("count".to_string(), ActorFieldValue::Integer(0));
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
child_handlers,
Some(supervisor_id.clone()),
)
.expect("should spawn child");
thread::sleep(Duration::from_millis(100));
let result = system.handle_failure(&child_id, "test error".to_string(), &supervisor_id);
assert!(result.is_ok());
system.shutdown().ok();
}
#[test]
fn test_handle_failure_nonexistent_supervisor() {
let system = ConcurrentActorSystem::new();
let child_state = HashMap::new();
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor("Child".to_string(), child_state, child_handlers, None)
.expect("should spawn child");
let result = system.handle_failure(&child_id, "test error".to_string(), "nonexistent_sup");
assert!(result.is_ok());
system.shutdown().ok();
}
#[test]
fn test_handle_failure_supervisor_no_restart() {
let system = ConcurrentActorSystem::new();
let parent_state = HashMap::new();
let parent_handlers = HashMap::new();
let supervisor_id = system
.spawn_actor("Supervisor".to_string(), parent_state, parent_handlers, None)
.expect("should spawn supervisor");
{
let actors = system.actors.read().unwrap();
if let Some(sup) = actors.get(&supervisor_id) {
let sup = sup.lock().unwrap();
let mut count = sup.restart_count.lock().unwrap();
*count = 100; }
}
let child_state = HashMap::new();
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
child_handlers,
Some(supervisor_id.clone()),
)
.expect("should spawn child");
thread::sleep(Duration::from_millis(100));
let result = system.handle_failure(&child_id, "test error".to_string(), &supervisor_id);
assert!(result.is_ok());
system.shutdown().ok();
}
#[test]
fn test_handle_failure_rest_for_one_strategy() {
let system = ConcurrentActorSystem::new();
let parent_state = HashMap::new();
let parent_handlers = HashMap::new();
let supervisor_id = system
.spawn_actor("Supervisor".to_string(), parent_state, parent_handlers, None)
.expect("should spawn supervisor");
{
let actors = system.actors.read().unwrap();
if let Some(sup) = actors.get(&supervisor_id) {
let mut sup = sup.lock().unwrap();
sup.supervision_strategy = SupervisionStrategy::RestForOne {
max_restarts: 5,
within: Duration::from_secs(60),
};
}
}
let child_state = HashMap::new();
let child_handlers = HashMap::new();
let child_id = system
.spawn_actor(
"Child".to_string(),
child_state,
child_handlers,
Some(supervisor_id.clone()),
)
.expect("should spawn child");
thread::sleep(Duration::from_millis(100));
let result = system.handle_failure(&child_id, "test error".to_string(), &supervisor_id);
assert!(result.is_ok());
system.shutdown().ok();
}
}