Skip to main content

hotmint_consensus/
sync.rs

1//! Block sync: allows a node that is behind to catch up by requesting
2//! missing blocks from peers and replaying the commit lifecycle.
3
4use std::cmp;
5
6use ruc::*;
7
8use crate::application::Application;
9use crate::commit;
10use crate::store::BlockStore;
11use hotmint_types::context::BlockContext;
12use hotmint_types::epoch::Epoch;
13use hotmint_types::sync::{
14    ChunkApplyResult, MAX_SYNC_BATCH, SnapshotOfferResult, SyncRequest, SyncResponse,
15};
16use hotmint_types::{Block, BlockHash, Height, ViewNumber};
17use tokio::sync::mpsc;
18use tokio::time::{Duration, timeout};
19use tracing::{info, warn};
20
/// Per-round-trip timeout applied to every sync request (status probes,
/// block batches, snapshot lists, and individual snapshot chunks).
const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
22
/// Mutable state needed by block sync and replay.
///
/// Borrows the engine's mutable fields so [`sync_to_tip`] and
/// [`replay_blocks`] can advance them in place; progress is reported
/// through these references rather than return values.
pub struct SyncState<'a> {
    /// Persistent block store; synced blocks and their commit QCs are written here.
    pub store: &'a mut dyn BlockStore,
    /// Application callbacks for validating/executing replayed blocks and snapshots.
    pub app: &'a dyn Application,
    /// Active epoch (number, start view, validator set); advanced during replay.
    pub current_epoch: &'a mut Epoch,
    /// Height of the most recently committed block; advanced per replayed block.
    pub last_committed_height: &'a mut Height,
    /// app_hash after executing the last committed block; used for continuity checks.
    pub last_app_hash: &'a mut BlockHash,
    /// Chain-id hash mixed into vote signing bytes for domain separation.
    pub chain_id_hash: &'a [u8; 32],
    /// A-1: Tracks pending epoch transitions across replay batches.
    pub pending_epoch: &'a mut Option<Epoch>,
}
34
/// Run block sync: request missing blocks from peers and replay them.
///
/// This should be called **before** the consensus engine starts.
/// Progress is written through `state` (`last_committed_height`,
/// `current_epoch`, `last_app_hash`, `pending_epoch`); on success the
/// local state reflects the peer's tip as reported at sync start.
///
/// # Errors
/// Fails on peer errors, unexpected responses, mid-sync timeouts, or any
/// block that fails verification during replay. A timeout on the *initial*
/// status probe is not an error — the node simply starts from its current
/// state and relies on live consensus to catch up.
pub async fn sync_to_tip(
    state: &mut SyncState<'_>,
    request_tx: &mpsc::Sender<SyncRequest>,
    response_rx: &mut mpsc::Receiver<SyncResponse>,
) -> Result<()> {
    // First, get status from peer
    request_tx
        .send(SyncRequest::GetStatus)
        .await
        .map_err(|_| eg!("sync channel closed"))?;

    let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
        Ok(Some(SyncResponse::Status {
            last_committed_height: peer_height,
            ..
        })) => peer_height,
        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
        Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
        Ok(Some(SyncResponse::Snapshots(_))) => return Err(eg!("unexpected snapshots response")),
        Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
            return Err(eg!("unexpected snapshot chunk response"));
        }
        Ok(None) => return Err(eg!("sync channel closed")),
        Err(_) => {
            // No peer answered in time — treated as "nothing to sync from",
            // not as a fatal error.
            info!("sync status request timed out, starting from current state");
            return Ok(());
        }
    };

    if peer_status <= *state.last_committed_height {
        info!(
            our_height = state.last_committed_height.as_u64(),
            peer_height = peer_status.as_u64(),
            "already caught up"
        );
        return Ok(());
    }

    info!(
        our_height = state.last_committed_height.as_u64(),
        peer_height = peer_status.as_u64(),
        "starting block sync"
    );

    // A-3: Try snapshot sync first for large gaps. If the peer has snapshots
    // and the application accepts one, we skip most of the block replay.
    let gap = peer_status
        .as_u64()
        .saturating_sub(state.last_committed_height.as_u64());
    if gap > MAX_SYNC_BATCH {
        match sync_via_snapshot(state, request_tx, response_rx).await {
            Ok(true) => {
                info!(
                    height = state.last_committed_height.as_u64(),
                    "snapshot sync succeeded, continuing with block sync for remaining blocks"
                );
                // Fall through to block sync for any blocks after the snapshot
            }
            Ok(false) => {
                info!("no suitable snapshot available, using full block sync");
            }
            Err(e) => {
                // Snapshot failure is recoverable: full block replay below
                // can always reconstruct the same state.
                warn!(error = %e, "snapshot sync failed, falling back to block sync");
            }
        }
    }

    // If we're caught up after snapshot, return early
    if *state.last_committed_height >= peer_status {
        info!(
            height = state.last_committed_height.as_u64(),
            "caught up via snapshot"
        );
        return Ok(());
    }

    // Pipelined sync: prefetch the next batch while replaying the current one.
    // This overlaps network latency with CPU-bound block execution.

    // Request first batch
    let first_from = Height(state.last_committed_height.as_u64() + 1);
    let first_to = Height(cmp::min(
        first_from.as_u64() + MAX_SYNC_BATCH - 1,
        peer_status.as_u64(),
    ));
    request_tx
        .send(SyncRequest::GetBlocks {
            from_height: first_from,
            to_height: first_to,
        })
        .await
        .map_err(|_| eg!("sync channel closed"))?;

    loop {
        // Wait for current batch
        let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
            Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
            Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
            Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
            Ok(Some(SyncResponse::Snapshots(_))) => {
                return Err(eg!("unexpected snapshots response"));
            }
            Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
                return Err(eg!("unexpected snapshot chunk response"));
            }
            Ok(None) => return Err(eg!("sync channel closed")),
            Err(_) => return Err(eg!("sync request timed out")),
        };

        // Peer has nothing more to give us — stop even if we haven't reached
        // its advertised height (it may have pruned or lied about its tip).
        if blocks.is_empty() {
            break;
        }

        // Prefetch next batch BEFORE replaying current one (pipeline overlap).
        let synced_through = state.last_committed_height.as_u64() + blocks.len() as u64;
        let needs_more = synced_through < peer_status.as_u64();
        if needs_more {
            let next_from = Height(
                blocks
                    .last()
                    .map(|(b, _)| b.height.as_u64() + 1)
                    .unwrap_or(first_from.as_u64()),
            );
            let next_to = Height(cmp::min(
                next_from.as_u64() + MAX_SYNC_BATCH - 1,
                peer_status.as_u64(),
            ));
            // Send error ignored here: if the channel closed, the recv at the
            // top of the next iteration will surface the failure.
            let _ = request_tx
                .send(SyncRequest::GetBlocks {
                    from_height: next_from,
                    to_height: next_to,
                })
                .await;
        }

        // Replay current batch while the next one is being fetched.
        // A-1: Propagate pending epoch from replay so it survives across
        // batches and is available to the caller after sync completes.
        if let Some(epoch) = replay_blocks(&blocks, state)? {
            *state.pending_epoch = Some(epoch);
        }

        info!(
            synced_to = state.last_committed_height.as_u64(),
            target = peer_status.as_u64(),
            "sync progress"
        );

        if *state.last_committed_height >= peer_status || !needs_more {
            break;
        }
    }

    info!(
        height = state.last_committed_height.as_u64(),
        epoch = %state.current_epoch.number,
        "block sync complete"
    );
    Ok(())
}
199
200/// Query a peer's status via the sync channel. Returns `None` on timeout/error.
201pub async fn query_peer_status(
202    request_tx: &mpsc::Sender<SyncRequest>,
203    response_rx: &mut mpsc::Receiver<SyncResponse>,
204) -> Option<Height> {
205    request_tx.send(SyncRequest::GetStatus).await.ok()?;
206    match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
207        Ok(Some(SyncResponse::Status {
208            last_committed_height,
209            ..
210        })) => Some(last_committed_height),
211        _ => None,
212    }
213}
214
215/// Attempt state sync via snapshots. Returns `Ok(true)` if successful,
216/// `Ok(false)` if no snapshots are available (caller should fall back to block sync).
217pub async fn sync_via_snapshot(
218    state: &mut SyncState<'_>,
219    request_tx: &mpsc::Sender<SyncRequest>,
220    response_rx: &mut mpsc::Receiver<SyncResponse>,
221) -> Result<bool> {
222    // 1. Request snapshot list from peer
223    request_tx
224        .send(SyncRequest::GetSnapshots)
225        .await
226        .map_err(|_| eg!("sync channel closed"))?;
227
228    // 2. Wait for the snapshot list (10s timeout)
229    let snapshots = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
230        Ok(Some(SyncResponse::Snapshots(list))) => list,
231        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
232        Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshots")),
233        Ok(None) => return Err(eg!("sync channel closed")),
234        Err(_) => {
235            info!("snapshot list request timed out");
236            return Ok(false);
237        }
238    };
239
240    // 3. If no snapshots available, fall back to block sync
241    if snapshots.is_empty() {
242        info!("peer has no snapshots, falling back to block sync");
243        return Ok(false);
244    }
245
246    // 4. Pick the latest snapshot (highest height)
247    let snapshot = snapshots
248        .iter()
249        .max_by_key(|s| s.height.as_u64())
250        .unwrap()
251        .clone();
252
253    // Skip if we're already at or past this height
254    if snapshot.height <= *state.last_committed_height {
255        info!(
256            snapshot_height = snapshot.height.as_u64(),
257            our_height = state.last_committed_height.as_u64(),
258            "snapshot not ahead of our state, falling back to block sync"
259        );
260        return Ok(false);
261    }
262
263    info!(
264        snapshot_height = snapshot.height.as_u64(),
265        chunks = snapshot.chunks,
266        "offering snapshot to application"
267    );
268
269    // 5. Offer the snapshot to the application
270    let offer_result = state.app.offer_snapshot(&snapshot);
271    match offer_result {
272        SnapshotOfferResult::Accept => {}
273        SnapshotOfferResult::Reject => {
274            info!("application rejected snapshot, falling back to block sync");
275            return Ok(false);
276        }
277        SnapshotOfferResult::Abort => {
278            return Err(eg!("application aborted snapshot sync"));
279        }
280    }
281
282    // 6. Download and apply chunks one by one
283    for chunk_index in 0..snapshot.chunks {
284        // Request chunk from peer
285        request_tx
286            .send(SyncRequest::GetSnapshotChunk {
287                height: snapshot.height,
288                chunk_index,
289            })
290            .await
291            .map_err(|_| eg!("sync channel closed"))?;
292
293        // Wait for chunk response
294        let chunk_data = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
295            Ok(Some(SyncResponse::SnapshotChunk { data, .. })) => data,
296            Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
297            Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshotChunk")),
298            Ok(None) => return Err(eg!("sync channel closed")),
299            Err(_) => return Err(eg!("snapshot chunk request timed out")),
300        };
301
302        // Apply chunk to the application
303        let apply_result = state.app.apply_snapshot_chunk(chunk_data, chunk_index);
304        match apply_result {
305            ChunkApplyResult::Accept => {
306                info!(
307                    chunk = chunk_index,
308                    total = snapshot.chunks,
309                    "applied snapshot chunk"
310                );
311            }
312            ChunkApplyResult::Retry => {
313                // For now, treat retry as a fatal error; a more sophisticated
314                // implementation could retry the chunk download.
315                warn!(
316                    chunk = chunk_index,
317                    "application requested chunk retry — aborting snapshot sync"
318                );
319                return Err(eg!(
320                    "snapshot chunk {} apply requested retry (not yet supported)",
321                    chunk_index
322                ));
323            }
324            ChunkApplyResult::Abort => {
325                return Err(eg!(
326                    "application aborted snapshot sync at chunk {}",
327                    chunk_index
328                ));
329            }
330        }
331    }
332
333    // 7. Verify snapshot trust anchor (A-3): fetch the block+QC at the
334    // snapshot height and verify the QC with full aggregate-signature and
335    // quorum checks — identical to the replay path.
336    request_tx
337        .send(SyncRequest::GetBlocks {
338            from_height: snapshot.height,
339            to_height: snapshot.height,
340        })
341        .await
342        .map_err(|_| eg!("sync channel closed"))?;
343
344    let anchor_block = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
345        Ok(Some(SyncResponse::Blocks(blocks))) if !blocks.is_empty() => blocks,
346        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error fetching anchor: {}", e)),
347        Ok(Some(_)) => return Err(eg!("unexpected response fetching snapshot anchor block")),
348        Ok(None) => return Err(eg!("sync channel closed")),
349        Err(_) => return Err(eg!("snapshot anchor block request timed out")),
350    };
351
352    let (block, qc_opt) = &anchor_block[0];
353    let qc = qc_opt
354        .as_ref()
355        .ok_or_else(|| eg!("snapshot anchor block has no QC"))?;
356
357    // Verify QC signs this block
358    if qc.block_hash != block.hash {
359        return Err(eg!(
360            "snapshot anchor QC block_hash {} != block hash {}",
361            qc.block_hash,
362            block.hash
363        ));
364    }
365
366    // Full QC aggregate-signature verification (same as replay_blocks)
367    let verifier = hotmint_crypto::Ed25519Verifier;
368    let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
369        state.chain_id_hash,
370        qc.epoch,
371        qc.view,
372        &qc.block_hash,
373        hotmint_types::vote::VoteType::Vote,
374    );
375    if !hotmint_types::Verifier::verify_aggregate(
376        &verifier,
377        &state.current_epoch.validator_set,
378        &qc_bytes,
379        &qc.aggregate_signature,
380    ) {
381        return Err(eg!(
382            "snapshot anchor QC signature verification failed at height {}",
383            snapshot.height.as_u64()
384        ));
385    }
386    if !hotmint_crypto::has_quorum(&state.current_epoch.validator_set, &qc.aggregate_signature) {
387        return Err(eg!(
388            "snapshot anchor QC below quorum threshold at height {}",
389            snapshot.height.as_u64()
390        ));
391    }
392
393    // 8. Update state to reflect the snapshot height.
394    //
395    // Semantics: block.app_hash records the state BEFORE executing this
396    // block (set during propose as state.last_app_hash).  After snapshot
397    // restore the application holds the state AFTER executing block H.
398    // Query the application for the authoritative post-restore app_hash.
399    *state.last_committed_height = snapshot.height;
400    let app_info = state.app.info();
401    if app_info.last_block_height == snapshot.height
402        && app_info.last_block_app_hash != BlockHash::GENESIS
403    {
404        *state.last_app_hash = app_info.last_block_app_hash;
405    } else {
406        // Fallback: the app doesn't track app_hash or didn't update yet.
407        // Use GENESIS so that the next replayed block's app_hash check is
408        // skipped for non-tracking apps (tracks_app_hash() == false).
409        *state.last_app_hash = BlockHash::GENESIS;
410    }
411
412    info!(height = snapshot.height.as_u64(), "snapshot sync complete");
413    Ok(true)
414}
415
416/// Replay a batch of blocks: store them and run the application lifecycle.
417/// Validates chain continuity (parent_hash linkage).
418/// Replay synced blocks and return any pending epoch that hasn't reached its
419/// `start_view` yet. The caller should store this as `pending_epoch` in the
420/// engine so the epoch transition fires at the correct view.
421pub fn replay_blocks(
422    blocks: &[(Block, Option<hotmint_types::QuorumCertificate>)],
423    state: &mut SyncState<'_>,
424) -> Result<Option<Epoch>> {
425    // H-7: Track pending epoch separately, matching the runtime's deferred
426    // activation semantics. The new epoch only takes effect when we reach
427    // its start_view, preventing validator set mismatches during replay.
428    let mut pending_epoch: Option<Epoch> = None;
429
430    for (i, (block, qc)) in blocks.iter().enumerate() {
431        // H-7: Apply pending epoch transition at exactly start_view, matching
432        // the engine's advance_view_to behavior.
433        if let Some(ref ep) = pending_epoch
434            && block.view >= ep.start_view
435        {
436            *state.current_epoch = pending_epoch.take().unwrap();
437        }
438        // Validate chain continuity
439        if i > 0 && block.parent_hash != blocks[i - 1].0.hash {
440            return Err(eg!(
441                "chain discontinuity at height {}: expected parent {}, got {}",
442                block.height.as_u64(),
443                blocks[i - 1].0.hash,
444                block.parent_hash
445            ));
446        }
447        // F-06: Validate first block links to our last committed block
448        if i == 0
449            && state.last_committed_height.as_u64() > 0
450            && let Some(last) = state
451                .store
452                .get_block_by_height(*state.last_committed_height)
453            && block.parent_hash != last.hash
454        {
455            return Err(eg!(
456                "sync batch first block parent {} does not match last committed block {} at height {}",
457                block.parent_hash,
458                last.hash,
459                state.last_committed_height
460            ));
461        }
462
463        // Verify commit QC if present (non-genesis blocks should have one)
464        if let Some(cert) = qc {
465            if cert.block_hash != block.hash {
466                return Err(eg!(
467                    "sync QC block_hash mismatch at height {}: QC={} block={}",
468                    block.height.as_u64(),
469                    cert.block_hash,
470                    block.hash
471                ));
472            }
473            // Verify QC aggregate signature and quorum
474            let verifier = hotmint_crypto::Ed25519Verifier;
475            let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
476                state.chain_id_hash,
477                cert.epoch,
478                cert.view,
479                &cert.block_hash,
480                hotmint_types::vote::VoteType::Vote,
481            );
482            if !hotmint_types::Verifier::verify_aggregate(
483                &verifier,
484                &state.current_epoch.validator_set,
485                &qc_bytes,
486                &cert.aggregate_signature,
487            ) {
488                return Err(eg!(
489                    "sync QC signature verification failed at height {}",
490                    block.height.as_u64()
491                ));
492            }
493            if !hotmint_crypto::has_quorum(
494                &state.current_epoch.validator_set,
495                &cert.aggregate_signature,
496            ) {
497                return Err(eg!(
498                    "sync QC below quorum threshold at height {}",
499                    block.height.as_u64()
500                ));
501            }
502        } else if block.height.as_u64() > 0 {
503            // Non-genesis blocks MUST have a commit QC — without one, the block
504            // has not been certified by a 2/3 quorum and must be rejected.
505            return Err(eg!(
506                "sync block at height {} missing commit QC — refusing unverified block",
507                block.height.as_u64()
508            ));
509        }
510
511        // Skip already-committed blocks
512        if block.height <= *state.last_committed_height {
513            continue;
514        }
515
516        // Defense-in-depth: verify the proposer is the correct leader for this view.
517        // The QC already proves 2f+1 honest validators accepted this block (and they
518        // checked the proposer), but we re-check here to catch corrupted sync data.
519        if let Some(expected_leader) = state
520            .current_epoch
521            .validator_set
522            .leader_for_view(block.view)
523            && block.proposer != expected_leader.id
524        {
525            return Err(eg!(
526                "sync block at height {} has wrong proposer: expected {} for view {}, got {}",
527                block.height.as_u64(),
528                expected_leader.id,
529                block.view,
530                block.proposer
531            ));
532        }
533
534        // Verify block hash integrity (includes app_hash in computation)
535        let expected_hash = hotmint_crypto::compute_block_hash(block);
536        if block.hash != expected_hash {
537            return Err(eg!(
538                "sync block hash mismatch at height {}: declared {} != computed {}",
539                block.height.as_u64(),
540                block.hash,
541                expected_hash
542            ));
543        }
544
545        // Verify app_hash matches local application state.
546        // Skip when the application does not track state roots (e.g. NoopApplication),
547        // so that fullnodes without an ABCI backend can sync from peers running real
548        // applications that produce non-zero app_hash values.
549        if state.app.tracks_app_hash() && block.app_hash != *state.last_app_hash {
550            return Err(eg!(
551                "sync block app_hash mismatch at height {}: block {} != local {}",
552                block.height.as_u64(),
553                block.app_hash,
554                state.last_app_hash
555            ));
556        }
557
558        // Store the block and its commit QC (H-12: so the node can serve
559        // commit proofs to other syncing peers and light clients).
560        state.store.put_block(block.clone());
561        if let Some(commit_qc) = qc {
562            state.store.put_commit_qc(block.height, commit_qc.clone());
563        }
564
565        // Run application lifecycle
566        let ctx = BlockContext {
567            height: block.height,
568            view: block.view,
569            proposer: block.proposer,
570            epoch: state.current_epoch.number,
571            epoch_start_view: state.current_epoch.start_view,
572            validator_set: &state.current_epoch.validator_set,
573            timestamp: block.timestamp,
574            vote_extensions: vec![],
575        };
576
577        if !state.app.validate_block(block, &ctx) {
578            return Err(eg!(
579                "app rejected synced block at height {}",
580                block.height.as_u64()
581            ));
582        }
583
584        let txs = commit::decode_payload(&block.payload);
585        let response = state
586            .app
587            .execute_block(&txs, &ctx)
588            .c(d!("execute_block failed during sync"))?;
589
590        state
591            .app
592            .on_commit(block, &ctx)
593            .c(d!("on_commit failed during sync"))?;
594
595        *state.last_app_hash = if state.app.tracks_app_hash() {
596            response.app_hash
597        } else {
598            // App does not compute state roots: carry the chain's authoritative
599            // value forward so that the continuity check stays coherent.
600            block.app_hash
601        };
602
603        // Handle epoch transitions — defer to start_view (H-7)
604        if !response.validator_updates.is_empty() {
605            let new_vs = state
606                .current_epoch
607                .validator_set
608                .apply_updates(&response.validator_updates);
609            let epoch_start = ViewNumber(block.view.as_u64() + 2);
610            pending_epoch = Some(Epoch::new(
611                state.current_epoch.number.next(),
612                epoch_start,
613                new_vs,
614            ));
615        }
616
617        *state.last_committed_height = block.height;
618    }
619
620    // If the pending epoch's start_view was reached by the last block, apply it.
621    // Otherwise, return it so the caller (engine) can defer activation correctly.
622    if let Some(ref ep) = pending_epoch
623        && let Some((last_block, _)) = blocks.last()
624        && last_block.view >= ep.start_view
625    {
626        *state.current_epoch = pending_epoch.take().unwrap();
627    }
628
629    // Return any still-pending epoch for the caller to handle.
630    Ok(pending_epoch)
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::NoopApplication;
    use crate::store::MemoryBlockStore;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::{BlockHash, QuorumCertificate, ValidatorId, ViewNumber};

    // Zeroed chain-id hash shared by all test fixtures.
    const TEST_CHAIN: [u8; 32] = [0u8; 32];

    /// Build a commit QC for `block` signed by the single validator `signer`
    /// (validator index 0, epoch 0) so quorum is trivially satisfied.
    fn make_qc(block: &Block, signer: &hotmint_crypto::Ed25519Signer) -> QuorumCertificate {
        let vote_bytes = hotmint_types::vote::Vote::signing_bytes(
            &TEST_CHAIN,
            EpochNumber(0),
            block.view,
            &block.hash,
            hotmint_types::vote::VoteType::Vote,
        );
        let sig = hotmint_types::Signer::sign(signer, &vote_bytes);
        let mut agg = hotmint_types::AggregateSignature::new(1);
        agg.add(0, sig).unwrap();
        QuorumCertificate {
            block_hash: block.hash,
            view: block.view,
            aggregate_signature: agg,
            epoch: EpochNumber(0),
        }
    }

    /// Build a minimal block at `height` linked to `parent`, with its hash
    /// recomputed so the replay integrity check passes. view == height.
    fn make_block(height: u64, parent: BlockHash) -> Block {
        let mut block = Block {
            height: Height(height),
            parent_hash: parent,
            view: ViewNumber(height),
            proposer: ValidatorId(0),
            timestamp: 0,
            payload: vec![],
            app_hash: BlockHash::GENESIS,
            evidence: Vec::new(),
            hash: BlockHash::GENESIS, // placeholder
        };
        block.hash = hotmint_crypto::compute_block_hash(&block);
        block
    }

    // Happy path: a properly linked, QC-certified 3-block chain replays
    // fully and advances the committed height to 3.
    #[test]
    fn test_replay_blocks_valid_chain() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
        let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::Signer::public_key(&signer),
            power: 1,
        }]);
        let mut epoch = Epoch::genesis(vs);
        let mut height = Height::GENESIS;

        let b1 = make_block(1, BlockHash::GENESIS);
        let b2 = make_block(2, b1.hash);
        let b3 = make_block(3, b2.hash);

        let qc1 = make_qc(&b1, &signer);
        let qc2 = make_qc(&b2, &signer);
        let qc3 = make_qc(&b3, &signer);

        let blocks: Vec<_> = vec![(b1, Some(qc1)), (b2, Some(qc2)), (b3, Some(qc3))];
        let mut app_hash = BlockHash::GENESIS;
        let mut pending_epoch = None;
        let mut state = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
            pending_epoch: &mut pending_epoch,
        };
        replay_blocks(&blocks, &mut state).unwrap();
        assert_eq!(height, Height(3));
        assert!(store.get_block_by_height(Height(1)).is_some());
        assert!(store.get_block_by_height(Height(3)).is_some());
    }

    // A non-genesis block with no commit QC must be rejected (uncertified).
    #[test]
    fn test_replay_blocks_rejects_missing_qc() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
        let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::Signer::public_key(&signer),
            power: 1,
        }]);
        let mut epoch = Epoch::genesis(vs);
        let mut height = Height::GENESIS;

        let b1 = make_block(1, BlockHash::GENESIS);
        let qc1 = make_qc(&b1, &signer);
        let b2 = make_block(2, b1.hash);
        // Non-genesis block without QC should be rejected
        let blocks: Vec<_> = vec![(b1, Some(qc1)), (b2, None)];
        let mut app_hash = BlockHash::GENESIS;
        let mut pending_epoch = None;
        let mut state = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
            pending_epoch: &mut pending_epoch,
        };
        assert!(replay_blocks(&blocks, &mut state).is_err());
    }

    // A batch whose parent linkage is broken must fail the continuity check.
    #[test]
    fn test_replay_blocks_broken_chain() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
        let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::Signer::public_key(&signer),
            power: 1,
        }]);
        let mut epoch = Epoch::genesis(vs);
        let mut height = Height::GENESIS;

        let b1 = make_block(1, BlockHash::GENESIS);
        let b3 = make_block(3, BlockHash([99u8; 32])); // wrong parent

        let qc1 = make_qc(&b1, &signer);
        let qc3 = make_qc(&b3, &signer);
        let blocks: Vec<_> = vec![(b1, Some(qc1)), (b3, Some(qc3))];
        let mut app_hash = BlockHash::GENESIS;
        let mut pending_epoch = None;
        let mut state = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
            pending_epoch: &mut pending_epoch,
        };
        assert!(replay_blocks(&blocks, &mut state).is_err());
    }
}
781}