forest/rpc/methods/eth/pubsub.rs
1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Official documentation for the Ethereum pubsub protocol is available at:
5//! https://geth.ethereum.org/docs/interacting-with-geth/rpc/pubsub
6//!
7//! Note that Filecoin uses this protocol without modifications.
8//!
9//! The sequence diagram for an event subscription is shown below:
//! ```text
//!  ┌─────────────┐                                                       ┌─────────────┐
//!  │  WS Client  │                                                       │    Node     │
//!  └─────────────┘                                                       └─────────────┘
//!         │                                                                     │
//!         │  ┌────────────────────────────────┐                                 │
//!         │──┤ Subscription message           ├───────────────────────────────▶ │
//!         │  │                                │                                 │
//!         │  │{ jsonrpc:'2.0',                │                                 │
//!         │  │  id:<id>,                      │                                 │
//!         │  │  method:'eth_subscribe',       │                                 │
//!         │  │  params:[<eventType>] }        │                                 │
//!         │  └────────────────────────────────┘                                 │
//!         │                                 ┌────────────────────────────────┐  │
//!         │ ◀───────────────────────────────┤ Opened subscription message    ├──│
//!         │                                 │                                │  │
//!         │                                 │{ jsonrpc:'2.0',                │  │
//!         │                                 │  id:<id>,                      │  │
//!         │                                 │  result:<subId> }              │  │
//!         │                                 └────────────────────────────────┘  │
//!         │                                                                     │
//!         │                                                                     │
//!         │                                 ┌────────────────────────────────┐  │
//!         │ ◀───────────────────────────────┤ Notification message           ├──│
//!         │                                 │                                │  │
//!         │                                 │{ jsonrpc:'2.0',                │  │
//!         │                                 │  method:'eth_subscription',    │  │
//!         │                                 │  params:{ subscription:<subId>,│  │
//!         │                                 │           result:<payload> } } │  │
//!         │                                 └────────────────────────────────┘  │
//!         │                                                                     │
//!         │                                                                     │
//!         │                                                                     │
//!         │  After a few notifications                                          │
//!         │  ┌────────────────────────────────┐                                 │
//!         │──┤ Cancel subscription            ├───────────────────────────────▶ │
//!         │  │                                │                                 │
//!         │  │{ jsonrpc:'2.0',                │                                 │
//!         │  │  id:<id>,                      │                                 │
//!         │  │  method:'eth_unsubscribe',     │                                 │
//!         │  │  params:[<subId>] }            │                                 │
//!         │  └────────────────────────────────┘                                 │
//!         │                                 ┌────────────────────────────────┐  │
//!         │ ◀───────────────────────────────┤ Closed subscription message    ├──│
//!         │                                 │                                │  │
//!         │                                 │{ jsonrpc:'2.0',                │  │
//!         │                                 │  id:<id>,                      │  │
//!         │                                 │  result:true }                 │  │
//!         │                                 └────────────────────────────────┘  │
//! ```
60//!
61
62use crate::rpc::eth::pubsub_trait::{
63 EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams,
64};
65use crate::rpc::{RPCState, chain};
66use fvm_ipld_blockstore::Blockstore;
67use jsonrpsee::PendingSubscriptionSink;
68use jsonrpsee::core::{SubscriptionError, SubscriptionResult};
69use std::sync::Arc;
70use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError};
71
72pub struct EthPubSub<DB> {
73 ctx: Arc<RPCState<DB>>,
74}
75
76impl<DB> EthPubSub<DB> {
77 pub fn new(ctx: Arc<RPCState<DB>>) -> Self {
78 Self { ctx }
79 }
80}
81
82#[async_trait::async_trait]
83impl<DB> EthPubSubApiServer for EthPubSub<DB>
84where
85 DB: Blockstore + Send + Sync + 'static,
86{
87 async fn subscribe(
88 &self,
89 pending: PendingSubscriptionSink,
90 kind: SubscriptionKind,
91 params: Option<SubscriptionParams>,
92 ) -> SubscriptionResult {
93 let sink = pending.accept().await?;
94 let ctx = self.ctx.clone();
95
96 match kind {
97 SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await,
98 SubscriptionKind::PendingTransactions => {
99 return Err(SubscriptionError::from(
100 jsonrpsee::types::ErrorObjectOwned::owned(
101 jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
102 "pendingTransactions subscription not yet implemented",
103 None::<()>,
104 ),
105 ));
106 }
107 SubscriptionKind::Logs => {
108 let filter = params.and_then(|p| p.filter);
109 self.handle_logs_subscription(sink, ctx, filter).await
110 }
111 }
112
113 Ok(())
114 }
115}
116
117impl<DB> EthPubSub<DB>
118where
119 DB: Blockstore + Send + Sync + 'static,
120{
121 async fn handle_new_heads_subscription(
122 &self,
123 accepted_sink: jsonrpsee::SubscriptionSink,
124 ctx: Arc<RPCState<DB>>,
125 ) {
126 let (subscriber, handle) = chain::new_heads(&ctx);
127 tokio::spawn(async move {
128 handle_subscription(subscriber, accepted_sink, handle).await;
129 });
130 }
131
132 async fn handle_logs_subscription(
133 &self,
134 accepted_sink: jsonrpsee::SubscriptionSink,
135 ctx: Arc<RPCState<DB>>,
136 filter_spec: Option<LogFilter>,
137 ) {
138 let filter_spec = filter_spec.map(Into::into);
139 let (logs, handle) = chain::logs(&ctx, filter_spec);
140 tokio::spawn(async move {
141 handle_subscription(logs, accepted_sink, handle).await;
142 });
143 }
144}
145
146async fn handle_subscription<T>(
147 mut subscriber: Subscriber<T>,
148 sink: jsonrpsee::SubscriptionSink,
149 handle: tokio::task::JoinHandle<()>,
150) where
151 T: serde::Serialize + Clone,
152{
153 loop {
154 tokio::select! {
155 action = subscriber.recv() => {
156 match action {
157 Ok(v) => {
158 match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) {
159 Ok(msg) => {
160 if let Err(e) = sink.send(msg).await {
161 tracing::error!("Failed to send message: {:?}", e);
162 break;
163 }
164 }
165 Err(e) => {
166 tracing::error!("Failed to serialize message: {:?}", e);
167 break;
168 }
169 }
170 }
171 Err(RecvError::Closed) => {
172 break;
173 }
174 Err(RecvError::Lagged(_)) => {
175 }
176 }
177 }
178 _ = sink.closed() => {
179 break;
180 }
181 }
182 }
183 handle.abort();
184
185 tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id());
186}