Skip to main content

interstice_core/
node.rs

1use crate::{
2    app::App,
3    error::IntersticeError,
4    network::{Network, NetworkHandle},
5    runtime::{Runtime, event::EventInstance},
6};
7use interstice_abi::{ModuleSchema, NodeSchema};
8use std::sync::Arc;
9use std::{path::Path, sync::Mutex};
10use tokio::sync::{
11    Notify,
12    mpsc::{self, UnboundedReceiver},
13};
14use uuid::Uuid;
15
16pub type NodeId = Uuid;
17
18pub struct Node {
19    pub id: NodeId,
20    pub(crate) network_handle: NetworkHandle,
21    run_app_notify: Arc<Notify>,
22    event_receiver: UnboundedReceiver<EventInstance>,
23    network: Network,
24    app: App,
25    runtime: Arc<Runtime>,
26}
27
28impl Node {
29    pub fn new(transaction_log_path: &Path, port: u32) -> Result<Self, IntersticeError> {
30        let id = Uuid::new_v4();
31        let (event_sender, event_receiver) = mpsc::unbounded_channel();
32        let address = format!("127.0.0.1:{}", port);
33        // create network and listen to events
34        let network = Network::new(id, address.clone(), event_sender.clone());
35        let network_handle = network.get_handle();
36
37        let gpu = Arc::new(Mutex::new(None));
38
39        let app = App::new(id, event_sender.clone(), gpu.clone());
40
41        let run_app_notify = Arc::new(Notify::new());
42        let runtime = Arc::new(Runtime::new(
43            transaction_log_path,
44            event_sender.clone(),
45            network_handle.clone(),
46            gpu,
47            run_app_notify.clone(),
48        )?);
49
50        let node = Self {
51            id,
52            runtime,
53            app,
54            network,
55            network_handle,
56            event_receiver,
57            run_app_notify,
58        };
59
60        Ok(node)
61    }
62
63    pub async fn start(mut self) -> Result<(), IntersticeError> {
64        // Retreive the current state from logs
65        self.runtime.replay()?;
66
67        // Run network events
68        self.network.listen()?;
69        let _net_handle = self.network.run();
70        let _runtime_handle = Runtime::run(self.runtime, self.event_receiver);
71
72        self.run_app_notify.notified().await;
73        self.app.run();
74
75        // let _ = tokio::join!(net_handle, runtime_handle);
76
77        Ok(())
78    }
79
80    pub async fn schema(&self, name: String) -> NodeSchema {
81        NodeSchema {
82            name,
83            address: self.network_handle.address.clone(),
84            modules: self
85                .runtime
86                .modules
87                .lock()
88                .unwrap()
89                .values()
90                .map(|m| (*m.schema).clone())
91                .collect(),
92        }
93    }
94
95    pub async fn clear_logs(&mut self) -> Result<(), IntersticeError> {
96        self.runtime
97            .transaction_logs
98            .lock()
99            .unwrap()
100            .delete_all_logs()?;
101        Ok(())
102    }
103
104    pub async fn load_module(&self, path: &str) -> Result<ModuleSchema, IntersticeError> {
105        Runtime::load_module(self.runtime.clone(), path).await
106    }
107}