1use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
5use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
6
7use bitcoin::block::Header;
8use bitcoin::hash_types::BlockHash;
9use bitcoin::network::Network;
10
11use lightning::chain;
12use lightning::chain::BestBlock;
13
14use std::ops::Deref;
15
16pub async fn validate_best_block_header<B: Deref>(
23 block_source: B,
24) -> BlockSourceResult<ValidatedBlockHeader>
25where
26 B::Target: BlockSource,
27{
28 let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
29 block_source.get_header(&best_block_hash, best_block_height).await?.validate(best_block_hash)
30}
31
32pub async fn synchronize_listeners<
142 B: Deref + Sized + Send + Sync,
143 C: Cache,
144 L: chain::Listen + ?Sized,
145>(
146 block_source: B, network: Network, header_cache: &mut C,
147 mut chain_listeners: Vec<(BlockHash, &L)>,
148) -> BlockSourceResult<ValidatedBlockHeader>
149where
150 B::Target: BlockSource,
151{
152 let best_header = validate_best_block_header(&*block_source).await?;
153
154 let mut chain_listeners_with_old_headers = Vec::new();
156 for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
157 let old_header = match header_cache.look_up(&old_block_hash) {
158 Some(header) => *header,
159 None => {
160 block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
161 },
162 };
163 chain_listeners_with_old_headers.push((old_header, chain_listener))
164 }
165
166 let mut chain_poller = ChainPoller::new(block_source, network);
168 let mut chain_listeners_at_height = Vec::new();
169 let mut most_common_ancestor = None;
170 let mut most_connected_blocks = Vec::new();
171 for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
172 let header_cache = &mut ReadOnlyCache(header_cache);
174 let (common_ancestor, connected_blocks) = {
175 let chain_listener = &DynamicChainListener(chain_listener);
176 let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
177 let difference =
178 chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
179 chain_notifier.disconnect_blocks(difference.disconnected_blocks);
180 (difference.common_ancestor, difference.connected_blocks)
181 };
182
183 chain_listeners_at_height.push((common_ancestor.height, chain_listener));
185 if connected_blocks.len() > most_connected_blocks.len() {
186 most_common_ancestor = Some(common_ancestor);
187 most_connected_blocks = connected_blocks;
188 }
189 }
190
191 if let Some(common_ancestor) = most_common_ancestor {
193 let chain_listener = &ChainListenerSet(chain_listeners_at_height);
194 let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
195 chain_notifier
196 .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
197 .await
198 .map_err(|(e, _)| e)?;
199 }
200
201 Ok(best_header)
202}
203
204struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
209
210impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
211 fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
212 self.0.look_up(block_hash)
213 }
214
215 fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
216 unreachable!()
217 }
218
219 fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
220 None
221 }
222}
223
224struct DynamicChainListener<'a, L: chain::Listen + ?Sized>(&'a L);
226
227impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L> {
228 fn filtered_block_connected(
229 &self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32,
230 ) {
231 unreachable!()
232 }
233
234 fn blocks_disconnected(&self, fork_point: BestBlock) {
235 self.0.blocks_disconnected(fork_point)
236 }
237}
238
239struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
241
242impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
243 fn block_connected(&self, block: &bitcoin::Block, height: u32) {
244 for (starting_height, chain_listener) in self.0.iter() {
245 if height > *starting_height {
246 chain_listener.block_connected(block, height);
247 }
248 }
249 }
250
251 fn filtered_block_connected(
252 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
253 ) {
254 for (starting_height, chain_listener) in self.0.iter() {
255 if height > *starting_height {
256 chain_listener.filtered_block_connected(header, txdata, height);
257 }
258 }
259 }
260
261 fn blocks_disconnected(&self, _fork_point: BestBlock) {
262 unreachable!()
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{Blockchain, MockChainListener};

    // Listeners last synced to heights 1, 2, and 3 of the same chain expect to see only the
    // blocks above their own starting height connected.
    #[tokio::test]
    async fn sync_from_same_chain() {
        let chain = Blockchain::default().with_height(4);

        let from_height_1 = MockChainListener::new()
            .expect_block_connected(*chain.at_height(2))
            .expect_block_connected(*chain.at_height(3))
            .expect_block_connected(*chain.at_height(4));
        let from_height_2 = MockChainListener::new()
            .expect_block_connected(*chain.at_height(3))
            .expect_block_connected(*chain.at_height(4));
        let from_height_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4));

        let listeners = vec![
            (chain.at_height(1).block_hash, &from_height_1 as &dyn chain::Listen),
            (chain.at_height(2).block_hash, &from_height_2 as &dyn chain::Listen),
            (chain.at_height(3).block_hash, &from_height_3 as &dyn chain::Listen),
        ];
        let mut cache = chain.header_cache(0..=4);
        let synced_tip = synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners)
            .await
            .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
        assert_eq!(synced_tip, chain.tip());
    }

    // Each listener sits on its own fork of the main chain (forked at heights 1, 2, and 3).
    // Every listener expects a disconnection back to its fork point followed by the main
    // chain blocks above it.
    #[tokio::test]
    async fn sync_from_different_chains() {
        let main_chain = Blockchain::default().with_height(4);
        let fork_chain_1 = main_chain.fork_at_height(1);
        let fork_chain_2 = main_chain.fork_at_height(2);
        let fork_chain_3 = main_chain.fork_at_height(3);

        let on_fork_1 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_1.at_height(1))
            .expect_block_connected(*main_chain.at_height(2))
            .expect_block_connected(*main_chain.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));
        let on_fork_2 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_2.at_height(2))
            .expect_block_connected(*main_chain.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));
        let on_fork_3 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_3.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));

        let listeners = vec![
            (fork_chain_1.tip().block_hash, &on_fork_1 as &dyn chain::Listen),
            (fork_chain_2.tip().block_hash, &on_fork_2 as &dyn chain::Listen),
            (fork_chain_3.tip().block_hash, &on_fork_3 as &dyn chain::Listen),
        ];
        let mut cache = fork_chain_1.header_cache(2..=4);
        cache.extend(fork_chain_2.header_cache(3..=4));
        cache.extend(fork_chain_3.header_cache(4..=4));
        let synced_tip =
            synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners)
                .await
                .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
        assert_eq!(synced_tip, main_chain.tip());
    }

    // The forks are stacked on one another (fork 2 forks off fork 1, fork 3 off fork 2), so
    // every listener expects a disconnection back to height 1 and then main chain blocks 2
    // through 4.
    #[tokio::test]
    async fn sync_from_overlapping_chains() {
        let main_chain = Blockchain::default().with_height(4);
        let fork_chain_1 = main_chain.fork_at_height(1);
        let fork_chain_2 = fork_chain_1.fork_at_height(2);
        let fork_chain_3 = fork_chain_2.fork_at_height(3);

        let on_fork_1 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_1.at_height(1))
            .expect_block_connected(*main_chain.at_height(2))
            .expect_block_connected(*main_chain.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));
        let on_fork_2 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_2.at_height(1))
            .expect_block_connected(*main_chain.at_height(2))
            .expect_block_connected(*main_chain.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));
        let on_fork_3 = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain_3.at_height(1))
            .expect_block_connected(*main_chain.at_height(2))
            .expect_block_connected(*main_chain.at_height(3))
            .expect_block_connected(*main_chain.at_height(4));

        let listeners = vec![
            (fork_chain_1.tip().block_hash, &on_fork_1 as &dyn chain::Listen),
            (fork_chain_2.tip().block_hash, &on_fork_2 as &dyn chain::Listen),
            (fork_chain_3.tip().block_hash, &on_fork_3 as &dyn chain::Listen),
        ];
        let mut cache = fork_chain_1.header_cache(2..=4);
        cache.extend(fork_chain_2.header_cache(3..=4));
        cache.extend(fork_chain_3.header_cache(4..=4));
        let synced_tip =
            synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners)
                .await
                .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
        assert_eq!(synced_tip, main_chain.tip());
    }

    // After syncing from a fork tip to the main tip, the cache should hold the newly
    // connected tip and still hold the disconnected one.
    #[tokio::test]
    async fn cache_connected_and_keep_disconnected_blocks() {
        let main_chain = Blockchain::default().with_height(2);
        let fork_chain = main_chain.fork_at_height(1);
        let new_tip = main_chain.tip();
        let old_tip = fork_chain.tip();

        let listener = MockChainListener::new()
            .expect_blocks_disconnected(*fork_chain.at_height(1))
            .expect_block_connected(*new_tip);

        let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)];
        let mut cache = fork_chain.header_cache(2..=2);
        let sync_result =
            synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await;
        if let Err(e) = sync_result {
            panic!("Unexpected error: {:?}", e);
        }
        assert!(cache.contains_key(&new_tip.block_hash));
        assert!(cache.contains_key(&old_tip.block_hash));
    }
}