essential_storage/
lib.rs

1#![deny(missing_docs)]
2//! # Storage
3//!
4//! Trait for the storage layer of the Essential platform.
5
6use std::{future::Future, ops::Range, time::Duration};
7
8use essential_types::{
9    contract::{Contract, SignedContract},
10    predicate::Predicate,
11    solution::Solution,
12    Block, ContentAddress, Hash, Key, PredicateAddress, Word,
13};
14use failed_solution::{FailedSolution, SolutionFailReason, SolutionOutcomes};
15
16/// Module for failed solution struct.
17pub mod failed_solution;
18/// Module for streams.
19pub mod streams;
20
21/// Data to commit after a block has been built.
22/// This data should all be committed atomically.
23pub struct CommitData<'a> {
24    /// Block number
25    pub block_number: u64,
26    /// Block timestamp
27    pub block_timestamp: Duration,
28    /// Failed solutions
29    pub failed: &'a [(Hash, SolutionFailReason)],
30    /// Solved solutions
31    pub solved: &'a [Hash],
32    /// State updates
33    pub state_updates: Box<dyn Iterator<Item = (ContentAddress, Key, Vec<Word>)> + 'a>,
34}
35
36/// Storage trait for the Essential platform.
37/// All inserts and updates are idempotent.
38pub trait Storage: StateStorage {
39    // Updates
40    /// Insert a contract with their storage layout.
41    fn insert_contract(
42        &self,
43        predicate: SignedContract,
44    ) -> impl Future<Output = anyhow::Result<()>> + Send;
45
46    /// Add a solution to the pool of unsolved solutions.
47    fn insert_solution_into_pool(
48        &self,
49        solution: Solution,
50    ) -> impl Future<Output = anyhow::Result<()>> + Send;
51
52    /// Move these solutions from the pool to the solved state.
53    fn move_solutions_to_solved(
54        &self,
55        block_number: u64,
56        block_timestamp: Duration,
57        solutions: &[Hash],
58    ) -> impl Future<Output = anyhow::Result<()>> + Send;
59
60    /// Move these solutions from the pool to the failed state.
61    fn move_solutions_to_failed(
62        &self,
63        solutions: &[(Hash, SolutionFailReason)],
64    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
65
66    // Reads
67    /// Get an individual predicate.
68    /// Note that the same predicate might be in multiple contracts.
69    fn get_predicate(
70        &self,
71        address: &PredicateAddress,
72    ) -> impl Future<Output = anyhow::Result<Option<Predicate>>> + Send;
73
74    /// Get the entire contract.
75    fn get_contract(
76        &self,
77        address: &ContentAddress,
78    ) -> impl Future<Output = anyhow::Result<Option<SignedContract>>> + Send;
79
80    /// List all contracts. This will paginate the results. The page is 0-indexed.
81    /// A time range can optionally be provided to filter the results.
82    /// The time is duration since the Unix epoch.
83    fn list_contracts(
84        &self,
85        time_range: Option<Range<Duration>>,
86        page: Option<usize>,
87    ) -> impl Future<Output = anyhow::Result<Vec<Contract>>> + Send;
88
89    /// Subscribe to new contracts from a given start page or start time.
90    /// This will return all the contracts from that point then continue to stream
91    /// as new contracts are added.
92    fn subscribe_contracts(
93        self,
94        start_time: Option<Duration>,
95        start_page: Option<usize>,
96    ) -> impl futures::Stream<Item = anyhow::Result<Contract>> + Send + 'static;
97
98    /// List all solutions in the pool.
99    fn list_solutions_pool(
100        &self,
101        page: Option<usize>,
102    ) -> impl Future<Output = anyhow::Result<Vec<Solution>>> + Send;
103
104    /// List all failed solutions in the pool.
105    fn list_failed_solutions_pool(
106        &self,
107        page: Option<usize>,
108    ) -> impl std::future::Future<Output = anyhow::Result<Vec<FailedSolution>>> + Send;
109
110    /// List all blocks of solutions that have been solved.
111    fn list_blocks(
112        &self,
113        time_range: Option<Range<Duration>>,
114        block_number: Option<u64>,
115        page: Option<usize>,
116    ) -> impl Future<Output = anyhow::Result<Vec<Block>>> + Send;
117
118    /// Subscribe to new blocks from a given block number or start page or start time.
119    /// This will return all the blocks from that point then continue to stream
120    /// as new blocks are added.
121    fn subscribe_blocks(
122        self,
123        start_time: Option<Duration>,
124        block_number: Option<u64>,
125        start_page: Option<usize>,
126    ) -> impl futures::Stream<Item = anyhow::Result<Block>> + Send + 'static;
127
128    /// Get failed solution and its failing reason.
129    fn get_solution(
130        &self,
131        solution_hash: Hash,
132    ) -> impl std::future::Future<Output = anyhow::Result<Option<SolutionOutcomes>>> + Send;
133
134    /// Get latest block.
135    fn get_latest_block(
136        &self,
137    ) -> impl std::future::Future<Output = anyhow::Result<Option<Block>>> + Send;
138
139    /// Prune failed solutions that failed before the provided duration.
140    fn prune_failed_solutions(
141        &self,
142        older_than: Duration,
143    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
144
145    /// Commit block data atomically.
146    fn commit_block(
147        &self,
148        data: CommitData,
149    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
150}
151
152/// Storage trait just for state reads and writes.
153pub trait StateStorage: QueryState {
154    /// Update the state of a content address.
155    fn update_state(
156        &self,
157        address: &ContentAddress,
158        key: &Key,
159        value: Vec<Word>,
160    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Word>>> + Send;
161
162    /// Update a batch of state in one transaction.
163    fn update_state_batch<U>(
164        &self,
165        updates: U,
166    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Vec<Word>>>> + Send
167    where
168        U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)> + Send;
169}
170
171/// Storage trait for reading state.
172pub trait QueryState {
173    /// Query the state of a content address.
174    fn query_state(
175        &self,
176        address: &ContentAddress,
177        key: &Key,
178    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Word>>> + Send;
179}
180
181/// Get a range of words from the state.
182pub async fn key_range<S, E>(
183    storage: &S,
184    contract_addr: ContentAddress,
185    mut key: Key,
186    num_words: usize,
187) -> Result<Vec<Vec<Word>>, E>
188where
189    S: QueryState + Send,
190    E: From<anyhow::Error>,
191{
192    let mut words = vec![];
193    for _ in 0..num_words {
194        let slot = storage.query_state(&contract_addr, &key).await?;
195        words.push(slot);
196        key = next_key(key).ok_or_else(|| anyhow::anyhow!("Failed to find next key"))?
197    }
198    Ok(words)
199}
200
201/// Calculate the next key.
202pub fn next_key(mut key: Key) -> Option<Key> {
203    for w in key.iter_mut().rev() {
204        match *w {
205            Word::MAX => *w = Word::MIN,
206            _ => {
207                *w += 1;
208                return Some(key);
209            }
210        }
211    }
212    None
213}