essential_memory_storage/
lib.rs

1use anyhow::bail;
2use essential_lock::StdLock;
3use essential_state_read_vm::StateRead;
4use essential_storage::{
5    failed_solution::{CheckOutcome, FailedSolution, SolutionFailReason, SolutionOutcomes},
6    key_range, CommitData, QueryState, StateStorage, Storage,
7};
8use essential_types::{
9    contract::{Contract, SignedContract},
10    predicate::Predicate,
11    solution::Solution,
12    ContentAddress, Hash, Key, PredicateAddress, Signature, Word,
13};
14use futures::{future::FutureExt, StreamExt};
15use std::{
16    collections::{BTreeMap, HashMap, HashSet},
17    pin::Pin,
18    sync::Arc,
19    time::{Duration, SystemTime, UNIX_EPOCH},
20};
21use thiserror::Error;
22
23mod values;
24
/// Page size handed to every paginated listing and subscription query in
/// this module.
const PAGE_SIZE: usize = 100;
27
28#[derive(Clone)]
29pub struct MemoryStorage {
30    inner: Arc<StdLock<Inner>>,
31    streams: essential_storage::streams::Notify,
32}
33
34impl Default for MemoryStorage {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40#[derive(Default, Debug)]
41struct Inner {
42    contracts: HashMap<ContentAddress, ContractWithAddresses>,
43    predicates: HashMap<ContentAddress, Predicate>,
44    contract_time_index: BTreeMap<Duration, Vec<ContentAddress>>,
45    solution_pool: HashSet<Hash>,
46    solution_time_index: BTreeMap<Duration, Vec<Hash>>,
47    failed_solution_pool: HashMap<Hash, Vec<(SolutionFailReason, Duration)>>,
48    failed_solution_time_index: BTreeMap<Duration, Vec<Hash>>,
49    solutions: HashMap<Hash, Solution>,
50    /// Solved batches ordered by the time they were solved.
51    solved: BTreeMap<Duration, Block>,
52    block_number_index: HashMap<u64, Duration>,
53    solution_block_time_index: HashMap<Hash, Vec<Duration>>,
54    state: HashMap<ContentAddress, BTreeMap<Key, Vec<Word>>>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
58struct Block {
59    number: u64,
60    timestamp: Duration,
61    hashes: Vec<Hash>,
62}
63
64#[derive(Debug)]
65struct ContractWithAddresses {
66    salt: Hash,
67    data: HashSet<ContentAddress>,
68    signature: Signature,
69}
70
71impl ContractWithAddresses {
72    /// All predicate addresses ordered by their CA.
73    fn predicate_addrs(&self) -> Vec<&ContentAddress> {
74        let mut addrs: Vec<_> = self.data.iter().collect();
75        addrs.sort();
76        addrs
77    }
78
79    /// All predicates in the contract, ordered by their CA.
80    fn predicates_owned(&self, predicates: &HashMap<ContentAddress, Predicate>) -> Vec<Predicate> {
81        self.predicate_addrs()
82            .into_iter()
83            .filter_map(|addr| predicates.get(addr).cloned())
84            .collect()
85    }
86
87    /// Re-construct the `SignedContract`.
88    ///
89    /// Predicates in the returned contract will be ordered by their CA.
90    fn signed_contract(&self, predicates: &HashMap<ContentAddress, Predicate>) -> SignedContract {
91        let signature = self.signature.clone();
92        let predicates = self.predicates_owned(predicates);
93        SignedContract {
94            contract: Contract {
95                salt: self.salt,
96                predicates,
97            },
98            signature,
99        }
100    }
101}
102
103impl MemoryStorage {
104    pub fn new() -> Self {
105        Self {
106            inner: Arc::new(StdLock::new(Inner::default())),
107            streams: essential_storage::streams::Notify::new(),
108        }
109    }
110}
111
112impl StateStorage for MemoryStorage {
113    async fn update_state(
114        &self,
115        address: &ContentAddress,
116        key: &Key,
117        value: Vec<Word>,
118    ) -> anyhow::Result<Vec<Word>> {
119        self.inner.apply(|i| {
120            let Some(map) = i.state.get_mut(address) else {
121                bail!("No state for address, {:?}", address);
122            };
123            let v = if value.is_empty() {
124                map.remove(key)
125            } else {
126                map.insert(key.clone(), value)
127            };
128            let v = v.unwrap_or_default();
129            Ok(v)
130        })
131    }
132
133    async fn update_state_batch<U>(&self, updates: U) -> anyhow::Result<Vec<Vec<Word>>>
134    where
135        U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)> + Send,
136    {
137        let v = self.inner.apply(|i| update_state_batch(i, updates));
138        Ok(v)
139    }
140}
141
142impl QueryState for MemoryStorage {
143    async fn query_state(&self, address: &ContentAddress, key: &Key) -> anyhow::Result<Vec<Word>> {
144        let v = self.inner.apply(|i| {
145            let map = i.state.get(address)?;
146            let v = map.get(key)?;
147            Some(v.clone())
148        });
149        Ok(v.unwrap_or_default())
150    }
151}
152
153impl Storage for MemoryStorage {
154    async fn insert_contract(&self, signed: SignedContract) -> anyhow::Result<()> {
155        let SignedContract {
156            contract,
157            signature,
158        } = signed;
159
160        let salt = contract.salt;
161
162        let data: HashMap<_, _> = contract
163            .predicates
164            .into_iter()
165            .map(|p| (essential_hash::content_addr(&p), p))
166            .collect();
167
168        let contract_addr =
169            essential_hash::contract_addr::from_predicate_addrs(data.keys().cloned(), &salt);
170
171        let contract_with_addrs = ContractWithAddresses {
172            salt,
173            data: data.keys().cloned().collect(),
174            signature,
175        };
176        let time = SystemTime::now().duration_since(UNIX_EPOCH)?;
177        let r = self.inner.apply(|i| {
178            i.predicates.extend(data);
179            let contains = i
180                .contracts
181                .insert(contract_addr.clone(), contract_with_addrs);
182            if contains.is_none() {
183                i.contract_time_index
184                    .entry(time)
185                    .or_default()
186                    .push(contract_addr.clone());
187            }
188            i.state.entry(contract_addr).or_default();
189            Ok(())
190        });
191
192        // There is a new contract.
193        self.streams.notify_new_contracts();
194        r
195    }
196
197    async fn insert_solution_into_pool(&self, solution: Solution) -> anyhow::Result<()> {
198        let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
199        let hash = essential_hash::hash(&solution);
200        self.inner.apply(|i| {
201            if i.solution_pool.insert(hash) {
202                i.solution_time_index
203                    .entry(timestamp)
204                    .or_default()
205                    .push(hash);
206            }
207            i.solutions.insert(hash, solution);
208        });
209        Ok(())
210    }
211
212    async fn move_solutions_to_solved(
213        &self,
214        block_number: u64,
215        block_timestamp: Duration,
216        solutions: &[Hash],
217    ) -> anyhow::Result<()> {
218        let new_block = !solutions.is_empty();
219        let hashes: HashSet<_> = solutions.iter().collect();
220        let r = self.inner.apply(|i| {
221            move_solutions_to_solved(i, block_number, block_timestamp, solutions, hashes)
222        });
223
224        if new_block {
225            // There is a new block.
226            self.streams.notify_new_blocks();
227        }
228        r
229    }
230
231    async fn move_solutions_to_failed(
232        &self,
233        solutions: &[(Hash, SolutionFailReason)],
234    ) -> anyhow::Result<()> {
235        let hashes: HashSet<_> = solutions.iter().map(|(h, _)| h).collect();
236        self.inner
237            .apply(|i| move_solutions_to_failed(i, solutions, hashes))
238    }
239
240    async fn get_predicate(&self, address: &PredicateAddress) -> anyhow::Result<Option<Predicate>> {
241        let v = self.inner.apply(|i| {
242            if i.contracts
243                .get(&address.contract)
244                .map_or(true, |c| !c.data.contains(&address.predicate))
245            {
246                return None;
247            }
248            let predicate = i.predicates.get(&address.predicate)?;
249            Some(predicate.clone())
250        });
251        Ok(v)
252    }
253
254    async fn get_contract(
255        &self,
256        address: &ContentAddress,
257    ) -> anyhow::Result<Option<SignedContract>> {
258        let v = self
259            .inner
260            .apply(|i| Some(i.contracts.get(address)?.signed_contract(&i.predicates)));
261        Ok(v)
262    }
263
264    async fn list_contracts(
265        &self,
266        time_range: Option<std::ops::Range<std::time::Duration>>,
267        page: Option<usize>,
268    ) -> anyhow::Result<Vec<Contract>> {
269        let page = page.unwrap_or(0);
270        match time_range {
271            Some(range) => {
272                let v = self.inner.apply(|i| {
273                    values::page_contract_by_time(
274                        &i.contract_time_index,
275                        &i.contracts,
276                        &i.predicates,
277                        range,
278                        page,
279                        PAGE_SIZE,
280                    )
281                });
282                Ok(v)
283            }
284            None => {
285                let v = self.inner.apply(|i| {
286                    values::page_contract(
287                        i.contract_time_index.values().flatten(),
288                        &i.contracts,
289                        &i.predicates,
290                        page,
291                        PAGE_SIZE,
292                    )
293                });
294                Ok(v)
295            }
296        }
297    }
298
299    fn subscribe_contracts(
300        self,
301        start_time: Option<Duration>,
302        start_page: Option<usize>,
303    ) -> impl futures::Stream<Item = anyhow::Result<Contract>> + Send + 'static {
304        let new_contracts = self.streams.subscribe_contracts();
305        let init = essential_storage::streams::StreamState::new(start_page, start_time, None);
306        futures::stream::unfold(init, move |state| {
307            let storage = self.clone();
308            essential_storage::streams::next_data(
309                new_contracts.clone(),
310                state,
311                PAGE_SIZE,
312                // List contracts expects a Range not a RangeFrom so we give it a range from
313                // start till the end of time.
314                move |get| {
315                    let storage = storage.clone();
316                    async move {
317                        storage
318                            .list_contracts(get.time.map(|s| s..Duration::MAX), Some(get.page))
319                            .await
320                    }
321                },
322            )
323        })
324        .flat_map(futures::stream::iter)
325    }
326
327    async fn list_solutions_pool(&self, page: Option<usize>) -> anyhow::Result<Vec<Solution>> {
328        Ok(self.inner.apply(|i| {
329            let iter = i
330                .solution_time_index
331                .values()
332                .flatten()
333                .filter(|h| i.solution_pool.contains(*h));
334            values::page_solutions(
335                iter,
336                |h| i.solutions.get(h).cloned(),
337                page.unwrap_or(0),
338                PAGE_SIZE,
339            )
340        }))
341    }
342
343    async fn list_failed_solutions_pool(
344        &self,
345        page: Option<usize>,
346    ) -> anyhow::Result<Vec<FailedSolution>> {
347        Ok(self.inner.apply(|i| {
348            let iter = i.failed_solution_time_index.values().flat_map(|hashes| {
349                hashes.iter().flat_map(|h| {
350                    i.failed_solution_pool
351                        .get(h)
352                        .into_iter()
353                        .flatten()
354                        .map(|r| (*h, r.clone()))
355                })
356            });
357            values::page_solutions(
358                iter,
359                |(h, r)| {
360                    let solution = i.solutions.get(&h).cloned()?;
361                    Some(FailedSolution {
362                        solution,
363                        reason: r.0,
364                    })
365                },
366                page.unwrap_or(0),
367                PAGE_SIZE,
368            )
369        }))
370    }
371
372    async fn list_blocks(
373        &self,
374        time_range: Option<std::ops::Range<std::time::Duration>>,
375        block_number: Option<u64>,
376        page: Option<usize>,
377    ) -> anyhow::Result<Vec<essential_types::Block>> {
378        let page = page.unwrap_or(0);
379        self.inner.apply(|i| {
380            values::page_blocks(
381                &i.solved,
382                &i.solutions,
383                &i.block_number_index,
384                time_range,
385                block_number,
386                page,
387                PAGE_SIZE,
388            )
389        })
390    }
391
392    fn subscribe_blocks(
393        self,
394        start_time: Option<Duration>,
395        block_number: Option<u64>,
396        start_page: Option<usize>,
397    ) -> impl futures::Stream<Item = anyhow::Result<essential_types::Block>> + Send + 'static {
398        let new_blocks = self.streams.subscribe_blocks();
399        let init =
400            essential_storage::streams::StreamState::new(start_page, start_time, block_number);
401        futures::stream::unfold(init, move |state| {
402            let storage = self.clone();
403            essential_storage::streams::next_data(
404                new_blocks.clone(),
405                state,
406                PAGE_SIZE,
407                // List blocks expects a Range not a RangeFrom so we give it a range from
408                // start till the end of time.
409                move |get| {
410                    let storage = storage.clone();
411                    async move {
412                        storage
413                            .list_blocks(
414                                get.time.map(|s| s..Duration::MAX),
415                                get.number,
416                                Some(get.page),
417                            )
418                            .await
419                    }
420                },
421            )
422        })
423        .flat_map(futures::stream::iter)
424    }
425
426    async fn get_solution(&self, solution_hash: Hash) -> anyhow::Result<Option<SolutionOutcomes>> {
427        let r = self.inner.apply(|i| {
428            i.solutions.get(&solution_hash).cloned().map(|s| {
429                let mut outcomes: Vec<_> = i
430                    .failed_solution_pool
431                    .get(&solution_hash)
432                    .into_iter()
433                    .flatten()
434                    .cloned()
435                    .map(|(r, t)| (t, CheckOutcome::Fail(r)))
436                    .chain(
437                        i.solution_block_time_index
438                            .get(&solution_hash)
439                            .into_iter()
440                            .flatten()
441                            .filter_map(|time| {
442                                let b = i.solved.get(time)?;
443                                Some((*time, CheckOutcome::Success(b.number)))
444                            }),
445                    )
446                    .collect();
447                outcomes.sort_by_key(|(t, _)| *t);
448                let outcome = outcomes.into_iter().map(|(_, o)| o).collect();
449                SolutionOutcomes {
450                    solution: s.clone(),
451                    outcome,
452                }
453            })
454        });
455        Ok(r)
456    }
457
458    async fn prune_failed_solutions(&self, older_than: Duration) -> anyhow::Result<()> {
459        self.inner.apply(|i| {
460            i.failed_solution_time_index.retain(|timestamp, hash| {
461                let retain = *timestamp >= older_than;
462                if !retain {
463                    for hash in hash {
464                        i.failed_solution_pool.remove(hash);
465                    }
466                }
467                retain
468            });
469            Ok(())
470        })
471    }
472
473    fn commit_block(
474        &self,
475        data: CommitData,
476    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
477        let CommitData {
478            failed,
479            solved,
480            state_updates,
481            block_number,
482            block_timestamp,
483        } = data;
484        let hashes: HashSet<_> = failed.iter().map(|(h, _)| h).collect();
485        let solved_hashes: HashSet<_> = solved.iter().collect();
486        let r = self.inner.apply(|i| {
487            let new_block = !solved_hashes.is_empty();
488            move_solutions_to_failed(i, failed, hashes)?;
489            move_solutions_to_solved(i, block_number, block_timestamp, solved, solved_hashes)?;
490            update_state_batch(i, state_updates);
491            Ok(new_block)
492        });
493
494        if let Ok(r) = &r {
495            if *r {
496                // There is a new block.
497                self.streams.notify_new_blocks();
498            }
499        }
500
501        async { r.map(|_| ()) }
502    }
503
504    async fn get_latest_block(&self) -> anyhow::Result<Option<essential_types::Block>> {
505        let r = self.inner.apply(|i| match i.solved.last_key_value() {
506            Some((_, block)) => {
507                let solutions = block
508                    .hashes
509                    .iter()
510                    .map(|h| i.solutions.get(h).cloned())
511                    .collect::<Option<Vec<_>>>()?;
512                Some(essential_types::Block {
513                    number: block.number,
514                    timestamp: block.timestamp,
515                    solutions,
516                })
517            }
518            None => None,
519        });
520        Ok(r)
521    }
522}
523
524fn move_solutions_to_failed(
525    i: &mut Inner,
526    solutions: &[(Hash, SolutionFailReason)],
527    hashes: HashSet<&Hash>,
528) -> Result<(), anyhow::Error> {
529    let time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
530    let solutions = solutions.iter().filter_map(|(h, r)| {
531        if i.solution_pool.remove(h) {
532            Some((*h, r.clone()))
533        } else {
534            None
535        }
536    });
537
538    for v in i.solution_time_index.values_mut() {
539        v.retain(|h| !hashes.contains(h));
540    }
541    i.solution_time_index.retain(|_, v| !v.is_empty());
542
543    for (hash, reason) in solutions {
544        i.failed_solution_pool
545            .entry(hash)
546            .or_default()
547            .push((reason, time));
548        i.failed_solution_time_index
549            .entry(time)
550            .or_default()
551            .push(hash);
552    }
553
554    Ok(())
555}
556
557fn move_solutions_to_solved(
558    i: &mut Inner,
559    block_number: u64,
560    block_timestamp: Duration,
561    solutions: &[Hash],
562    hashes: HashSet<&Hash>,
563) -> Result<(), anyhow::Error> {
564    if solutions.is_empty() {
565        return Ok(());
566    }
567
568    if solutions.iter().all(|s| !i.solution_pool.contains(s)) {
569        return Ok(());
570    }
571
572    if i.solved.contains_key(&block_timestamp) {
573        bail!("Two blocks created at the same time");
574    }
575
576    for v in i.solution_time_index.values_mut() {
577        v.retain(|h| !hashes.contains(h));
578    }
579    i.solution_time_index.retain(|_, v| !v.is_empty());
580
581    for hash in solutions {
582        i.solution_block_time_index
583            .entry(*hash)
584            .or_default()
585            .push(block_timestamp);
586    }
587    let solutions = solutions
588        .iter()
589        .filter(|h| i.solution_pool.remove(*h))
590        .cloned()
591        .collect();
592
593    let block = Block {
594        number: block_number,
595        timestamp: block_timestamp,
596        hashes: solutions,
597    };
598    i.solved.insert(block_timestamp, block);
599    i.block_number_index.insert(block_number, block_timestamp);
600    Ok(())
601}
602
603fn update_state_batch<U>(i: &mut Inner, updates: U) -> Vec<Vec<i64>>
604where
605    U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)>,
606{
607    updates
608        .into_iter()
609        .map(|(address, key, value)| {
610            let map = i.state.entry(address).or_default();
611            let v = if value.is_empty() {
612                map.remove(&key)
613            } else {
614                map.insert(key, value)
615            };
616            v.unwrap_or_default()
617        })
618        .collect()
619}
620
621#[derive(Debug, Error)]
622pub enum MemoryStorageError {
623    #[error("failed to read from memory storage")]
624    ReadError(#[from] anyhow::Error),
625    #[error("invalid key range")]
626    KeyRangeError,
627}
628
629impl StateRead for MemoryStorage {
630    type Error = MemoryStorageError;
631
632    type Future =
633        Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<Word>>, Self::Error>> + Send>>;
634
635    fn key_range(&self, contract_addr: ContentAddress, key: Key, num_words: usize) -> Self::Future {
636        let storage = self.clone();
637        async move { key_range(&storage, contract_addr, key, num_words).await }.boxed()
638    }
639}