// hotmint_consensus/sync.rs
1//! Block sync: allows a node that is behind to catch up by requesting
2//! missing blocks from peers and replaying the commit lifecycle.
3
4use std::cmp;
5
6use ruc::*;
7
8use crate::application::Application;
9use crate::commit;
10use crate::store::BlockStore;
11use hotmint_types::context::BlockContext;
12use hotmint_types::epoch::Epoch;
13use hotmint_types::sync::{
14    ChunkApplyResult, MAX_SYNC_BATCH, SnapshotOfferResult, SyncRequest, SyncResponse,
15};
16use hotmint_types::{Block, BlockHash, Height, ViewNumber};
17use tokio::sync::mpsc;
18use tokio::time::{Duration, timeout};
19use tracing::{info, warn};
20
/// How long to wait for any single response from a peer before giving up.
const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
22
/// Mutable state needed by block sync and replay.
///
/// Borrows the engine's persistent handles so sync can advance the committed
/// height, epoch and app hash in place.
pub struct SyncState<'a> {
    /// Persistent block storage; synced blocks and their commit QCs are written here.
    pub store: &'a mut dyn BlockStore,
    /// Application callbacks used to validate/execute/commit replayed blocks.
    pub app: &'a dyn Application,
    /// Active epoch (number, start view, validator set); updated on epoch transitions.
    pub current_epoch: &'a mut Epoch,
    /// Height of the most recently committed block; advanced as blocks replay.
    pub last_committed_height: &'a mut Height,
    /// App hash after the last committed block, used for continuity checks.
    pub last_app_hash: &'a mut BlockHash,
    /// Hash of the chain id, mixed into vote signing bytes when verifying QCs.
    pub chain_id_hash: &'a [u8; 32],
}
32
/// Run block sync: request missing blocks from peers and replay them.
///
/// This should be called **before** the consensus engine starts. Progress is
/// written back through `state`: the committed height, epoch and app hash are
/// all updated in place (nothing is returned on success).
///
/// # Errors
/// Fails if the sync channel closes, a batch request times out, the peer
/// reports an error, or block replay rejects a block. A timeout on the
/// *initial* status request is treated as "no peer available" and returns
/// `Ok(())` so the node can start from its current local state.
pub async fn sync_to_tip(
    state: &mut SyncState<'_>,
    request_tx: &mpsc::Sender<SyncRequest>,
    response_rx: &mut mpsc::Receiver<SyncResponse>,
) -> Result<()> {
    // First, get status from peer
    request_tx
        .send(SyncRequest::GetStatus)
        .await
        .map_err(|_| eg!("sync channel closed"))?;

    // Any response other than `Status` at this point is a protocol violation.
    let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
        Ok(Some(SyncResponse::Status {
            last_committed_height: peer_height,
            ..
        })) => peer_height,
        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
        Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
        Ok(Some(SyncResponse::Snapshots(_))) => return Err(eg!("unexpected snapshots response")),
        Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
            return Err(eg!("unexpected snapshot chunk response"));
        }
        Ok(None) => return Err(eg!("sync channel closed")),
        Err(_) => {
            // Non-fatal: no peer answered in time, so start from local state.
            info!("sync status request timed out, starting from current state");
            return Ok(());
        }
    };

    // Nothing to do when the peer is at or behind our height.
    if peer_status <= *state.last_committed_height {
        info!(
            our_height = state.last_committed_height.as_u64(),
            peer_height = peer_status.as_u64(),
            "already caught up"
        );
        return Ok(());
    }

    info!(
        our_height = state.last_committed_height.as_u64(),
        peer_height = peer_status.as_u64(),
        "starting block sync"
    );

    // Pipelined sync: prefetch the next batch while replaying the current one.
    // This overlaps network latency with CPU-bound block execution.

    // Request first batch (heights are inclusive, capped at the peer's tip)
    let first_from = Height(state.last_committed_height.as_u64() + 1);
    let first_to = Height(cmp::min(
        first_from.as_u64() + MAX_SYNC_BATCH - 1,
        peer_status.as_u64(),
    ));
    request_tx
        .send(SyncRequest::GetBlocks {
            from_height: first_from,
            to_height: first_to,
        })
        .await
        .map_err(|_| eg!("sync channel closed"))?;

    loop {
        // Wait for current batch
        let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
            Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
            Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
            Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
            Ok(Some(SyncResponse::Snapshots(_))) => {
                return Err(eg!("unexpected snapshots response"));
            }
            Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
                return Err(eg!("unexpected snapshot chunk response"));
            }
            Ok(None) => return Err(eg!("sync channel closed")),
            Err(_) => return Err(eg!("sync request timed out")),
        };

        // An empty batch means the peer has nothing more for us.
        if blocks.is_empty() {
            break;
        }

        // Prefetch next batch BEFORE replaying current one (pipeline overlap).
        // `synced_through` assumes the batch starts at last_committed + 1,
        // which holds because that is exactly what we requested.
        let synced_through = state.last_committed_height.as_u64() + blocks.len() as u64;
        let needs_more = synced_through < peer_status.as_u64();
        if needs_more {
            let next_from = Height(
                blocks
                    .last()
                    .map(|(b, _)| b.height.as_u64() + 1)
                    .unwrap_or(first_from.as_u64()),
            );
            let next_to = Height(cmp::min(
                next_from.as_u64() + MAX_SYNC_BATCH - 1,
                peer_status.as_u64(),
            ));
            // Send failure is deliberately ignored: the recv above surfaces a
            // closed channel (or times out) on the next iteration.
            let _ = request_tx
                .send(SyncRequest::GetBlocks {
                    from_height: next_from,
                    to_height: next_to,
                })
                .await;
        }

        // Replay current batch while the next one is being fetched.
        // NOTE(review): the pending epoch returned by `replay_blocks` is
        // dropped here — confirm the engine re-derives any deferred epoch
        // activation after sync completes.
        let _pending = replay_blocks(&blocks, state)?;

        info!(
            synced_to = state.last_committed_height.as_u64(),
            target = peer_status.as_u64(),
            "sync progress"
        );

        if *state.last_committed_height >= peer_status || !needs_more {
            break;
        }
    }

    info!(
        height = state.last_committed_height.as_u64(),
        epoch = %state.current_epoch.number,
        "block sync complete"
    );
    Ok(())
}
161
162/// Query a peer's status via the sync channel. Returns `None` on timeout/error.
163pub async fn query_peer_status(
164    request_tx: &mpsc::Sender<SyncRequest>,
165    response_rx: &mut mpsc::Receiver<SyncResponse>,
166) -> Option<Height> {
167    request_tx.send(SyncRequest::GetStatus).await.ok()?;
168    match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
169        Ok(Some(SyncResponse::Status {
170            last_committed_height,
171            ..
172        })) => Some(last_committed_height),
173        _ => None,
174    }
175}
176
/// Attempt state sync via snapshots. Returns `Ok(true)` if successful,
/// `Ok(false)` if no snapshots are available or usable (caller should fall
/// back to block sync).
///
/// On success, `state.last_committed_height` and `state.last_app_hash` are
/// advanced to the snapshot's height and integrity hash.
///
/// # Errors
/// Fails if the sync channel closes, a chunk request times out, the peer
/// reports an error, or the application aborts (or asks to retry) a chunk.
pub async fn sync_via_snapshot(
    state: &mut SyncState<'_>,
    request_tx: &mpsc::Sender<SyncRequest>,
    response_rx: &mut mpsc::Receiver<SyncResponse>,
) -> Result<bool> {
    // 1. Request snapshot list from peer
    request_tx
        .send(SyncRequest::GetSnapshots)
        .await
        .map_err(|_| eg!("sync channel closed"))?;

    // 2. Wait for the snapshot list (10s timeout)
    let snapshots = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
        Ok(Some(SyncResponse::Snapshots(list))) => list,
        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
        Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshots")),
        Ok(None) => return Err(eg!("sync channel closed")),
        Err(_) => {
            // Non-fatal: treat an unresponsive peer as "no snapshots".
            info!("snapshot list request timed out");
            return Ok(false);
        }
    };

    // 3. If no snapshots available, fall back to block sync
    if snapshots.is_empty() {
        info!("peer has no snapshots, falling back to block sync");
        return Ok(false);
    }

    // 4. Pick the latest snapshot (highest height).
    // `unwrap` is safe: the list was checked non-empty above.
    let snapshot = snapshots
        .iter()
        .max_by_key(|s| s.height.as_u64())
        .unwrap()
        .clone();

    // Skip if we're already at or past this height
    if snapshot.height <= *state.last_committed_height {
        info!(
            snapshot_height = snapshot.height.as_u64(),
            our_height = state.last_committed_height.as_u64(),
            "snapshot not ahead of our state, falling back to block sync"
        );
        return Ok(false);
    }

    info!(
        snapshot_height = snapshot.height.as_u64(),
        chunks = snapshot.chunks,
        "offering snapshot to application"
    );

    // 5. Offer the snapshot to the application
    let offer_result = state.app.offer_snapshot(&snapshot);
    match offer_result {
        SnapshotOfferResult::Accept => {}
        SnapshotOfferResult::Reject => {
            // Rejection is recoverable: block sync can still catch us up.
            info!("application rejected snapshot, falling back to block sync");
            return Ok(false);
        }
        SnapshotOfferResult::Abort => {
            return Err(eg!("application aborted snapshot sync"));
        }
    }

    // 6. Download and apply chunks one by one
    for chunk_index in 0..snapshot.chunks {
        // Request chunk from peer
        request_tx
            .send(SyncRequest::GetSnapshotChunk {
                height: snapshot.height,
                chunk_index,
            })
            .await
            .map_err(|_| eg!("sync channel closed"))?;

        // Wait for chunk response.
        // NOTE(review): the chunk payload is not checked against the requested
        // height/chunk_index — confirm the sync channel guarantees in-order,
        // one-response-per-request delivery.
        let chunk_data = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
            Ok(Some(SyncResponse::SnapshotChunk { data, .. })) => data,
            Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
            Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshotChunk")),
            Ok(None) => return Err(eg!("sync channel closed")),
            Err(_) => return Err(eg!("snapshot chunk request timed out")),
        };

        // Apply chunk to the application
        let apply_result = state.app.apply_snapshot_chunk(chunk_data, chunk_index);
        match apply_result {
            ChunkApplyResult::Accept => {
                info!(
                    chunk = chunk_index,
                    total = snapshot.chunks,
                    "applied snapshot chunk"
                );
            }
            ChunkApplyResult::Retry => {
                // For now, treat retry as a fatal error; a more sophisticated
                // implementation could retry the chunk download.
                warn!(
                    chunk = chunk_index,
                    "application requested chunk retry — aborting snapshot sync"
                );
                return Err(eg!(
                    "snapshot chunk {} apply requested retry (not yet supported)",
                    chunk_index
                ));
            }
            ChunkApplyResult::Abort => {
                return Err(eg!(
                    "application aborted snapshot sync at chunk {}",
                    chunk_index
                ));
            }
        }
    }

    // 7. Update state to reflect the snapshot height
    *state.last_committed_height = snapshot.height;
    // The app_hash after snapshot restore is the snapshot's integrity hash
    // (the application should set its own state root internally).
    *state.last_app_hash = BlockHash(snapshot.hash);

    info!(height = snapshot.height.as_u64(), "snapshot sync complete");
    Ok(true)
}
304
/// Replay a batch of synced blocks: verify each block (chain linkage, commit
/// QC, proposer, block-hash and app-hash integrity), store it, and run the
/// application commit lifecycle.
///
/// Returns any pending epoch that has not yet reached its `start_view`. The
/// caller should store this as `pending_epoch` in the engine so the epoch
/// transition fires at the correct view.
///
/// # Errors
/// Fails on chain discontinuity, a missing or invalid commit QC, a wrong
/// proposer, a block-hash or app-hash mismatch, or if the application
/// rejects or fails to execute a block.
pub fn replay_blocks(
    blocks: &[(Block, Option<hotmint_types::QuorumCertificate>)],
    state: &mut SyncState<'_>,
) -> Result<Option<Epoch>> {
    // H-7: Track pending epoch separately, matching the runtime's deferred
    // activation semantics. The new epoch only takes effect when we reach
    // its start_view, preventing validator set mismatches during replay.
    let mut pending_epoch: Option<Epoch> = None;

    for (i, (block, qc)) in blocks.iter().enumerate() {
        // H-7: Apply pending epoch transition at exactly start_view, matching
        // the engine's advance_view_to behavior.
        if let Some(ref ep) = pending_epoch
            && block.view >= ep.start_view
        {
            *state.current_epoch = pending_epoch.take().unwrap();
        }
        // Validate chain continuity
        if i > 0 && block.parent_hash != blocks[i - 1].0.hash {
            return Err(eg!(
                "chain discontinuity at height {}: expected parent {}, got {}",
                block.height.as_u64(),
                blocks[i - 1].0.hash,
                block.parent_hash
            ));
        }
        // F-06: Validate first block links to our last committed block
        if i == 0
            && state.last_committed_height.as_u64() > 0
            && let Some(last) = state
                .store
                .get_block_by_height(*state.last_committed_height)
            && block.parent_hash != last.hash
        {
            return Err(eg!(
                "sync batch first block parent {} does not match last committed block {} at height {}",
                block.parent_hash,
                last.hash,
                state.last_committed_height
            ));
        }

        // Verify commit QC if present (non-genesis blocks should have one)
        if let Some(cert) = qc {
            // The QC must certify exactly this block.
            if cert.block_hash != block.hash {
                return Err(eg!(
                    "sync QC block_hash mismatch at height {}: QC={} block={}",
                    block.height.as_u64(),
                    cert.block_hash,
                    block.hash
                ));
            }
            // Verify QC aggregate signature and quorum. Signing bytes bind the
            // chain id, the QC's claimed epoch/view, the block hash and the
            // vote type.
            // NOTE(review): `cert.epoch` is never compared against
            // `state.current_epoch.number` — confirm a QC from a different
            // epoch cannot verify against the current validator set.
            let verifier = hotmint_crypto::Ed25519Verifier;
            let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
                state.chain_id_hash,
                cert.epoch,
                cert.view,
                &cert.block_hash,
                hotmint_types::vote::VoteType::Vote,
            );
            if !hotmint_types::Verifier::verify_aggregate(
                &verifier,
                &state.current_epoch.validator_set,
                &qc_bytes,
                &cert.aggregate_signature,
            ) {
                return Err(eg!(
                    "sync QC signature verification failed at height {}",
                    block.height.as_u64()
                ));
            }
            if !hotmint_crypto::has_quorum(
                &state.current_epoch.validator_set,
                &cert.aggregate_signature,
            ) {
                return Err(eg!(
                    "sync QC below quorum threshold at height {}",
                    block.height.as_u64()
                ));
            }
        } else if block.height.as_u64() > 0 {
            // Non-genesis blocks MUST have a commit QC — without one, the block
            // has not been certified by a 2/3 quorum and must be rejected.
            return Err(eg!(
                "sync block at height {} missing commit QC — refusing unverified block",
                block.height.as_u64()
            ));
        }

        // Skip already-committed blocks
        if block.height <= *state.last_committed_height {
            continue;
        }

        // Defense-in-depth: verify the proposer is the correct leader for this view.
        // The QC already proves 2f+1 honest validators accepted this block (and they
        // checked the proposer), but we re-check here to catch corrupted sync data.
        if let Some(expected_leader) = state
            .current_epoch
            .validator_set
            .leader_for_view(block.view)
            && block.proposer != expected_leader.id
        {
            return Err(eg!(
                "sync block at height {} has wrong proposer: expected {} for view {}, got {}",
                block.height.as_u64(),
                expected_leader.id,
                block.view,
                block.proposer
            ));
        }

        // Verify block hash integrity (includes app_hash in computation)
        let expected_hash = hotmint_crypto::compute_block_hash(block);
        if block.hash != expected_hash {
            return Err(eg!(
                "sync block hash mismatch at height {}: declared {} != computed {}",
                block.height.as_u64(),
                block.hash,
                expected_hash
            ));
        }

        // Verify app_hash matches local application state.
        // Skip when the application does not track state roots (e.g. NoopApplication),
        // so that fullnodes without an ABCI backend can sync from peers running real
        // applications that produce non-zero app_hash values.
        if state.app.tracks_app_hash() && block.app_hash != *state.last_app_hash {
            return Err(eg!(
                "sync block app_hash mismatch at height {}: block {} != local {}",
                block.height.as_u64(),
                block.app_hash,
                state.last_app_hash
            ));
        }

        // Store the block and its commit QC (H-12: so the node can serve
        // commit proofs to other syncing peers and light clients).
        state.store.put_block(block.clone());
        if let Some(commit_qc) = qc {
            state.store.put_commit_qc(block.height, commit_qc.clone());
        }

        // Run application lifecycle: validate -> execute -> commit, mirroring
        // the live consensus path.
        let ctx = BlockContext {
            height: block.height,
            view: block.view,
            proposer: block.proposer,
            epoch: state.current_epoch.number,
            epoch_start_view: state.current_epoch.start_view,
            validator_set: &state.current_epoch.validator_set,
            vote_extensions: vec![],
        };

        if !state.app.validate_block(block, &ctx) {
            return Err(eg!(
                "app rejected synced block at height {}",
                block.height.as_u64()
            ));
        }

        let txs = commit::decode_payload(&block.payload);
        let response = state
            .app
            .execute_block(&txs, &ctx)
            .c(d!("execute_block failed during sync"))?;

        state
            .app
            .on_commit(block, &ctx)
            .c(d!("on_commit failed during sync"))?;

        *state.last_app_hash = if state.app.tracks_app_hash() {
            response.app_hash
        } else {
            // App does not compute state roots: carry the chain's authoritative
            // value forward so that the continuity check stays coherent.
            block.app_hash
        };

        // Handle epoch transitions — defer to start_view (H-7). The new epoch
        // becomes active at block view + 2.
        if !response.validator_updates.is_empty() {
            let new_vs = state
                .current_epoch
                .validator_set
                .apply_updates(&response.validator_updates);
            let epoch_start = ViewNumber(block.view.as_u64() + 2);
            pending_epoch = Some(Epoch::new(
                state.current_epoch.number.next(),
                epoch_start,
                new_vs,
            ));
        }

        *state.last_committed_height = block.height;
    }

    // If the pending epoch's start_view was reached by the last block, apply it.
    // Otherwise, return it so the caller (engine) can defer activation correctly.
    if let Some(ref ep) = pending_epoch
        && let Some((last_block, _)) = blocks.last()
        && last_block.view >= ep.start_view
    {
        *state.current_epoch = pending_epoch.take().unwrap();
    }

    // Return any still-pending epoch for the caller to handle.
    Ok(pending_epoch)
}
520
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::NoopApplication;
    use crate::store::MemoryBlockStore;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::{BlockHash, QuorumCertificate, ValidatorId, ViewNumber};

    const TEST_CHAIN: [u8; 32] = [0u8; 32];

    /// Produce a single-signer commit QC for `block` in epoch 0.
    fn certify(block: &Block, signer: &hotmint_crypto::Ed25519Signer) -> QuorumCertificate {
        let payload = hotmint_types::vote::Vote::signing_bytes(
            &TEST_CHAIN,
            EpochNumber(0),
            block.view,
            &block.hash,
            hotmint_types::vote::VoteType::Vote,
        );
        let signature = hotmint_types::Signer::sign(signer, &payload);
        let mut aggregate = hotmint_types::AggregateSignature::new(1);
        aggregate.add(0, signature).unwrap();
        QuorumCertificate {
            block_hash: block.hash,
            view: block.view,
            aggregate_signature: aggregate,
            epoch: EpochNumber(0),
        }
    }

    /// Build a block at `height` linked to `parent`, with its hash filled in.
    fn block_at(height: u64, parent: BlockHash) -> Block {
        let mut b = Block {
            height: Height(height),
            parent_hash: parent,
            view: ViewNumber(height),
            proposer: ValidatorId(0),
            timestamp: 0,
            payload: vec![],
            app_hash: BlockHash::GENESIS,
            evidence: Vec::new(),
            hash: BlockHash::GENESIS, // overwritten just below
        };
        b.hash = hotmint_crypto::compute_block_hash(&b);
        b
    }

    /// Fresh store + single-validator genesis epoch shared by all tests.
    fn harness() -> (
        MemoryBlockStore,
        hotmint_crypto::Ed25519Signer,
        Epoch,
        Height,
    ) {
        let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
        let validators = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::Signer::public_key(&signer),
            power: 1,
        }]);
        (
            MemoryBlockStore::new(),
            signer,
            Epoch::genesis(validators),
            Height::GENESIS,
        )
    }

    #[test]
    fn test_replay_blocks_valid_chain() {
        let (mut store, signer, mut epoch, mut height) = harness();
        let app = NoopApplication;

        let b1 = block_at(1, BlockHash::GENESIS);
        let b2 = block_at(2, b1.hash);
        let b3 = block_at(3, b2.hash);
        let (qc1, qc2, qc3) = (
            certify(&b1, &signer),
            certify(&b2, &signer),
            certify(&b3, &signer),
        );
        let batch = vec![(b1, Some(qc1)), (b2, Some(qc2)), (b3, Some(qc3))];

        let mut app_hash = BlockHash::GENESIS;
        let mut sync = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
        };
        replay_blocks(&batch, &mut sync).unwrap();
        assert_eq!(height, Height(3));
        assert!(store.get_block_by_height(Height(1)).is_some());
        assert!(store.get_block_by_height(Height(3)).is_some());
    }

    #[test]
    fn test_replay_blocks_rejects_missing_qc() {
        let (mut store, signer, mut epoch, mut height) = harness();
        let app = NoopApplication;

        let b1 = block_at(1, BlockHash::GENESIS);
        let qc1 = certify(&b1, &signer);
        let b2 = block_at(2, b1.hash);
        // A non-genesis block without a commit QC must be refused.
        let batch = vec![(b1, Some(qc1)), (b2, None)];

        let mut app_hash = BlockHash::GENESIS;
        let mut sync = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
        };
        assert!(replay_blocks(&batch, &mut sync).is_err());
    }

    #[test]
    fn test_replay_blocks_broken_chain() {
        let (mut store, signer, mut epoch, mut height) = harness();
        let app = NoopApplication;

        let b1 = block_at(1, BlockHash::GENESIS);
        let b3 = block_at(3, BlockHash([99u8; 32])); // parent does not match b1
        let (qc1, qc3) = (certify(&b1, &signer), certify(&b3, &signer));
        let batch = vec![(b1, Some(qc1)), (b3, Some(qc3))];

        let mut app_hash = BlockHash::GENESIS;
        let mut sync = SyncState {
            store: &mut store,
            app: &app,
            current_epoch: &mut epoch,
            last_committed_height: &mut height,
            last_app_hash: &mut app_hash,
            chain_id_hash: &TEST_CHAIN,
        };
        assert!(replay_blocks(&batch, &mut sync).is_err());
    }
}