1use crate::{
2 app::App,
3 error::IntersticeError,
4 logger::{LogLevel, LogSource, Logger},
5 network::{Network, NetworkHandle},
6 runtime::{Runtime, event::EventInstance, module::Module},
7};
8use interstice_abi::{ModuleSchema, NodeSchema};
9use std::sync::Arc;
10use std::{fs::File, path::Path, sync::Mutex, thread};
11use tokio::sync::{
12 Notify,
13 mpsc::{self, UnboundedReceiver},
14};
15use uuid::Uuid;
16
17pub type NodeId = Uuid;
18
19pub struct Node {
20 pub id: NodeId,
21 pub(crate) network_handle: NetworkHandle,
22 run_app_notify: Arc<Notify>,
23 event_receiver: UnboundedReceiver<EventInstance>,
24 network: Network,
25 app: App,
26 runtime: Arc<Runtime>,
27 logger: Logger,
28}
29
30impl Node {
31 pub fn new(root_data_path: &Path, port: u32) -> Result<Self, IntersticeError> {
32 let id = Uuid::new_v4();
33 let data_path = root_data_path.join(id.to_string());
34 std::fs::create_dir_all(&data_path).expect("Should be able to create node data path");
35 let modules_path = data_path.join("modules");
36 std::fs::create_dir_all(&modules_path).expect("Should be able to create modules path");
37
38 let address = format!("127.0.0.1:{}", port);
39 let transaction_log_path = data_path.join("transaction_log");
40
41 let (event_sender, event_receiver) = mpsc::unbounded_channel();
42
43 let logger = Logger::new(
44 File::create(data_path.join("node.log")).expect("Should be able to create log file"),
45 );
46
47 let network = Network::new(id, address.clone(), event_sender.clone(), logger.clone());
48 let network_handle = network.get_handle();
49 let gpu = Arc::new(Mutex::new(None));
50 let run_app_notify = Arc::new(Notify::new());
51 let runtime = Arc::new(Runtime::new(
52 modules_path,
53 &transaction_log_path,
54 event_sender.clone(),
55 network_handle.clone(),
56 gpu,
57 run_app_notify.clone(),
58 logger.clone(),
59 )?);
60 let gpu_call_receiver = runtime.take_gpu_call_receiver();
61 let app = App::new(
62 id,
63 event_sender.clone(),
64 runtime.gpu.clone(),
65 runtime.clone(),
66 gpu_call_receiver,
67 );
68
69 let node = Self {
70 id,
71 runtime,
72 app,
73 network,
74 network_handle,
75 event_receiver,
76 run_app_notify,
77 logger,
78 };
79
80 Ok(node)
81 }
82
83 pub async fn load(
84 root_data_path: &Path,
85 id: NodeId,
86 port: u32,
87 ) -> Result<Self, IntersticeError> {
88 let data_path = root_data_path.join(id.to_string());
89 let address = format!("127.0.0.1:{}", port);
90 let transaction_log_path = data_path.join("transaction_log");
91 let modules_path = data_path.join("modules");
92
93 let (event_sender, event_receiver) = mpsc::unbounded_channel();
94
95 let logger_file = File::options()
97 .append(true)
98 .open(data_path.join("node.log"))
99 .expect("Should be able to open log file");
100 let logger = Logger::new(logger_file);
101
102 let network = Network::new(id, address.clone(), event_sender.clone(), logger.clone());
103 let network_handle = network.get_handle();
104 let gpu = Arc::new(Mutex::new(None));
105 let run_app_notify = Arc::new(Notify::new());
106 let runtime = Arc::new(Runtime::new(
107 modules_path.clone(),
108 &transaction_log_path,
109 event_sender.clone(),
110 network_handle.clone(),
111 gpu,
112 run_app_notify.clone(),
113 logger.clone(),
114 )?);
115 let gpu_call_receiver = runtime.take_gpu_call_receiver();
116 let app = App::new(
117 id,
118 event_sender.clone(),
119 runtime.gpu.clone(),
120 runtime.clone(),
121 gpu_call_receiver,
122 );
123
124 for module_path in std::fs::read_dir(&modules_path).unwrap() {
126 let module_path = module_path.unwrap().path();
127 if module_path.extension().and_then(|s| s.to_str()) == Some("wasm") {
128 let module = Module::from_file(runtime.clone(), &module_path).await?;
129 Runtime::load_module(runtime.clone(), module).await?;
130 }
131 }
132
133 runtime.replay()?; let node = Self {
137 id,
138 runtime,
139 app,
140 network,
141 network_handle,
142 event_receiver,
143 run_app_notify,
144 logger,
145 };
146
147 Ok(node)
148 }
149
150 pub fn log(&self, message: &str, source: LogSource, level: LogLevel) {
151 self.logger.log(message, source, level);
152 }
153
154 pub async fn start(self) -> Result<(), IntersticeError> {
155 let Node {
156 id,
157 runtime,
158 app,
159 mut network,
160 network_handle: _network_handle,
161 event_receiver,
162 run_app_notify,
163 logger,
164 } = self;
165
166 logger.log(
167 &format!("Starting node with ID: {}", id),
168 LogSource::Node,
169 LogLevel::Info,
170 );
171
172 network.listen()?;
174 let _net_handle = network.run();
175
176 thread::spawn(move || {
177 let rt = tokio::runtime::Builder::new_current_thread()
178 .enable_all()
179 .build()
180 .expect("Failed to build runtime");
181 let local = tokio::task::LocalSet::new();
182 local.block_on(&rt, async move {
183 Runtime::run(runtime, event_receiver).await;
184 });
185 });
186
187 run_app_notify.notified().await;
188 app.run();
189
190 Ok(())
191 }
192
193 pub async fn schema(&self, name: String) -> NodeSchema {
194 NodeSchema {
195 name,
196 address: self.network_handle.address.clone(),
197 modules: self
198 .runtime
199 .modules
200 .lock()
201 .unwrap()
202 .values()
203 .map(|m| (*m.schema).clone())
204 .collect(),
205 }
206 }
207
208 pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
209 self.runtime
210 .transaction_logs
211 .lock()
212 .unwrap()
213 .delete_all_logs()?;
214 Ok(())
215 }
216
217 pub async fn load_module_from_file<P: AsRef<Path>>(
218 &self,
219 path: P,
220 ) -> Result<ModuleSchema, IntersticeError> {
221 let module = Module::from_file(self.runtime.clone(), path.as_ref()).await?;
222 Runtime::load_module(self.runtime.clone(), module).await
223 }
224
225 pub async fn load_module_from_bytes(
226 &self,
227 bytes: &[u8],
228 ) -> Result<ModuleSchema, IntersticeError> {
229 let module = Module::from_bytes(self.runtime.clone(), bytes).await?;
230 Runtime::load_module(self.runtime.clone(), module).await
231 }
232
233 pub fn get_port(&self) -> u32 {
234 self.network_handle
235 .address
236 .split(':')
237 .last()
238 .unwrap()
239 .parse()
240 .unwrap()
241 }
242}