1use crate::{
2 audio::AudioEngine,
3 app::App,
4 error::IntersticeError,
5 logger::{LogLevel, LogSource, Logger},
6 network::{Network, NetworkHandle},
7 persistence::{PeerTokenStore, TableStore},
8 runtime::{Runtime, event::EventInstance, module::Module},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use std::sync::Arc;
12use std::{fs::File, path::Path, sync::Mutex, thread};
13use tokio::sync::{
14 Notify,
15 mpsc::{self, UnboundedReceiver},
16};
17use uuid::Uuid;
18
19pub type NodeId = Uuid;
20
21pub struct Node {
22 pub id: NodeId,
23 pub(crate) network_handle: NetworkHandle,
24 run_app_notify: Arc<Notify>,
25 event_receiver: UnboundedReceiver<EventInstance>,
26 network: Network,
27 app: App,
28 runtime: Arc<Runtime>,
29 logger: Logger,
30}
31
32impl Node {
33 pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
34 let id = Uuid::new_v4();
35 let data_path = nodes_path.join(id.to_string());
36 let modules_path = data_path.join("modules");
37 std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
38 std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
39 let table_store = TableStore::new(Some(modules_path.clone()));
40
41 let address = format!("127.0.0.1:{}", port);
42
43 let (event_sender, event_receiver) = mpsc::unbounded_channel();
44
45 let logger = Logger::new(
46 File::create(data_path.join("node.log")).expect("Should be able to create log file"),
47 );
48
49 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
50 data_path.join("peer_tokens.toml"),
51 )?));
52 let network = Network::new(
53 id,
54 address.clone(),
55 event_sender.clone(),
56 peer_tokens,
57 logger.clone(),
58 );
59 let network_handle = network.get_handle();
60 let gpu = Arc::new(Mutex::new(None));
61 let audio_state = Arc::new(Mutex::new(
62 crate::runtime::host_calls::audio::AudioState::new(
63 crate::runtime::host_calls::audio::start_audio_thread(),
64 ),
65 ));
66 let run_app_notify = Arc::new(Notify::new());
67 let runtime = Arc::new(Runtime::new(
68 Some(modules_path),
69 table_store,
70 event_sender.clone(),
71 network_handle.clone(),
72 audio_state,
73 gpu,
74 run_app_notify.clone(),
75 logger.clone(),
76 )?);
77 let gpu_call_receiver = runtime.take_gpu_call_receiver();
78 let app = App::new(
79 id,
80 event_sender.clone(),
81 runtime.gpu.clone(),
82 runtime.clone(),
83 gpu_call_receiver,
84 );
85
86 let node = Self {
87 id,
88 runtime,
89 app,
90 network,
91 network_handle,
92 event_receiver,
93 run_app_notify,
94 logger,
95 };
96
97 Ok(node)
98 }
99
100 pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
101 let data_path = nodes_path.join(id.to_string());
102 let address = format!("127.0.0.1:{}", port);
103 let modules_path = data_path.join("modules");
104 let table_store = TableStore::new(Some(modules_path.clone()));
105
106 let (event_sender, event_receiver) = mpsc::unbounded_channel();
107
108 let logger_file = File::options()
110 .append(true)
111 .open(data_path.join("node.log"))
112 .expect("Should be able to open log file");
113 let logger = Logger::new(logger_file);
114
115 let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
116 data_path.join("peer_tokens.toml"),
117 )?));
118 let network = Network::new(
119 id,
120 address.clone(),
121 event_sender.clone(),
122 peer_tokens,
123 logger.clone(),
124 );
125 let network_handle = network.get_handle();
126 let gpu = Arc::new(Mutex::new(None));
127 let audio_state = Arc::new(Mutex::new(
128 crate::runtime::host_calls::audio::AudioState::new(
129 crate::runtime::host_calls::audio::start_audio_thread(),
130 ),
131 ));
132 let run_app_notify = Arc::new(Notify::new());
133 let runtime = Arc::new(Runtime::new(
134 Some(modules_path.clone()),
135 table_store,
136 event_sender.clone(),
137 network_handle.clone(),
138 audio_state,
139 gpu,
140 run_app_notify.clone(),
141 logger.clone(),
142 )?);
143 let gpu_call_receiver = runtime.take_gpu_call_receiver();
144 let app = App::new(
145 id,
146 event_sender.clone(),
147 runtime.gpu.clone(),
148 runtime.clone(),
149 gpu_call_receiver,
150 );
151
152 for entry in std::fs::read_dir(&modules_path).unwrap() {
154 let entry = entry.unwrap();
155 let path = entry.path();
156 if !path.is_dir() {
157 continue;
158 }
159
160 std::fs::create_dir_all(path.join("logs")).unwrap();
161 std::fs::create_dir_all(path.join("snapshots")).unwrap();
162
163 let wasm_path = path.join("module.wasm");
164 if !wasm_path.exists() {
165 continue;
166 }
167
168 let module = Module::from_file(runtime.clone(), &wasm_path).await?;
169 Runtime::load_module(runtime.clone(), module, false).await?;
170 }
171
172 runtime.replay()?;
174
175 let node = Self {
176 id,
177 runtime,
178 app,
179 network,
180 network_handle,
181 event_receiver,
182 run_app_notify,
183 logger,
184 };
185
186 Ok(node)
187 }
188
189 pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
190 self.logger.log(message, source, level);
191 }
192
193 pub async fn start(self) -> Result<(), IntersticeError> {
194 let Node {
195 id,
196 runtime,
197 app,
198 mut network,
199 network_handle: _network_handle,
200 event_receiver,
201 run_app_notify,
202 logger,
203 } = self;
204
205 logger.log(
206 &format!("Starting node with ID: {}", id),
207 LogSource::Node,
208 LogLevel::Info,
209 );
210
211 network.listen()?;
213 let _net_handle = network.run();
214
215 thread::spawn(move || {
216 let rt = tokio::runtime::Builder::new_current_thread()
217 .enable_all()
218 .build()
219 .expect("Failed to build runtime");
220 let local = tokio::task::LocalSet::new();
221 local.block_on(&rt, async move {
222 let audio_engine = AudioEngine::new(
223 runtime.audio_state.clone(),
224 runtime.authority_modules.clone(),
225 runtime.event_sender.clone(),
226 );
227 audio_engine.spawn();
228 Runtime::run(runtime, event_receiver).await;
229 });
230 });
231
232 run_app_notify.notified().await;
233 app.run();
234
235 Ok(())
236 }
237
238 pub async fn schema(&self, name: String) -> NodeSchema {
239 NodeSchema {
240 name,
241 address: self.network_handle.address.clone(),
242 modules: self
243 .runtime
244 .modules
245 .lock()
246 .unwrap()
247 .values()
248 .map(|m| (*m.schema).clone())
249 .collect(),
250 }
251 }
252
253 pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
254 self.runtime.persistence.clear_all()?;
255 Ok(())
256 }
257
258 pub async fn load_module_from_file<P: AsRef<Path>>(
259 &self,
260 path: P,
261 ) -> Result<ModuleSchema, IntersticeError> {
262 let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
263 Runtime::load_module(self.runtime.clone(), module, true).await
264 }
265
266 pub async fn load_module_from_bytes(
267 &self,
268 bytes: &[u8],
269 ) -> Result<ModuleSchema, IntersticeError> {
270 let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
271 Runtime::load_module(self.runtime.clone(), module, true).await
272 }
273
274 pub fn get_port(&self) -> u32 {
275 self.network_handle
276 .address
277 .split(':')
278 .last()
279 .unwrap()
280 .parse()
281 .unwrap()
282 }
283}