1use crate::{
2 app::App,
3 audio::AudioEngine,
4 error::IntersticeError,
5 logger::{LogLevel, LogSource, Logger},
6 network::{Network, NetworkHandle},
7 persistence::{PeerTokenStore, TableStore},
8 runtime::{Runtime, event::EventInstance, module::Module},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use std::sync::Arc;
12use std::{collections::HashSet, fs::File, path::Path, sync::Mutex, thread};
13use tokio::sync::{
14 Notify,
15 mpsc::{self, UnboundedReceiver},
16};
17use uuid::Uuid;
18
/// Identifier of a node; an alias for [`Uuid`].
pub type NodeId = Uuid;
20
/// A single node: bundles the network, the app, the shared runtime and the
/// logger that `Node::new` / `Node::load` wire together.
pub struct Node {
    /// Identifier of this node.
    pub id: NodeId,
    /// Handle obtained from `network.get_handle()` when the node is built.
    pub(crate) network_handle: NetworkHandle,
    /// Awaited by `start` before the app is run; a clone is handed to the runtime.
    run_app_notify: Arc<Notify>,
    /// Receiving end of the event channel; passed to `Runtime::run` by `start`.
    event_receiver: UnboundedReceiver<EventInstance>,
    /// Network instance; `start` calls `listen()` and then `run()` on it.
    network: Network,
    /// App instance; run by `start` once `run_app_notify` has been notified.
    app: App,
    /// Runtime shared with the app and with the thread spawned by `start`.
    runtime: Arc<Runtime>,
    /// Logger backed by the node's `node.log` file.
    logger: Logger,
}
31
32impl Node {
33 pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
34 let id = Uuid::new_v4();
35 let data_path = nodes_path.join(id.to_string());
36 let modules_path = data_path.join("modules");
37 std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
38 std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
39 let table_store = TableStore::new(Some(modules_path.clone()));
40
41 let address = format!("127.0.0.1:{}", port);
42
43 let (event_sender, event_receiver) = mpsc::unbounded_channel();
44
45 let logger = Logger::new(
46 File::create(data_path.join("node.log")).expect("Should be able to create log file"),
47 );
48
49 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
50 data_path.join("peer_tokens.toml"),
51 )?));
52 let network = Network::new(
53 id,
54 address.clone(),
55 event_sender.clone(),
56 peer_tokens,
57 logger.clone(),
58 );
59 let network_handle = network.get_handle();
60 let gpu = Arc::new(Mutex::new(None));
61 let audio_state = Arc::new(Mutex::new(
62 crate::runtime::host_calls::audio::AudioState::new(
63 crate::runtime::host_calls::audio::start_audio_thread(),
64 ),
65 ));
66 let run_app_notify = Arc::new(Notify::new());
67 let runtime = Arc::new(Runtime::new(
68 id,
69 Some(modules_path),
70 table_store,
71 event_sender.clone(),
72 network_handle.clone(),
73 audio_state,
74 gpu,
75 run_app_notify.clone(),
76 logger.clone(),
77 )?);
78 let gpu_call_receiver = runtime.take_gpu_call_receiver();
79 let app = App::new(
80 id,
81 event_sender.clone(),
82 runtime.gpu.clone(),
83 runtime.clone(),
84 gpu_call_receiver,
85 );
86
87 let node = Self {
88 id,
89 runtime,
90 app,
91 network,
92 network_handle,
93 event_receiver,
94 run_app_notify,
95 logger,
96 };
97
98 Ok(node)
99 }
100
101 pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
102 let data_path = nodes_path.join(id.to_string());
103 let address = format!("127.0.0.1:{}", port);
104 let modules_path = data_path.join("modules");
105 let table_store = TableStore::new(Some(modules_path.clone()));
106
107 let (event_sender, event_receiver) = mpsc::unbounded_channel();
108
109 let logger_file = File::options()
111 .append(true)
112 .open(data_path.join("node.log"))
113 .expect("Should be able to open log file");
114 let logger = Logger::new(logger_file);
115
116 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
117 data_path.join("peer_tokens.toml"),
118 )?));
119 let network = Network::new(
120 id,
121 address.clone(),
122 event_sender.clone(),
123 peer_tokens,
124 logger.clone(),
125 );
126 let network_handle = network.get_handle();
127 let gpu = Arc::new(Mutex::new(None));
128 let audio_state = Arc::new(Mutex::new(
129 crate::runtime::host_calls::audio::AudioState::new(
130 crate::runtime::host_calls::audio::start_audio_thread(),
131 ),
132 ));
133 let run_app_notify = Arc::new(Notify::new());
134 let runtime = Arc::new(Runtime::new(
135 id,
136 Some(modules_path.clone()),
137 table_store,
138 event_sender.clone(),
139 network_handle.clone(),
140 audio_state,
141 gpu,
142 run_app_notify.clone(),
143 logger.clone(),
144 )?);
145 let gpu_call_receiver = runtime.take_gpu_call_receiver();
146 let app = App::new(
147 id,
148 event_sender.clone(),
149 runtime.gpu.clone(),
150 runtime.clone(),
151 gpu_call_receiver,
152 );
153
154 let node = Self {
155 id,
156 runtime,
157 app,
158 network,
159 network_handle,
160 event_receiver,
161 run_app_notify,
162 logger,
163 };
164
165 Ok(node)
166 }
167
168 pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
169 self.logger.log(message, source, level);
170 }
171
172 pub async fn start(self) -> Result<(), IntersticeError> {
173 let Node {
174 id,
175 runtime,
176 app,
177 mut network,
178 network_handle: _network_handle,
179 event_receiver,
180 run_app_notify,
181 logger,
182 } = self;
183
184 logger.log(
185 &format!("Starting node with ID: {}", id),
186 LogSource::Node,
187 LogLevel::Info,
188 );
189
190 network.listen()?;
192 let _net_handle = network.run();
193
194 let runtime_for_thread = runtime.clone();
195
196 thread::spawn(move || {
197 let rt = tokio::runtime::Builder::new_current_thread()
198 .enable_all()
199 .build()
200 .expect("Failed to build runtime");
201 let local = tokio::task::LocalSet::new();
202 local.block_on(&rt, async move {
203 let audio_engine = AudioEngine::new(
204 runtime_for_thread.audio_state.clone(),
205 runtime_for_thread.authority_modules.clone(),
206 runtime_for_thread.event_sender.clone(),
207 );
208 audio_engine.spawn();
209 Runtime::run(runtime_for_thread, event_receiver).await;
210 });
211 });
212
213 Node::load_modules_from_disk(runtime.clone()).await?;
216
217 run_app_notify.notified().await;
218 app.run();
219 Ok(())
220 }
221
222 pub async fn schema(&self, name: String) -> NodeSchema {
223 NodeSchema {
224 name,
225 address: self.network_handle.address.clone(),
226 modules: self
227 .runtime
228 .modules
229 .lock()
230 .unwrap()
231 .values()
232 .map(|m| (*m.schema).clone())
233 .collect(),
234 }
235 }
236
237 pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
238 self.runtime.persistence.clear_all()?;
239 Ok(())
240 }
241
242 pub async fn load_module_from_file<P: AsRef<Path>>(
243 &self,
244 path: P,
245 ) -> Result<ModuleSchema, IntersticeError> {
246 let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
247 Runtime::load_module(self.runtime.clone(), module, true).await
248 }
249
250 pub async fn load_module_from_bytes(
251 &self,
252 bytes: &[u8],
253 ) -> Result<ModuleSchema, IntersticeError> {
254 let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
255 Runtime::load_module(self.runtime.clone(), module, true).await
256 }
257
258 pub fn get_port(&self) -> u32 {
259 self.network_handle
260 .address
261 .split(':')
262 .last()
263 .unwrap()
264 .parse()
265 .unwrap()
266 }
267
268 async fn load_modules_from_disk(runtime: Arc<Runtime>) -> Result<(), IntersticeError> {
269 let Some(modules_path) = runtime.modules_path.clone() else {
270 return Ok(());
271 };
272
273 let mut modules_to_load = Vec::new();
274 for entry in std::fs::read_dir(&modules_path).unwrap() {
275 let entry = entry.unwrap();
276 let path = entry.path();
277 if !path.is_dir() {
278 continue;
279 }
280
281 std::fs::create_dir_all(path.join("logs")).unwrap();
282 std::fs::create_dir_all(path.join("snapshots")).unwrap();
283
284 let wasm_path = path.join("module.wasm");
285 if !wasm_path.exists() {
286 continue;
287 }
288
289 let module = Module::from_file(runtime.clone(), &wasm_path).await?;
290 modules_to_load.push((module.schema.name.clone(), module));
291 }
292
293 let mut remaining = modules_to_load;
294 let mut loaded = HashSet::new();
295 let all_names: HashSet<_> = remaining.iter().map(|(name, _)| name.clone()).collect();
296 while !remaining.is_empty() {
297 let mut progressed = false;
298 let mut next_remaining = Vec::new();
299
300 for (name, module) in remaining.into_iter() {
301 let deps = module
302 .schema
303 .module_dependencies
304 .iter()
305 .map(|d| d.module_name.clone())
306 .collect::<Vec<_>>();
307
308 if let Some(missing) = deps.iter().find(|d| !all_names.contains(*d)) {
309 return Err(IntersticeError::ModuleNotFound(
310 missing.clone(),
311 format!("Required by '{}' on node startup", name),
312 ));
313 }
314
315 if deps.iter().all(|d| loaded.contains(d)) {
316 Runtime::load_module(runtime.clone(), module, false).await?;
317 loaded.insert(name);
318 progressed = true;
319 } else {
320 next_remaining.push((name, module));
321 }
322 }
323
324 if !progressed {
325 return Err(IntersticeError::Internal(
326 "Module dependency cycle detected while loading node modules".into(),
327 ));
328 }
329
330 remaining = next_remaining;
331 }
332
333 runtime.replay()?;
334
335 Ok(())
336 }
337}