use std::alloc::System;
#[global_allocator]
static ALLOCATOR: System = System;
use std::collections::HashMap;
use async_std::sync::{Arc, Mutex};
use std::fs::File;
use std::io::{BufReader, BufWriter};
use nucleon::exports::*;
mod nucleon_manager;
pub mod config;
use config::NucleusConfig;
struct Inner {
registered_nucleons: HashMap<String, nucleon_manager::NucleonManager>,
nucleon_instances: HashMap<String, Box<dyn Nucleon>>,
tx: Sender<Message>,
}
pub struct Nucleus {
config: NucleusConfig,
inner: Arc<Mutex<Inner>>,
receive: Receiver<Message>,
}
impl Nucleus {
pub fn new(config_file: &str, default_config: Option<NucleusConfig>) -> Self {
env_logger::init();
let (tx, rx) = async_std::channel::unbounded();
let config = Self::load_config(&config_file)
.or(default_config)
.unwrap_or(NucleusConfig::new());
Self {
config,
inner: Arc::new(Mutex::new(Inner {
registered_nucleons: HashMap::new(),
nucleon_instances: HashMap::new(),
tx,
})),
receive: rx,
}
}
pub async fn init(&mut self) {
log::info!("Scanning for nucleons in {}", self.config.nucleon_path);
match std::fs::read_dir(&self.config.nucleon_path) {
Ok(files) => {
let files: Vec<String> = files
.filter_map(Result::ok)
.filter(|f| {
f.path().extension() == Some(std::ffi::OsStr::new("so"))
|| (f.path().extension() == Some(std::ffi::OsStr::new("dll")))
})
.map(|f| f.path().display().to_string())
.collect();
if files.is_empty() {
log::warn!("No nucleons found, is the nucleon path set correctly?");
} else {
for lib in files {
log::info!("Found nucleon {}", lib);
let nucleon = match nucleon_manager::NucleonManager::new(&lib) {
Ok(m) => m,
Err(e) => {
log::warn!("Failed to load nucleon: {}", e.to_string());
continue;
}
};
let info = nucleon.static_info();
log::info!("Registered new nucleon: {}", info.type_name);
let mut inner = self.inner.lock().await;
inner.registered_nucleons.insert(info.type_name, nucleon);
}
}
}
Err(e) => log::error!("Failed to open nucleon folder: {}", e.to_string()),
}
self.load_nucleons().await;
let mut inner = self.inner.lock().await;
for (_, nucleon) in &mut inner.nucleon_instances {
nucleon.init();
}
}
async fn load_nucleons(&mut self) {
let nucleons = self.config.nucleons.clone();
for (type_name, nucleon_configs) in &nucleons {
for nucleon_config in nucleon_configs {
log::info!(
"Adding nucleon {} of type {}",
nucleon_config.nucleon_name,
type_name
);
let nucleon_info = NucleonInfo {
properties: vec![],
type_name: type_name.clone(),
capabilities: vec![],
};
Self::instantiate_nucleon(&self.inner, nucleon_config, &nucleon_info)
.await
.unwrap();
}
}
}
fn load_config(path: &str) -> Option<NucleusConfig> {
if let Ok(file) = File::open(path) {
let reader = BufReader::new(file);
match serde_json::from_reader(reader) {
Ok(config) => return Some(config),
Err(e) => log::error!("Failed to parse configuration file: {}", e.to_string()),
};
}
log::info!("Using default configuration");
None
}
fn _save_config(path: &str, config: &NucleusConfig) {
let file = File::create(path).unwrap();
let writer = BufWriter::new(file);
serde_json::to_writer_pretty(writer, config).unwrap();
}
fn _add(&mut self, type_name: String, nucleon_config: &NucleonConfig) {
let mut inner = async_std::task::block_on(self.inner.lock());
let base_nucleon = if let Some(m) = inner.registered_nucleons.get(&type_name) {
m
} else {
log::error!("Module of type \"{}\" does not exist", type_name);
return;
};
if inner
.nucleon_instances
.get(&nucleon_config.nucleon_name)
.is_some()
{
log::error!(
"Module already exists with the name \"{}\"",
&nucleon_config.nucleon_name
);
return;
}
let tx = inner.tx.clone();
let instance = base_nucleon.create_nucleon(nucleon_config, &tx);
inner
.nucleon_instances
.insert(nucleon_config.nucleon_name.clone(), instance);
}
async fn instantiate_nucleon(
inner: &Arc<Mutex<Inner>>,
nucleon_config: &NucleonConfig,
nucleon_info: &NucleonInfo,
) -> Result<(), String> {
let mut inner = inner.lock().await;
let builder = inner
.registered_nucleons
.get(&nucleon_info.type_name)
.ok_or(format!(
"Nucleon of type \"{}\" does not exist",
&nucleon_info.type_name
))?;
if inner
.nucleon_instances
.get(&nucleon_config.nucleon_name)
.is_some()
{
return Err(format!(
"Nucleon with name \"{}\" already exists",
&nucleon_config.nucleon_name
));
}
let instance = builder.create_nucleon(&nucleon_config, &inner.tx);
inner
.nucleon_instances
.insert(nucleon_config.nucleon_name.clone(), instance);
Ok(())
}
pub async fn run(&mut self) {
{
let mut inner = self.inner.lock().await;
for (_, nucleon) in &mut inner.nucleon_instances {
nucleon.start();
}
}
loop {
if let Ok(msg) = self.receive.recv().await {
if let MessageType::Request(Request::System(_)) = msg.message {
let inner = self.inner.clone();
async_std::task::spawn(Self::handle_message(inner, msg));
continue;
}
async_std::task::spawn(Self::relay_message(self.inner.clone(), msg));
}
}
}
}
impl Nucleus {
async fn handle_message(inner: Arc<Mutex<Inner>>, message: Message) {
log::info!("Got system request: {:?}", message);
let reply = if let MessageType::Request(Request::System(s)) = &message.message {
match s {
SystemRequest::ListRegisteredNucleons => {
let inner = inner.lock().await;
let registered_nucleons = inner
.registered_nucleons
.iter()
.map(|(_, nucleon)| nucleon.static_info())
.collect();
SystemReply::ListRegisteredNucleons(registered_nucleons)
}
SystemRequest::ListNucleonInstances => {
let inner = inner.lock().await;
let nucleon_instances = inner
.nucleon_instances
.iter()
.map(|(_, nucleon)| nucleon.get_config())
.collect();
SystemReply::ListNucleonInstances(nucleon_instances)
}
SystemRequest::AddNucleon(nucleon_config, nucleon_info) => {
if let Err(e) =
Self::instantiate_nucleon(&inner, &nucleon_config, &nucleon_info).await
{
SystemReply::Error(e)
} else {
SystemReply::Success
}
}
SystemRequest::DeleteNucleon(name) => {
let mut inner = inner.lock().await;
if let Some((name, mut nucleon)) = inner.nucleon_instances.remove_entry(name) {
nucleon.stop();
drop(nucleon);
drop(name);
SystemReply::Success
} else {
SystemReply::Error(format!(
"Unable to delete nucleon \"{}\", it does not exist",
name
))
}
}
SystemRequest::SetNucleonState(name, state) => {
let mut inner = inner.lock().await;
if let Some(nucleon) = inner.nucleon_instances.get_mut(name) {
match state {
NucleonState::Initial => {
SystemReply::Error("Not supported".to_string());
}
NucleonState::Inited => nucleon.init(),
NucleonState::Stopped => {
nucleon.stop();
}
NucleonState::Started => {
nucleon.start();
}
}
SystemReply::Success
} else {
SystemReply::Error(format!(
"Unable change nucleon \"{}\" state to {}, it does not exist",
name, state
))
}
}
}
} else {
return;
};
let message = MessageBuilder::new()
.sender("system")
.recipient(&message.sender)
.reply(Reply::System(reply));
async_std::task::spawn(Self::relay_message(inner, message));
}
async fn relay_message(inner: Arc<Mutex<Inner>>, message: Message) {
if let Recipient::Unicast(recipient) = &message.recipient {
let inner = inner.lock().await;
if let Some(nucleon) = inner.nucleon_instances.get(recipient) {
nucleon.on_message(message);
} else {
log::error!("Failed to send message, unknown recipient {}", recipient);
}
} else {
let inner = inner.lock().await;
for (_, nucleon) in &inner.nucleon_instances {
nucleon.on_message(message.clone());
}
}
}
}
impl Drop for Nucleus {
fn drop(&mut self) {
let mut inner = async_std::task::block_on(self.inner.lock());
for (name, nucleon) in inner.nucleon_instances.drain() {
drop(nucleon);
drop(name);
}
for (type_name, base_nucleon) in inner.registered_nucleons.drain() {
drop(type_name);
drop(base_nucleon);
}
}
}