forest/rpc/methods/eth/
pubsub.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Official documentation for the Ethereum pubsub protocol is available at:
//! <https://geth.ethereum.org/docs/interacting-with-geth/rpc/pubsub>
6//!
7//! Note that Filecoin uses this protocol without modifications.
8//!
9//! The sequence diagram for an event subscription is shown below:
10//! ```text
11//!  ┌─────────────┐                                                       ┌─────────────┐
12//!  │  WS Client  │                                                       │    Node     │
13//!  └─────────────┘                                                       └─────────────┘
14//!         │                                                                     │
15//!         │  ┌────────────────────────────────┐                                 │
16//!         │──┤ Subscription message           ├───────────────────────────────▶ │
17//!         │  │                                │                                 │
18//!         │  │{ jsonrpc:'2.0',                │                                 │
19//!         │  │  id:<id>,                      │                                 │
20//!         │  │  method:'eth_subscribe',       │                                 │
21//!         │  │  params:[<eventType>] }        │                                 │
22//!         │  └────────────────────────────────┘                                 │
23//!         │                                 ┌────────────────────────────────┐  │
24//!         │ ◀───────────────────────────────┤ Opened subscription message    ├──│
25//!         │                                 │                                │  │
26//!         │                                 │{ jsonrpc:'2.0',                │  │
27//!         │                                 │  id:<id>,                      │  │
28//!         │                                 │  result:<subId> }              │  │
29//!         │                                 └────────────────────────────────┘  │
30//!         │                                                                     │
31//!         │                                                                     │
32//!         │                                 ┌────────────────────────────────┐  │
33//!         │ ◀───────────────────────────────┤ Notification message           ├──│
34//!         │                                 │                                │  │
35//!         │                                 │{ jsonrpc:'2.0',                │  │
36//!         │                                 │  method:'eth_subscription',    │  │
37//!         │                                 │  params:{ subscription:<subId>,│  │
38//!         │                                 │           result:<payload> } } │  │
39//!         │                                 └────────────────────────────────┘  │
40//!         │                                                                     │
41//!         │                                                                     │
42//!         │                                                                     │
43//!         │                      After a few notifications                      │
44//!         │  ┌────────────────────────────────┐                                 │
45//!         │──┤ Cancel subscription            ├───────────────────────────────▶ │
46//!         │  │                                │                                 │
47//!         │  │{ jsonrpc:'2.0',                │                                 │
48//!         │  │  id:<id>,                      │                                 │
49//!         │  │  method:'eth_unsubscribe',     │                                 │
50//!         │  │  params:[<subId>] }            │                                 │
51//!         │  └────────────────────────────────┘                                 │
52//!         │                                 ┌────────────────────────────────┐  │
53//!         │ ◀───────────────────────────────┤ Closed subscription message    ├──│
54//!         │                                 │                                │  │
55//!         │                                 │{ jsonrpc:'2.0',                │  │
56//!         │                                 │  id:<id>,                      │  │
57//!         │                                 │  result:true }                 │  │
58//!         │                                 └────────────────────────────────┘  │
59//! ```
60//!
61
62use crate::rpc::eth::pubsub_trait::{
63    EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams,
64};
65use crate::rpc::{RPCState, chain};
66use fvm_ipld_blockstore::Blockstore;
67use jsonrpsee::PendingSubscriptionSink;
68use jsonrpsee::core::{SubscriptionError, SubscriptionResult};
69use std::sync::Arc;
70use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError};
71
72pub struct EthPubSub<DB> {
73    ctx: Arc<RPCState<DB>>,
74}
75
76impl<DB> EthPubSub<DB> {
77    pub fn new(ctx: Arc<RPCState<DB>>) -> Self {
78        Self { ctx }
79    }
80}
81
82#[async_trait::async_trait]
83impl<DB> EthPubSubApiServer for EthPubSub<DB>
84where
85    DB: Blockstore + Send + Sync + 'static,
86{
87    async fn subscribe(
88        &self,
89        pending: PendingSubscriptionSink,
90        kind: SubscriptionKind,
91        params: Option<SubscriptionParams>,
92    ) -> SubscriptionResult {
93        let sink = pending.accept().await?;
94        let ctx = self.ctx.clone();
95
96        match kind {
97            SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await,
98            SubscriptionKind::PendingTransactions => {
99                return Err(SubscriptionError::from(
100                    jsonrpsee::types::ErrorObjectOwned::owned(
101                        jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
102                        "pendingTransactions subscription not yet implemented",
103                        None::<()>,
104                    ),
105                ));
106            }
107            SubscriptionKind::Logs => {
108                let filter = params.and_then(|p| p.filter);
109                self.handle_logs_subscription(sink, ctx, filter).await
110            }
111        }
112
113        Ok(())
114    }
115}
116
117impl<DB> EthPubSub<DB>
118where
119    DB: Blockstore + Send + Sync + 'static,
120{
121    async fn handle_new_heads_subscription(
122        &self,
123        accepted_sink: jsonrpsee::SubscriptionSink,
124        ctx: Arc<RPCState<DB>>,
125    ) {
126        let (subscriber, handle) = chain::new_heads(&ctx);
127        tokio::spawn(async move {
128            handle_subscription(subscriber, accepted_sink, handle).await;
129        });
130    }
131
132    async fn handle_logs_subscription(
133        &self,
134        accepted_sink: jsonrpsee::SubscriptionSink,
135        ctx: Arc<RPCState<DB>>,
136        filter_spec: Option<LogFilter>,
137    ) {
138        let filter_spec = filter_spec.map(Into::into);
139        let (logs, handle) = chain::logs(&ctx, filter_spec);
140        tokio::spawn(async move {
141            handle_subscription(logs, accepted_sink, handle).await;
142        });
143    }
144}
145
146async fn handle_subscription<T>(
147    mut subscriber: Subscriber<T>,
148    sink: jsonrpsee::SubscriptionSink,
149    handle: tokio::task::JoinHandle<()>,
150) where
151    T: serde::Serialize + Clone,
152{
153    loop {
154        tokio::select! {
155            action = subscriber.recv() => {
156                match action {
157                    Ok(v) => {
158                        match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) {
159                            Ok(msg) => {
160                                if let Err(e) = sink.send(msg).await {
161                                    tracing::error!("Failed to send message: {:?}", e);
162                                    break;
163                                }
164                            }
165                            Err(e) => {
166                                tracing::error!("Failed to serialize message: {:?}", e);
167                                break;
168                            }
169                        }
170                    }
171                    Err(RecvError::Closed) => {
172                        break;
173                    }
174                    Err(RecvError::Lagged(_)) => {
175                    }
176                }
177            }
178            _ = sink.closed() => {
179                break;
180            }
181        }
182    }
183    handle.abort();
184
185    tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id());
186}