use crate::{
platform::PlatformRef,
runtime_service::{Notification, RuntimeError, RuntimeService},
};
use alloc::{sync::Arc, vec::Vec};
use core::num::NonZeroUsize;
use futures_util::{future, stream, StreamExt as _};
use smoldot::{executor, header};
pub async fn subscribe_runtime_version<TPlat: PlatformRef>(
runtime_service: &Arc<RuntimeService<TPlat>>,
) -> (
Result<executor::CoreVersion, RuntimeError>,
stream::BoxStream<'static, Result<executor::CoreVersion, RuntimeError>>,
) {
let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move {
let subscribe_all = runtime_service
.subscribe_all("subscribe-runtime-version", 16, NonZeroUsize::new(24).unwrap())
.await;
let mut headers = hashbrown::HashMap::<
[u8; 32],
Arc<Result<executor::CoreVersion, RuntimeError>>,
fnv::FnvBuildHasher,
>::with_capacity_and_hasher(16, Default::default());
let current_finalized_hash = header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
);
subscribe_all
.new_blocks
.unpin_block(¤t_finalized_hash)
.await;
headers.insert(
current_finalized_hash,
Arc::new(subscribe_all.finalized_block_runtime),
);
let mut current_best = None;
for block in subscribe_all.non_finalized_blocks_ancestry_order {
let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
subscribe_all.new_blocks.unpin_block(&hash).await;
if let Some(new_runtime) = block.new_runtime {
headers.insert(hash, Arc::new(new_runtime));
} else {
let parent_runtime = headers
.get(&block.parent_hash)
.unwrap()
.clone();
headers.insert(hash, parent_runtime);
}
if block.is_new_best {
debug_assert!(current_best.is_none());
current_best = Some(hash);
}
}
let current_best = current_best.unwrap_or(current_finalized_hash);
let current_best_runtime = (**headers.get(¤t_best).unwrap()).clone();
let substream = stream::unfold(
(
subscribe_all.new_blocks,
headers,
current_finalized_hash,
current_best,
),
|(
mut new_blocks,
mut headers,
mut current_finalized_hash,
mut current_best,
)| async move {
loop {
match new_blocks.next().await? {
Notification::Block(block) => {
let hash =
header::hash_from_scale_encoded_header(&block.scale_encoded_header);
new_blocks.unpin_block(&hash).await;
if let Some(new_runtime) = block.new_runtime {
headers.insert(hash, Arc::new(new_runtime));
} else {
let parent_runtime = headers
.get(&block.parent_hash)
.unwrap()
.clone();
headers.insert(hash, parent_runtime);
}
if block.is_new_best {
let current_best_runtime =
headers.get(¤t_best).unwrap();
let new_best_runtime = headers.get(&hash).unwrap();
current_best = hash;
if !Arc::ptr_eq(current_best_runtime, new_best_runtime) {
let runtime = (**new_best_runtime).clone();
break Some((
runtime,
(
new_blocks,
headers,
current_finalized_hash,
current_best,
),
));
}
}
}
Notification::Finalized {
hash,
pruned_blocks,
best_block_hash,
} => {
let current_best_runtime =
headers.get(¤t_best).unwrap().clone();
let new_best_runtime =
headers.get(&best_block_hash).unwrap().clone();
for pruned_block in pruned_blocks {
let _was_in = headers.remove(&pruned_block);
debug_assert!(_was_in.is_some());
}
let _ = headers
.remove(¤t_finalized_hash)
.unwrap();
current_finalized_hash = hash;
current_best = best_block_hash;
if !Arc::ptr_eq(¤t_best_runtime, &new_best_runtime) {
let runtime = (*new_best_runtime).clone();
break Some((
runtime,
(
new_blocks,
headers,
current_finalized_hash,
current_best,
),
));
}
}
Notification::BestBlockChanged { hash } => {
let current_best_runtime =
headers.get(¤t_best).unwrap().clone();
let new_best_runtime =
headers.get(&hash).unwrap().clone();
current_best = hash;
if !Arc::ptr_eq(¤t_best_runtime, &new_best_runtime) {
let runtime = (*new_best_runtime).clone();
break Some((
runtime,
(
new_blocks,
headers,
current_finalized_hash,
current_best,
),
));
}
}
}
}
},
);
let substream = stream::once(future::ready(current_best_runtime)).chain(substream);
Some((substream, runtime_service))
})
.flatten()
.boxed();
let first_value = master_stream.next().await.unwrap();
(first_value, master_stream)
}
/// Subscribes to the finalized block tracked by `runtime_service`.
///
/// Returns the SCALE-encoded header of the current finalized block, plus a stream that yields
/// the SCALE-encoded header of every block reported as finalized afterwards.
///
/// The stream never ends by itself: once the notifications of a subscription are exhausted, a
/// new subscription is opened and the header of its finalized block is yielded first.
///
/// # Panics
///
/// Panics if a block is reported as finalized without having been reported as a non-finalized
/// block beforehand.
pub async fn subscribe_finalized<TPlat: PlatformRef>(
    runtime_service: &Arc<RuntimeService<TPlat>>,
) -> (Vec<u8>, stream::BoxStream<'static, Vec<u8>>) {
    // One subscription per iteration of the outer `unfold`; it always returns `Some`, which is
    // what makes the flattened stream re-subscribe when a sub-stream finishes.
    let mut master_stream = stream::unfold(Arc::clone(runtime_service), |service| async move {
        let subscription = service
            .subscribe_all("subscribe-finalized", 16, NonZeroUsize::new(32).unwrap())
            .await;

        // The header of the finalized block is kept below; the block itself can be unpinned.
        let finalized_hash = header::hash_from_scale_encoded_header(
            &subscription.finalized_block_scale_encoded_header,
        );
        subscription.new_blocks.unpin_block(&finalized_hash).await;

        // SCALE-encoded headers of the blocks that aren't finalized yet, indexed by hash.
        let mut pending =
            hashbrown::HashMap::<[u8; 32], Vec<u8>, fnv::FnvBuildHasher>::with_capacity_and_hasher(
                16,
                Default::default(),
            );
        for block in subscription.non_finalized_blocks_ancestry_order {
            let block_hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
            subscription.new_blocks.unpin_block(&block_hash).await;
            pending.insert(block_hash, block.scale_encoded_header);
        }

        // Stream of the headers of the blocks finalized after the subscription started.
        let updates = stream::unfold(
            (subscription.new_blocks, pending),
            |(mut notifications, mut pending)| async move {
                loop {
                    // Only a finality notification produces an item; everything else merely
                    // updates the local state and waits for the next notification. `?` ends
                    // the sub-stream when there are no more notifications.
                    let (newly_finalized, pruned) = match notifications.next().await? {
                        Notification::Block(block) => {
                            let block_hash =
                                header::hash_from_scale_encoded_header(&block.scale_encoded_header);
                            notifications.unpin_block(&block_hash).await;
                            pending.insert(block_hash, block.scale_encoded_header);
                            continue;
                        }
                        Notification::BestBlockChanged { .. } => continue,
                        Notification::Finalized {
                            hash,
                            pruned_blocks,
                            ..
                        } => (hash, pruned_blocks),
                    };

                    // Forget the headers of the blocks that have been pruned.
                    for stale in pruned {
                        let _removed = pending.remove(&stale);
                        debug_assert!(_removed.is_some());
                    }

                    let finalized_header = pending.remove(&newly_finalized).unwrap();
                    break Some((finalized_header, (notifications, pending)));
                }
            },
        );

        // The header of the finalized block at subscription time comes first.
        let initial = stream::once(future::ready(
            subscription.finalized_block_scale_encoded_header,
        ));

        Some((initial.chain(updates), service))
    })
    .flatten()
    .boxed();

    // Every sub-stream starts with an immediately-ready item, so the first item always exists.
    let first_value = master_stream.next().await.unwrap();
    (first_value, master_stream)
}
/// Subscribes to the best block tracked by `runtime_service`.
///
/// Returns the SCALE-encoded header of the current best block, plus a stream that yields the
/// SCALE-encoded header of the new best block every time the best block changes.
///
/// The returned stream never ends by itself: whenever the notifications stream obtained from
/// `subscribe_all` finishes, a new subscription is opened and the header of its best block is
/// yielded first, even if it is identical to the last header that was yielded.
///
/// # Panics
///
/// Panics if a notification designates as best or previously-finalized a block that is not
/// present in the locally maintained map of headers.
pub async fn subscribe_best<TPlat: PlatformRef>(
    runtime_service: &Arc<RuntimeService<TPlat>>,
) -> (Vec<u8>, stream::BoxStream<'static, Vec<u8>>) {
    // Each iteration of the outer `unfold` opens one subscription and produces one sub-stream.
    // Since it always returns `Some`, the flattened stream re-subscribes whenever a sub-stream
    // ends.
    let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move {
        let subscribe_all = runtime_service
            .subscribe_all("subscribe-best", 16, NonZeroUsize::new(32).unwrap())
            .await;

        // Map of SCALE-encoded headers by block hash. Contains the current finalized block plus
        // every non-finalized block reported so far.
        let mut headers =
            hashbrown::HashMap::<[u8; 32], Vec<u8>, fnv::FnvBuildHasher>::with_capacity_and_hasher(
                16,
                Default::default(),
            );

        let current_finalized_hash = header::hash_from_scale_encoded_header(
            &subscribe_all.finalized_block_scale_encoded_header,
        );
        // The header is stored in the map below, so the block is unpinned immediately. The same
        // is done for every other block.
        subscribe_all
            .new_blocks
            .unpin_block(&current_finalized_hash)
            .await;

        headers.insert(
            current_finalized_hash,
            subscribe_all.finalized_block_scale_encoded_header,
        );

        let mut current_best = None;

        for block in subscribe_all.non_finalized_blocks_ancestry_order {
            let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
            subscribe_all.new_blocks.unpin_block(&hash).await;
            headers.insert(hash, block.scale_encoded_header);

            if block.is_new_best {
                debug_assert!(current_best.is_none());
                current_best = Some(hash);
            }
        }

        // If no non-finalized block is flagged as best, the finalized block is the best block.
        let current_best = current_best.unwrap_or(current_finalized_hash);
        let current_best_header = headers.get(&current_best).unwrap().clone();

        // Turns `subscribe_all.new_blocks` into a stream of headers of the best block.
        let substream = stream::unfold(
            (
                subscribe_all.new_blocks,
                headers,
                current_finalized_hash,
                current_best,
            ),
            |(
                mut new_blocks,
                mut headers,
                mut current_finalized_hash,
                mut current_best,
            )| async move {
                loop {
                    // `?` ends this sub-stream once the notifications stream is exhausted.
                    match new_blocks.next().await? {
                        Notification::Block(block) => {
                            let hash =
                                header::hash_from_scale_encoded_header(&block.scale_encoded_header);
                            new_blocks.unpin_block(&hash).await;
                            headers.insert(hash, block.scale_encoded_header);

                            if block.is_new_best {
                                current_best = hash;
                                let header =
                                    headers.get(&current_best).unwrap().clone();
                                break Some((
                                    header,
                                    (
                                        new_blocks,
                                        headers,
                                        current_finalized_hash,
                                        current_best,
                                    ),
                                ));
                            }
                        }
                        Notification::Finalized {
                            hash,
                            pruned_blocks,
                            best_block_hash,
                        } => {
                            // Clean up the headers that are no longer needed.
                            for pruned_block in pruned_blocks {
                                let _was_in = headers.remove(&pruned_block);
                                debug_assert!(_was_in.is_some());
                            }

                            let _ = headers
                                .remove(&current_finalized_hash)
                                .unwrap();
                            current_finalized_hash = hash;

                            // The notification also carries the best block hash; only yield
                            // an item if it differs from the best block currently tracked.
                            if best_block_hash != current_best {
                                current_best = best_block_hash;
                                let header =
                                    headers.get(&current_best).unwrap().clone();
                                break Some((
                                    header,
                                    (
                                        new_blocks,
                                        headers,
                                        current_finalized_hash,
                                        current_best,
                                    ),
                                ));
                            }
                        }
                        Notification::BestBlockChanged { hash } => {
                            if hash != current_best {
                                current_best = hash;
                                let header =
                                    headers.get(&current_best).unwrap().clone();
                                break Some((
                                    header,
                                    (
                                        new_blocks,
                                        headers,
                                        current_finalized_hash,
                                        current_best,
                                    ),
                                ));
                            }
                        }
                    }
                }
            },
        );

        // Prepend the header of the current best block to the stream of changes.
        let substream = stream::once(future::ready(current_best_header)).chain(substream);

        Some((substream, runtime_service))
    })
    .flatten()
    .boxed();

    // Every sub-stream starts with an immediately-ready item, so the first item always exists.
    let first_value = master_stream.next().await.unwrap();
    (first_value, master_stream)
}