Skip to main content

interstice_core/
node.rs

1use crate::{
2    app::App,
3    audio::AudioEngine,
4    error::IntersticeError,
5    logger::{LogLevel, LogSource, Logger},
6    network::{Network, NetworkHandle},
7    persistence::{PeerTokenStore, TableStore},
8    runtime::{Runtime, event::EventInstance, module::Module},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use std::sync::Arc;
12use std::{collections::HashSet, fs::File, path::Path, sync::Mutex, thread};
13use tokio::sync::{
14    Notify,
15    mpsc::{self, UnboundedReceiver},
16};
17use uuid::Uuid;
18
/// Identifier of a node; an alias for [`Uuid`].
pub type NodeId = Uuid;

/// A single Interstice node: bundles the networking layer, the module
/// runtime, the application front-end and the logger behind one handle.
pub struct Node {
    /// Identifier of this node.
    pub id: NodeId,
    /// Handle obtained from `network.get_handle()` at construction time; its
    /// `address` field is what `schema` and `get_port` read.
    pub(crate) network_handle: NetworkHandle,
    /// Notification shared with the runtime (a clone is passed to
    /// `Runtime::new`); `start` waits on it before calling `app.run()`.
    run_app_notify: Arc<Notify>,
    /// Receiving end of the unbounded event channel; handed to `Runtime::run`
    /// when the node starts.
    event_receiver: UnboundedReceiver<EventInstance>,
    /// Network layer; `start` calls `listen()` and then `run()` on it.
    network: Network,
    /// Application object; run by `start` once `run_app_notify` has fired.
    app: App,
    /// Shared module runtime.
    runtime: Arc<Runtime>,
    /// Logger backed by the node's `node.log` file.
    logger: Logger,
}
31
32impl Node {
33    pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
34        let id = Uuid::new_v4();
35        let data_path = nodes_path.join(id.to_string());
36        let modules_path = data_path.join("modules");
37        std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
38        std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
39        let table_store = TableStore::new(Some(modules_path.clone()));
40
41        let address = format!("127.0.0.1:{}", port);
42
43        let (event_sender, event_receiver) = mpsc::unbounded_channel();
44
45        let logger = Logger::new(
46            File::create(data_path.join("node.log")).expect("Should be able to create log file"),
47        );
48
49        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
50            data_path.join("peer_tokens.toml"),
51        )?));
52        let network = Network::new(
53            id,
54            address.clone(),
55            event_sender.clone(),
56            peer_tokens,
57            logger.clone(),
58        );
59        let network_handle = network.get_handle();
60        let gpu = Arc::new(Mutex::new(None));
61        let audio_state = Arc::new(Mutex::new(
62            crate::runtime::host_calls::audio::AudioState::new(
63                crate::runtime::host_calls::audio::start_audio_thread(),
64            ),
65        ));
66        let run_app_notify = Arc::new(Notify::new());
67        let runtime = Arc::new(Runtime::new(
68            id,
69            Some(modules_path),
70            table_store,
71            event_sender.clone(),
72            network_handle.clone(),
73            audio_state,
74            gpu,
75            run_app_notify.clone(),
76            logger.clone(),
77        )?);
78        let gpu_call_receiver = runtime.take_gpu_call_receiver();
79        let app = App::new(
80            id,
81            event_sender.clone(),
82            runtime.gpu.clone(),
83            runtime.clone(),
84            gpu_call_receiver,
85        );
86
87        let node = Self {
88            id,
89            runtime,
90            app,
91            network,
92            network_handle,
93            event_receiver,
94            run_app_notify,
95            logger,
96        };
97
98        Ok(node)
99    }
100
101    pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
102        let data_path = nodes_path.join(id.to_string());
103        let address = format!("127.0.0.1:{}", port);
104        let modules_path = data_path.join("modules");
105        let table_store = TableStore::new(Some(modules_path.clone()));
106
107        let (event_sender, event_receiver) = mpsc::unbounded_channel();
108
109        // Open log file to append new logs on.
110        let logger_file = File::options()
111            .append(true)
112            .open(data_path.join("node.log"))
113            .expect("Should be able to open log file");
114        let logger = Logger::new(logger_file);
115
116        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
117            data_path.join("peer_tokens.toml"),
118        )?));
119        let network = Network::new(
120            id,
121            address.clone(),
122            event_sender.clone(),
123            peer_tokens,
124            logger.clone(),
125        );
126        let network_handle = network.get_handle();
127        let gpu = Arc::new(Mutex::new(None));
128        let audio_state = Arc::new(Mutex::new(
129            crate::runtime::host_calls::audio::AudioState::new(
130                crate::runtime::host_calls::audio::start_audio_thread(),
131            ),
132        ));
133        let run_app_notify = Arc::new(Notify::new());
134        let runtime = Arc::new(Runtime::new(
135            id,
136            Some(modules_path.clone()),
137            table_store,
138            event_sender.clone(),
139            network_handle.clone(),
140            audio_state,
141            gpu,
142            run_app_notify.clone(),
143            logger.clone(),
144        )?);
145        let gpu_call_receiver = runtime.take_gpu_call_receiver();
146        let app = App::new(
147            id,
148            event_sender.clone(),
149            runtime.gpu.clone(),
150            runtime.clone(),
151            gpu_call_receiver,
152        );
153
154        let node = Self {
155            id,
156            runtime,
157            app,
158            network,
159            network_handle,
160            event_receiver,
161            run_app_notify,
162            logger,
163        };
164
165        Ok(node)
166    }
167
168    pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
169        self.logger.log(message, source, level);
170    }
171
172    pub async fn start(self) -> Result<(), IntersticeError> {
173        let Node {
174            id,
175            runtime,
176            app,
177            mut network,
178            network_handle: _network_handle,
179            event_receiver,
180            run_app_notify,
181            logger,
182        } = self;
183
184        logger.log(
185            &format!("Starting node with ID: {}", id),
186            LogSource::Node,
187            LogLevel::Info,
188        );
189
190        // Run network events
191        network.listen()?;
192        let _net_handle = network.run();
193
194        let runtime_for_thread = runtime.clone();
195
196        thread::spawn(move || {
197            let rt = tokio::runtime::Builder::new_current_thread()
198                .enable_all()
199                .build()
200                .expect("Failed to build runtime");
201            let local = tokio::task::LocalSet::new();
202            local.block_on(&rt, async move {
203                let audio_engine = AudioEngine::new(
204                    runtime_for_thread.audio_state.clone(),
205                    runtime_for_thread.authority_modules.clone(),
206                    runtime_for_thread.event_sender.clone(),
207                );
208                audio_engine.spawn();
209                Runtime::run(runtime_for_thread, event_receiver).await;
210            });
211        });
212
213        // Load persisted modules only after runtime loop is running so async schema
214        // responses required during module loading can be processed.
215        Node::load_modules_from_disk(runtime.clone()).await?;
216
217        run_app_notify.notified().await;
218        app.run();
219        Ok(())
220    }
221
222    pub async fn schema(&self, name: String) -> NodeSchema {
223        NodeSchema {
224            name,
225            address: self.network_handle.address.clone(),
226            modules: self
227                .runtime
228                .modules
229                .lock()
230                .unwrap()
231                .values()
232                .map(|m| (*m.schema).clone())
233                .collect(),
234        }
235    }
236
237    pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
238        self.runtime.persistence.clear_all()?;
239        Ok(())
240    }
241
242    pub async fn load_module_from_file<P: AsRef<Path>>(
243        &self,
244        path: P,
245    ) -> Result<ModuleSchema, IntersticeError> {
246        let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
247        Runtime::load_module(self.runtime.clone(), module, true).await
248    }
249
250    pub async fn load_module_from_bytes(
251        &self,
252        bytes: &[u8],
253    ) -> Result<ModuleSchema, IntersticeError> {
254        let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
255        Runtime::load_module(self.runtime.clone(), module, true).await
256    }
257
258    pub fn get_port(&self) -> u32 {
259        self.network_handle
260            .address
261            .split(':')
262            .last()
263            .unwrap()
264            .parse()
265            .unwrap()
266    }
267
268    async fn load_modules_from_disk(runtime: Arc<Runtime>) -> Result<(), IntersticeError> {
269        let Some(modules_path) = runtime.modules_path.clone() else {
270            return Ok(());
271        };
272
273        let mut modules_to_load = Vec::new();
274        for entry in std::fs::read_dir(&modules_path).unwrap() {
275            let entry = entry.unwrap();
276            let path = entry.path();
277            if !path.is_dir() {
278                continue;
279            }
280
281            std::fs::create_dir_all(path.join("logs")).unwrap();
282            std::fs::create_dir_all(path.join("snapshots")).unwrap();
283
284            let wasm_path = path.join("module.wasm");
285            if !wasm_path.exists() {
286                continue;
287            }
288
289            let module = Module::from_file(runtime.clone(), &wasm_path).await?;
290            modules_to_load.push((module.schema.name.clone(), module));
291        }
292
293        let mut remaining = modules_to_load;
294        let mut loaded = HashSet::new();
295        let all_names: HashSet<_> = remaining.iter().map(|(name, _)| name.clone()).collect();
296        while !remaining.is_empty() {
297            let mut progressed = false;
298            let mut next_remaining = Vec::new();
299
300            for (name, module) in remaining.into_iter() {
301                let deps = module
302                    .schema
303                    .module_dependencies
304                    .iter()
305                    .map(|d| d.module_name.clone())
306                    .collect::<Vec<_>>();
307
308                if let Some(missing) = deps.iter().find(|d| !all_names.contains(*d)) {
309                    return Err(IntersticeError::ModuleNotFound(
310                        missing.clone(),
311                        format!("Required by '{}' on node startup", name),
312                    ));
313                }
314
315                if deps.iter().all(|d| loaded.contains(d)) {
316                    Runtime::load_module(runtime.clone(), module, false).await?;
317                    loaded.insert(name);
318                    progressed = true;
319                } else {
320                    next_remaining.push((name, module));
321                }
322            }
323
324            if !progressed {
325                return Err(IntersticeError::Internal(
326                    "Module dependency cycle detected while loading node modules".into(),
327                ));
328            }
329
330            remaining = next_remaining;
331        }
332
333        runtime.replay()?;
334
335        Ok(())
336    }
337}