1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#![deny(missing_docs)]
//! # Storage
//!
//! Trait for the storage layer of the Essential platform.

use std::{future::Future, ops::Range, time::Duration};

use essential_types::{
    contract::{Contract, SignedContract},
    predicate::Predicate,
    solution::Solution,
    Block, ContentAddress, Hash, Key, PredicateAddress, Word,
};
use failed_solution::{FailedSolution, SolutionFailReason, SolutionOutcomes};

/// Module for failed solution struct.
pub mod failed_solution;
/// Module for streams.
pub mod streams;

/// Data to commit after a block has been built.
/// This data should all be committed atomically.
pub struct CommitData<'a> {
    /// Failed solutions
    pub failed: &'a [(Hash, SolutionFailReason)],
    /// Solved solutions
    pub solved: &'a [Hash],
    /// State updates
    pub state_updates: Box<dyn Iterator<Item = (ContentAddress, Key, Vec<Word>)> + 'a>,
}

/// Storage trait for the Essential platform.
/// All inserts and updates are idempotent.
pub trait Storage: StateStorage {
    // Updates
    /// Insert a contract with their storage layout.
    fn insert_contract(
        &self,
        predicate: SignedContract,
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Add a solution to the pool of unsolved solutions.
    fn insert_solution_into_pool(
        &self,
        solution: Solution,
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Move these solutions from the pool to the solved state.
    fn move_solutions_to_solved(
        &self,
        solutions: &[Hash],
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Move these solutions from the pool to the failed state.
    fn move_solutions_to_failed(
        &self,
        solutions: &[(Hash, SolutionFailReason)],
    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;

    // Reads
    /// Get an individual predicate.
    /// Note that the same predicate might be in multiple contracts.
    fn get_predicate(
        &self,
        address: &PredicateAddress,
    ) -> impl Future<Output = anyhow::Result<Option<Predicate>>> + Send;

    /// Get the entire contract.
    fn get_contract(
        &self,
        address: &ContentAddress,
    ) -> impl Future<Output = anyhow::Result<Option<SignedContract>>> + Send;

    /// List all contracts. This will paginate the results. The page is 0-indexed.
    /// A time range can optionally be provided to filter the results.
    /// The time is duration since the Unix epoch.
    fn list_contracts(
        &self,
        time_range: Option<Range<Duration>>,
        page: Option<usize>,
    ) -> impl Future<Output = anyhow::Result<Vec<Contract>>> + Send;

    /// Subscribe to new contracts from a given start page or start time.
    /// This will return all the contracts from that point then continue to stream
    /// as new contracts are added.
    fn subscribe_contracts(
        self,
        start_time: Option<Duration>,
        start_page: Option<usize>,
    ) -> impl futures::Stream<Item = anyhow::Result<Contract>> + Send + 'static;

    /// List all solutions in the pool.
    fn list_solutions_pool(
        &self,
        page: Option<usize>,
    ) -> impl Future<Output = anyhow::Result<Vec<Solution>>> + Send;

    /// List all failed solutions in the pool.
    fn list_failed_solutions_pool(
        &self,
        page: Option<usize>,
    ) -> impl std::future::Future<Output = anyhow::Result<Vec<FailedSolution>>> + Send;

    /// List all blocks of solutions that have been solved.
    fn list_blocks(
        &self,
        time_range: Option<Range<Duration>>,
        block_number: Option<u64>,
        page: Option<usize>,
    ) -> impl Future<Output = anyhow::Result<Vec<Block>>> + Send;

    /// Subscribe to new blocks from a given block number or start page or start time.
    /// This will return all the blocks from that point then continue to stream
    /// as new blocks are added.
    fn subscribe_blocks(
        self,
        start_time: Option<Duration>,
        block_number: Option<u64>,
        start_page: Option<usize>,
    ) -> impl futures::Stream<Item = anyhow::Result<Block>> + Send + 'static;

    /// Get failed solution and its failing reason.
    fn get_solution(
        &self,
        solution_hash: Hash,
    ) -> impl std::future::Future<Output = anyhow::Result<Option<SolutionOutcomes>>> + Send;

    /// Prune failed solutions that failed before the provided duration.
    fn prune_failed_solutions(
        &self,
        older_than: Duration,
    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;

    /// Commit block data atomically.
    fn commit_block(
        &self,
        data: CommitData,
    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}

/// Storage trait just for state reads and writes.
pub trait StateStorage: QueryState {
    /// Update the state of a content address.
    fn update_state(
        &self,
        address: &ContentAddress,
        key: &Key,
        value: Vec<Word>,
    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Word>>> + Send;

    /// Update a batch of state in one transaction.
    fn update_state_batch<U>(
        &self,
        updates: U,
    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Vec<Word>>>> + Send
    where
        U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)> + Send;
}

/// Storage trait for reading state.
pub trait QueryState {
    /// Query the state of a content address.
    fn query_state(
        &self,
        address: &ContentAddress,
        key: &Key,
    ) -> impl std::future::Future<Output = anyhow::Result<Vec<Word>>> + Send;
}

/// Get a range of words from the state.
pub async fn key_range<S, E>(
    storage: &S,
    contract_addr: ContentAddress,
    mut key: Key,
    num_words: usize,
) -> Result<Vec<Vec<Word>>, E>
where
    S: QueryState + Send,
    E: From<anyhow::Error>,
{
    let mut words = vec![];
    for _ in 0..num_words {
        let slot = storage.query_state(&contract_addr, &key).await?;
        words.push(slot);
        key = next_key(key).ok_or_else(|| anyhow::anyhow!("Failed to find next key"))?
    }
    Ok(words)
}

/// Calculate the next key.
pub fn next_key(mut key: Key) -> Option<Key> {
    for w in key.iter_mut().rev() {
        match *w {
            Word::MAX => *w = Word::MIN,
            _ => {
                *w += 1;
                return Some(key);
            }
        }
    }
    None
}