1use std::alloc::System;
2
// Route every heap allocation made by this crate through the operating
// system's default allocator.
#[global_allocator]
static ALLOCATOR: System = System;
5
6use std::collections::HashMap;
7
8use async_std::sync::{Arc, Mutex};
9use std::fs::File;
10use std::io::{BufReader, BufWriter};
11
12use nucleon::exports::*;
13
14mod nucleon_manager;
15
16pub mod config;
18use config::NucleusConfig;
19
20struct Inner {
21 registered_nucleons: HashMap<String, nucleon_manager::NucleonManager>,
22 nucleon_instances: HashMap<String, Box<dyn Nucleon>>,
23 tx: Sender<Message>,
24}
25
26pub struct Nucleus {
28 config: NucleusConfig,
29 inner: Arc<Mutex<Inner>>,
30 receive: Receiver<Message>,
31}
32
33impl Nucleus {
34 pub fn new(config_file: &str, default_config: Option<NucleusConfig>) -> Self {
36 env_logger::init();
37
38 let (tx, rx) = async_std::channel::unbounded();
39 let config = Self::load_config(&config_file)
40 .or(default_config)
41 .unwrap_or(NucleusConfig::new());
42
43 Self {
44 config,
45 inner: Arc::new(Mutex::new(Inner {
46 registered_nucleons: HashMap::new(),
47 nucleon_instances: HashMap::new(),
48 tx,
49 })),
50 receive: rx,
51 }
52 }
53
54 pub async fn init(&mut self) {
57 log::info!("Scanning for nucleons in {}", self.config.nucleon_path);
58 match std::fs::read_dir(&self.config.nucleon_path) {
59 Ok(files) => {
60 let files: Vec<String> = files
61 .filter_map(Result::ok)
62 .filter(|f| {
63 f.path().extension() == Some(std::ffi::OsStr::new("so"))
64 || (f.path().extension() == Some(std::ffi::OsStr::new("dll")))
65 })
66 .map(|f| f.path().display().to_string())
67 .collect();
68
69 if files.is_empty() {
70 log::warn!("No nucleons found, is the nucleon path set correctly?");
71 } else {
72 for lib in files {
73 log::info!("Found nucleon {}", lib);
74 let nucleon = match nucleon_manager::NucleonManager::new(&lib) {
75 Ok(m) => m,
76 Err(e) => {
77 log::warn!("Failed to load nucleon: {}", e.to_string());
78 continue;
79 }
80 };
81
82 let info = nucleon.static_info();
83 log::info!("Registered new nucleon: {}", info.type_name);
84
85 let mut inner = self.inner.lock().await;
86 inner.registered_nucleons.insert(info.type_name, nucleon);
87 }
88 }
89 }
90 Err(e) => log::error!("Failed to open nucleon folder: {}", e.to_string()),
91 }
92
93 self.load_nucleons().await;
94
95 let mut inner = self.inner.lock().await;
96 for (_, nucleon) in &mut inner.nucleon_instances {
97 nucleon.init();
98 }
99 }
100
101 async fn load_nucleons(&mut self) {
102 let nucleons = self.config.nucleons.clone();
103 for (type_name, nucleon_configs) in &nucleons {
104 for nucleon_config in nucleon_configs {
105 log::info!(
106 "Adding nucleon {} of type {}",
107 nucleon_config.nucleon_name,
108 type_name
109 );
110 let nucleon_info = NucleonInfo {
111 properties: vec![],
112 type_name: type_name.clone(),
113 capabilities: vec![],
114 };
115 Self::instantiate_nucleon(&self.inner, nucleon_config, &nucleon_info)
116 .await
117 .unwrap();
118 }
119 }
120 }
121
122 fn load_config(path: &str) -> Option<NucleusConfig> {
123 if let Ok(file) = File::open(path) {
124 let reader = BufReader::new(file);
125 match serde_json::from_reader(reader) {
126 Ok(config) => return Some(config),
127 Err(e) => log::error!("Failed to parse configuration file: {}", e.to_string()),
128 };
129 }
130 log::info!("Using default configuration");
131 None
132 }
133
134 fn _save_config(path: &str, config: &NucleusConfig) {
135 let file = File::create(path).unwrap();
137 let writer = BufWriter::new(file);
138
139 serde_json::to_writer_pretty(writer, config).unwrap();
140 }
141
142 fn _add(&mut self, type_name: String, nucleon_config: &NucleonConfig) {
143 let mut inner = async_std::task::block_on(self.inner.lock());
144 let base_nucleon = if let Some(m) = inner.registered_nucleons.get(&type_name) {
145 m
146 } else {
147 log::error!("Module of type \"{}\" does not exist", type_name);
148 return;
149 };
150
151 if inner
152 .nucleon_instances
153 .get(&nucleon_config.nucleon_name)
154 .is_some()
155 {
156 log::error!(
157 "Module already exists with the name \"{}\"",
158 &nucleon_config.nucleon_name
159 );
160 return;
161 }
162
163 let tx = inner.tx.clone();
164 let instance = base_nucleon.create_nucleon(nucleon_config, &tx);
165
166 inner
167 .nucleon_instances
168 .insert(nucleon_config.nucleon_name.clone(), instance);
169 }
170
171 async fn instantiate_nucleon(
172 inner: &Arc<Mutex<Inner>>,
173 nucleon_config: &NucleonConfig,
174 nucleon_info: &NucleonInfo,
175 ) -> Result<(), String> {
176 let mut inner = inner.lock().await;
177
178 let builder = inner
179 .registered_nucleons
180 .get(&nucleon_info.type_name)
181 .ok_or(format!(
182 "Nucleon of type \"{}\" does not exist",
183 &nucleon_info.type_name
184 ))?;
185
186 if inner
187 .nucleon_instances
188 .get(&nucleon_config.nucleon_name)
189 .is_some()
190 {
191 return Err(format!(
192 "Nucleon with name \"{}\" already exists",
193 &nucleon_config.nucleon_name
194 ));
195 }
196
197 let instance = builder.create_nucleon(&nucleon_config, &inner.tx);
198
199 inner
200 .nucleon_instances
201 .insert(nucleon_config.nucleon_name.clone(), instance);
202
203 Ok(())
204 }
205
206 pub async fn run(&mut self) {
208 {
209 let mut inner = self.inner.lock().await;
210 for (_, nucleon) in &mut inner.nucleon_instances {
211 nucleon.start();
212 }
213 }
214
215 loop {
216 if let Ok(msg) = self.receive.recv().await {
217 if let MessageType::Request(Request::System(_)) = msg.message {
218 let inner = self.inner.clone();
219 async_std::task::spawn(Self::handle_message(inner, msg));
220 continue;
221 }
222
223 async_std::task::spawn(Self::relay_message(self.inner.clone(), msg));
224 }
225 }
226 }
230}
231
232impl Nucleus {
234 async fn handle_message(inner: Arc<Mutex<Inner>>, message: Message) {
235 log::info!("Got system request: {:?}", message);
236 let reply = if let MessageType::Request(Request::System(s)) = &message.message {
237 match s {
238 SystemRequest::ListRegisteredNucleons => {
239 let inner = inner.lock().await;
240 let registered_nucleons = inner
241 .registered_nucleons
242 .iter()
243 .map(|(_, nucleon)| nucleon.static_info())
244 .collect();
245 SystemReply::ListRegisteredNucleons(registered_nucleons)
246 }
247 SystemRequest::ListNucleonInstances => {
248 let inner = inner.lock().await;
249
250 let nucleon_instances = inner
251 .nucleon_instances
252 .iter()
253 .map(|(_, nucleon)| nucleon.get_config())
254 .collect();
255
256 SystemReply::ListNucleonInstances(nucleon_instances)
257 }
258 SystemRequest::AddNucleon(nucleon_config, nucleon_info) => {
259 if let Err(e) =
260 Self::instantiate_nucleon(&inner, &nucleon_config, &nucleon_info).await
261 {
262 SystemReply::Error(e)
263 } else {
264 SystemReply::Success
265 }
266 }
267 SystemRequest::DeleteNucleon(name) => {
268 let mut inner = inner.lock().await;
269 if let Some((name, mut nucleon)) = inner.nucleon_instances.remove_entry(name) {
270 nucleon.stop();
271 drop(nucleon);
272 drop(name);
273 SystemReply::Success
274 } else {
275 SystemReply::Error(format!(
276 "Unable to delete nucleon \"{}\", it does not exist",
277 name
278 ))
279 }
280 }
281 SystemRequest::SetNucleonState(name, state) => {
282 let mut inner = inner.lock().await;
283 if let Some(nucleon) = inner.nucleon_instances.get_mut(name) {
284 match state {
285 NucleonState::Initial => {
286 SystemReply::Error("Not supported".to_string());
287 }
288 NucleonState::Inited => nucleon.init(),
289 NucleonState::Stopped => {
290 nucleon.stop();
291 }
292 NucleonState::Started => {
293 nucleon.start();
294 }
295 }
296 SystemReply::Success
297 } else {
298 SystemReply::Error(format!(
299 "Unable change nucleon \"{}\" state to {}, it does not exist",
300 name, state
301 ))
302 }
303 }
304 }
305 } else {
306 return;
307 };
308
309 let message = MessageBuilder::new()
310 .sender("system")
311 .recipient(&message.sender)
312 .reply(Reply::System(reply));
313
314 async_std::task::spawn(Self::relay_message(inner, message));
315 }
316
317 async fn relay_message(inner: Arc<Mutex<Inner>>, message: Message) {
318 if let Recipient::Unicast(recipient) = &message.recipient {
319 let inner = inner.lock().await;
320 if let Some(nucleon) = inner.nucleon_instances.get(recipient) {
321 nucleon.on_message(message);
322 } else {
323 log::error!("Failed to send message, unknown recipient {}", recipient);
324 }
325 } else {
326 let inner = inner.lock().await;
327 for (_, nucleon) in &inner.nucleon_instances {
328 nucleon.on_message(message.clone());
329 }
330 }
331 }
332}
333
334impl Drop for Nucleus {
336 fn drop(&mut self) {
337 let mut inner = async_std::task::block_on(self.inner.lock());
338
339 for (name, nucleon) in inner.nucleon_instances.drain() {
340 drop(nucleon);
341 drop(name);
342 }
343
344 for (type_name, base_nucleon) in inner.registered_nucleons.drain() {
345 drop(type_name);
346 drop(base_nucleon);
347 }
348 }
349}