Skip to main content

interstice_core/
node.rs

1use crate::{
2    audio::AudioEngine,
3    app::App,
4    error::IntersticeError,
5    logger::{LogLevel, LogSource, Logger},
6    network::{Network, NetworkHandle},
7    persistence::{PeerTokenStore, TableStore},
8    runtime::{Runtime, event::EventInstance, module::Module},
9};
10use interstice_abi::{ModuleSchema, NodeSchema};
11use std::sync::Arc;
12use std::{fs::File, path::Path, sync::Mutex, thread};
13use tokio::sync::{
14    Notify,
15    mpsc::{self, UnboundedReceiver},
16};
17use uuid::Uuid;
18
/// Identifier of a node; an alias for [`Uuid`].
pub type NodeId = Uuid;
20
/// A single node: bundles the runtime, the network, the app and the logger
/// together with the receiving end of the event channel they share.
pub struct Node {
    /// Identifier of this node; `Node::new` and `Node::load` use it to name
    /// the node's data directory.
    pub id: NodeId,
    /// Handle obtained from `network.get_handle()`; its `address` field is
    /// read by `schema` and `get_port`.
    pub(crate) network_handle: NetworkHandle,
    /// Awaited by `start` before the app is run.
    run_app_notify: Arc<Notify>,
    /// Receiving end of the event channel; handed to `Runtime::run` in `start`.
    event_receiver: UnboundedReceiver<EventInstance>,
    network: Network,
    app: App,
    runtime: Arc<Runtime>,
    logger: Logger,
}
31
32impl Node {
33    pub fn new(nodes_path: &Path, port: u32) -> Result<Self, IntersticeError> {
34        let id = Uuid::new_v4();
35        let data_path = nodes_path.join(id.to_string());
36        let modules_path = data_path.join("modules");
37        std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
38        std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
39        let table_store = TableStore::new(Some(modules_path.clone()));
40
41        let address = format!("127.0.0.1:{}", port);
42
43        let (event_sender, event_receiver) = mpsc::unbounded_channel();
44
45        let logger = Logger::new(
46            File::create(data_path.join("node.log")).expect("Should be able to create log file"),
47        );
48
49        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
50            data_path.join("peer_tokens.toml"),
51        )?));
52        let network = Network::new(
53            id,
54            address.clone(),
55            event_sender.clone(),
56            peer_tokens,
57            logger.clone(),
58        );
59        let network_handle = network.get_handle();
60        let gpu = Arc::new(Mutex::new(None));
61        let audio_state = Arc::new(Mutex::new(
62            crate::runtime::host_calls::audio::AudioState::new(
63                crate::runtime::host_calls::audio::start_audio_thread(),
64            ),
65        ));
66        let run_app_notify = Arc::new(Notify::new());
67        let runtime = Arc::new(Runtime::new(
68            Some(modules_path),
69            table_store,
70            event_sender.clone(),
71            network_handle.clone(),
72            audio_state,
73            gpu,
74            run_app_notify.clone(),
75            logger.clone(),
76        )?);
77        let gpu_call_receiver = runtime.take_gpu_call_receiver();
78        let app = App::new(
79            id,
80            event_sender.clone(),
81            runtime.gpu.clone(),
82            runtime.clone(),
83            gpu_call_receiver,
84        );
85
86        let node = Self {
87            id,
88            runtime,
89            app,
90            network,
91            network_handle,
92            event_receiver,
93            run_app_notify,
94            logger,
95        };
96
97        Ok(node)
98    }
99
100    pub async fn load(nodes_path: &Path, id: NodeId, port: u32) -> Result<Self, IntersticeError> {
101        let data_path = nodes_path.join(id.to_string());
102        let address = format!("127.0.0.1:{}", port);
103        let modules_path = data_path.join("modules");
104        let table_store = TableStore::new(Some(modules_path.clone()));
105
106        let (event_sender, event_receiver) = mpsc::unbounded_channel();
107
108        // Open log file to append new logs on.
109        let logger_file = File::options()
110            .append(true)
111            .open(data_path.join("node.log"))
112            .expect("Should be able to open log file");
113        let logger = Logger::new(logger_file);
114
115        let peer_tokens = Arc::new(Mutex::new(PeerTokenStore::load_or_create(
116            data_path.join("peer_tokens.toml"),
117        )?));
118        let network = Network::new(
119            id,
120            address.clone(),
121            event_sender.clone(),
122            peer_tokens,
123            logger.clone(),
124        );
125        let network_handle = network.get_handle();
126        let gpu = Arc::new(Mutex::new(None));
127        let audio_state = Arc::new(Mutex::new(
128            crate::runtime::host_calls::audio::AudioState::new(
129                crate::runtime::host_calls::audio::start_audio_thread(),
130            ),
131        ));
132        let run_app_notify = Arc::new(Notify::new());
133        let runtime = Arc::new(Runtime::new(
134            Some(modules_path.clone()),
135            table_store,
136            event_sender.clone(),
137            network_handle.clone(),
138            audio_state,
139            gpu,
140            run_app_notify.clone(),
141            logger.clone(),
142        )?);
143        let gpu_call_receiver = runtime.take_gpu_call_receiver();
144        let app = App::new(
145            id,
146            event_sender.clone(),
147            runtime.gpu.clone(),
148            runtime.clone(),
149            gpu_call_receiver,
150        );
151
152        // Load all modules
153        for entry in std::fs::read_dir(&modules_path).unwrap() {
154            let entry = entry.unwrap();
155            let path = entry.path();
156            if !path.is_dir() {
157                continue;
158            }
159
160            std::fs::create_dir_all(path.join("logs")).unwrap();
161            std::fs::create_dir_all(path.join("snapshots")).unwrap();
162
163            let wasm_path = path.join("module.wasm");
164            if !wasm_path.exists() {
165                continue;
166            }
167
168            let module = Module::from_file(runtime.clone(), &wasm_path).await?;
169            Runtime::load_module(runtime.clone(), module, false).await?;
170        }
171
172        // Replay transaction logs to restore state once all modules are available
173        runtime.replay()?;
174
175        let node = Self {
176            id,
177            runtime,
178            app,
179            network,
180            network_handle,
181            event_receiver,
182            run_app_notify,
183            logger,
184        };
185
186        Ok(node)
187    }
188
189    pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
190        self.logger.log(message, source, level);
191    }
192
193    pub async fn start(self) -> Result<(), IntersticeError> {
194        let Node {
195            id,
196            runtime,
197            app,
198            mut network,
199            network_handle: _network_handle,
200            event_receiver,
201            run_app_notify,
202            logger,
203        } = self;
204
205        logger.log(
206            &format!("Starting node with ID: {}", id),
207            LogSource::Node,
208            LogLevel::Info,
209        );
210
211        // Run network events
212        network.listen()?;
213        let _net_handle = network.run();
214
215        thread::spawn(move || {
216            let rt = tokio::runtime::Builder::new_current_thread()
217                .enable_all()
218                .build()
219                .expect("Failed to build runtime");
220            let local = tokio::task::LocalSet::new();
221            local.block_on(&rt, async move {
222                let audio_engine = AudioEngine::new(
223                    runtime.audio_state.clone(),
224                    runtime.authority_modules.clone(),
225                    runtime.event_sender.clone(),
226                );
227                audio_engine.spawn();
228                Runtime::run(runtime, event_receiver).await;
229            });
230        });
231
232        run_app_notify.notified().await;
233        app.run();
234
235        Ok(())
236    }
237
238    pub async fn schema(&self, name: String) -> NodeSchema {
239        NodeSchema {
240            name,
241            address: self.network_handle.address.clone(),
242            modules: self
243                .runtime
244                .modules
245                .lock()
246                .unwrap()
247                .values()
248                .map(|m| (*m.schema).clone())
249                .collect(),
250        }
251    }
252
253    pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
254        self.runtime.persistence.clear_all()?;
255        Ok(())
256    }
257
258    pub async fn load_module_from_file<P: AsRef<Path>>(
259        &self,
260        path: P,
261    ) -> Result<ModuleSchema, IntersticeError> {
262        let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
263        Runtime::load_module(self.runtime.clone(), module, true).await
264    }
265
266    pub async fn load_module_from_bytes(
267        &self,
268        bytes: &[u8],
269    ) -> Result<ModuleSchema, IntersticeError> {
270        let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
271        Runtime::load_module(self.runtime.clone(), module, true).await
272    }
273
274    pub fn get_port(&self) -> u32 {
275        self.network_handle
276            .address
277            .split(':')
278            .last()
279            .unwrap()
280            .parse()
281            .unwrap()
282    }
283}