Skip to main content

interstice_core/
node.rs

1use crate::{
2    app::App,
3    audio::AudioEngine,
4    error::IntersticeError,
5    logger::{LogLevel, LogSource, Logger},
6    network::{Network, NetworkHandle},
7    persistence::{PeerTokenStore, TableStore},
8    runtime::{Runtime, event::EventInstance, module::Module, reducer::{CompletionToken, ReducerJob}},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use parking_lot::Mutex;
12use std::sync::Arc;
13use std::{collections::HashSet, fs::File, path::Path, thread};
14use tokio::sync::{
15    Notify,
16    mpsc::{self, UnboundedReceiver},
17};
18use crossbeam_channel;
19use uuid::Uuid;
20
21pub type NodeId = Uuid;
22
/// Capacity of the bounded reducer job queue.
///
/// Caps the number of remote reducer jobs that may wait in the queue; once it
/// is full, further jobs are dropped. This prevents running out of memory when
/// clients dispatch faster than the WASM executor can process.
const REDUCER_QUEUE_CAPACITY: usize = 8 * 1024;
27
28pub struct Node {
29    pub id: NodeId,
30    pub(crate) network_handle: NetworkHandle,
31    run_app_notify: Arc<Notify>,
32    event_receiver: UnboundedReceiver<(EventInstance, Option<CompletionToken>)>,
33    network: Network,
34    app: App,
35    runtime: Arc<Runtime>,
36    logger: Logger,
37}
38
39impl Node {
40    pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
41        let id = Uuid::new_v4();
42        let data_path = nodes_path.join(id.to_string());
43        let modules_path = data_path.join("modules");
44        std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
45        std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
46        let table_store = TableStore::new(Some(modules_path.clone()));
47
48        let address = format!("127.0.0.1:{}", port);
49
50        let (event_sender, event_receiver) = mpsc::unbounded_channel();
51
52        let logger = Logger::new(
53            File::create(data_path.join("node.log")).expect("Should be able to create log file"),
54        );
55
56        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
57            data_path.join("peer_tokens.toml"),
58        )?));
59
60        // Create bounded reducer channel first so both Network and Runtime share the
61        // same sender — remote reducer calls go directly to the executor without
62        // passing through the unbounded event channel.
63        let (reducer_sender, reducer_receiver) =
64            crossbeam_channel::bounded::<ReducerJob>(REDUCER_QUEUE_CAPACITY);
65
66        let network = Network::new(
67            id,
68            address.clone(),
69            event_sender.clone(),
70            reducer_sender.clone(),
71            peer_tokens,
72            logger.clone(),
73        );
74        let network_handle = network.get_handle();
75        let gpu = Arc::new(Mutex::new(None));
76        let audio_state = Arc::new(Mutex::new(
77            crate::runtime::host_calls::audio::AudioState::new(
78                crate::runtime::host_calls::audio::start_audio_thread(),
79            ),
80        ));
81        let run_app_notify = Arc::new(Notify::new());
82        let runtime = Arc::new(Runtime::new(
83            id,
84            Some(modules_path),
85            table_store,
86            event_sender.clone(),
87            network_handle.clone(),
88            audio_state,
89            gpu,
90            run_app_notify.clone(),
91            logger.clone(),
92            reducer_sender,
93            reducer_receiver,
94        )?);
95        let gpu_call_receiver = runtime.take_gpu_call_receiver();
96        let app = App::new(
97            id,
98            event_sender.clone(),
99            runtime.gpu.clone(),
100            runtime.clone(),
101            gpu_call_receiver,
102        );
103
104        let node = Self {
105            id,
106            runtime,
107            app,
108            network,
109            network_handle,
110            event_receiver,
111            run_app_notify,
112            logger,
113        };
114
115        Ok(node)
116    }
117
118    pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
119        let data_path = nodes_path.join(id.to_string());
120        let address = format!("127.0.0.1:{}", port);
121        let modules_path = data_path.join("modules");
122        let table_store = TableStore::new(Some(modules_path.clone()));
123
124        let (event_sender, event_receiver) = mpsc::unbounded_channel();
125
126        // Open log file to append new logs on.
127        let logger_file = File::options()
128            .append(true)
129            .open(data_path.join("node.log"))
130            .expect("Should be able to open log file");
131        let logger = Logger::new(logger_file);
132
133        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
134            data_path.join("peer_tokens.toml"),
135        )?));
136
137        let (reducer_sender, reducer_receiver) =
138            crossbeam_channel::bounded::<ReducerJob>(REDUCER_QUEUE_CAPACITY);
139
140        let network = Network::new(
141            id,
142            address.clone(),
143            event_sender.clone(),
144            reducer_sender.clone(),
145            peer_tokens,
146            logger.clone(),
147        );
148        let network_handle = network.get_handle();
149        let gpu = Arc::new(Mutex::new(None));
150        let audio_state = Arc::new(Mutex::new(
151            crate::runtime::host_calls::audio::AudioState::new(
152                crate::runtime::host_calls::audio::start_audio_thread(),
153            ),
154        ));
155        let run_app_notify = Arc::new(Notify::new());
156        let runtime = Arc::new(Runtime::new(
157            id,
158            Some(modules_path.clone()),
159            table_store,
160            event_sender.clone(),
161            network_handle.clone(),
162            audio_state,
163            gpu,
164            run_app_notify.clone(),
165            logger.clone(),
166            reducer_sender,
167            reducer_receiver,
168        )?);
169        let gpu_call_receiver = runtime.take_gpu_call_receiver();
170        let app = App::new(
171            id,
172            event_sender.clone(),
173            runtime.gpu.clone(),
174            runtime.clone(),
175            gpu_call_receiver,
176        );
177
178        let node = Self {
179            id,
180            runtime,
181            app,
182            network,
183            network_handle,
184            event_receiver,
185            run_app_notify,
186            logger,
187        };
188
189        Ok(node)
190    }
191
192    pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
193        self.logger.log(message, source, level);
194    }
195
196    pub async fn start(self) -> Result<(), IntersticeError> {
197        let Node {
198            id,
199            runtime,
200            app,
201            mut network,
202            network_handle: _network_handle,
203            event_receiver,
204            run_app_notify,
205            logger,
206        } = self;
207
208        logger.log(
209            &format!("Starting node with ID: {}", id),
210            LogSource::Node,
211            LogLevel::Info,
212        );
213
214        // Run network events
215        network.listen()?;
216        let _net_handle = network.run();
217
218        let runtime_for_thread = runtime.clone();
219
220        thread::spawn(move || {
221            let rt = tokio::runtime::Builder::new_current_thread()
222                .enable_all()
223                .build()
224                .expect("Failed to build runtime");
225            let local = tokio::task::LocalSet::new();
226            local.block_on(&rt, async move {
227                let audio_engine = AudioEngine::new(
228                    runtime_for_thread.audio_state.clone(),
229                    runtime_for_thread.authority_modules.clone(),
230                    runtime_for_thread.event_sender.clone(),
231                );
232                audio_engine.spawn();
233                Runtime::run(runtime_for_thread, event_receiver).await;
234            });
235        });
236
237        // Load persisted modules only after runtime loop is running so async schema
238        // responses required during module loading can be processed.
239        Node::load_modules_from_disk(runtime.clone()).await?;
240
241        run_app_notify.notified().await;
242        app.run();
243        Ok(())
244    }
245
246    pub async fn schema(&self, name: String) -> NodeSchema {
247        NodeSchema {
248            name,
249            address: self.network_handle.address.clone(),
250            modules: self
251                .runtime
252                .modules
253                .lock()
254                
255                .values()
256                .map(|m| (*m.schema).clone())
257                .collect(),
258        }
259    }
260
261    pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
262        self.runtime.persistence.clear_all()?;
263        Ok(())
264    }
265
266    pub async fn load_module_from_file<P: AsRef<Path>>(
267        &self,
268        path: P,
269    ) -> Result<ModuleSchema, IntersticeError> {
270        let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
271        Runtime::load_module(self.runtime.clone(), module, true).await
272    }
273
274    pub async fn load_module_from_bytes(
275        &self,
276        bytes: &[u8],
277    ) -> Result<ModuleSchema, IntersticeError> {
278        let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
279        Runtime::load_module(self.runtime.clone(), module, true).await
280    }
281
282    pub fn get_port(&self) -> u32 {
283        self.network_handle
284            .address
285            .split(':')
286            .last()
287            .unwrap()
288            .parse()
289            .unwrap()
290    }
291
292    async fn load_modules_from_disk(runtime: Arc<Runtime>) -> Result<(), IntersticeError> {
293        let Some(modules_path) = runtime.modules_path.clone() else {
294            return Ok(());
295        };
296
297        let mut modules_to_load = Vec::new();
298        for entry in std::fs::read_dir(&modules_path).unwrap() {
299            let entry = entry.unwrap();
300            let path = entry.path();
301            if !path.is_dir() {
302                continue;
303            }
304
305            std::fs::create_dir_all(path.join("logs")).unwrap();
306            std::fs::create_dir_all(path.join("snapshots")).unwrap();
307
308            let wasm_path = path.join("module.wasm");
309            if !wasm_path.exists() {
310                continue;
311            }
312
313            let module = Module::from_file(runtime.clone(), &wasm_path).await?;
314            modules_to_load.push((module.schema.name.clone(), module));
315        }
316
317        let mut remaining = modules_to_load;
318        let mut loaded = HashSet::new();
319        let all_names: HashSet<_> = remaining.iter().map(|(name, _)| name.clone()).collect();
320        while !remaining.is_empty() {
321            let mut progressed = false;
322            let mut next_remaining = Vec::new();
323
324            for (name, module) in remaining.into_iter() {
325                let deps = module
326                    .schema
327                    .module_dependencies
328                    .iter()
329                    .map(|d| d.module_name.clone())
330                    .collect::<Vec<_>>();
331
332                if let Some(missing) = deps.iter().find(|d| !all_names.contains(*d)) {
333                    return Err(IntersticeError::ModuleNotFound(
334                        missing.clone(),
335                        format!("Required by '{}' on node startup", name),
336                    ));
337                }
338
339                if deps.iter().all(|d| loaded.contains(d)) {
340                    Runtime::load_module(runtime.clone(), module, false).await?;
341                    loaded.insert(name);
342                    progressed = true;
343                } else {
344                    next_remaining.push((name, module));
345                }
346            }
347
348            if !progressed {
349                return Err(IntersticeError::Internal(
350                    "Module dependency cycle detected while loading node modules".into(),
351                ));
352            }
353
354            remaining = next_remaining;
355        }
356
357        runtime.replay()?;
358
359        Ok(())
360    }
361}