use core::marker::PhantomData;
use futures::{stream::FusedStream, Stream, StreamExt};
use log::{debug, warn};
use sc_client_api::{BlockBackend, BlockchainEvents, FinalityNotifications};
use sp_core::H256;
use sp_runtime::traits::{BlakeTwo256, Block, Hash, Header, NumberFor, Saturating, Zero};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
const LOG_TARGET: &str = "sub-libp2p::ipfs";
type Multihash = cid::multihash::Multihash<32>;
pub enum Change {
Added(Multihash),
Removed(Multihash),
}
pub trait BlockProvider: Send + Sync {
fn have(&self, multihash: &Multihash) -> bool;
fn get(&self, multihash: &Multihash) -> Option<Vec<u8>>;
fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>>;
}
trait HasMultihashCode {
const MULTIHASH_CODE: u64;
}
impl HasMultihashCode for BlakeTwo256 {
const MULTIHASH_CODE: u64 = 0xb220;
}
fn try_from_multihash<H: Hash + HasMultihashCode>(multihash: &Multihash) -> Option<H::Output> {
if multihash.code() != H::MULTIHASH_CODE {
return None;
}
let mut hash = H::Output::default();
let src = multihash.digest();
let dst = hash.as_mut();
if src.len() != dst.len() {
return None;
}
dst.copy_from_slice(src);
Some(hash)
}
fn to_multihash<H: Hash + HasMultihashCode>(hash: &H::Output) -> Multihash {
Multihash::wrap(H::MULTIHASH_CODE, hash.as_ref()).expect("Hash size is fixed and small enough")
}
struct IndexedBlock<B: Block> {
number: NumberFor<B>,
transaction_hashes: Vec<H256>,
}
struct IndexedTransactionChanges<B: Block, C> {
client: Arc<C>,
num_blocks_kept: u32,
finality_notifications: FinalityNotifications<B>,
finalized_to: NumberFor<B>,
blocks: VecDeque<IndexedBlock<B>>,
fetched_to: NumberFor<B>,
added_to: Option<usize>,
extra_refs: HashMap<H256, u32>,
}
impl<B, C> IndexedTransactionChanges<B, C>
where
B: Block,
C: BlockchainEvents<B>,
{
fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
let finality_notifications = client.finality_notification_stream();
Self {
client,
num_blocks_kept,
finality_notifications,
finalized_to: Zero::zero(),
blocks: VecDeque::new(),
fetched_to: Zero::zero(),
added_to: None,
extra_refs: HashMap::new(),
}
}
}
impl<B: Block, C> Unpin for IndexedTransactionChanges<B, C> {}
impl<B, C> Stream for IndexedTransactionChanges<B, C>
where
B: Block,
C: BlockBackend<B>,
{
type Item = Change;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if !this.finality_notifications.is_terminated() {
while let Poll::Ready(Some(notification)) =
this.finality_notifications.poll_next_unpin(cx)
{
this.finalized_to = *notification.header.number() + 1u32.into();
}
}
let pruned_to = this.finalized_to.saturating_sub(this.num_blocks_kept.into());
this.fetched_to = this.fetched_to.max(pruned_to); while let (only_block, Some(block)) = (this.blocks.len() == 1, this.blocks.front_mut()) {
if block.number >= pruned_to {
break; }
if let (true, Some(added_to)) = (only_block, this.added_to) {
block.transaction_hashes.truncate(added_to);
this.added_to = None;
}
while let Some(hash) = block.transaction_hashes.pop() {
match this.extra_refs.entry(hash) {
Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
Some(extra_refs) => {
entry.insert(extra_refs);
},
None => {
entry.remove();
return Poll::Ready(Some(Change::Removed(
to_multihash::<BlakeTwo256>(&hash),
)));
},
},
Entry::Vacant(_) => warn!("Pruned transaction hash {hash} not found"),
}
}
this.blocks.pop_front();
}
loop {
while this.added_to.is_none() && (this.fetched_to < this.finalized_to) {
let hashes = this.client.block_hash(this.fetched_to).and_then(|hash| {
let hash = hash.ok_or_else(|| {
sp_blockchain::Error::UnknownBlock(format!(
"Hash of block {} not found",
this.fetched_to,
))
})?;
this.client.block_indexed_hashes(hash)
});
match hashes {
Ok(Some(hashes)) if !hashes.is_empty() => {
this.blocks.push_back(IndexedBlock {
number: this.fetched_to,
transaction_hashes: hashes,
});
this.added_to = Some(0);
},
Ok(_) => (),
Err(err) => debug!("Error fetching block {}: {err}", this.fetched_to),
}
this.fetched_to += 1u32.into();
}
while let Some(added_to) = &mut this.added_to {
let block = this.blocks.back().expect(
"added_to only set to Some after pushing a block, \
set to None before popping last block",
);
let hash = block.transaction_hashes[*added_to];
*added_to += 1;
if *added_to == block.transaction_hashes.len() {
this.added_to = None;
}
match this.extra_refs.entry(hash) {
Entry::Occupied(mut entry) => *entry.get_mut() += 1,
Entry::Vacant(entry) => {
entry.insert(0);
return Poll::Ready(Some(Change::Added(to_multihash::<BlakeTwo256>(
&hash,
))));
},
}
}
debug_assert!(this.fetched_to <= this.finalized_to);
if this.fetched_to == this.finalized_to {
return Poll::Pending;
}
}
}
}
pub struct IndexedTransactions<B, C> {
client: Arc<C>,
num_blocks_kept: u32,
phantom: PhantomData<B>,
}
impl<B, C> IndexedTransactions<B, C> {
pub fn new(client: Arc<C>, num_blocks_kept: u32) -> Self {
Self { client, num_blocks_kept, phantom: PhantomData }
}
}
impl<B, C> BlockProvider for IndexedTransactions<B, C>
where
B: Block,
C: BlockchainEvents<B> + BlockBackend<B> + Send + Sync + 'static,
{
fn have(&self, multihash: &Multihash) -> bool {
let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return false };
match self.client.has_indexed_transaction(hash) {
Ok(have) => have,
Err(err) => {
debug!(target: LOG_TARGET, "Error checking for block {hash:?}: {err}");
false
},
}
}
fn get(&self, multihash: &Multihash) -> Option<Vec<u8>> {
let Some(hash) = try_from_multihash::<BlakeTwo256>(multihash) else { return None };
match self.client.indexed_transaction(hash) {
Ok(block) => block,
Err(err) => {
debug!(target: LOG_TARGET, "Error getting block {hash:?}: {err}");
None
},
}
}
fn changes(&self) -> Pin<Box<dyn Stream<Item = Change> + Send>> {
Box::pin(IndexedTransactionChanges::new(self.client.clone(), self.num_blocks_kept))
}
}