jsonrpc-pubsub 18.0.0

Publish-Subscribe extension for jsonrpc.
Documentation
use crate::core;
use crate::core::futures::Future;

use crate::subscription::{new_subscription, Subscriber};
use crate::types::{PubSubMetadata, SubscriptionId};

/// Subscribe handler
pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
	/// Called when client is requesting new subscription to be started.
	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
}

impl<M, F> SubscribeRpcMethod<M> for F
where
	F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
	M: PubSubMetadata,
{
	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
		(*self)(params, meta, subscriber)
	}
}

/// Unsubscribe handler
pub trait UnsubscribeRpcMethod<M>: Send + Sync + 'static {
	/// Output type
	type Out: Future<Output = core::Result<core::Value>> + Send + 'static;
	/// Called when client is requesting to cancel existing subscription.
	///
	/// Metadata is not available if the session was closed without unsubscribing.
	fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out;
}

impl<M, F, I> UnsubscribeRpcMethod<M> for F
where
	F: Fn(SubscriptionId, Option<M>) -> I + Send + Sync + 'static,
	I: Future<Output = core::Result<core::Value>> + Send + 'static,
{
	type Out = I;
	fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out {
		(*self)(id, meta)
	}
}

/// Publish-Subscribe extension of `IoHandler`.
pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::middleware::Noop> {
	handler: core::MetaIoHandler<T, S>,
}

impl<T: PubSubMetadata> Default for PubSubHandler<T> {
	fn default() -> Self {
		PubSubHandler {
			handler: Default::default(),
		}
	}
}

impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
	/// Creates new `PubSubHandler`
	pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
		PubSubHandler { handler }
	}

	/// Adds new subscription.
	pub fn add_subscription<F, G>(&mut self, notification: &str, subscribe: (&str, F), unsubscribe: (&str, G))
	where
		F: SubscribeRpcMethod<T>,
		G: UnsubscribeRpcMethod<T>,
	{
		let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
		self.handler.add_method_with_meta(subscribe.0, sub);
		self.handler.add_method_with_meta(unsubscribe.0, unsub);
	}
}

impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
	type Target = core::MetaIoHandler<T, S>;

	fn deref(&self) -> &Self::Target {
		&self.handler
	}
}

impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.handler
	}
}

impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
	fn into(self) -> core::MetaIoHandler<T, S> {
		self.handler
	}
}

#[cfg(test)]
mod tests {
	use std::sync::atomic::{AtomicBool, Ordering};
	use std::sync::Arc;

	use crate::core;
	use crate::core::futures::channel::mpsc;
	use crate::core::futures::future;
	use crate::subscription::{Session, Subscriber};
	use crate::types::{PubSubMetadata, SubscriptionId};

	use super::PubSubHandler;

	#[derive(Clone)]
	struct Metadata(Arc<Session>);
	impl core::Metadata for Metadata {}
	impl PubSubMetadata for Metadata {
		fn session(&self) -> Option<Arc<Session>> {
			Some(self.0.clone())
		}
	}

	#[test]
	fn should_handle_subscription() {
		// given
		let mut handler = PubSubHandler::default();
		let called = Arc::new(AtomicBool::new(false));
		let called2 = called.clone();
		handler.add_subscription(
			"hello",
			("subscribe_hello", |params, _meta, subscriber: Subscriber| {
				assert_eq!(params, core::Params::None);
				let _sink = subscriber.assign_id(SubscriptionId::Number(5));
			}),
			("unsubscribe_hello", move |id, _meta| {
				// Should be called because session is dropped.
				called2.store(true, Ordering::SeqCst);
				assert_eq!(id, SubscriptionId::Number(5));
				future::ok(core::Value::Bool(true))
			}),
		);

		// when
		let (tx, _rx) = mpsc::unbounded();
		let meta = Metadata(Arc::new(Session::new(tx)));
		let req = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
		let res = handler.handle_request_sync(req, meta);

		// then
		let response = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
		assert_eq!(res, Some(response.into()));
		assert_eq!(called.load(Ordering::SeqCst), true);
	}
}