essential_server/
lib.rs

1//! A library providing the Essential server's core logic around handling
2//! storage read/write access, validation and state transitions.
3//!
4//! For an executable implementation of the Essential server, see the
5//! `essential-rest-server` crate.
6
7use essential_check::{
8    self as check,
9    solution::{CheckPredicateConfig, Utility},
10};
11pub use essential_server_types::{CheckSolutionOutput, SolutionOutcome};
12pub use essential_state_read_vm::{Gas, StateRead};
13use essential_storage::failed_solution::CheckOutcome;
14pub use essential_storage::Storage;
15use essential_transaction_storage::{Transaction, TransactionStorage};
16use essential_types::{
17    contract::{Contract, SignedContract},
18    predicate::Predicate,
19    solution::Solution,
20    Block, ContentAddress, Hash, Key, PredicateAddress, Word,
21};
22use run::{Handle, Shutdown};
23use solution::read::read_contract_from_storage;
24use std::{collections::HashMap, ops::Range, sync::Arc, time::Duration};
25
26mod deploy;
27mod protocol;
28mod query_state_reads;
29mod run;
30mod solution;
31#[cfg(test)]
32mod test_utils;
33
34#[derive(Clone)]
35pub struct Essential<S>
36where
37    S: Storage + Clone,
38{
39    storage: S,
40    // Currently only check-related config, though we may want to add a
41    // top-level `Config` type for other kinds of configuration (e.g. gas costs).
42    config: Arc<CheckPredicateConfig>,
43    time_config: Arc<TimeConfig>,
44}
45
46#[derive(Debug, Clone)]
47/// Server configuration.
48pub struct Config {
49    /// Interval at which to run the main loop.
50    pub run_loop_interval: Duration,
51}
52
53#[derive(Debug, Clone)]
54/// Time configuration.
55pub struct TimeConfig {
56    pub enable_time: bool,
57    pub allow_time_submissions: bool,
58}
59
60impl Default for Config {
61    fn default() -> Self {
62        Self {
63            run_loop_interval: run::RUN_LOOP_FREQUENCY,
64        }
65    }
66}
67
68impl Default for TimeConfig {
69    fn default() -> Self {
70        Self {
71            enable_time: true,
72            allow_time_submissions: false,
73        }
74    }
75}
76
77const PRUNE_FAILED_STORAGE_OLDER_THAN: Duration = Duration::from_secs(604800); // one week
78
79impl<S> Essential<S>
80where
81    S: Storage + StateRead + Clone + Send + Sync + 'static,
82    <S as StateRead>::Future: Send,
83    <S as StateRead>::Error: Send,
84{
85    pub fn new(
86        storage: S,
87        config: Arc<CheckPredicateConfig>,
88        time_config: Arc<TimeConfig>,
89    ) -> Self {
90        Self {
91            storage,
92            config,
93            time_config,
94        }
95    }
96
97    pub fn spawn(self, config: Config) -> anyhow::Result<Handle>
98    where
99        S: 'static + Send + Sync,
100    {
101        let (mut handle, shutdown) = Handle::new();
102        let jh = tokio::spawn(async move { self.run(shutdown, config.run_loop_interval).await });
103        handle.contract_jh(jh);
104        Ok(handle)
105    }
106
107    pub async fn run(&self, shutdown: Shutdown, run_loop_interval: Duration) -> anyhow::Result<()> {
108        run::run(
109            &self.storage,
110            shutdown,
111            run_loop_interval,
112            &self.time_config,
113        )
114        .await
115    }
116
117    pub async fn deploy_contract(
118        &self,
119        contract: SignedContract,
120    ) -> anyhow::Result<ContentAddress> {
121        deploy::deploy(&self.storage, contract).await
122    }
123
124    pub async fn check_solution(&self, solution: Solution) -> anyhow::Result<CheckSolutionOutput> {
125        check::solution::check(&solution)?;
126        let contract = read_contract_from_storage(&solution, &self.storage).await?;
127        let transaction = self.storage.clone().transaction();
128        let solution = Arc::new(solution);
129        let config = self.config.clone();
130        let (_post_state, utility, gas) =
131            checked_state_transition(&transaction, solution, &contract, config).await?;
132        Ok(CheckSolutionOutput { utility, gas })
133    }
134
135    pub async fn check_solution_with_contracts(
136        &self,
137        solution: Solution,
138        contracts: Vec<Contract>,
139    ) -> anyhow::Result<CheckSolutionOutput> {
140        let predicates: HashMap<_, _> = contracts
141            .into_iter()
142            .flat_map(|contract| {
143                let contract_addr = essential_hash::contract_addr::from_contract(&contract);
144                contract.predicates.into_iter().map({
145                    let contract_addr = contract_addr.clone();
146                    move |predicate| {
147                        (
148                            PredicateAddress {
149                                contract: contract_addr.clone(),
150                                predicate: essential_hash::content_addr(&predicate),
151                            },
152                            Arc::new(predicate),
153                        )
154                    }
155                })
156            })
157            .collect();
158
159        check::solution::check(&solution)?;
160
161        let transaction = self.storage.clone().transaction();
162        let config = self.config.clone();
163        let solution = Arc::new(solution);
164        let (_post_state, utility, gas) =
165            checked_state_transition(&transaction, solution, &predicates, config).await?;
166        Ok(CheckSolutionOutput { utility, gas })
167    }
168
169    pub async fn submit_solution(&self, solution: Solution) -> anyhow::Result<ContentAddress> {
170        solution::filter_solution(&self.time_config, &solution)?;
171        solution::submit_solution(&self.storage, solution).await
172    }
173
174    pub async fn solution_outcome(
175        &self,
176        solution_hash: &Hash,
177    ) -> anyhow::Result<Vec<SolutionOutcome>> {
178        Ok(self
179            .storage
180            .get_solution(*solution_hash)
181            .await?
182            .map(|outcome| {
183                outcome
184                    .outcome
185                    .into_iter()
186                    .map(|outcome| match outcome {
187                        CheckOutcome::Success(block_number) => {
188                            SolutionOutcome::Success(block_number)
189                        }
190                        CheckOutcome::Fail(fail) => SolutionOutcome::Fail(fail.to_string()),
191                    })
192                    .collect()
193            })
194            .unwrap_or_default())
195    }
196
197    pub async fn get_predicate(
198        &self,
199        address: &PredicateAddress,
200    ) -> anyhow::Result<Option<Predicate>> {
201        self.storage.get_predicate(address).await
202    }
203
204    pub async fn get_contract(
205        &self,
206        address: &ContentAddress,
207    ) -> anyhow::Result<Option<SignedContract>> {
208        self.storage.get_contract(address).await
209    }
210
211    pub async fn list_contracts(
212        &self,
213        time_range: Option<Range<Duration>>,
214        page: Option<usize>,
215    ) -> anyhow::Result<Vec<Contract>> {
216        self.storage.list_contracts(time_range, page).await
217    }
218
219    pub fn subscribe_contracts(
220        &self,
221        start_time: Option<Duration>,
222        start_page: Option<usize>,
223    ) -> impl futures::stream::Stream<Item = anyhow::Result<Contract>> + Send + 'static {
224        self.storage
225            .clone()
226            .subscribe_contracts(start_time, start_page)
227    }
228
229    pub async fn list_solutions_pool(&self, page: Option<usize>) -> anyhow::Result<Vec<Solution>> {
230        self.storage.list_solutions_pool(page).await
231    }
232
233    pub async fn list_blocks(
234        &self,
235        time_range: Option<Range<Duration>>,
236        block_number: Option<u64>,
237        page: Option<usize>,
238    ) -> anyhow::Result<Vec<Block>> {
239        self.storage
240            .list_blocks(time_range, block_number, page)
241            .await
242    }
243
244    pub fn subscribe_blocks(
245        &self,
246        start_time: Option<Duration>,
247        start_number: Option<u64>,
248        start_page: Option<usize>,
249    ) -> impl futures::stream::Stream<Item = anyhow::Result<Block>> + Send + 'static {
250        self.storage
251            .clone()
252            .subscribe_blocks(start_time, start_number, start_page)
253    }
254
255    pub async fn query_state(
256        &self,
257        address: &ContentAddress,
258        key: &Key,
259    ) -> anyhow::Result<Vec<Word>> {
260        self.storage.query_state(address, key).await
261    }
262
263    pub async fn query_state_reads(
264        &self,
265        query: essential_server_types::QueryStateReads,
266    ) -> anyhow::Result<essential_server_types::QueryStateReadsOutput> {
267        let storage = self.storage.clone().transaction();
268        query_state_reads::query_state_reads(storage, query).await
269    }
270}
271
272/// Performs the three main steps of producing a state transition.
273///
274/// 1. Validates the given `contract` against the given `solution` prior to execution.
275/// 2. Clones the `pre_state` storage transaction and creates the proposed `post_state`.
276/// 3. Checks that the solution's data satisfies all constraints.
277///
278/// In the success case, returns the post state, utility and total gas used.
279pub(crate) async fn checked_state_transition<S>(
280    pre_state: &TransactionStorage<S>,
281    solution: Arc<Solution>,
282    contract: &HashMap<PredicateAddress, Arc<Predicate>>,
283    config: Arc<check::solution::CheckPredicateConfig>,
284) -> anyhow::Result<(TransactionStorage<S>, Utility, Gas)>
285where
286    S: Storage + StateRead + Clone + Send + Sync + 'static,
287{
288    // Pre-execution validation.
289    solution::validate_contract(&solution, contract)?;
290    let get_predicate = |addr: &PredicateAddress| contract[addr].clone();
291
292    // Create the post state for constraint checking.
293    let post_state = solution::create_post_state(pre_state, &solution)?;
294
295    // We only need read-only access to pre and post state during validation.
296    let pre = pre_state.view();
297    let post = post_state.view();
298    let (util, gas) =
299        check::solution::check_predicates(&pre, &post, solution.clone(), get_predicate, config)
300            .await?;
301
302    Ok((post_state, util, gas))
303}