1use std::cmp;
5
6use ruc::*;
7
8use crate::application::Application;
9use crate::commit;
10use crate::store::BlockStore;
11use hotmint_types::context::BlockContext;
12use hotmint_types::epoch::Epoch;
13use hotmint_types::sync::{
14 ChunkApplyResult, MAX_SYNC_BATCH, SnapshotOfferResult, SyncRequest, SyncResponse,
15};
16use hotmint_types::{Block, BlockHash, Height, ViewNumber};
17use tokio::sync::mpsc;
18use tokio::time::{Duration, timeout};
19use tracing::{info, warn};
20
21const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
22
23pub struct SyncState<'a> {
25 pub store: &'a mut dyn BlockStore,
26 pub app: &'a dyn Application,
27 pub current_epoch: &'a mut Epoch,
28 pub last_committed_height: &'a mut Height,
29 pub last_app_hash: &'a mut BlockHash,
30 pub chain_id_hash: &'a [u8; 32],
31}
32
33pub async fn sync_to_tip(
38 state: &mut SyncState<'_>,
39 request_tx: &mpsc::Sender<SyncRequest>,
40 response_rx: &mut mpsc::Receiver<SyncResponse>,
41) -> Result<()> {
42 request_tx
44 .send(SyncRequest::GetStatus)
45 .await
46 .map_err(|_| eg!("sync channel closed"))?;
47
48 let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
49 Ok(Some(SyncResponse::Status {
50 last_committed_height: peer_height,
51 ..
52 })) => peer_height,
53 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
54 Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
55 Ok(Some(SyncResponse::Snapshots(_))) => return Err(eg!("unexpected snapshots response")),
56 Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
57 return Err(eg!("unexpected snapshot chunk response"));
58 }
59 Ok(None) => return Err(eg!("sync channel closed")),
60 Err(_) => {
61 info!("sync status request timed out, starting from current state");
62 return Ok(());
63 }
64 };
65
66 if peer_status <= *state.last_committed_height {
67 info!(
68 our_height = state.last_committed_height.as_u64(),
69 peer_height = peer_status.as_u64(),
70 "already caught up"
71 );
72 return Ok(());
73 }
74
75 info!(
76 our_height = state.last_committed_height.as_u64(),
77 peer_height = peer_status.as_u64(),
78 "starting block sync"
79 );
80
81 let first_from = Height(state.last_committed_height.as_u64() + 1);
86 let first_to = Height(cmp::min(
87 first_from.as_u64() + MAX_SYNC_BATCH - 1,
88 peer_status.as_u64(),
89 ));
90 request_tx
91 .send(SyncRequest::GetBlocks {
92 from_height: first_from,
93 to_height: first_to,
94 })
95 .await
96 .map_err(|_| eg!("sync channel closed"))?;
97
98 loop {
99 let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
101 Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
102 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
103 Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
104 Ok(Some(SyncResponse::Snapshots(_))) => {
105 return Err(eg!("unexpected snapshots response"));
106 }
107 Ok(Some(SyncResponse::SnapshotChunk { .. })) => {
108 return Err(eg!("unexpected snapshot chunk response"));
109 }
110 Ok(None) => return Err(eg!("sync channel closed")),
111 Err(_) => return Err(eg!("sync request timed out")),
112 };
113
114 if blocks.is_empty() {
115 break;
116 }
117
118 let synced_through = state.last_committed_height.as_u64() + blocks.len() as u64;
120 let needs_more = synced_through < peer_status.as_u64();
121 if needs_more {
122 let next_from = Height(
123 blocks
124 .last()
125 .map(|(b, _)| b.height.as_u64() + 1)
126 .unwrap_or(first_from.as_u64()),
127 );
128 let next_to = Height(cmp::min(
129 next_from.as_u64() + MAX_SYNC_BATCH - 1,
130 peer_status.as_u64(),
131 ));
132 let _ = request_tx
133 .send(SyncRequest::GetBlocks {
134 from_height: next_from,
135 to_height: next_to,
136 })
137 .await;
138 }
139
140 let _pending = replay_blocks(&blocks, state)?;
142
143 info!(
144 synced_to = state.last_committed_height.as_u64(),
145 target = peer_status.as_u64(),
146 "sync progress"
147 );
148
149 if *state.last_committed_height >= peer_status || !needs_more {
150 break;
151 }
152 }
153
154 info!(
155 height = state.last_committed_height.as_u64(),
156 epoch = %state.current_epoch.number,
157 "block sync complete"
158 );
159 Ok(())
160}
161
162pub async fn query_peer_status(
164 request_tx: &mpsc::Sender<SyncRequest>,
165 response_rx: &mut mpsc::Receiver<SyncResponse>,
166) -> Option<Height> {
167 request_tx.send(SyncRequest::GetStatus).await.ok()?;
168 match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
169 Ok(Some(SyncResponse::Status {
170 last_committed_height,
171 ..
172 })) => Some(last_committed_height),
173 _ => None,
174 }
175}
176
177pub async fn sync_via_snapshot(
180 state: &mut SyncState<'_>,
181 request_tx: &mpsc::Sender<SyncRequest>,
182 response_rx: &mut mpsc::Receiver<SyncResponse>,
183) -> Result<bool> {
184 request_tx
186 .send(SyncRequest::GetSnapshots)
187 .await
188 .map_err(|_| eg!("sync channel closed"))?;
189
190 let snapshots = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
192 Ok(Some(SyncResponse::Snapshots(list))) => list,
193 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
194 Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshots")),
195 Ok(None) => return Err(eg!("sync channel closed")),
196 Err(_) => {
197 info!("snapshot list request timed out");
198 return Ok(false);
199 }
200 };
201
202 if snapshots.is_empty() {
204 info!("peer has no snapshots, falling back to block sync");
205 return Ok(false);
206 }
207
208 let snapshot = snapshots
210 .iter()
211 .max_by_key(|s| s.height.as_u64())
212 .unwrap()
213 .clone();
214
215 if snapshot.height <= *state.last_committed_height {
217 info!(
218 snapshot_height = snapshot.height.as_u64(),
219 our_height = state.last_committed_height.as_u64(),
220 "snapshot not ahead of our state, falling back to block sync"
221 );
222 return Ok(false);
223 }
224
225 info!(
226 snapshot_height = snapshot.height.as_u64(),
227 chunks = snapshot.chunks,
228 "offering snapshot to application"
229 );
230
231 let offer_result = state.app.offer_snapshot(&snapshot);
233 match offer_result {
234 SnapshotOfferResult::Accept => {}
235 SnapshotOfferResult::Reject => {
236 info!("application rejected snapshot, falling back to block sync");
237 return Ok(false);
238 }
239 SnapshotOfferResult::Abort => {
240 return Err(eg!("application aborted snapshot sync"));
241 }
242 }
243
244 for chunk_index in 0..snapshot.chunks {
246 request_tx
248 .send(SyncRequest::GetSnapshotChunk {
249 height: snapshot.height,
250 chunk_index,
251 })
252 .await
253 .map_err(|_| eg!("sync channel closed"))?;
254
255 let chunk_data = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
257 Ok(Some(SyncResponse::SnapshotChunk { data, .. })) => data,
258 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
259 Ok(Some(_)) => return Err(eg!("unexpected response to GetSnapshotChunk")),
260 Ok(None) => return Err(eg!("sync channel closed")),
261 Err(_) => return Err(eg!("snapshot chunk request timed out")),
262 };
263
264 let apply_result = state.app.apply_snapshot_chunk(chunk_data, chunk_index);
266 match apply_result {
267 ChunkApplyResult::Accept => {
268 info!(
269 chunk = chunk_index,
270 total = snapshot.chunks,
271 "applied snapshot chunk"
272 );
273 }
274 ChunkApplyResult::Retry => {
275 warn!(
278 chunk = chunk_index,
279 "application requested chunk retry — aborting snapshot sync"
280 );
281 return Err(eg!(
282 "snapshot chunk {} apply requested retry (not yet supported)",
283 chunk_index
284 ));
285 }
286 ChunkApplyResult::Abort => {
287 return Err(eg!(
288 "application aborted snapshot sync at chunk {}",
289 chunk_index
290 ));
291 }
292 }
293 }
294
295 *state.last_committed_height = snapshot.height;
297 *state.last_app_hash = BlockHash(snapshot.hash);
300
301 info!(height = snapshot.height.as_u64(), "snapshot sync complete");
302 Ok(true)
303}
304
305pub fn replay_blocks(
311 blocks: &[(Block, Option<hotmint_types::QuorumCertificate>)],
312 state: &mut SyncState<'_>,
313) -> Result<Option<Epoch>> {
314 let mut pending_epoch: Option<Epoch> = None;
318
319 for (i, (block, qc)) in blocks.iter().enumerate() {
320 if let Some(ref ep) = pending_epoch
323 && block.view >= ep.start_view
324 {
325 *state.current_epoch = pending_epoch.take().unwrap();
326 }
327 if i > 0 && block.parent_hash != blocks[i - 1].0.hash {
329 return Err(eg!(
330 "chain discontinuity at height {}: expected parent {}, got {}",
331 block.height.as_u64(),
332 blocks[i - 1].0.hash,
333 block.parent_hash
334 ));
335 }
336 if i == 0
338 && state.last_committed_height.as_u64() > 0
339 && let Some(last) = state
340 .store
341 .get_block_by_height(*state.last_committed_height)
342 && block.parent_hash != last.hash
343 {
344 return Err(eg!(
345 "sync batch first block parent {} does not match last committed block {} at height {}",
346 block.parent_hash,
347 last.hash,
348 state.last_committed_height
349 ));
350 }
351
352 if let Some(cert) = qc {
354 if cert.block_hash != block.hash {
355 return Err(eg!(
356 "sync QC block_hash mismatch at height {}: QC={} block={}",
357 block.height.as_u64(),
358 cert.block_hash,
359 block.hash
360 ));
361 }
362 let verifier = hotmint_crypto::Ed25519Verifier;
364 let qc_bytes = hotmint_types::vote::Vote::signing_bytes(
365 state.chain_id_hash,
366 cert.epoch,
367 cert.view,
368 &cert.block_hash,
369 hotmint_types::vote::VoteType::Vote,
370 );
371 if !hotmint_types::Verifier::verify_aggregate(
372 &verifier,
373 &state.current_epoch.validator_set,
374 &qc_bytes,
375 &cert.aggregate_signature,
376 ) {
377 return Err(eg!(
378 "sync QC signature verification failed at height {}",
379 block.height.as_u64()
380 ));
381 }
382 if !hotmint_crypto::has_quorum(
383 &state.current_epoch.validator_set,
384 &cert.aggregate_signature,
385 ) {
386 return Err(eg!(
387 "sync QC below quorum threshold at height {}",
388 block.height.as_u64()
389 ));
390 }
391 } else if block.height.as_u64() > 0 {
392 return Err(eg!(
395 "sync block at height {} missing commit QC — refusing unverified block",
396 block.height.as_u64()
397 ));
398 }
399
400 if block.height <= *state.last_committed_height {
402 continue;
403 }
404
405 if let Some(expected_leader) = state
409 .current_epoch
410 .validator_set
411 .leader_for_view(block.view)
412 && block.proposer != expected_leader.id
413 {
414 return Err(eg!(
415 "sync block at height {} has wrong proposer: expected {} for view {}, got {}",
416 block.height.as_u64(),
417 expected_leader.id,
418 block.view,
419 block.proposer
420 ));
421 }
422
423 let expected_hash = hotmint_crypto::compute_block_hash(block);
425 if block.hash != expected_hash {
426 return Err(eg!(
427 "sync block hash mismatch at height {}: declared {} != computed {}",
428 block.height.as_u64(),
429 block.hash,
430 expected_hash
431 ));
432 }
433
434 if state.app.tracks_app_hash() && block.app_hash != *state.last_app_hash {
439 return Err(eg!(
440 "sync block app_hash mismatch at height {}: block {} != local {}",
441 block.height.as_u64(),
442 block.app_hash,
443 state.last_app_hash
444 ));
445 }
446
447 state.store.put_block(block.clone());
450 if let Some(commit_qc) = qc {
451 state.store.put_commit_qc(block.height, commit_qc.clone());
452 }
453
454 let ctx = BlockContext {
456 height: block.height,
457 view: block.view,
458 proposer: block.proposer,
459 epoch: state.current_epoch.number,
460 epoch_start_view: state.current_epoch.start_view,
461 validator_set: &state.current_epoch.validator_set,
462 vote_extensions: vec![],
463 };
464
465 if !state.app.validate_block(block, &ctx) {
466 return Err(eg!(
467 "app rejected synced block at height {}",
468 block.height.as_u64()
469 ));
470 }
471
472 let txs = commit::decode_payload(&block.payload);
473 let response = state
474 .app
475 .execute_block(&txs, &ctx)
476 .c(d!("execute_block failed during sync"))?;
477
478 state
479 .app
480 .on_commit(block, &ctx)
481 .c(d!("on_commit failed during sync"))?;
482
483 *state.last_app_hash = if state.app.tracks_app_hash() {
484 response.app_hash
485 } else {
486 block.app_hash
489 };
490
491 if !response.validator_updates.is_empty() {
493 let new_vs = state
494 .current_epoch
495 .validator_set
496 .apply_updates(&response.validator_updates);
497 let epoch_start = ViewNumber(block.view.as_u64() + 2);
498 pending_epoch = Some(Epoch::new(
499 state.current_epoch.number.next(),
500 epoch_start,
501 new_vs,
502 ));
503 }
504
505 *state.last_committed_height = block.height;
506 }
507
508 if let Some(ref ep) = pending_epoch
511 && let Some((last_block, _)) = blocks.last()
512 && last_block.view >= ep.start_view
513 {
514 *state.current_epoch = pending_epoch.take().unwrap();
515 }
516
517 Ok(pending_epoch)
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use crate::application::NoopApplication;
525 use crate::store::MemoryBlockStore;
526 use hotmint_types::epoch::EpochNumber;
527 use hotmint_types::{BlockHash, QuorumCertificate, ValidatorId, ViewNumber};
528
529 const TEST_CHAIN: [u8; 32] = [0u8; 32];
530
531 fn make_qc(block: &Block, signer: &hotmint_crypto::Ed25519Signer) -> QuorumCertificate {
532 let vote_bytes = hotmint_types::vote::Vote::signing_bytes(
533 &TEST_CHAIN,
534 EpochNumber(0),
535 block.view,
536 &block.hash,
537 hotmint_types::vote::VoteType::Vote,
538 );
539 let sig = hotmint_types::Signer::sign(signer, &vote_bytes);
540 let mut agg = hotmint_types::AggregateSignature::new(1);
541 agg.add(0, sig).unwrap();
542 QuorumCertificate {
543 block_hash: block.hash,
544 view: block.view,
545 aggregate_signature: agg,
546 epoch: EpochNumber(0),
547 }
548 }
549
550 fn make_block(height: u64, parent: BlockHash) -> Block {
551 let mut block = Block {
552 height: Height(height),
553 parent_hash: parent,
554 view: ViewNumber(height),
555 proposer: ValidatorId(0),
556 timestamp: 0,
557 payload: vec![],
558 app_hash: BlockHash::GENESIS,
559 evidence: Vec::new(),
560 hash: BlockHash::GENESIS, };
562 block.hash = hotmint_crypto::compute_block_hash(&block);
563 block
564 }
565
566 #[test]
567 fn test_replay_blocks_valid_chain() {
568 let mut store = MemoryBlockStore::new();
569 let app = NoopApplication;
570 let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
571 let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
572 id: ValidatorId(0),
573 public_key: hotmint_types::Signer::public_key(&signer),
574 power: 1,
575 }]);
576 let mut epoch = Epoch::genesis(vs);
577 let mut height = Height::GENESIS;
578
579 let b1 = make_block(1, BlockHash::GENESIS);
580 let b2 = make_block(2, b1.hash);
581 let b3 = make_block(3, b2.hash);
582
583 let qc1 = make_qc(&b1, &signer);
584 let qc2 = make_qc(&b2, &signer);
585 let qc3 = make_qc(&b3, &signer);
586
587 let blocks: Vec<_> = vec![(b1, Some(qc1)), (b2, Some(qc2)), (b3, Some(qc3))];
588 let mut app_hash = BlockHash::GENESIS;
589 let mut state = SyncState {
590 store: &mut store,
591 app: &app,
592 current_epoch: &mut epoch,
593 last_committed_height: &mut height,
594 last_app_hash: &mut app_hash,
595 chain_id_hash: &TEST_CHAIN,
596 };
597 replay_blocks(&blocks, &mut state).unwrap();
598 assert_eq!(height, Height(3));
599 assert!(store.get_block_by_height(Height(1)).is_some());
600 assert!(store.get_block_by_height(Height(3)).is_some());
601 }
602
603 #[test]
604 fn test_replay_blocks_rejects_missing_qc() {
605 let mut store = MemoryBlockStore::new();
606 let app = NoopApplication;
607 let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
608 let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
609 id: ValidatorId(0),
610 public_key: hotmint_types::Signer::public_key(&signer),
611 power: 1,
612 }]);
613 let mut epoch = Epoch::genesis(vs);
614 let mut height = Height::GENESIS;
615
616 let b1 = make_block(1, BlockHash::GENESIS);
617 let qc1 = make_qc(&b1, &signer);
618 let b2 = make_block(2, b1.hash);
619 let blocks: Vec<_> = vec![(b1, Some(qc1)), (b2, None)];
621 let mut app_hash = BlockHash::GENESIS;
622 let mut state = SyncState {
623 store: &mut store,
624 app: &app,
625 current_epoch: &mut epoch,
626 last_committed_height: &mut height,
627 last_app_hash: &mut app_hash,
628 chain_id_hash: &TEST_CHAIN,
629 };
630 assert!(replay_blocks(&blocks, &mut state).is_err());
631 }
632
633 #[test]
634 fn test_replay_blocks_broken_chain() {
635 let mut store = MemoryBlockStore::new();
636 let app = NoopApplication;
637 let signer = hotmint_crypto::Ed25519Signer::generate(ValidatorId(0));
638 let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
639 id: ValidatorId(0),
640 public_key: hotmint_types::Signer::public_key(&signer),
641 power: 1,
642 }]);
643 let mut epoch = Epoch::genesis(vs);
644 let mut height = Height::GENESIS;
645
646 let b1 = make_block(1, BlockHash::GENESIS);
647 let b3 = make_block(3, BlockHash([99u8; 32])); let qc1 = make_qc(&b1, &signer);
650 let qc3 = make_qc(&b3, &signer);
651 let blocks: Vec<_> = vec![(b1, Some(qc1)), (b3, Some(qc3))];
652 let mut app_hash = BlockHash::GENESIS;
653 let mut state = SyncState {
654 store: &mut store,
655 app: &app,
656 current_epoch: &mut epoch,
657 last_committed_height: &mut height,
658 last_app_hash: &mut app_hash,
659 chain_id_hash: &TEST_CHAIN,
660 };
661 assert!(replay_blocks(&blocks, &mut state).is_err());
662 }
663}