nucleus-app 0.1.1

Modular application builder using Nucleons as building blocks
Documentation
use std::alloc::System;

// Register `std::alloc::System` as the global allocator for this program.
// NOTE(review): presumably chosen so the host and the dynamically loaded nucleon
// libraries allocate and free memory through the same allocator — confirm.
#[global_allocator]
static ALLOCATOR: System = System;

use std::collections::HashMap;

use async_std::sync::{Arc, Mutex};
use std::fs::File;
use std::io::{BufReader, BufWriter};

use nucleon::exports::*;

mod nucleon_manager;

/// Configuration of the Nucleus app
pub mod config;
use config::NucleusConfig;

/// Shared mutable state of a [`Nucleus`], kept behind an `Arc<Mutex<_>>` so that the
/// message-handling tasks spawned by `run` can access it.
struct Inner {
    /// Loaded nucleon libraries, keyed by the `type_name` reported by their static info.
    registered_nucleons: HashMap<String, nucleon_manager::NucleonManager>,
    /// Live nucleon instances, keyed by the `nucleon_name` from their configuration.
    nucleon_instances: HashMap<String, Box<dyn Nucleon>>,
    /// Sending half of the message channel; a reference is handed to every nucleon
    /// when it is created.
    tx: Sender<Message>,
}

/// The main Nucleus instance that manages the Nucleons
pub struct Nucleus {
    /// Configuration resolved in `new` (config file, provided default, or built-in default).
    config: NucleusConfig,
    /// Registered nucleon types and live instances, shared with spawned message tasks.
    inner: Arc<Mutex<Inner>>,
    /// Receiving half of the message channel, polled by `run`.
    receive: Receiver<Message>,
}

impl Nucleus {
    /// Create a new Nucleus instance with the config file location and an optional default config.
    ///
    /// The configuration is resolved in this order: the file at `config_file` if it can be
    /// opened and parsed, otherwise `default_config` if one was supplied, otherwise
    /// `NucleusConfig::new()`.
    ///
    /// Also installs `env_logger` as the global logger if no logger has been installed yet.
    pub fn new(config_file: &str, default_config: Option<NucleusConfig>) -> Self {
        // `try_init` rather than `init`: `init` panics when a global logger is already
        // set, which would make it impossible to construct a second `Nucleus` or to embed
        // this in an application that configures logging itself.
        let _ = env_logger::try_init();

        let (tx, rx) = async_std::channel::unbounded();
        // Build the built-in default lazily; it is only needed when both other sources fail.
        let config = Self::load_config(config_file)
            .or(default_config)
            .unwrap_or_else(NucleusConfig::new);

        Self {
            config,
            inner: Arc::new(Mutex::new(Inner {
                registered_nucleons: HashMap::new(),
                nucleon_instances: HashMap::new(),
                tx,
            })),
            receive: rx,
        }
    }

    /// Used to initialize the Nucleus app. It will scan for Nucleons, instantiate them
    /// based on the configuration file and initialize all the Nucleon instances.
    pub async fn init(&mut self) {
        log::info!("Scanning for nucleons in {}", self.config.nucleon_path);
        match std::fs::read_dir(&self.config.nucleon_path) {
            Ok(files) => {
                let files: Vec<String> = files
                    .filter_map(Result::ok)
                    .filter(|f| {
                        f.path().extension() == Some(std::ffi::OsStr::new("so"))
                            || (f.path().extension() == Some(std::ffi::OsStr::new("dll")))
                    })
                    .map(|f| f.path().display().to_string())
                    .collect();

                if files.is_empty() {
                    log::warn!("No nucleons found, is the nucleon path set correctly?");
                } else {
                    for lib in files {
                        log::info!("Found nucleon {}", lib);
                        let nucleon = match nucleon_manager::NucleonManager::new(&lib) {
                            Ok(m) => m,
                            Err(e) => {
                                log::warn!("Failed to load nucleon: {}", e.to_string());
                                continue;
                            }
                        };

                        let info = nucleon.static_info();
                        log::info!("Registered new nucleon: {}", info.type_name);

                        let mut inner = self.inner.lock().await;
                        inner.registered_nucleons.insert(info.type_name, nucleon);
                    }
                }
            }
            Err(e) => log::error!("Failed to open nucleon folder: {}", e.to_string()),
        }

        self.load_nucleons().await;

        let mut inner = self.inner.lock().await;
        for (_, nucleon) in &mut inner.nucleon_instances {
            nucleon.init();
        }
    }

    async fn load_nucleons(&mut self) {
        let nucleons = self.config.nucleons.clone();
        for (type_name, nucleon_configs) in &nucleons {
            for nucleon_config in nucleon_configs {
                log::info!(
                    "Adding nucleon {} of type {}",
                    nucleon_config.nucleon_name,
                    type_name
                );
                let nucleon_info = NucleonInfo {
                    properties: vec![],
                    type_name: type_name.clone(),
                    capabilities: vec![],
                };
                Self::instantiate_nucleon(&self.inner, nucleon_config, &nucleon_info)
                    .await
                    .unwrap();
            }
        }
    }

    fn load_config(path: &str) -> Option<NucleusConfig> {
        if let Ok(file) = File::open(path) {
            let reader = BufReader::new(file);
            match serde_json::from_reader(reader) {
                Ok(config) => return Some(config),
                Err(e) => log::error!("Failed to parse configuration file: {}", e.to_string()),
            };
        }
        log::info!("Using default configuration");
        None
    }

    fn _save_config(path: &str, config: &NucleusConfig) {
        // TODO: Error handling
        let file = File::create(path).unwrap();
        let writer = BufWriter::new(file);

        serde_json::to_writer_pretty(writer, config).unwrap();
    }

    /// Blocking helper that creates an instance of the registered nucleon type
    /// `type_name` and stores it under the name given in `nucleon_config`.
    ///
    /// Logs an error and does nothing when the type is not registered or when an
    /// instance with the same name already exists.
    fn _add(&mut self, type_name: String, nucleon_config: &NucleonConfig) {
        let mut guard = async_std::task::block_on(self.inner.lock());

        // The type lookup comes first so that its error takes precedence.
        let factory = match guard.registered_nucleons.get(&type_name) {
            Some(manager) => manager,
            None => {
                log::error!("Module of type \"{}\" does not exist", type_name);
                return;
            }
        };

        let instance_name = &nucleon_config.nucleon_name;
        if guard.nucleon_instances.contains_key(instance_name) {
            log::error!(
                "Module already exists with the name \"{}\"",
                &nucleon_config.nucleon_name
            );
            return;
        }

        let sender = guard.tx.clone();
        let created = factory.create_nucleon(nucleon_config, &sender);
        guard
            .nucleon_instances
            .insert(instance_name.clone(), created);
    }

    /// Create an instance of the registered nucleon type `nucleon_info.type_name` and
    /// store it under `nucleon_config.nucleon_name`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the nucleon type is not registered, or
    /// when an instance with the same name already exists. The type check is performed
    /// first.
    async fn instantiate_nucleon(
        inner: &Arc<Mutex<Inner>>,
        nucleon_config: &NucleonConfig,
        nucleon_info: &NucleonInfo,
    ) -> Result<(), String> {
        let mut inner = inner.lock().await;

        // `ok_or_else` so the error string is only formatted when the lookup fails.
        let builder = inner
            .registered_nucleons
            .get(&nucleon_info.type_name)
            .ok_or_else(|| {
                format!(
                    "Nucleon of type \"{}\" does not exist",
                    &nucleon_info.type_name
                )
            })?;

        if inner
            .nucleon_instances
            .contains_key(&nucleon_config.nucleon_name)
        {
            return Err(format!(
                "Nucleon with name \"{}\" already exists",
                &nucleon_config.nucleon_name
            ));
        }

        let instance = builder.create_nucleon(nucleon_config, &inner.tx);

        inner
            .nucleon_instances
            .insert(nucleon_config.nucleon_name.clone(), instance);

        Ok(())
    }

    /// Main run loop that starts the application (Blocking).
    pub async fn run(&mut self) {
        {
            let mut inner = self.inner.lock().await;
            for (_, nucleon) in &mut inner.nucleon_instances {
                nucleon.start();
            }
        }

        loop {
            if let Ok(msg) = self.receive.recv().await {
                if let MessageType::Request(Request::System(_)) = msg.message {
                    let inner = self.inner.clone();
                    async_std::task::spawn(Self::handle_message(inner, msg));
                    continue;
                }

                async_std::task::spawn(Self::relay_message(self.inner.clone(), msg));
            }
        }
        //  for (_, nucleon) in &mut self.nucleons {
        //     nucleon.stop();
        // }
    }
}

// Message related impl
impl Nucleus {
    async fn handle_message(inner: Arc<Mutex<Inner>>, message: Message) {
        log::info!("Got system request: {:?}", message);
        let reply = if let MessageType::Request(Request::System(s)) = &message.message {
            match s {
                SystemRequest::ListRegisteredNucleons => {
                    let inner = inner.lock().await;
                    let registered_nucleons = inner
                        .registered_nucleons
                        .iter()
                        .map(|(_, nucleon)| nucleon.static_info())
                        .collect();
                    SystemReply::ListRegisteredNucleons(registered_nucleons)
                }
                SystemRequest::ListNucleonInstances => {
                    let inner = inner.lock().await;

                    let nucleon_instances = inner
                        .nucleon_instances
                        .iter()
                        .map(|(_, nucleon)| nucleon.get_config())
                        .collect();

                    SystemReply::ListNucleonInstances(nucleon_instances)
                }
                SystemRequest::AddNucleon(nucleon_config, nucleon_info) => {
                    if let Err(e) =
                        Self::instantiate_nucleon(&inner, &nucleon_config, &nucleon_info).await
                    {
                        SystemReply::Error(e)
                    } else {
                        SystemReply::Success
                    }
                }
                SystemRequest::DeleteNucleon(name) => {
                    let mut inner = inner.lock().await;
                    if let Some((name, mut nucleon)) = inner.nucleon_instances.remove_entry(name) {
                        nucleon.stop();
                        drop(nucleon);
                        drop(name);
                        SystemReply::Success
                    } else {
                        SystemReply::Error(format!(
                            "Unable to delete nucleon \"{}\", it does not exist",
                            name
                        ))
                    }
                }
                SystemRequest::SetNucleonState(name, state) => {
                    let mut inner = inner.lock().await;
                    if let Some(nucleon) = inner.nucleon_instances.get_mut(name) {
                        match state {
                            NucleonState::Initial => {
                                SystemReply::Error("Not supported".to_string());
                            }
                            NucleonState::Inited => nucleon.init(),
                            NucleonState::Stopped => {
                                nucleon.stop();
                            }
                            NucleonState::Started => {
                                nucleon.start();
                            }
                        }
                        SystemReply::Success
                    } else {
                        SystemReply::Error(format!(
                            "Unable change nucleon \"{}\" state to {}, it does not exist",
                            name, state
                        ))
                    }
                }
            }
        } else {
            return;
        };

        let message = MessageBuilder::new()
            .sender("system")
            .recipient(&message.sender)
            .reply(Reply::System(reply));

        async_std::task::spawn(Self::relay_message(inner, message));
    }

    async fn relay_message(inner: Arc<Mutex<Inner>>, message: Message) {
        if let Recipient::Unicast(recipient) = &message.recipient {
            let inner = inner.lock().await;
            if let Some(nucleon) = inner.nucleon_instances.get(recipient) {
                nucleon.on_message(message);
            } else {
                log::error!("Failed to send message, unknown recipient {}", recipient);
            }
        } else {
            let inner = inner.lock().await;
            for (_, nucleon) in &inner.nucleon_instances {
                nucleon.on_message(message.clone());
            }
        }
    }
}

// make sure the instances are dropped before the libraries are unloaded
impl Drop for Nucleus {
    fn drop(&mut self) {
        let mut inner = async_std::task::block_on(self.inner.lock());

        for (name, nucleon) in inner.nucleon_instances.drain() {
            drop(nucleon);
            drop(name);
        }

        for (type_name, base_nucleon) in inner.registered_nucleons.drain() {
            drop(type_name);
            drop(base_nucleon);
        }
    }
}