use futures_channel::oneshot;
use futures_lite::FutureExt as _;
use smol::stream::StreamExt as _;
use smoldot::{
executor,
json_rpc::{methods, service},
};
use std::{
future::Future,
num::NonZero,
pin::{self, Pin},
sync::Arc,
};
use crate::{consensus_service, database_thread};
pub struct Config {
pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
pub receiver: async_channel::Receiver<Message>,
pub chain_head_follow_subscription: service::SubscriptionStartProcess,
pub with_runtime: bool,
pub consensus_service: Arc<consensus_service::ConsensusService>,
pub database: Arc<database_thread::DatabaseThread>,
}
pub enum Message {
Header {
request: service::RequestProcess,
},
Unpin {
block_hashes: Vec<[u8; 32]>,
outcome: oneshot::Sender<Result<(), ()>>,
},
}
pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
let mut json_rpc_subscription = config.chain_head_follow_subscription.accept();
let json_rpc_subscription_id = json_rpc_subscription.subscription_id().to_owned();
let return_value = json_rpc_subscription_id.clone();
let tasks_executor = config.tasks_executor.clone();
tasks_executor(Box::pin(async move {
let consensus_service_subscription = config
.consensus_service
.subscribe_all(32, NonZero::<usize>::new(32).unwrap())
.await;
let mut consensus_service_subscription_new_blocks =
pin::pin!(consensus_service_subscription.new_blocks);
let mut foreground_receiver = pin::pin!(config.receiver);
let mut pinned_blocks =
hashbrown::HashSet::with_capacity_and_hasher(32, fnv::FnvBuildHasher::default());
let mut current_best_block = consensus_service_subscription.finalized_block_hash;
pinned_blocks.insert(consensus_service_subscription.finalized_block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Initialized {
finalized_block_hashes: vec![methods::HashHexString(
consensus_service_subscription.finalized_block_hash,
)],
finalized_block_runtime: if config.with_runtime {
Some(convert_runtime_spec(
consensus_service_subscription
.finalized_block_runtime
.runtime_version(),
))
} else {
None
},
},
})
.await;
for block in consensus_service_subscription.non_finalized_blocks_ancestry_order {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
},
})
.await;
if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
})
.await;
}
}
loop {
enum WakeUpReason {
ConsensusNotification(consensus_service::Notification),
ConsensusSubscriptionStop,
Foreground(Message),
ForegroundClosed,
}
let wake_up_reason = async {
consensus_service_subscription_new_blocks
.next()
.await
.map_or(
WakeUpReason::ConsensusSubscriptionStop,
WakeUpReason::ConsensusNotification,
)
}
.or(async {
foreground_receiver
.next()
.await
.map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Foreground)
})
.await;
match wake_up_reason {
WakeUpReason::ForegroundClosed => return,
WakeUpReason::Foreground(Message::Header { request }) => {
let methods::MethodCall::chainHead_v1_header { hash, .. } = request.request()
else {
unreachable!()
};
if !pinned_blocks.contains(&hash.0) {
request.fail(service::ErrorResponse::InvalidParams);
continue;
}
let database_outcome = config
.database
.with_database(move |database| database.block_scale_encoded_header(&hash.0))
.await;
match database_outcome {
Ok(Some(header)) => {
request.respond(methods::Response::chainHead_v1_header(Some(
methods::HexString(header),
)))
}
Ok(None) => {
request.fail(service::ErrorResponse::InternalError);
}
Err(_) => {
request.fail(service::ErrorResponse::InternalError);
}
}
}
WakeUpReason::Foreground(Message::Unpin {
block_hashes,
outcome,
}) => {
if block_hashes.iter().any(|h| !pinned_blocks.contains(h)) {
let _ = outcome.send(Err(()));
} else {
for block_hash in block_hashes {
pinned_blocks.remove(&block_hash);
config
.consensus_service
.unpin_block(consensus_service_subscription.id, block_hash)
.await;
}
let _ = outcome.send(Ok(()));
}
}
WakeUpReason::ConsensusNotification(consensus_service::Notification::Block {
block,
..
}) => {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
},
})
.await;
if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
})
.await;
}
}
WakeUpReason::ConsensusNotification(
consensus_service::Notification::Finalized {
finalized_blocks_newest_to_oldest,
pruned_blocks_hashes,
best_block_hash,
},
) => {
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Finalized {
finalized_blocks_hashes: finalized_blocks_newest_to_oldest
.into_iter()
.map(methods::HashHexString)
.rev()
.collect(),
pruned_blocks_hashes: pruned_blocks_hashes
.into_iter()
.map(methods::HashHexString)
.collect(),
},
})
.await;
if best_block_hash != current_best_block {
current_best_block = best_block_hash;
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(best_block_hash),
},
})
.await;
}
}
WakeUpReason::ConsensusSubscriptionStop => {
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Stop {},
})
.await;
}
}
}
}));
return_value
}
fn convert_runtime_spec(runtime: &executor::CoreVersion) -> methods::MaybeRuntimeSpec {
let runtime = runtime.decode();
methods::MaybeRuntimeSpec::Valid {
spec: methods::RuntimeSpec {
impl_name: runtime.impl_name.into(),
spec_name: runtime.spec_name.into(),
impl_version: runtime.impl_version,
spec_version: runtime.spec_version,
transaction_version: runtime.transaction_version,
apis: runtime
.apis
.map(|api| (methods::HexString(api.name_hash.to_vec()), api.version))
.collect(),
},
}
}