Skip to main content

sc_network/
ipfs_block_provider.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use core::marker::PhantomData;
20use futures::{stream::FusedStream, Stream, StreamExt};
21use log::{debug, warn};
22use sc_client_api::{BlockBackend, BlockchainEvents, FinalityNotifications};
23use sp_core::H256;
24use sp_runtime::traits::{BlakeTwo256, Block, Hash, Header, NumberFor, Saturating, Zero};
25use std::{
26	collections::{hash_map::Entry, HashMap, VecDeque},
27	pin::Pin,
28	sync::Arc,
29	task::{Context, Poll},
30};
31
/// Log target for this file, passed as `target:` to the `log` macros.
const LOG_TARGET: &str = "sub-libp2p::ipfs";
34
/// Multihash with an allocated size of 32 bytes for the hash value (digest).
type Multihash = cid::multihash::Multihash<32>;
37
38/// A change to the blocks available from a [`BlockProvider`].
39pub enum Change {
40	/// The block with the given hash is now available.
41	Added(Multihash),
42	/// The block with the given hash is no longer available.
43	Removed(Multihash),
44}
45
/// Provides blocks to be served over IPFS. Requires `Send` and `Sync` so we can write `Arc<dyn
/// BlockProvider>` instead of `Arc<dyn BlockProvider + Send + Sync>`.
///
/// Blocks are identified by the [`Multihash`] of their contents.
pub trait BlockProvider: Send + Sync {
	/// Returns `true` if we have the block with the given hash.
	fn have(&self, multihash: &Multihash) -> bool;

	/// Returns the block with the given hash if possible, otherwise returns `None`.
	fn get(&self, multihash: &Multihash) -> Option<Vec<u8>>;

	/// Returns a stream of changes to the available blocks. All blocks which are available at the
	/// point of calling will be reported as additions. Note that the stream may not be perfectly
	/// synchronised with [`have`](Self::have)/[`get`](Self::get).
	///
	/// The stream is returned boxed and pinned so that this trait stays usable as a trait object.
	fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>>;
}
60
/// Implemented for hasher types such as [`BlakeTwo256`], providing the corresponding Multihash
/// code.
///
/// The code is what `try_from_multihash` checks incoming multihashes against and what
/// `to_multihash` tags outgoing multihashes with.
trait HasMultihashCode {
	/// The Multihash code for the hasher.
	const MULTIHASH_CODE: u64;
}
67
impl HasMultihashCode for BlakeTwo256 {
	// `0xb220` is the code assigned to `blake2b-256` in the multicodec table.
	const MULTIHASH_CODE: u64 = 0xb220;
}
71
72fn try_from_multihash<H: Hash + HasMultihashCode>(multihash: &Multihash) -> Option<H::Output> {
73	if multihash.code() != H::MULTIHASH_CODE {
74		return None;
75	}
76	let mut hash = H::Output::default();
77	let src = multihash.digest();
78	let dst = hash.as_mut();
79	if src.len() != dst.len() {
80		return None;
81	}
82	dst.copy_from_slice(src);
83	Some(hash)
84}
85
86fn to_multihash<H: Hash + HasMultihashCode>(hash: &H::Output) -> Multihash {
87	Multihash::wrap(H::MULTIHASH_CODE, hash.as_ref()).expect("Hash size is fixed and small enough")
88}
89
/// A block containing indexed transactions.
struct IndexedBlock<B: Block> {
	/// The number of the block.
	number: NumberFor<B>,
	/// BLAKE2b-256 hashes of the indexed transactions.
	// TODO: index transactions using multihash and not bare hash value, and use `Multihash`
	// here instead of `H256` interpreted as BLAKE2b-256.
	transaction_hashes: Vec<H256>,
}
98
/// State of a stream of [`Change`]s to the indexed transactions available from a client.
///
/// Transactions are reported as added as the blocks containing them are finalized, and as removed
/// once those blocks are assumed to have been pruned (see
/// [`num_blocks_kept`](Self::num_blocks_kept)).
struct IndexedTransactionChanges<B: Block, C> {
	/// The client that blocks and their indexed transaction hashes are fetched from.
	client: Arc<C>,
	/// Number of finalized blocks kept by the client.
	num_blocks_kept: u32,
	/// Finality notifications from the client, used to advance
	/// [`finalized_to`](Self::finalized_to).
	finality_notifications: FinalityNotifications<B>,
	/// The number of the last finalized block, _plus one_.
	finalized_to: NumberFor<B>,
	/// Finalized blocks with indexed transactions. Old blocks are at the front, new blocks at the
	/// back. Transaction hashes and blocks are popped as they are reported as removed.
	blocks: VecDeque<IndexedBlock<B>>,
	/// The number of the last fetched block, _plus one_. Blocks are added to
	/// [`blocks`](Self::blocks) as they are fetched, but only if they contain indexed
	/// transactions.
	fetched_to: NumberFor<B>,
	/// The number of indexed transactions in the last fetched block which have been handled, or
	/// `None` if all have been handled. Additional blocks will not be fetched if this is `Some`.
	added_to: Option<usize>,
	/// The BLAKE2b-256 hashes of all available transactions are present in this map. The `u32` is
	/// the number of transactions with the hash, minus one. This is used to deduplicate
	/// added/removed reports.
	extra_refs: HashMap<H256, u32>,
}
121
122impl<B, C> IndexedTransactionChanges<B, C>
123where
124	B: Block,
125	C: BlockchainEvents<B>,
126{
127	fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
128		let finality_notifications = client.finality_notification_stream();
129		Self {
130			client,
131			num_blocks_kept,
132			finality_notifications,
133			finalized_to: Zero::zero(),
134			blocks: VecDeque::new(),
135			fetched_to: Zero::zero(),
136			added_to: None,
137			extra_refs: HashMap::new(),
138		}
139	}
140}
141
142impl<B: Block, C> Unpin for IndexedTransactionChanges<B, C> {}
143
144impl<B, C> Stream for IndexedTransactionChanges<B, C>
145where
146	B: Block,
147	C: BlockBackend<B>,
148{
149	type Item = Change;
150
151	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
152		let this = self.get_mut();
153
154		// Update finalized_to
155		if !this.finality_notifications.is_terminated() {
156			while let Poll::Ready(Some(notification)) =
157				this.finality_notifications.poll_next_unpin(cx)
158			{
159				this.finalized_to = *notification.header.number() + 1u32.into();
160			}
161		}
162
163		// Handle (assumed) pruned blocks
164		let pruned_to = this.finalized_to.saturating_sub(this.num_blocks_kept.into());
165		this.fetched_to = this.fetched_to.max(pruned_to); // Don't try to fetch pruned blocks!
166		while let (only_block, Some(block)) = (this.blocks.len() == 1, this.blocks.front_mut()) {
167			if block.number >= pruned_to {
168				break; // Not pruned
169			}
170
171			// Discard any transaction hashes that we didn't even add yet
172			if let (true, Some(added_to)) = (only_block, this.added_to) {
173				block.transaction_hashes.truncate(added_to);
174				this.added_to = None;
175			}
176
177			while let Some(hash) = block.transaction_hashes.pop() {
178				match this.extra_refs.entry(hash) {
179					Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
180						Some(extra_refs) => {
181							entry.insert(extra_refs);
182						},
183						None => {
184							entry.remove();
185							// TODO: don't assume hash is BLAKE2b, use `Multihash` instead.
186							return Poll::Ready(Some(Change::Removed(
187								to_multihash::<BlakeTwo256>(&hash),
188							)));
189						},
190					},
191					// This should not be possible!
192					Entry::Vacant(_) => warn!("Pruned transaction hash {hash} not found"),
193				}
194			}
195
196			this.blocks.pop_front();
197		}
198
199		// Handle finalized blocks
200		loop {
201			// Fetch finalized blocks
202			while this.added_to.is_none() && (this.fetched_to < this.finalized_to) {
203				let hashes = this.client.block_hash(this.fetched_to).and_then(|hash| {
204					let hash = hash.ok_or_else(|| {
205						sp_blockchain::Error::UnknownBlock(format!(
206							"Hash of block {} not found",
207							this.fetched_to,
208						))
209					})?;
210					this.client.block_indexed_hashes(hash)
211				});
212				match hashes {
213					Ok(Some(hashes)) if !hashes.is_empty() => {
214						this.blocks.push_back(IndexedBlock {
215							number: this.fetched_to,
216							transaction_hashes: hashes,
217						});
218						this.added_to = Some(0);
219					},
220					Ok(_) => (),
221					Err(err) => debug!("Error fetching block {}: {err}", this.fetched_to),
222				}
223				this.fetched_to += 1u32.into();
224			}
225
226			// Add from last fetched block
227			while let Some(added_to) = &mut this.added_to {
228				let block = this.blocks.back().expect(
229					"added_to only set to Some after pushing a block, \
230					set to None before popping last block",
231				);
232				let hash = block.transaction_hashes[*added_to];
233				*added_to += 1;
234				if *added_to == block.transaction_hashes.len() {
235					this.added_to = None;
236				}
237
238				match this.extra_refs.entry(hash) {
239					Entry::Occupied(mut entry) => *entry.get_mut() += 1,
240					Entry::Vacant(entry) => {
241						entry.insert(0);
242						// TODO: don't assume hash is BLAKE2b, use `Multihash` instead.
243						return Poll::Ready(Some(Change::Added(to_multihash::<BlakeTwo256>(
244							&hash,
245						))));
246					},
247				}
248			}
249
250			// Fully handled last fetched block. Loop if there are more blocks to fetch, otherwise
251			// nothing to do.
252			debug_assert!(this.fetched_to <= this.finalized_to);
253			if this.fetched_to == this.finalized_to {
254				return Poll::Pending;
255			}
256		}
257	}
258}
259
/// Implements [`BlockProvider`], providing access to indexed transactions in the wrapped client.
/// Note that it isn't possible to just implement [`BlockProvider`] on types implementing
/// [`BlockBackend`] because `BlockBackend` is generic over the (chain) block type.
pub struct IndexedTransactions<B, C> {
	/// The wrapped client.
	client: Arc<C>,
	/// Number of finalized blocks the client is assumed to keep.
	num_blocks_kept: u32,
	/// Fixes the (chain) block type `B` without storing a value of it.
	phantom: PhantomData<B>,
}
268
269impl<B, C> IndexedTransactions<B, C> {
270	/// Create a new `IndexedTransactions` wrapper over the given client. The client is assumed to
271	/// keep `num_blocks_kept` finalized blocks.
272	pub fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
273		Self { client, num_blocks_kept, phantom: PhantomData }
274	}
275}
276
277impl<B, C> BlockProvider for IndexedTransactions<B, C>
278where
279	B: Block,
280	C: BlockchainEvents<B> + BlockBackend<B> + Send + Sync + 'static,
281{
282	fn have(&self, multihash: &Multihash) -> bool {
283		let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return false };
284		match self.client.has_indexed_transaction(hash) {
285			Ok(have) => have,
286			Err(err) => {
287				debug!(target: LOG_TARGET, "Error checking for block {hash:?}: {err}");
288				false
289			},
290		}
291	}
292
293	fn get(&self, multihash: &Multihash) -> Option<Vec<u8>> {
294		let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return None };
295		match self.client.indexed_transaction(hash) {
296			Ok(block) => block,
297			Err(err) => {
298				debug!(target: LOG_TARGET, "Error getting block {hash:?}: {err}");
299				None
300			},
301		}
302	}
303
304	fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>> {
305		Box::pin(IndexedTransactionChanges::new(self.client.clone(), self.num_blocks_kept))
306	}
307}