Skip to main content

hotmint_consensus/
commit.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::store::BlockStore;
5use hotmint_types::context::BlockContext;
6use hotmint_types::epoch::Epoch;
7use hotmint_types::{Block, BlockHash, DoubleCertificate, EndBlockResponse, Height, ViewNumber};
8use tracing::info;
9
/// Result of a commit operation, as produced by [`try_commit`].
pub struct CommitResult {
    /// Blocks committed by this call, ordered from lowest to highest height.
    /// Empty when the certified block was already at or below the last
    /// committed height.
    pub committed_blocks: Vec<Block>,
    /// The QC that certified the committed block (for sync protocol).
    /// This is a clone of the double certificate's `inner_qc`.
    pub commit_qc: hotmint_types::QuorumCertificate,
    /// If an epoch transition was triggered by end_block, the new epoch (start_view is placeholder).
    /// `try_commit` sets it whenever a committed block's response carries
    /// validator updates; `None` otherwise.
    pub pending_epoch: Option<Epoch>,
    /// Application state root after executing the last committed block.
    /// `BlockHash::GENESIS` when no block was committed by the call.
    pub last_app_hash: BlockHash,
    /// EndBlockResponse for each committed block (same order as committed_blocks).
    pub block_responses: Vec<EndBlockResponse>,
}
22
23/// Decode length-prefixed transactions from a block payload.
24pub fn decode_payload(payload: &[u8]) -> Vec<&[u8]> {
25    let mut txs = Vec::new();
26    let mut offset = 0;
27    while offset + 4 <= payload.len() {
28        let len = u32::from_le_bytes(payload[offset..offset + 4].try_into().unwrap()) as usize;
29        offset += 4;
30        if offset + len > payload.len() {
31            tracing::warn!(
32                offset,
33                claimed_len = len,
34                payload_len = payload.len(),
35                "decode_payload: length prefix exceeds remaining bytes, truncating"
36            );
37            break;
38        }
39        txs.push(&payload[offset..offset + len]);
40        offset += len;
41    }
42    txs
43}
44
45/// Execute the two-chain commit rule:
46/// When we get C_v(C_v(B_k)), commit the inner QC's block and all uncommitted ancestors.
47///
48/// For each committed block, runs the full application lifecycle:
49/// begin_block → deliver_tx (×N) → end_block → on_commit
50///
51/// # Safety
52/// Caller MUST verify both inner_qc and outer_qc aggregate signatures
53/// and quorum counts before calling this function. This function trusts
54/// the DoubleCertificate completely and performs no cryptographic checks.
55pub fn try_commit(
56    double_cert: &DoubleCertificate,
57    store: &dyn BlockStore,
58    app: &dyn Application,
59    last_committed_height: &mut Height,
60    current_epoch: &Epoch,
61) -> Result<CommitResult> {
62    let commit_hash = double_cert.inner_qc.block_hash;
63    let commit_block = store
64        .get_block(&commit_hash)
65        .c(d!("block to commit not found"))?;
66
67    if commit_block.height <= *last_committed_height {
68        return Ok(CommitResult {
69            committed_blocks: vec![],
70            commit_qc: double_cert.inner_qc.clone(),
71            pending_epoch: None,
72            last_app_hash: BlockHash::GENESIS,
73            block_responses: vec![],
74        });
75    }
76
77    // Collect all uncommitted ancestors (from highest to lowest)
78    let mut to_commit = Vec::new();
79    let mut current = commit_block;
80    loop {
81        if current.height <= *last_committed_height {
82            break;
83        }
84        let parent_hash = current.parent_hash;
85        let current_height = current.height;
86        to_commit.push(current);
87        if parent_hash == BlockHash::GENESIS {
88            break;
89        }
90        match store.get_block(&parent_hash) {
91            Some(parent) => current = parent,
92            None => {
93                // If the missing ancestor is above last committed + 1, the store
94                // is corrupt or incomplete — we must not silently skip blocks.
95                if current_height > Height(last_committed_height.as_u64() + 1) {
96                    return Err(eg!(
97                        "missing ancestor block {} for height {} (last committed: {})",
98                        parent_hash,
99                        current_height,
100                        last_committed_height
101                    ));
102                }
103                break;
104            }
105        }
106    }
107
108    // Commit from lowest height to highest
109    to_commit.reverse();
110
111    let mut pending_epoch: Option<Epoch> = None;
112    let mut last_app_hash = BlockHash::GENESIS;
113    let mut block_responses = Vec::with_capacity(to_commit.len());
114
115    for block in &to_commit {
116        let ctx = BlockContext {
117            height: block.height,
118            view: block.view,
119            proposer: block.proposer,
120            epoch: current_epoch.number,
121            epoch_start_view: current_epoch.start_view,
122            validator_set: &current_epoch.validator_set,
123            timestamp: block.timestamp,
124            vote_extensions: vec![],
125        };
126
127        info!(height = block.height.as_u64(), hash = %block.hash, "committing block");
128
129        let txs = decode_payload(&block.payload);
130        // A committed block MUST be executed successfully. If the application
131        // returns an error here, the node's state is irrecoverably corrupted
132        // (partial batch commit). Panicking causes a restart from persistent
133        // state, which is safer than continuing with a diverged app_hash.
134        let response = app.execute_block(&txs, &ctx).unwrap_or_else(|e| {
135            panic!(
136                "FATAL: execute_block failed for committed block height={} hash={}: {:?}. \
137                 Node state is corrupt; restart from last committed height.",
138                block.height, block.hash, e
139            )
140        });
141
142        app.on_commit(block, &ctx).unwrap_or_else(|e| {
143            panic!(
144                "FATAL: on_commit failed for committed block height={} hash={}: {:?}. \
145                 Node state is corrupt; restart from last committed height.",
146                block.height, block.hash, e
147            )
148        });
149
150        // Process embedded evidence — notify the application layer for each
151        // proof so it can apply slashing deterministically (C-3).
152        for proof in &block.evidence {
153            if let Err(e) = app.on_evidence(proof) {
154                tracing::warn!(
155                    validator = %proof.validator,
156                    error = %e,
157                    "on_evidence failed for embedded proof"
158                );
159            }
160        }
161
162        // When the application does not track state roots, carry the block's
163        // authoritative app_hash forward so the engine state stays coherent
164        // with the chain even when NoopApplication always returns GENESIS.
165        last_app_hash = if app.tracks_app_hash() {
166            response.app_hash
167        } else {
168            block.app_hash
169        };
170
171        if !response.validator_updates.is_empty() {
172            // Chain epoch transitions: if a prior block in this batch already
173            // produced a pending epoch, apply the new updates on top of that
174            // intermediate validator set rather than the original epoch's set.
175            let base_vs = if let Some(ref ep) = pending_epoch {
176                &ep.validator_set
177            } else {
178                &current_epoch.validator_set
179            };
180            let base_num = if let Some(ref ep) = pending_epoch {
181                ep.number
182            } else {
183                current_epoch.number
184            };
185            let new_vs = base_vs.apply_updates(&response.validator_updates);
186            let epoch_start = ViewNumber(block.view.as_u64() + 2);
187            pending_epoch = Some(Epoch::new(base_num.next(), epoch_start, new_vs));
188        }
189
190        block_responses.push(response);
191        *last_committed_height = block.height;
192    }
193
194    Ok(CommitResult {
195        committed_blocks: to_commit,
196        commit_qc: double_cert.inner_qc.clone(),
197        pending_epoch,
198        last_app_hash,
199        block_responses,
200    })
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::NoopApplication;
    use crate::store::MemoryBlockStore;
    use hotmint_types::crypto::PublicKey;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::validator::{ValidatorInfo, ValidatorSet};
    use hotmint_types::{AggregateSignature, QuorumCertificate, ValidatorId, ViewNumber};

    /// Empty block at `height` (view == height) whose hash is the height byte
    /// repeated 32 times.
    fn make_block(height: u64, parent: BlockHash) -> Block {
        Block {
            height: Height(height),
            parent_hash: parent,
            view: ViewNumber(height),
            proposer: ValidatorId(0),
            timestamp: 0,
            payload: Vec::new(),
            app_hash: BlockHash::GENESIS,
            evidence: Vec::new(),
            hash: BlockHash([height as u8; 32]),
        }
    }

    /// QC for `hash` at `view` in epoch 0 with a fresh aggregate signature.
    fn make_qc(hash: BlockHash, view: u64) -> QuorumCertificate {
        QuorumCertificate {
            block_hash: hash,
            view: ViewNumber(view),
            aggregate_signature: AggregateSignature::new(4),
            epoch: EpochNumber(0),
        }
    }

    /// Double certificate whose inner and outer QCs both certify `hash` at `view`.
    fn make_dc(hash: BlockHash, view: u64) -> DoubleCertificate {
        DoubleCertificate {
            vote_extensions: vec![],
            inner_qc: make_qc(hash, view),
            outer_qc: make_qc(hash, view),
        }
    }

    /// Genesis epoch with a single validator of power 1.
    fn make_epoch() -> Epoch {
        let validator = ValidatorInfo {
            id: ValidatorId(0),
            public_key: PublicKey(vec![0]),
            power: 1,
        };
        Epoch::genesis(ValidatorSet::new(vec![validator]))
    }

    #[test]
    fn test_commit_single_block() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let epoch = make_epoch();
        let block = make_block(1, BlockHash::GENESIS);
        let tip_hash = block.hash;
        store.put_block(block);

        let dc = make_dc(tip_hash, 1);

        let mut last = Height::GENESIS;
        let result = try_commit(&dc, &store, &app, &mut last, &epoch).unwrap();
        assert_eq!(result.committed_blocks.len(), 1);
        assert_eq!(result.committed_blocks[0].height, Height(1));
        assert_eq!(last, Height(1));
        assert!(result.pending_epoch.is_none());
    }

    #[test]
    fn test_commit_chain_of_blocks() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let epoch = make_epoch();
        let b1 = make_block(1, BlockHash::GENESIS);
        let b2 = make_block(2, b1.hash);
        let b3 = make_block(3, b2.hash);
        let tip_hash = b3.hash;
        store.put_block(b1);
        store.put_block(b2);
        store.put_block(b3);

        let dc = make_dc(tip_hash, 3);

        let mut last = Height::GENESIS;
        let result = try_commit(&dc, &store, &app, &mut last, &epoch).unwrap();
        assert_eq!(result.committed_blocks.len(), 3);
        assert_eq!(result.committed_blocks[0].height, Height(1));
        assert_eq!(result.committed_blocks[1].height, Height(2));
        assert_eq!(result.committed_blocks[2].height, Height(3));
        assert_eq!(last, Height(3));
    }

    #[test]
    fn test_commit_already_committed() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let epoch = make_epoch();
        let block = make_block(1, BlockHash::GENESIS);
        let tip_hash = block.hash;
        store.put_block(block);

        let dc = make_dc(tip_hash, 1);

        // Height 1 is already committed, so nothing should be committed again.
        let mut last = Height(1);
        let result = try_commit(&dc, &store, &app, &mut last, &epoch).unwrap();
        assert!(result.committed_blocks.is_empty());
    }

    #[test]
    fn test_commit_partial_chain() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let epoch = make_epoch();
        let b1 = make_block(1, BlockHash::GENESIS);
        let b2 = make_block(2, b1.hash);
        let b3 = make_block(3, b2.hash);
        let tip_hash = b3.hash;
        store.put_block(b1);
        store.put_block(b2);
        store.put_block(b3);

        let dc = make_dc(tip_hash, 3);

        // Height 1 is already committed; only heights 2 and 3 remain.
        let mut last = Height(1);
        let result = try_commit(&dc, &store, &app, &mut last, &epoch).unwrap();
        assert_eq!(result.committed_blocks.len(), 2);
        assert_eq!(result.committed_blocks[0].height, Height(2));
        assert_eq!(result.committed_blocks[1].height, Height(3));
    }

    #[test]
    fn test_commit_missing_block() {
        let store = MemoryBlockStore::new();
        let app = NoopApplication;
        let epoch = make_epoch();
        // The certified hash was never stored.
        let dc = make_dc(BlockHash([99u8; 32]), 1);
        let mut last = Height::GENESIS;
        assert!(try_commit(&dc, &store, &app, &mut last, &epoch).is_err());
    }

    #[test]
    fn test_decode_payload_empty() {
        assert!(decode_payload(&[]).is_empty());
    }

    #[test]
    fn test_decode_payload_roundtrip() {
        // Wire format: 4-byte LE length prefix followed by the data.
        let tx1 = b"hello";
        let tx2 = b"world";
        let mut payload = Vec::new();
        for tx in [&tx1[..], &tx2[..]] {
            payload.extend_from_slice(&(tx.len() as u32).to_le_bytes());
            payload.extend_from_slice(tx);
        }

        let txs = decode_payload(&payload);
        assert_eq!(txs.len(), 2);
        assert_eq!(txs[0], b"hello");
        assert_eq!(txs[1], b"world");
    }
}