nucleus_app/
lib.rs

1use std::alloc::System;
2
// Route every heap allocation of this crate through the operating system allocator.
// NOTE(review): presumably chosen so that memory can be safely exchanged with the
// dynamically loaded nucleon libraries — confirm against the nucleon crate.
#[global_allocator]
static ALLOCATOR: System = System;
5
6use std::collections::HashMap;
7
8use async_std::sync::{Arc, Mutex};
9use std::fs::File;
10use std::io::{BufReader, BufWriter};
11
12use nucleon::exports::*;
13
14mod nucleon_manager;
15
16/// Configuration of the Nucleus app
17pub mod config;
18use config::NucleusConfig;
19
/// Mutable state of the Nucleus that is shared behind an `Arc<Mutex<_>>`.
struct Inner {
    /// Nucleon managers discovered while scanning the nucleon path, keyed by the
    /// `type_name` reported by their static info.
    registered_nucleons: HashMap<String, nucleon_manager::NucleonManager>,
    /// Created nucleon instances, keyed by the `nucleon_name` of their configuration.
    nucleon_instances: HashMap<String, Box<dyn Nucleon>>,
    /// Sending half of the message channel; a reference to it is passed to
    /// `create_nucleon` whenever an instance is created.
    tx: Sender<Message>,
}
25
/// The main Nucleus instance that manages the Nucleons
pub struct Nucleus {
    /// Configuration resolved in `new`: the parsed config file, else the provided
    /// default, else `NucleusConfig::new()`.
    config: NucleusConfig,
    /// Shared state; clones of this `Arc` are handed to the tasks spawned by `run`.
    inner: Arc<Mutex<Inner>>,
    /// Receiving half of the message channel whose sender is stored in `Inner::tx`.
    receive: Receiver<Message>,
}
32
33impl Nucleus {
34    /// Create a new Nucleus instance with the config file location and an optional default config.
35    pub fn new(config_file: &str, default_config: Option<NucleusConfig>) -> Self {
36        env_logger::init();
37
38        let (tx, rx) = async_std::channel::unbounded();
39        let config = Self::load_config(&config_file)
40            .or(default_config)
41            .unwrap_or(NucleusConfig::new());
42
43        Self {
44            config,
45            inner: Arc::new(Mutex::new(Inner {
46                registered_nucleons: HashMap::new(),
47                nucleon_instances: HashMap::new(),
48                tx,
49            })),
50            receive: rx,
51        }
52    }
53
54    /// Used to initialize the Nucleus app. It will scan for Nucleons, instantiate them
55    /// based on the configuration file and initialize all the Nucleon instances.
56    pub async fn init(&mut self) {
57        log::info!("Scanning for nucleons in {}", self.config.nucleon_path);
58        match std::fs::read_dir(&self.config.nucleon_path) {
59            Ok(files) => {
60                let files: Vec<String> = files
61                    .filter_map(Result::ok)
62                    .filter(|f| {
63                        f.path().extension() == Some(std::ffi::OsStr::new("so"))
64                            || (f.path().extension() == Some(std::ffi::OsStr::new("dll")))
65                    })
66                    .map(|f| f.path().display().to_string())
67                    .collect();
68
69                if files.is_empty() {
70                    log::warn!("No nucleons found, is the nucleon path set correctly?");
71                } else {
72                    for lib in files {
73                        log::info!("Found nucleon {}", lib);
74                        let nucleon = match nucleon_manager::NucleonManager::new(&lib) {
75                            Ok(m) => m,
76                            Err(e) => {
77                                log::warn!("Failed to load nucleon: {}", e.to_string());
78                                continue;
79                            }
80                        };
81
82                        let info = nucleon.static_info();
83                        log::info!("Registered new nucleon: {}", info.type_name);
84
85                        let mut inner = self.inner.lock().await;
86                        inner.registered_nucleons.insert(info.type_name, nucleon);
87                    }
88                }
89            }
90            Err(e) => log::error!("Failed to open nucleon folder: {}", e.to_string()),
91        }
92
93        self.load_nucleons().await;
94
95        let mut inner = self.inner.lock().await;
96        for (_, nucleon) in &mut inner.nucleon_instances {
97            nucleon.init();
98        }
99    }
100
101    async fn load_nucleons(&mut self) {
102        let nucleons = self.config.nucleons.clone();
103        for (type_name, nucleon_configs) in &nucleons {
104            for nucleon_config in nucleon_configs {
105                log::info!(
106                    "Adding nucleon {} of type {}",
107                    nucleon_config.nucleon_name,
108                    type_name
109                );
110                let nucleon_info = NucleonInfo {
111                    properties: vec![],
112                    type_name: type_name.clone(),
113                    capabilities: vec![],
114                };
115                Self::instantiate_nucleon(&self.inner, nucleon_config, &nucleon_info)
116                    .await
117                    .unwrap();
118            }
119        }
120    }
121
122    fn load_config(path: &str) -> Option<NucleusConfig> {
123        if let Ok(file) = File::open(path) {
124            let reader = BufReader::new(file);
125            match serde_json::from_reader(reader) {
126                Ok(config) => return Some(config),
127                Err(e) => log::error!("Failed to parse configuration file: {}", e.to_string()),
128            };
129        }
130        log::info!("Using default configuration");
131        None
132    }
133
134    fn _save_config(path: &str, config: &NucleusConfig) {
135        // TODO: Error handling
136        let file = File::create(path).unwrap();
137        let writer = BufWriter::new(file);
138
139        serde_json::to_writer_pretty(writer, config).unwrap();
140    }
141
142    fn _add(&mut self, type_name: String, nucleon_config: &NucleonConfig) {
143        let mut inner = async_std::task::block_on(self.inner.lock());
144        let base_nucleon = if let Some(m) = inner.registered_nucleons.get(&type_name) {
145            m
146        } else {
147            log::error!("Module of type \"{}\" does not exist", type_name);
148            return;
149        };
150
151        if inner
152            .nucleon_instances
153            .get(&nucleon_config.nucleon_name)
154            .is_some()
155        {
156            log::error!(
157                "Module already exists with the name \"{}\"",
158                &nucleon_config.nucleon_name
159            );
160            return;
161        }
162
163        let tx = inner.tx.clone();
164        let instance = base_nucleon.create_nucleon(nucleon_config, &tx);
165
166        inner
167            .nucleon_instances
168            .insert(nucleon_config.nucleon_name.clone(), instance);
169    }
170
171    async fn instantiate_nucleon(
172        inner: &Arc<Mutex<Inner>>,
173        nucleon_config: &NucleonConfig,
174        nucleon_info: &NucleonInfo,
175    ) -> Result<(), String> {
176        let mut inner = inner.lock().await;
177
178        let builder = inner
179            .registered_nucleons
180            .get(&nucleon_info.type_name)
181            .ok_or(format!(
182                "Nucleon of type \"{}\" does not exist",
183                &nucleon_info.type_name
184            ))?;
185
186        if inner
187            .nucleon_instances
188            .get(&nucleon_config.nucleon_name)
189            .is_some()
190        {
191            return Err(format!(
192                "Nucleon with name \"{}\" already exists",
193                &nucleon_config.nucleon_name
194            ));
195        }
196
197        let instance = builder.create_nucleon(&nucleon_config, &inner.tx);
198
199        inner
200            .nucleon_instances
201            .insert(nucleon_config.nucleon_name.clone(), instance);
202
203        Ok(())
204    }
205
206    /// Main run loop that starts the application (Blocking).
207    pub async fn run(&mut self) {
208        {
209            let mut inner = self.inner.lock().await;
210            for (_, nucleon) in &mut inner.nucleon_instances {
211                nucleon.start();
212            }
213        }
214
215        loop {
216            if let Ok(msg) = self.receive.recv().await {
217                if let MessageType::Request(Request::System(_)) = msg.message {
218                    let inner = self.inner.clone();
219                    async_std::task::spawn(Self::handle_message(inner, msg));
220                    continue;
221                }
222
223                async_std::task::spawn(Self::relay_message(self.inner.clone(), msg));
224            }
225        }
226        //  for (_, nucleon) in &mut self.nucleons {
227        //     nucleon.stop();
228        // }
229    }
230}
231
232// Message related impl
233impl Nucleus {
234    async fn handle_message(inner: Arc<Mutex<Inner>>, message: Message) {
235        log::info!("Got system request: {:?}", message);
236        let reply = if let MessageType::Request(Request::System(s)) = &message.message {
237            match s {
238                SystemRequest::ListRegisteredNucleons => {
239                    let inner = inner.lock().await;
240                    let registered_nucleons = inner
241                        .registered_nucleons
242                        .iter()
243                        .map(|(_, nucleon)| nucleon.static_info())
244                        .collect();
245                    SystemReply::ListRegisteredNucleons(registered_nucleons)
246                }
247                SystemRequest::ListNucleonInstances => {
248                    let inner = inner.lock().await;
249
250                    let nucleon_instances = inner
251                        .nucleon_instances
252                        .iter()
253                        .map(|(_, nucleon)| nucleon.get_config())
254                        .collect();
255
256                    SystemReply::ListNucleonInstances(nucleon_instances)
257                }
258                SystemRequest::AddNucleon(nucleon_config, nucleon_info) => {
259                    if let Err(e) =
260                        Self::instantiate_nucleon(&inner, &nucleon_config, &nucleon_info).await
261                    {
262                        SystemReply::Error(e)
263                    } else {
264                        SystemReply::Success
265                    }
266                }
267                SystemRequest::DeleteNucleon(name) => {
268                    let mut inner = inner.lock().await;
269                    if let Some((name, mut nucleon)) = inner.nucleon_instances.remove_entry(name) {
270                        nucleon.stop();
271                        drop(nucleon);
272                        drop(name);
273                        SystemReply::Success
274                    } else {
275                        SystemReply::Error(format!(
276                            "Unable to delete nucleon \"{}\", it does not exist",
277                            name
278                        ))
279                    }
280                }
281                SystemRequest::SetNucleonState(name, state) => {
282                    let mut inner = inner.lock().await;
283                    if let Some(nucleon) = inner.nucleon_instances.get_mut(name) {
284                        match state {
285                            NucleonState::Initial => {
286                                SystemReply::Error("Not supported".to_string());
287                            }
288                            NucleonState::Inited => nucleon.init(),
289                            NucleonState::Stopped => {
290                                nucleon.stop();
291                            }
292                            NucleonState::Started => {
293                                nucleon.start();
294                            }
295                        }
296                        SystemReply::Success
297                    } else {
298                        SystemReply::Error(format!(
299                            "Unable change nucleon \"{}\" state to {}, it does not exist",
300                            name, state
301                        ))
302                    }
303                }
304            }
305        } else {
306            return;
307        };
308
309        let message = MessageBuilder::new()
310            .sender("system")
311            .recipient(&message.sender)
312            .reply(Reply::System(reply));
313
314        async_std::task::spawn(Self::relay_message(inner, message));
315    }
316
317    async fn relay_message(inner: Arc<Mutex<Inner>>, message: Message) {
318        if let Recipient::Unicast(recipient) = &message.recipient {
319            let inner = inner.lock().await;
320            if let Some(nucleon) = inner.nucleon_instances.get(recipient) {
321                nucleon.on_message(message);
322            } else {
323                log::error!("Failed to send message, unknown recipient {}", recipient);
324            }
325        } else {
326            let inner = inner.lock().await;
327            for (_, nucleon) in &inner.nucleon_instances {
328                nucleon.on_message(message.clone());
329            }
330        }
331    }
332}
333
334// make sure the instances are dropped before the libraries are unloaded
335impl Drop for Nucleus {
336    fn drop(&mut self) {
337        let mut inner = async_std::task::block_on(self.inner.lock());
338
339        for (name, nucleon) in inner.nucleon_instances.drain() {
340            drop(nucleon);
341            drop(name);
342        }
343
344        for (type_name, base_nucleon) in inner.registered_nucleons.drain() {
345            drop(type_name);
346            drop(base_nucleon);
347        }
348    }
349}