Skip to main content

interstice_core/
node.rs

1use crate::{
2    app::App,
3    error::IntersticeError,
4    logger::{LogLevel, LogSource, Logger},
5    network::{Network, NetworkHandle},
6    runtime::{Runtime, event::EventInstance, module::Module},
7};
8use interstice_abi::{ModuleSchema, NodeSchema};
9use std::sync::Arc;
10use std::{fs::File, path::Path, sync::Mutex, thread};
11use tokio::sync::{
12    Notify,
13    mpsc::{self, UnboundedReceiver},
14};
15use uuid::Uuid;
16
17pub type NodeId = Uuid;
18
19pub struct Node {
20    pub id: NodeId,
21    pub(crate) network_handle: NetworkHandle,
22    run_app_notify: Arc<Notify>,
23    event_receiver: UnboundedReceiver<EventInstance>,
24    network: Network,
25    app: App,
26    runtime: Arc<Runtime>,
27    logger: Logger,
28}
29
30impl Node {
31    pub fn new(root_data_path: &Path, port: u32) -> Result<Self, IntersticeError> {
32        let id = Uuid::new_v4();
33        let data_path = root_data_path.join(id.to_string());
34        std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
35        let modules_path = data_path.join("modules");
36        std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
37
38        let address = format!("127.0.0.1:{}", port);
39        let transaction_log_path = data_path.join("transaction_log");
40
41        let (event_sender, event_receiver) = mpsc::unbounded_channel();
42
43        let logger = Logger::new(
44            File::create(data_path.join("node.log")).expect("Should be able to create log file"),
45        );
46
47        let network = Network::new(id, address.clone(), event_sender.clone(), logger.clone());
48        let network_handle = network.get_handle();
49        let gpu = Arc::new(Mutex::new(None));
50        let run_app_notify = Arc::new(Notify::new());
51        let runtime = Arc::new(Runtime::new(
52            modules_path,
53            &transaction_log_path,
54            event_sender.clone(),
55            network_handle.clone(),
56            gpu,
57            run_app_notify.clone(),
58            logger.clone(),
59        )?);
60        let gpu_call_receiver = runtime.take_gpu_call_receiver();
61        let app = App::new(
62            id,
63            event_sender.clone(),
64            runtime.gpu.clone(),
65            runtime.clone(),
66            gpu_call_receiver,
67        );
68
69        let node = Self {
70            id,
71            runtime,
72            app,
73            network,
74            network_handle,
75            event_receiver,
76            run_app_notify,
77            logger,
78        };
79
80        Ok(node)
81    }
82
83    pub async fn load(
84        root_data_path: &Path,
85        id: NodeId,
86        port: u32,
87    ) -> Result<Self, IntersticeError> {
88        let data_path = root_data_path.join(id.to_string());
89        let address = format!("127.0.0.1:{}", port);
90        let transaction_log_path = data_path.join("transaction_log");
91        let modules_path = data_path.join("modules");
92
93        let (event_sender, event_receiver) = mpsc::unbounded_channel();
94
95        // Open log file to append new logs on.
96        let logger_file = File::options()
97            .append(true)
98            .open(data_path.join("node.log"))
99            .expect("Should be able to open log file");
100        let logger = Logger::new(logger_file);
101
102        let network = Network::new(id, address.clone(), event_sender.clone(), logger.clone());
103        let network_handle = network.get_handle();
104        let gpu = Arc::new(Mutex::new(None));
105        let run_app_notify = Arc::new(Notify::new());
106        let runtime = Arc::new(Runtime::new(
107            modules_path.clone(),
108            &transaction_log_path,
109            event_sender.clone(),
110            network_handle.clone(),
111            gpu,
112            run_app_notify.clone(),
113            logger.clone(),
114        )?);
115        let gpu_call_receiver = runtime.take_gpu_call_receiver();
116        let app = App::new(
117            id,
118            event_sender.clone(),
119            runtime.gpu.clone(),
120            runtime.clone(),
121            gpu_call_receiver,
122        );
123
124        // Load all modules
125        for module_path in std::fs::read_dir(&modules_path).unwrap() {
126            let module_path = module_path.unwrap().path();
127            if module_path.extension().and_then(|s| s.to_str()) == Some("wasm") {
128                let module = Module::from_file(runtime.clone(), &module_path).await?;
129                Runtime::load_module(runtime.clone(), module).await?;
130            }
131        }
132
133        // Replay transaction logs to restore state
134        runtime.replay()?; // Doesn't work when module loaded afetr app initialization, need to load modules before replaying logs
135
136        let node = Self {
137            id,
138            runtime,
139            app,
140            network,
141            network_handle,
142            event_receiver,
143            run_app_notify,
144            logger,
145        };
146
147        Ok(node)
148    }
149
150    pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
151        self.logger.log(message, source, level);
152    }
153
154    pub async fn start(self) -> Result<(), IntersticeError> {
155        let Node {
156            id,
157            runtime,
158            app,
159            mut network,
160            network_handle: _network_handle,
161            event_receiver,
162            run_app_notify,
163            logger,
164        } = self;
165
166        logger.log(
167            &format!("Starting node with ID: {}", id),
168            LogSource::Node,
169            LogLevel::Info,
170        );
171
172        // Run network events
173        network.listen()?;
174        let _net_handle = network.run();
175
176        thread::spawn(move || {
177            let rt = tokio::runtime::Builder::new_current_thread()
178                .enable_all()
179                .build()
180                .expect("Failed to build runtime");
181            let local = tokio::task::LocalSet::new();
182            local.block_on(&rt, async move {
183                Runtime::run(runtime, event_receiver).await;
184            });
185        });
186
187        run_app_notify.notified().await;
188        app.run();
189
190        Ok(())
191    }
192
193    pub async fn schema(&self, name: String) -> NodeSchema {
194        NodeSchema {
195            name,
196            address: self.network_handle.address.clone(),
197            modules: self
198                .runtime
199                .modules
200                .lock()
201                .unwrap()
202                .values()
203                .map(|m| (*m.schema).clone())
204                .collect(),
205        }
206    }
207
208    pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
209        self.runtime
210            .transaction_logs
211            .lock()
212            .unwrap()
213            .delete_all_logs()?;
214        Ok(())
215    }
216
217    pub async fn load_module_from_file<P: AsRef<Path>>(
218        &self,
219        path: P,
220    ) -> Result<ModuleSchema, IntersticeError> {
221        let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
222        Runtime::load_module(self.runtime.clone(), module).await
223    }
224
225    pub async fn load_module_from_bytes(
226        &self,
227        bytes: &[u8],
228    ) -> Result<ModuleSchema, IntersticeError> {
229        let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
230        Runtime::load_module(self.runtime.clone(), module).await
231    }
232
233    pub fn get_port(&self) -> u32 {
234        self.network_handle
235            .address
236            .split(':')
237            .last()
238            .unwrap()
239            .parse()
240            .unwrap()
241    }
242}