1use essential_check::{
8 self as check,
9 solution::{CheckPredicateConfig, Utility},
10};
11pub use essential_server_types::{CheckSolutionOutput, SolutionOutcome};
12pub use essential_state_read_vm::{Gas, StateRead};
13use essential_storage::failed_solution::CheckOutcome;
14pub use essential_storage::Storage;
15use essential_transaction_storage::{Transaction, TransactionStorage};
16use essential_types::{
17 contract::{Contract, SignedContract},
18 predicate::Predicate,
19 solution::Solution,
20 Block, ContentAddress, Hash, Key, PredicateAddress, Word,
21};
22use run::{Handle, Shutdown};
23use solution::read::read_contract_from_storage;
24use std::{collections::HashMap, ops::Range, sync::Arc, time::Duration};
25
26mod deploy;
27mod protocol;
28mod query_state_reads;
29mod run;
30mod solution;
31#[cfg(test)]
32mod test_utils;
33
34#[derive(Clone)]
35pub struct Essential<S>
36where
37 S: Storage + Clone,
38{
39 storage: S,
40 config: Arc<CheckPredicateConfig>,
43 time_config: Arc<TimeConfig>,
44}
45
46#[derive(Debug, Clone)]
47pub struct Config {
49 pub run_loop_interval: Duration,
51}
52
53#[derive(Debug, Clone)]
54pub struct TimeConfig {
56 pub enable_time: bool,
57 pub allow_time_submissions: bool,
58}
59
60impl Default for Config {
61 fn default() -> Self {
62 Self {
63 run_loop_interval: run::RUN_LOOP_FREQUENCY,
64 }
65 }
66}
67
68impl Default for TimeConfig {
69 fn default() -> Self {
70 Self {
71 enable_time: true,
72 allow_time_submissions: false,
73 }
74 }
75}
76
77const PRUNE_FAILED_STORAGE_OLDER_THAN: Duration = Duration::from_secs(604800); impl<S> Essential<S>
80where
81 S: Storage + StateRead + Clone + Send + Sync + 'static,
82 <S as StateRead>::Future: Send,
83 <S as StateRead>::Error: Send,
84{
85 pub fn new(
86 storage: S,
87 config: Arc<CheckPredicateConfig>,
88 time_config: Arc<TimeConfig>,
89 ) -> Self {
90 Self {
91 storage,
92 config,
93 time_config,
94 }
95 }
96
97 pub fn spawn(self, config: Config) -> anyhow::Result<Handle>
98 where
99 S: 'static + Send + Sync,
100 {
101 let (mut handle, shutdown) = Handle::new();
102 let jh = tokio::spawn(async move { self.run(shutdown, config.run_loop_interval).await });
103 handle.contract_jh(jh);
104 Ok(handle)
105 }
106
107 pub async fn run(&self, shutdown: Shutdown, run_loop_interval: Duration) -> anyhow::Result<()> {
108 run::run(
109 &self.storage,
110 shutdown,
111 run_loop_interval,
112 &self.time_config,
113 )
114 .await
115 }
116
117 pub async fn deploy_contract(
118 &self,
119 contract: SignedContract,
120 ) -> anyhow::Result<ContentAddress> {
121 deploy::deploy(&self.storage, contract).await
122 }
123
124 pub async fn check_solution(&self, solution: Solution) -> anyhow::Result<CheckSolutionOutput> {
125 check::solution::check(&solution)?;
126 let contract = read_contract_from_storage(&solution, &self.storage).await?;
127 let transaction = self.storage.clone().transaction();
128 let solution = Arc::new(solution);
129 let config = self.config.clone();
130 let (_post_state, utility, gas) =
131 checked_state_transition(&transaction, solution, &contract, config).await?;
132 Ok(CheckSolutionOutput { utility, gas })
133 }
134
135 pub async fn check_solution_with_contracts(
136 &self,
137 solution: Solution,
138 contracts: Vec<Contract>,
139 ) -> anyhow::Result<CheckSolutionOutput> {
140 let predicates: HashMap<_, _> = contracts
141 .into_iter()
142 .flat_map(|contract| {
143 let contract_addr = essential_hash::contract_addr::from_contract(&contract);
144 contract.predicates.into_iter().map({
145 let contract_addr = contract_addr.clone();
146 move |predicate| {
147 (
148 PredicateAddress {
149 contract: contract_addr.clone(),
150 predicate: essential_hash::content_addr(&predicate),
151 },
152 Arc::new(predicate),
153 )
154 }
155 })
156 })
157 .collect();
158
159 check::solution::check(&solution)?;
160
161 let transaction = self.storage.clone().transaction();
162 let config = self.config.clone();
163 let solution = Arc::new(solution);
164 let (_post_state, utility, gas) =
165 checked_state_transition(&transaction, solution, &predicates, config).await?;
166 Ok(CheckSolutionOutput { utility, gas })
167 }
168
169 pub async fn submit_solution(&self, solution: Solution) -> anyhow::Result<ContentAddress> {
170 solution::filter_solution(&self.time_config, &solution)?;
171 solution::submit_solution(&self.storage, solution).await
172 }
173
174 pub async fn solution_outcome(
175 &self,
176 solution_hash: &Hash,
177 ) -> anyhow::Result<Vec<SolutionOutcome>> {
178 Ok(self
179 .storage
180 .get_solution(*solution_hash)
181 .await?
182 .map(|outcome| {
183 outcome
184 .outcome
185 .into_iter()
186 .map(|outcome| match outcome {
187 CheckOutcome::Success(block_number) => {
188 SolutionOutcome::Success(block_number)
189 }
190 CheckOutcome::Fail(fail) => SolutionOutcome::Fail(fail.to_string()),
191 })
192 .collect()
193 })
194 .unwrap_or_default())
195 }
196
197 pub async fn get_predicate(
198 &self,
199 address: &PredicateAddress,
200 ) -> anyhow::Result<Option<Predicate>> {
201 self.storage.get_predicate(address).await
202 }
203
204 pub async fn get_contract(
205 &self,
206 address: &ContentAddress,
207 ) -> anyhow::Result<Option<SignedContract>> {
208 self.storage.get_contract(address).await
209 }
210
211 pub async fn list_contracts(
212 &self,
213 time_range: Option<Range<Duration>>,
214 page: Option<usize>,
215 ) -> anyhow::Result<Vec<Contract>> {
216 self.storage.list_contracts(time_range, page).await
217 }
218
219 pub fn subscribe_contracts(
220 &self,
221 start_time: Option<Duration>,
222 start_page: Option<usize>,
223 ) -> impl futures::stream::Stream<Item = anyhow::Result<Contract>> + Send + 'static {
224 self.storage
225 .clone()
226 .subscribe_contracts(start_time, start_page)
227 }
228
229 pub async fn list_solutions_pool(&self, page: Option<usize>) -> anyhow::Result<Vec<Solution>> {
230 self.storage.list_solutions_pool(page).await
231 }
232
233 pub async fn list_blocks(
234 &self,
235 time_range: Option<Range<Duration>>,
236 block_number: Option<u64>,
237 page: Option<usize>,
238 ) -> anyhow::Result<Vec<Block>> {
239 self.storage
240 .list_blocks(time_range, block_number, page)
241 .await
242 }
243
244 pub fn subscribe_blocks(
245 &self,
246 start_time: Option<Duration>,
247 start_number: Option<u64>,
248 start_page: Option<usize>,
249 ) -> impl futures::stream::Stream<Item = anyhow::Result<Block>> + Send + 'static {
250 self.storage
251 .clone()
252 .subscribe_blocks(start_time, start_number, start_page)
253 }
254
255 pub async fn query_state(
256 &self,
257 address: &ContentAddress,
258 key: &Key,
259 ) -> anyhow::Result<Vec<Word>> {
260 self.storage.query_state(address, key).await
261 }
262
263 pub async fn query_state_reads(
264 &self,
265 query: essential_server_types::QueryStateReads,
266 ) -> anyhow::Result<essential_server_types::QueryStateReadsOutput> {
267 let storage = self.storage.clone().transaction();
268 query_state_reads::query_state_reads(storage, query).await
269 }
270}
271
272pub(crate) async fn checked_state_transition<S>(
280 pre_state: &TransactionStorage<S>,
281 solution: Arc<Solution>,
282 contract: &HashMap<PredicateAddress, Arc<Predicate>>,
283 config: Arc<check::solution::CheckPredicateConfig>,
284) -> anyhow::Result<(TransactionStorage<S>, Utility, Gas)>
285where
286 S: Storage + StateRead + Clone + Send + Sync + 'static,
287{
288 solution::validate_contract(&solution, contract)?;
290 let get_predicate = |addr: &PredicateAddress| contract[addr].clone();
291
292 let post_state = solution::create_post_state(pre_state, &solution)?;
294
295 let pre = pre_state.view();
297 let post = post_state.view();
298 let (util, gas) =
299 check::solution::check_predicates(&pre, &post, solution.clone(), get_predicate, config)
300 .await?;
301
302 Ok((post_state, util, gas))
303}