1use crate::{
2 app::App,
3 error::IntersticeError,
4 network::{Network, NetworkHandle},
5 runtime::{Runtime, event::EventInstance},
6};
7use interstice_abi::{ModuleSchema, NodeSchema};
8use std::sync::Arc;
9use std::{path::Path, sync::Mutex};
10use tokio::sync::{
11 Notify,
12 mpsc::{self, UnboundedReceiver},
13};
14use uuid::Uuid;
15
16pub type NodeId = Uuid;
17
18pub struct Node {
19 pub id: NodeId,
20 pub(crate) network_handle: NetworkHandle,
21 run_app_notify: Arc<Notify>,
22 event_receiver: UnboundedReceiver<EventInstance>,
23 network: Network,
24 app: App,
25 runtime: Arc<Runtime>,
26}
27
28impl Node {
29 pub fn new(transaction_log_path: &Path, port: u32) -> Result<Self, IntersticeError> {
30 let id = Uuid::new_v4();
31 let (event_sender, event_receiver) = mpsc::unbounded_channel();
32 let address = format!("127.0.0.1:{}", port);
33 let network = Network::new(id, address.clone(), event_sender.clone());
35 let network_handle = network.get_handle();
36
37 let gpu = Arc::new(Mutex::new(None));
38
39 let app = App::new(id, event_sender.clone(), gpu.clone());
40
41 let run_app_notify = Arc::new(Notify::new());
42 let runtime = Arc::new(Runtime::new(
43 transaction_log_path,
44 event_sender.clone(),
45 network_handle.clone(),
46 gpu,
47 run_app_notify.clone(),
48 )?);
49
50 let node = Self {
51 id,
52 runtime,
53 app,
54 network,
55 network_handle,
56 event_receiver,
57 run_app_notify,
58 };
59
60 Ok(node)
61 }
62
63 pub async fn start(mut self) -> Result<(), IntersticeError> {
64 self.runtime.replay()?;
66
67 self.network.listen()?;
69 let _net_handle = self.network.run();
70 let _runtime_handle = Runtime::run(self.runtime, self.event_receiver);
71
72 self.run_app_notify.notified().await;
73 self.app.run();
74
75 Ok(())
78 }
79
80 pub async fn schema(&self, name: String) -> NodeSchema {
81 NodeSchema {
82 name,
83 address: self.network_handle.address.clone(),
84 modules: self
85 .runtime
86 .modules
87 .lock()
88 .unwrap()
89 .values()
90 .map(|m| (*m.schema).clone())
91 .collect(),
92 }
93 }
94
95 pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
96 self.runtime
97 .transaction_logs
98 .lock()
99 .unwrap()
100 .delete_all_logs()?;
101 Ok(())
102 }
103
104 pub async fn load_module(&self, path: &str) -> Result<ModuleSchema, IntersticeError> {
105 Runtime::load_module(self.runtime.clone(), path).await
106 }
107}