forest/rpc/channel.rs
1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3//! Subscription related types and traits for server implementations.
4//!
5//! Most of the code in this module comes from the `jsonrpsee` crate.
6//! See <https://github.com/paritytech/jsonrpsee/blob/v0.21.0/core/src/server/subscription.rs>.
7//! We slightly customized it from the original design to support Filecoin `pubsub` specification.
8//! The principal changed types are the `PendingSubscriptionSink` and `SubscriptionSink`, adding an `u64` channel identifier member.
9//!
10//! The remaining types and methods must be duplicated because they are private.
11//!
12//! The sequence diagram of a channel lifetime is as follows:
13//! ```text
14//! ┌─────────────┐ ┌─────────────┐
15//! │ WS Client │ │ Node │
16//! └─────────────┘ └─────────────┘
17//! │ │
18//! │ ┌────────────────────────────────┐ │
19//! │──┤ Subscription message ├───────────────────────────────▶ │
20//! │ │ │ │
21//! │ │{ jsonrpc:'2.0', │ │
22//! │ │ id:<id>, │ │
23//! │ │ method:'Filecoin.ChainNotify',│ │
24//! │ │ params:[] } │ │
25//! │ └────────────────────────────────┘ │
26//! │ ┌────────────────────────────────┐ │
27//! │ ◀───────────────────────────────┤ Opened channel message ├──│
28//! │ │ │ │
29//! │ │{ jsonrpc:'2.0', │ │
30//! │ │ result:<channId>, │ │
31//! │ │ id:<id> } │ │
32//! │ └────────────────────────────────┘ │
33//! │ │
34//! │ │
35//! │ ┌────────────────────────────────┐ │
36//! │ ◀───────────────────────────────┤ Notification message ├──│
37//! │ │ │ │
38//! │ │{ jsonrpc:'2.0', │ │
39//! │ │ method:'xrpc.ch.val', │ │
40//! │ │ params:[<channId>,<payload>] }│ │
41//! │ └────────────────────────────────┘ │
42//! │ │
43//! │ │
44//! │ │
45//! │ After a few notifications │
46//! │ ┌────────────────────────────────┐ │
47//! │──┤ Cancel subscription ├───────────────────────────────▶ │
48//! │ │ │ │
49//! │ │{ jsonrpc:'2.0', │ │
50//! │ │ method:'xrpc.cancel', │ │
51//! │ │ params:[<id>], │ │
52//! │ │ id:null } │ │
53//! │ └────────────────────────────────┘ │
54//! │ ┌────────────────────────────────┐ │
55//! │ ◀───────────────────────────────┤ Closed channel message ├──│
56//! │ │ │ │
57//! │ │{ jsonrpc:'2.0', │ │
58//! │ │ method:'xrpc.ch.close', │ │
59//! │ │ params:[<channId>] } │ │
60//! │ └────────────────────────────────┘ │
61//! ```
62
63use ahash::HashMap;
64use jsonrpsee::{
65 ConnectionId, MethodResponse, MethodSink,
66 server::{
67 IntoSubscriptionCloseResponse, MethodCallback, Methods, RegisterMethodError,
68 ResponsePayload,
69 },
70 types::{ErrorObjectOwned, Id, Params, error::ErrorCode},
71};
72use parking_lot::Mutex;
73use serde_json::value::{RawValue, to_raw_value};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicU64, Ordering};
76use tokio::sync::broadcast::error::RecvError;
77use tokio::sync::{mpsc, oneshot};
78
79use super::error::ServerError;
80
81pub const NOTIF_METHOD_NAME: &str = "xrpc.ch.val";
82pub const CANCEL_METHOD_NAME: &str = "xrpc.cancel";
83
84pub type ChannelId = u64;
85
86/// Type-alias for subscribers.
87pub type Subscribers =
88 Arc<Mutex<HashMap<(ConnectionId, Id<'static>), (MethodSink, mpsc::Receiver<()>, ChannelId)>>>;
89
90/// Represents a single subscription that is waiting to be accepted or rejected.
91///
92/// If this is dropped without calling `PendingSubscription::reject` or `PendingSubscriptionSink::accept`
93/// a default error is sent out as response to the subscription call.
94///
95/// Thus, if you want a customized error message then `PendingSubscription::reject` must be called.
96#[derive(Debug)]
97#[must_use = "PendingSubscriptionSink does nothing unless `accept` or `reject` is called"]
98pub struct PendingSubscriptionSink {
99 /// Sink.
100 pub(crate) inner: MethodSink,
101 /// `MethodCallback`.
102 pub(crate) method: &'static str,
103 /// Shared Mutex of subscriptions for this method.
104 pub(crate) subscribers: Subscribers,
105 /// ID of the `subscription call` (i.e. not the same as subscription id) which is used
106 /// to reply to subscription method call and must only be used once.
107 pub(crate) id: Id<'static>,
108 /// Sender to answer the subscribe call.
109 pub(crate) subscribe: oneshot::Sender<MethodResponse>,
110 /// Channel identifier.
111 pub(crate) channel_id: ChannelId,
112 /// Connection identifier.
113 pub(crate) connection_id: ConnectionId,
114}
115
116impl PendingSubscriptionSink {
117 /// Attempt to accept the subscription and respond the subscription method call.
118 ///
119 /// # Panics
120 ///
121 /// Panics if the subscription response exceeded the `max_response_size`.
122 pub async fn accept(self) -> Result<SubscriptionSink, String> {
123 let channel_id = self.channel_id();
124 let id = self.id.clone();
125 let response = MethodResponse::subscription_response(
126 self.id,
127 ResponsePayload::success_borrowed(&channel_id),
128 self.inner.max_response_size() as usize,
129 );
130 let success = response.is_success();
131
132 // Ideally the message should be sent only once.
133 //
134 // The same message is sent twice here because one is sent directly to the transport layer and
135 // the other one is sent internally to accept the subscription.
136 self.inner
137 .send(response.to_json())
138 .await
139 .map_err(|e| e.to_string())?;
140 self.subscribe
141 .send(response)
142 .map_err(|e| format!("accept error: {}", e.as_json()))?;
143
144 if success {
145 let (tx, rx) = mpsc::channel(1);
146 self.subscribers.lock().insert(
147 (self.connection_id, id),
148 (self.inner.clone(), rx, self.channel_id),
149 );
150 tracing::debug!(
151 "Accepting subscription (conn_id={}, chann_id={})",
152 self.connection_id.0,
153 self.channel_id
154 );
155 Ok(SubscriptionSink {
156 inner: self.inner,
157 method: self.method,
158 unsubscribe: IsUnsubscribed(tx),
159 channel_id: self.channel_id,
160 })
161 } else {
162 panic!(
163 "The subscription response was too big; adjust the `max_response_size` or change Subscription ID generation"
164 );
165 }
166 }
167
168 /// Returns the channel identifier
169 pub fn channel_id(&self) -> ChannelId {
170 self.channel_id
171 }
172}
173
174/// Represents a subscription until it is unsubscribed.
175#[derive(Debug, Clone)]
176pub struct IsUnsubscribed(mpsc::Sender<()>);
177
178impl IsUnsubscribed {
179 /// Wrapper over [`tokio::sync::mpsc::Sender::closed`]
180 pub async fn unsubscribed(&self) {
181 self.0.closed().await;
182 }
183}
184
185/// Represents a single subscription that hasn't been processed yet.
186#[derive(Debug, Clone)]
187pub struct SubscriptionSink {
188 /// Sink.
189 inner: MethodSink,
190 /// `MethodCallback`.
191 method: &'static str,
192 /// A future that fires once the unsubscribe method has been called.
193 unsubscribe: IsUnsubscribed,
194 /// Channel identifier.
195 channel_id: ChannelId,
196}
197
198impl SubscriptionSink {
199 /// Get the method name.
200 pub fn method_name(&self) -> &str {
201 self.method
202 }
203
204 /// Get the channel ID.
205 pub fn channel_id(&self) -> ChannelId {
206 self.channel_id
207 }
208
209 /// Send out a response on the subscription and wait until there is capacity.
210 ///
211 ///
212 /// Returns
213 /// - `Ok(())` if the message could be sent.
214 /// - `Err(unsent_msg)` if the connection or subscription was closed.
215 ///
216 /// # Cancel safety
217 ///
218 /// This method is cancel-safe and dropping a future loses its spot in the waiting queue.
219 pub async fn send(&self, msg: Box<serde_json::value::RawValue>) -> Result<(), String> {
220 // Only possible to trigger when the connection is dropped.
221 if self.is_closed() {
222 return Err(format!("disconnect error: {msg}"));
223 }
224
225 self.inner.send(msg).await.map_err(|e| e.to_string())
226 }
227
228 /// Returns whether the subscription is closed.
229 pub fn is_closed(&self) -> bool {
230 self.inner.is_closed()
231 }
232
233 /// Completes when the subscription has been closed.
234 pub async fn closed(&self) {
235 // Both are cancel-safe thus ok to use select here.
236 tokio::select! {
237 _ = self.inner.closed() => (),
238 _ = self.unsubscribe.unsubscribed() => (),
239 }
240 }
241}
242
243fn create_notif_message(
244 sink: &SubscriptionSink,
245 result: &impl serde::Serialize,
246) -> anyhow::Result<Box<RawValue>> {
247 let method = sink.method_name();
248 let channel_id = sink.channel_id();
249 let result = serde_json::to_value(result)?;
250 let msg = serde_json::json!({
251 "jsonrpc": "2.0",
252 "method": method,
253 "params": [channel_id, result]
254 });
255
256 tracing::debug!("Sending notification: {}", msg);
257
258 Ok(to_raw_value(&msg)?)
259}
260
261fn close_payload(channel_id: ChannelId) -> serde_json::Value {
262 serde_json::json!({
263 "jsonrpc":"2.0",
264 "method":"xrpc.ch.close",
265 "params":[channel_id]
266 })
267}
268
269fn close_channel_response(channel_id: ChannelId) -> MethodResponse {
270 MethodResponse::response(
271 Id::Null,
272 ResponsePayload::success(close_payload(channel_id)),
273 1024,
274 )
275}
276
277#[derive(Debug, Clone)]
278pub struct RpcModule {
279 id_provider: Arc<AtomicU64>,
280 channels: Subscribers,
281 methods: Methods,
282}
283
284impl From<RpcModule> for Methods {
285 fn from(module: RpcModule) -> Methods {
286 module.methods
287 }
288}
289
290impl Default for RpcModule {
291 fn default() -> Self {
292 let mut methods = Methods::default();
293
294 let channels = Subscribers::default();
295 methods
296 .verify_and_insert(
297 CANCEL_METHOD_NAME,
298 MethodCallback::Unsubscription(Arc::new({
299 let channels = channels.clone();
300 move |id,
301 params: Params,
302 connection_id: ConnectionId,
303 _max_response,
304 _extensions| {
305 let cb = || {
306 let arr: [Id<'_>; 1] = params.parse()?;
307 let sub_id = arr[0].clone().into_owned();
308
309 tracing::debug!("Got cancel request (id={sub_id})");
310
311 let opt = channels.lock().remove(&(connection_id, sub_id));
312 match opt {
313 Some((_, _, channel_id)) => {
314 Ok::<ChannelId, ServerError>(channel_id)
315 }
316 None => Err::<ChannelId, ServerError>(ServerError::from(
317 anyhow::anyhow!("channel not found"),
318 )),
319 }
320 };
321 let result = cb();
322 match result {
323 Ok(channel_id) => {
324 let resp = close_channel_response(channel_id);
325 tracing::debug!("Sending close message: {}", resp.as_json());
326 resp
327 }
328 Err(e) => {
329 let error: ErrorObjectOwned = e.into();
330 MethodResponse::error(id, error)
331 }
332 }
333 }
334 })),
335 )
336 .expect("Inserting a method into an empty methods map is infallible.");
337
338 Self {
339 id_provider: Arc::new(AtomicU64::new(0)),
340 channels,
341 methods,
342 }
343 }
344}
345
346impl RpcModule {
347 pub fn register_channel<R, F>(
348 &mut self,
349 subscribe_method_name: &'static str,
350 callback: F,
351 ) -> Result<&mut MethodCallback, RegisterMethodError>
352 where
353 F: (Fn(Params) -> tokio::sync::broadcast::Receiver<R>) + Send + Sync + 'static,
354 R: serde::Serialize + Clone + Send + 'static,
355 {
356 self.register_channel_raw(subscribe_method_name, {
357 move |params, pending| {
358 let mut receiver = callback(params);
359 tokio::spawn(async move {
360 let sink = pending.accept().await.unwrap();
361 tracing::debug!("Channel created: chann_id={}", sink.channel_id);
362
363 loop {
364 tokio::select! {
365 action = receiver.recv() => {
366 match action {
367 Ok(msg) => {
368 match create_notif_message(&sink, &msg) {
369 Ok(msg) => {
370 if let Err(e) = sink.send(msg).await {
371 tracing::error!("Failed to send message: {:?}", e);
372 break;
373 }
374 }
375 Err(e) => {
376 tracing::error!("Failed to serialize channel message: {:?}", e);
377 break;
378 }
379 }
380 }
381 Err(RecvError::Closed) => {
382 if let Ok(payload) = to_raw_value(&close_payload(sink.channel_id())) {
383 let _ = sink.send(payload).await;
384 }
385 break;
386 }
387 Err(RecvError::Lagged(_)) => {
388 }
389 }
390 },
391 _ = sink.closed() => {
392 break;
393 }
394 }
395 }
396
397 tracing::debug!("Send notification task ended (chann_id={})", sink.channel_id);
398 });
399 }
400 })
401 }
402
403 fn register_channel_raw<R, F>(
404 &mut self,
405 subscribe_method_name: &'static str,
406 callback: F,
407 ) -> Result<&mut MethodCallback, RegisterMethodError>
408 where
409 F: (Fn(Params, PendingSubscriptionSink) -> R) + Send + Sync + 'static,
410 R: IntoSubscriptionCloseResponse,
411 {
412 self.methods.verify_method_name(subscribe_method_name)?;
413 let subscribers = self.channels.clone();
414
415 // Subscribe
416 self.methods.verify_and_insert(
417 subscribe_method_name,
418 MethodCallback::Subscription(Arc::new({
419 let id_provider = self.id_provider.clone();
420 move |id, params, method_sink, conn, _extensions| {
421 let channel_id = id_provider.fetch_add(1, Ordering::Relaxed);
422
423 // response to the subscription call.
424 let (tx, rx) = oneshot::channel();
425
426 let sink = PendingSubscriptionSink {
427 inner: method_sink.clone(),
428 method: NOTIF_METHOD_NAME,
429 subscribers: subscribers.clone(),
430 id: id.clone().into_owned(),
431 subscribe: tx,
432 channel_id,
433 connection_id: conn.conn_id,
434 };
435
436 callback(params, sink);
437
438 let id = id.clone().into_owned();
439
440 Box::pin(async move {
441 match rx.await {
442 Ok(rp) => rp,
443 Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
444 }
445 })
446 }
447 })),
448 )
449 }
450}