use crate::blocks::Tipset;
use crate::message_pool::MpoolUpdate;
use crate::prelude::ShallowClone;
use crate::rpc::RPCState;
use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams};
use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec};
use crate::rpc::eth::{
Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message,
};
use crate::utils::broadcast::subscription_stream;
use futures::{Stream, StreamExt as _};
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink};
use std::sync::Arc;
/// Server-side handler for the Ethereum-style pub/sub RPC API; it carries the
/// [`EthPubSubApiServer`] implementation in this file.
///
/// NOTE(review): `derive_more::Constructor` is expected to generate
/// `EthPubSub::new(ctx)` — confirm against the derive_more version in use.
#[derive(derive_more::Constructor)]
pub struct EthPubSub {
/// Shared RPC state; shallow-cloned and handed to each spawned subscription task.
ctx: Arc<RPCState>,
}
#[async_trait::async_trait]
impl EthPubSubApiServer for EthPubSub {
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<SubscriptionParams>,
) -> SubscriptionResult {
let sink = pending.accept().await?;
let ctx = self.ctx.shallow_clone();
match kind {
SubscriptionKind::NewHeads => spawn_new_heads(sink, ctx),
SubscriptionKind::PendingTransactions => spawn_pending_transactions(sink, ctx),
SubscriptionKind::Logs => {
let filter = params.and_then(|p| p.filter).map(EthFilterSpec::from);
spawn_logs(sink, ctx, filter);
}
}
Ok(())
}
}
/// Builds a stream of parent tipsets driven by chain head changes.
///
/// For every head-change notification, each applied tipset is mapped to its
/// parent tipset (loaded through the chain index), and the parents are
/// yielded in the order the applied tipsets were reported.
///
/// * Applied tipsets at epoch `0` are skipped.
/// * If loading the parent fails, the error is logged and that tipset is
///   skipped; the stream itself keeps running.
///
/// The returned stream owns its own shallow clone of `ctx` and does not
/// borrow from the argument (`use<>`).
///
/// NOTE(review): presumably the parent is emitted because it is the tipset
/// whose messages the newly applied head has executed — confirm.
fn head_message_tipsets(ctx: &Arc<RPCState>) -> impl Stream<Item = Tipset> + Send + use<> {
    let head_changes = ctx.chain_store().subscribe_head_changes();
    let state = ctx.shallow_clone();

    subscription_stream(head_changes).flat_map(move |changes| {
        let mut parents = Vec::new();
        for applied in changes.applies {
            if applied.epoch() == 0 {
                continue;
            }
            match state.chain_index().load_required_tipset(applied.parents()) {
                Ok(parent) => {
                    parents.push(parent);
                }
                Err(e) => {
                    tracing::error!("Failed to load parent tipset of {}: {e:#}", applied.key());
                }
            }
        }
        futures::stream::iter(parents)
    })
}
fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc<RPCState>) {
let stream = head_message_tipsets(&ctx)
.filter_map(move |ts| {
let state_mngr = ctx.state_manager.shallow_clone();
async move {
EthBlock::from_filecoin_tipset(&state_mngr, ts, TxInfo::Full)
.await
.inspect_err(|e| {
tracing::error!("Failed to convert tipset to eth block: {e:#}")
})
.ok()
.map(ApiHeaders)
}
})
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}
fn spawn_logs(sink: SubscriptionSink, ctx: Arc<RPCState>, filter: Option<EthFilterSpec>) {
let stream = head_message_tipsets(&ctx)
.filter_map(move |ts| {
let ctx = ctx.shallow_clone();
let filter = filter.clone();
async move {
eth_logs_with_filter(&ctx, &ts, filter)
.await
.inspect_err(|e| {
tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key())
})
.ok()
}
})
.flat_map(futures::stream::iter)
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}
fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc<RPCState>) {
let mpool_rx = ctx.mpool.subscribe_to_updates();
let eth_chain_id = ctx.chain_config().eth_chain_id;
let stream = subscription_stream(mpool_rx)
.filter_map(move |update| async move {
let MpoolUpdate::Add(msg) = update else {
return None;
};
eth_tx_hash_from_signed_message(&msg, eth_chain_id)
.inspect_err(|e| {
tracing::error!("Failed to compute eth tx hash from mpool message: {e:#}")
})
.ok()
})
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}
async fn pipe_stream_to_sink<S, T>(mut stream: S, sink: SubscriptionSink)
where
S: Stream<Item = T> + Unpin + Send,
T: serde::Serialize + Send,
{
loop {
tokio::select! {
_ = sink.closed() => break,
maybe = stream.next() => {
let Some(item) = maybe else { break };
let msg = match jsonrpsee::SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item,
) {
Ok(m) => m,
Err(e) => {
tracing::error!("Failed to serialize subscription message: {e:?}");
continue;
}
};
if let Err(e) = sink.send(msg).await {
tracing::debug!("Subscription sink send failed (client disconnected): {e:?}");
break;
}
}
}
}
tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id());
}