// essential_node/lib.rs
#![deny(missing_docs)]
//! The Essential node implementation.
//!
//! The primary API for accessing blocks and contracts is provided via the
//! [`ConnectionPool`] type, accessible via the [`db()`] function.
//!
//! The node, via the [`run`] function:
//! - Runs the relayer stream and syncs blocks.
//! - Derives state from the synced blocks.
//! - Performs validation.

use db::ConnectionPool;
use error::{BigBangError, ConnPoolNewError, CriticalError};
use essential_node_db as node_db;
use essential_node_types::BigBang;
use essential_relayer::Relayer;
use essential_types::ContentAddress;
pub use handles::node::Handle;
use state_derivation::state_derivation_stream;
pub use validate::validate;
pub use validate::validate_solution;
use validation::validation_stream;

pub mod db;
mod error;
mod handles;
mod state_derivation;
#[cfg(any(feature = "test-utils", test))]
#[allow(missing_docs)]
pub mod test_utils;
pub mod validate;
mod validation;

/// Wrapper around `watch::Sender` to notify of new blocks.
///
/// This is used by `essential-builder` to notify `essential-relayer`
/// and by `essential-relayer` to notify the `state_derivation` and
/// `validation` streams.
///
/// The channel carries `()`, so only the fact that something changed is
/// communicated; no block data travels through it. Listeners are created
/// with [`BlockTx::new_listener`] and woken with [`BlockTx::notify`].
#[derive(Clone, Default)]
pub struct BlockTx(tokio::sync::watch::Sender<()>);

/// Wrapper around `watch::Receiver` to listen to new blocks.
///
/// This is used by [`db::subscribe_blocks`] stream.
///
/// A `BlockRx` is obtained from [`BlockTx::new_listener`]; call
/// [`BlockRx::changed`] to wait for the next notification.
#[derive(Clone)]
pub struct BlockRx(tokio::sync::watch::Receiver<()>);

/// Options for running the node.
///
/// Passed to [`run`] to select which of the three background streams
/// (relayer, state derivation, validation) are started.
#[derive(Clone, Debug)]
pub struct RunConfig {
    /// Node endpoint to sync blocks from.
    /// If `None` then the relayer stream will not run.
    /// When `Some`, the string is handed to `Relayer::new` as-is.
    pub relayer_source_endpoint: Option<String>,
    /// If `false` then the state derivation stream will not run.
    pub run_state_derivation: bool,
    /// If `false` then the validation stream will not run.
    pub run_validation: bool,
}

/// Build a `ConnectionPool` for the node database described by `conf`.
///
/// Before the pool is handed back, one connection is taken from it to:
/// - switch the journal mode to `wal` when the database source is a file path, and
/// - create the node's tables inside a transaction (via
///   `essential_node_db::create_tables`).
///
/// ## Errors
///
/// Returns a `ConnPoolNewError` if the pool cannot be constructed, if the
/// pragma update fails, or if the table-creation transaction fails.
///
/// ## Panics
///
/// Panics if a connection cannot be acquired from the freshly created pool.
/// This is treated as an invariant violation, since no permit has been handed
/// out yet at that point.
///
/// ## Example
///
/// ```rust
/// # use essential_node::{BlockTx, db::Config, db, run};
/// # #[tokio::main]
/// # async fn main() {
/// let conf = Config::default();
/// let db = essential_node::db(&conf).unwrap();
/// for block in db.list_blocks(0..100).await.unwrap() {
///     println!("Block: {block:?}");
/// }
/// # }
/// ```
pub fn db(conf: &db::Config) -> Result<ConnectionPool, ConnPoolNewError> {
    let pool = db::ConnectionPool::new(conf)?;

    // Keep the setup connection confined to this scope so it is released
    // before the pool is returned.
    {
        let mut setup_conn = pool.try_acquire().expect("all permits available");

        // Only file-backed databases get their journal mode changed.
        let is_file_backed = matches!(conf.source, db::Source::Path(_));
        if is_file_backed {
            setup_conn.pragma_update(None, "journal_mode", "wal")?;
        }

        db::with_tx(&mut setup_conn, |tx| essential_node_db::create_tables(tx))?;
    }

    Ok(pool)
}

/// Makes sure the database starts with the big bang block described by `big_bang`.
///
/// The first stored block is looked up with `list_blocks(0..1)`:
/// - If nothing is stored, the big bang block is inserted and finalized within a
///   single transaction.
/// - If a block is already stored, its [`ContentAddress`] must equal the address of
///   the `Block` returned from [`BigBang::block`]; otherwise
///   `BigBangError::UnexpectedBlock` is returned carrying both addresses.
///
/// Afterwards, when no validation progress has been recorded yet, progress is
/// initialized to the big bang block's address so validation begins from it.
///
/// Returns the `ContentAddress` of the big bang `Block`.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn ensure_big_bang_block(
    conn_pool: &ConnectionPool,
    big_bang: &BigBang,
) -> Result<ContentAddress, BigBangError> {
    let bb_block = big_bang.block();
    let bb_block_ca = essential_hash::content_addr(&bb_block);

    #[cfg(feature = "tracing")]
    tracing::debug!("Big Bang Block CA: {bb_block_ca}");

    // Fetch whatever currently occupies the first block slot, if anything.
    // NOTE(review): this lookup and the insert below are separate pool operations,
    // so they are presumably not atomic with respect to other writers - confirm
    // that callers only invoke this before other writers are started.
    let first_block = conn_pool.list_blocks(0..1).await?.into_iter().next();

    if let Some(existing) = first_block {
        // Something is already stored: it has to be the block we expect.
        let found = essential_hash::content_addr(&existing);
        if found != bb_block_ca {
            return Err(BigBangError::UnexpectedBlock {
                expected: bb_block_ca,
                found,
            });
        }
        #[cfg(feature = "tracing")]
        tracing::debug!("Big Bang Block already exists");
    } else {
        // Nothing stored yet: insert and "finalize" the big bang block together.
        #[cfg(feature = "tracing")]
        tracing::debug!("Big Bang Block not found - inserting into DB");
        // The transaction closure takes ownership, so give it its own copy of the address.
        let ca_for_tx = bb_block_ca.clone();
        conn_pool
            .acquire_then(|conn| {
                db::with_tx(conn, move |tx| {
                    node_db::insert_block(tx, &bb_block)?;
                    node_db::finalize_block(tx, &ca_for_tx)?;
                    Ok::<_, rusqlite::Error>(())
                })
            })
            .await?;
    }

    // Seed validation progress only when none has been recorded so far.
    let validation_not_started = conn_pool.get_validation_progress().await?.is_none();
    if validation_not_started {
        #[cfg(feature = "tracing")]
        tracing::debug!("Starting validation progress at Big Bang Block CA");
        conn_pool
            .update_validation_progress(bb_block_ca.clone())
            .await?;
    }

    Ok(bb_block_ca)
}

/// Start whichever of the relayer, state derivation and validation streams
/// `conf` enables.
///
/// - The relayer is started only when `conf.relayer_source_endpoint` is `Some`.
///   It syncs blocks from the node API blocks stream into the node database and
///   is given the `block_notify` sender so it can signal new blocks over the
///   shared watch channel.
/// - The state derivation stream is started when `conf.run_state_derivation` is
///   `true`, with its own listener on `block_notify`.
/// - The validation stream is started when `conf.run_validation` is `true`, with
///   its own listener on `block_notify` and the given `contract_registry`.
///
/// The streams are started in that order; the first failure is returned as a
/// `CriticalError` and later streams are not started.
///
/// Returns a [`Handle`] that can be used to close the streams.
/// The streams will continue to run until the handle is dropped.
pub fn run(
    conn_pool: ConnectionPool,
    conf: RunConfig,
    contract_registry: ContentAddress,
    block_notify: BlockTx,
) -> Result<Handle, CriticalError> {
    // Relayer: needs an endpoint to sync from.
    let relayer_handle = match conf.relayer_source_endpoint {
        Some(endpoint) => {
            let relayer = Relayer::new(endpoint.as_str())?;
            Some(relayer.run(conn_pool.0.clone(), block_notify.0.clone())?)
        }
        None => None,
    };

    // State derivation stream, woken through its own listener.
    let state_handle = conf
        .run_state_derivation
        .then(|| state_derivation_stream(conn_pool.clone(), block_notify.new_listener().0))
        .transpose()?;

    // Validation stream, woken through its own listener.
    let validation_handle = conf
        .run_validation
        .then(|| {
            validation_stream(
                conn_pool.clone(),
                contract_registry,
                block_notify.new_listener().0,
            )
        })
        .transpose()?;

    Ok(Handle::new(relayer_handle, state_handle, validation_handle))
}

impl BlockTx {
    /// Create a new [`BlockTx`] to notify listeners of new blocks.
    pub fn new() -> Self {
        let (block_tx, _block_rx) = tokio::sync::watch::channel(());
        Self(block_tx)
    }

    /// Notify listeners that a new block has been received.
    ///
    /// Note this is best effort and will still send even if there are currently no listeners.
    pub fn notify(&self) {
        let _ = self.0.send(());
    }

    /// Create a new [`BlockRx`] to listen for new blocks.
    pub fn new_listener(&self) -> BlockRx {
        BlockRx(self.0.subscribe())
    }
}

impl BlockRx {
    /// Waits for a change notification.
    pub async fn changed(&mut self) -> Result<(), tokio::sync::watch::error::RecvError> {
        self.0.changed().await
    }
}