use crossbeam_channel::{bounded, unbounded, Sender};
use log::{info, warn};
use std::collections::HashMap;
use std::thread;
use crate::actor::{Actor, ActorAddress, ActorCell, ActorInit, Letter, Uri};
use crate::executor::{get_executor_factory, ExecutorCommands, ExecutorHandle};
use crate::util::CommandChannel;
use crate::{actor, config};
pub struct ActorSystem {
executors: HashMap<String, ExecutorHandle>,
runtime_manager: RuntimeManagerRef,
runtime_thread_handle: thread::JoinHandle<()>,
root_actor_assigned: bool,
}
impl ActorSystem {
pub fn init(config: config::ActorSystemConfig) -> ActorSystem {
config.validate().unwrap();
let mut runtime_manager = RuntimeManager::init();
let executor_factory = get_executor_factory(&config.executor_config.executor_type);
let mut executors = HashMap::new();
for i in 0..(config.executor_config.num_executors) {
let command_channel = CommandChannel::new();
let executor_name = format!("executor-{}", i);
let executor_handle = executor_factory.spawn_executor(
executor_name.clone(),
command_channel.clone(),
runtime_manager.get_ref(),
);
executors.insert(executor_name.clone(), executor_handle);
runtime_manager.add_executor(executor_name.clone(), command_channel);
}
let runtime_manager_ref = runtime_manager.get_ref();
let runtime_thread_handle = thread::spawn(move || {
runtime_manager.run();
});
ActorSystem {
executors,
runtime_manager: runtime_manager_ref,
runtime_thread_handle,
root_actor_assigned: false,
}
}
pub fn spawn_root_actor<B, A: ActorInit<Init = B> + Actor + 'static>(
&mut self,
name: &str,
init_msg: &B,
) {
debug_assert!(
!self.executors.is_empty(),
"No executors available to spawn actor"
);
debug_assert!(!self.root_actor_assigned, "Root actor already assigned");
self.root_actor_assigned = true;
self.runtime_manager.assign_actor(
Box::new(A::init(init_msg)),
ActorAddress::new_root(name),
None,
);
}
pub fn shutdown(self) {
self.runtime_manager.shutdown_system();
self.await_shutdown();
}
pub fn await_shutdown(self) {
self.executors
.into_iter()
.for_each(|(_, manager)| manager.await_close());
self.runtime_thread_handle.join().unwrap();
}
}
struct RuntimeManager {
executor_command_channels: HashMap<String, CommandChannel<ExecutorCommands>>,
actor_registry: HashMap<Uri, actor::Mailbox>,
manager_command_channel: CommandChannel<ManagerCommands>,
round_robin_state: usize,
shutdown_initiated: bool,
}
impl RuntimeManager {
fn init() -> RuntimeManager {
RuntimeManager {
executor_command_channels: HashMap::new(),
actor_registry: HashMap::new(),
manager_command_channel: CommandChannel::new(),
round_robin_state: 0,
shutdown_initiated: false,
}
}
fn add_executor(&mut self, name: String, command_channel: CommandChannel<ExecutorCommands>) {
self.executor_command_channels.insert(name, command_channel);
}
fn get_ref(&self) -> RuntimeManagerRef {
RuntimeManagerRef::new(self.manager_command_channel.clone())
}
fn run(mut self) {
loop {
match self.manager_command_channel.recv() {
Ok(ManagerCommands::Shutdown) => {
if self.shutdown_initiated {
continue;
}
self.shutdown_initiated = true;
self.executor_command_channels
.iter()
.for_each(|(_, channel)| {
channel.send(ExecutorCommands::Shutdown).unwrap();
});
}
Ok(ManagerCommands::ExecutorShutdown { name }) => {
if self.executor_command_channels.contains_key(&name) {
self.executor_command_channels.remove(&name);
if self.executor_command_channels.is_empty() {
break;
}
}
}
Ok(ManagerCommands::AssignActor {
actor,
address,
parent,
}) => {
let executor_name = self.get_next_executor();
let (sender, receiver) = unbounded::<Letter>();
let address_uri = address.uri.clone();
address.set_mailbox(sender.clone());
let cell = ActorCell::new(actor, receiver, address, parent);
self.actor_registry.insert(address_uri, sender);
self.executor_command_channels
.get(&executor_name)
.unwrap()
.send(ExecutorCommands::AssignActor(cell))
.unwrap();
}
Ok(ManagerCommands::ResolveAddress {
address_uri,
return_channel,
}) => {
let mailbox_lookup = self.actor_registry.get(&address_uri);
let result = match mailbox_lookup {
Some(mailbox) => return_channel.try_send(Some(mailbox.clone())),
None => return_channel.try_send(None),
};
if result.is_err() {
warn!(
"Failed to send address resolution result on return channel: {}",
result.err().unwrap(),
);
}
}
Err(_) => {}
}
}
info!("Runtime manager shutting down");
}
fn get_next_executor(&mut self) -> String {
let mut iter = self.executor_command_channels.iter();
if self.round_robin_state >= iter.len() {
self.round_robin_state = 0;
}
iter.nth(self.round_robin_state).unwrap().0.clone()
}
}
pub struct RuntimeManagerRef {
manager_command_channel: CommandChannel<ManagerCommands>,
}
impl RuntimeManagerRef {
fn new(manager_command_channel: CommandChannel<ManagerCommands>) -> RuntimeManagerRef {
RuntimeManagerRef {
manager_command_channel,
}
}
pub(crate) fn shutdown_system(&self) {
self.manager_command_channel
.send(ManagerCommands::Shutdown)
.unwrap();
}
pub(crate) fn notify_shutdown(&self, executor_name: String) {
self.manager_command_channel
.send(ManagerCommands::ExecutorShutdown {
name: executor_name,
})
.unwrap();
}
pub(crate) fn assign_actor(
&self,
actor: Box<dyn Actor>,
address: ActorAddress,
parent: Option<ActorAddress>,
) {
self.manager_command_channel
.send(ManagerCommands::AssignActor {
actor,
address,
parent,
})
.unwrap();
}
pub(crate) fn resolve_address(&self, address: &ActorAddress) -> Option<actor::Mailbox> {
let uri = address.uri.clone();
let (sender, receiver) = bounded::<Option<actor::Mailbox>>(1);
self.manager_command_channel
.send(ManagerCommands::ResolveAddress {
address_uri: uri,
return_channel: sender,
})
.unwrap();
receiver.recv().unwrap()
}
}
enum ManagerCommands {
Shutdown,
ExecutorShutdown { name: String },
AssignActor {
actor: Box<dyn Actor>,
address: ActorAddress,
parent: Option<ActorAddress>,
},
ResolveAddress {
address_uri: Uri,
return_channel: Sender<Option<actor::Mailbox>>,
},
}