1use crate::{
2 app::App,
3 audio::AudioEngine,
4 error::IntersticeError,
5 logger::{LogLevel, LogSource, Logger},
6 network::{Network, NetworkHandle},
7 persistence::{PeerTokenStore, TableStore},
8 runtime::{Runtime, event::EventInstance, module::Module, reducer::{CompletionToken, ReducerJob}},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use parking_lot::Mutex;
12use std::sync::Arc;
13use std::{collections::HashSet, fs::File, path::Path, thread};
14use tokio::sync::{
15 Notify,
16 mpsc::{self, UnboundedReceiver},
17};
18use crossbeam_channel;
19use uuid::Uuid;
20
21pub type NodeId = Uuid;
22
23const REDUCER_QUEUE_CAPACITY: usize = 8_192;
27
28pub struct Node {
29 pub id: NodeId,
30 pub(crate) network_handle: NetworkHandle,
31 run_app_notify: Arc<Notify>,
32 event_receiver: UnboundedReceiver<(EventInstance, Option<CompletionToken>)>,
33 network: Network,
34 app: App,
35 runtime: Arc<Runtime>,
36 logger: Logger,
37}
38
39impl Node {
40 pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
41 let id = Uuid::new_v4();
42 let data_path = nodes_path.join(id.to_string());
43 let modules_path = data_path.join("modules");
44 std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
45 std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
46 let table_store = TableStore::new(Some(modules_path.clone()));
47
48 let address = format!("127.0.0.1:{}", port);
49
50 let (event_sender, event_receiver) = mpsc::unbounded_channel();
51
52 let logger = Logger::new(
53 File::create(data_path.join("node.log")).expect("Should be able to create log file"),
54 );
55
56 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
57 data_path.join("peer_tokens.toml"),
58 )?));
59
60 let (reducer_sender, reducer_receiver) =
64 crossbeam_channel::bounded::<ReducerJob>(REDUCER_QUEUE_CAPACITY);
65
66 let network = Network::new(
67 id,
68 address.clone(),
69 event_sender.clone(),
70 reducer_sender.clone(),
71 peer_tokens,
72 logger.clone(),
73 );
74 let network_handle = network.get_handle();
75 let gpu = Arc::new(Mutex::new(None));
76 let audio_state = Arc::new(Mutex::new(
77 crate::runtime::host_calls::audio::AudioState::new(
78 crate::runtime::host_calls::audio::start_audio_thread(),
79 ),
80 ));
81 let run_app_notify = Arc::new(Notify::new());
82 let runtime = Arc::new(Runtime::new(
83 id,
84 Some(modules_path),
85 table_store,
86 event_sender.clone(),
87 network_handle.clone(),
88 audio_state,
89 gpu,
90 run_app_notify.clone(),
91 logger.clone(),
92 reducer_sender,
93 reducer_receiver,
94 )?);
95 let gpu_call_receiver = runtime.take_gpu_call_receiver();
96 let app = App::new(
97 id,
98 event_sender.clone(),
99 runtime.gpu.clone(),
100 runtime.clone(),
101 gpu_call_receiver,
102 );
103
104 let node = Self {
105 id,
106 runtime,
107 app,
108 network,
109 network_handle,
110 event_receiver,
111 run_app_notify,
112 logger,
113 };
114
115 Ok(node)
116 }
117
118 pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
119 let data_path = nodes_path.join(id.to_string());
120 let address = format!("127.0.0.1:{}", port);
121 let modules_path = data_path.join("modules");
122 let table_store = TableStore::new(Some(modules_path.clone()));
123
124 let (event_sender, event_receiver) = mpsc::unbounded_channel();
125
126 let logger_file = File::options()
128 .append(true)
129 .open(data_path.join("node.log"))
130 .expect("Should be able to open log file");
131 let logger = Logger::new(logger_file);
132
133 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
134 data_path.join("peer_tokens.toml"),
135 )?));
136
137 let (reducer_sender, reducer_receiver) =
138 crossbeam_channel::bounded::<ReducerJob>(REDUCER_QUEUE_CAPACITY);
139
140 let network = Network::new(
141 id,
142 address.clone(),
143 event_sender.clone(),
144 reducer_sender.clone(),
145 peer_tokens,
146 logger.clone(),
147 );
148 let network_handle = network.get_handle();
149 let gpu = Arc::new(Mutex::new(None));
150 let audio_state = Arc::new(Mutex::new(
151 crate::runtime::host_calls::audio::AudioState::new(
152 crate::runtime::host_calls::audio::start_audio_thread(),
153 ),
154 ));
155 let run_app_notify = Arc::new(Notify::new());
156 let runtime = Arc::new(Runtime::new(
157 id,
158 Some(modules_path.clone()),
159 table_store,
160 event_sender.clone(),
161 network_handle.clone(),
162 audio_state,
163 gpu,
164 run_app_notify.clone(),
165 logger.clone(),
166 reducer_sender,
167 reducer_receiver,
168 )?);
169 let gpu_call_receiver = runtime.take_gpu_call_receiver();
170 let app = App::new(
171 id,
172 event_sender.clone(),
173 runtime.gpu.clone(),
174 runtime.clone(),
175 gpu_call_receiver,
176 );
177
178 let node = Self {
179 id,
180 runtime,
181 app,
182 network,
183 network_handle,
184 event_receiver,
185 run_app_notify,
186 logger,
187 };
188
189 Ok(node)
190 }
191
192 pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
193 self.logger.log(message, source, level);
194 }
195
196 pub async fn start(self) -> Result<(), IntersticeError> {
197 let Node {
198 id,
199 runtime,
200 app,
201 mut network,
202 network_handle: _network_handle,
203 event_receiver,
204 run_app_notify,
205 logger,
206 } = self;
207
208 logger.log(
209 &format!("Starting node with ID: {}", id),
210 LogSource::Node,
211 LogLevel::Info,
212 );
213
214 network.listen()?;
216 let _net_handle = network.run();
217
218 let runtime_for_thread = runtime.clone();
219
220 thread::spawn(move || {
221 let rt = tokio::runtime::Builder::new_current_thread()
222 .enable_all()
223 .build()
224 .expect("Failed to build runtime");
225 let local = tokio::task::LocalSet::new();
226 local.block_on(&rt, async move {
227 let audio_engine = AudioEngine::new(
228 runtime_for_thread.audio_state.clone(),
229 runtime_for_thread.authority_modules.clone(),
230 runtime_for_thread.event_sender.clone(),
231 );
232 audio_engine.spawn();
233 Runtime::run(runtime_for_thread, event_receiver).await;
234 });
235 });
236
237 Node::load_modules_from_disk(runtime.clone()).await?;
240
241 run_app_notify.notified().await;
242 app.run();
243 Ok(())
244 }
245
246 pub async fn schema(&self, name: String) -> NodeSchema {
247 NodeSchema {
248 name,
249 address: self.network_handle.address.clone(),
250 modules: self
251 .runtime
252 .modules
253 .lock()
254
255 .values()
256 .map(|m| (*m.schema).clone())
257 .collect(),
258 }
259 }
260
261 pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
262 self.runtime.persistence.clear_all()?;
263 Ok(())
264 }
265
266 pub async fn load_module_from_file<P: AsRef<Path>>(
267 &self,
268 path: P,
269 ) -> Result<ModuleSchema, IntersticeError> {
270 let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
271 Runtime::load_module(self.runtime.clone(), module, true).await
272 }
273
274 pub async fn load_module_from_bytes(
275 &self,
276 bytes: &[u8],
277 ) -> Result<ModuleSchema, IntersticeError> {
278 let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
279 Runtime::load_module(self.runtime.clone(), module, true).await
280 }
281
282 pub fn get_port(&self) -> u32 {
283 self.network_handle
284 .address
285 .split(':')
286 .last()
287 .unwrap()
288 .parse()
289 .unwrap()
290 }
291
292 async fn load_modules_from_disk(runtime: Arc<Runtime>) -> Result<(), IntersticeError> {
293 let Some(modules_path) = runtime.modules_path.clone() else {
294 return Ok(());
295 };
296
297 let mut modules_to_load = Vec::new();
298 for entry in std::fs::read_dir(&modules_path).unwrap() {
299 let entry = entry.unwrap();
300 let path = entry.path();
301 if !path.is_dir() {
302 continue;
303 }
304
305 std::fs::create_dir_all(path.join("logs")).unwrap();
306 std::fs::create_dir_all(path.join("snapshots")).unwrap();
307
308 let wasm_path = path.join("module.wasm");
309 if !wasm_path.exists() {
310 continue;
311 }
312
313 let module = Module::from_file(runtime.clone(), &wasm_path).await?;
314 modules_to_load.push((module.schema.name.clone(), module));
315 }
316
317 let mut remaining = modules_to_load;
318 let mut loaded = HashSet::new();
319 let all_names: HashSet<_> = remaining.iter().map(|(name, _)| name.clone()).collect();
320 while !remaining.is_empty() {
321 let mut progressed = false;
322 let mut next_remaining = Vec::new();
323
324 for (name, module) in remaining.into_iter() {
325 let deps = module
326 .schema
327 .module_dependencies
328 .iter()
329 .map(|d| d.module_name.clone())
330 .collect::<Vec<_>>();
331
332 if let Some(missing) = deps.iter().find(|d| !all_names.contains(*d)) {
333 return Err(IntersticeError::ModuleNotFound(
334 missing.clone(),
335 format!("Required by '{}' on node startup", name),
336 ));
337 }
338
339 if deps.iter().all(|d| loaded.contains(d)) {
340 Runtime::load_module(runtime.clone(), module, false).await?;
341 loaded.insert(name);
342 progressed = true;
343 } else {
344 next_remaining.push((name, module));
345 }
346 }
347
348 if !progressed {
349 return Err(IntersticeError::Internal(
350 "Module dependency cycle detected while loading node modules".into(),
351 ));
352 }
353
354 remaining = next_remaining;
355 }
356
357 runtime.replay()?;
358
359 Ok(())
360 }
361}