Skip to main content

hotmint_consensus/
sync.rs

1//! Block sync: allows a node that is behind to catch up by requesting
2//! missing blocks from peers and replaying the commit lifecycle.
3
4use ruc::*;
5
6use crate::application::Application;
7use crate::commit;
8use crate::store::BlockStore;
9use hotmint_types::context::BlockContext;
10use hotmint_types::epoch::Epoch;
11use hotmint_types::sync::{MAX_SYNC_BATCH, SyncRequest, SyncResponse};
12use hotmint_types::{Block, Height};
13use tokio::sync::mpsc;
14use tokio::time::{Duration, timeout};
15use tracing::info;
16
17const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
18
19/// Run block sync: request missing blocks from peers and replay them.
20///
21/// This should be called **before** the consensus engine starts.
22/// Returns the updated (height, epoch) after syncing.
23pub async fn sync_to_tip(
24    store: &mut dyn BlockStore,
25    app: &dyn Application,
26    current_epoch: &mut Epoch,
27    last_committed_height: &mut Height,
28    request_tx: &mpsc::UnboundedSender<SyncRequest>,
29    response_rx: &mut mpsc::UnboundedReceiver<SyncResponse>,
30) -> Result<()> {
31    // First, get status from peer
32    request_tx
33        .send(SyncRequest::GetStatus)
34        .map_err(|_| eg!("sync channel closed"))?;
35
36    let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
37        Ok(Some(SyncResponse::Status {
38            last_committed_height: peer_height,
39            ..
40        })) => peer_height,
41        Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
42        Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
43        Ok(None) => return Err(eg!("sync channel closed")),
44        Err(_) => {
45            info!("sync status request timed out, starting from current state");
46            return Ok(());
47        }
48    };
49
50    if peer_status <= *last_committed_height {
51        info!(
52            our_height = last_committed_height.as_u64(),
53            peer_height = peer_status.as_u64(),
54            "already caught up"
55        );
56        return Ok(());
57    }
58
59    info!(
60        our_height = last_committed_height.as_u64(),
61        peer_height = peer_status.as_u64(),
62        "starting block sync"
63    );
64
65    // Batch sync loop
66    loop {
67        let from = Height(last_committed_height.as_u64() + 1);
68        let to = Height(std::cmp::min(
69            from.as_u64() + MAX_SYNC_BATCH - 1,
70            peer_status.as_u64(),
71        ));
72
73        request_tx
74            .send(SyncRequest::GetBlocks {
75                from_height: from,
76                to_height: to,
77            })
78            .map_err(|_| eg!("sync channel closed"))?;
79
80        let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
81            Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
82            Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
83            Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
84            Ok(None) => return Err(eg!("sync channel closed")),
85            Err(_) => return Err(eg!("sync request timed out")),
86        };
87
88        if blocks.is_empty() {
89            break;
90        }
91
92        // Validate chain continuity and replay
93        replay_blocks(&blocks, store, app, current_epoch, last_committed_height)?;
94
95        info!(
96            synced_to = last_committed_height.as_u64(),
97            target = peer_status.as_u64(),
98            "sync progress"
99        );
100
101        if *last_committed_height >= peer_status {
102            break;
103        }
104    }
105
106    info!(
107        height = last_committed_height.as_u64(),
108        epoch = %current_epoch.number,
109        "block sync complete"
110    );
111    Ok(())
112}
113
114/// Replay a batch of blocks: store them and run the application lifecycle.
115/// Validates chain continuity (parent_hash linkage).
116pub fn replay_blocks(
117    blocks: &[Block],
118    store: &mut dyn BlockStore,
119    app: &dyn Application,
120    current_epoch: &mut Epoch,
121    last_committed_height: &mut Height,
122) -> Result<()> {
123    for (i, block) in blocks.iter().enumerate() {
124        // Validate chain continuity
125        if i > 0 && block.parent_hash != blocks[i - 1].hash {
126            return Err(eg!(
127                "chain discontinuity at height {}: expected parent {}, got {}",
128                block.height.as_u64(),
129                blocks[i - 1].hash,
130                block.parent_hash
131            ));
132        }
133
134        // Skip already-committed blocks
135        if block.height <= *last_committed_height {
136            continue;
137        }
138
139        // Store the block
140        store.put_block(block.clone());
141
142        // Run application lifecycle
143        let ctx = BlockContext {
144            height: block.height,
145            view: block.view,
146            proposer: block.proposer,
147            epoch: current_epoch.number,
148            validator_set: &current_epoch.validator_set,
149        };
150
151        app.begin_block(&ctx)
152            .c(d!("begin_block failed during sync"))?;
153
154        for tx in commit::decode_payload(&block.payload) {
155            app.deliver_tx(tx).c(d!("deliver_tx failed during sync"))?;
156        }
157
158        let response = app.end_block(&ctx).c(d!("end_block failed during sync"))?;
159
160        app.on_commit(block, &ctx)
161            .c(d!("on_commit failed during sync"))?;
162
163        // Handle epoch transitions
164        if !response.validator_updates.is_empty() {
165            let new_vs = current_epoch
166                .validator_set
167                .apply_updates(&response.validator_updates);
168            *current_epoch = Epoch::new(current_epoch.number.next(), block.view, new_vs);
169        }
170
171        *last_committed_height = block.height;
172    }
173
174    Ok(())
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180    use crate::application::NoopApplication;
181    use crate::store::MemoryBlockStore;
182    use hotmint_types::{BlockHash, ValidatorId, ViewNumber};
183
184    fn make_block(height: u64, parent: BlockHash) -> Block {
185        let hash = BlockHash([height as u8; 32]);
186        Block {
187            height: Height(height),
188            parent_hash: parent,
189            view: ViewNumber(height),
190            proposer: ValidatorId(0),
191            payload: vec![],
192            hash,
193        }
194    }
195
196    #[test]
197    fn test_replay_blocks_valid_chain() {
198        let mut store = MemoryBlockStore::new();
199        let app = NoopApplication;
200        let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
201            id: ValidatorId(0),
202            public_key: hotmint_types::PublicKey(vec![0]),
203            power: 1,
204        }]);
205        let mut epoch = Epoch::genesis(vs);
206        let mut height = Height::GENESIS;
207
208        let b1 = make_block(1, BlockHash::GENESIS);
209        let b2 = make_block(2, b1.hash);
210        let b3 = make_block(3, b2.hash);
211
212        replay_blocks(&[b1, b2, b3], &mut store, &app, &mut epoch, &mut height).unwrap();
213        assert_eq!(height, Height(3));
214        assert!(store.get_block_by_height(Height(1)).is_some());
215        assert!(store.get_block_by_height(Height(3)).is_some());
216    }
217
218    #[test]
219    fn test_replay_blocks_broken_chain() {
220        let mut store = MemoryBlockStore::new();
221        let app = NoopApplication;
222        let vs = hotmint_types::ValidatorSet::new(vec![hotmint_types::ValidatorInfo {
223            id: ValidatorId(0),
224            public_key: hotmint_types::PublicKey(vec![0]),
225            power: 1,
226        }]);
227        let mut epoch = Epoch::genesis(vs);
228        let mut height = Height::GENESIS;
229
230        let b1 = make_block(1, BlockHash::GENESIS);
231        let b3 = make_block(3, BlockHash([99u8; 32])); // wrong parent
232
233        assert!(replay_blocks(&[b1, b3], &mut store, &app, &mut epoch, &mut height).is_err());
234    }
235}