1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#![forbid(unsafe_code)]
use anyhow::{anyhow, Result};
use executor_types::{in_memory_state_calculator::IntoLedgerView, ExecutedChunk};
use std::{collections::VecDeque, sync::Arc};
use storage_interface::{DbReader, ExecutedTrees};
pub struct ChunkCommitQueue {
persisted_view: ExecutedTrees,
chunks_to_commit: VecDeque<Arc<ExecutedChunk>>,
}
impl ChunkCommitQueue {
pub fn new_from_db(db: &Arc<dyn DbReader>) -> Result<Self> {
let persisted_view = db
.get_startup_info()?
.ok_or_else(|| anyhow!("DB not bootstrapped."))?
.into_latest_tree_state()
.into_ledger_view(db)?;
Ok(Self::new(persisted_view))
}
pub fn new(persisted_view: ExecutedTrees) -> Self {
Self {
persisted_view,
chunks_to_commit: VecDeque::new(),
}
}
pub fn persisted_and_latest_view(&self) -> (ExecutedTrees, ExecutedTrees) {
(self.persisted_view.clone(), self.latest_view())
}
pub fn latest_view(&self) -> ExecutedTrees {
self.chunks_to_commit
.back()
.map(|chunk| chunk.result_view.clone())
.unwrap_or_else(|| self.persisted_view.clone())
}
pub fn next_chunk_to_commit(&self) -> Result<(ExecutedTrees, Arc<ExecutedChunk>)> {
Ok((
self.persisted_view.clone(),
self.chunks_to_commit
.front()
.ok_or_else(|| anyhow!("Commit queue is empty."))
.map(Arc::clone)?,
))
}
pub fn enqueue(&mut self, chunk: ExecutedChunk) {
self.chunks_to_commit.push_back(Arc::new(chunk))
}
pub fn dequeue(&mut self) -> Result<()> {
let committed_chunk = self
.chunks_to_commit
.pop_front()
.ok_or_else(|| anyhow!("Commit queue is empty."))?;
self.persisted_view = committed_chunk.result_view.clone();
Ok(())
}
}