1use core::marker::PhantomData;
20use futures::{stream::FusedStream, Stream, StreamExt};
21use log::{debug, warn};
22use sc_client_api::{BlockBackend, BlockchainEvents, FinalityNotifications};
23use sp_core::H256;
24use sp_runtime::traits::{BlakeTwo256, Block, Hash, Header, NumberFor, Saturating, Zero};
25use std::{
26 collections::{hash_map::Entry, HashMap, VecDeque},
27 pin::Pin,
28 sync::Arc,
29 task::{Context, Poll},
30};
31
32const LOG_TARGET: &str = "sub-libp2p::ipfs";
34
35type Multihash = cid::multihash::Multihash<32>;
37
38pub enum Change {
40 Added(Multihash),
42 Removed(Multihash),
44}
45
46pub trait BlockProvider: Send + Sync {
49 fn have(&self, multihash: &Multihash) -> bool;
51
52 fn get(&self, multihash: &Multihash) -> Option<Vec<u8>>;
54
55 fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>>;
59}
60
61trait HasMultihashCode {
64 const MULTIHASH_CODE: u64;
66}
67
68impl HasMultihashCode for BlakeTwo256 {
69 const MULTIHASH_CODE: u64 = 0xb220;
70}
71
72fn try_from_multihash<H: Hash + HasMultihashCode>(multihash: &Multihash) -> Option<H::Output> {
73 if multihash.code() != H::MULTIHASH_CODE {
74 return None;
75 }
76 let mut hash = H::Output::default();
77 let src = multihash.digest();
78 let dst = hash.as_mut();
79 if src.len() != dst.len() {
80 return None;
81 }
82 dst.copy_from_slice(src);
83 Some(hash)
84}
85
86fn to_multihash<H: Hash + HasMultihashCode>(hash: &H::Output) -> Multihash {
87 Multihash::wrap(H::MULTIHASH_CODE, hash.as_ref()).expect("Hash size is fixed and small enough")
88}
89
90struct IndexedBlock<B: Block> {
92 number: NumberFor<B>,
93 transaction_hashes: Vec<H256>,
97}
98
99struct IndexedTransactionChanges<B: Block, C> {
100 client: Arc<C>,
101 num_blocks_kept: u32,
103 finality_notifications: FinalityNotifications<B>,
104 finalized_to: NumberFor<B>,
106 blocks: VecDeque<IndexedBlock<B>>,
109 fetched_to: NumberFor<B>,
113 added_to: Option<usize>,
116 extra_refs: HashMap<H256, u32>,
120}
121
122impl<B, C> IndexedTransactionChanges<B, C>
123where
124 B: Block,
125 C: BlockchainEvents<B>,
126{
127 fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
128 let finality_notifications = client.finality_notification_stream();
129 Self {
130 client,
131 num_blocks_kept,
132 finality_notifications,
133 finalized_to: Zero::zero(),
134 blocks: VecDeque::new(),
135 fetched_to: Zero::zero(),
136 added_to: None,
137 extra_refs: HashMap::new(),
138 }
139 }
140}
141
142impl<B: Block, C> Unpin for IndexedTransactionChanges<B, C> {}
143
144impl<B, C> Stream for IndexedTransactionChanges<B, C>
145where
146 B: Block,
147 C: BlockBackend<B>,
148{
149 type Item = Change;
150
151 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
152 let this = self.get_mut();
153
154 if !this.finality_notifications.is_terminated() {
156 while let Poll::Ready(Some(notification)) =
157 this.finality_notifications.poll_next_unpin(cx)
158 {
159 this.finalized_to = *notification.header.number() + 1u32.into();
160 }
161 }
162
163 let pruned_to = this.finalized_to.saturating_sub(this.num_blocks_kept.into());
165 this.fetched_to = this.fetched_to.max(pruned_to); while let (only_block, Some(block)) = (this.blocks.len() == 1, this.blocks.front_mut()) {
167 if block.number >= pruned_to {
168 break; }
170
171 if let (true, Some(added_to)) = (only_block, this.added_to) {
173 block.transaction_hashes.truncate(added_to);
174 this.added_to = None;
175 }
176
177 while let Some(hash) = block.transaction_hashes.pop() {
178 match this.extra_refs.entry(hash) {
179 Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
180 Some(extra_refs) => {
181 entry.insert(extra_refs);
182 },
183 None => {
184 entry.remove();
185 return Poll::Ready(Some(Change::Removed(
187 to_multihash::<BlakeTwo256>(&hash),
188 )));
189 },
190 },
191 Entry::Vacant(_) => warn!("Pruned transaction hash {hash} not found"),
193 }
194 }
195
196 this.blocks.pop_front();
197 }
198
199 loop {
201 while this.added_to.is_none() && (this.fetched_to < this.finalized_to) {
203 let hashes = this.client.block_hash(this.fetched_to).and_then(|hash| {
204 let hash = hash.ok_or_else(|| {
205 sp_blockchain::Error::UnknownBlock(format!(
206 "Hash of block {} not found",
207 this.fetched_to,
208 ))
209 })?;
210 this.client.block_indexed_hashes(hash)
211 });
212 match hashes {
213 Ok(Some(hashes)) if !hashes.is_empty() => {
214 this.blocks.push_back(IndexedBlock {
215 number: this.fetched_to,
216 transaction_hashes: hashes,
217 });
218 this.added_to = Some(0);
219 },
220 Ok(_) => (),
221 Err(err) => debug!("Error fetching block {}: {err}", this.fetched_to),
222 }
223 this.fetched_to += 1u32.into();
224 }
225
226 while let Some(added_to) = &mut this.added_to {
228 let block = this.blocks.back().expect(
229 "added_to only set to Some after pushing a block, \
230 set to None before popping last block",
231 );
232 let hash = block.transaction_hashes[*added_to];
233 *added_to += 1;
234 if *added_to == block.transaction_hashes.len() {
235 this.added_to = None;
236 }
237
238 match this.extra_refs.entry(hash) {
239 Entry::Occupied(mut entry) => *entry.get_mut() += 1,
240 Entry::Vacant(entry) => {
241 entry.insert(0);
242 return Poll::Ready(Some(Change::Added(to_multihash::<BlakeTwo256>(
244 &hash,
245 ))));
246 },
247 }
248 }
249
250 debug_assert!(this.fetched_to <= this.finalized_to);
253 if this.fetched_to == this.finalized_to {
254 return Poll::Pending;
255 }
256 }
257 }
258}
259
260pub struct IndexedTransactions<B, C> {
264 client: Arc<C>,
265 num_blocks_kept: u32,
266 phantom: PhantomData<B>,
267}
268
269impl<B, C> IndexedTransactions<B, C> {
270 pub fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
273 Self { client, num_blocks_kept, phantom: PhantomData }
274 }
275}
276
277impl<B, C> BlockProvider for IndexedTransactions<B, C>
278where
279 B: Block,
280 C: BlockchainEvents<B> + BlockBackend<B> + Send + Sync + 'static,
281{
282 fn have(&self, multihash: &Multihash) -> bool {
283 let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return false };
284 match self.client.has_indexed_transaction(hash) {
285 Ok(have) => have,
286 Err(err) => {
287 debug!(target: LOG_TARGET, "Error checking for block {hash:?}: {err}");
288 false
289 },
290 }
291 }
292
293 fn get(&self, multihash: &Multihash) -> Option<Vec<u8>> {
294 let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return None };
295 match self.client.indexed_transaction(hash) {
296 Ok(block) => block,
297 Err(err) => {
298 debug!(target: LOG_TARGET, "Error getting block {hash:?}: {err}");
299 None
300 },
301 }
302 }
303
304 fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>> {
305 Box::pin(IndexedTransactionChanges::new(self.client.clone(), self.num_blocks_kept))
306 }
307}