pub mod layer;
use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::future::Either;
use jsonrpsee_types::{ErrorObject, Id};
use pin_project::pin_project;
use serde::Serialize;
use serde::ser::SerializeSeq;
use serde_json::value::RawValue;
use tower::layer::LayerFn;
use tower::layer::util::{Identity, Stack};
/// A JSON-RPC notification whose `params` are an optional raw JSON value
/// that may be either borrowed or owned.
pub type Notification<'a> = jsonrpsee_types::Notification<'a, Option<Cow<'a, RawValue>>>;
pub use jsonrpsee_types::{Extensions, Request};
#[derive(Debug)]
pub struct BatchEntryErr<'a>(jsonrpsee_types::Response<'a, ()>);
impl<'a> BatchEntryErr<'a> {
pub fn new(id: Id<'a>, err: ErrorObject<'a>) -> Self {
let payload = jsonrpsee_types::ResponsePayload::Error(err);
let response = jsonrpsee_types::Response::new(payload, id);
Self(response)
}
pub fn into_parts(self) -> (ErrorObject<'a>, Id<'a>) {
let err = match self.0.payload {
jsonrpsee_types::ResponsePayload::Error(err) => err,
_ => unreachable!("BatchEntryErr can only be created from error payload; qed"),
};
(err, self.0.id)
}
}
/// A batch of JSON-RPC entries.
///
/// Each entry is either a valid [`BatchEntry`] or a [`BatchEntryErr`] describing
/// why that entry was rejected.
#[derive(Debug, Default)]
pub struct Batch<'a> {
// Entries in the order they were added.
inner: Vec<Result<BatchEntry<'a>, BatchEntryErr<'a>>>,
// Cached extensions merged from the entries; `None` until first populated.
extensions: Option<Extensions>,
}
impl std::fmt::Display for Batch<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let fmt = serde_json::to_string(self).map_err(|_| std::fmt::Error)?;
f.write_str(&fmt)
}
}
/// Consuming iteration yields every entry of the batch by value, in the order
/// they were added.
impl<'a> IntoIterator for Batch<'a> {
    type Item = Result<BatchEntry<'a>, BatchEntryErr<'a>>;
    type IntoIter = std::vec::IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        // Only the entries are iterated; the cached extensions are dropped.
        let Self { inner, .. } = self;
        inner.into_iter()
    }
}
impl<'a> Batch<'a> {
pub fn from(entries: Vec<Result<BatchEntry<'a>, BatchEntryErr<'a>>>) -> Self {
Self { inner: entries, extensions: None }
}
pub fn new() -> Self {
Self { inner: Vec::new(), extensions: None }
}
pub fn with_capacity(capacity: usize) -> Self {
Self { inner: Vec::with_capacity(capacity), extensions: None }
}
pub fn push(&mut self, req: Request<'a>) {
match self.extensions {
Some(ref mut ext) => {
ext.extend(req.extensions().clone());
}
None => {
self.extensions = Some(req.extensions().clone());
}
};
self.inner.push(Ok(BatchEntry::Call(req)));
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = &Result<BatchEntry<'a>, BatchEntryErr<'a>>> {
self.inner.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Result<BatchEntry<'a>, BatchEntryErr<'a>>> {
self.inner.iter_mut()
}
pub fn into_extensions(self) -> Extensions {
match self.extensions {
Some(ext) => ext,
None => self.extensions_from_iter(),
}
}
pub fn extensions(&mut self) -> &Extensions {
if self.extensions.is_none() {
self.extensions = Some(self.extensions_from_iter());
}
self.extensions.as_ref().expect("Extensions inserted above; qed")
}
pub fn extensions_mut(&mut self) -> &mut Extensions {
if self.extensions.is_none() {
self.extensions = Some(self.extensions_from_iter());
}
self.extensions.as_mut().expect("Extensions inserted above; qed")
}
fn extensions_from_iter(&self) -> Extensions {
let mut ext = Extensions::new();
for entry in self.inner.iter().flatten() {
ext.extend(entry.extensions().clone());
}
ext
}
}
/// Serializes the batch as a sequence with one element per entry: valid entries
/// are serialized as themselves, rejected entries as their wrapped error response.
impl Serialize for Batch<'_> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(self.inner.len()))?;
        // Stop at the first element that fails to serialize.
        self.inner.iter().try_for_each(|entry| match entry {
            Ok(valid) => seq.serialize_element(valid),
            Err(rejected) => seq.serialize_element(&rejected.0),
        })?;
        seq.end()
    }
}
#[derive(Debug, Clone)]
pub struct IsSubscription {
sub_id: Id<'static>,
unsub_id: Id<'static>,
unsub_method: String,
}
impl IsSubscription {
pub fn new(sub_id: Id<'static>, unsub_id: Id<'static>, unsub_method: String) -> Self {
Self { sub_id, unsub_id, unsub_method }
}
pub fn sub_req_id(&self) -> Id<'static> {
self.sub_id.clone()
}
pub fn unsub_req_id(&self) -> Id<'static> {
self.unsub_id.clone()
}
pub fn unsubscribe_method(&self) -> &str {
&self.unsub_method
}
}
/// Carries the id range associated with a batch.
#[derive(Debug, Clone)]
pub struct IsBatch {
/// Half-open range of numeric ids (`start..end`).
// NOTE(review): presumably the request ids used by the batch's entries — confirm against the producer.
pub id_range: std::ops::Range<u64>,
}
/// A single valid entry of a batch: either a method call or a notification.
///
/// Serialized untagged, i.e. as the inner request or notification itself.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum BatchEntry<'a> {
    /// A method call.
    Call(Request<'a>),
    /// A notification.
    Notification(Notification<'a>),
}

impl<'a> BatchEntry<'a> {
    /// Get a reference to the extensions of the entry.
    pub fn extensions(&self) -> &Extensions {
        match self {
            Self::Call(call) => call.extensions(),
            Self::Notification(notif) => notif.extensions(),
        }
    }

    /// Get a mutable reference to the extensions of the entry.
    pub fn extensions_mut(&mut self) -> &mut Extensions {
        match self {
            Self::Call(call) => call.extensions_mut(),
            Self::Notification(notif) => notif.extensions_mut(),
        }
    }

    /// Get the method name of the entry.
    pub fn method_name(&self) -> &str {
        match self {
            Self::Call(call) => call.method_name(),
            Self::Notification(notif) => notif.method_name(),
        }
    }

    /// Replace the method name of the entry.
    pub fn set_method_name(&mut self, name: impl Into<Cow<'a, str>>) {
        let method: Cow<'a, str> = name.into();
        match self {
            Self::Call(call) => call.method = method,
            Self::Notification(notif) => notif.method = method,
        }
    }

    /// Get the parameters of the entry, if it has any.
    pub fn params(&self) -> Option<&Cow<'a, RawValue>> {
        match self {
            Self::Call(call) => call.params.as_ref(),
            Self::Notification(notif) => notif.params.as_ref(),
        }
    }

    /// Replace the parameters of the entry; `None` removes them.
    pub fn set_params(&mut self, params: Option<Box<RawValue>>) {
        let owned: Option<Cow<'a, RawValue>> = params.map(Cow::Owned);
        match self {
            Self::Call(call) => call.params = owned,
            Self::Notification(notif) => notif.params = owned,
        }
    }

    /// Consume the entry and return its extensions.
    pub fn into_extensions(self) -> Extensions {
        match self {
            Self::Call(call) => call.extensions,
            Self::Notification(notif) => notif.extensions,
        }
    }
}
/// Interface for processing JSON-RPC method calls, batches and notifications.
///
/// Each method returns a future that is `Send` and may borrow from its input
/// for the lifetime `'a`.
pub trait RpcServiceT {
/// Output of the future returned by [`RpcServiceT::call`].
type MethodResponse;
/// Output of the future returned by [`RpcServiceT::notification`].
type NotificationResponse;
/// Output of the future returned by [`RpcServiceT::batch`].
type BatchResponse;
/// Process a single method call.
fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a;
/// Process a batch of entries.
fn batch<'a>(&self, requests: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a;
/// Process a single notification.
fn notification<'a>(&self, n: Notification<'a>) -> impl Future<Output = Self::NotificationResponse> + Send + 'a;
}
/// Builder for stacking RPC middleware layers; a thin wrapper around
/// [`tower::ServiceBuilder`].
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);

impl Default for RpcServiceBuilder<Identity> {
    fn default() -> Self {
        Self::new()
    }
}

impl RpcServiceBuilder<Identity> {
    /// Create a builder with no layers.
    pub fn new() -> Self {
        RpcServiceBuilder(tower::ServiceBuilder::new())
    }
}

impl<L> RpcServiceBuilder<L> {
    /// Optionally add a layer: `Some(layer)` adds it, `None` adds an
    /// [`Identity`] layer in its place so the resulting type is the same either way.
    pub fn option_layer<T>(self, layer: Option<T>) -> RpcServiceBuilder<Stack<layer::Either<T, Identity>, L>> {
        let either = match layer {
            Some(inner) => layer::Either::Left(inner),
            None => layer::Either::Right(Identity::new()),
        };
        RpcServiceBuilder(self.0.layer(either))
    }

    /// Add `layer` to the builder.
    pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
        let Self(builder) = self;
        RpcServiceBuilder(builder.layer(layer))
    }

    /// Add a layer built from the function `f`.
    pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
        let Self(builder) = self;
        RpcServiceBuilder(builder.layer_fn(f))
    }

    /// Add an [`layer::RpcLoggerLayer`] constructed with `max_log_len`.
    pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<layer::RpcLoggerLayer, L>> {
        self.layer(layer::RpcLoggerLayer::new(max_log_len))
    }

    /// Wrap `service` with the layers collected so far.
    pub fn service<S>(&self, service: S) -> L::Service
    where
        L: tower::Layer<S>,
    {
        let Self(builder) = self;
        builder.service(service)
    }
}
/// A response that is either still being computed by a future `F` or is
/// already available as a value `R`.
#[derive(Debug)]
#[pin_project]
pub struct ResponseFuture<F, R>(#[pin] futures_util::future::Either<F, std::future::Ready<R>>);

impl<F, R> ResponseFuture<F, R> {
    /// Wrap a future that will produce the response.
    pub fn future(f: F) -> Self {
        Self(Either::Left(f))
    }

    /// Wrap a response that is already available.
    pub fn ready(response: R) -> Self {
        Self(Either::Right(std::future::ready(response)))
    }
}

impl<F, R> Future for ResponseFuture<F, R>
where
    F: Future<Output = R>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Both variants of the inner `Either` resolve to `R`, so its poll
        // result can be forwarded unchanged.
        let inner = self.project().0;
        inner.poll(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::{Batch, BatchEntry, BatchEntryErr, Notification, Request};
    use jsonrpsee_types::{ErrorCode, ErrorObject, Id};

    /// A `BatchEntry` serializes as the bare call or notification it wraps.
    #[test]
    fn serialize_batch_entry() {
        let call = BatchEntry::Call(Request::borrowed("say_hello", None, Id::Number(1)));
        let call_json = serde_json::to_string(&call).unwrap();
        assert_eq!(call_json, "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"say_hello\"}");

        let notif = BatchEntry::Notification(Notification::new("say_hello".into(), None));
        let notif_json = serde_json::to_string(&notif).unwrap();
        assert_eq!(notif_json, "{\"jsonrpc\":\"2.0\",\"method\":\"say_hello\",\"params\":null}");
    }

    /// A `Batch` serializes as a JSON array covering calls, notifications and
    /// rejected entries alike.
    #[test]
    fn serialize_batch_works() {
        let entries = vec![
            Ok(BatchEntry::Call(Request::borrowed("say_hello", None, Id::Number(1)))),
            Ok(BatchEntry::Notification(Notification::new("say_hello".into(), None))),
            Err(BatchEntryErr::new(Id::Number(2), ErrorObject::from(ErrorCode::InvalidRequest))),
        ];
        let batch_json = serde_json::to_string(&Batch::from(entries)).unwrap();

        assert_eq!(
            batch_json,
            r#"[{"jsonrpc":"2.0","id":1,"method":"say_hello"},{"jsonrpc":"2.0","method":"say_hello","params":null},{"jsonrpc":"2.0","id":2,"error":{"code":-32600,"message":"Invalid request"}}]"#,
        );
    }
}