jsonrpsee_core/server/
subscription.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Subscription related types and traits for server implementations.
28
29use super::helpers::MethodSink;
30use super::{MethodResponse, MethodsError, ResponsePayload};
31use crate::server::LOG_TARGET;
32use crate::server::error::{DisconnectError, PendingSubscriptionAcceptError, SendTimeoutError, TrySendError};
33use crate::server::rpc_module::ConnectionId;
34use crate::{error::SubscriptionError, traits::IdProvider};
35use jsonrpsee_types::SubscriptionPayload;
36use jsonrpsee_types::response::SubscriptionPayloadError;
37use jsonrpsee_types::{ErrorObjectOwned, Id, SubscriptionId, SubscriptionResponse};
38use parking_lot::Mutex;
39use rustc_hash::FxHashMap;
40use serde::{Serialize, de::DeserializeOwned};
41use serde_json::value::RawValue;
42use std::{sync::Arc, time::Duration};
43use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
44
/// Type-alias for subscribers.
///
/// A shared, mutex-protected map from [`SubscriptionKey`] to the [`MethodSink`] of the
/// subscription paired with the receiving half of its unsubscribe channel.
pub type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, mpsc::Receiver<()>)>>>;
/// Subscription permit.
///
/// Alias for an owned semaphore permit, as handed out by [`BoundedSubscriptions::acquire`].
pub type SubscriptionPermit = OwnedSemaphorePermit;
49
/// Convert something into a subscription close notification
/// before a subscription is terminated.
///
/// This module implements the trait for `()`, `Result<(), SubscriptionError>` and
/// [`SubscriptionCloseResponse`] itself.
pub trait IntoSubscriptionCloseResponse {
	/// Consume `self` and produce the [`SubscriptionCloseResponse`] that describes
	/// what, if anything, is sent when the subscription is closed.
	fn into_response(self) -> SubscriptionCloseResponse;
}
56
/// Represents the action that will be taken when a subscription callback returns.
#[derive(Debug)]
pub enum SubscriptionCloseResponse {
	/// No further message will be sent.
	None,
	/// Send a subscription notification.
	///
	/// The subscription notification has the following format:
	///
	/// ```json
	/// {
	///   "jsonrpc": "2.0",
	///   "method": "<method>",
	///   "params": {
	///     "subscription": "<subscriptionID>",
	///     "result": <your msg>
	///   }
	/// }
	/// ```
	Notif(SubscriptionMessage),
	/// Send a subscription error notification.
	///
	/// The error notification has the following format:
	///
	/// ```json
	/// {
	///   "jsonrpc": "2.0",
	///   "method": "<method>",
	///   "params": {
	///     "subscription": "<subscriptionID>",
	///     "error": <your msg>
	///   }
	/// }
	/// ```
	NotifErr(SubscriptionError),
}
95
96impl IntoSubscriptionCloseResponse for Result<(), SubscriptionError> {
97	fn into_response(self) -> SubscriptionCloseResponse {
98		match self {
99			Ok(()) => SubscriptionCloseResponse::None,
100			Err(e) => SubscriptionCloseResponse::NotifErr(e),
101		}
102	}
103}
104
/// The unit type never produces a close notification.
impl IntoSubscriptionCloseResponse for () {
	fn into_response(self) -> SubscriptionCloseResponse {
		SubscriptionCloseResponse::None
	}
}
110
/// Identity conversion: a [`SubscriptionCloseResponse`] is returned unchanged.
impl IntoSubscriptionCloseResponse for SubscriptionCloseResponse {
	fn into_response(self) -> Self {
		self
	}
}
116
/// A complete subscription message or partial subscription message.
#[derive(Debug, Clone)]
pub enum SubscriptionMessageInner {
	/// Complete JSON message, sent as-is.
	Complete(Box<RawValue>),
	/// Only the JSON `result`; the subscription ID and method name must still be
	/// added before the message can be sent.
	NeedsData(Box<RawValue>),
}
125
/// Subscription message.
///
/// Thin wrapper around [`SubscriptionMessageInner`]; build one with
/// [`SubscriptionMessage::new`] or by converting from a `Box<RawValue>`.
#[derive(Debug, Clone)]
pub struct SubscriptionMessage(pub(crate) SubscriptionMessageInner);
129
130impl From<Box<RawValue>> for SubscriptionMessage {
131	fn from(json: Box<RawValue>) -> Self {
132		Self(SubscriptionMessageInner::NeedsData(json))
133	}
134}
135
136impl SubscriptionMessage {
137	/// Create a subscription message this is more efficient than [`SubscriptionMessage::from`]
138	/// because it only allocates once.
139	///
140	/// Fails if the json `result` couldn't be serialized.
141	pub fn new(method: &str, subscription: SubscriptionId, result: &impl Serialize) -> Result<Self, serde_json::Error> {
142		let json = serde_json::value::to_raw_value(&SubscriptionResponse::new(
143			method.into(),
144			SubscriptionPayload { subscription, result },
145		))?;
146		Ok(Self::from_complete_message(json))
147	}
148
149	pub(crate) fn from_complete_message(msg: Box<RawValue>) -> Self {
150		SubscriptionMessage(SubscriptionMessageInner::Complete(msg))
151	}
152}
153
/// Represents a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SubscriptionKey {
	/// Connection the subscription belongs to.
	pub(crate) conn_id: ConnectionId,
	/// ID of the subscription.
	pub(crate) sub_id: SubscriptionId<'static>,
}
160
/// Represents a subscription until it is unsubscribed.
///
/// Wraps the sending half of a channel that is never sent on; only the
/// closed state of the channel is observed.
//
// NOTE: The reason why we use `mpsc` here is because it allows `IsUnsubscribed::unsubscribed`
// to be &self instead of &mut self.
#[derive(Debug, Clone)]
pub struct IsUnsubscribed(mpsc::Sender<()>);
167
168impl IsUnsubscribed {
169	/// Returns true if the unsubscribe method has been invoked or the subscription has been canceled.
170	///
171	/// This can be called multiple times as the element in the channel is never
172	/// removed.
173	pub fn is_unsubscribed(&self) -> bool {
174		self.0.is_closed()
175	}
176
177	/// Wrapper over [`tokio::sync::mpsc::Sender::closed`]
178	///
179	/// # Cancel safety
180	///
181	/// This method is cancel safe. Once the channel is closed,
182	/// it stays closed forever and all future calls to closed will return immediately.
183	pub async fn unsubscribed(&self) {
184		self.0.closed().await;
185	}
186}
187
/// Represents a single subscription that is waiting to be accepted or rejected.
///
/// If this is dropped without calling `PendingSubscriptionSink::reject` or `PendingSubscriptionSink::accept`
/// a default error is sent out as response to the subscription call.
///
/// Thus, if you want a customized error message then `PendingSubscriptionSink::reject` must be called.
#[derive(Debug)]
#[must_use = "PendingSubscriptionSink does nothing unless `accept` or `reject` is called"]
pub struct PendingSubscriptionSink {
	/// Sink.
	pub(crate) inner: MethodSink,
	/// MethodCallback.
	pub(crate) method: &'static str,
	/// Shared Mutex of subscriptions for this method.
	pub(crate) subscribers: Subscribers,
	/// Unique subscription.
	pub(crate) uniq_sub: SubscriptionKey,
	/// ID of the `subscription call` (i.e. not the same as subscription id) which is used
	/// to reply to subscription method call and must only be used once.
	pub(crate) id: Id<'static>,
	/// Sender to answer the subscribe call.
	pub(crate) subscribe: oneshot::Sender<MethodResponse>,
	/// Subscription permit, handed over to the [`SubscriptionSink`] on accept.
	pub(crate) permit: OwnedSemaphorePermit,
}
213
214impl PendingSubscriptionSink {
215	/// Reject the subscription by responding to the subscription method call with
216	/// the error message from [`jsonrpsee_types::error::ErrorObject`].
217	///
218	/// # Note
219	///
220	/// If this is used in the async subscription callback
221	/// the return value is simply ignored because no further notification are propagated
222	/// once reject has been called.
223	pub async fn reject(self, err: impl Into<ErrorObjectOwned>) {
224		let err = MethodResponse::subscription_error(self.id, err.into());
225		_ = self.inner.send(err.to_json()).await;
226		_ = self.subscribe.send(err);
227	}
228
229	/// Attempt to accept the subscription and respond the subscription method call.
230	///
231	/// # Panics
232	///
233	/// Panics if the subscription response exceeded the `max_response_size`.
234	pub async fn accept(self) -> Result<SubscriptionSink, PendingSubscriptionAcceptError> {
235		let response = MethodResponse::subscription_response(
236			self.id,
237			ResponsePayload::success_borrowed(&self.uniq_sub.sub_id),
238			self.inner.max_response_size() as usize,
239		);
240		let success = response.is_success();
241
242		// TODO: #1052
243		//
244		// Ideally the message should be sent only once.
245		//
246		// The same message is sent twice here because one is sent directly to the transport layer and
247		// the other one is sent internally to accept the subscription.
248		self.inner.send(response.to_json()).await.map_err(|_| PendingSubscriptionAcceptError)?;
249		self.subscribe.send(response).map_err(|_| PendingSubscriptionAcceptError)?;
250
251		if success {
252			let (tx, rx) = mpsc::channel(1);
253			self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), rx));
254			Ok(SubscriptionSink {
255				inner: self.inner,
256				method: self.method,
257				subscribers: self.subscribers,
258				uniq_sub: self.uniq_sub,
259				unsubscribe: IsUnsubscribed(tx),
260				_permit: Arc::new(self.permit),
261			})
262		} else {
263			panic!(
264				"The subscription response was too big; adjust the `max_response_size` or change Subscription ID generation"
265			);
266		}
267	}
268
269	/// Returns connection identifier, which was used to perform pending subscription request
270	pub fn connection_id(&self) -> ConnectionId {
271		self.uniq_sub.conn_id
272	}
273
274	/// Get the capacity of the channel.
275	pub fn capacity(&self) -> usize {
276		self.inner.capacity()
277	}
278
279	/// Get the max capacity of the channel.
280	pub fn max_capacity(&self) -> usize {
281		self.inner.max_capacity()
282	}
283
284	/// Get the method name.
285	pub fn method_name(&self) -> &str {
286		self.method
287	}
288}
289
/// Represents a single accepted subscription, as returned by `PendingSubscriptionSink::accept`.
#[derive(Debug, Clone)]
pub struct SubscriptionSink {
	/// Sink.
	inner: MethodSink,
	/// MethodCallback.
	method: &'static str,
	/// Shared Mutex of subscriptions for this method.
	subscribers: Subscribers,
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
	/// Handle that reports once the subscription has been unsubscribed.
	unsubscribe: IsUnsubscribed,
	/// Subscription permit, shared between all clones of this sink.
	_permit: Arc<SubscriptionPermit>,
}
306
307impl SubscriptionSink {
308	/// Get the subscription ID.
309	pub fn subscription_id(&self) -> SubscriptionId<'static> {
310		self.uniq_sub.sub_id.clone()
311	}
312
313	/// Get the method name.
314	pub fn method_name(&self) -> &str {
315		self.method
316	}
317
318	/// Get the connection ID.
319	pub fn connection_id(&self) -> ConnectionId {
320		self.uniq_sub.conn_id
321	}
322
323	/// Send out a response on the subscription and wait until there is capacity.
324	///
325	///
326	/// Returns
327	/// - `Ok(())` if the message could be sent.
328	/// - `Err(unsent_msg)` if the connection or subscription was closed.
329	///
330	/// # Cancel safety
331	///
332	/// This method is cancel-safe and dropping a future loses its spot in the waiting queue.
333	pub async fn send(&self, msg: impl Into<SubscriptionMessage>) -> Result<(), DisconnectError> {
334		let msg = msg.into();
335
336		// Only possible to trigger when the connection is dropped.
337		if self.is_closed() {
338			return Err(DisconnectError(msg));
339		}
340
341		let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method);
342		self.inner.send(json).await
343	}
344
345	/// Similar to `SubscriptionSink::send` but only waits for a limited time.
346	pub async fn send_timeout(
347		&self,
348		msg: impl Into<SubscriptionMessage>,
349		timeout: Duration,
350	) -> Result<(), SendTimeoutError> {
351		let msg = msg.into();
352
353		// Only possible to trigger when the connection is dropped.
354		if self.is_closed() {
355			return Err(SendTimeoutError::Closed(msg));
356		}
357
358		let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method);
359		self.inner.send_timeout(json, timeout).await
360	}
361
362	/// Attempts to immediately send out the message as JSON string to the subscribers but fails if the
363	/// channel is full or the connection/subscription is closed
364	///
365	///
366	/// This differs from [`SubscriptionSink::send`] where it will until there is capacity
367	/// in the channel.
368	pub fn try_send(&mut self, msg: impl Into<SubscriptionMessage>) -> Result<(), TrySendError> {
369		let msg = msg.into();
370
371		// Only possible to trigger when the connection is dropped.
372		if self.is_closed() {
373			return Err(TrySendError::Closed(msg));
374		}
375
376		let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method);
377		self.inner.try_send(json)
378	}
379
380	/// Returns whether the subscription is closed.
381	pub fn is_closed(&self) -> bool {
382		self.inner.is_closed() || !self.is_active_subscription()
383	}
384
385	/// Completes when the subscription has been closed.
386	pub async fn closed(&self) {
387		// Both are cancel-safe thus ok to use select here.
388		tokio::select! {
389			_ = self.inner.closed() => (),
390			_ = self.unsubscribe.unsubscribed() => (),
391		}
392	}
393
394	/// Get the capacity of the subscription.
395	pub fn capacity(&self) -> usize {
396		self.inner.capacity()
397	}
398
399	/// Get the max capacity of the subscription.
400	pub fn max_capacity(&self) -> usize {
401		self.inner.max_capacity()
402	}
403
404	fn is_active_subscription(&self) -> bool {
405		!self.unsubscribe.is_unsubscribed()
406	}
407}
408
409impl Drop for SubscriptionSink {
410	fn drop(&mut self) {
411		if self.is_active_subscription() {
412			self.subscribers.lock().remove(&self.uniq_sub);
413		}
414	}
415}
416
/// Wrapper struct that maintains a subscription "mainly" for testing.
#[derive(Debug)]
pub struct Subscription {
	/// Receiver of the raw JSON messages for this subscription.
	pub(crate) rx: mpsc::Receiver<Box<RawValue>>,
	/// ID of the subscription.
	pub(crate) sub_id: SubscriptionId<'static>,
}
423
424impl Subscription {
425	/// Close the subscription channel.
426	pub fn close(&mut self) {
427		tracing::trace!(target: LOG_TARGET, "[Subscription::close] Notifying");
428		self.rx.close();
429	}
430
431	/// Get the subscription ID
432	pub fn subscription_id(&self) -> &SubscriptionId {
433		&self.sub_id
434	}
435
436	/// Receives the next value on the subscription if the value could be decoded as T.
437	pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, SubscriptionId<'static>), MethodsError>> {
438		let raw = self.rx.recv().await?;
439
440		tracing::debug!(target: LOG_TARGET, "[Subscription::next]: rx {}", raw);
441
442		// clippy complains about this but it doesn't compile without the extra res binding.
443		#[allow(clippy::let_and_return)]
444		let res = match serde_json::from_str::<SubscriptionResponse<T>>(raw.get()) {
445			Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
446			Err(e) => {
447				match serde_json::from_str::<jsonrpsee_types::response::SubscriptionError<&RawValue>>(raw.get()) {
448					Ok(_) => None,
449					Err(_) => Some(Err(e.into())),
450				}
451			}
452		};
453		res
454	}
455}
456
457impl Drop for Subscription {
458	fn drop(&mut self) {
459		self.close();
460	}
461}
462
/// This wraps [`tokio::sync::Semaphore`] and is used to limit the number of subscriptions per connection.
#[derive(Debug, Clone)]
pub struct BoundedSubscriptions {
	/// Semaphore from which subscription permits are acquired.
	guard: Arc<Semaphore>,
	/// Number of permits the semaphore was created with.
	max: u32,
}
469
470impl BoundedSubscriptions {
471	/// Create a new bounded subscription.
472	pub fn new(max_subscriptions: u32) -> Self {
473		Self { guard: Arc::new(Semaphore::new(max_subscriptions as usize)), max: max_subscriptions }
474	}
475
476	/// Attempts to acquire a subscription slot.
477	///
478	/// Fails if `max_subscriptions` have been exceeded.
479	pub fn acquire(&self) -> Option<SubscriptionPermit> {
480		Arc::clone(&self.guard).try_acquire_owned().ok()
481	}
482
483	/// Get the maximum number of permitted subscriptions.
484	pub const fn max(&self) -> u32 {
485		self.max
486	}
487}
488
/// Helper struct to manage subscriptions.
#[derive(Debug)]
pub struct SubscriptionState<'a> {
	/// Connection ID
	pub conn_id: ConnectionId,
	/// ID provider.
	pub id_provider: &'a dyn IdProvider,
	/// Permit that counts this subscription against the subscription limit.
	pub subscription_permit: SubscriptionPermit,
}
499
500pub(crate) fn sub_message_to_json(msg: SubscriptionMessage, sub_id: &SubscriptionId, method: &str) -> Box<RawValue> {
501	match msg.0 {
502		SubscriptionMessageInner::Complete(msg) => msg,
503		SubscriptionMessageInner::NeedsData(result) => serde_json::value::to_raw_value(&SubscriptionResponse::new(
504			method.into(),
505			SubscriptionPayload { subscription: sub_id.clone(), result },
506		))
507		.expect("Serialize infallible; qed"),
508	}
509}
510
511pub(crate) fn sub_err_to_json(error: SubscriptionError, sub_id: SubscriptionId, method: &str) -> Box<RawValue> {
512	serde_json::value::to_raw_value(&jsonrpsee_types::response::SubscriptionError::new(
513		method.into(),
514		SubscriptionPayloadError { subscription: sub_id, error },
515	))
516	.expect("Serialize infallible; qed")
517}