1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use anyhow::{anyhow, Result};

use executor_types::{in_memory_state_calculator::IntoLedgerView, ExecutedChunk};
use std::{collections::VecDeque, sync::Arc};
use storage_interface::{DbReader, ExecutedTrees};

/// A FIFO queue of executed chunks that are waiting to be committed, together
/// with the view that the queue currently regards as persisted.
pub struct ChunkCommitQueue {
    /// The view regarded as persisted. It is set at construction and replaced
    /// by a chunk's `result_view` whenever that chunk is removed via `dequeue`.
    persisted_view: ExecutedTrees,
    /// Pending chunks: `enqueue` appends at the back, `dequeue` removes from
    /// the front. Chunks are held behind `Arc` so `next_chunk_to_commit` can
    /// hand out a shared handle without removing the chunk from the queue.
    chunks_to_commit: VecDeque<Arc<ExecutedChunk>>,
}

impl ChunkCommitQueue {
    /// Builds an empty queue whose persisted view is derived from the startup
    /// info reported by `db`.
    ///
    /// # Errors
    ///
    /// Fails if reading the startup info fails, if the DB reports no startup
    /// info ("DB not bootstrapped."), or if converting the latest tree state
    /// into a ledger view fails.
    pub fn new_from_db(db: &Arc<dyn DbReader>) -> Result<Self> {
        let startup_info = db
            .get_startup_info()?
            .ok_or_else(|| anyhow!("DB not bootstrapped."))?;
        let latest_tree_state = startup_info.into_latest_tree_state();
        let persisted_view = latest_tree_state.into_ledger_view(db)?;
        Ok(Self::new(persisted_view))
    }

    /// Creates an empty queue that starts out with `persisted_view` as its
    /// persisted view.
    pub fn new(persisted_view: ExecutedTrees) -> Self {
        let chunks_to_commit = VecDeque::new();
        Self {
            persisted_view,
            chunks_to_commit,
        }
    }

    /// Returns clones of `(persisted view, latest view)`; see
    /// [`Self::latest_view`] for how the second element is chosen.
    pub fn persisted_and_latest_view(&self) -> (ExecutedTrees, ExecutedTrees) {
        let persisted = self.persisted_view.clone();
        let latest = self.latest_view();
        (persisted, latest)
    }

    /// Returns a clone of the result view of the most recently enqueued chunk,
    /// or of the persisted view when no chunk is pending.
    pub fn latest_view(&self) -> ExecutedTrees {
        match self.chunks_to_commit.back() {
            Some(newest) => newest.result_view.clone(),
            None => self.persisted_view.clone(),
        }
    }

    /// Returns a clone of the persisted view together with a shared handle to
    /// the chunk at the front of the queue. The chunk stays in the queue.
    ///
    /// # Errors
    ///
    /// Fails with "Commit queue is empty." when there is no pending chunk.
    pub fn next_chunk_to_commit(&self) -> Result<(ExecutedTrees, Arc<ExecutedChunk>)> {
        let base_view = self.persisted_view.clone();
        match self.chunks_to_commit.front() {
            Some(oldest) => Ok((base_view, Arc::clone(oldest))),
            None => Err(anyhow!("Commit queue is empty.")),
        }
    }

    /// Appends `chunk` to the back of the queue.
    pub fn enqueue(&mut self, chunk: ExecutedChunk) {
        let shared_chunk = Arc::new(chunk);
        self.chunks_to_commit.push_back(shared_chunk);
    }

    /// Removes the chunk at the front of the queue and adopts its result view
    /// as the new persisted view.
    ///
    /// NOTE(review): presumably called once that chunk has actually been
    /// written to storage; this method itself performs no I/O. Confirm against
    /// the caller.
    ///
    /// # Errors
    ///
    /// Fails with "Commit queue is empty." when there is no pending chunk; the
    /// persisted view is left untouched in that case.
    pub fn dequeue(&mut self) -> Result<()> {
        match self.chunks_to_commit.pop_front() {
            Some(committed) => {
                self.persisted_view = committed.result_view.clone();
                Ok(())
            }
            None => Err(anyhow!("Commit queue is empty.")),
        }
    }
}