forest/rpc/
channel.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3//! Subscription related types and traits for server implementations.
4//!
5//! Most of the code in this module comes from the `jsonrpsee` crate.
6//! See <https://github.com/paritytech/jsonrpsee/blob/v0.21.0/core/src/server/subscription.rs>.
7//! We slightly customized it from the original design to support the Filecoin `pubsub` specification.
8//! The principal changed types are the `PendingSubscriptionSink` and `SubscriptionSink`, adding a `u64` channel identifier member.
9//!
10//! The remaining types and methods must be duplicated because they are private.
11//!
12//! The sequence diagram of a channel lifetime is as follows:
13//! ```text
14//!  ┌─────────────┐                                                       ┌─────────────┐
15//!  │  WS Client  │                                                       │    Node     │
16//!  └─────────────┘                                                       └─────────────┘
17//!         │                                                                     │
18//!         │  ┌────────────────────────────────┐                                 │
19//!         │──┤ Subscription message           ├───────────────────────────────▶ │
20//!         │  │                                │                                 │
21//!         │  │{ jsonrpc:'2.0',                │                                 │
22//!         │  │  id:<id>,                      │                                 │
23//!         │  │  method:'Filecoin.ChainNotify',│                                 │
24//!         │  │  params:[] }                   │                                 │
25//!         │  └────────────────────────────────┘                                 │
26//!         │                                 ┌────────────────────────────────┐  │
27//!         │ ◀───────────────────────────────┤ Opened channel message         ├──│
28//!         │                                 │                                │  │
29//!         │                                 │{ jsonrpc:'2.0',                │  │
30//!         │                                 │  result:<channId>,             │  │
31//!         │                                 │  id:<id> }                     │  │
32//!         │                                 └────────────────────────────────┘  │
33//!         │                                                                     │
34//!         │                                                                     │
35//!         │                                 ┌────────────────────────────────┐  │
36//!         │ ◀───────────────────────────────┤ Notification message           ├──│
37//!         │                                 │                                │  │
38//!         │                                 │{ jsonrpc:'2.0',                │  │
39//!         │                                 │  method:'xrpc.ch.val',         │  │
40//!         │                                 │  params:[<channId>,<payload>] }│  │
41//!         │                                 └────────────────────────────────┘  │
42//!         │                                                                     │
43//!         │                                                                     │
44//!         │                                                                     │
45//!         │                      After a few notifications                      │
46//!         │  ┌────────────────────────────────┐                                 │
47//!         │──┤ Cancel subscription            ├───────────────────────────────▶ │
48//!         │  │                                │                                 │
49//!         │  │{ jsonrpc:'2.0',                │                                 │
50//!         │  │  method:'xrpc.cancel',         │                                 │
51//!         │  │  params:[<id>],                │                                 │
52//!         │  │  id:null }                     │                                 │
53//!         │  └────────────────────────────────┘                                 │
54//!         │                                 ┌────────────────────────────────┐  │
55//!         │ ◀───────────────────────────────┤ Closed channel message         ├──│
56//!         │                                 │                                │  │
57//!         │                                 │{ jsonrpc:'2.0',                │  │
58//!         │                                 │  method:'xrpc.ch.close',       │  │
59//!         │                                 │  params:[<channId>] }          │  │
60//!         │                                 └────────────────────────────────┘  │
61//! ```
62
63use ahash::HashMap;
64use jsonrpsee::{
65    ConnectionId, MethodResponse, MethodSink,
66    server::{
67        IntoSubscriptionCloseResponse, MethodCallback, Methods, RegisterMethodError,
68        ResponsePayload,
69    },
70    types::{ErrorObjectOwned, Id, Params, error::ErrorCode},
71};
72use parking_lot::Mutex;
73use serde_json::value::{RawValue, to_raw_value};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicU64, Ordering};
76use tokio::sync::broadcast::error::RecvError;
77use tokio::sync::{mpsc, oneshot};
78
79use super::error::ServerError;
80
/// Identifier assigned to each channel opened by the node.
pub type ChannelId = u64;

/// Name of the JSON-RPC method carried by the notifications sent on an open channel.
pub const NOTIF_METHOD_NAME: &str = "xrpc.ch.val";

/// Name of the JSON-RPC method a client calls to cancel a subscription.
pub const CANCEL_METHOD_NAME: &str = "xrpc.cancel";
85
86/// Type-alias for subscribers.
87pub type Subscribers =
88    Arc<Mutex<HashMap<(ConnectionId, Id<'static>), (MethodSink, mpsc::Receiver<()>, ChannelId)>>>;
89
90/// Represents a single subscription that is waiting to be accepted or rejected.
91///
92/// If this is dropped without calling `PendingSubscription::reject` or `PendingSubscriptionSink::accept`
93/// a default error is sent out as response to the subscription call.
94///
95/// Thus, if you want a customized error message then `PendingSubscription::reject` must be called.
96#[derive(Debug)]
97#[must_use = "PendingSubscriptionSink does nothing unless `accept` or `reject` is called"]
98pub struct PendingSubscriptionSink {
99    /// Sink.
100    pub(crate) inner: MethodSink,
101    /// `MethodCallback`.
102    pub(crate) method: &'static str,
103    /// Shared Mutex of subscriptions for this method.
104    pub(crate) subscribers: Subscribers,
105    /// ID of the `subscription call` (i.e. not the same as subscription id) which is used
106    /// to reply to subscription method call and must only be used once.
107    pub(crate) id: Id<'static>,
108    /// Sender to answer the subscribe call.
109    pub(crate) subscribe: oneshot::Sender<MethodResponse>,
110    /// Channel identifier.
111    pub(crate) channel_id: ChannelId,
112    /// Connection identifier.
113    pub(crate) connection_id: ConnectionId,
114}
115
116impl PendingSubscriptionSink {
117    /// Attempt to accept the subscription and respond the subscription method call.
118    ///
119    /// # Panics
120    ///
121    /// Panics if the subscription response exceeded the `max_response_size`.
122    pub async fn accept(self) -> Result<SubscriptionSink, String> {
123        let channel_id = self.channel_id();
124        let id = self.id.clone();
125        let response = MethodResponse::subscription_response(
126            self.id,
127            ResponsePayload::success_borrowed(&channel_id),
128            self.inner.max_response_size() as usize,
129        );
130        let success = response.is_success();
131
132        // Ideally the message should be sent only once.
133        //
134        // The same message is sent twice here because one is sent directly to the transport layer and
135        // the other one is sent internally to accept the subscription.
136        self.inner
137            .send(response.to_json())
138            .await
139            .map_err(|e| e.to_string())?;
140        self.subscribe
141            .send(response)
142            .map_err(|e| format!("accept error: {}", e.as_json()))?;
143
144        if success {
145            let (tx, rx) = mpsc::channel(1);
146            self.subscribers.lock().insert(
147                (self.connection_id, id),
148                (self.inner.clone(), rx, self.channel_id),
149            );
150            tracing::debug!(
151                "Accepting subscription (conn_id={}, chann_id={})",
152                self.connection_id.0,
153                self.channel_id
154            );
155            Ok(SubscriptionSink {
156                inner: self.inner,
157                method: self.method,
158                unsubscribe: IsUnsubscribed(tx),
159                channel_id: self.channel_id,
160            })
161        } else {
162            panic!(
163                "The subscription response was too big; adjust the `max_response_size` or change Subscription ID generation"
164            );
165        }
166    }
167
168    /// Returns the channel identifier
169    pub fn channel_id(&self) -> ChannelId {
170        self.channel_id
171    }
172}
173
174/// Represents a subscription until it is unsubscribed.
175#[derive(Debug, Clone)]
176pub struct IsUnsubscribed(mpsc::Sender<()>);
177
178impl IsUnsubscribed {
179    /// Wrapper over [`tokio::sync::mpsc::Sender::closed`]
180    pub async fn unsubscribed(&self) {
181        self.0.closed().await;
182    }
183}
184
185/// Represents a single subscription that hasn't been processed yet.
186#[derive(Debug, Clone)]
187pub struct SubscriptionSink {
188    /// Sink.
189    inner: MethodSink,
190    /// `MethodCallback`.
191    method: &'static str,
192    /// A future that fires once the unsubscribe method has been called.
193    unsubscribe: IsUnsubscribed,
194    /// Channel identifier.
195    channel_id: ChannelId,
196}
197
198impl SubscriptionSink {
199    /// Get the method name.
200    pub fn method_name(&self) -> &str {
201        self.method
202    }
203
204    /// Get the channel ID.
205    pub fn channel_id(&self) -> ChannelId {
206        self.channel_id
207    }
208
209    /// Send out a response on the subscription and wait until there is capacity.
210    ///
211    ///
212    /// Returns
213    /// - `Ok(())` if the message could be sent.
214    /// - `Err(unsent_msg)` if the connection or subscription was closed.
215    ///
216    /// # Cancel safety
217    ///
218    /// This method is cancel-safe and dropping a future loses its spot in the waiting queue.
219    pub async fn send(&self, msg: Box<serde_json::value::RawValue>) -> Result<(), String> {
220        // Only possible to trigger when the connection is dropped.
221        if self.is_closed() {
222            return Err(format!("disconnect error: {msg}"));
223        }
224
225        self.inner.send(msg).await.map_err(|e| e.to_string())
226    }
227
228    /// Returns whether the subscription is closed.
229    pub fn is_closed(&self) -> bool {
230        self.inner.is_closed()
231    }
232
233    /// Completes when the subscription has been closed.
234    pub async fn closed(&self) {
235        // Both are cancel-safe thus ok to use select here.
236        tokio::select! {
237            _ = self.inner.closed() => (),
238            _ = self.unsubscribe.unsubscribed() => (),
239        }
240    }
241}
242
243fn create_notif_message(
244    sink: &SubscriptionSink,
245    result: &impl serde::Serialize,
246) -> anyhow::Result<Box<RawValue>> {
247    let method = sink.method_name();
248    let channel_id = sink.channel_id();
249    let result = serde_json::to_value(result)?;
250    let msg = serde_json::json!({
251        "jsonrpc": "2.0",
252        "method": method,
253        "params": [channel_id, result]
254    });
255
256    tracing::debug!("Sending notification: {}", msg);
257
258    Ok(to_raw_value(&msg)?)
259}
260
261fn close_payload(channel_id: ChannelId) -> serde_json::Value {
262    serde_json::json!({
263        "jsonrpc":"2.0",
264        "method":"xrpc.ch.close",
265        "params":[channel_id]
266    })
267}
268
269fn close_channel_response(channel_id: ChannelId) -> MethodResponse {
270    MethodResponse::response(
271        Id::Null,
272        ResponsePayload::success(close_payload(channel_id)),
273        1024,
274    )
275}
276
277#[derive(Debug, Clone)]
278pub struct RpcModule {
279    id_provider: Arc<AtomicU64>,
280    channels: Subscribers,
281    methods: Methods,
282}
283
284impl From<RpcModule> for Methods {
285    fn from(module: RpcModule) -> Methods {
286        module.methods
287    }
288}
289
290impl Default for RpcModule {
291    fn default() -> Self {
292        let mut methods = Methods::default();
293
294        let channels = Subscribers::default();
295        methods
296            .verify_and_insert(
297                CANCEL_METHOD_NAME,
298                MethodCallback::Unsubscription(Arc::new({
299                    let channels = channels.clone();
300                    move |id,
301                          params: Params,
302                          connection_id: ConnectionId,
303                          _max_response,
304                          _extensions| {
305                        let cb = || {
306                            let arr: [Id<'_>; 1] = params.parse()?;
307                            let sub_id = arr[0].clone().into_owned();
308
309                            tracing::debug!("Got cancel request (id={sub_id})");
310
311                            let opt = channels.lock().remove(&(connection_id, sub_id));
312                            match opt {
313                                Some((_, _, channel_id)) => {
314                                    Ok::<ChannelId, ServerError>(channel_id)
315                                }
316                                None => Err::<ChannelId, ServerError>(ServerError::from(
317                                    anyhow::anyhow!("channel not found"),
318                                )),
319                            }
320                        };
321                        let result = cb();
322                        match result {
323                            Ok(channel_id) => {
324                                let resp = close_channel_response(channel_id);
325                                tracing::debug!("Sending close message: {}", resp.as_json());
326                                resp
327                            }
328                            Err(e) => {
329                                let error: ErrorObjectOwned = e.into();
330                                MethodResponse::error(id, error)
331                            }
332                        }
333                    }
334                })),
335            )
336            .expect("Inserting a method into an empty methods map is infallible.");
337
338        Self {
339            id_provider: Arc::new(AtomicU64::new(0)),
340            channels,
341            methods,
342        }
343    }
344}
345
346impl RpcModule {
347    pub fn register_channel<R, F>(
348        &mut self,
349        subscribe_method_name: &'static str,
350        callback: F,
351    ) -> Result<&mut MethodCallback, RegisterMethodError>
352    where
353        F: (Fn(Params) -> tokio::sync::broadcast::Receiver<R>) + Send + Sync + 'static,
354        R: serde::Serialize + Clone + Send + 'static,
355    {
356        self.register_channel_raw(subscribe_method_name, {
357            move |params, pending| {
358                let mut receiver = callback(params);
359                tokio::spawn(async move {
360                    let sink = pending.accept().await.unwrap();
361                    tracing::debug!("Channel created: chann_id={}", sink.channel_id);
362
363                    loop {
364                        tokio::select! {
365                            action = receiver.recv() => {
366                                match action {
367                                    Ok(msg) => {
368                                        match create_notif_message(&sink, &msg) {
369                                            Ok(msg) => {
370                                                if let Err(e) = sink.send(msg).await {
371                                                    tracing::error!("Failed to send message: {:?}", e);
372                                                    break;
373                                                }
374                                            }
375                                            Err(e) => {
376                                                tracing::error!("Failed to serialize channel message: {:?}", e);
377                                                break;
378                                            }
379                                        }
380                                    }
381                                    Err(RecvError::Closed) => {
382                                        if let Ok(payload) = to_raw_value(&close_payload(sink.channel_id())) {
383                                            let _ = sink.send(payload).await;
384                                        }
385                                        break;
386                                    }
387                                    Err(RecvError::Lagged(_)) => {
388                                    }
389                                }
390                            },
391                            _ = sink.closed() => {
392                                break;
393                            }
394                        }
395                    }
396
397                    tracing::debug!("Send notification task ended (chann_id={})", sink.channel_id);
398                });
399            }
400        })
401    }
402
403    fn register_channel_raw<R, F>(
404        &mut self,
405        subscribe_method_name: &'static str,
406        callback: F,
407    ) -> Result<&mut MethodCallback, RegisterMethodError>
408    where
409        F: (Fn(Params, PendingSubscriptionSink) -> R) + Send + Sync + 'static,
410        R: IntoSubscriptionCloseResponse,
411    {
412        self.methods.verify_method_name(subscribe_method_name)?;
413        let subscribers = self.channels.clone();
414
415        // Subscribe
416        self.methods.verify_and_insert(
417            subscribe_method_name,
418            MethodCallback::Subscription(Arc::new({
419                let id_provider = self.id_provider.clone();
420                move |id, params, method_sink, conn, _extensions| {
421                    let channel_id = id_provider.fetch_add(1, Ordering::Relaxed);
422
423                    // response to the subscription call.
424                    let (tx, rx) = oneshot::channel();
425
426                    let sink = PendingSubscriptionSink {
427                        inner: method_sink.clone(),
428                        method: NOTIF_METHOD_NAME,
429                        subscribers: subscribers.clone(),
430                        id: id.clone().into_owned(),
431                        subscribe: tx,
432                        channel_id,
433                        connection_id: conn.conn_id,
434                    };
435
436                    callback(params, sink);
437
438                    let id = id.clone().into_owned();
439
440                    Box::pin(async move {
441                        match rx.await {
442                            Ok(rp) => rp,
443                            Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
444                        }
445                    })
446                }
447            })),
448        )
449    }
450}