1use std::cmp;
5
6use ruc::*;
7
8use crate::application::Application;
9use crate::commit;
10use crate::store::BlockStore;
11use hotmint_types::context::BlockContext;
12use hotmint_types::epoch::Epoch;
13use hotmint_types::sync::{
14 ChunkApplyResult, MAX_SYNC_BATCH, SnapshotOfferResult, SyncRequest, SyncResponse,
15};
16use hotmint_types::{Block, BlockHash, Height, ViewNumber};
17use tokio::sync::mpsc;
18use tokio::time::{Duration, timeout};
19use tracing::{info, warn};
20
/// Upper bound on how long any single reply from the sync peer is awaited
/// (10 000 ms, i.e. ten seconds).
const SYNC_TIMEOUT: Duration = Duration::from_millis(10_000);
22
23pub struct SyncState<'a> {
25 pub store: &'a mut dyn BlockStore,
26 pub app: &'a dyn Application,
27 pub current_epoch: &'a mut Epoch,
28 pub last_committed_height: &'a mut Height,
29 pub last_app_hash: &'a mut BlockHash,
30 pub chain_id_hash: &'a [u8; 32],
31 pub pending_epoch: &'a mut Option<Epoch>,
33}
34
35pub async fn sync_to_tip(
40 state: &mut SyncState<'_>,
41 request_tx: &mpsc::Sender<SyncRequest>,
42 response_rx: &mut mpsc::Receiver<SyncResponse>,
43) -> Result<()> {
44 request_tx
46 .send(SyncRequest::GetStatus)
47 .await
48 .map_err(|_| eg!("sync channel closed"))?;
49
50 let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
51 Ok(Some(SyncResponse::Status {
52 last_committed_height: peer_height,
53 ..
54 })) => peer_height,
55 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
56 Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
57 Ok(Some(SyncResponse::Snapshots(_))) => return Err(eg!("unexpected snapshots response")),
58 Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
59 return Err(eg!("unexpected snapshot chunk response"));
60 }
61 Ok(None) => return Err(eg!("sync channel closed")),
62 Err(_) => {
63 info!("sync status request timed out, starting from current state");
64 return Ok(());
65 }
66 };
67
68 if peer_status <= *state.last_committed_height {
69 info!(
70 our_height = state.last_committed_height.as_u64(),
71 peer_height = peer_status.as_u64(),
72 "already caught up"
73 );
74 return Ok(());
75 }
76
77 info!(
78 our_height = state.last_committed_height.as_u64(),
79 peer_height = peer_status.as_u64(),
80 "starting block sync"
81 );
82
83 let gap = peer_status
86 .as_u64()
87 .saturating_sub(state.last_committed_height.as_u64());
88 if gap > MAX_SYNC_BATCH {
89 match sync_via_snapshot(state, request_tx, response_rx).await {
90 Ok(true) => {
91 info!(
92 height = state.last_committed_height.as_u64(),
93 "snapshot sync succeeded, continuing with block sync for remaining blocks"
94 );
95 }
97 Ok(false) => {
98 info!("no suitable snapshot available, using full block sync");
99 }
100 Err(e) => {
101 warn!(error = %e, "snapshot sync failed, falling back to block sync");
102 }
103 }
104 }
105
106 if *state.last_committed_height >= peer_status {
108 info!(
109 height = state.last_committed_height.as_u64(),
110 "caught up via snapshot"
111 );
112 return Ok(());
113 }
114
115 let first_from = Height(state.last_committed_height.as_u64() + 1);
120 let first_to = Height(cmp::min(
121 first_from.as_u64() + MAX_SYNC_BATCH - 1,
122 peer_status.as_u64(),
123 ));
124 request_tx
125 .send(SyncRequest::GetBlocks {
126 from_height: first_from,
127 to_height: first_to,
128 })
129 .await
130 .map_err(|_| eg!("sync channel closed"))?;
131
132 loop {
133 let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
135 Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
136 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
137 Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
138 Ok(Some(SyncResponse::Snapshots(_))) => {
139 return Err(eg!("unexpected snapshots response"));
140 }
141 Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
142 return Err(eg!("unexpected snapshot chunk response"));
143 }
144 Ok(None) => return Err(eg!("sync channel closed")),
145 Err(_) => return Err(eg!("sync request timed out")),
146 };
147
148 if blocks.is_empty() {
149 break;
150 }
151
152 let synced_through = state.last_committed_height.as_u64() + blocks.len() as u64;
154 let needs_more = synced_through < peer_status.as_u64();
155 if needs_more {
156 let next_from = Height(
157 blocks
158 .last()
159 .map(|(b, _)| b.height.as_u64() + 1)
160 .unwrap_or(first_from.as_u64()),
161 );
162 let next_to = Height(cmp::min(
163 next_from.as_u64() + MAX_SYNC_BATCH - 1,
164 peer_status.as_u64(),
165 ));
166 let _ = request_tx
167 .send(SyncRequest::GetBlocks {
168 from_height: next_from,
169 to_height: next_to,
170 })
171 .await;
172 }
173
174 if let Some(epoch) = replay_blocks(&blocks, state)? {
178 *state.pending_epoch = Some(epoch);
179 }
180
181 info!(
182 synced_to = state.last_committed_height.as_u64(),
183 target = peer_status.as_u64(),
184 "sync progress"
185 );
186
187 if *state.last_committed_height >= peer_status || !needs_more {
188 break;
189 }
190 }
191
192 info!(
193 height = state.last_committed_height.as_u64(),
194 epoch = %state.current_epoch.number,
195 "block sync complete"
196 );
197 Ok(())
198}
199
200pub async fn query_peer_status(
202 request_tx: &mpsc::Sender<SyncRequest>,
203 response_rx: &mut mpsc::Receiver<SyncResponse>,
204) -> Option<Height> {
205 request_tx.send(SyncRequest::GetStatus).await.ok()?;
206 match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
207 Ok(Some(SyncResponse::Status {
208 last_committed_height,
209 ..
210 })) => Some(last_committed_height),
211 _ => None,
212 }
213}
214
215pub async fn sync_via_snapshot(
218 state: &mut SyncState<'_>,
219 request_tx: &mpsc::Sender<SyncRequest>,
220 response_rx: &mut mpsc::Receiver<SyncResponse>,
221) -> Result<bool> {
222 request_tx
224 .send(SyncRequest::GetSnapshots)
225 .await
226 .map_err(|_| eg!("sync channel closed"))?;
227
228 let snapshots = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
230 Ok(Some(SyncResponse::Snapshots(list))) => list,
231 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
232 Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshots")),
233 Ok(None) => return Err(eg!("sync channel closed")),
234 Err(_) => {
235 info!("snapshot list request timed out");
236 return Ok(false);
237 }
238 };
239
240 if snapshots.is_empty() {
242 info!("peer has no snapshots, falling back to block sync");
243 return Ok(false);
244 }
245
246 let snapshot = snapshots
248 .iter()
249 .max_by_key(|s| s.height.as_u64())
250 .unwrap()
251 .clone();
252
253 if snapshot.height <= *state.last_committed_height {
255 info!(
256 snapshot_height = snapshot.height.as_u64(),
257 our_height = state.last_committed_height.as_u64(),
258 "snapshot not ahead of our state, falling back to block sync"
259 );
260 return Ok(false);
261 }
262
263 info!(
264 snapshot_height = snapshot.height.as_u64(),
265 chunks = snapshot.chunks,
266 "offering snapshot to application"
267 );
268
269 let offer_result = state.app.offer_snapshot(&snapshot);
271 match offer_result {
272 SnapshotOfferResult::Accept => {}
273 SnapshotOfferResult::Reject => {
274 info!("application rejected snapshot, falling back to block sync");
275 return Ok(false);
276 }
277 SnapshotOfferResult::Abort => {
278 return Err(eg!("application aborted snapshot sync"));
279 }
280 }
281
282 for chunk_index in 0..snapshot.chunks {
284 request_tx
286 .send(SyncRequest::GetSnapshotChunk {
287 height: snapshot.height,
288 chunk_index,
289 })
290 .await
291 .map_err(|_| eg!("sync channel closed"))?;
292
293 let chunk_data = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
295 Ok(Some(SyncResponse::SnapshotChunk { data, .. })) => data,
296 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
297 Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshotChunk")),
298 Ok(None) => return Err(eg!("sync channel closed")),
299 Err(_) => return Err(eg!("snapshot chunk request timed out")),
300 };
301
302 let apply_result = state.app.apply_snapshot_chunk(chunk_data, chunk_index);
304 match apply_result {
305 ChunkApplyResult::Accept => {
306 info!(
307 chunk = chunk_index,
308 total = snapshot.chunks,
309 "applied snapshot chunk"
310 );
311 }
312 ChunkApplyResult::Retry => {
313 warn!(
316 chunk = chunk_index,
317 "application requested chunk retry — aborting snapshot sync"
318 );
319 return Err(eg!(
320 "snapshot chunk {} apply requested retry (not yet supported)",
321 chunk_index
322 ));
323 }
324 ChunkApplyResult::Abort => {
325 return Err(eg!(
326 "application aborted snapshot sync at chunk {}",
327 chunk_index
328 ));
329 }
330 }
331 }
332
333 request_tx
337 .send(SyncRequest::GetBlocks {
338 from_height: snapshot.height,
339 to_height: snapshot.height,
340 })
341 .await
342 .map_err(|_| eg!("sync channel closed"))?;
343
344 let anchor_block = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
345 Ok(Some(SyncResponse::Blocks(blocks))) if !blocks.is_empty() => blocks,
346 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error fetching anchor: {}", e)),
347 Ok(Some(_)) => return Err(eg!("unexpected response fetching snapshot anchor block")),
348 Ok(None) => return Err(eg!("sync channel closed")),
349 Err(_) => return Err(eg!("snapshot anchor block request timed out")),
350 };
351
352 let (block, qc_opt) = &anchor_block[0];
353 let qc = qc_opt
354 .as_ref()
355 .ok_or_else(|| eg!("snapshot anchor block has no QC"))?;
356
357 if qc.block_hash != block.hash {
359 return Err(eg!(
360 "snapshot anchor QC block_hash {} != block hash {}",
361 qc.block_hash,
362 block.hash
363 ));
364 }
365
366 let verifier = hotmint_crypto::Ed25519Verifier;
368 let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
369 state.chain_id_hash,
370 qc.epoch,
371 qc.view,
372 &qc.block_hash,
373 hotmint_types::vote::VoteType::Vote,
374 );
375 if !hotmint_types::Verifier::verify_aggregate(
376 &verifier,
377 &state.current_epoch.validator_set,
378 &qc_bytes,
379 &qc.aggregate_signature,
380 ) {
381 return Err(eg!(
382 "snapshot anchor QC signature verification failed at height {}",
383 snapshot.height.as_u64()
384 ));
385 }
386 if !hotmint_crypto::has_quorum(&state.current_epoch.validator_set, &qc.aggregate_signature) {
387 return Err(eg!(
388 "snapshot anchor QC below quorum threshold at height {}",
389 snapshot.height.as_u64()
390 ));
391 }
392
393 *state.last_committed_height = snapshot.height;
400 let app_info = state.app.info();
401 if app_info.last_block_height == snapshot.height
402 && app_info.last_block_app_hash != BlockHash::GENESIS
403 {
404 *state.last_app_hash = app_info.last_block_app_hash;
405 } else {
406 *state.last_app_hash = BlockHash::GENESIS;
410 }
411
412 info!(height = snapshot.height.as_u64(), "snapshot sync complete");
413 Ok(true)
414}
415
416pub fn replay_blocks(
422 blocks: &[(Block, Option<hotmint_types::QuorumCertificate>)],
423 state: &mut SyncState<'_>,
424) -> Result<Option<Epoch>> {
425 let mut pending_epoch: Option<Epoch> = None;
429
430 for (i, (block, qc)) in blocks.iter().enumerate() {
431 if let Some(ref ep) = pending_epoch
434 && block.view >= ep.start_view
435 {
436 *state.current_epoch = pending_epoch.take().unwrap();
437 }
438 if i > 0 && block.parent_hash != blocks[i - 1].0.hash {
440 return Err(eg!(
441 "chain discontinuity at height {}: expected parent {}, got {}",
442 block.height.as_u64(),
443 blocks[i - 1].0.hash,
444 block.parent_hash
445 ));
446 }
447 if i == 0
449 && state.last_committed_height.as_u64() > 0
450 && let Some(last) = state
451 .store
452 .get_block_by_height(*state.last_committed_height)
453 && block.parent_hash != last.hash
454 {
455 return Err(eg!(
456 "sync batch first block parent {} does not match last committed block {} at height {}",
457 block.parent_hash,
458 last.hash,
459 state.last_committed_height
460 ));
461 }
462
463 if let Some(cert) = qc {
465 if cert.block_hash != block.hash {
466 return Err(eg!(
467 "sync QC block_hash mismatch at height {}: QC={} block={}",
468 block.height.as_u64(),
469 cert.block_hash,
470 block.hash
471 ));
472 }
473 let verifier = hotmint_crypto::Ed25519Verifier;
475 let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
476 state.chain_id_hash,
477 cert.epoch,
478 cert.view,
479 &cert.block_hash,
480 hotmint_types::vote::VoteType::Vote,
481 );
482 if !hotmint_types::Verifier::verify_aggregate(
483 &verifier,
484 &state.current_epoch.validator_set,
485 &qc_bytes,
486 &cert.aggregate_signature,
487 ) {
488 return Err(eg!(
489 "sync QC signature verification failed at height {}",
490 block.height.as_u64()
491 ));
492 }
493 if !hotmint_crypto::has_quorum(
494 &state.current_epoch.validator_set,
495 &cert.aggregate_signature,
496 ) {
497 return Err(eg!(
498 "sync QC below quorum threshold at height {}",
499 block.height.as_u64()
500 ));
501 }
502 } else if block.height.as_u64() > 0 {
503 return Err(eg!(
506 "sync block at height {} missing commit QC — refusing unverified block",
507 block.height.as_u64()
508 ));
509 }
510
511 if block.height <= *state.last_committed_height {
513 continue;
514 }
515
516 if let Some(expected_leader) = state
520 .current_epoch
521 .validator_set
522 .leader_for_view(block.view)
523 && block.proposer != expected_leader.id
524 {
525 return Err(eg!(
526 "sync block at height {} has wrong proposer: expected {} for view {}, got {}",
527 block.height.as_u64(),
528 expected_leader.id,
529 block.view,
530 block.proposer
531 ));
532 }
533
534 let expected_hash = hotmint_crypto::compute_block_hash(block);
536 if block.hash != expected_hash {
537 return Err(eg!(
538 "sync block hash mismatch at height {}: declared {} != computed {}",
539 block.height.as_u64(),
540 block.hash,
541 expected_hash
542 ));
543 }
544
545 if state.app.tracks_app_hash() && block.app_hash != *state.last_app_hash {
550 return Err(eg!(
551 "sync block app_hash mismatch at height {}: block {} != local {}",
552 block.height.as_u64(),
553 block.app_hash,
554 state.last_app_hash
555 ));
556 }
557
558 state.store.put_block(block.clone());
561 if let Some(commit_qc) = qc {
562 state.store.put_commit_qc(block.height, commit_qc.clone());
563 }
564
565 let ctx = BlockContext {
567 height: block.height,
568 view: block.view,
569 proposer: block.proposer,
570 epoch: state.current_epoch.number,
571 epoch_start_view: state.current_epoch.start_view,
572 validator_set: &state.current_epoch.validator_set,
573 timestamp: block.timestamp,
574 vote_extensions: vec![],
575 };
576
577 if !state.app.validate_block(block, &ctx) {
578 return Err(eg!(
579 "app rejected synced block at height {}",
580 block.height.as_u64()
581 ));
582 }
583
584 let txs = commit::decode_payload(&block.payload);
585 let response = state
586 .app
587 .execute_block(&txs, &ctx)
588 .c(d!("execute_block failed during sync"))?;
589
590 state
591 .app
592 .on_commit(block, &ctx)
593 .c(d!("on_commit failed during sync"))?;
594
595 *state.last_app_hash = if state.app.tracks_app_hash() {
596 response.app_hash
597 } else {
598 block.app_hash
601 };
602
603 if !response.validator_updates.is_empty() {
605 let new_vs = state
606 .current_epoch
607 .validator_set
608 .apply_updates(&response.validator_updates);
609 let epoch_start = ViewNumber(block.view.as_u64() + 2);
610 pending_epoch = Some(Epoch::new(
611 state.current_epoch.number.next(),
612 epoch_start,
613 new_vs,
614 ));
615 }
616
617 *state.last_committed_height = block.height;
618 }
619
620 if let Some(ref ep) = pending_epoch
623 && let Some((last_block, _)) = blocks.last()
624 && last_block.view >= ep.start_view
625 {
626 *state.current_epoch = pending_epoch.take().unwrap();
627 }
628
629 Ok(pending_epoch)
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::NoopApplication;
    use crate::store::MemoryBlockStore;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::{BlockHash, QuorumCertificate, ValidatorId, ViewNumber};

    const TEST_CHAIN: [u8; 32] = [0u8; 32];

    /// Generate a signer for validator 0, plus a genesis epoch whose validator
    /// set holds only that validator (power 1).
    fn single_validator_epoch() -> (hotmint_crypto::Ed25519Signer, Epoch) {
        let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
        let info = hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::Signer::public_key(&signer),
            power: 1,
        };
        let epoch = Epoch::genesis(hotmint_types::ValidatorSet::new(vec![info]));
        (signer, epoch)
    }

    /// Build an epoch-0 QC for `block`, signed by `signer` in slot 0.
    fn make_qc(block: &Block, signer: &hotmint_crypto::Ed25519Signer) -> QuorumCertificate {
        let epoch = EpochNumber(0);
        let signing_bytes = hotmint_types::vote::Vote::signing_bytes(
            &TEST_CHAIN,
            epoch,
            block.view,
            &block.hash,
            hotmint_types::vote::VoteType::Vote,
        );
        let mut aggregate = hotmint_types::AggregateSignature::new(1);
        aggregate
            .add(0, hotmint_types::Signer::sign(signer, &signing_bytes))
            .unwrap();
        QuorumCertificate {
            block_hash: block.hash,
            view: block.view,
            aggregate_signature: aggregate,
            epoch,
        }
    }

    /// Build a block at `height` linked to `parent`, with its hash filled in.
    /// The block has view equal to height, proposer 0 and an empty payload.
    fn make_block(height: u64, parent: BlockHash) -> Block {
        let mut block = Block {
            height: Height(height),
            parent_hash: parent,
            view: ViewNumber(height),
            proposer: ValidatorId(0),
            timestamp: 0,
            payload: vec![],
            app_hash: BlockHash::GENESIS,
            evidence: Vec::new(),
            // Placeholder; replaced by the real hash right below.
            hash: BlockHash::GENESIS,
        };
        block.hash = hotmint_crypto::compute_block_hash(&block);
        block
    }

    /// Replay `blocks` on top of a fresh genesis state backed by `store` and
    /// `epoch`, using `NoopApplication`. Returns whether replay succeeded and
    /// the committed height reached.
    fn replay_from_genesis(
        store: &mut MemoryBlockStore,
        epoch: &mut Epoch,
        blocks: &[(Block, Option<QuorumCertificate>)],
    ) -> (bool, Height) {
        let app = NoopApplication;
        let mut height = Height::GENESIS;
        let mut app_hash = BlockHash::GENESIS;
        let mut pending_epoch = None;
        let succeeded = {
            let mut state = SyncState {
                store,
                app: &app,
                current_epoch: epoch,
                last_committed_height: &mut height,
                last_app_hash: &mut app_hash,
                chain_id_hash: &TEST_CHAIN,
                pending_epoch: &mut pending_epoch,
            };
            replay_blocks(blocks, &mut state).is_ok()
        };
        (succeeded, height)
    }

    #[test]
    fn test_replay_blocks_valid_chain() {
        let (signer, mut epoch) = single_validator_epoch();
        let mut store = MemoryBlockStore::new();

        let b1 = make_block(1, BlockHash::GENESIS);
        let b2 = make_block(2, b1.hash);
        let b3 = make_block(3, b2.hash);
        let blocks: Vec<_> = [b1, b2, b3]
            .into_iter()
            .map(|b| {
                let qc = make_qc(&b, &signer);
                (b, Some(qc))
            })
            .collect();

        let (ok, height) = replay_from_genesis(&mut store, &mut epoch, &blocks);
        assert!(ok, "replaying a valid certified chain must succeed");
        assert_eq!(height, Height(3));
        assert!(store.get_block_by_height(Height(1)).is_some());
        assert!(store.get_block_by_height(Height(3)).is_some());
    }

    #[test]
    fn test_replay_blocks_rejects_missing_qc() {
        let (signer, mut epoch) = single_validator_epoch();
        let mut store = MemoryBlockStore::new();

        let b1 = make_block(1, BlockHash::GENESIS);
        let qc1 = make_qc(&b1, &signer);
        let b2 = make_block(2, b1.hash);
        // b2 arrives without a certificate and must be refused.
        let blocks = vec![(b1, Some(qc1)), (b2, None)];

        let (ok, _) = replay_from_genesis(&mut store, &mut epoch, &blocks);
        assert!(!ok);
    }

    #[test]
    fn test_replay_blocks_broken_chain() {
        let (signer, mut epoch) = single_validator_epoch();
        let mut store = MemoryBlockStore::new();

        let b1 = make_block(1, BlockHash::GENESIS);
        // b3 points at an unrelated parent, breaking the hash chain.
        let b3 = make_block(3, BlockHash([99u8; 32]));
        let qc1 = make_qc(&b1, &signer);
        let qc3 = make_qc(&b3, &signer);
        let blocks = vec![(b1, Some(qc1)), (b3, Some(qc3))];

        let (ok, _) = replay_from_genesis(&mut store, &mut epoch, &blocks);
        assert!(!ok);
    }
}