use log::{debug, info, trace};
use std::collections::HashMap;
use std::thread;
use std::time::Duration;
use crate::actor::{ActorCell, Context, SenderType, Uri};
use crate::executor::{
CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle,
};
use crate::system::RuntimeManagerRef;
pub struct ThreadExecutorFactory {}
impl ExecutorFactory for ThreadExecutorFactory {
fn spawn_executor(
&self,
name: String,
command_channel: CommandChannel<ExecutorCommands>,
manager_ref: RuntimeManagerRef,
) -> ExecutorHandle {
let t =
thread::spawn(move || ThreadExecutor::init(name, command_channel, manager_ref).run());
ExecutorHandle::new(move || t.join().unwrap())
}
}
struct ThreadExecutor {
name: String,
actor_cells: HashMap<Uri, ActorCell>,
command_channel: CommandChannel<ExecutorCommands>,
runtime_manager: RuntimeManagerRef,
}
impl ThreadExecutor {
fn init(
name: String,
command_channel: CommandChannel<ExecutorCommands>,
runtime_manager: RuntimeManagerRef,
) -> ThreadExecutor {
ThreadExecutor {
name,
actor_cells: HashMap::new(),
command_channel,
runtime_manager,
}
}
fn assert_unique_address(&self, address: &Uri) {
if self.actor_cells.contains_key(address) {
panic!("Actor name {} already exists", address);
}
}
}
impl Executor for ThreadExecutor {
fn run(mut self) {
const SLEEP_DURATION_MS: u64 = 1;
loop {
if !self.command_channel.recv_is_empty() {
match self.command_channel.recv().unwrap() {
ExecutorCommands::AssignActor(mut cell) => {
debug!("received actor assignment for {}", &cell.address.uri);
self.assert_unique_address(&cell.address.uri);
trace!("calling before_start for actor {}", &cell.address.uri);
cell.actor.before_start(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
self.actor_cells.insert(cell.address.uri.clone(), cell);
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
break;
}
}
}
for (_, cell) in self.actor_cells.iter_mut() {
if !cell.mailbox.is_empty() {
let result = cell.mailbox.try_recv();
if let Ok(letter) = result {
trace!("[{}] processing message: {:?}", &cell.address, &letter);
cell.actor.receive(
Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
parent: &cell.parent,
children: &mut cell.children,
sender: &letter.sender,
},
letter.payload,
);
}
}
}
trace!("nothing to do, sleeping...");
thread::sleep(Duration::from_millis(SLEEP_DURATION_MS));
}
self.runtime_manager.notify_shutdown(self.name);
}
}