1use ruc::*;
5
6use crate::application::Application;
7use crate::commit;
8use crate::store::BlockStore;
9use hotmint_types::context::BlockContext;
10use hotmint_types::epoch::Epoch;
11use hotmint_types::sync::{MAX_SYNC_BATCH, SyncRequest, SyncResponse};
12use hotmint_types::{Block, Height};
13use tokio::sync::mpsc;
14use tokio::time::{Duration, timeout};
15use tracing::info;
16
/// Upper bound on how long a single sync response is awaited before the
/// request is treated as timed out.
const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
18
19pub async fn sync_to_tip(
24 store: &mut dyn BlockStore,
25 app: &dyn Application,
26 current_epoch: &mut Epoch,
27 last_committed_height: &mut Height,
28 request_tx: &mpsc::UnboundedSender<SyncRequest>,
29 response_rx: &mut mpsc::UnboundedReceiver<SyncResponse>,
30) -> Result<()> {
31 request_tx
33 .send(SyncRequest::GetStatus)
34 .map_err(|_| eg!("sync channel closed"))?;
35
36 let peer_status = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
37 Ok(Some(SyncResponse::Status {
38 last_committed_height: peer_height,
39 ..
40 })) => peer_height,
41 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
42 Ok(Some(SyncResponse::Blocks(_))) => return Err(eg!("unexpected blocks response")),
43 Ok(None) => return Err(eg!("sync channel closed")),
44 Err(_) => {
45 info!("sync status request timed out, starting from current state");
46 return Ok(());
47 }
48 };
49
50 if peer_status <= *last_committed_height {
51 info!(
52 our_height = last_committed_height.as_u64(),
53 peer_height = peer_status.as_u64(),
54 "already caught up"
55 );
56 return Ok(());
57 }
58
59 info!(
60 our_height = last_committed_height.as_u64(),
61 peer_height = peer_status.as_u64(),
62 "starting block sync"
63 );
64
65 loop {
67 let from = Height(last_committed_height.as_u64() + 1);
68 let to = Height(std::cmp::min(
69 from.as_u64() + MAX_SYNC_BATCH - 1,
70 peer_status.as_u64(),
71 ));
72
73 request_tx
74 .send(SyncRequest::GetBlocks {
75 from_height: from,
76 to_height: to,
77 })
78 .map_err(|_| eg!("sync channel closed"))?;
79
80 let blocks = match timeout(SYNC_TIMEOUT, response_rx.recv()).await {
81 Ok(Some(SyncResponse::Blocks(blocks))) => blocks,
82 Ok(Some(SyncResponse::Error(e))) => return Err(eg!("peer error: {}", e)),
83 Ok(Some(SyncResponse::Status { .. })) => return Err(eg!("unexpected status response")),
84 Ok(None) => return Err(eg!("sync channel closed")),
85 Err(_) => return Err(eg!("sync request timed out")),
86 };
87
88 if blocks.is_empty() {
89 break;
90 }
91
92 replay_blocks(&blocks, store, app, current_epoch, last_committed_height)?;
94
95 info!(
96 synced_to = last_committed_height.as_u64(),
97 target = peer_status.as_u64(),
98 "sync progress"
99 );
100
101 if *last_committed_height >= peer_status {
102 break;
103 }
104 }
105
106 info!(
107 height = last_committed_height.as_u64(),
108 epoch = %current_epoch.number,
109 "block sync complete"
110 );
111 Ok(())
112}
113
114pub fn replay_blocks(
117 blocks: &[Block],
118 store: &mut dyn BlockStore,
119 app: &dyn Application,
120 current_epoch: &mut Epoch,
121 last_committed_height: &mut Height,
122) -> Result<()> {
123 for (i, block) in blocks.iter().enumerate() {
124 if i > 0 && block.parent_hash != blocks[i - 1].hash {
126 return Err(eg!(
127 "chain discontinuity at height {}: expected parent {}, got {}",
128 block.height.as_u64(),
129 blocks[i - 1].hash,
130 block.parent_hash
131 ));
132 }
133
134 if block.height <= *last_committed_height {
136 continue;
137 }
138
139 store.put_block(block.clone());
141
142 let ctx = BlockContext {
144 height: block.height,
145 view: block.view,
146 proposer: block.proposer,
147 epoch: current_epoch.number,
148 validator_set: ¤t_epoch.validator_set,
149 };
150
151 app.begin_block(&ctx)
152 .c(d!("begin_block failed during sync"))?;
153
154 for tx in commit::decode_payload(&block.payload) {
155 app.deliver_tx(tx).c(d!("deliver_tx failed during sync"))?;
156 }
157
158 let response = app.end_block(&ctx).c(d!("end_block failed during sync"))?;
159
160 app.on_commit(block, &ctx)
161 .c(d!("on_commit failed during sync"))?;
162
163 if !response.validator_updates.is_empty() {
165 let new_vs = current_epoch
166 .validator_set
167 .apply_updates(&response.validator_updates);
168 *current_epoch = Epoch::new(current_epoch.number.next(), block.view, new_vs);
169 }
170
171 *last_committed_height = block.height;
172 }
173
174 Ok(())
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::NoopApplication;
    use crate::store::MemoryBlockStore;
    use hotmint_types::{BlockHash, ValidatorId, ViewNumber};

    /// Builds an empty-payload block at `height` whose hash is the height byte
    /// repeated 32 times and whose view number equals the height.
    fn make_block(height: u64, parent: BlockHash) -> Block {
        Block {
            height: Height(height),
            parent_hash: parent,
            view: ViewNumber(height),
            proposer: ValidatorId(0),
            payload: vec![],
            hash: BlockHash([height as u8; 32]),
        }
    }

    /// Genesis epoch holding a single validator (id 0, power 1).
    fn single_validator_epoch() -> Epoch {
        let validator = hotmint_types::ValidatorInfo {
            id: ValidatorId(0),
            public_key: hotmint_types::PublicKey(vec![0]),
            power: 1,
        };
        Epoch::genesis(hotmint_types::ValidatorSet::new(vec![validator]))
    }

    #[test]
    fn test_replay_blocks_valid_chain() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let mut epoch = single_validator_epoch();
        let mut tip = Height::GENESIS;

        // Three blocks, each pointing at the hash of the one before it.
        let first = make_block(1, BlockHash::GENESIS);
        let second = make_block(2, first.hash);
        let third = make_block(3, second.hash);
        let chain = [first, second, third];

        replay_blocks(&chain, &mut store, &app, &mut epoch, &mut tip).unwrap();

        assert_eq!(tip, Height(3));
        assert!(store.get_block_by_height(Height(1)).is_some());
        assert!(store.get_block_by_height(Height(3)).is_some());
    }

    #[test]
    fn test_replay_blocks_broken_chain() {
        let mut store = MemoryBlockStore::new();
        let app = NoopApplication;
        let mut epoch = single_validator_epoch();
        let mut tip = Height::GENESIS;

        // The second block names a parent hash that matches nothing in the batch.
        let first = make_block(1, BlockHash::GENESIS);
        let orphan = make_block(3, BlockHash([99u8; 32]));
        let chain = [first, orphan];

        let outcome = replay_blocks(&chain, &mut store, &app, &mut epoch, &mut tip);
        assert!(outcome.is_err());
    }
}
235}