jsonrpsee_core/server/
subscription.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Subscription related types and traits for server implementations.
28
29use super::helpers::MethodSink;
30use super::{MethodResponse, MethodsError, ResponsePayload};
31use crate::server::error::{DisconnectError, PendingSubscriptionAcceptError, SendTimeoutError, TrySendError};
32use crate::server::rpc_module::ConnectionId;
33use crate::server::LOG_TARGET;
34use crate::{error::StringError, traits::IdProvider};
35use jsonrpsee_types::SubscriptionPayload;
36use jsonrpsee_types::{response::SubscriptionError, ErrorObjectOwned, Id, SubscriptionId, SubscriptionResponse};
37use parking_lot::Mutex;
38use rustc_hash::FxHashMap;
39use serde::{de::DeserializeOwned, Serialize};
40use std::{sync::Arc, time::Duration};
41use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
42
/// Type-alias for subscribers.
///
/// A shared, mutex-protected map from [`SubscriptionKey`] to the [`MethodSink`] of the
/// subscription together with the receiving half of its unsubscribe channel.
pub type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, mpsc::Receiver<()>)>>>;
/// Subscription permit.
///
/// An owned semaphore permit; [`BoundedSubscriptions::acquire`] hands these out.
pub type SubscriptionPermit = OwnedSemaphorePermit;
47
/// Convert something into a subscription close notification
/// before a subscription is terminated.
///
/// This module implements it for `()`, `Result<(), StringError>` and
/// [`SubscriptionCloseResponse`] itself.
pub trait IntoSubscriptionCloseResponse {
	/// Consume `self` and produce the [`SubscriptionCloseResponse`] that describes
	/// which message, if any, closes the subscription.
	fn into_response(self) -> SubscriptionCloseResponse;
}
54
/// Represents the action that will be taken when a subscription callback returns.
#[derive(Debug)]
pub enum SubscriptionCloseResponse {
	/// No further message will be sent.
	None,
	/// Send a subscription notification.
	///
	/// The subscription notification has the following format:
	///
	/// ```json
	/// {
	///  "jsonrpc": "2.0",
	///  "method": "<method>",
	///  "params": {
	///    "subscription": "<subscriptionID>",
	///    "result": <your msg>
	///  }
	/// }
	/// ```
	Notif(SubscriptionMessage),
	/// Send a subscription error notification.
	///
	/// The error notification has the following format:
	///
	/// ```json
	/// {
	///  "jsonrpc": "2.0",
	///  "method": "<method>",
	///  "params": {
	///    "subscription": "<subscriptionID>",
	///    "error": <your msg>
	///  }
	/// }
	/// ```
	NotifErr(SubscriptionMessage),
}
93
94impl IntoSubscriptionCloseResponse for Result<(), StringError> {
95	fn into_response(self) -> SubscriptionCloseResponse {
96		match self {
97			Ok(()) => SubscriptionCloseResponse::None,
98			Err(e) => SubscriptionCloseResponse::NotifErr(e.0.into()),
99		}
100	}
101}
102
impl IntoSubscriptionCloseResponse for () {
	/// A unit return value sends no closing message.
	fn into_response(self) -> SubscriptionCloseResponse {
		SubscriptionCloseResponse::None
	}
}
108
impl IntoSubscriptionCloseResponse for SubscriptionCloseResponse {
	/// Identity conversion: the response is returned unchanged.
	fn into_response(self) -> Self {
		self
	}
}
114
/// A complete subscription message or partial subscription message.
#[derive(Debug, Clone)]
pub enum SubscriptionMessageInner {
	/// Complete JSON message; `sub_message_to_json` returns it unchanged.
	Complete(String),
	/// JSON payload only; `sub_message_to_json` wraps it in a notification
	/// using the subscription ID and method name.
	NeedsData(String),
}
123
/// Subscription message.
///
/// Thin wrapper around [`SubscriptionMessageInner`]; the inner value is only
/// accessible within this crate.
#[derive(Debug, Clone)]
pub struct SubscriptionMessage(pub(crate) SubscriptionMessageInner);
127
128impl SubscriptionMessage {
129	/// Create a new subscription message from JSON.
130	///
131	/// Fails if the value couldn't be serialized.
132	pub fn from_json(t: &impl Serialize) -> Result<Self, serde_json::Error> {
133		serde_json::to_string(t).map(|json| SubscriptionMessage(SubscriptionMessageInner::NeedsData(json)))
134	}
135
136	/// Create a subscription message this is more efficient than [`SubscriptionMessage::from_json`]
137	/// because it only allocates once.
138	///
139	/// Fails if the json `result` couldn't be serialized.
140	pub fn new(method: &str, subscription: SubscriptionId, result: &impl Serialize) -> Result<Self, serde_json::Error> {
141		let json = serde_json::to_string(&SubscriptionResponse::new(
142			method.into(),
143			SubscriptionPayload { subscription, result },
144		))?;
145		Ok(Self::from_complete_message(json))
146	}
147
148	pub(crate) fn from_complete_message(msg: String) -> Self {
149		SubscriptionMessage(SubscriptionMessageInner::Complete(msg))
150	}
151
152	pub(crate) fn empty() -> Self {
153		Self::from_complete_message(String::new())
154	}
155}
156
157impl<T> From<T> for SubscriptionMessage
158where
159	T: AsRef<str>,
160{
161	fn from(s: T) -> Self {
162		// Add "<s.as_ref()>"
163		let json_str = {
164			let s = s.as_ref();
165			let mut res = String::with_capacity(s.len() + 2);
166			res.push('"');
167			res.push_str(s);
168			res.push('"');
169			res
170		};
171
172		SubscriptionMessage(SubscriptionMessageInner::NeedsData(json_str))
173	}
174}
175
/// Represent a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`].
///
/// Used as the key of the [`Subscribers`] map.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SubscriptionKey {
	/// Connection the subscription belongs to.
	pub(crate) conn_id: ConnectionId,
	/// ID of the subscription.
	pub(crate) sub_id: SubscriptionId<'static>,
}
182
/// Selects which field name a subscription notification carries its payload under.
#[derive(Debug, Clone, Copy)]
pub(crate) enum SubNotifResultOrError {
	/// The payload goes under `"result"`.
	Result,
	/// The payload goes under `"error"`.
	Error,
}

impl SubNotifResultOrError {
	/// Returns the JSON field name for this variant.
	///
	/// The names are string literals, so the returned slice is `'static` and is
	/// not tied to the lifetime of the borrow of `self`.
	pub(crate) const fn as_str(&self) -> &'static str {
		match self {
			Self::Result => "result",
			Self::Error => "error",
		}
	}
}
197
/// Handle for observing whether a subscription has been unsubscribed.
///
/// Wraps the sending half of an `mpsc` channel; the subscription counts as
/// unsubscribed once that channel is closed.
///
// NOTE: The reason why we use `mpsc` here is because it allows `IsUnsubscribed::unsubscribed`
// to be &self instead of &mut self.
#[derive(Debug, Clone)]
pub struct IsUnsubscribed(mpsc::Sender<()>);
204
impl IsUnsubscribed {
	/// Returns true if the unsubscribe method has been invoked or the subscription has been canceled.
	///
	/// This only checks whether the underlying channel is closed, so it can be
	/// called any number of times.
	pub fn is_unsubscribed(&self) -> bool {
		self.0.is_closed()
	}

	/// Wrapper over [`tokio::sync::mpsc::Sender::closed`]
	///
	/// Completes once the underlying channel has been closed.
	///
	/// # Cancel safety
	///
	/// This method is cancel safe. Once the channel is closed,
	/// it stays closed forever and all future calls to closed will return immediately.
	pub async fn unsubscribed(&self) {
		self.0.closed().await;
	}
}
224
/// Represents a single subscription that is waiting to be accepted or rejected.
///
/// If this is dropped without calling `PendingSubscriptionSink::reject` or `PendingSubscriptionSink::accept`
/// a default error is sent out as response to the subscription call.
///
/// Thus, if you want a customized error message then `PendingSubscriptionSink::reject` must be called.
#[derive(Debug)]
#[must_use = "PendingSubscriptionSink does nothing unless `accept` or `reject` is called"]
pub struct PendingSubscriptionSink {
	/// Sink.
	pub(crate) inner: MethodSink,
	/// Name of the subscription method; reported by `method_name`.
	pub(crate) method: &'static str,
	/// Shared Mutex of subscriptions for this method.
	pub(crate) subscribers: Subscribers,
	/// Unique subscription.
	pub(crate) uniq_sub: SubscriptionKey,
	/// ID of the `subscription call` (i.e. not the same as subscription id) which is used
	/// to reply to subscription method call and must only be used once.
	pub(crate) id: Id<'static>,
	/// Sender to answer the subscribe call.
	pub(crate) subscribe: oneshot::Sender<MethodResponse>,
	/// Subscription permit; handed over to the `SubscriptionSink` when the subscription is accepted.
	pub(crate) permit: OwnedSemaphorePermit,
}
250
251impl PendingSubscriptionSink {
252	/// Reject the subscription by responding to the subscription method call with
253	/// the error message from [`jsonrpsee_types::error::ErrorObject`].
254	///
255	/// # Note
256	///
257	/// If this is used in the async subscription callback
258	/// the return value is simply ignored because no further notification are propagated
259	/// once reject has been called.
260	pub async fn reject(self, err: impl Into<ErrorObjectOwned>) {
261		let err = MethodResponse::subscription_error(self.id, err.into());
262		_ = self.inner.send(err.to_result()).await;
263		_ = self.subscribe.send(err);
264	}
265
266	/// Attempt to accept the subscription and respond the subscription method call.
267	///
268	/// # Panics
269	///
270	/// Panics if the subscription response exceeded the `max_response_size`.
271	pub async fn accept(self) -> Result<SubscriptionSink, PendingSubscriptionAcceptError> {
272		let response = MethodResponse::subscription_response(
273			self.id,
274			ResponsePayload::success_borrowed(&self.uniq_sub.sub_id),
275			self.inner.max_response_size() as usize,
276		);
277		let success = response.is_success();
278
279		// TODO: #1052
280		//
281		// Ideally the message should be sent only once.
282		//
283		// The same message is sent twice here because one is sent directly to the transport layer and
284		// the other one is sent internally to accept the subscription.
285		self.inner.send(response.to_result()).await.map_err(|_| PendingSubscriptionAcceptError)?;
286		self.subscribe.send(response).map_err(|_| PendingSubscriptionAcceptError)?;
287
288		if success {
289			let (tx, rx) = mpsc::channel(1);
290			self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), rx));
291			Ok(SubscriptionSink {
292				inner: self.inner,
293				method: self.method,
294				subscribers: self.subscribers,
295				uniq_sub: self.uniq_sub,
296				unsubscribe: IsUnsubscribed(tx),
297				_permit: Arc::new(self.permit),
298			})
299		} else {
300			panic!("The subscription response was too big; adjust the `max_response_size` or change Subscription ID generation");
301		}
302	}
303
304	/// Returns connection identifier, which was used to perform pending subscription request
305	pub fn connection_id(&self) -> ConnectionId {
306		self.uniq_sub.conn_id
307	}
308
309	/// Get the capacity of the channel.
310	pub fn capacity(&self) -> usize {
311		self.inner.capacity()
312	}
313
314	/// Get the max capacity of the channel.
315	pub fn max_capacity(&self) -> usize {
316		self.inner.max_capacity()
317	}
318
319	/// Get the method name.
320	pub fn method_name(&self) -> &str {
321		self.method
322	}
323}
324
/// Represents a single accepted subscription.
///
/// Created by `PendingSubscriptionSink::accept` and used to send notifications
/// to the subscriber.
#[derive(Debug, Clone)]
pub struct SubscriptionSink {
	/// Sink.
	inner: MethodSink,
	/// Name of the subscription method; used as the `method` of outgoing notifications.
	method: &'static str,
	/// Shared Mutex of subscriptions for this method.
	subscribers: Subscribers,
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
	/// Handle that reports whether the subscription has been unsubscribed.
	unsubscribe: IsUnsubscribed,
	/// Subscription permit, shared between clones of the sink.
	_permit: Arc<SubscriptionPermit>,
}
341
342impl SubscriptionSink {
343	/// Get the subscription ID.
344	pub fn subscription_id(&self) -> SubscriptionId<'static> {
345		self.uniq_sub.sub_id.clone()
346	}
347
348	/// Get the method name.
349	pub fn method_name(&self) -> &str {
350		self.method
351	}
352
353	/// Get the connection ID.
354	pub fn connection_id(&self) -> ConnectionId {
355		self.uniq_sub.conn_id
356	}
357
358	/// Send out a response on the subscription and wait until there is capacity.
359	///
360	///
361	/// Returns
362	/// - `Ok(())` if the message could be sent.
363	/// - `Err(unsent_msg)` if the connection or subscription was closed.
364	///
365	/// # Cancel safety
366	///
367	/// This method is cancel-safe and dropping a future loses its spot in the waiting queue.
368	pub async fn send(&self, msg: SubscriptionMessage) -> Result<(), DisconnectError> {
369		// Only possible to trigger when the connection is dropped.
370		if self.is_closed() {
371			return Err(DisconnectError(msg));
372		}
373
374		let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method);
375		self.inner.send(json).await.map_err(Into::into)
376	}
377
378	/// Similar to `SubscriptionSink::send` but only waits for a limited time.
379	pub async fn send_timeout(&self, msg: SubscriptionMessage, timeout: Duration) -> Result<(), SendTimeoutError> {
380		// Only possible to trigger when the connection is dropped.
381		if self.is_closed() {
382			return Err(SendTimeoutError::Closed(msg));
383		}
384
385		let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method);
386		self.inner.send_timeout(json, timeout).await.map_err(Into::into)
387	}
388
389	/// Attempts to immediately send out the message as JSON string to the subscribers but fails if the
390	/// channel is full or the connection/subscription is closed
391	///
392	///
393	/// This differs from [`SubscriptionSink::send`] where it will until there is capacity
394	/// in the channel.
395	pub fn try_send(&mut self, msg: SubscriptionMessage) -> Result<(), TrySendError> {
396		// Only possible to trigger when the connection is dropped.
397		if self.is_closed() {
398			return Err(TrySendError::Closed(msg));
399		}
400
401		let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method);
402		self.inner.try_send(json).map_err(Into::into)
403	}
404
405	/// Returns whether the subscription is closed.
406	pub fn is_closed(&self) -> bool {
407		self.inner.is_closed() || !self.is_active_subscription()
408	}
409
410	/// Completes when the subscription has been closed.
411	pub async fn closed(&self) {
412		// Both are cancel-safe thus ok to use select here.
413		tokio::select! {
414			_ = self.inner.closed() => (),
415			_ = self.unsubscribe.unsubscribed() => (),
416		}
417	}
418
419	/// Get the capacity of the subscription.
420	pub fn capacity(&self) -> usize {
421		self.inner.capacity()
422	}
423
424	/// Get the max capacity of the subscription.
425	pub fn max_capacity(&self) -> usize {
426		self.inner.max_capacity()
427	}
428
429	fn is_active_subscription(&self) -> bool {
430		!self.unsubscribe.is_unsubscribed()
431	}
432}
433
434impl Drop for SubscriptionSink {
435	fn drop(&mut self) {
436		if self.is_active_subscription() {
437			self.subscribers.lock().remove(&self.uniq_sub);
438		}
439	}
440}
441
/// Wrapper struct that maintains a subscription "mainly" for testing.
#[derive(Debug)]
pub struct Subscription {
	/// Receiver of the raw JSON notifications for this subscription.
	pub(crate) rx: mpsc::Receiver<String>,
	/// ID of the subscription.
	pub(crate) sub_id: SubscriptionId<'static>,
}
448
449impl Subscription {
450	/// Close the subscription channel.
451	pub fn close(&mut self) {
452		tracing::trace!(target: LOG_TARGET, "[Subscription::close] Notifying");
453		self.rx.close();
454	}
455
456	/// Get the subscription ID
457	pub fn subscription_id(&self) -> &SubscriptionId {
458		&self.sub_id
459	}
460
461	/// Receives the next value on the subscription if the value could be decoded as T.
462	pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, SubscriptionId<'static>), MethodsError>> {
463		let raw = self.rx.recv().await?;
464
465		tracing::debug!(target: LOG_TARGET, "[Subscription::next]: rx {}", raw);
466
467		// clippy complains about this but it doesn't compile without the extra res binding.
468		#[allow(clippy::let_and_return)]
469		let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
470			Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
471			Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
472				Ok(_) => None,
473				Err(_) => Some(Err(e.into())),
474			},
475		};
476		res
477	}
478}
479
impl Drop for Subscription {
	/// Closes the subscription channel when the wrapper goes out of scope.
	fn drop(&mut self) {
		self.close();
	}
}
485
/// This wraps [`tokio::sync::Semaphore`] and is used to limit the number of subscriptions per connection.
#[derive(Debug, Clone)]
pub struct BoundedSubscriptions {
	/// Semaphore created with `max` permits; shared between clones.
	guard: Arc<Semaphore>,
	/// The limit this value was created with.
	max: u32,
}
492
493impl BoundedSubscriptions {
494	/// Create a new bounded subscription.
495	pub fn new(max_subscriptions: u32) -> Self {
496		Self { guard: Arc::new(Semaphore::new(max_subscriptions as usize)), max: max_subscriptions }
497	}
498
499	/// Attempts to acquire a subscription slot.
500	///
501	/// Fails if `max_subscriptions` have been exceeded.
502	pub fn acquire(&self) -> Option<SubscriptionPermit> {
503		Arc::clone(&self.guard).try_acquire_owned().ok()
504	}
505
506	/// Get the maximum number of permitted subscriptions.
507	pub const fn max(&self) -> u32 {
508		self.max
509	}
510}
511
#[derive(Debug)]
/// Helper struct to manage subscriptions.
///
/// Bundles the connection ID, a borrowed ID provider and a subscription permit.
pub struct SubscriptionState<'a> {
	/// Connection ID
	pub conn_id: ConnectionId,
	/// ID provider.
	pub id_provider: &'a dyn IdProvider,
	/// Subscription limit
	pub subscription_permit: SubscriptionPermit,
}
522
523pub(crate) fn sub_message_to_json(
524	msg: SubscriptionMessage,
525	result_or_err: SubNotifResultOrError,
526	sub_id: &SubscriptionId,
527	method: &str,
528) -> String {
529	let result_or_err = result_or_err.as_str();
530
531	match msg.0 {
532		SubscriptionMessageInner::Complete(msg) => msg,
533		SubscriptionMessageInner::NeedsData(result) => {
534			let sub_id = serde_json::to_string(&sub_id).expect("valid JSON; qed");
535			format!(
536				r#"{{"jsonrpc":"2.0","method":"{method}","params":{{"subscription":{sub_id},"{result_or_err}":{result}}}}}"#,
537			)
538		}
539	}
540}